Day 28: Reveal

On Day 27, we had our strategy enhancement reveal. By modifying the arithmetic behind our error correction, we chiseled out another 16 percentage points of outperformance vs. buy-and-hold and the original 12-by-12 strategy. All that remains now is to run the prediction scenario metrics and conduct circular block sampling. Given that we’ve laid the groundwork for these analyses in past posts, we will only spill a little bit of virtual, binary ink in the discussion. Strap in.
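As a quick refresher, the new adjustment scales the raw model forecast by an error-correction term built from the most recent realized prediction error, whereas the prior version appears to have added the term instead (per the commented-out line in the trading loop below). A minimal sketch of the arithmetic with illustrative numbers of our own choosing:

# Sketch of the error-correction arithmetic (illustrative values, not real data)
lr = 2                                     # learning-rate multiplier used in the code below
actual, pred = 0.03, 0.01                  # last realized forward return vs. what the model predicted
gamma = -(actual - pred) * lr              # correction term: negative of the realized error, scaled by lr
raw_forecast = 0.02                        # the model's forecast for the current row
adjusted_forecast = raw_forecast * gamma   # new multiplicative adjustment; the prior version added gamma instead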

First, we present the confusion matrices below, replacing the previous adjusted model with the new one.

The true negatives for the new adjusted strategy are essentially unchanged from the prior adjustment (196 vs. 197; the prior adjustment’s matrix is not shown). However, true positives increase by about 2.5% to 324. This comes on the back of modestly lower false positives (down by 1, or 0.5%) and false negatives (down by 8, or 3.6%).
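For readers who want to see how these counts are tallied, here is a minimal sketch of the sign-comparison logic, mirroring the heatmap code at the end of the post but using hypothetical prediction and return series:

# Sketch: confusion-matrix counts from predicted vs. realized direction (hypothetical series)
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
pred = pd.Series(rng.normal(size=500))      # stand-in for the adjusted strategy's predictions
ret_lag = pd.Series(rng.normal(size=500))   # stand-in for the next-period realized return

pred_sign = np.where(pred > 0, 1, -1)       # zero counts as negative, as in the code below
ret_sign = np.where(ret_lag > 0, 1, -1)
counts = (pd.DataFrame({'Prediction': pred_sign, 'Return': ret_sign})
            .groupby(['Prediction', 'Return']).size().unstack(fill_value=0))
print(counts)                               # rows: prediction direction, columns: realized direction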

Now we show the average return by prediction scenario.
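These scenario averages come from slicing next-period returns by the same sign combinations used above, mirroring the get_error_df_with_ttest function in the code below. A minimal sketch with hypothetical series:

# Sketch: average next-period return by prediction scenario (hypothetical series)
import numpy as np
import pandas as pd

rng = np.random.default_rng(2)
df = pd.DataFrame({'pred': rng.normal(size=500), 'ret_lag': rng.normal(size=500)})
true_positive = df.loc[(df['pred'] > 0) & (df['ret_lag'] > 0), 'ret_lag'] * 100    # in percent
false_negative = df.loc[(df['pred'] <= 0) & (df['ret_lag'] > 0), 'ret_lag'] * 100
print(true_positive.mean(), false_negative.mean())                                 # scenario averages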

The chart looks quite similar to the previous version, so we’ll move on to the t-tests. In this case, we only show the p-values for the differences in means for the new adjusted vs. the unadjusted and original strategies, along with the same comparisons for the old adjusted strategy for reference. We see that in most cases the p-value decreases for the differences in means for those metrics that already had p-values below the 0.05 threshold. Interestingly, if we were to flip the test, we’d note that the difference in average returns for the original strategy is significant vs. the new adjusted one at the 0.05 level; before, there was no significant difference. (See the sketch after the table for what flipping the test looks like.)

T-test P-values

                    New Adjusted      Old Adjusted      New Adjusted      Old Adjusted
                    vs. Unadjusted    vs. Unadjusted    vs. Original      vs. Original
True positive           0.003             0.016             0.005             0.021
False negative          0.007             0.001             0.004             0.006
True negative           0.114             0.131             0.074             0.085
False positive          0.872             0.765             0.948             0.870
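The p-values above are from one-sided tests, as in the get_error_df_with_ttest function further down; flipping the test simply means swapping which strategy's returns are hypothesized to be greater. A minimal sketch with hypothetical return series:

# Sketch: one-sided t-test and its "flipped" counterpart (hypothetical return series)
import numpy as np
from scipy.stats import ttest_ind

rng = np.random.default_rng(1)
new_adj_tp = rng.normal(1.6, 2.0, 300)    # stand-in: true-positive returns, new adjusted strategy
original_tp = rng.normal(1.4, 2.0, 300)   # stand-in: true-positive returns, original strategy

# H1: new adjusted mean > original mean (the direction reported in the table)
p_forward = ttest_ind(new_adj_tp, original_tp, alternative='greater').pvalue
# Flipped: H1: original mean > new adjusted mean
p_flipped = ttest_ind(original_tp, new_adj_tp, alternative='greater').pvalue
print(p_forward, p_flipped)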

We now turn to circular block sampling to see how the strategy might perform relative to buy-and-hold over 1,000 simulated five-year periods. We use blocks of 3 and 7 weekly returns to preserve the serial correlation of the underlying’s returns.
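The full sampling loop appears in the code at the end of the post (inlined rather than wrapped in a function). For reference, here is a hedged sketch of how the circular block logic could be packaged as a reusable helper (our own refactoring, not the code we actually ran): it draws fixed-size blocks at random starting points and wraps around the end of the sample, which is what makes the scheme "circular".

# Sketch: a reusable circular block sampler (our refactoring; the code below inlines this logic)
import numpy as np
import pandas as pd

def circular_block_sample(returns: pd.Series, block_size: int, sample_len: int, rng) -> np.ndarray:
    """Draw sample_len returns by stitching together random blocks, wrapping past the end."""
    n = len(returns)
    out = []
    while len(out) < sample_len:
        start = rng.integers(n)                               # random block start
        idx = [(start + j) % n for j in range(block_size)]    # wrap around circularly
        out.extend(returns.iloc[idx].values)
    return np.array(out[:sample_len])

# Usage (assuming 'data' holds weekly log returns, as built in the code below):
# rng = np.random.default_rng(42)
# sims = [circular_block_sample(data['price'], 3, 52*5 + 24, rng) for _ in range(1000)]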

                                      3-block sample                        7-block sample
                                 New     Original   Buy-and-Hold      New     Original   Buy-and-Hold
Return                          14.8%     12.4%        23.8%         15.2%     15.0%        24.6%
Sharpe Ratio                     0.25      0.20         0.30          0.26      0.25         0.32
Return Outperformance Frequency 33.6%     29.2%          –           34.5%     31.7%          –
Sharpe Outperformance Frequency 43.0%     35.7%          –           42.2%     38.5%          –

On the 3-block sample, the new adjusted strategy outperforms the original but underperforms buy-and-hold on both cumulative and risk-adjusted (Sharpe ratio) returns. Its frequency of outperformance relative to buy-and-hold also surpasses the original strategy’s. On the 7-block sample, performance for the new adjusted strategy is ever so slightly better than the original, and both underperform buy-and-hold. Interestingly, the new adjusted strategy’s frequency of outperformance relative to buy-and-hold outpaces the original strategy’s meaningfully, yet that does not translate into a sizeable difference in average performance. A quick glance at the data reveals a single instance of surprising underperformance relative to buy-and-hold, but more analysis, beyond the scope of this post, is required to find the root cause.
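If you want to eyeball that outlier yourself, one quick way is to sort the simulations by relative performance using the df_sim DataFrame built at the end of the code below; a minimal sketch:

# Sketch: locate the worst simulated runs relative to buy-and-hold (uses df_sim from the code below)
worst = df_sim.sort_values('outperf').head(5)       # 'outperf' = strategy return minus buy-and-hold
print(worst[['strat_ret', 'ret', 'outperf']])       # cumulative returns and the shortfall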

In sum, the new adjusted strategy performs better than the original and prior adjusted strategies. Such performance is also statistically significant for key scenarios. In simulation, the new adjusted strategy also tends to outperform the original strategy and exceeds the original strategy’s frequency of outperformance relative to buy-and-hold. But buy-and-hold still appears to beat most of the strategies. It is an interesting question whether this has to do with the relative frequency of severe market downdrafts from the Global Financial Crisis and the Tech Bubble not being captured in the simulation. But that would require further analysis. Indeed, simulation is only one piece of the puzzle. And we’ll present the most important part – the out-of-sample test – in our next post. Stay tuned!

Code below.

# Built using Python 3.10.19 and a virtual environment 

# Load packages
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import statsmodels.api as sm
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from utils_intraday import *
import yfinance as yf
from tqdm import tqdm
import seaborn as sns
from scipy.stats import t
from scipy.stats import ttest_ind

plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (14,8)

def get_spy_weekly_data() -> pd.DataFrame:
    df = yf.download('SPY', start='2000-01-01', end='2024-10-01')
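    # Note: depending on the yfinance version, download() may return MultiIndex columns and
    # auto-adjusted prices by default; the flat rename below assumes the older single-level
    # (Open, High, Low, Close, Adj Close, Volume) layout.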
    df.columns = ['open', 'high', 'low', 'close', 'adj close', 'volume']
    df.index.name = 'date'

    # Create training set and downsample to weekly ending Friday
    df_train = df.loc[:'2019-01-01', 'adj close'].copy()
    df_w = pd.DataFrame(df_train.resample('W-FRI').last())
    df_w.columns = ['price'] 

    return df_w

df_w = get_spy_weekly_data()

# Create momentum dictionary
periods = [3, 6, 9, 12]
momo_dict = {}
for back in periods:
    for forward in periods:
        df_out = df_w.copy()
        df_out['ret_back'] = np.log(df_out['price']/df_out['price'].shift(back))
        df_out['ret_for'] = np.log(df_out['price'].shift(-forward)/df_out['price'])
        df_out = df_out.dropna()

        mod = sm.OLS(df_out['ret_for'], sm.add_constant(df_out['ret_back'])).fit()
        momo_dict[f"{back} - {forward}"] = {'data': df_out,
                                            'params': mod.params,
                                            'pvalues': mod.pvalues}

# Trade with error correction
df_trade_1 = momo_dict['12 - 12']['data'].copy()

# Create new adjusted, original, and unadjusted strategies
mod_look_forward = 12
train_pd = 5
test_pd = 1
tot_pd = train_pd + test_pd
lr = 2

trade_pred = []
trade_pred_un = []
trade_pred_old = []
mod_params = []
for i in range(tot_pd, len(df_trade_1)-mod_look_forward+1, test_pd):
    # Rolling windows: train_df holds the prior train_pd rows; valid_df is the row used to
    # compute the realized prediction error for the correction; uncorr_df (original strategy)
    # and test_df (adjusted/unadjusted strategies) are the rows roughly mod_look_forward
    # weeks ahead that are actually traded
    train_df = df_trade_1.iloc[i-tot_pd:i-test_pd, 1:]
    valid_df = df_trade_1.iloc[i-test_pd:i, 1:]
    uncorr_df = df_trade_1.iloc[i-test_pd+mod_look_forward-1:i-test_pd+mod_look_forward, 1:]
    test_df = df_trade_1.iloc[i-test_pd+mod_look_forward:i-test_pd+mod_look_forward+1, 1:]

    # Ensure 'ret_back' is 2D by selecting it as a DataFrame, not a Series
    X_train = sm.add_constant(train_df[['ret_back']])
    if valid_df.shape[0] > 1:
        X_valid = sm.add_constant(valid_df[['ret_back']])
    else:
        X_valid = sm.add_constant(valid_df[['ret_back']], has_constant='add')

    if uncorr_df.shape[0] > 1:
        X_uncorr = sm.add_constant(uncorr_df[['ret_back']])
    else:
        X_uncorr = sm.add_constant(uncorr_df[['ret_back']], has_constant='add')

    if test_df.shape[0] > 1:
        X_test = sm.add_constant(test_df[['ret_back']])
    else:
        X_test = sm.add_constant(test_df[['ret_back']], has_constant='add')

    # Fit the model
    mod = sm.OLS(train_df['ret_for'], X_train).fit()
    mod_params.append(mod.params.values)

    # Predict using the test data
    pred = mod.predict(X_valid).values
    actual = valid_df['ret_for'].values
    gamma = -(actual - pred)*lr
    # gamma = -1 if np.sign(actual) + np.sign(pred) == 0 else 1
    pred_old = mod.predict(X_uncorr)
    trade_pred_old.extend(pred_old)

    mod_pred = mod.predict(X_test).values
    trade_pred_un.extend(mod_pred)
    # trade_pred.extend(mod_pred + gamma)
    trade_pred.extend(mod_pred * gamma)

assert len(trade_pred) + mod_look_forward + train_pd == len(df_trade_1)
assert len(trade_pred_un) + mod_look_forward + train_pd == len(df_trade_1)

df_trade_1['pred'] = np.concatenate((np.zeros(mod_look_forward + train_pd), np.array(trade_pred)))
df_trade_1['pred_un'] = np.concatenate((np.zeros(mod_look_forward + train_pd), np.array(trade_pred_un)))
df_trade_1['pred_old'] = np.concatenate((np.zeros(mod_look_forward + train_pd - 1), np.array(trade_pred_old), np.zeros(1)))
df_trade_1['ret'] = np.log(df_trade_1['price']/df_trade_1['price'].shift(1))

df_trade_1['signal'] = np.where(df_trade_1['pred'] > 0, 1, 0)
df_trade_1['signal_un'] = np.where(df_trade_1['pred_un'] > 0, 1, 0)
df_trade_1['signal_old'] = np.where(df_trade_1['pred_old'] > 0, 1, 0)
df_trade_1['signal_sh'] = np.where(df_trade_1['pred'] >= 0, 1, -1)

df_trade_1['strat_ret'] = df_trade_1['signal'].shift(1) * df_trade_1['ret']
df_trade_1['strat_ret_un'] = df_trade_1['signal_un'].shift(1) * df_trade_1['ret']
df_trade_1['strat_ret_old'] = df_trade_1['signal_old'].shift(1) * df_trade_1['ret']
df_trade_1['strat_ret_sh'] = df_trade_1['signal_sh'].shift(1) * df_trade_1['ret']

(df_trade_1[['strat_ret', 'strat_ret_old', 'ret']].cumsum()*100).plot()
plt.ylabel("Return (%)")
plt.xlabel("")
plt.legend(['New adjusted', 'Original', 'Buy-and-Hold'])
plt.title('Cumulative returns: 12-by-12 Strategy (with and without error correction) vs. Buy-and-Hold')
plt.show()

#### Correlation analysis ####
df_corr = df_trade_1[['pred', 'pred_un', 'pred_old', 'ret', 'ret_for', 'ret_back']].copy()  # gamma is not stored in df_trade_1, so it is omitted here
df_corr['ret_lag'] = df_corr['ret'].shift(-1)

# Confusion matrix analysis
pred_sign = [1 if x == 1 else -1 for x in np.sign(df_corr['pred'])]
pred_un_sign = [1 if x == 1 else -1 for x in np.sign(df_corr['pred_un'])]
ret_sign = [1 if x == 1 else -1 for x in np.sign(df_corr['ret_lag'])]
pred_old_sign = [1 if x == 1 else -1 for x in np.sign(df_corr['pred_old'])]

# Create a DataFrame
pred_data = pd.DataFrame({'Prediction': pred_sign, 'Return': ret_sign})
pred_un_data = pd.DataFrame({'Prediction': pred_un_sign, 'Return': ret_sign})
pred_old_data = pd.DataFrame({'Prediction': pred_old_sign, 'Return': ret_sign})

# Create a heatmap of counts for each combination
pred_heatmap = pred_data.groupby(['Prediction', 'Return']).size().unstack(fill_value=0)
pred_un_heatmap = pred_un_data.groupby(['Prediction', 'Return']).size().unstack(fill_value=0)
pred_old_heatmap = pred_old_data.groupby(['Prediction', 'Return']).size().unstack(fill_value=0)

# Heatmap
fig, (ax1, ax2, ax3) = plt.subplots(1,3, sharey=True, figsize=(12,6))
fig.text(0.5, 0.98, "Prediction vs actual direction confusion matrix", ha='center', fontsize=12)

sns.heatmap(pred_heatmap.T, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=0.5, linecolor='white', ax=ax1)
ax1.xaxis.tick_top()
ax1.xaxis.set_label_position('top')  # Move the x-axis label to the top
ax1.set_xlabel('Adjusted Prediction Direction', fontsize = 10)
ax1.set_ylabel('Forward Return Direction')

sns.heatmap(pred_old_heatmap.T, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=0.5, linecolor='white', ax=ax2)
ax2.xaxis.tick_top()
ax2.xaxis.set_label_position('top')  # Move the x-axis label to the top
ax2.set_xlabel('Original Prediction Direction', fontsize = 10)
ax2.set_ylabel('')

sns.heatmap(pred_un_heatmap.T, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=0.5, linecolor='white', ax=ax3)
ax3.xaxis.tick_top()
ax3.xaxis.set_label_position('top')  # Move the x-axis label to the top
ax3.set_xlabel('Unadjusted Prediction Direction', fontsize = 10)
ax3.set_ylabel('')

plt.show()

# Confusion matrix scenario returns
# Create function to get returns and test difference of means
def get_error_df_with_ttest(dataf, pred_col, comp_col, act_col):
    # Calculate means for each scenario
    false_positive = dataf.loc[(dataf[pred_col] > 0) & (dataf[act_col] <= 0), act_col] * 100
    false_positive_un = dataf.loc[(dataf[comp_col] > 0) & (dataf[act_col] <= 0), act_col] * 100

    false_negative = dataf.loc[(dataf[pred_col] <= 0) & (dataf[act_col] > 0), act_col] * 100
    false_negative_un = dataf.loc[(dataf[comp_col] <= 0) & (dataf[act_col] > 0), act_col] * 100

    true_positive = dataf.loc[(dataf[pred_col] > 0) & (dataf[act_col] > 0), act_col] * 100
    true_positive_un = dataf.loc[(dataf[comp_col] > 0) & (dataf[act_col] > 0), act_col] * 100

    true_negative = dataf.loc[(dataf[pred_col] < 0) & (dataf[act_col] < 0), act_col] * 100
    true_negative_un = dataf.loc[(dataf[comp_col] < 0) & (dataf[act_col] < 0), act_col] * 100

    # Calculate t-tests
    ttest_results = {
        'True positive': ttest_ind(true_positive.dropna(), true_positive_un.dropna(), alternative='greater').pvalue,
        'False negative': ttest_ind(false_negative.dropna(), false_negative_un.dropna(), alternative='less').pvalue,
        'True negative': ttest_ind(true_negative.dropna(), true_negative_un.dropna(), alternative='greater').pvalue,
        'False positive': ttest_ind(false_positive.dropna(), false_positive_un.dropna(), alternative='greater').pvalue
    }

    # Calculate means for the dataframe
    result_df = pd.DataFrame({
        'Adjusted': [
            true_positive.mean(),
            false_negative.mean(),
            true_negative.mean(),
            false_positive.mean()
        ],
        'Unadjusted': [
            true_positive_un.mean(),
            false_negative_un.mean(),
            true_negative_un.mean(),
            false_positive_un.mean()
        ],
        'P-value': [
            ttest_results['True positive'],
            ttest_results['False negative'],
            ttest_results['True negative'],
            ttest_results['False positive']
        ]
    }, index=['True positive', 'False negative', 'True negative', 'False positive'])

    return result_df

df_error_ttest = get_error_df_with_ttest(df_corr, 'pred', 'pred_un', 'ret_lag')
df_error_old_ttest = get_error_df_with_ttest(df_corr, 'pred', 'pred_old', 'ret_lag')
df_error_un_old_ttest = get_error_df_with_ttest(df_corr, 'pred_un', 'pred_old', 'ret_lag')
df_error_un_old_ttest.columns = ['Unadjusted', 'Former', 'P-value']

df_error_comb = df_error_ttest[['Adjusted', 'Unadjusted']].copy()
df_error_comb['Original'] = df_error_old_ttest['Unadjusted']

df_error_comb.plot(kind='bar', stacked=False, rot=0)
plt.ylabel("Return")
plt.title('Average Returns by Metric: Adjusted, Unadjusted, and Original Strategies')
plt.yticks([-2.0, -1.0, 0.0, 1.0, 2.0], [f"{x:0.1f}%" for x in [-2.0, -1.0, 0.0, 1.0, 2.0]])
plt.show()

# Circular block sampling

# Circular block
data = df_w.apply(lambda x: np.log(x/x.shift(1))).dropna().copy()
# Unfortunately, we did not create a circular block function, so we leave it to the reader to follow the logic below to reproduce the 7-block version. We failed to follow the "don't repeat yourself" principle. Sorry. Note this takes about 10-15 minutes to run.

#### 3-block ####
block_size = 3
data_idx = len(data) - 1
data_len = int(52*5 + 24) # five years + 12 weeks on either end
iter = data_len // block_size 
adder = data_len % block_size
block_data = []

np.random.seed(42)
for _ in tqdm(range(1000), desc='Simulation'):
    dat = []
    for _ in range(iter):
        start = np.random.choice(data_idx, 1)[0]
        end = start+block_size
        out = data.iloc[start:end]
        if end > data_idx + 1:
            new_end = end - data_idx - 1
            new_out = data.iloc[:new_end]
            out = pd.concat([out, new_out])
        dat.extend(out.values)
    if adder:
        new_start = np.random.choice(data_idx, 1)[0]
        new_end = new_start+adder
        out_add = data.iloc[new_start:new_end]
        if new_end > data_idx + 1:
            s_end = new_end - data_idx - 1
            s_end = s_end if s_end < adder else adder - 1 
            new_out = data.iloc[:s_end]
            out_add = pd.concat([out_add, new_out])
        dat.extend(out_add.values)
    block_data.append(dat)

back = 12
forward = 12
mod_look_forward = forward
train_pd = 5
test_pd = 1
tot_pd = train_pd + test_pd

lst_sim = []
for dt in tqdm(block_data, desc='Simulations'):
    price_sim = np.exp(np.array(dt)).cumprod()*100
    dataf = pd.DataFrame(np.c_[price_sim, np.array(dt)], columns=['price', 'ret'])

    dataf['ret_back'] = np.log(dataf['price']/dataf['price'].shift(back))
    dataf['ret_for'] = np.log(dataf['price'].shift(-forward)/dataf['price'])
    dataf = dataf.dropna()

    trade_pred_circ = []
    for i in range(tot_pd, len(dataf)-mod_look_forward+1, test_pd):
        train_df = dataf.iloc[i-tot_pd:i-test_pd, 1:]
        valid_df = dataf.iloc[i-test_pd:i, 1:]
        # uncorr_df = df_trade_1.iloc[i-test_pd+mod_look_forward-1:i-test_pd+mod_look_forward, 1:]
        test_df = dataf.iloc[i-test_pd+mod_look_forward:i-test_pd+mod_look_forward+1, 1:]

        # Ensure 'ret_back' is 2D by selecting it as a DataFrame, not a Series
        X_train = sm.add_constant(train_df[['ret_back']])
        if valid_df.shape[0] > 1:
            X_valid = sm.add_constant(valid_df[['ret_back']])
        else:
            X_valid = sm.add_constant(valid_df[['ret_back']], has_constant='add')

        # if uncorr_df.shape[0] > 1:
        #     X_uncorr = sm.add_constant(uncorr_df[['ret_back']])
        # else:
        #     X_uncorr = sm.add_constant(uncorr_df[['ret_back']], has_constant='add')

        if test_df.shape[0] > 1:
            X_test = sm.add_constant(test_df[['ret_back']])
        else:
            X_test = sm.add_constant(test_df[['ret_back']], has_constant='add')

        # Fit the model
        mod = sm.OLS(train_df['ret_for'], X_train).fit()
        # mod_params.append(mod.params.values)

        # Predict using the test data
        pred = mod.predict(X_valid).values
        actual = valid_df['ret_for'].values
        gamma = -(actual - pred)*2
        # pred_old = mod.predict(X_uncorr)
        mod_pred = mod.predict(X_test).values
        trade_pred_circ.extend(mod_pred * gamma)    
        
    dataf['pred'] = np.concatenate((np.zeros(mod_look_forward + train_pd), np.array(trade_pred_circ)))
    dataf['signal'] = np.where(dataf['pred'] > 0, 1, 0)
    dataf['signal_sh'] = np.where(dataf['pred'] >= 0, 1, -1)

    dataf['strat_ret'] = dataf['signal'].shift(1) * dataf['ret']
    dataf['strat_ret_sh'] = dataf['signal_sh'].shift(1) * dataf['ret']

    cumul_ret = dataf[['strat_ret', 'strat_ret_sh', 'ret']].cumsum().iloc[-1].values
    sharpe = dataf[['strat_ret', 'strat_ret_sh', 'ret']].apply(lambda x: x.mean()/x.std()*np.sqrt(52)).values
    lst_sim.append([cumul_ret, sharpe])

flat_data = [np.concatenate(pair) for pair in lst_sim]  # each element pairs cumulative returns with Sharpe ratios
df_sim = pd.DataFrame(flat_data, columns=['strat_ret', 'strat_ret_sh', 'ret', 'strat_sharpe','strat_sharpe_sh', 'ret_sharpe'])
df_sim['outperf'] = df_sim['strat_ret'] - df_sim['ret']
df_sim['outperf_sh'] = df_sim['strat_ret_sh'] - df_sim['ret']

# print performance
print(f"Frequency of return outperformance: {(df_sim['strat_ret'] > df_sim['ret']).mean()}")
print(f"Frequency of Sharpe outperformance: {(df_sim['strat_sharpe'] > df_sim['ret_sharpe']).mean()}")
print(f"Mean returns: {df_sim.mean()}")