Custom Data - Loren1166/NautilusTrader- GitHub Wiki

Custom Data 自定义数据

Due to the modular nature of the Nautilus design, it is possible to set up systems with very flexible data streams, including custom user-defined data types. This guide covers some possible use cases for this functionality.

由于 Nautilus 设计的模块化特性,可以使用非常灵活的数据流设置系统,包括自定义用户定义的数据类型。本指南涵盖了此功能的一些可能用例。

It's possible to create custom data types within the Nautilus system. First you will need to define your data by subclassing from Data.

可以在 Nautilus 系统中创建自定义数据类型。首先,您需要通过从 Data 子类化来定义您的数据。

info 信息

As Data holds no state, it is not strictly necessary to call super().__init__().

由于 Data 不保存状态,因此严格来说不需要调用 super().__init__()

from nautilus_trader.core.data import Data


class MyDataPoint(Data):
    """
    This is an example of a user-defined data class, inheriting from the base class `Data`.

    The fields `label`, `x`, `y`, and `z` in this class are examples of arbitrary user data.
    # 这是用户定义数据类的示例,继承自基类 `Data`。

    # 此类中的字段 `label`、`x`、`y` 和 `z` 是任意用户数据的示例。
    """

    def __init__(
        self,
        label: str,
        x: int,
        y: int,
        z: int,
        ts_event: int,
        ts_init: int,
    ) -> None:
        self.label = label
        self.x = x
        self.y = y
        self.z = z
        self._ts_event = ts_event
        self._ts_init = ts_init

    @property
    def ts_event(self) -> int:
        """
        UNIX timestamp (nanoseconds) when the data event occurred.

        Returns
        -------
        int

        """
        return self._ts_event

    @property
    def ts_init(self) -> int:
        """
        UNIX timestamp (nanoseconds) when the object was initialized.

        Returns
        -------
        int

        """
        return self._ts_init

The Data abstract base class acts as a contract within the system and requires two properties for all types of data: ts_event and ts_init. These represent the UNIX nanosecond timestamps for when the event occurred and when the object was initialized, respectively.

Data 抽象基类充当系统中的契约,并且要求所有类型的数据都具有两个属性:ts_eventts_init。它们分别表示事件发生时和对象初始化时的 UNIX 纳秒时间戳。

The recommended approach to satisfy the contract is to assign ts_event and ts_init to backing fields, and then implement the @property for each as shown above (for completeness, the docstrings are copied from the Data base class).

满足契约的推荐方法是将 ts_eventts_init 分配给后备字段,然后为每个字段实现 @property,如上所示(为了完整起见,文档字符串是从 Data 基类复制的)。

info 信息

These timestamps enable Nautilus to correctly order data streams for backtests using monotonically increasing ts_init UNIX nanoseconds.

这些时间戳使 Nautilus 能够使用单调递增的 ts_init UNIX 纳秒正确地对回测的数据流进行排序。

We can now work with this data type for backtesting and live trading. For instance, we could now create an adapter which is able to parse and create objects of this type - and send them back to the DataEngine for consumption by subscribers.

我们现在可以使用此数据类型进行回测和实时交易。例如,我们现在可以创建一个能够解析和创建此类型对象的适配器 - 并将它们发送回 DataEngine 以供订阅者使用。

You can subscribe to these custom data types within your actor/strategy in the following way:

您可以通过以下方式在您的参与者/策略中订阅这些自定义数据类型:

self.subscribe_data(
    data_type=DataType(MyDataPoint, metadata={"some_optional_category": 1}),
    client_id=ClientId("MY_ADAPTER"),
)

This will result in your actor/strategy passing these received MyDataPoint objects to your on_data method. You will need to check the type, as this method acts as a flexible handler for all custom data.

这将导致您的参与者/策略将这些接收到的 MyDataPoint 对象传递给您的 on_data 方法。您需要检查类型,因为此方法充当所有自定义数据的灵活处理程序。

def on_data(self, data: Data) -> None:
    # First check the type of data
    # 首先检查数据的类型
    if isinstance(data, MyDataPoint):
        # Do something with the data
        # 对数据做一些处理

Publishing and receiving signal data 发布和接收信号数据

Here is an example of publishing and receiving signal data using the MessageBus from an actor or strategy. A signal is an automatically generated custom data identified by a name containing only one value of a basic type (str, float, int, bool or bytes).

以下是使用参与者或策略中的 MessageBus 发布和接收信号数据的示例。信号是由名称标识的自动生成的自定义数据,该名称仅包含基本类型(strfloatintboolbytes)的一个值。

self.publish_signal("signal_name", value, ts_event)
self.subscribe_signal("signal_name")

def on_data(self, data):
    if data.is_signal("signal_name"):
        print("Signal", data)

Option Greeks example 期权希腊字母示例

This example demonstrates how to create a custom data type for option Greeks, specifically the delta. By following these steps, you can create custom data types, subscribe to them, publish them, and store them in the Cache or ParquetDataCatalog for efficient retrieval.

此示例演示如何为期权希腊字母(特别是 delta)创建自定义数据类型。通过按照这些步骤,您可以创建自定义数据类型、订阅它们、发布它们,并将它们存储在 CacheParquetDataCatalog 中以便高效检索。

import msgspec
from nautilus_trader.core.data import Data
from nautilus_trader.model.data import DataType
from nautilus_trader.serialization.base import register_serializable_type
from nautilus_trader.serialization.arrow.serializer import register_arrow
import pyarrow as pa

from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.core.datetime import dt_to_unix_nanos, unix_nanos_to_dt, format_iso8601


