Pair Trading S1 and S2

This is a generic Notebook, similar to the one Pair Trading GLD and GDX, where I am going to pair trade two securities. The backtesting also includes broker fees. All I have to do is adjust my input parameters such as ticker symbols and dates. The algorithm then fetches the 10 year historical price data from Nasdaq and simulates the pair trading strategy.

# Import the necessary modules

%matplotlib inline

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math
import requests
import bs4 as bs
from sklearn import linear_model
import csv
import re
from statsmodels.tsa.stattools import coint, adfuller
from pathlib import Path
# Set the parameters

start_date = '2010-01-01'
end_date = '2018-01-01'
entry_condition_long = -1
entry_condition_short = 1
abs_condition_exit = 0.5
s1_symbol = 'v'
s2_symbol = 'ma'
def historical_prices(symbol:str):
    url = "https://www.nasdaq.com/symbol/{0}/historical".format(symbol)
    headers = {'content-type' : 'application/json'}
    data = "10y|false|{0}".format(symbol)
    
    resp = requests.post(url, data=data, headers=headers)
    soup = bs.BeautifulSoup(resp.text, 'lxml')
    historical_container = soup.find("div", {"id" : "quotes_content_left_pnlAJAX"})
    table = historical_container.find("table")
    historical_prices = []
    for row in table.findAll('tr')[2:]:
        date = re.sub(r"[\n\t\s]*", "", row.findAll('td')[0].text)
        price_open = re.sub(r"[\n\t\s]*", "", row.findAll('td')[1].text)
        price_high = re.sub(r"[\n\t\s]*", "", row.findAll('td')[2].text)
        price_low = re.sub(r"[\n\t\s]*", "", row.findAll('td')[3].text)
        price_close = re.sub(r"[\n\t\s]*", "", row.findAll('td')[4].text)
        volume = re.sub(r"[\n\t\s]*", "", row.findAll('td')[5].text)
        historical_prices.append((date,price_open,price_high,price_low,price_close,volume))

    file_name = "{0}_10y.csv".format(symbol)
    with open(file_name, 'w') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(['date', 'open', 'high', 'low', 'close', 'volume'])
        for row in historical_prices:
           writer.writerow(row)

    return historical_prices
# Fetch prices from Nasdaq and save to CSV if CSV doesn't exists
csv_path_s1 = "{0}_10y.csv".format(s1_symbol)
csv_path_s2 = "{0}_10y.csv".format(s2_symbol)

s1_file = Path(csv_path_s1)
if not s1_file.is_file():
    s1 = historical_prices(s1_symbol)
else:
    print("%s already exists." % s1_file)

s2_file = Path(csv_path_s2)
if not s2_file.is_file():
    s2 = historical_prices(s2_symbol)
else:
    print("%s already exists." % s2_file)
v_10y.csv already exists.
ma_10y.csv already exists.
# Read the price data

# Read s1 prices
s1_df = pd.read_csv(csv_path_s1, index_col='date', sep=',', encoding = 'utf8')
s1_df.index = pd.to_datetime(s1_df.index, format = '%m/%d/%Y')

# Read s2 prices
s2_df = pd.read_csv(csv_path_s2, index_col='date')
s2_df.index = pd.to_datetime(s2_df.index, format = '%m/%d/%Y')

calendar_dates = pd.date_range(start=start_date, end=end_date, freq='D', tz=None)
s1_df = s1_df.reindex(calendar_dates)
s2_df = s2_df.reindex(calendar_dates)

s2_df = s2_df[start_date:end_date]
s1_df = s1_df[start_date:end_date]
# Plot the s1 prices

plt.figure(figsize=(14,4))
plt.plot(s1_df['close'])
plt.title("%s Prices" % s1_symbol)
plt.ylabel("Price")
plt.xlabel("Date");

_config.yml

# Plot the s2 prices

plt.figure(figsize=(14,4))
plt.plot(s2_df['close'])
plt.title("%s Prices" % s2_symbol)
plt.ylabel("Price")
plt.xlabel("Date");

