1. Introduction

Reinforcement learning (RL) is a major branch of machine learning in which an agent learns an optimal policy by interacting with an environment. In quantitative trading, RL can learn trading strategies automatically and adapt to changing market conditions. This article shows how to apply reinforcement learning to futures quantitative trading strategies.

It covers:

  • How reinforcement learning applies to quantitative trading
  • The Q-Learning algorithm
  • DQN (Deep Q-Network)
  • Policy gradient methods
  • Considerations for live trading

2. Why Tianqin Quant (TqSdk)

TqSdk support for reinforcement learning:

  • Simulated trading: TqSim provides a complete simulated trading environment
  • Real-time data: live market data feeds
  • Backtesting framework: supports backtesting RL strategies
  • State space: flexible definition of states and actions

Installation

pip install tqsdk pandas numpy gym stable-baselines3

3. Reinforcement Learning Basics

3.1 Core Concepts

  • State: the current market state, e.g. price and technical indicators
  • Action: a trading action, e.g. buy, sell, or hold
  • Reward: the profit or loss that follows an action
  • Policy: a mapping from states to actions
  • Value function: an estimate of how good a state or action is

3.2 The Reinforcement Learning Loop

Observe state → choose action → execute trade → receive reward → update policy → repeat
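
A minimal sketch of this loop in code (env and agent here stand in for the classes built in the rest of this article):

state = env.reset()                                       # observe the initial state
while True:
    action = agent.choose_action(state)                   # choose an action
    next_state, reward, done, info = env.step(action)     # execute the trade, receive the reward
    agent.update(state, action, reward, next_state, done) # update the policy
    state = next_state                                     # repeat from the new state
    if done:
        break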

3.3 Designing a Trading Environment

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Purpose: futures trading environment design
Note: this code is for learning and reference only
"""

import numpy as np
import pandas as pd
from tqsdk import TqApi, TqAuth
from tqsdk.tafunc import ma, macd, rsi

class TradingEnvironment:
    """Futures trading environment"""
    
    def __init__(self, klines, initial_balance=100000, commission_rate=0.0001):
        """
        Initialize the trading environment
        
        Args:
            klines: K-line (bar) data
            initial_balance: initial capital
            commission_rate: commission rate
        """
        self.klines = klines
        self.initial_balance = initial_balance
        self.commission_rate = commission_rate
        
        self.current_step = 0
        self.balance = initial_balance
        self.position = 0  # position size
        self.entry_price = 0  # entry price
        
        # Build the feature set
        self.features = self._build_features()
        
    def _build_features(self):
        """Build the feature set"""
        features = pd.DataFrame()
        features['close'] = self.klines['close']
        features['volume'] = self.klines['volume']
        features['ma5'] = ma(self.klines['close'], 5)
        features['ma20'] = ma(self.klines['close'], 20)
        
        macd_data = macd(self.klines['close'], 12, 26, 9)
        features['macd'] = macd_data['macd']
        features['rsi'] = rsi(self.klines['close'], 14)
        
        features['return_1'] = self.klines['close'].pct_change(1)
        features['volatility'] = self.klines['close'].pct_change().rolling(20).std()
        
        return features.dropna()
    
    def reset(self):
        """Reset the environment"""
        self.current_step = 0
        self.balance = self.initial_balance
        self.position = 0
        self.entry_price = 0
        return self._get_state()
    
    def _get_state(self):
        """Return the current state vector"""
        if self.current_step >= len(self.features):
            return None
        
        state = self.features.iloc[self.current_step].values
        
        # Append account information
        account_info = np.array([
            self.balance / self.initial_balance,  # fraction of initial capital
            self.position,  # current position size
            self.entry_price / self.features.iloc[self.current_step]['close'] if self.position != 0 else 0  # entry price relative to the current price
        ])
        
        return np.concatenate([state, account_info])
    
    def step(self, action):
        """
        Execute an action
        
        Args:
            action: 0 = sell, 1 = hold, 2 = buy
        
        Returns:
            state: next state
            reward: reward
            done: whether the episode is over
            info: extra information
        """
        if self.current_step >= len(self.features) - 1:
            return None, 0, True, {}
        
        current_price = self.features.iloc[self.current_step]['close']
        next_price = self.features.iloc[self.current_step + 1]['close']
        
        reward = 0
        
        # Execute the action
        if action == 0:  # sell
            if self.position > 0:
                # close the long position
                pnl = (current_price - self.entry_price) * self.position
                commission = current_price * self.position * self.commission_rate
                reward = pnl - commission
                # return the full sale proceeds (principal + PnL) to the balance,
                # since the buy deducted the full notional
                self.balance += current_price * self.position - commission
                self.position = 0
                self.entry_price = 0
        elif action == 2:  # buy
            if self.position == 0:
                # open a long position
                shares = int(self.balance / current_price)
                if shares > 0:
                    commission = current_price * shares * self.commission_rate
                    if self.balance >= current_price * shares + commission:
                        self.position = shares
                        self.entry_price = current_price
                        self.balance -= current_price * shares + commission
        
        # Mark-to-market shaping reward for the open position
        if self.position > 0:
            unrealized_pnl = (next_price - self.entry_price) * self.position
            reward += unrealized_pnl * 0.1  # 10% of the unrealized PnL as shaping reward
        
        # Advance one step
        self.current_step += 1
        
        # Get the new state
        state = self._get_state()
        done = (self.current_step >= len(self.features) - 1)
        
        # Compute total equity
        total_asset = self.balance
        if self.position > 0:
            total_asset += next_price * self.position
        
        info = {
            'balance': self.balance,
            'position': self.position,
            'total_asset': total_asset
        }
        
        return state, reward, done, info

# Usage example
api = TqApi(auth=TqAuth("your_account", "your_password"))
SYMBOL = "SHFE.rb2510"
klines = api.get_kline_serial(SYMBOL, 3600, 1000)
api.wait_update()

env = TradingEnvironment(klines)
state = env.reset()

print("Environment initialized")
print(f"State dimension: {len(state)}")
print(f"Number of features: {len(env.features.columns)}")

api.close()
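
Before training any agent it is worth sanity-checking the environment with random actions. A minimal sketch, reusing the env built above (this is an added check, not part of TqSdk):

state = env.reset()
done = False
while not done:
    action = np.random.randint(3)  # random action: 0=sell, 1=hold, 2=buy
    state, reward, done, info = env.step(action)
    if state is None:
        break
print(f"Random policy final equity: {info['total_asset']:.2f}")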

4. The Q-Learning Algorithm
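
Tabular Q-Learning maintains a table of action values Q(s, a). After each transition (s, a, r, s') it nudges the stored value toward a bootstrapped target:

Q(s, a) ← Q(s, a) + α [ r + γ · max_a' Q(s', a') − Q(s, a) ]

where α is the learning rate and γ the discount factor. The update() method of the agent below implements exactly this rule; because the market state is continuous, it is first discretized so that it can serve as a table key.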

4.1 Q-Learning Implementation

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Purpose: Q-Learning trading strategy
Note: this code is for learning and reference only
"""

import numpy as np
import pandas as pd
from collections import defaultdict
from tqsdk import TqApi, TqAuth
from tqsdk.tafunc import ma, macd, rsi

class QLearningAgent:
    """Q-Learning agent"""
    
    def __init__(self, state_size, action_size, learning_rate=0.1, discount=0.95, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        """
        Initialize the Q-Learning agent
        
        Args:
            state_size: size of the state space
            action_size: size of the action space
            learning_rate: learning rate
            discount: discount factor
            epsilon: exploration rate
            epsilon_decay: exploration rate decay
            epsilon_min: minimum exploration rate
        """
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount = discount
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        
        # Q table (a dict, so discretized continuous states can be used as keys)
        self.q_table = defaultdict(lambda: np.zeros(action_size))
        
    def _discretize_state(self, state):
        """Discretize a continuous state so it can index the Q table"""
        # Crude discretization: map each value to an integer bucket in [0, 9].
        # Note: this assumes the state values are roughly on a 0-1 scale; raw
        # prices should be normalized first or most values will share a bucket.
        discretized = [int(np.clip(value * 10, 0, 9)) for value in state]
        return tuple(discretized)
    
    def choose_action(self, state):
        """Choose an action (epsilon-greedy policy)"""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_size)
        
        discretized_state = self._discretize_state(state)
        return np.argmax(self.q_table[discretized_state])
    
    def update(self, state, action, reward, next_state, done):
        """Update the Q value for (state, action)"""
        discretized_state = self._discretize_state(state)
        discretized_next_state = self._discretize_state(next_state) if not done else None
        
        current_q = self.q_table[discretized_state][action]
        
        if done:
            target_q = reward
        else:
            target_q = reward + self.discount * np.max(self.q_table[discretized_next_state])
        
        # Q-value update
        self.q_table[discretized_state][action] += self.learning_rate * (target_q - current_q)
        
        # Decay the exploration rate
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Train the Q-Learning agent
api = TqApi(auth=TqAuth("your_account", "your_password"))
SYMBOL = "SHFE.rb2510"
klines = api.get_kline_serial(SYMBOL, 3600, 2000)
api.wait_update()

# Assumes the TradingEnvironment above has been saved as trading_env.py
from trading_env import TradingEnvironment

env = TradingEnvironment(klines)
state_size = len(env.reset())
action_size = 3  # sell, hold, buy

agent = QLearningAgent(state_size, action_size)

# Training
episodes = 100
for episode in range(episodes):
    state = env.reset()
    total_reward = 0
    
    while True:
        action = agent.choose_action(state)
        next_state, reward, done, info = env.step(action)
        
        if next_state is None:
            break
        
        agent.update(state, action, reward, next_state, done)
        
        state = next_state
        total_reward += reward
        
        if done:
            break
    
    if (episode + 1) % 10 == 0:
        print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward:.2f}, Epsilon: {agent.epsilon:.3f}")

print(f"\nTraining finished. Q-table size: {len(agent.q_table)}")

api.close()
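
After training, the learned policy can be run greedily (exploration switched off) to see what it does. A minimal sketch, reusing the env and agent from above; evaluating on the training data only confirms that learning happened, not that the policy generalizes (see section 7):

agent.epsilon = 0.0  # greedy: always pick the highest-valued action
state = env.reset()
total_reward = 0
while True:
    action = agent.choose_action(state)
    next_state, reward, done, info = env.step(action)
    if next_state is None:
        break
    total_reward += reward
    state = next_state
    if done:
        break
print(f"Greedy policy reward: {total_reward:.2f}, final equity: {info['total_asset']:.2f}")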

5. DQN (Deep Q-Network)

5.1 DQN Implementation

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Purpose: DQN trading strategy
Note: this code is for learning and reference only
"""

import numpy as np
import pandas as pd
import random
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tqsdk import TqApi, TqAuth

class DQNAgent:
    """DQN agent"""
    
    def __init__(self, state_size, action_size):
        """
        Initialize the DQN agent
        
        Args:
            state_size: size of the state space
            action_size: size of the action space
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # experience replay buffer
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.gamma = 0.95  # discount factor
        
        # Online (main) network and target network
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()
    
    def _build_model(self):
        """Build the Q-network"""
        model = Sequential([
            Dense(64, activation='relu', input_dim=self.state_size),
            Dense(64, activation='relu'),
            Dense(32, activation='relu'),
            Dense(self.action_size, activation='linear')
        ])
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model
    
    def update_target_model(self):
        """Copy the online network's weights into the target network"""
        self.target_model.set_weights(self.model.get_weights())
    
    def remember(self, state, action, reward, next_state, done):
        """Store a transition in the replay buffer"""
        self.memory.append((state, action, reward, next_state, done))
    
    def choose_action(self, state):
        """Choose an action (epsilon-greedy)"""
        if np.random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        
        q_values = self.model.predict(state.reshape(1, -1), verbose=0)
        return np.argmax(q_values[0])
    
    def replay(self, batch_size=32):
        """Train on a random minibatch from the replay buffer"""
        if len(self.memory) < batch_size:
            return
        
        batch = random.sample(self.memory, batch_size)
        states = np.array([e[0] for e in batch])
        actions = np.array([e[1] for e in batch])
        rewards = np.array([e[2] for e in batch])
        next_states = np.array([e[3] for e in batch])
        dones = np.array([e[4] for e in batch])
        
        # Current Q values from the online network
        current_q = self.model.predict(states, verbose=0)
        
        # Target Q values from the target network
        next_q = self.target_model.predict(next_states, verbose=0)
        target_q = current_q.copy()
        
        for i in range(batch_size):
            if dones[i]:
                target_q[i][actions[i]] = rewards[i]
            else:
                target_q[i][actions[i]] = rewards[i] + self.gamma * np.max(next_q[i])
        
        # Train the online network
        self.model.fit(states, target_q, epochs=1, verbose=0)
        
        # Decay the exploration rate
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Train the DQN agent
api = TqApi(auth=TqAuth("your_account", "your_password"))
SYMBOL = "SHFE.rb2510"
klines = api.get_kline_serial(SYMBOL, 3600, 2000)
api.wait_update()

# Assumes the TradingEnvironment above has been saved as trading_env.py
from trading_env import TradingEnvironment

env = TradingEnvironment(klines)
state_size = len(env.reset())
action_size = 3

agent = DQNAgent(state_size, action_size)

# Training
episodes = 100
batch_size = 32

for episode in range(episodes):
    state = env.reset()
    total_reward = 0
    step = 0
    
    while True:
        action = agent.choose_action(state)
        next_state, reward, done, info = env.step(action)
        
        if next_state is None:
            break
        
        agent.remember(state, action, reward, next_state, done)
        
        state = next_state
        total_reward += reward
        step += 1
        
        if done:
            break
        
        # Experience replay
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)
    
    # Periodically sync the target network
    if episode % 10 == 0:
        agent.update_target_model()
    
    if (episode + 1) % 10 == 0:
        print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward:.2f}, Steps: {step}")

print("\nTraining finished.")

api.close()
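
The trained Keras network can be saved for later evaluation or live use. A minimal sketch, assuming the agent trained above (the file name is arbitrary):

# Save the online network
agent.model.save("dqn_trading_model.h5")

# Later: rebuild an agent and load the saved weights
from tensorflow.keras.models import load_model

new_agent = DQNAgent(state_size, action_size)
new_agent.model = load_model("dqn_trading_model.h5")
new_agent.update_target_model()
new_agent.epsilon = 0.0  # act greedily when evaluating or trading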

6. Policy Gradient Methods

6.1 PPO (Proximal Policy Optimization)
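
Unlike Q-Learning and DQN, which learn action values, PPO optimizes the policy directly and keeps each update close to the previous policy by clipping the probability ratio. Its clipped surrogate objective (standard PPO, independent of TqSdk) is

L(θ) = E_t[ min( r_t(θ) · A_t, clip(r_t(θ), 1 − ε, 1 + ε) · A_t ) ],  with r_t(θ) = π_θ(a_t | s_t) / π_θ_old(a_t | s_t)

where A_t is the advantage estimate. In practice stable-baselines3 implements all of this; we only need to wrap the market in a Gym-style environment, as below.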

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Purpose: PPO algorithm via stable-baselines3
Note: this code is for learning and reference only.
Note: this example uses the classic gym API (stable-baselines3 1.x);
      stable-baselines3 2.x expects gymnasium and its 5-tuple step API.
"""

import numpy as np
import pandas as pd
import gym
from gym import spaces
from stable_baselines3 import PPO
from tqsdk import TqApi, TqAuth
from tqsdk.tafunc import ma, macd, rsi

class TradingGymEnv(gym.Env):
    """Gym-style trading environment"""
    
    def __init__(self, klines):
        super(TradingGymEnv, self).__init__()
        
        self.klines = klines
        self.features = self._build_features()
        
        # Action space: 0 = sell, 1 = hold, 2 = buy
        self.action_space = spaces.Discrete(3)
        
        # Observation space
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(len(self.features.columns) + 3,),  # features + account info
            dtype=np.float32
        )
        
        self.reset()
    
    def _build_features(self):
        """Build the feature set"""
        features = pd.DataFrame()
        features['close'] = self.klines['close']
        features['volume'] = self.klines['volume']
        features['ma5'] = ma(self.klines['close'], 5)
        features['ma20'] = ma(self.klines['close'], 20)
        macd_data = macd(self.klines['close'], 12, 26, 9)
        features['macd'] = macd_data['macd']
        features['rsi'] = rsi(self.klines['close'], 14)
        return features.dropna()
    
    def reset(self):
        """Reset the environment"""
        self.current_step = 0
        self.balance = 100000
        self.position = 0
        self.entry_price = 0
        return self._get_observation()
    
    def _get_observation(self):
        """Return the current observation"""
        state = self.features.iloc[self.current_step].values
        account_info = np.array([
            self.balance / 100000,
            self.position / 100,
            self.entry_price / self.features.iloc[self.current_step]['close'] if self.position != 0 else 0
        ])
        return np.concatenate([state, account_info]).astype(np.float32)
    
    def step(self, action):
        """Execute an action"""
        if self.current_step >= len(self.features) - 1:
            return self._get_observation(), 0, True, {}
        
        current_price = self.features.iloc[self.current_step]['close']
        next_price = self.features.iloc[self.current_step + 1]['close']
        
        reward = 0
        
        # Execute the action (simplified, no commission)
        if action == 0 and self.position > 0:  # sell: close the long position
            pnl = (current_price - self.entry_price) * self.position
            # return the full sale proceeds (principal + PnL) to the balance
            self.balance += current_price * self.position
            reward = pnl / 1000
            self.position = 0
        elif action == 2 and self.position == 0:  # buy: open a long position
            shares = int(self.balance / current_price)
            if shares > 0:
                self.position = shares
                self.entry_price = current_price
                self.balance -= current_price * shares
        
        # Mark-to-market reward for the open position
        if self.position > 0:
            unrealized_pnl = (next_price - self.entry_price) * self.position
            reward += unrealized_pnl / 1000
        
        self.current_step += 1
        done = (self.current_step >= len(self.features) - 1)
        
        return self._get_observation(), reward, done, {}

# Train with PPO
api = TqApi(auth=TqAuth("your_account", "your_password"))
SYMBOL = "SHFE.rb2510"
klines = api.get_kline_serial(SYMBOL, 3600, 2000)
api.wait_update()

env = TradingGymEnv(klines)

# Create the PPO model
model = PPO('MlpPolicy', env, verbose=1)

# Train
model.learn(total_timesteps=10000)

# Test
obs = env.reset()
for _ in range(1000):
    action, _ = model.predict(obs)
    obs, reward, done, info = env.step(action)
    if done:
        break

print("\nTraining finished.")

api.close()
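
stable-baselines3 also provides helpers for persisting the model and measuring average episode reward. A minimal sketch, assuming the model and env from above (the file name is arbitrary):

from stable_baselines3.common.evaluation import evaluate_policy

# Save and reload the trained policy
model.save("ppo_trading")
model = PPO.load("ppo_trading", env=env)

# Average reward over a few evaluation episodes
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=5)
print(f"Mean episode reward: {mean_reward:.2f} +/- {std_reward:.2f}")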

7. Considerations for Live Trading

7.1 Key Considerations

  • Overfitting risk: RL strategies overfit easily and need rigorous validation
  • Out-of-sample testing: always test on data the agent has never seen (see the sketch after this list)
  • Transaction costs: account for commission and slippage
  • Risk control: enforce stop losses and position limits
  • Model updates: retrain the model periodically
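
A minimal out-of-sample split, assuming the klines and TradingEnvironment from section 3 (the 80/20 ratio is only an example):

split = int(len(klines) * 0.8)
train_klines = klines.iloc[:split].reset_index(drop=True)
test_klines = klines.iloc[split:].reset_index(drop=True)

train_env = TradingEnvironment(train_klines)  # train the agent only on this
test_env = TradingEnvironment(test_klines)    # report results only on this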

7.2 Designing the Reward Function

def calculate_reward(self, action, current_price, next_price, position, entry_price):
    """
    Reward function design (sketch of an environment method)
    
    Factors to consider:
    1. Profit reward
    2. Risk penalty
    3. Transaction cost
    4. Holding-time cost
    """
    reward = 0
    
    # Profit reward
    if position > 0:
        pnl = (next_price - entry_price) * position
        reward += pnl / 1000
    
    # Risk penalty (volatility)
    volatility = abs(next_price - current_price) / current_price
    reward -= volatility * 10
    
    # Transaction cost penalty
    if action != 1:  # any non-hold action
        reward -= 0.001
    
    return reward
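
Another common choice is to reward risk-adjusted performance instead of raw PnL. A minimal sketch (an addition, not from the strategy above) that uses the rolling Sharpe ratio of recent step returns as the reward:

import numpy as np

def sharpe_reward(step_returns, window=20):
    """Reward = rolling Sharpe ratio of the most recent step returns."""
    recent = np.asarray(step_returns[-window:])
    if len(recent) < window or recent.std() == 0:
        return 0.0
    return float(recent.mean() / recent.std())

# Inside the environment's step(): append this step's portfolio return to
# self.step_returns, then use sharpe_reward(self.step_returns) as (part of) the reward.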

8. FAQ

Q1: Does reinforcement learning really work in quantitative trading?

A: It has potential, but keep in mind:

  • It needs a lot of training data
  • It overfits easily
  • Reward function design is critical
  • Live performance may differ from training performance

Q2: How should the reward function be designed?

A: Suggestions:

  • Balance return and risk
  • Penalize overtrading
  • Encourage sustained, long-term profitability
  • Combine with metrics such as the Sharpe ratio (see the sketch in section 7.2)

Q3: Which is better, Q-Learning or DQN?

A: Each has its strengths:

  • Q-Learning: simple, suits small discrete state spaces
  • DQN: handles continuous, high-dimensional states; more powerful
  • Suggestion: start with Q-Learning, then move to DQN

9. Summary

  • Environment design: define states, actions, and rewards
  • Q-Learning: suits discrete state spaces
  • DQN: suits continuous state spaces
  • Policy gradients: algorithms such as PPO are more stable
  • Reward function: a well-designed reward function is critical

Suggested Next Steps

  1. Study DQN variants (Double DQN, Dueling DQN); see the sketch after this list
  2. Explore Actor-Critic methods
  3. Look into multi-agent reinforcement learning
  4. Learn model interpretability techniques
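
As a pointer for item 1: Double DQN changes only how the target is computed in replay(): the online network selects the next action and the target network evaluates it. A minimal sketch of that change, assuming the DQNAgent above:

# Inside DQNAgent.replay(), replace the target computation with:
next_q_online = self.model.predict(next_states, verbose=0)          # select the action
next_q_target = self.target_model.predict(next_states, verbose=0)   # evaluate the action
for i in range(batch_size):
    if dones[i]:
        target_q[i][actions[i]] = rewards[i]
    else:
        best_action = np.argmax(next_q_online[i])
        target_q[i][actions[i]] = rewards[i] + self.gamma * next_q_target[i][best_action]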

Disclaimer: this article is for learning and discussion only and does not constitute investment advice. Futures trading involves risk; enter the market with caution.

More Resources

  • Tianqin Quant (Shinny) website: https://www.shinnytech.com
  • GitHub repository: https://github.com/shinnytech/tqsdk-python
  • Official documentation: https://doc.shinnytech.com/tqsdk/latest