高频交易入门
高频交易是量化交易中技术要求最高的领域,涉及微秒级的订单执行和复杂的系统架构。本章将介绍高频交易的基本概念、核心技术和入门要求。
高频交易概述
什么是高频交易?
高频交易(High-Frequency Trading,HFT)是指利用高速计算机系统,在极短时间内(毫秒甚至微秒级别)完成大量交易的自动化交易方式。
高频交易的核心特征:
极快的速度:从信号产生到订单执行的整个过程在微秒级别完成。这需要高性能的硬件、优化的软件和低延迟的网络。
高频次交易:每天进行成千上万次交易,单笔收益很小,但通过大量交易累积收益。
持仓时间短:通常不持有隔夜头寸,甚至持仓时间只有几秒或更短。
算法驱动:完全由算法自动执行,没有人工干预。
市场微观结构:利用市场微观结构的特性获利,如订单簿不平衡、价差套利等。
高频交易的策略类型
做市策略:同时挂出买卖双边订单,赚取买卖价差。做市商为市场提供流动性,但面临存货风险和逆向选择风险。
套利策略:利用不同市场或不同合约之间的价格差异获利。包括跨市场套利、统计套利、ETF套利等。
动量点火:检测到大量买单时快速跟进买入,推动价格上涨后卖出获利。这种策略在某些市场受到监管限制。
订单流预测:分析订单流数据,预测短期价格走势。例如,检测到大单买入可能预示价格上涨。
高频交易的争议
高频交易在金融界存在争议:
支持者观点:
- 提高市场流动性
- 缩小买卖价差
- 提高价格发现效率
- 降低交易成本
批评者观点:
- 加剧市场波动
- 可能导致闪崩
- 对普通投资者不公平
- 技术军备竞赛浪费资源
延迟与性能
延迟的组成
在高频交易中,延迟(Latency)是核心竞争力。总延迟由多个部分组成:
网络延迟:数据从交易所传输到交易服务器的延迟。物理距离越远延迟越大,光速限制是硬约束。
数据处理延迟:解析行情数据、更新订单簿、计算信号的时间。
策略计算延迟:执行交易策略逻辑的时间。
订单传输延迟:将订单发送到交易所的时间。
交易所处理延迟:交易所匹配引擎处理订单的时间。
降低延迟的方法
地理位置优化:将服务器部署在交易所数据中心附近(Co-location服务),最小化网络延迟。
硬件加速:使用FPGA(现场可编程门阵列)或ASIC(专用集成电路)进行硬件加速,将关键逻辑固化在硬件中。
网络优化:
- 使用专线网络
- 选择低延迟的网络协议
- 优化TCP参数或使用UDP
软件优化:
- 使用高性能编程语言(C++、Rust)
- 避免动态内存分配
- 使用无锁数据结构
- CPU亲和性绑定
- 减少系统调用
延迟测量
import time
import numpy as np
class LatencyTracker:
"""延迟追踪器"""
def __init__(self):
self.latencies = []
def start(self):
"""开始计时"""
self.start_time = time.perf_counter_ns()
def end(self):
"""结束计时"""
end_time = time.perf_counter_ns()
latency_ns = end_time - self.start_time
self.latencies.append(latency_ns)
return latency_ns
def stats(self):
"""统计信息"""
latencies = np.array(self.latencies)
return {
'count': len(latencies),
'mean_ns': latencies.mean(),
'median_ns': np.median(latencies),
'p99_ns': np.percentile(latencies, 99),
'p999_ns': np.percentile(latencies, 99.9),
'min_ns': latencies.min(),
'max_ns': latencies.max()
}
# 使用示例
tracker = LatencyTracker()
for _ in range(10000):
tracker.start()
# 执行需要测量的代码
_ = 1 + 1
tracker.end()
print(tracker.stats())
订单簿与市场微观结构
订单簿结构
订单簿(Order Book)是市场的核心数据结构,记录了所有待成交的买卖订单。
买单(Bid) 卖单(Ask)
价格 数量 价格 数量
100.05 500 100.10 300
100.04 1000 100.11 500
100.03 800 100.12 1000
100.02 1200 100.13 800
100.01 2000 100.14 1500
最优买价:100.05 最优卖价:100.10
买卖价差:0.05
中间价:100.075
订单簿数据处理
from collections import defaultdict
import heapq
class OrderBook:
"""订单簿实现"""
def __init__(self):
# 买单使用负价格作为堆键,实现最大堆
self.bids = [] # (负价格, 时间戳, 订单ID, 数量)
self.asks = [] # (价格, 时间戳, 订单ID, 数量)
self.orders = {} # 订单ID -> (价格, 方向, 数量)
self.timestamp = 0
def add_order(self, order_id, side, price, quantity):
"""添加订单"""
self.timestamp += 1
if side == 'buy':
heapq.heappush(self.bids, (-price, self.timestamp, order_id, quantity))
else:
heapq.heappush(self.asks, (price, self.timestamp, order_id, quantity))
self.orders[order_id] = (price, side, quantity)
def cancel_order(self, order_id):
"""取消订单"""
if order_id in self.orders:
del self.orders[order_id]
def get_best_bid(self):
"""获取最优买价"""
while self.bids:
neg_price, _, order_id, quantity = self.bids[0]
if order_id in self.orders:
return -neg_price, quantity
heapq.heappop(self.bids)
return None, None
def get_best_ask(self):
"""获取最优卖价"""
while self.asks:
price, _, order_id, quantity = self.asks[0]
if order_id in self.orders:
return price, quantity
heapq.heappop(self.asks)
return None, None
def get_spread(self):
"""获取买卖价差"""
best_bid, _ = self.get_best_bid()
best_ask, _ = self.get_best_ask()
if best_bid and best_ask:
return best_ask - best_bid
return None
def get_mid_price(self):
"""获取中间价"""
best_bid, _ = self.get_best_bid()
best_ask, _ = self.get_best_ask()
if best_bid and best_ask:
return (best_bid + best_ask) / 2
return None
def get_depth(self, levels=5):
"""获取订单簿深度"""
bid_depth = []
ask_depth = []
# 收集有效买单
valid_bids = []
temp_bids = list(self.bids)
heapq.heapify(temp_bids)
while temp_bids and len(valid_bids) < levels:
neg_price, _, order_id, quantity = heapq.heappop(temp_bids)
if order_id in self.orders:
valid_bids.append((-neg_price, quantity))
bid_depth = valid_bids
# 收集有效卖单
valid_asks = []
temp_asks = list(self.asks)
heapq.heapify(temp_asks)
while temp_asks and len(valid_asks) < levels:
price, _, order_id, quantity = heapq.heappop(temp_asks)
if order_id in self.orders:
valid_asks.append((price, quantity))
ask_depth = valid_asks
return bid_depth, ask_depth
订单簿指标
def calculate_order_imbalance(bid_depth, ask_depth):
"""
计算订单簿不平衡
参数:
bid_depth: 买单深度 [(价格, 数量), ...]
ask_depth: 卖单深度
返回:
不平衡指标 [-1, 1]
"""
bid_volume = sum(qty for _, qty in bid_depth)
ask_volume = sum(qty for _, qty in ask_depth)
if bid_volume + ask_volume == 0:
return 0
imbalance = (bid_volume - ask_volume) / (bid_volume + ask_volume)
return imbalance
def calculate_weighted_mid_price(bid_depth, ask_depth):
"""
计算加权中间价
参数:
bid_depth: 买单深度
ask_depth: 卖单深度
返回:
加权中间价
"""
if not bid_depth or not ask_depth:
return None
best_bid, bid_qty = bid_depth[0]
best_ask, ask_qty = ask_depth[0]
# 按数量加权
weighted_mid = (best_bid * ask_qty + best_ask * bid_qty) / (bid_qty + ask_qty)
return weighted_mid
def calculate_market_pressure(order_book, window=100):
"""
计算市场压力指标
参数:
order_book: 订单簿历史数据
window: 计算窗口
返回:
市场压力指标
"""
# 基于订单流计算市场压力
# 主动性买入增加多头压力,主动性卖出增加空头压力
pass
做市策略
做市策略原理
做市商通过同时挂出买卖双边订单,赚取买卖价差。做市策略的核心是管理存货风险和逆向选择风险。
存货风险:如果市场价格单边移动,做市商可能积累大量不利方向的存货。例如,市场价格下跌时,买单不断成交,做市商积累多头头寸。
逆向选择风险:知情交易者可能利用做市商的报价进行套利。例如,当知情交易者知道价格即将上涨时,会买入做市商的卖单。
简单做市策略
class SimpleMarketMaker:
"""简单做市策略"""
def __init__(self, spread=0.02, quantity=100, max_position=1000):
self.spread = spread # 目标价差
self.quantity = quantity # 每边挂单数量
self.max_position = max_position # 最大持仓
self.position = 0 # 当前持仓
def get_quotes(self, mid_price):
"""
计算报价
参数:
mid_price: 当前中间价
返回:
(买价, 卖价, 买量, 卖量)
"""
# 根据持仓调整报价偏移
skew = self.position / self.max_position * self.spread / 2
bid_price = mid_price - self.spread / 2 + skew
ask_price = mid_price + self.spread / 2 + skew
# 根据持仓调整数量
if self.position > 0:
# 多头,减少买入量,增加卖出量
bid_qty = max(0, self.quantity - self.position)
ask_qty = self.quantity + self.position
else:
# 空头,增加买入量,减少卖出量
bid_qty = self.quantity - self.position
ask_qty = max(0, self.quantity + self.position)
return bid_price, ask_price, bid_qty, ask_qty
def on_fill(self, side, price, quantity):
"""订单成交回调"""
if side == 'buy':
self.position += quantity
else:
self.position -= quantity
Avellaneda-Stoikov做市模型
Avellaneda-Stoikov模型是经典的做市策略模型,通过优化报价来最大化效用。
import numpy as np
class AvellanedaStoikovMM:
"""Avellaneda-Stoikov做市模型"""
def __init__(self, gamma=0.1, sigma=0.01, T=1.0, k=1.5, A=0.5):
"""
参数:
gamma: 风险厌恶系数
sigma: 波动率
T: 时间范围
k: 订单强度参数
A: 订单强度参数
"""
self.gamma = gamma
self.sigma = sigma
self.T = T
self.k = k
self.A = A
self.position = 0
def get_quotes(self, mid_price, t):
"""
计算最优报价
参数:
mid_price: 当前中间价
t: 当前时间(0到T)
返回:
(买价, 卖价)
"""
# 剩余时间
tau = self.T - t
# 存货风险调整
reservation_price = mid_price - self.position * self.gamma * self.sigma**2 * tau
# 最优价差
spread = self.gamma * self.sigma**2 * tau + 2 / self.gamma * np.log(1 + self.gamma / self.k)
# 报价
bid_price = reservation_price - spread / 2
ask_price = reservation_price + spread / 2
return bid_price, ask_price
def estimate_order_intensity(self, delta):
"""
估计订单强度
参数:
delta: 与中间价的距离
返回:
订单到达强度
"""
return self.A * np.exp(-self.k * delta)
交易执行
订单类型
高频交易中常用的订单类型:
限价单:指定价格和数量,只有达到指定价格时才成交。适合做市策略。
市价单:立即以市场最优价格成交。适合需要快速执行的情况,但可能面临滑点。
冰山订单:只显示部分数量,隐藏剩余数量。适合大额交易,减少市场冲击。
条件订单:满足特定条件时触发。如止损单、止盈单等。
执行算法
class TWAPAlgorithm:
"""时间加权平均价格执行算法"""
def __init__(self, total_quantity, duration_minutes, interval_seconds=10):
"""
参数:
total_quantity: 总交易量
duration_minutes: 执行时长(分钟)
interval_seconds: 下单间隔(秒)
"""
self.total_quantity = total_quantity
self.duration = duration_minutes * 60
self.interval = interval_seconds
# 计算每次下单量
self.num_orders = self.duration // self.interval
self.quantity_per_order = total_quantity / self.num_orders
self.executed_quantity = 0
self.orders_placed = 0
def get_next_order(self):
"""获取下一个订单"""
if self.orders_placed >= self.num_orders:
return None
self.orders_placed += 1
return self.quantity_per_order
def on_fill(self, filled_quantity):
"""订单成交回调"""
self.executed_quantity += filled_quantity
class VWAPAlgorithm:
"""成交量加权平均价格执行算法"""
def __init__(self, total_quantity, volume_profile):
"""
参数:
total_quantity: 总交易量
volume_profile: 成交量分布(每分钟的预期成交量比例)
"""
self.total_quantity = total_quantity
self.volume_profile = volume_profile / volume_profile.sum()
self.executed_quantity = 0
self.current_period = 0
def get_next_order(self):
"""获取下一个订单"""
if self.current_period >= len(self.volume_profile):
return None
quantity = self.total_quantity * self.volume_profile[self.current_period]
self.current_period += 1
return quantity
风险控制
高频交易的风险控制至关重要,需要在极短时间内检测并处理异常情况。
风控检查
class RiskController:
"""风控控制器"""
def __init__(self, max_position=10000, max_order_rate=100,
max_loss=50000, price_deviation_limit=0.05):
"""
参数:
max_position: 最大持仓
max_order_rate: 最大订单速率(每秒)
max_loss: 最大亏损
price_deviation_limit: 价格偏离限制
"""
self.max_position = max_position
self.max_order_rate = max_order_rate
self.max_loss = max_loss
self.price_deviation_limit = price_deviation_limit
self.position = 0
self.pnl = 0
self.order_timestamps = []
self.last_price = None
def check_order(self, side, price, quantity, current_time):
"""
检查订单是否通过风控
返回:
(通过, 原因)
"""
# 检查持仓限制
new_position = self.position + (quantity if side == 'buy' else -quantity)
if abs(new_position) > self.max_position:
return False, f"持仓超限: {new_position} > {self.max_position}"
# 检查订单速率
self.order_timestamps = [t for t in self.order_timestamps
if current_time - t < 1.0]
if len(self.order_timestamps) >= self.max_order_rate:
return False, f"订单速率超限: {len(self.order_timestamps)}/s"
# 检查价格偏离
if self.last_price is not None:
deviation = abs(price - self.last_price) / self.last_price
if deviation > self.price_deviation_limit:
return False, f"价格偏离超限: {deviation:.2%}"
# 检查亏损
if self.pnl < -self.max_loss:
return False, f"亏损超限: {self.pnl}"
# 通过所有检查
self.order_timestamps.append(current_time)
return True, "OK"
def update_position(self, side, price, quantity):
"""更新持仓"""
if side == 'buy':
self.position += quantity
else:
self.position -= quantity
self.last_price = price
def update_pnl(self, pnl_change):
"""更新盈亏"""
self.pnl += pnl_change
熔断机制
class CircuitBreaker:
"""熔断机制"""
def __init__(self, loss_threshold=10000, time_window=60, cooldown_period=300):
"""
参数:
loss_threshold: 亏损阈值
time_window: 时间窗口(秒)
cooldown_period: 冷却期(秒)
"""
self.loss_threshold = loss_threshold
self.time_window = time_window
self.cooldown_period = cooldown_period
self.pnl_history = [] # [(时间, 盈亏)]
self.triggered = False
self.trigger_time = None
def check(self, current_time, current_pnl):
"""
检查是否触发熔断
返回:
是否允许交易
"""
# 如果已触发,检查冷却期
if self.triggered:
if current_time - self.trigger_time > self.cooldown_period:
self.triggered = False
self.pnl_history = []
else:
return False
# 更新盈亏历史
self.pnl_history.append((current_time, current_pnl))
# 清理过期数据
self.pnl_history = [(t, p) for t, p in self.pnl_history
if current_time - t < self.time_window]
# 检查是否触发
if self.pnl_history:
max_pnl = max(p for _, p in self.pnl_history)
current_loss = max_pnl - current_pnl
if current_loss > self.loss_threshold:
self.triggered = True
self.trigger_time = current_time
return False
return True
小结
高频交易是量化交易中技术要求最高的领域,涉及:
- 极致的性能优化:微秒级的延迟要求
- 市场微观结构:深入理解订单簿和交易机制
- 复杂的策略逻辑:做市、套利、执行等
- 严格的风险控制:在高速交易中保证安全
高频交易需要大量的技术投入,包括硬件、网络、软件优化等,不适合个人投资者。但对于理解市场运作机制、提升编程能力都有很大帮助。下一章将通过实战案例,演示如何构建一个完整的量化策略。