一、前言

当需要同时运行多个策略或监控多个品种时,传统的单线程阻塞方式效率低下。TqSdk内置支持Python异步编程(asyncio),允许在单线程内高效运行多个策略任务,共享网络连接,实现最优性能。

本文将介绍:

  • TqSdk异步编程核心概念
  • 协程与async/await语法
  • 多策略并行运行
  • 异步网格交易实战

二、为什么选择天勤量化(TqSdk)

TqSdk的异步支持使其在多策略场景下具有独特优势:

优势 说明
原生异步 内核基于asyncio构建
共享连接 多策略共用一个网络连接
低延迟 无线程切换开销
await支持 get_quote等接口支持await

安装方法

pip install tqsdk

三、异步编程基础

3.1 同步vs异步

对比 同步编程 异步编程
执行方式 顺序阻塞 并发非阻塞
资源消耗 多进程/线程开销大 单线程开销小
适用场景 简单单策略 多策略并行
代码复杂度 简单 需掌握async/await

3.2 核心概念

概念 说明
协程(Coroutine) 可暂停和恢复的函数
async def 定义协程函数
await 等待协程完成
Task 协程的执行单元
api.create_task 创建异步任务

3.3 TqSdk异步接口

接口 同步用法 异步用法
get_quote quote = api.get_quote(s) quote = await api.get_quote(s)
get_kline_serial klines = api.get_kline_serial(...) klines = await api.get_kline_serial(...)
get_tick_serial ticks = api.get_tick_serial(...) ticks = await api.get_tick_serial(...)

四、异步策略实现

4.1 基础异步示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:TqSdk异步编程基础
说明:本代码仅供学习参考
"""

from tqsdk import TqApi, TqAuth

api = TqApi(auth=TqAuth("快期账户", "快期密码"))

async def monitor_quote(symbol):
    """异步监控单个合约行情"""
    # await方式获取行情,会等待数据到达后返回
    quote = await api.get_quote(symbol)
    print(f"[{symbol}] 首次获取: 最新价={quote.last_price}")
    
    # 注册更新通知
    async with api.register_update_notify() as update_chan:
        async for _ in update_chan:
            if api.is_changing(quote, "last_price"):
                print(f"[{symbol}] 价格更新: {quote.last_price}")

# 创建多个异步任务
api.create_task(monitor_quote("SHFE.rb2510"))
api.create_task(monitor_quote("DCE.i2509"))
api.create_task(monitor_quote("SHFE.cu2507"))

print("=" * 50)
print("异步监控多品种行情")
print("=" * 50)

# 主循环驱动异步任务
while True:
    api.wait_update()

4.2 多策略并行

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:多策略异步并行运行
说明:本代码仅供学习参考
"""

from tqsdk import TqApi, TqAuth, TqSim
from tqsdk.lib import TargetPosTask
from tqsdk.tafunc import ma

api = TqApi(TqSim(init_balance=500000), auth=TqAuth("快期账户", "快期密码"))

async def ma_strategy(symbol, fast, slow):
    """
    双均线策略(异步版)
    """
    print(f"[{symbol}] 策略启动: MA({fast},{slow})")
    
    # await获取K线数据
    klines = await api.get_kline_serial(symbol, 60*60, slow+5)
    target_pos = TargetPosTask(api, symbol)
    
    async with api.register_update_notify() as update_chan:
        async for _ in update_chan:
            if api.is_changing(klines.iloc[-1], "datetime"):
                ma_fast = ma(klines["close"], fast)
                ma_slow = ma(klines["close"], slow)
                
                # 金叉做多
                if ma_fast.iloc[-2] < ma_slow.iloc[-2] and ma_fast.iloc[-1] > ma_slow.iloc[-1]:
                    target_pos.set_target_volume(1)
                    print(f"[{symbol}] 金叉做多")
                
                # 死叉做空
                if ma_fast.iloc[-2] > ma_slow.iloc[-2] and ma_fast.iloc[-1] < ma_slow.iloc[-1]:
                    target_pos.set_target_volume(-1)
                    print(f"[{symbol}] 死叉做空")


async def breakout_strategy(symbol, period):
    """
    突破策略(异步版)
    """
    print(f"[{symbol}] 突破策略启动: 周期={period}")
    
    klines = await api.get_kline_serial(symbol, 60*60, period+5)
    target_pos = TargetPosTask(api, symbol)
    
    async with api.register_update_notify() as update_chan:
        async for _ in update_chan:
            if api.is_changing(klines.iloc[-1], "datetime"):
                high = klines["high"].iloc[-period-1:-1].max()
                low = klines["low"].iloc[-period-1:-1].min()
                close = klines["close"].iloc[-1]
                
                if close > high:
                    target_pos.set_target_volume(1)
                    print(f"[{symbol}] 突破高点做多")
                elif close < low:
                    target_pos.set_target_volume(-1)
                    print(f"[{symbol}] 突破低点做空")


print("=" * 60)
print("多策略异步并行运行")
print("=" * 60)

# 创建多个策略任务(不同品种、不同策略)
api.create_task(ma_strategy("SHFE.rb2510", 5, 20))      # 螺纹钢-双均线
api.create_task(ma_strategy("DCE.i2509", 10, 30))       # 铁矿石-双均线
api.create_task(breakout_strategy("DCE.m2509", 20))     # 豆粕-突破
api.create_task(breakout_strategy("SHFE.cu2507", 15))   # 沪铜-突破

# 主循环
while True:
    api.wait_update()

