bqb18wzv的知识库

特征工程前置:如何构建低延迟的 Tick 级数据流管道?

由bqb18wzv创建,最终由bqb18wzv 被浏览 1 用户

在 AI 量化策略中,我们常说 Garbage In, Garbage Out。但在高频策略里,Slow In 也是 Garbage Out。

训练模型时我们用的是清洗好的 CSV,但在实盘推理阶段,如何对接实时的 WebSocket 流并将其转化为模型可读的 Tensor,是一个巨大的工程挑战。今天分享一个轻量级的管道验证方案,用于在实盘前由人工校验数据流的完整性。

数据流的“失真”问题 很多开发者忽略了 WebSocket 的重连机制和数据乱序问题。如果你直接把接收到的数据丢给模型,可能会因为一个 Timestamp 的跳变导致策略误判。

因此,构建一个可视化的监控面板是必须的。我们需要肉眼确认:

  1. 数据流是否连续?
  2. 时间戳是否单调递增?
  3. 高并发下是否有丢包?

轻量级解决方案 为了测试,我搭建了一个基于 Python 的可视化原型。后端数据源我接入了 AllTick API,主要看中它在 Tick 数据颗粒度上的还原能力,比较适合做特征提取的原始素材。

代码实现(基于 Matplotlib Animation) 这是一个精简版的 ETL + 可视化流程:

import websocket
import json
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from datetime import datetime

# 用于存储时间和价格
times, prices = [], []

# WebSocket 消息回调
def on_message(ws, message):
    data = json.loads(message)
    price = float(data['price'])
    timestamp = datetime.fromtimestamp(data['timestamp'])
    
    times.append(timestamp)
    prices.append(price)
    
    # 保留最近 50 条数据
    if len(times) > 50:
        times.pop(0)
        prices.pop(0)

def on_open(ws):
    subscribe_msg = {
        "type": "subscribe",
        "symbol": "ETHUSD"
    }
    ws.send(json.dumps(subscribe_msg))

# 动态绘图函数
def animate(i):
    plt.cla()
    plt.plot(times, prices, color='orange', marker='o')
    plt.title("ETH 实时走势图")
    plt.xlabel("时间")
    plt.ylabel("价格(USD)")
    plt.xticks(rotation=45)
    plt.tight_layout()

# WebSocket 地址示例(AllTick API)
ws_url = "wss://ws.alltick.co/realtime"

ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_open=on_open)

# 用线程运行 WebSocket
from threading import Thread
Thread(target=ws.run_forever, daemon=True).start()

# 实时刷新可视化
plt.show()

深度思考 在这个 demo 中,我们只是画出了 Price。但在实际的量化工程中,on_message 函数里就是我们嵌入实时特征计算(比如实时波动率计算、订单流不平衡计算)的最佳位置。

可视化的目的是为了 Debug 我们的逻辑。当你的线条不再仅仅是价格,而是策略的预测概率时,这个系统的价值才真正体现出来。

\

{link}