Technical Factors, Category 4: Return and Risk - ChannelCMT/OFO GitHub Wiki

Technical Factors - Category 4: Return and Risk

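To use a factor, change add_data=False to True. The factor name can be changed as you like. Where the data has no close_adj, use close instead; the same applies to high, low, and so on. N is a parameter that you can set yourself.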

Variance20 H010001A

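Factor description: variance of daily returns over 20 days.

Calculation

StdDev(Return(close,1),N)^2*250

N = 20, 60, 120, etc. Note: the factor value is annualized, i.e. the daily variance multiplied by 250.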

Variance20_J = dv.add_formula('Variance20_J', 'StdDev(Return(close,1),20)^2*250' ,
                                  is_quarterly=False, add_data=False)
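For readers without a DataView at hand, the same calculation can be sketched in plain pandas. This is only an illustration: the helper name `annualized_variance`, the toy prices, and the use of the sample variance (ddof=1) are assumptions, and the `StdDev` operator may use a different degrees-of-freedom convention.

```python
import numpy as np
import pandas as pd

def annualized_variance(close: pd.DataFrame, window: int = 20, periods: int = 250) -> pd.DataFrame:
    """Rolling variance of 1-day simple returns, scaled by `periods` trading days."""
    daily_ret = close.pct_change()                      # Return(close, 1)
    return daily_ret.rolling(window).var(ddof=1) * periods

# Toy prices for two hypothetical symbols (dates x symbols, like dv.get_ts output)
rng = np.random.default_rng(0)
close = pd.DataFrame(
    100 * np.cumprod(1 + rng.normal(0, 0.01, size=(60, 2)), axis=0),
    columns=["A", "B"],
)
variance20 = annualized_variance(close, window=20)
```

The first 20 rows are NaN because a 20-day window needs 20 returns, and the first return is itself undefined.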

Kurtosis20 H010004A

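Factor description: 20-day kurtosis of the stock's returns.

Calculation

Ts_Kurtosis(Return(close_adj,1),N)

or

Ts_Kurtosis(((close_adj-Delay(close_adj,1))/Delay(close_adj,1)),N)

Either form works; N = 20, 60, 120, etc. Here r is the daily return and σ is the standard deviation of returns.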

Kurtosis20_j = dv.add_formula('Kurtosis20_j', 'Ts_Kurtosis(Return(close_adj,1),20)' ,
                                  is_quarterly=False, add_data=False)
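A rough pandas counterpart is sketched below. It assumes that a rolling, bias-corrected excess kurtosis is an acceptable stand-in for `Ts_Kurtosis`; the exact estimator used by the formula engine is not stated on this page, so values may differ by a constant or a small-sample correction. The helper name and toy data are hypothetical.

```python
import numpy as np
import pandas as pd

def rolling_kurtosis(close: pd.DataFrame, window: int = 20) -> pd.DataFrame:
    """Rolling excess kurtosis (Fisher definition, bias-corrected) of daily returns."""
    return close.pct_change().rolling(window).kurt()

rng = np.random.default_rng(0)
close = pd.DataFrame(
    100 * np.cumprod(1 + rng.normal(0, 0.01, size=(60, 2)), axis=0),
    columns=["A", "B"],
)
kurt20 = rolling_kurtosis(close, window=20)
```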

Alpha20 H010007A

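Factor description: 20-day Jensen's alpha.

Calculation

alpha = (E(r) - rf) - beta * E(rm - rf), where r is the daily return, rf is the risk-free return, rm is the index return, and beta is the 20-day beta of the returns, beta = cov(r, rm) / var(rm).

betaN_J=Covariance(r_J,nr,N)/(StdDev(nr,N)^2)

AlphaN=(Ts_Mean(r_J-0.01,N) - betaN_J*(Ts_Mean((nr-0.01),N)))*250

N can be changed in the two places above.

N = 20, 60, 120, etc.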

hs300_close = dv.data_api.daily('000300.SH', dv.extended_start_date_d, dv.end_date, fields="close",
                                    adjust_mode=None)
hs300_benchmark = hs300_close[0][['trade_date', 'close']].set_index('trade_date')
dv.add_field("close")
hs300 = 0 * dv.get_ts('close')
for i in range(hs300.shape[1]):
    hs300.iloc[:, i] = hs300_benchmark
dv.append_df(hs300, 'hs300')

nr = dv.add_formula('nr', '(hs300-Delay(hs300,1))/Delay(hs300,1)', is_quarterly=False, add_data=True)
r_J = dv.add_formula('r_J', '(close-Delay(close,1))/Delay(close,1)', is_quarterly=False, add_data=True)
beta20_J = dv.add_formula('beta20_J', 'Covariance(r_J,nr,20)/(StdDev(nr,20)^2)' , is_quarterly=False,add_data=True)
Alpha20_J = dv.add_formula('Alpha20_J',"(Ts_Mean(r_J-0.01,20) - beta20_J*(Ts_Mean((nr-0.01),20)))*250", is_quarterly=False, add_data=False)
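The beta and alpha definitions can be checked on synthetic data with a small pandas sketch. The function name, the `rf_daily` parameter (set to 0 here rather than the constant used in the formula above), and the toy series are assumptions made for illustration only.

```python
import numpy as np
import pandas as pd

def rolling_beta_alpha(r, rm, window=20, rf_daily=0.0, periods=250):
    """r: stock daily returns, rm: index daily returns (both pd.Series)."""
    beta = r.rolling(window).cov(rm) / rm.rolling(window).var()   # cov(r, rm) / var(rm)
    alpha = ((r - rf_daily).rolling(window).mean()
             - beta * (rm - rf_daily).rolling(window).mean()) * periods
    return beta, alpha

rng = np.random.default_rng(1)
rm = pd.Series(rng.normal(0, 0.01, 100))
r = 0.0005 + 1.5 * rm            # stock return that is exactly linear in the index return
beta, alpha = rolling_beta_alpha(r, rm)
```

Because the toy stock return is an exact linear function of the index return, every window recovers beta = 1.5 and an annualized alpha of 0.0005 * 250 = 0.125.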

Beta20 H010010A

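Factor description: 20-day beta.

Calculation: r is the daily return, rm is the index return, and beta is the 20-day beta of the returns, beta = cov(r, rm) / var(rm).

betaN_J=Covariance(r_J,nr,N)/(StdDev(nr,N)^2)

N = 20, 60, 120, 250, etc.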


hs300, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date, fields='trade_date,close,open')
hs300_benchmark = hs300[['trade_date', 'close']].set_index('trade_date')
dv.add_field("close")
hs300 = 0 * dv.get_ts('close')
for i in range(hs300.shape[1]):
    hs300.iloc[:, i] = hs300_benchmark
dv.append_df(hs300, 'hs300')

nr = dv.add_formula('nr', '(hs300-Delay(hs300,1))/Delay(hs300,1)', is_quarterly=False, add_data=True)
r_J = dv.add_formula('r_J', '(close_adj-Delay(close_adj,1))/Delay(close_adj,1)', is_quarterly=False, add_data=True)
beta20_J =dv.add_formula('beta20_J' , 'Covariance(r_J,nr,20)/(StdDev(nr,20)^2)', is_quarterly=False,add_data=True)

SharpeRatio20 H010014A

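Factor description: 20-day Sharpe ratio, the excess return earned per unit of total risk borne; it lets the return and the risk of a strategy be assessed together.

Calculation

(Ts_Mean(close_ret,N)*250-0.03)/StdDev(close_ret,N)/Sqrt(250)

N = 20, 60, 120, etc.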

dv.add_formula("close_ret", "Return(close_adj,1)", is_quarterly=False, add_data=True)
SharpeRatio20 = dv.add_formula('SharpeRatio20_J', "(Ts_Mean(close_ret,20)*250-0.03)/StdDev(close_ret,20)/Sqrt(250)",is_quarterly=False,add_data=True)
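The annualization in the formula (mean scaled by 250, standard deviation scaled by the square root of 250, and an annual risk-free rate of 0.03) can be spelled out as a pandas sketch. The function name and the random test series are hypothetical.

```python
import numpy as np
import pandas as pd

def rolling_sharpe(ret: pd.Series, window: int = 20, rf_annual: float = 0.03,
                   periods: int = 250) -> pd.Series:
    ann_mean = ret.rolling(window).mean() * periods          # annualized mean return
    ann_std = ret.rolling(window).std() * np.sqrt(periods)   # annualized volatility
    return (ann_mean - rf_annual) / ann_std

rng = np.random.default_rng(5)
ret = pd.Series(rng.normal(0.001, 0.01, 60))
sharpe = rolling_sharpe(ret)
```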

TreynorRatio20 H010017A

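Factor description: 20-day Treynor ratio, which measures the return earned per unit of systematic risk.

Calculation

TR = (E(r)-Rf)/β

r is the daily return, E(r) the expected return, Rf the risk-free return, and β the beta of the returns (their systematic risk).

The factor value is annualized, i.e. the daily value multiplied by 250.

betaN_J=Covariance(r_J,nr,N)/(StdDev(nr,N)^2)

TRN_J=(250*(Ts_Mean(r_J,N))-0.03)/betaN_J

N = 20, 60, 120, etc.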

import numpy as np
hs300, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date, fields='trade_date,close,open')
hs300_benchmark = hs300[['trade_date', 'close']].set_index('trade_date')
hs300 = 0 * dv.get_ts('close')
for i in range(hs300.shape[1]):
    hs300.iloc[:, i] = hs300_benchmark
dv.append_df(hs300, 'hs300')
dv.add_formula('nr', '(hs300-Delay(hs300,1))/Delay(hs300,1)' 
                   , is_quarterly=False, add_data=True)
dv.add_formula('r_J','(close_adj-Delay(close_adj,1))/Delay(close_adj,1)' 
                   , is_quarterly=False, add_data=True)
beta20_J =dv.add_formula('beta20_J' , 'Covariance(r_J,nr,20)/(StdDev(nr,20)^2)', is_quarterly=False,
                   add_data=True)

TR20_J = dv.add_formula('TR20_J','(250*(Ts_Mean(r_J,20))-0.03)/beta20_J' ,
                            is_quarterly=False, add_data=True)

InformationRatio20 H010020A

Factor description: 20-day information ratio.

Calculation

Ts_Mean(r_J - nr,N)/StdDev(r_J - nr,N)

N = 20, 60, 120, etc.

Here r is the daily return and r_M is the index return; the CSI 300 index is used.

import numpy as np
hs300, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date, fields='trade_date,close,open')
hs300_benchmark = hs300[['trade_date', 'close']].set_index('trade_date')
hs300 = 0 * dv.get_ts('close')
for i in range(hs300.shape[1]):
    hs300.iloc[:, i] = hs300_benchmark
dv.append_df(hs300, 'hs300')

nr = dv.add_formula('nr', '(hs300-Delay(hs300,1))/Delay(hs300,1)', is_quarterly=False, add_data=True)
r_J = dv.add_formula('r_J', '(close_adj-Delay(close_adj,1))/Delay(close_adj,1)', is_quarterly=False, add_data=True)
IR20_J=dv.add_formula('IR20_J' , 'Ts_Mean(r_J - nr,20)/StdDev(r_J - nr,20)' , is_quarterly=False,
                   add_data=True)

GainVariance20 H010023A

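Factor description: 20-day gain variance; similar to variance, but computed over gains (positive returns) only.

Calculation

GainVariance20_J = pd.DataFrame({name: value.dropna().rolling(N).std() ** 2 for name, value in pct_return.items()},index=pct_return.index).ffill()

N = 20, 60, 120, etc.

Here r is the daily return. Note: the factor value is annualized, i.e. the daily value multiplied by 250.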

import pandas as pd

def cal_positive(df):
    return df[df > 0]

dv.add_field("close_adj")
pct_return = cal_positive(dv.get_ts('close_adj').pct_change())
GainVariance20_J = pd.DataFrame(
        # .items() and .ffill() replace iteritems() and fillna(method='ffill'), which newer pandas no longer supports
        {name: value.dropna().rolling(20).std() ** 2 for name, value in pct_return.items()},
        index=pct_return.index).ffill()
dv.append_df(GainVariance20_J, 'GainVariance20_J')
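One detail of this construction is easy to miss: because the non-positive returns are dropped before rolling, the window spans the last N gain days rather than the last N calendar days. A tiny single-series sketch makes this visible (the function name and the numbers are made up for illustration).

```python
import numpy as np
import pandas as pd

def gain_variance(ret: pd.Series, window: int) -> pd.Series:
    gains = ret[ret > 0]                     # keep only positive-return days
    var = gains.rolling(window).var()        # variance over the last `window` gains
    return var.reindex(ret.index).ffill()    # carry the last value across loss days

ret = pd.Series([0.01, -0.02, 0.03, -0.01, 0.02, 0.04, -0.03])
gv = gain_variance(ret, window=3)
```

With a window of 3, the first value appears on the day of the third gain (index 4), and the loss day at index 6 simply inherits the previous value.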

LossVariance20 H010026A

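Factor description: 20-day loss variance; similar to variance, but computed over losses (negative returns) only.

Calculation

LossVariance20_A = pd.DataFrame({name: value.dropna().rolling(N).std() ** 2 for name, value in pct_return.items()},index=pct_return.index).ffill()

N = 20, 60, 120, etc.

Here r is the daily return. Note: the factor value is annualized, i.e. the daily value multiplied by 250.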

import pandas as pd

# Keep only the negative returns
cal_negative = lambda df: df[df < 0]
dv.add_field("close_adj")
pct_return = cal_negative(dv.get_ts('close_adj').pct_change())
LossVariance20_A = pd.DataFrame(
        {name: value.dropna().rolling(20).std() ** 2 for name, value in pct_return.items()},
        index=pct_return.index).ffill()

GainLossVarianceRatio20 H010029A

Factor description: ratio of the 20-day gain variance to the 20-day loss variance.

Calculation

GainVarianceN_J/LossVarianceN_J

N = 20, 60, 120, etc.

Here r is the daily return.

import pandas as pd

def cal_negative(df):
    return df[df < 0]

dv.add_field("close_adj")
pct_return = cal_negative(dv.get_ts('close_adj').pct_change())
LossVariance20_J = pd.DataFrame(
        {name: value.dropna().rolling(20).std() ** 2 for name, value in pct_return.items()},
        index=pct_return.index).ffill()
dv.append_df(LossVariance20_J, 'LossVariance20_J')

def cal_positive(df):
    return df[df > 0]

pct_return = cal_positive(dv.get_ts('close_adj').pct_change())
GainVariance20_J = pd.DataFrame(
        {name: value.dropna().rolling(20).std() ** 2 for name, value in pct_return.items()},
        index=pct_return.index).ffill()
dv.append_df(GainVariance20_J, 'GainVariance20_J')

# add_data=True already stores the result in the DataView, so no separate append_df call is needed
GainlossVarianceratio20 = dv.add_formula('GainlossVarianceratio20_J', "GainVariance20_J/LossVariance20_J",is_quarterly=False, add_data=True)

RealizedVolatility H010032A

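Factor description: realized volatility, the standard deviation of intraday 5-minute bar returns.

Calculation: use the close of the 5-minute bars to compute the return of each 5-minute interval, then take the standard deviation of those intraday 5-minute returns.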

import pandas as pd

def get_daily_value(date):
    print(date)
    data, msg = dv.data_api.bar(",".join(dv.symbol),
                                    trade_date=date, freq="5M")
    try:
            data = data.dropna().pivot(index="time", columns="symbol", values="close")
            data = data.groupby(data.index // 500).first()
    except ValueError:
            print(date)
            raise
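    # Standard deviation of the intraday 5-minute returns (not of the price levels)
    return data.pct_change().std().rename(date)

# Runtime depends heavily on how fast the data requests are served
dv.add_field("close")
dates = list(dv.get_ts("close").index)
result = pd.concat(map(get_daily_value, dates), axis=1).T
dv.append_df(df=result, field_name="RealizedVolatility", is_quarterly=False)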

DASTD H010033A

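Factor description: standard deviation of excess returns over 252 days.

Calculation: DASTD = std(r - rf)

r is the daily return and rf is the risk-free return; the half-life is 42 trading days.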

r_J = dv.add_formula('r_J', '(close_adj-Delay(close_adj,1))/Delay(close_adj,1)', is_quarterly=False, add_data=True)
# If a fixing rate is used as rf, daily and annual rates also need to be reconciled
dastd = r_J.ewm(halflife=42).std()  # exponentially weighted, bias-corrected standard deviation
dv.append_df(dastd, 'DASTD')
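The exponentially weighted standard deviation can be tried on a single synthetic series. In this sketch an observation k days in the past carries a relative weight of 0.5 ** (k / 42); the function name, the `rf_daily` parameter, and the random data are assumptions for illustration.

```python
import numpy as np
import pandas as pd

def dastd(ret: pd.Series, rf_daily: float = 0.0, halflife: int = 42) -> pd.Series:
    """Exponentially weighted standard deviation of excess returns."""
    excess = ret - rf_daily
    return excess.ewm(halflife=halflife).std()

rng = np.random.default_rng(3)
ret = pd.Series(rng.normal(0.0005, 0.02, 300))
with_rf = dastd(ret, rf_daily=0.0001)
without_rf = dastd(ret)
```

Subtracting a constant risk-free rate shifts every observation equally, so `with_rf` and `without_rf` agree; rf only matters for this factor when it varies over time.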

HsigmaCNE5 H010034A

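Factor description: residual return volatility over 252 days.

Calculation: HsigmaCNE5 = std(ei)

ei is the residual return; 252 trading days are used in total, with a half-life of 63 trading days.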

start_date_delta = 300

hs300, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date, fields='trade_date,close')
hs300_benchmark = hs300[['trade_date', 'close']].set_index('trade_date').pct_change()

from numpy.lib.stride_tricks import as_strided as strided
import numpy as np
import pandas as pd

def get_sliding_window(df, W, return2D=0):
    a = df.values
    s0, s1 = a.strides
    m, n = a.shape
    out = strided(a, shape=(m - W + 1, W, n), strides=(s0, s0, s1))
    if return2D == 1:
        return out.reshape(m - W + 1, -1)
    else:
        return out

def _beta(stock_return, universe_return, trailing_days=252, half_life=63):
        #         universe_return = hs300_benchmark.reindex(index=)
    weights = np.sqrt(exponential_weight(trailing_days, half_life))
    coef, _, resid = wls_by_numpy(universe_return, stock_return, weights)
    return coef, np.std(resid)  # np.sqrt(np.sum(resid**2)/250)

def wls_by_numpy(x, y, w):
    A = np.vstack([x, np.ones(len(x))]).T * w.reshape(-1, 1)
    _y = y * w
    m, c = np.linalg.lstsq(A, _y, rcond=None)[0]  # rcond=None selects the current default and avoids a FutureWarning
    resid = y - (m * x + c)
    return m, c, resid

def exponential_weight(trailing_days=252, half_life=63):
    _s = np.flip(np.arange(0, trailing_days), axis=0) / half_life
    return np.power(0.5, _s)

def calc_hsigmacne5(s):
    idx = s.index
    s = s.dropna()
    if s.size == 0 or len(s) < 252:
        return np.nan
    concated = pd.concat([s, hs300_benchmark.reindex(index=s.index)], axis=1)
    strides = get_sliding_window(concated, 252)
    hsigma = pd.Series(list(map(lambda x: _beta(x[:, 0], x[:, 1])[1], strides)),
                           index=s.index[252 - 1:]).reindex(index=idx)
    return hsigma  # without this return the function yields None for every symbol
ret_ = dv.add_formula("ret_", "Return(close_adj)", is_quarterly=False)
hsigmacne5=  ret_.apply(calc_hsigmacne5)
dv.append_df(hsigmacne5, 'hsigmacne5')
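The core of the calculation is a weighted least-squares fit in which both sides of the regression are scaled by the square root of an exponentially decaying weight. A self-contained sketch on one synthetic 252-day window follows; the function names and data are hypothetical, and it mirrors the approach above rather than reproducing it line by line.

```python
import numpy as np

def exponential_weights(n: int, half_life: int) -> np.ndarray:
    """Weight 1 for the most recent (last) observation, halving every `half_life` days back."""
    age = np.arange(n - 1, -1, -1)
    return 0.5 ** (age / half_life)

def weighted_fit(x, y, weights):
    """Weighted least squares of y on x with an intercept."""
    sw = np.sqrt(weights)                                   # scale rows by sqrt(weight)
    A = np.column_stack([x, np.ones_like(x)]) * sw[:, None]
    slope, intercept = np.linalg.lstsq(A, y * sw, rcond=None)[0]
    resid = y - (slope * x + intercept)                     # unweighted residuals
    return slope, intercept, resid

rng = np.random.default_rng(2)
x = rng.normal(0, 0.01, 252)        # stand-in for index returns
y = 0.001 + 0.8 * x                 # stand-in for stock returns, no noise
w = exponential_weights(252, 63)
slope, intercept, resid = weighted_fit(x, y, w)
hsigma = np.std(resid)
```

With noise-free inputs the fit recovers the slope and intercept and the residual volatility is zero; real return data would leave a positive residual standard deviation.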

CmraCNE5 H010035A

Factor description: 12-month cumulative return range (monthly cumulative return range over the past 12 months).

Calculation

Here rf is the risk-free return.

def run_formula(dv):
    ret_ = dv.add_formula("ret", "Return(close_adj)", is_quarterly=False)

    month_days = 21

    # Treat Rf as 0 for now

    def get_sliding_window(s, W):
        """
        input: np.arange(20), W=4
        output:
        array([[ 0,  1,  2,  3],
               [ 4,  5,  6,  7],
               [ 8,  9, 10, 11],
               [12, 13, 14, 15],
               [16, 17, 18, 19]])
        """
        from numpy.lib.stride_tricks import as_strided as strided
        assert len(s) % W == 0
        strides = s.strides
        assert len(strides) == 1
        strides = strides[0]
        return strided(s, shape=(int(len(s) / W), W), strides=(W * strides, strides))

    def calc_range(s, days=month_days, allrange=month_days * 12):
        """
        Missing values still need to be handled
        """

        def get_max_and_min(s):
            #         print(s)
            from numpy import log
            s = s + 1
            out = get_sliding_window(s, days)
            out = out.prod(axis=1)
            #             return out.max()-out.min()  # similar to this, which would suggest the uqer version is simply wrong
            Z_T = log(out).cumsum()  # the risk-free return is not subtracted here
            return log((1 + Z_T.max()) / (1 + Z_T.min()))

        return s.rolling(allrange).apply(get_max_and_min)
    
    # Kept inside run_formula, where ret_ and calc_range are defined
    CmraCNE5 = ret_.apply(calc_range)
    dv.append_df(CmraCNE5, 'CmraCNE5')
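As a worked example, the quantity computed in the code above can be written for a single 252-day window as Z(T) = sum of the first T monthly log returns (with rf taken as 0) and CMRA = ln((1 + max Z(T)) / (1 + min Z(T))). The sketch below follows that reading; the function name is hypothetical and a month is assumed to be 21 trading days.

```python
import numpy as np

def cmra(daily_ret, month_days: int = 21, months: int = 12) -> float:
    r = np.asarray(daily_ret, dtype=float)
    if r.size != month_days * months:
        raise ValueError("expected exactly month_days * months daily returns")
    # Log return of each month: sum of daily log(1 + r) within the month
    monthly_log = np.log1p(r).reshape(months, month_days).sum(axis=1)
    z = np.cumsum(monthly_log)                  # Z(T) for T = 1..months, rf = 0
    return float(np.log((1 + z.max()) / (1 + z.min())))

flat = cmra(np.zeros(252))                              # no price movement
steady = cmra(np.full(252, np.expm1(0.01 / 21)))        # 1% log return every month
```

For the steadily rising series Z(T) runs from 0.01 to 0.12, so the result is ln(1.12 / 1.01); a flat series gives 0.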

Cmra H010036A

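Factor description: 24-month cumulative return range (monthly cumulative return range over the past 24 months).

Calculation

Days with zero trading volume are excluded from the calculation.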

CMRA = dv.add_formula('CMRA_J',"Log(Ts_Max(close_adj,475)/Ts_Min(close_adj,475))"
                          , is_quarterly=False, add_data=False)

Hbeta H010037A

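Factor description: historical beta (historical daily beta). Over the past 12 months, the stock's daily returns are regressed on the market portfolio's daily returns together with three autoregressive lags; the factor is the coefficient on the market portfolio's daily return.

The variance of the regression residuals divided by the degrees of freedom.

Calculation

The market portfolio's daily return r_m,t is computed from CSI 300 data: r_m,t = (CloseIndex - PrevCloseIndex) / PrevCloseIndex. The β_h in the regression result is the historical beta HBETA. CloseIndex is today's index close and PrevCloseIndex is the previous day's index close.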

def Multi_Regression(index, n, Y, *X):
        """
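        index: which entry of the returned coefficient vector to keep, counting from 0; 0 is the intercept, 1 is the first coefficient
        n: number of days in the rolling window
        Y: matrix of the dependent variable
        *X: the regressors, passed as a list, set, or tuple whose elements are the individual matrices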
        """
        from numpy.linalg import inv, LinAlgError
        from pandas import DataFrame, Series
        import numpy as np
        import pandas as pd
        DF = dict()
        le_th = len(Y)
        columns = Y.columns
        indexes = Y.index

        for column in columns:
            betas = []

            def _func_x(x):
                if isinstance(x, DataFrame):
                    return x[column].values
                elif isinstance(x, Series):
                    return x.values
                else:
                    raise TypeError("each regressor must be a DataFrame or a Series")

            X_column = list(map(_func_x, X))
            X_column.insert(0, np.ones(le_th))
            X_column = np.array(X_column).T
            Y_column = np.array(Y[column].values).T
            print(column)
            for length in range(n, le_th):
                X_temp = X_column[length - n:length]
                try:
                    beta = (inv((X_temp.T).dot(X_temp))).dot(X_temp.T).dot(Y_column[length - n:length])
                    betas.append(beta[index])
                except LinAlgError:
                    betas.append(np.nan)
            DF[column] = betas
        for key, value in DF.items():
            if len(value) != le_th - n:
                DF[key] += [np.nan] * (le_th - n - len(DF[key]))
            if len(value) != le_th:
                DF[key] = [np.nan] * n + DF[key]
        df = pd.DataFrame(DF, index=indexes)
        return df

dv.add_field("close_adj")

zz800_benchmark, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date,
                                         fields='trade_date,close,open')
dv.data_benchmark = zz800_benchmark[['trade_date', 'close', 'open']].set_index('trade_date')
BenchmarkIndexClose = dv.data_benchmark["close"].loc[dv.get_ts("close_adj").index]
BenchmarkIndexOpen = dv.data_benchmark["open"].loc[dv.get_ts("close_adj").index]
# risk_free_rate=-0.005
M_Return = BenchmarkIndexClose.pct_change(1)
Return = dv.get_ts("close_adj").pct_change(1)
Return_1 = Return.shift(1)
Return_2 = Return.shift(2)
Return_3 = Return.shift(3)
# A window of 252 is the standard setting, but it only gives 0.89
df = Multi_Regression(1, 252, Return, M_Return, Return_1, Return_2, Return_3)
dv.append_df(df, "HBETA_J", overwrite=True, is_quarterly=False)
HBETA = dv.get_ts("HBETA_J")

Hsigma H010038A

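Factor description: historical sigma (historical daily sigma). Over the past 12 months, the stock's daily returns are regressed on the market portfolio's daily returns together with three autoregressive lags; the factor is the standard deviation of the regression residuals.

Calculation

# start_date_delta = 500 # at least 2 years of data are needed for the calculation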
def Multi_Regression(n, Y, *X):
        from numpy.linalg import inv, LinAlgError
        from pandas import DataFrame, Series
        import numpy as np
        import pandas as pd
        DF = dict()
        le_th = len(Y)
        columns = Y.columns
        indexes = Y.index
        p = 4
        for column in columns:
            residuals = []

            def _func_x(x):
                if isinstance(x, DataFrame):
                    return x[column].values
                elif isinstance(x, Series):
                    return x.values
                else:
                    raise TypeError("each regressor must be a DataFrame or a Series")

            X_column = list(map(_func_x, X))
            #         print(X_column)
            X_column.insert(0, np.ones(le_th))
            X_column = np.array(X_column).T
            Y_column = np.array(Y[column].values).T
            for length in range(n, le_th):
                X_temp = X_column[length - n:length]
                Y_temp = Y_column[length - n:length]
                try:
                    beta = (inv((X_temp.T).dot(X_temp))).dot(X_temp.T).dot(Y_temp)
                    residual = (Y_temp - (beta).dot(X_temp.T))
                    #                 print(residual[0])
                    residual = (residual ** 2).sum() / (n - p - 1)
                    residuals.append(residual)
                except LinAlgError:
                    residuals.append(np.nan)
            DF[column] = residuals
        for key, value in DF.items():
            if len(value) != le_th - n:
                DF[key] += [np.nan] * (le_th - n - len(DF[key]))
            if len(value) != le_th:
                DF[key] = [np.nan] * n + DF[key]
        df = pd.DataFrame(DF, index=indexes)
        return df

dv.add_field("close_adj")
Return = dv.get_ts("close_adj").pct_change(1)
Return_1 = Return.shift(1)
Return_2 = Return.shift(2)
Return_3 = Return.shift(3)
zz800_benchmark, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date,
                                         fields='trade_date,close,open')
dv.data_benchmark = zz800_benchmark[['trade_date', 'close', 'open']].set_index('trade_date')
BenchmarkIndexClose = dv.data_benchmark["close"].loc[dv.get_ts("close_adj").index]
BenchmarkIndexOpen = dv.data_benchmark["open"].loc[dv.get_ts("close_adj").index]
M_Return = BenchmarkIndexClose.pct_change(1)
df = Multi_Regression(250, Return, Return_1, Return_2, Return_3, M_Return)
dv.append_df(df, "HSIGMA_J", overwrite=True, is_quarterly=False)
HSIGMA = dv.get_ts("HSIGMA_J")

DDNSR H010039A

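Factor description: downside standard deviations ratio. Over the past 12 months, on days when the market portfolio's daily return is negative, the ratio of the standard deviation of the stock's daily returns to the standard deviation of the market portfolio's daily returns.

Calculation

DDNSR=sd(r)/sd(nr_m)

The market portfolio's daily return nr_m is computed from CSI 300 data; only days with a negative market return are considered.

nr_m=(CloseIndex-PrevCloseIndex)/PrevCloseIndex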

import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided

T = 250

benchmark, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date,
                                   fields='trade_date,close,open')
benchmark = benchmark[['trade_date', 'close']].set_index('trade_date').pct_change()
benchmark = benchmark[benchmark < 0]  # keep only days with a negative market return

def drop_and_corr(arr):
    # Column 0 is the stock return, column 1 the market return
    arr = arr[arr[:, 1] < 0]
    arr = arr[~np.isnan(arr).any(axis=1)]  # optional
    return arr[:, 0].std(ddof=1) / arr[:, 1].std(ddof=1)

def get_sliding_window(df, W, return2D=0):
    # Zero-copy view of all length-W rolling windows over the rows of df
    a = df.values
    s0, s1 = a.strides
    m, n = a.shape
    out = strided(a, shape=(m - W + 1, W, n), strides=(s0, s0, s1))
    if return2D == 1:
        return out.reshape(m - W + 1, -1)
    else:
        return out

def calc_corr(s):
    benchmark_ = benchmark.reindex(index=s.index)
    df = pd.concat([s, benchmark_], axis=1)

    idx = df.index[T - 1:]
    strides = get_sliding_window(df, T)
    result = list(map(lambda x: drop_and_corr(x), strides))

    return pd.Series(result, index=idx)

ret = dv.add_formula("_ret", "Return(close_adj)", is_quarterly=False)
DDNSR = ret.apply(calc_corr)
dv.append_df(DDNSR, 'DDNSR')

DDNCR H010040A

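Factor description: downside correlation. Over the past 12 months, on days when the market portfolio's daily return is negative, the correlation coefficient between the stock's daily returns and the market portfolio's daily returns.

Calculation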

import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided
from scipy.stats import pearsonr

T = 250

benchmark, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date,
                                   fields='trade_date,close,open')
benchmark = benchmark[['trade_date', 'close']].set_index('trade_date').pct_change()
benchmark = benchmark[benchmark < 0]  # keep only days with a negative market return

def drop_and_corr(arr):
    # Column 0 is the stock return, column 1 the market return
    arr = arr[arr[:, 1] < 0]
    arr = arr[~np.isnan(arr).any(axis=1)]  # optional
    return pearsonr(arr[:, 0], arr[:, 1])[0]

def get_sliding_window(df, W, return2D=0):
    # Zero-copy view of all length-W rolling windows over the rows of df
    a = df.values
    s0, s1 = a.strides
    m, n = a.shape
    out = strided(a, shape=(m - W + 1, W, n), strides=(s0, s0, s1))
    if return2D == 1:
        return out.reshape(m - W + 1, -1)
    else:
        return out

def calc_corr(s):
    benchmark_ = benchmark.reindex(index=s.index)
    df = pd.concat([s, benchmark_], axis=1)

    idx = df.index[T - 1:]
    strides = get_sliding_window(df, T)
    result = list(map(lambda x: drop_and_corr(x), strides))

    return pd.Series(result, index=idx)

ret = dv.add_formula("_ret", "Return(close_adj)", is_quarterly=False)
DDNCR = ret.apply(calc_corr)
dv.append_df(DDNCR, 'DDNCR')

Dvrat H010042A

Factor description: daily returns variance ratio (serial dependence in daily returns).

dv.add_formula("ret_J", "Return(close_adj)", is_quarterly=False, add_data=True)
T = 500  # number of trading days in the past 24 months
q = 10
dv.add_formula("sigma_squared_J", "Ts_Sum(Pow(ret_J, 2), %s)/(%s -1)" % (T, T),
               is_quarterly=False, add_data=False)

m = q*(T-q+1)*(1-q/T)

dv.add_formula("sigma_q_tmp", "Pow(Ts_Sum(ret_J, %s), 2)" % q, is_quarterly=False, add_data=True)
sigma_q = dv.add_formula("sigma_q", "Ts_Sum(sigma_q_tmp, %s)/%s" % (T-q, m), is_quarterly=False)
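The snippet above builds the two variance estimates but does not show how they are combined. A minimal numpy sketch of one common reading, the ratio of the q-day variance estimate to the one-day estimate, is given below. The function name, the use of all T - q + 1 overlapping q-day sums, and the final division are assumptions rather than something stated on this page.

```python
import numpy as np

def variance_ratio(r, q: int = 10) -> float:
    r = np.asarray(r, dtype=float)
    T = r.size
    sigma2 = np.sum(r ** 2) / (T - 1)                       # one-day variance estimate
    q_sums = np.convolve(r, np.ones(q), mode="valid")       # T - q + 1 overlapping q-day sums
    m = q * (T - q + 1) * (1 - q / T)                       # normalizing constant
    sigma2_q = np.sum(q_sums ** 2) / m                      # q-day variance estimate per day
    return float(sigma2_q / sigma2)

rng = np.random.default_rng(6)
r = rng.normal(0, 0.01, 500)
vr_q1 = variance_ratio(r, q=1)                                 # degenerate case, equals 1
vr_alternating = variance_ratio(np.tile([0.01, -0.01], 50), q=2)
```

With q = 1 the two estimates coincide, so the ratio is exactly 1. A series that alternates sign every day has two-day sums of zero, which drives the ratio to 0 and illustrates how negative serial dependence shows up as a ratio below 1.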

Ddnbt H010043A

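Factor description: downside beta. Over the past 12 months, on days when the market portfolio's daily return is negative, the regression coefficient of the stock's daily returns on the market portfolio's daily returns.

Calculation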

import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided

benchmark, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date,
                                   fields='trade_date,close,open')
benchmark = benchmark[['trade_date', 'close']].set_index('trade_date').pct_change()

def np_regr(A, y, residuals=False):
    # Ordinary least squares of y on A with an intercept column prepended
    if A.ndim == 1:
        A = A[:, None]
    A = np.hstack((np.ones(len(A))[:, None], A))
    betas = np.linalg.lstsq(A, y, rcond=None)[0]
    if residuals:
        return y - A @ betas
    return betas

def drop_and_regr(arr, residuals=False, drop_first=True):
    arr = arr[~np.isnan(arr).any(axis=1)]
    if arr.size == 0:
        return np.nan
    # A further condition could be added: skip the window when too few
    # trading days in the past 12 months have data

    # Downside-beta condition: keep only days with a negative market return
    arr = arr[arr[:, 1] < 0]
    if arr.size == 0:
        return np.nan

    # Column 0 is the stock return (y), column 1 the market return (x); [1] is the slope
    return np_regr(arr[:, 1:], arr[:, :1], residuals=residuals)[1].item()

def get_sliding_window(df, W, return2D=0):
    # Zero-copy view of all length-W rolling windows over the rows of df
    a = df.values
    s0, s1 = a.strides
    m, n = a.shape
    out = strided(a, shape=(m - W + 1, W, n), strides=(s0, s0, s1))
    if return2D == 1:
        return out.reshape(m - W + 1, -1)
    else:
        return out

ret = dv.add_formula("ret", "Return(close_adj)", is_quarterly=False, add_data=True)

def calc_regr(stock_ret):
    """
    The rolling is done inside this function
    """
    X = pd.concat([stock_ret, benchmark], axis=1)
    idx = X.index[252 - 1:]
    strides = get_sliding_window(X, 252)
    result = list(map(lambda x: drop_and_regr(x), strides))

    return pd.Series(result, index=idx)

Ddnbt = ret.apply(calc_regr)
dv.append_df(Ddnbt, 'Ddnbt')
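The three downside factors on this page (DDNSR, DDNCR and Ddnbt) are closely related: for a one-variable regression with an intercept, the slope equals the correlation times the ratio of standard deviations. The sketch below checks this on one synthetic window; the function name and data are hypothetical and it is not the rolling implementation used above.

```python
import numpy as np

def downside_stats(stock_ret, market_ret):
    s = np.asarray(stock_ret, dtype=float)
    m = np.asarray(market_ret, dtype=float)
    keep = (m < 0) & ~np.isnan(s)                        # down-market days with a valid stock return
    s, m = s[keep], m[keep]
    sd_ratio = s.std(ddof=1) / m.std(ddof=1)             # DDNSR-style ratio
    corr = np.corrcoef(s, m)[0, 1]                       # DDNCR-style correlation
    beta = np.cov(s, m, ddof=1)[0, 1] / m.var(ddof=1)    # Ddnbt-style regression slope
    return sd_ratio, corr, beta

rng = np.random.default_rng(4)
market = rng.normal(0, 0.01, 252)
stock = 1.2 * market + rng.normal(0, 0.01, 252)
sd_ratio, corr, beta = downside_stats(stock, market)
```

Here `beta` equals `corr * sd_ratio` up to floating-point error, which can serve as a quick consistency check between the three factor outputs.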

Tobt H010044A

Factor description: excess liquidity (liquidity-turnover beta).

Calculation

import pandas as pd
import numpy as np
from numpy.linalg import inv, LinAlgError

zz800_benchmark, msg = dv.data_api.daily("000300.SH", dv.extended_start_date_d, dv.end_date,
                                         fields='trade_date,close,open')
dv.data_benchmark = zz800_benchmark[['trade_date', 'close', 'open']].set_index('trade_date')
BenchmarkIndexClose = dv.data_benchmark["close"].loc[dv.get_ts("close_adj").index]
BenchmarkIndexOpen = dv.data_benchmark["open"].loc[dv.get_ts("close_adj").index]
# risk_free_rate=-0.005
M_Return = BenchmarkIndexClose.pct_change(1)
M_Return_1 = M_Return.shift(1)
M_Return_2 = M_Return.shift(2)
M_Return_3 = M_Return.shift(3)
M_Return_4 = M_Return.shift(4)
M_Return_5 = M_Return.shift(5)
dv.add_field("turnover_ratio")
TORate = dv.add_formula("TORate", "turnover_ratio/100", add_data=True, is_quarterly=False)
Return = dv.add_formula("Close_Return", "Return(close_adj)", add_data=True, is_quarterly=False)
Return_1 = Return.shift(1)
Return_2 = Return.shift(2)
Return_3 = Return.shift(3)
Return_4 = Return.shift(4)
Return_5 = Return.shift(5)
DF = dict()
window = 498
le_th = len(Return)
indexes = Return.index
for column in Return.columns:
    # Regressors: intercept, |turnover|, five lags of |stock return|, five lags of |market return|
    X = np.array(
        [np.ones(le_th), TORate[column].abs().values, Return_1[column].abs().values, Return_2[column].abs().values,
         Return_3[column].abs().values, Return_4[column].abs().values, Return_5[column].abs().values,
         M_Return_1.abs().values, M_Return_2.abs().values, M_Return_3.abs().values, M_Return_4.abs().values,
         M_Return_5.abs().values]).T
    Y = np.array(Return[column].abs().values).T
    betas = []
    print(column)
    for length in range(window, len(Y)):
        X_temp = X[length - window:length]
        try:
            beta = (inv((X_temp.T).dot(X_temp))).dot(X_temp.T).dot(Y[length - window:length])
            betas.append(beta[1])  # coefficient on the turnover rate
        except LinAlgError:
            betas.append(np.nan)
    DF[column] = betas
# Pad every column with NaN so that it lines up with the full date index
for key, value in DF.items():
    if len(value) != le_th - window:
        DF[key] += [np.nan] * (le_th - window - len(DF[key]))
    if len(value) != le_th:
        DF[key] = [np.nan] * window + DF[key]
df = pd.DataFrame(DF, index=indexes)
dv.append_df(df, "TOBT", overwrite=True, is_quarterly=False)
TOBT = dv.get_ts("TOBT")

Skewness H010045A

Factor description: price skewness (skewness of price during the last 20 days), i.e. the skewness of the stock price over the past 20 trading days.

Calculation

SKEWNESS_J = dv.add_formula('SKEWNESS_J', "Ts_Skewness(close_adj,{})".format(20), is_quarterly=False, add_data=False)