> ## Documentation Index
> Fetch the complete documentation index at: https://docs.tickflow.org/llms.txt
> Use this file to discover all available pages before exploring further.

# 最佳实践

> 生产环境使用的最佳实践和注意事项

## 客户端管理

### 使用上下文管理器

推荐使用 `with` 语句管理客户端生命周期，确保资源正确释放：

<CodeGroup>
  ```python 同步 theme={null}
  from tickflow import TickFlow

  with TickFlow(api_key="your-api-key") as tf:
      df = tf.klines.get("600000.SH", as_dataframe=True)
      # 使用完毕后自动关闭连接
  ```

  ```python 异步 theme={null}
  from tickflow import AsyncTickFlow

  async with AsyncTickFlow(api_key="your-api-key") as tf:
      df = await tf.klines.get("600000.SH", as_dataframe=True)
      # 使用完毕后自动关闭连接
  ```
</CodeGroup>

### 复用客户端实例

在应用程序中应复用客户端实例，避免频繁创建销毁：

```python theme={null}
# ❌ 不推荐：每次请求创建新客户端
def get_stock_price(symbol):
    tf = TickFlow(api_key="your-api-key")
    quotes = tf.quotes.get(symbols=[symbol])
    return quotes[0]["last_price"]

# ✅ 推荐：复用客户端
class StockService:
    def __init__(self, api_key):
        self.tf = TickFlow(api_key=api_key)
    
    def get_price(self, symbol):
        quotes = self.tf.quotes.get(symbols=[symbol])
        return quotes[0]["last_price"]
    
    def close(self):
        self.tf.close()
```

## 错误处理

### 捕获特定异常

SDK 提供了细粒度的异常类型，便于针对性处理：

```python theme={null}
from tickflow import (
    TickFlow,
    AuthenticationError,
    NotFoundError,
    RateLimitError,
    ConnectionError,
    TimeoutError,
)

tf = TickFlow(api_key="your-api-key")

try:
    quotes = tf.quotes.get(symbols=["INVALID.XX"])
except AuthenticationError:
    print("API Key 无效或已过期")
except NotFoundError as e:
    print(f"标的不存在: {e.message}")
except RateLimitError:
    print("请求过于频繁，请稍后重试")
except (ConnectionError, TimeoutError):
    print("网络异常，请检查网络连接")
except Exception as e:
    print(f"未知错误: {e}")
```

### 异常层级

```
TickFlowError
├── APIError
│   ├── AuthenticationError (401)
│   ├── PermissionError (403)
│   ├── NotFoundError (404)
│   ├── BadRequestError (400)
│   ├── RateLimitError (429)
│   └── InternalServerError (5xx)
├── ConnectionError
└── TimeoutError
```

## 重试机制

### 自动重试

SDK 内置了智能重试机制，以下情况会自动重试：

* 网络连接失败
* 请求超时
* 服务器错误 (5xx)
* 频率限制 (429)

```python theme={null}
# 默认重试 3 次，可自定义
tf = TickFlow(
    api_key="your-api-key",
    max_retries=5,      # 最大重试次数
    timeout=60.0        # 超时时间（秒）
)
```

<Note>
  重试使用指数退避策略（1s, 2s, 4s...），并添加随机抖动，避免雪崩效应。
</Note>

## 批量请求优化

### 使用批量接口

当需要获取多只股票数据时，使用批量接口而非循环单独请求：

```python theme={null}
# ❌ 不推荐：循环单独请求
symbols = ["600000.SH", "000001.SZ", "600519.SH"]
data = {}
for s in symbols:
    data[s] = tf.klines.get(s)  # 3 次网络请求

# ✅ 推荐：使用批量接口
data = tf.klines.batch(symbols)  # 1 次网络请求

# 日内数据同理
data = tf.klines.intraday_batch(symbols)  # 1 次网络请求
```

### 处理大量标的

批量接口自动分批并发请求，默认每批 100 个标的：

```python theme={null}
# 获取 2000 只股票的数据
instruments = tf.exchanges.get_instruments("SH", instrument_type="stock")[:2000]
symbols = [inst["symbol"] for inst in instruments]

# SDK 自动分成 20 批并发请求
df = tf.klines.batch(
    symbols,
    as_dataframe=True,
    show_progress=True,  # 显示进度条
    max_workers=5        # 控制并发数，避免过载
)
```

### 调整每批标的数量

如果服务端对每次请求的标的数量有限制（例如套餐限制每次只能查询 50 个标的），可通过 `batch_size` 参数调整：

```python theme={null}
# 历史 K 线批量
dfs = tf.klines.batch(
    symbols,
    as_dataframe=True,
    batch_size=50,       # 每批 50 个标的
    show_progress=True,
)

# 日内 K 线批量（用法一致）
dfs = tf.klines.intraday_batch(
    symbols,
    as_dataframe=True,
    batch_size=50,
    show_progress=True,
)
```

