客户端管理
使用上下文管理器
推荐使用 with 语句管理客户端生命周期,确保资源正确释放:
from tickflow import TickFlow
with TickFlow( api_key = "your-api-key" ) as tf:
df = tf.klines.get( "600000.SH" , as_dataframe = True )
# 使用完毕后自动关闭连接
复用客户端实例
在应用程序中应复用客户端实例,避免频繁创建销毁:
# ❌ 不推荐:每次请求创建新客户端
def get_stock_price ( symbol ):
tf = TickFlow( api_key = "your-api-key" )
quotes = tf.quotes.get( symbols = [symbol])
return quotes[ 0 ][ "last_price" ]
# ✅ 推荐:复用客户端
class StockService :
def __init__ ( self , api_key ):
self .tf = TickFlow( api_key = api_key)
def get_price ( self , symbol ):
quotes = self .tf.quotes.get( symbols = [symbol])
return quotes[ 0 ][ "last_price" ]
def close ( self ):
self .tf.close()
错误处理
捕获特定异常
SDK 提供了细粒度的异常类型,便于针对性处理:
from tickflow import (
TickFlow,
AuthenticationError,
NotFoundError,
RateLimitError,
ConnectionError ,
TimeoutError ,
)
tf = TickFlow( api_key = "your-api-key" )
try :
quotes = tf.quotes.get( symbols = [ "INVALID.XX" ])
except AuthenticationError:
print ( "API Key 无效或已过期" )
except NotFoundError as e:
print ( f "标的不存在: { e.message } " )
except RateLimitError:
print ( "请求过于频繁,请稍后重试" )
except ( ConnectionError , TimeoutError ):
print ( "网络异常,请检查网络连接" )
except Exception as e:
print ( f "未知错误: { e } " )
异常层级
TickFlowError
├── APIError
│ ├── AuthenticationError (401)
│ ├── PermissionError (403)
│ ├── NotFoundError (404)
│ ├── BadRequestError (400)
│ ├── RateLimitError (429)
│ └── InternalServerError (5xx)
├── ConnectionError
└── TimeoutError
重试机制
自动重试
SDK 内置了智能重试机制,以下情况会自动重试:
网络连接失败
请求超时
服务器错误 (5xx)
频率限制 (429)
# 默认重试 3 次,可自定义
tf = TickFlow(
api_key = "your-api-key" ,
max_retries = 5 , # 最大重试次数
timeout = 60.0 # 超时时间(秒)
)
重试使用指数退避策略(1s, 2s, 4s…),并添加随机抖动,避免雪崩效应。
批量请求优化
使用批量接口
当需要获取多只股票数据时,使用批量接口而非循环单独请求:
# ❌ 不推荐:循环单独请求
symbols = [ "600000.SH" , "000001.SZ" , "600519.SH" ]
data = {}
for s in symbols:
data[s] = tf.klines.get(s) # 3 次网络请求
# ✅ 推荐:使用批量接口
data = tf.klines.batch(symbols) # 1 次网络请求
# 日内数据同理
data = tf.klines.intraday_batch(symbols) # 1 次网络请求
处理大量标的
批量接口自动分批并发请求,默认每批 100 个标的:
# 获取 2000 只股票的数据
instruments = tf.exchanges.get_instruments( "SH" , instrument_type = "stock" )[: 2000 ]
symbols = [inst[ "symbol" ] for inst in instruments]
# SDK 自动分成 20 批并发请求
df = tf.klines.batch(
symbols,
as_dataframe = True ,
show_progress = True , # 显示进度条
max_workers = 5 # 控制并发数,避免过载
)
调整每批标的数量
如果服务端对每次请求的标的数量有限制(例如套餐限制每次只能查询 50 个标的),可通过 batch_size 参数调整:
# 历史 K 线批量
dfs = tf.klines.batch(
symbols,
as_dataframe = True ,
batch_size = 50 , # 每批 50 个标的
show_progress = True ,
)
# 日内 K 线批量(用法一致)
dfs = tf.klines.intraday_batch(
symbols,
as_dataframe = True ,
batch_size = 50 ,
show_progress = True ,
)
过高的并发数可能触发频率限制,建议 max_workers 设置为 3-10。
DataFrame 最佳实践
按需使用 DataFrame
DataFrame 转换有一定开销,只在需要时启用:
# 简单查询:使用原始数据
data = tf.klines.get( "600000.SH" )
latest_price = data[ "close" ][ - 1 ]
# 复杂分析:使用 DataFrame
df = tf.klines.get( "600000.SH" , as_dataframe = True )
df[ "ma20" ] = df[ "close" ].rolling( 20 ).mean()
批量数据的使用
批量接口返回 Dict[str, pd.DataFrame],按标的代码索引:
dfs = tf.klines.batch([ "600000.SH" , "000001.SZ" ], as_dataframe = True )
# 获取单只股票的 DataFrame
df_600000 = dfs[ "600000.SH" ]
print (df_600000.tail())
# 遍历所有股票
for symbol, df in dfs.items():
latest_close = df[ "close" ].iloc[ - 1 ]
print ( f " { symbol } : { latest_close } " )
# 合并为一个大 DataFrame 进行横截面分析
import pandas as pd
all_df = pd.concat(dfs.values())
异步最佳实践
控制并发
使用信号量控制并发数量:
import asyncio
from tickflow import AsyncTickFlow
async def main ():
semaphore = asyncio.Semaphore( 10 ) # 最大 10 个并发
async def fetch_with_limit ( tf , symbol ):
async with semaphore:
return await tf.klines.get(symbol, as_dataframe = True )
async with AsyncTickFlow( api_key = "your-api-key" ) as tf:
symbols = [ "600000.SH" , "000001.SZ" , ... ] # 大量股票
tasks = [fetch_with_limit(tf, s) for s in symbols]
results = await asyncio.gather( * tasks)
asyncio.run(main())
处理部分失败
使用 return_exceptions=True 允许部分任务失败:
async def main ():
async with AsyncTickFlow( api_key = "your-api-key" ) as tf:
symbols = [ "600000.SH" , "INVALID.XX" , "000001.SZ" ]
tasks = [tf.klines.get(s, as_dataframe = True ) for s in symbols]
results = await asyncio.gather( * tasks, return_exceptions = True )
for symbol, result in zip (symbols, results):
if isinstance (result, Exception ):
print ( f " { symbol } : 获取失败 - { result } " )
else :
print ( f " { symbol } : 成功获取 { len (result) } 条数据" )
asyncio.run(main())
生产环境配置
环境变量配置
# .env 文件
TICKFLOW_API_KEY = your-production-api-key
TICKFLOW_BASE_URL = https://api.tickflow.org
import os
from dotenv import load_dotenv
from tickflow import TickFlow
load_dotenv()
tf = TickFlow() # 自动读取环境变量
日志配置
import logging
# 配置 httpx 日志查看请求详情
logging.basicConfig( level = logging. INFO )
logging.getLogger( "httpx" ).setLevel(logging. DEBUG )
常见问题
SDK 会自动重试被限流的请求。如果频繁触发限流,建议:
减少 max_workers 或 max_concurrency 并发数
使用批量接口减少请求次数
在请求间添加适当延迟
升级套餐
import time
for symbol in symbols:
data = tf.klines.get(symbol)
time.sleep( 0.1 ) # 添加 100ms 延迟
检查以下几点:
确认已安装 pandas:pip install pandas
确认标的代码正确(如 600000.SH 而非 600000)
确认时间范围内有数据
检查是否有异常抛出
try :
df = tf.klines.get( "600000.SH" , as_dataframe = True )
if df.empty:
print ( "数据为空,请检查参数" )
except Exception as e:
print ( f "请求失败: { e } " )
使用异步客户端 AsyncTickFlow
适当提高 max_workers(同步客户端)、max_concurrency(异步客户端)
减少单次请求的数据量(如减少 count)
使用 show_progress=True 监控进度
# 异步批量获取,性能最佳
async with AsyncTickFlow() as tf:
df = await tf.klines.batch(
symbols,
count = 30 , # 只取最近 30 根
max_concurrency = 10 ,
show_progress = True
)