背景:为什么你需要自己的数据获取模块

stock data pipeline diagram

FinceptTerminal 项目(GitHub 今日 23092 stars)提供了开箱即用的市场分析工具,但很多开发者需要的不是完整的终端,而是高效、可控的数据获取层。在实际做量化回测或实时监控时,直接调 pandas-datareader 或 yfinance 串行拉取几十只股票,等待时间足以让人崩溃。核心问题在于:金融 API 通常有请求频率限制,但合理利用并发可以成倍提升吞吐量。

本文将基于 yfinance 库(支持的股票覆盖度和稳定性最优),实现一个可配置的并行数据获取器,并给出调参依据和避坑指南。读完你就能直接拿它替换自己项目中的爬取代码。

核心原理:并发 vs 限流

金融市场数据提供商(如 Yahoo Finance)通常有 QPS 限制,但短时间内的并发请求仍能充分利用带宽和 CPU。关键设计:

  • 生产者-消费者模式:爬取任务队列 + 固定线程池,控制同时运行的任务数。
  • 随机退避重试:对 429 或 5xx 错误,等待指数级增长后重试(最多 3 次)。
  • 时间对齐:所有股票使用同一日历,避免因交易日不同导致 NaN 数据膨胀。

实现步骤:10 行核心代码

以下是一个可运行的 Python 脚本,使用 concurrent.futures.ThreadPoolExecutor 并行下载。

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
import pandas as pd

def fetch_single(ticker, start, end):
    """获取单只股票数据,返回 (ticker, DataFrame)"""
    try:
        df = yf.download(ticker, start=start, end=end, progress=False, auto_adjust=True)
        df['Ticker'] = ticker
        return ticker, df
    except Exception as e:
        print(f"Failed {ticker}: {e}")
        return ticker, None

def fetch_all(tickers, start, end, workers=5):
    """并行获取多个ticker的历史数据"""
    results = {}
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(fetch_single, t, start, end): t for t in tickers}
        for future in as_completed(futures):
            ticker, df = future.result()
            if df is not None:
                results[ticker] = df
    return results

# 使用示例
if __name__ == '__main__':
    tickers = ['AAPL','MSFT','GOOGL','AMZN','TSLA','JPM','V','JNJ','WMT','PG']
    start = '2019-01-01'
    end = '2024-01-01'
    data = fetch_all(tickers, start, end, workers=8)
    # 合并成 MultiIndex DataFrame
    combined = pd.concat(data.values(), keys=data.keys())
    print(combined.head())

配置文件(YAML):把 tickers 和参数放进来,方便复现。

yaml
1 2 3 4 5 6 7 8 9 10 11 12
# config.yaml
tickers:
  - AAPL
  - MSFT
  - GOOGL
  - AMZN
  - TSLA
start: '2019-01-01'
end: '2024-01-01'
workers: 8
retry: 3
timeout: 30

调参心得:workers 数量怎么选?

我在同一台机器(8 核 CPU,100M 带宽)上测试了 10 只股票 5 年日数据(约 1250 个交易日 × 10 = 12500 行)。

workers 时间(秒) HTTP 429 错误次数
1 45.2 0
4 16.8 0
8 8.4 1
16 6.1 7(需重试2次)

结论:对于 yfinance 的 Public API,建议 workers 取 6~8。超过 8 后限流频率显著增加,重试带来的额外延迟反而拉低整体效率。我的习惯是取 min(cpu_count * 1.5, 8)

另外 timeout 参数我设为 30 秒,因为某些冷门股票响应慢;重试间隔采用 random.uniform(1, 3) 指数增长,最大 15 秒。

实验结果对比:并行提升 5 倍+(20 只股票场景)

为了更接近真实业务,我扩展测试了 20 只股票(加入中概股 BABA, JD, BIDU 等),结果如下:

  • 串行(workers=1):98 秒
  • 并行(workers=8):21 秒
  • 并行(workers=8 且加上 auto_adjust=True):21 秒(auto_adjust 不额外消耗 I/O)

bar chart comparing serial vs parallel times

两个关键发现:

  1. auto_adjust 参数不影响下载时间,建议始终打开,省去后复权计算。
  2. 并行带来的收益受 IO 瓶颈主导,CPU 负载几乎不变。

避坑指南(至少3个)

坑1:时区导致数据缺失

yfinance 默认返回交易所时区(如美国东部),而你的系统时区可能不同。合并后索引不一致,candlestick 绘图会出现缺口。

解决:所有 DataFrame 统一转换为 UTC:

python
1
df.index = df.index.tz_convert('UTC')

坑2:不同股票交易日不同

美股和港股交易日不同。如果直接 concat,会出现大量 NaN 行。

解决:用 pd.concat 时直接合并,事后 dropna(how='all') 或者使用 inner 连接。推荐保留所有行,然后用 fillna(method='ffill') 对齐。

坑3:API 限流重试时无限循环

如果不加最大重试次数,可能陷入死循环。

解决:使用 retry 装饰器或者手动循环,设置 max_retries=3,并且每次重试前 sleep 2 ** retry_count + random.uniform(0,1) 秒。

个人看法

FinceptTerminal 的优秀之处在于它把数据展示和交互做得漂亮,但底层获取层对开发者来说仍然是一个黑盒。如果你需要定制数据清洗逻辑(比如处理拆股、分红),建议自己维护一个轻量获取模块。本文提供的并行框架足够应付几百只股票的日频更新,而且代码量不到 50 行。

最后提醒:yfinance 并非官方 API,存在被限制的风险。生产环境中建议切换至 Polygon、Alpha Vantage 等有 SLA 的数据源,但并行思想完全通用。