<Warning>
  过高的并发数可能触发频率限制，建议 `max_workers` 设置为 3-10。
</Warning>

## DataFrame 最佳实践

### 按需使用 DataFrame

DataFrame 转换有一定开销，只在需要时启用：

```python theme={null}
# 简单查询：使用原始数据
data = tf.klines.get("600000.SH")
latest_price = data["close"][-1]

# 复杂分析：使用 DataFrame
df = tf.klines.get("600000.SH", as_dataframe=True)
df["ma20"] = df["close"].rolling(20).mean()
```

### 批量数据的使用

批量接口返回 `Dict[str, pd.DataFrame]`，按标的代码索引：

```python theme={null}
dfs = tf.klines.batch(["600000.SH", "000001.SZ"], as_dataframe=True)

# 获取单只股票的 DataFrame
df_600000 = dfs["600000.SH"]
print(df_600000.tail())

# 遍历所有股票
for symbol, df in dfs.items():
    latest_close = df["close"].iloc[-1]
    print(f"{symbol}: {latest_close}")

# 合并为一个大 DataFrame 进行横截面分析
import pandas as pd
all_df = pd.concat(dfs.values())
```

## 异步最佳实践

### 控制并发

使用信号量控制并发数量：

```python theme={null}
import asyncio
from tickflow import AsyncTickFlow

async def main():
    semaphore = asyncio.Semaphore(10)  # 最大 10 个并发
    
    async def fetch_with_limit(tf, symbol):
        async with semaphore:
            return await tf.klines.get(symbol, as_dataframe=True)
    
    async with AsyncTickFlow(api_key="your-api-key") as tf:
        symbols = ["600000.SH", "000001.SZ", ...]  # 大量股票
        tasks = [fetch_with_limit(tf, s) for s in symbols]
        results = await asyncio.gather(*tasks)

asyncio.run(main())
```

### 处理部分失败

使用 `return_exceptions=True` 允许部分任务失败：

```python theme={null}
async def main():
    async with AsyncTickFlow(api_key="your-api-key") as tf:
        symbols = ["600000.SH", "INVALID.XX", "000001.SZ"]
        
        tasks = [tf.klines.get(s, as_dataframe=True) for s in symbols]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for symbol, result in zip(symbols, results):
            if isinstance(result, Exception):
                print(f"{symbol}: 获取失败 - {result}")
            else:
                print(f"{symbol}: 成功获取 {len(result)} 条数据")

asyncio.run(main())
```

## 生产环境配置

### 环境变量配置

```bash theme={null}
# .env 文件
TICKFLOW_API_KEY=your-production-api-key
TICKFLOW_BASE_URL=https://api.tickflow.org
```

```python theme={null}
import os
from dotenv import load_dotenv
from tickflow import TickFlow

load_dotenv()

tf = TickFlow()  # 自动读取环境变量
```

### 日志配置

```python theme={null}
import logging

# 配置 httpx 日志查看请求详情
logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.DEBUG)
```

## 常见问题

<AccordionGroup>
  <Accordion title="如何处理频率限制？">
    SDK 会自动重试被限流的请求。如果频繁触发限流，建议：

    1. 减少 `max_workers` 或 `max_concurrency` 并发数
    2. 使用批量接口减少请求次数
    3. 在请求间添加适当延迟
    4. 升级套餐

    ```python theme={null}
    import time

    for symbol in symbols:
        data = tf.klines.get(symbol)
        time.sleep(0.1)  # 添加 100ms 延迟
    ```
  </Accordion>

  <Accordion title="DataFrame 返回空怎么办？">
    检查以下几点：

    1. 确认已安装 pandas：`pip install pandas`
    2. 确认标的代码正确（如 `600000.SH` 而非 `600000`）
    3. 确认时间范围内有数据
    4. 检查是否有异常抛出

    ```python theme={null}
    try:
        df = tf.klines.get("600000.SH", as_dataframe=True)
        if df.empty:
            print("数据为空，请检查参数")
    except Exception as e:
        print(f"请求失败: {e}")
    ```
  </Accordion>

  <Accordion title="如何提高批量请求性能？">
    1. 使用异步客户端 `AsyncTickFlow`
    2. 适当提高 `max_workers`(同步客户端)、`max_concurrency`(异步客户端)
    3. 减少单次请求的数据量（如减少 `count`）
    4. 使用 `show_progress=True` 监控进度

    ```python theme={null}
    # 异步批量获取，性能最佳
    async with AsyncTickFlow() as tf:
        df = await tf.klines.batch(
            symbols,
            count=30,           # 只取最近 30 根
            max_concurrency=10,
            show_progress=True
        )
    ```
  </Accordion>
</AccordionGroup>