4.3 异步网格交易

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:异步网格交易策略
说明:本代码仅供学习参考
"""

from tqsdk import TqApi, TqAuth, TqSim
from tqsdk.lib import TargetPosTask

api = TqApi(TqSim(init_balance=200000), auth=TqAuth("快期账户", "快期密码"))

async def grid_trading(symbol, grid_size, max_pos):
    """
    网格交易策略(异步版)
    
    参数:
        symbol: 合约代码
        grid_size: 网格间距(点数)
        max_pos: 最大持仓手数
    """
    print(f"[{symbol}] 网格策略启动: 网格={grid_size}点, 最大持仓={max_pos}")
    
    quote = await api.get_quote(symbol)
    position = api.get_position(symbol)
    
    # 以当前价为基准价
    base_price = quote.last_price
    print(f"[{symbol}] 基准价格: {base_price}")
    
    async with api.register_update_notify() as update_chan:
        async for _ in update_chan:
            if api.is_changing(quote, "last_price"):
                current_price = quote.last_price
                current_pos = position.pos_long - position.pos_short
                
                # 计算当前价格偏离基准的网格数
                grid_offset = int((current_price - base_price) / grid_size)
                
                # 网格逻辑:价格下跌时加多仓,价格上涨时减多仓
                target_pos = -grid_offset  # 网格数取负作为目标持仓
                
                # 限制持仓范围
                target_pos = max(-max_pos, min(max_pos, target_pos))
                
                if target_pos != current_pos:
                    api.insert_order(
                        symbol=symbol,
                        direction="BUY" if target_pos > current_pos else "SELL",
                        offset="OPEN" if abs(target_pos) > abs(current_pos) else "CLOSE",
                        volume=abs(target_pos - current_pos),
                        limit_price=current_price
                    )
                    print(f"[{symbol}] 网格={grid_offset} 持仓调整: {current_pos} -> {target_pos}")


print("=" * 60)
print("异步网格交易")
print("=" * 60)

# 同时运行多个品种的网格
api.create_task(grid_trading("SHFE.rb2510", 10, 5))  # 螺纹钢
api.create_task(grid_trading("DCE.i2509", 5, 3))     # 铁矿石

while True:
    api.wait_update()

五、异步编程最佳实践

5.1 错误处理

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:异步策略错误处理
说明:本代码仅供学习参考
"""

from tqsdk import TqApi, TqAuth, TqSim

api = TqApi(TqSim(), auth=TqAuth("快期账户", "快期密码"))

async def safe_strategy(symbol):
    """带错误处理的异步策略"""
    try:
        quote = await api.get_quote(symbol)
        print(f"[{symbol}] 策略启动成功")
        
        async with api.register_update_notify() as update_chan:
            async for _ in update_chan:
                try:
                    if api.is_changing(quote, "last_price"):
                        # 策略逻辑...
                        print(f"[{symbol}] 价格={quote.last_price}")
                except Exception as e:
                    print(f"[{symbol}] 策略执行错误: {e}")
                    continue
                    
    except Exception as e:
        print(f"[{symbol}] 策略初始化失败: {e}")


# 创建任务
api.create_task(safe_strategy("SHFE.rb2510"))

while True:
    api.wait_update()

5.2 任务管理

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:异步任务生命周期管理
说明:本代码仅供学习参考
"""

from tqsdk import TqApi, TqAuth, TqSim
import asyncio

api = TqApi(TqSim(), auth=TqAuth("快期账户", "快期密码"))

# 任务字典
tasks = {}

async def managed_strategy(symbol, task_id):
    """可管理的策略任务"""
    print(f"[任务{task_id}] {symbol} 启动")
    
    quote = await api.get_quote(symbol)
    
    try:
        async with api.register_update_notify() as update_chan:
            async for _ in update_chan:
                if api.is_changing(quote, "last_price"):
                    print(f"[任务{task_id}] {symbol} 价格={quote.last_price}")
    except asyncio.CancelledError:
        print(f"[任务{task_id}] {symbol} 被取消")
        raise
    finally:
        print(f"[任务{task_id}] {symbol} 结束清理")


def start_strategy(symbol):
    """启动策略"""
    task_id = len(tasks) + 1
    task = api.create_task(managed_strategy(symbol, task_id))
    tasks[task_id] = {"symbol": symbol, "task": task}
    return task_id


def stop_strategy(task_id):
    """停止策略"""
    if task_id in tasks:
        tasks[task_id]["task"].cancel()
        print(f"已请求取消任务 {task_id}")


# 启动策略
print("=" * 50)
print("任务管理示例")
print("=" * 50)

id1 = start_strategy("SHFE.rb2510")
id2 = start_strategy("DCE.i2509")

# 主循环
count = 0
while True:
    api.wait_update()
    count += 1
    
    # 示例:运行50次后停止第一个任务
    if count == 50:
        stop_strategy(id1)

六、总结

要点 内容
核心语法 async def + await
创建任务 api.create_task(coro)
获取数据 await api.get_quote/get_kline_serial
监听更新 async with api.register_update_notify()
优势 单连接、低延迟、高并发

异步编程速查

# 定义异步策略
async def strategy(symbol):
    quote = await api.get_quote(symbol)
    async with api.register_update_notify() as update_chan:
        async for _ in update_chan:
            if api.is_changing(quote):
                # 策略逻辑
                pass

# 创建任务
api.create_task(strategy("SHFE.rb2510"))

# 主循环
while True:
    api.wait_update()

免责声明:本文仅供学习交流使用,不构成任何投资建议。期货交易有风险,入市需谨慎。

更多资源

  • 天勤量化官网:https://www.shinnytech.com
  • GitHub开源地址:https://github.com/shinnytech/tqsdk-python
  • 官方文档:https://doc.shinnytech.com/tqsdk/latest
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