_config.yml

# Run a LinearRegression model to figure out the beta such that:
# s1 = beta * s2
# We can use Ordinary Least Squares to estimate the values of the coefficients.
# See: http://scikit-learn.org/stable/auto_examples/linear_model/plot_ols.html


s1_cl = s1_df['close']
s2_cl = s2_df['close']

s1_cl = s1_cl.dropna()
s2_cl = s2_cl.dropna()

# Find the date/index intersection of the two data sets
s1_s2_index_intersection = s1_cl.index.intersection(s2_cl.index)
s1_cl_y = s1_cl[s1_s2_index_intersection]
s2_cl_x = s2_cl[s1_s2_index_intersection]

s1_cl = s1_cl_y
s2_cl = s2_cl_x

train_size = len(s2_cl) // 2

# Split the data into training/testing sets
s1_cl_y_train = s1_cl_y[:-train_size]
s1_cl_y_test = s1_cl_y[-train_size:]

dates_test = s1_cl_y_test.index
dates_train = s1_cl_y_train.index

# Split the targets into training/testing sets
s2_cl_x_train = s2_cl_x[:-train_size]
s2_cl_x_test = s2_cl_x[-train_size:]


s1_cl_y_test = s1_cl_y_test.values.reshape(-1,1)
s2_cl_x_test = s2_cl_x_test.values.reshape(-1,1)
s1_cl_y_train = s1_cl_y_train.values.reshape(-1,1)
s2_cl_x_train = s2_cl_x_train.values.reshape(-1,1)

regr = linear_model.LinearRegression()

# Train the model using the training sets
regr.fit(s2_cl_x_train, s1_cl_y_train)

# Make predictions using the testing set
s1_cl_y_pred = regr.predict(s2_cl_x_test)

# The coefficients
beta = regr.coef_[0][0]

# Plot outputs
plt.figure(figsize=(14,4))
plt.scatter(s2_cl_x_test, s1_cl_y_test, color='black')
plt.plot(s2_cl_x_test, s1_cl_y_pred, color='blue', linewidth=3)
plt.ylabel("s1");
plt.xlabel("s2")
plt.show()

_config.yml

# Plot the s1 and s2 (adjusted using beta) prices

plt.figure(figsize=(14,4))
plt.plot(s1_df['close'], 'y')
plt.plot(s2_df['close'] * beta, 'c')
plt.legend([s1_symbol, s2_symbol])
plt.show()

_config.yml

# Calculate estimated spread:
# spread = s1 - beta * s2

spread = s1_cl - beta * s2_cl
plt.figure(figsize=(14,4))
plt.title("Spread (spread = %s - beta * %s)" % (s1_symbol, s2_symbol))
plt.plot(spread, color='blue', linewidth=1)

_config.yml

# Calculate and plot the spread's z-score:
# spread_zscore = (spread - spread_mean)/spread_std

spread_mean = spread.mean()
spread_std = spread.std()
spread_zscore = (spread - spread_mean)/spread_std

plt.figure(figsize=(14,4))
plt.plot(spread_zscore, color='blue', linewidth=1)
plt.axhline(spread_zscore.mean(), color='black')
plt.axhline(entry_condition_short, color='green', linestyle='--')
plt.axhline(entry_condition_long, color='red', linestyle='--')
plt.legend(['Spread z-score', 'Spread z-score Mean', ('%s' % entry_condition_short), ('%s' % entry_condition_long)])
plt.title("Spread z-score")

_config.yml

print("spread_mean:%s  spread_std:%s  beta:%s"%(spread_mean, spread_std, beta))
spread_mean:6.33933713133  spread_std:4.86939115165  beta:0.668568952645
# Derive trading signals based on the spread's z-score

# Buy spread when its value drops below entry_condition_long standard deviations
longs = spread_zscore <= entry_condition_long

# Short spread when its value rises above entry_condition_short standard deviations
shorts = spread_zscore >= entry_condition_short

