【期货量化进阶】TqSdk异步编程与多策略并行(Python协程实战)
TqSdk异步编程摘要:本文介绍了天勤量化(TqSdk)的异步编程功能,重点阐述了其在多策略并行运行中的优势。内容涵盖异步编程基础概念(协程、async/await)、TqSdk异步接口使用方法,以及实际应用案例,包括多品种行情监控、双均线策略与突破策略并行执行、网格交易实现等。通过异步编程,TqSdk可在单线程内高效运行多个策略任务,共享网络连接,降低资源消耗,提升交易系统性能。文中提供了完整的
·
一、前言
当需要同时运行多个策略或监控多个品种时,传统的单线程阻塞方式效率低下。TqSdk内置支持Python异步编程(asyncio),允许在单线程内高效运行多个策略任务,共享网络连接,实现最优性能。
本文将介绍:
- TqSdk异步编程核心概念
- 协程与
async/await语法 - 多策略并行运行
- 异步网格交易实战
二、为什么选择天勤量化(TqSdk)
TqSdk的异步支持使其在多策略场景下具有独特优势:
| 优势 | 说明 |
|---|---|
| 原生异步 | 内核基于asyncio构建 |
| 共享连接 | 多策略共用一个网络连接 |
| 低延迟 | 无线程切换开销 |
| await支持 | get_quote等接口支持await |
安装方法:
pip install tqsdk
三、异步编程基础
3.1 同步vs异步
| 对比 | 同步编程 | 异步编程 |
|---|---|---|
| 执行方式 | 顺序阻塞 | 并发非阻塞 |
| 资源消耗 | 多进程/线程开销大 | 单线程开销小 |
| 适用场景 | 简单单策略 | 多策略并行 |
| 代码复杂度 | 简单 | 需掌握async/await |
3.2 核心概念
| 概念 | 说明 |
|---|---|
| 协程(Coroutine) | 可暂停和恢复的函数 |
| async def | 定义协程函数 |
| await | 等待协程完成 |
| Task | 协程的执行单元 |
| api.create_task | 创建异步任务 |
3.3 TqSdk异步接口
| 接口 | 同步用法 | 异步用法 |
|---|---|---|
| get_quote | quote = api.get_quote(s) |
quote = await api.get_quote(s) |
| get_kline_serial | klines = api.get_kline_serial(...) |
klines = await api.get_kline_serial(...) |
| get_tick_serial | ticks = api.get_tick_serial(...) |
ticks = await api.get_tick_serial(...) |
四、异步策略实现
4.1 基础异步示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:TqSdk异步编程基础
说明:本代码仅供学习参考
"""
from tqsdk import TqApi, TqAuth
api = TqApi(auth=TqAuth("快期账户", "快期密码"))
async def monitor_quote(symbol):
"""异步监控单个合约行情"""
# await方式获取行情,会等待数据到达后返回
quote = await api.get_quote(symbol)
print(f"[{symbol}] 首次获取: 最新价={quote.last_price}")
# 注册更新通知
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
if api.is_changing(quote, "last_price"):
print(f"[{symbol}] 价格更新: {quote.last_price}")
# 创建多个异步任务
api.create_task(monitor_quote("SHFE.rb2510"))
api.create_task(monitor_quote("DCE.i2509"))
api.create_task(monitor_quote("SHFE.cu2507"))
print("=" * 50)
print("异步监控多品种行情")
print("=" * 50)
# 主循环驱动异步任务
while True:
api.wait_update()
4.2 多策略并行
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:多策略异步并行运行
说明:本代码仅供学习参考
"""
from tqsdk import TqApi, TqAuth, TqSim
from tqsdk.lib import TargetPosTask
from tqsdk.tafunc import ma
api = TqApi(TqSim(init_balance=500000), auth=TqAuth("快期账户", "快期密码"))
async def ma_strategy(symbol, fast, slow):
"""
双均线策略(异步版)
"""
print(f"[{symbol}] 策略启动: MA({fast},{slow})")
# await获取K线数据
klines = await api.get_kline_serial(symbol, 60*60, slow+5)
target_pos = TargetPosTask(api, symbol)
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
if api.is_changing(klines.iloc[-1], "datetime"):
ma_fast = ma(klines["close"], fast)
ma_slow = ma(klines["close"], slow)
# 金叉做多
if ma_fast.iloc[-2] < ma_slow.iloc[-2] and ma_fast.iloc[-1] > ma_slow.iloc[-1]:
target_pos.set_target_volume(1)
print(f"[{symbol}] 金叉做多")
# 死叉做空
if ma_fast.iloc[-2] > ma_slow.iloc[-2] and ma_fast.iloc[-1] < ma_slow.iloc[-1]:
target_pos.set_target_volume(-1)
print(f"[{symbol}] 死叉做空")
async def breakout_strategy(symbol, period):
"""
突破策略(异步版)
"""
print(f"[{symbol}] 突破策略启动: 周期={period}")
klines = await api.get_kline_serial(symbol, 60*60, period+5)
target_pos = TargetPosTask(api, symbol)
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
if api.is_changing(klines.iloc[-1], "datetime"):
high = klines["high"].iloc[-period-1:-1].max()
low = klines["low"].iloc[-period-1:-1].min()
close = klines["close"].iloc[-1]
if close > high:
target_pos.set_target_volume(1)
print(f"[{symbol}] 突破高点做多")
elif close < low:
target_pos.set_target_volume(-1)
print(f"[{symbol}] 突破低点做空")
print("=" * 60)
print("多策略异步并行运行")
print("=" * 60)
# 创建多个策略任务(不同品种、不同策略)
api.create_task(ma_strategy("SHFE.rb2510", 5, 20)) # 螺纹钢-双均线
api.create_task(ma_strategy("DCE.i2509", 10, 30)) # 铁矿石-双均线
api.create_task(breakout_strategy("DCE.m2509", 20)) # 豆粕-突破
api.create_task(breakout_strategy("SHFE.cu2507", 15)) # 沪铜-突破
# 主循环
while True:
api.wait_update()
4.3 异步网格交易
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:异步网格交易策略
说明:本代码仅供学习参考
"""
from tqsdk import TqApi, TqAuth, TqSim
from tqsdk.lib import TargetPosTask
api = TqApi(TqSim(init_balance=200000), auth=TqAuth("快期账户", "快期密码"))
async def grid_trading(symbol, grid_size, max_pos):
"""
网格交易策略(异步版)
参数:
symbol: 合约代码
grid_size: 网格间距(点数)
max_pos: 最大持仓手数
"""
print(f"[{symbol}] 网格策略启动: 网格={grid_size}点, 最大持仓={max_pos}")
quote = await api.get_quote(symbol)
position = api.get_position(symbol)
# 以当前价为基准价
base_price = quote.last_price
print(f"[{symbol}] 基准价格: {base_price}")
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
if api.is_changing(quote, "last_price"):
current_price = quote.last_price
current_pos = position.pos_long - position.pos_short
# 计算当前价格偏离基准的网格数
grid_offset = int((current_price - base_price) / grid_size)
# 网格逻辑:价格下跌时加多仓,价格上涨时减多仓
target_pos = -grid_offset # 网格数取负作为目标持仓
# 限制持仓范围
target_pos = max(-max_pos, min(max_pos, target_pos))
if target_pos != current_pos:
api.insert_order(
symbol=symbol,
direction="BUY" if target_pos > current_pos else "SELL",
offset="OPEN" if abs(target_pos) > abs(current_pos) else "CLOSE",
volume=abs(target_pos - current_pos),
limit_price=current_price
)
print(f"[{symbol}] 网格={grid_offset} 持仓调整: {current_pos} -> {target_pos}")
print("=" * 60)
print("异步网格交易")
print("=" * 60)
# 同时运行多个品种的网格
api.create_task(grid_trading("SHFE.rb2510", 10, 5)) # 螺纹钢
api.create_task(grid_trading("DCE.i2509", 5, 3)) # 铁矿石
while True:
api.wait_update()
五、异步编程最佳实践
5.1 错误处理
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:异步策略错误处理
说明:本代码仅供学习参考
"""
from tqsdk import TqApi, TqAuth, TqSim
api = TqApi(TqSim(), auth=TqAuth("快期账户", "快期密码"))
async def safe_strategy(symbol):
"""带错误处理的异步策略"""
try:
quote = await api.get_quote(symbol)
print(f"[{symbol}] 策略启动成功")
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
try:
if api.is_changing(quote, "last_price"):
# 策略逻辑...
print(f"[{symbol}] 价格={quote.last_price}")
except Exception as e:
print(f"[{symbol}] 策略执行错误: {e}")
continue
except Exception as e:
print(f"[{symbol}] 策略初始化失败: {e}")
# 创建任务
api.create_task(safe_strategy("SHFE.rb2510"))
while True:
api.wait_update()
5.2 任务管理
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:异步任务生命周期管理
说明:本代码仅供学习参考
"""
from tqsdk import TqApi, TqAuth, TqSim
import asyncio
api = TqApi(TqSim(), auth=TqAuth("快期账户", "快期密码"))
# 任务字典
tasks = {}
async def managed_strategy(symbol, task_id):
"""可管理的策略任务"""
print(f"[任务{task_id}] {symbol} 启动")
quote = await api.get_quote(symbol)
try:
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
if api.is_changing(quote, "last_price"):
print(f"[任务{task_id}] {symbol} 价格={quote.last_price}")
except asyncio.CancelledError:
print(f"[任务{task_id}] {symbol} 被取消")
raise
finally:
print(f"[任务{task_id}] {symbol} 结束清理")
def start_strategy(symbol):
"""启动策略"""
task_id = len(tasks) + 1
task = api.create_task(managed_strategy(symbol, task_id))
tasks[task_id] = {"symbol": symbol, "task": task}
return task_id
def stop_strategy(task_id):
"""停止策略"""
if task_id in tasks:
tasks[task_id]["task"].cancel()
print(f"已请求取消任务 {task_id}")
# 启动策略
print("=" * 50)
print("任务管理示例")
print("=" * 50)
id1 = start_strategy("SHFE.rb2510")
id2 = start_strategy("DCE.i2509")
# 主循环
count = 0
while True:
api.wait_update()
count += 1
# 示例:运行50次后停止第一个任务
if count == 50:
stop_strategy(id1)
六、总结
| 要点 | 内容 |
|---|---|
| 核心语法 | async def + await |
| 创建任务 | api.create_task(coro) |
| 获取数据 | await api.get_quote/get_kline_serial |
| 监听更新 | async with api.register_update_notify() |
| 优势 | 单连接、低延迟、高并发 |
异步编程速查
# 定义异步策略
async def strategy(symbol):
quote = await api.get_quote(symbol)
async with api.register_update_notify() as update_chan:
async for _ in update_chan:
if api.is_changing(quote):
# 策略逻辑
pass
# 创建任务
api.create_task(strategy("SHFE.rb2510"))
# 主循环
while True:
api.wait_update()
免责声明:本文仅供学习交流使用,不构成任何投资建议。期货交易有风险,入市需谨慎。
更多资源:
- 天勤量化官网:https://www.shinnytech.com
- GitHub开源地址:https://github.com/shinnytech/tqsdk-python
- 官方文档:https://doc.shinnytech.com/tqsdk/latest
更多推荐



所有评论(0)