TickFlow 提供 WebSocket 接口,支持客户端订阅指定标的的实时行情推送。相比 REST 轮询,WebSocket 延迟更低、带宽占用更少。
WebSocket 实时行情为付费功能,需要购买包含此功能的套餐或单独订阅。每个连接的最大订阅标的数由您的套餐决定。
连接地址
wss://api.tickflow.org/v1/ws/quotes?api_key=YOUR_API_KEY
通过 api_key 查询参数进行认证。连接成功后即可发送订阅/退订消息。
认证失败时返回 HTTP 401 或 403,客户端应停止重连并检查 API Key 和套餐权限。
数据格式
所有消息使用 JSON 文本帧,支持标准 WebSocket 压缩(permessage-deflate),客户端无需额外处理。
客户端命令
subscribe — 订阅标的
订阅一组标的的实时行情。可多次调用追加订阅,已订阅的标的会自动去重。
{
"op": "subscribe",
"symbols": ["600000.SH", "000001.SZ", "AAPL.US"]
}
服务端返回 subscribed 确认,并立即推送新增标的的最新缓存行情。若新增后超出套餐上限,返回 error,本次订阅不生效。
unsubscribe — 退订标的
退订指定标的,服务端返回更新后的订阅列表。
{
"op": "unsubscribe",
"symbols": ["600000.SH"]
}
服务端消息
subscribed — 订阅状态
每次 subscribe / unsubscribe 操作后返回当前完整的订阅状态:
{
"op": "subscribed",
"symbols": ["600000.SH", "000001.SZ", "AAPL.US"],
"total": 3
}
首次订阅成功后,服务端会立即推送已订阅标的的最新缓存行情,无需等待下一个行情更新。
quotes — 行情推送
{
"op": "quotes",
"data": [
{
"symbol": "600000.SH",
"region": "CN",
"last_price": 10.15,
"prev_close": 10.04,
"open": 10.06,
"high": 10.25,
"low": 10.03,
"volume": 1057712,
"amount": 1072786000,
"timestamp": 1769961600000
}
]
}
行情推送仅包含客户端已订阅的标的数据,服务端过滤后发送以减少带宽。
error — 错误消息
{
"op": "error",
"message": "exceeded max 50 symbols (current: 48, requested: +5)"
}
常见错误:
exceeded max N symbols — 订阅标的数超出套餐上限
invalid message: ... — JSON 格式不正确
unknown op: ... — 未知的操作类型
命令总览
| 客户端命令 | 说明 | 服务端响应 |
|---|
subscribe | 订阅标的行情 | subscribed + quotes(初始快照) |
unsubscribe | 退订标的 | subscribed |
| 服务端推送 | 说明 | 触发条件 |
|---|
subscribed | 当前订阅状态 | 每次 subscribe / unsubscribe 后 |
quotes | 实时行情数据 | 已订阅标的有行情更新时 |
error | 错误信息 | 操作失败时 |
连接保活
服务端每 30 秒发送一次 Ping 帧,客户端需回复 Pong 帧。大多数 WebSocket 库会自动处理。
连接管理
- 断开清理:连接断开后该连接的所有订阅自动清除
- 重连恢复:客户端断线重连后需重新发送
subscribe 恢复订阅
- 认证错误:收到 HTTP 401/403 时不应自动重连,请检查 API Key 和套餐权限
Python SDK 示例
安装实时行情依赖:
pip install tickflow[realtime]
同步用法
import datetime
from tickflow import TickFlow
client = TickFlow(api_key="your-api-key")
stream = client.realtime
@stream.on_quotes
def handle(quotes):
for q in quotes:
print(f"行情时间: {datetime.datetime.fromtimestamp(q['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')}, 标的代码: {q['symbol']}, 最新价: {q['last_price']}, 开盘价: {q['open']}, 最高价: {q['high']}, 最低价: {q['low']}, 昨收价: {q['prev_close']}, 成交量: {q['volume']}, 成交额: {q['amount']}, 扩展字段: {q['ext']}")
@stream.on_error
def on_error(msg):
print(f"错误: {msg}")
stream.subscribe(["600000.SH", "000001.SZ"])
stream.connect() # 阻塞直到 close() 或 Ctrl+C
异步用法
import asyncio
from tickflow import AsyncTickFlow
async def main():
async with AsyncTickFlow(api_key="your-api-key") as client:
stream = client.realtime
@stream.on_quotes
def handle(quotes):
for q in quotes:
print(f"行情时间: {datetime.datetime.fromtimestamp(q['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')}, 标的代码: {q['symbol']}, 最新价: {q['last_price']}, 开盘价: {q['open']}, 最高价: {q['high']}, 最低价: {q['low']}, 昨收价: {q['prev_close']}, 成交量: {q['volume']}, 成交额: {q['amount']}, 扩展字段: {q['ext']}")
await stream.subscribe(["600000.SH", "000001.SZ"])
await stream.connect()
asyncio.run(main())
后台运行(非阻塞)
from tickflow import TickFlow
import time
client = TickFlow(api_key="your-api-key")
stream = client.realtime
@stream.on_quotes
def handle(quotes):
for q in quotes:
print(f"{q['symbol']}: {q['last_price']}")
stream.subscribe(["600000.SH"])
stream.connect(block=False) # 后台线程运行
# 主线程继续执行其他逻辑
time.sleep(10)
stream.subscribe(["000001.SZ"]) # 动态追加订阅
time.sleep(30)
stream.close()
如果只需获取某一时刻的行情快照,使用 REST 接口 client.quotes.get() 更为简单。WebSocket 适合需要持续接收行情更新的场景。