# Exit any spread position when its value approaches abs_condition_exit
# standard deviation of its mean
exits = abs(spread_zscore).between(-abs_condition_exit, abs_condition_exit, inclusive=True)

Going back to s2 and s1 above that follow s1 = ⍺ s2 + e, such that ratio (s1/s2) moves around it’s mean value ⍺, we make money on the ratio of the two reverting to the mean. In order to do this we’ll watch for when s2 and s1 are far apart, i.e ⍺ is too high or too low:

Going Long the Ratio This is when the ratio ⍺ is smaller than usual and we expect it to increase. In the above example, we place a bet on this by buying s1 and selling s2.

Going Short the Ratio This is when the ratio ⍺ is large and we expect it to become smaller. In the above example, we place a bet on this by selling s1 and buying s2.

# Plot the z-score together with the buy and sell signals

plt.figure(figsize=(14,4))

buyR = 0 * spread_zscore.copy()
sellR = 0 * spread_zscore.copy()

buyR[longs[longs == True].index] = spread_zscore[longs[longs == True].index]
sellR[shorts[shorts == True].index] = spread_zscore[shorts[shorts == True].index]
sellR.replace(0.0, np.nan, inplace=True)
buyR.replace(0.0, np.nan, inplace=True)

plt.plot(spread_zscore, color='blue', linewidth=1)
plt.plot(buyR, color='g', linestyle='None', marker='^')
plt.plot(sellR, color='r', linestyle='None', marker='^')

plt.axhline(abs_condition_exit, color='c', linestyle='-')
plt.axhline(-abs_condition_exit, color='c', linestyle='-')

plt.axhline(spread_zscore.mean(), color='black')
plt.axhline(entry_condition_short, color='green', linestyle='--')
plt.axhline(entry_condition_long, color='red', linestyle='--')

plt.legend(['Spread z-score', 'Buy Signal', 'Sell Signal', 'Exit', 'Exit' ,'Spread z-score Mean', ('%s' % entry_condition_short), ('%s' % entry_condition_long)])
plt.title("Spread z-score")
plt.show()

_config.yml

# Plot the prices together with the buy and sell signals

buy_s1_prices = buyR.copy()
buy_s1_prices[longs[longs == True].index] = s1_df['close'][longs[longs == True].index]

sell_s1_prices = sellR.copy()
sell_s1_prices[shorts[shorts == True].index] = s1_df['close'][shorts[shorts == True].index]

buy_s2_prices = sellR.copy()
buy_s2_prices[shorts[shorts == True].index] = s2_df['close'][shorts[shorts == True].index] * beta

sell_s2_prices = buyR.copy()
sell_s2_prices[longs[longs == True].index] = s2_df['close'][longs[longs == True].index] * beta

plt.figure(figsize=(14,4))
plt.plot(s1_df['close'], 'y')
plt.plot(s2_df['close']*beta, 'c')
plt.plot(buy_s1_prices, color='g', linestyle='None', marker='^')
plt.plot(sell_s1_prices, color='r', linestyle='None', marker='^')
plt.plot(buy_s2_prices, color='g', linestyle='None', marker='^')
plt.plot(sell_s2_prices, color='r', linestyle='None', marker='^')
plt.legend([s1_symbol, s2_symbol, 'Buy Signal', 'Sell Signal'])
plt.show()

_config.yml

# Simulate trading

positions_s1 = 0 * s1_cl.copy()
positions_s2 = positions_s1.copy()

# Initial amount of money
money = 5000
print('Initial money: %s (%s)' % (money, start_date))

