跳转到主要内容

客户端管理

使用上下文管理器

推荐使用 with 语句管理客户端生命周期,确保资源正确释放:
from tickflow import TickFlow

with TickFlow(api_key="your-api-key") as tf:
    df = tf.klines.get("600000.SH", as_dataframe=True)
    # 使用完毕后自动关闭连接

复用客户端实例

在应用程序中应复用客户端实例,避免频繁创建销毁:
# ❌ 不推荐:每次请求创建新客户端
def get_stock_price(symbol):
    tf = TickFlow(api_key="your-api-key")
    quotes = tf.quotes.get(symbols=[symbol])
    return quotes[0]["last_price"]

# ✅ 推荐:复用客户端
class StockService:
    def __init__(self, api_key):
        self.tf = TickFlow(api_key=api_key)
    
    def get_price(self, symbol):
        quotes = self.tf.quotes.get(symbols=[symbol])
        return quotes[0]["last_price"]
    
    def close(self):
        self.tf.close()

错误处理

捕获特定异常

SDK 提供了细粒度的异常类型,便于针对性处理:
from tickflow import (
    TickFlow,
    AuthenticationError,
    NotFoundError,
    RateLimitError,
    ConnectionError,
    TimeoutError,
)

tf = TickFlow(api_key="your-api-key")

try:
    quotes = tf.quotes.get(symbols=["INVALID.XX"])
except AuthenticationError:
    print("API Key 无效或已过期")
except NotFoundError as e:
    print(f"标的不存在: {e.message}")
except RateLimitError:
    print("请求过于频繁,请稍后重试")
except (ConnectionError, TimeoutError):
    print("网络异常,请检查网络连接")
except Exception as e:
    print(f"未知错误: {e}")

异常层级

TickFlowError
├── APIError
│   ├── AuthenticationError (401)
│   ├── PermissionError (403)
│   ├── NotFoundError (404)
│   ├── BadRequestError (400)
│   ├── RateLimitError (429)
│   └── InternalServerError (5xx)
├── ConnectionError
└── TimeoutError

重试机制

自动重试

SDK 内置了智能重试机制,以下情况会自动重试:
  • 网络连接失败
  • 请求超时
  • 服务器错误 (5xx)
  • 频率限制 (429)
# 默认重试 3 次,可自定义
tf = TickFlow(
    api_key="your-api-key",
    max_retries=5,      # 最大重试次数
    timeout=60.0        # 超时时间(秒)
)
重试使用指数退避策略(1s, 2s, 4s…),并添加随机抖动,避免雪崩效应。

批量请求优化

使用批量接口

当需要获取多只股票数据时,使用批量接口而非循环单独请求:
# ❌ 不推荐:循环单独请求
symbols = ["600000.SH", "000001.SZ", "600519.SH"]
data = {}
for s in symbols:
    data[s] = tf.klines.get(s)  # 3 次网络请求

# ✅ 推荐:使用批量接口
data = tf.klines.batch(symbols)  # 1 次网络请求

# 日内数据同理
data = tf.klines.intraday_batch(symbols)  # 1 次网络请求

处理大量标的

批量接口自动分批并发请求,默认每批 100 个标的:
# 获取 2000 只股票的数据
instruments = tf.exchanges.get_instruments("SH", instrument_type="stock")[:2000]
symbols = [inst["symbol"] for inst in instruments]

# SDK 自动分成 20 批并发请求
df = tf.klines.batch(
    symbols,
    as_dataframe=True,
    show_progress=True,  # 显示进度条
    max_workers=5        # 控制并发数,避免过载
)

调整每批标的数量

如果服务端对每次请求的标的数量有限制(例如套餐限制每次只能查询 50 个标的),可通过 batch_size 参数调整:
# 历史 K 线批量
dfs = tf.klines.batch(
    symbols,
    as_dataframe=True,
    batch_size=50,       # 每批 50 个标的
    show_progress=True,
)