def unix_nanos_to_str(unix_nanos):
    return format_iso8601(unix_nanos_to_dt(unix_nanos))


class GreeksData(Data):
    def __init__(
        self, instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX"),
        ts_event: int = 0,
        ts_init: int = 0,
        delta: float = 0.0,
    ) -> None:
        self.instrument_id = instrument_id
        self._ts_event = ts_event
        self._ts_init = ts_init
        self.delta = delta

    def __repr__(self):
        return (f"GreeksData(ts_init={unix_nanos_to_str(self._ts_init)}, instrument_id={self.instrument_id}, delta={self.delta:.2f})")

    @property
    def ts_event(self):
        return self._ts_event

    @property
    def ts_init(self):
        return self._ts_init

    def to_dict(self):
        return {
            "instrument_id": self.instrument_id.value,
            "ts_event": self._ts_event,
            "ts_init": self._ts_init,
            "delta": self.delta,
        }

    @classmethod
    def from_dict(cls, data: dict):
        return GreeksData(InstrumentId.from_str(data["instrument_id"]), data["ts_event"], data["ts_init"], data["delta"])

    def to_bytes(self):
        return msgspec.msgpack.encode(self.to_dict())

    @classmethod
    def from_bytes(cls, data: bytes):
        return cls.from_dict(msgspec.msgpack.decode(data))

    def to_catalog(self):
        return pa.RecordBatch.from_pylist([self.to_dict()], schema=GreeksData.schema())

    @classmethod
    def from_catalog(cls, table: pa.Table):
        return [GreeksData.from_dict(d) for d in table.to_pylist()]

    @classmethod
    def schema(cls):
        return pa.schema(
            {
                "instrument_id": pa.string(),
                "ts_event": pa.int64(),
                "ts_init": pa.int64(),
                "delta": pa.float64(),
            }
        )

Publishing and receiving data 发布和接收数据

Here is an example of publishing and receiving data using the MessageBus from an actor or strategy:

以下是使用参与者或策略中的 MessageBus 发布和接收数据的示例:

register_serializable_type(GreeksData, GreeksData.to_dict, GreeksData.from_dict)

def publish_greeks(self, greeks_data: GreeksData):
    self.publish_data(DataType(GreeksData), greeks_data)

def subscribe_to_greeks(self):
    self.subscribe_data(DataType(GreeksData))

def on_data(self, data):
    if isinstance(GreeksData):
        print("Data", data)

Writing and reading data using the cache 使用缓存写入和读取数据

Here is an example of writing and reading data using the Cache from an actor or strategy:

以下是使用参与者或策略中的 Cache 写入和读取数据的示例:

def greeks_key(instrument_id: InstrumentId):
    return f"{instrument_id}_GREEKS"

def cache_greeks(self, greeks_data: GreeksData):
    self.cache.add(greeks_key(greeks_data.instrument_id), greeks_data.to_bytes())

def greeks_from_cache(self, instrument_id: InstrumentId):
    return GreeksData.from_bytes(self.cache.get(greeks_key(instrument_id)))

Writing and reading data using a catalog 使用目录写入和读取数据

For streaming custom data to feather files or writing it to parquet files in a catalog (register_arrow needs to be used):

用于将自定义数据流式传输到 feather 文件或将其写入目录中的 parquet 文件(需要使用 register_arrow):

register_arrow(GreeksData, GreeksData.schema(), GreeksData.to_catalog, GreeksData.from_catalog)

from nautilus_trader.persistence.catalog import ParquetDataCatalog
catalog = ParquetDataCatalog('.')

catalog.write_data([GreeksData()])

Creating a custom data class automatically 自动创建自定义数据类

The @customdataclass decorator enables the creation of a custom data class with default implementations for all the features described above.

@customdataclass 装饰器支持创建自定义数据类,并为上述所有功能提供默认实现。

Each method can also be overridden if needed. Here is an example of its usage:

如果需要,也可以覆盖每个方法。以下是其用法示例:

from nautilus_trader.model.custom import customdataclass


@customdataclass
class GreeksTestData(Data):
    instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX")
    delta: float = 0.0


GreeksTestData(
    instrument_id=InstrumentId.from_str("CL.GLBX"),
    delta=1000.0,
    ts_event=1,
    ts_init=2,
)

Custom data type stub 自定义数据类型存根

To enhance development convenience and improve code suggestions in your IDE, you can create a .pyi stub file with the proper constructor signature for your custom data types as well as type hints for attributes. This is particularly useful when the constructor is dynamically generated at runtime, as it allows the IDE to recognize and provide suggestions for the class's methods and attributes.

为了增强开发便利性并改进 IDE 中的代码建议,您可以创建一个 .pyi 存根文件,其中包含自定义数据类型的正确构造函数签名以及属性的类型提示。当构造函数在运行时动态生成时,这特别有用,因为它允许 IDE 识别并提供对类的方法和属性的建议。

For instance, if you have a custom data class defined in greeks.py, you can create a corresponding greeks.pyi file with the following constructor signature:

例如,如果您在 greeks.py 中定义了一个自定义数据类,则可以创建一个相应的 greeks.pyi 文件,其中包含以下构造函数签名:

from nautilus_trader.core.data import Data
from nautilus_trader.model.identifiers import InstrumentId


class GreeksData(Data):
    instrument_id: InstrumentId
    delta: float
    
    def __init__(
        self,
        ts_event: int = 0,
        ts_init: int = 0,
        instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX"),
        delta: float = 0.0,
  ) -> GreeksData: ...