# Do not short more than leverage amount
max_debt = 1.5 * money
debt = 0
commission_rate = 0.005
total_commission = 0
count_s1 = 0
count_s2 = 0
going_long = False
going_short = False
for day in s1_cl.index:
    ratio = s1_cl[day] // s2_cl[day]
    
    # Buy s1, Sell s2
    if longs[day]:
        if money > 0:
            quantity = money // s1_cl[day] 
            if quantity > 0 and (debt + s2_cl[day] * ratio * quantity) < max_debt:
                #print('\x1b[32mLong s1, Short s2\x1b[0m')
                money -= s1_cl[day] * quantity
                money += s2_cl[day] * ratio * quantity
                count_s1 += quantity
                count_s2 -= ratio * quantity
                positions_s1[day] = count_s1
                positions_s2[day] = count_s2
                going_long = True
                commission_s1 = max(min(commission_rate * quantity, commission_rate * quantity * s1_cl[day]), 1)
                commission_s2 = max(min(commission_rate * ratio * quantity, commission_rate * ratio * quantity * s2_cl[day]), 1)
                money -= (commission_s1 + commission_s2)
                total_commission += (commission_s1 + commission_s2)
                #print(' s1 Price: %s,  s2 Price: %s, count_s1: %s, count_s2: %s, money: %s' % (s1_cl[day], s2_cl[day],count_s1, count_s2, money))
    # Sell s1, Buy s2
    elif shorts[day]:
        if money > 0:
            quantity = money // s2_cl[day]
            if quantity > 0 and (debt + s1_cl[day] * quantity) < max_debt:
                #print('\x1b[31mShort s1, Long s2\x1b[0m')
                money += s1_cl[day] * quantity
                money -= s2_cl[day] * ratio * quantity            
                count_s1 -= quantity
                count_s2 += ratio * quantity
                positions_s1[day] = count_s1
                positions_s2[day] = count_s2
                going_short = True
                commission_s1 = max(min(commission_rate * quantity, commission_rate * quantity * s1_cl[day]), 1)
                commission_s2 = max(min(commission_rate * ratio * quantity, commission_rate * ratio * quantity * s2_cl[day]), 1)
                money -= (commission_s1 + commission_s2)
                total_commission += (commission_s1 + commission_s2)
                #print(' s1 Price: %s,  s2 Price: %s, count_s1: %s, count_s2: %s, money: %s' % (s1_cl[day], s2_cl[day],count_s1, count_s2, money))
    # Clear positions
    elif exits[day] and (going_long or going_short):
        #print("\x1b[34mExiting on %s with count_s1: %s, count_s2: %s\x1b[0m" % (day, count_s1, count_s2))
        money += s1_cl[day] * count_s1
        money += s2_cl[day] * count_s2
        count_s1 = 0
        count_s2 = 0
        positions_s1[day] = 0
        positions_s2[day] = 0
        going_long = False
        going_short = False
        commission_s1 = max(min(commission_rate * count_s1, commission_rate * count_s1 * s1_cl[day]), 1)
        commission_s2 = max(min(commission_rate * count_s2, commission_rate * count_s2 * s2_cl[day]), 1)
        money -= (commission_s1 + commission_s2)
        total_commission += (commission_s1 + commission_s2)
        #print(' s1 Price: %s,  s2 Price: %s,  money: %s' % (s1_cl[day], s2_cl[day],  money))
    else:
        # Cary positions forward
        positions_s1[day] = count_s1
        positions_s2[day] = count_s2
        
    debt = 0
    if count_s1 < 0:
        debt = abs(count_s1) * s1_cl[day]
    if count_s2 < 0:
        debt += abs(count_s2) * s2_cl[day]

print('Final money: %s (%s)' % (money, end_date))
print("Exiting any remaining positions...")
money += s1_cl[day] * count_s1
money += s2_cl[day] * count_s2
commission_s1 = max(min(commission_rate * quantity, commission_rate * count_s1 * s1_cl[day]), 1)
commission_s2 = max(min(commission_rate * ratio * quantity, commission_rate * count_s2 * s2_cl[day]), 1)
money -= (commission_s1 + commission_s2)
total_commission += (commission_s1 + commission_s2)
print('Final money: %s' % money)
print('Total commission: %s' % total_commission)
Initial money: 5000 (2010-01-01)
Final money: 17160.4825 (2018-01-01)
Exiting any remaining positions...
Final money: 8378.9425
Total commission: 16.39

Full source code is available here.

Written on August 10, 2021