# 日内 K 线批量(用法一致)
dfs = tf.klines.intraday_batch(
    symbols,
    as_dataframe=True,
    batch_size=50,
    show_progress=True,
)
过高的并发数可能触发频率限制,建议 max_workers 设置为 3-10。

DataFrame 最佳实践

按需使用 DataFrame

DataFrame 转换有一定开销,只在需要时启用:
# 简单查询:使用原始数据
data = tf.klines.get("600000.SH")
latest_price = data["close"][-1]

# 复杂分析:使用 DataFrame
df = tf.klines.get("600000.SH", as_dataframe=True)
df["ma20"] = df["close"].rolling(20).mean()

批量数据的使用

批量接口返回 Dict[str, pd.DataFrame],按标的代码索引:
dfs = tf.klines.batch(["600000.SH", "000001.SZ"], as_dataframe=True)

# 获取单只股票的 DataFrame
df_600000 = dfs["600000.SH"]
print(df_600000.tail())

# 遍历所有股票
for symbol, df in dfs.items():
    latest_close = df["close"].iloc[-1]
    print(f"{symbol}: {latest_close}")

# 合并为一个大 DataFrame 进行横截面分析
import pandas as pd
all_df = pd.concat(dfs.values())

异步最佳实践

控制并发

使用信号量控制并发数量:
import asyncio
from tickflow import AsyncTickFlow

async def main():
    semaphore = asyncio.Semaphore(10)  # 最大 10 个并发
    
    async def fetch_with_limit(tf, symbol):
        async with semaphore:
            return await tf.klines.get(symbol, as_dataframe=True)
    
    async with AsyncTickFlow(api_key="your-api-key") as tf:
        symbols = ["600000.SH", "000001.SZ", ...]  # 大量股票
        tasks = [fetch_with_limit(tf, s) for s in symbols]
        results = await asyncio.gather(*tasks)

asyncio.run(main())

处理部分失败

使用 return_exceptions=True 允许部分任务失败:
async def main():
    async with AsyncTickFlow(api_key="your-api-key") as tf:
        symbols = ["600000.SH", "INVALID.XX", "000001.SZ"]
        
        tasks = [tf.klines.get(s, as_dataframe=True) for s in symbols]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for symbol, result in zip(symbols, results):
            if isinstance(result, Exception):
                print(f"{symbol}: 获取失败 - {result}")
            else:
                print(f"{symbol}: 成功获取 {len(result)} 条数据")

asyncio.run(main())

生产环境配置

环境变量配置

# .env 文件
TICKFLOW_API_KEY=your-production-api-key
TICKFLOW_BASE_URL=https://api.tickflow.org
import os
from dotenv import load_dotenv
from tickflow import TickFlow

load_dotenv()

tf = TickFlow()  # 自动读取环境变量

日志配置

import logging

# 配置 httpx 日志查看请求详情
logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.DEBUG)

常见问题

SDK 会自动重试被限流的请求。如果频繁触发限流,建议:
  1. 减少 max_workersmax_concurrency 并发数
  2. 使用批量接口减少请求次数
  3. 在请求间添加适当延迟
  4. 升级套餐
import time

for symbol in symbols:
    data = tf.klines.get(symbol)
    time.sleep(0.1)  # 添加 100ms 延迟
检查以下几点:
  1. 确认已安装 pandas:pip install pandas
  2. 确认标的代码正确(如 600000.SH 而非 600000
  3. 确认时间范围内有数据
  4. 检查是否有异常抛出
try:
    df = tf.klines.get("600000.SH", as_dataframe=True)
    if df.empty:
        print("数据为空,请检查参数")
except Exception as e:
    print(f"请求失败: {e}")
  1. 使用异步客户端 AsyncTickFlow
  2. 适当提高 max_workers(同步客户端)、max_concurrency(异步客户端)
  3. 减少单次请求的数据量(如减少 count
  4. 使用 show_progress=True 监控进度
# 异步批量获取,性能最佳
async with AsyncTickFlow() as tf:
    df = await tf.klines.batch(
        symbols,
        count=30,           # 只取最近 30 根
        max_concurrency=10,
        show_progress=True
    )