In this post, we develop the essential code for extracting, transforming, and loading options data. We will need to retrieve data from the TD Ameritrade API, transform it, and load the data into the PostgreSQL database we created in Part I: Designing the Database. We will also run some checks to ensure that the pipeline works.

The Series

This is the second post in my series, Towards Open Options Chains: A Data Pipeline for Collecting Options Data at Scale:

  1. Database Design
  2. Foundational ETL Code
  3. Getting Started with Airflow
  4. Building the DAG
  5. Containerising the Pipeline

Pre-requisites

  • Operating system: Linux (I’m using Ubuntu 20.04). If you’re on Windows, consider using Windows Subsystem for Linux [1].
  • Python 3.9 with the following packages/libraries and their dependencies installed: requests, python-dotenv, pandas, and psycopg2

Recap of Use Case

The use case for the data is to run backtests of options trading strategies. Therefore, the data pipeline must collect sufficiently fine-grained data for us to do backtests. This means we’ll need options prices, volumes, and the options greeks (e.g. delta, theta, and gamma), collected at sufficiently high frequency.

Extract

The first step in our pipeline involves extracting the data. In our case, we will retrieve data from the TD Ameritrade (TDA) Option Chains API.

First, we will obtain credentials for using the API by doing the following:

  1. Head on over to the TD Ameritrade developer site to sign up for a free account.
  2. Go to My Apps and create an app.
  3. Note the Consumer Key, which is used for OAuth 2.0 token-based authentication.

Next, we construct requests to the TDA Option Chains API in Python. We define a few parameters:

  • Your API key: We assume there is a file named .env in the same folder, containing your API key as API_KEY=<your API key here> (see the sketch after this list). We will load it into Python using python-dotenv.
  • Ticker: For now, we’ll use Facebook (FB) as an example.
  • Contract type: We also need to specify whether we want puts, calls, or both. For now, we query the data for puts only.
  • Strike Count: This defines the total number of strikes to be returned. We’ll go for 50 as a start, but we should decrease this number to the minimum necessary to optimise storage costs.
  • Range: This refers to whether the options are in-the-money (ITM), out-of-the-money (OTM), etc. We’ll request all of them.
  • Date range: We specify a start date and end date using Python’s built-in datetime module. I’ve chosen to use the current date as the start date and the day 45 days out as the end date.
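
For reference, here is a sketch of what the .env file could contain: the API_KEY used for the requests below, and the DB_PASSWORD we will read when connecting to Postgres in the Load step (the values shown are placeholders):

API_KEY=<your API key here>
DB_PASSWORD=<your database password here>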

Putting these parameters together in Python, we get the following:

# Import modules
import json
from datetime import datetime, timedelta

import pandas as pd
import requests
from dotenv import dotenv_values

# Load environment variables - API key is accessible at env['API_KEY']
env = dotenv_values('.env')

# Configure dates
start_date = datetime.today()
end_date = start_date + timedelta(days=45)

# Other params
TICKER = 'FB'

# Configure request - for unauthenticated requests, the API key goes in the
# query params, so the Authorization header can be left empty
headers = {'Authorization': ''}

params = (
    ('apikey', env['API_KEY']),
    ('symbol', TICKER),
    ('contractType', 'PUT'),
    ('strikeCount', '50'),
    ('range', 'ALL'),
    ('fromDate', start_date.strftime('%Y-%m-%d')),  # dates as yyyy-MM-dd strings
    ('toDate', end_date.strftime('%Y-%m-%d')),
)

We now use the requests library to send the request with the defined headers and parameters, receiving a JSON response with the options chain if all went well:

# Get data
response = requests.get(
    'https://api.tdameritrade.com/v1/marketdata/chains',
    headers=headers,
    params=params
)

# Extract data
data = json.loads(response.content)
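
If the API key or parameters are wrong, the response may not contain a chain at all, so it can help to fail fast before parsing. A minimal check using standard requests functionality:

# Optional: raise an exception if the request returned a 4xx/5xx status
response.raise_for_status()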

The data is structured as follows:

root
└── Puts/Calls
    └── Contract (by expiry date)
        └── Strike
            └── Fields (e.g. prices, greeks)
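
To make the nesting concrete, here is a rough sketch of how the puts side of the response is navigated in Python. The key strings are illustrative placeholders (expiry keys combine a date with the days to expiration, and strikes are stringified numbers), so adjust them to whatever your response actually contains:

# Illustrative only - the keys below are placeholders
data['putExpDateMap']                            # dict keyed by expiry, e.g. '2022-03-18:45'
data['putExpDateMap']['2022-03-18:45']           # dict keyed by strike, e.g. '300.0'
data['putExpDateMap']['2022-03-18:45']['300.0']  # list holding a dict of fields (bid, ask, delta, ...)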

We parse the JSON object using the following code to obtain a Pandas dataframe:

# Extract puts data
puts = []
dates = list(data['putExpDateMap'].keys())

for date in dates:

    strikes = data['putExpDateMap'][date]

    for strike in strikes:
        puts += data['putExpDateMap'][date][strike]

# Define columns
columns = ['putCall', 'symbol', 'description', 'exchangeName', 'bid', 'ask',
    'last', 'mark', 'bidSize', 'askSize', 'bidAskSize', 'lastSize',
    'highPrice', 'lowPrice', 'openPrice', 'closePrice', 'totalVolume',
    'tradeDate', 'tradeTimeInLong', 'quoteTimeInLong', 'netChange',
    'volatility', 'delta', 'gamma', 'theta', 'vega', 'rho', 'openInterest',
    'timeValue', 'theoreticalOptionValue', 'theoreticalVolatility',
    'optionDeliverablesList', 'strikePrice', 'expirationDate',
    'daysToExpiration', 'expirationType', 'lastTradingDay', 'multiplier',
    'settlementType', 'deliverableNote', 'isIndexOption', 'percentChange',
    'markChange', 'markPercentChange', 'mini', 'inTheMoney', 'nonStandard']

# Convert to dataframe
puts = pd.DataFrame(puts, columns=columns)
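
As an optional sanity check, we can take a quick look at what was parsed:

# Quick peek: one row per contract, one column per field
print(puts.shape)
print(puts[['symbol', 'strikePrice', 'bid', 'ask', 'delta']].head())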

Transform

The transform step will convert the data into the final structure/format to be loaded into Postgres. For our use case, it is fairly simple: (1) select the right columns and (2) ensure we have the right data types.

First, inspecting the data, we identify several fields to keep (below). We will select only these fields for processing.

  • Metadata: putCall, symbol, description, quoteTimeInLong (as unix timestamp)
  • Contract info: openInterest, timeValue, theoreticalOptionValue, strikePrice, expirationDate (as unix timestamp), daysToExpiration, volatility
  • Trade data: bid, ask, last, bidSize, askSize, lastSize, highPrice, lowPrice, openPrice, closePrice, totalVolume
  • Greeks: delta, gamma, theta, vega, rho

Second, we ensure that the data is in the right format. Most of the columns have been parsed correctly by Pandas, and although we could assume this will keep happening, it would not be wise to. Suppose that for the first six months the API always returns clean float values for the OHLC prices, and then one day it starts returning the string nan, just as the options greeks data did today. Pandas would then parse the OHLC prices as object types, and the loading step of the pipeline could fail. Hence, we explicitly cast each column to its intended type. Finally, any values coerced to missing along the way are filled with a placeholder of -99.

The final step is to convert the column names into snake case. This is just a personal preference. The code to do everything we discussed is shown below:

# Select columns
puts = puts[['putCall', 'symbol', 'description', 'bid', 'ask', 'last', 'bidSize',
    'askSize', 'lastSize', 'highPrice', 'lowPrice', 'openPrice',
    'closePrice', 'totalVolume', 'quoteTimeInLong', 'volatility', 'delta',
    'gamma', 'theta', 'vega', 'rho', 'openInterest', 'timeValue',
    'theoreticalOptionValue', 'strikePrice', 'expirationDate',
    'daysToExpiration']]

# Convert floats - coerce anything non-numeric (e.g. the string 'NaN') to missing values
def conv_num(x):
    return pd.to_numeric(x.astype(str).str.replace('NaN|nan', '', regex=True),
                         errors='coerce')

for col in ['bid', 'ask', 'last', 'highPrice', 'lowPrice', 'openPrice',
            'closePrice', 'volatility', 'delta', 'gamma', 'theta', 'vega',
            'rho', 'timeValue', 'theoreticalOptionValue', 'strikePrice']:
    puts[col] = conv_num(puts[col])

# Puts have negative deltas - flip the sign so they are stored as positive values
puts['delta'] = -puts['delta']

# Convert strings
def conv_str(x):
    return x.astype(str)

for col in ['putCall', 'symbol', 'description']:
    puts[col] = conv_str(puts[col])

# Convert integers
def conv_int(x):
    return x.astype(int)

for col in ['bidSize', 'askSize', 'lastSize', 'totalVolume', 'quoteTimeInLong',
            'openInterest', 'expirationDate', 'daysToExpiration']:
    puts[col] = conv_int(puts[col])

# Fill missing values
puts = puts.fillna(-99)

# Rename columns
puts = puts.rename(columns={
    'putCall': 'put_call',
    'bidSize': 'bid_size',
    'askSize': 'ask_size',
    'lastSize': 'last_size',
    'highPrice': 'high_price',
    'lowPrice': 'low_price',
    'openPrice': 'open_price',
    'closePrice': 'close_price',
    'totalVolume': 'total_volume',
    'quoteTimeInLong': 'quote_time',
    'openInterest': 'open_interest',
    'timeValue': 'time_value',
    'theoreticalOptionValue': 'theoretical_value',
    'strikePrice': 'strike_price',
    'expirationDate': 'expiration_date',
    'daysToExpiration': 'dte',
})
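
Since the whole point of the explicit casts was to guard against surprise dtypes, it can be worth confirming (optionally) that the transform behaved as intended:

# Confirm the casts took effect: floats for prices/greeks, integers for sizes and timestamps
print(puts.dtypes)
assert puts['delta'].dtype.kind == 'f' and puts['quote_time'].dtype.kind == 'i'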

Load

After extracting the data and transforming it to conform to the defined schema, we need to load it into our Postgres database. We use INSERT ... ON CONFLICT DO NOTHING, which keeps only the first entry when a duplicate row is inserted (this relies on the table's primary key or unique constraint to detect the conflict):

INSERT INTO fb (<all columns here>) VALUES (<all values here>) ON CONFLICT DO NOTHING;

We use the psycopg2 library to load the data from Python into Postgres. We follow the typical process of (1) defining the query (as above), (2) connecting to the database, (3) creating a cursor, (4) executing the query with the parameters, (5) committing the transaction, and then (6) closing the connection. Note that we have placed the database password into the .env file to avoid hardcoding it.

import psycopg2 as pg2

col_str = ', '.join(puts.columns.tolist())
query_insert = f"INSERT INTO fb ({col_str}) VALUES %s ON CONFLICT DO NOTHING"

# Connect to database
conn = pg2.connect(
    host='localhost',
    database='optionsdata',
    user='postgres',
    password=env['DB_PASSWORD']
)

# Loop through rows - psycopg2 adapts each tuple into a SQL record and handles quoting
with conn.cursor() as cursor:
    for t in puts.itertuples(index=False, name=None):
        cursor.execute(query_insert, (t,))
    conn.commit()

# Close connection
conn.close()

To inspect the data, we retrieve all records from the table using the code below (re-opening the connection as before if it has already been closed):

# Run select * query (assumes an open connection, as above)
with conn.cursor() as cursor:
    cursor.execute('SELECT * FROM fb;')
    df = pd.DataFrame(cursor.fetchall(),
                      columns=[desc[0] for desc in cursor.description])

The first time we run both snippets above, we’ll obtain a dataframe (df) containing data from the latest API query. If we run it again without changing the API data, we should see no change to df.
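
One way to make that check concrete is to compare row counts before and after re-running the load: with ON CONFLICT DO NOTHING, inserting the same snapshot twice should not change the count. A minimal sketch, assuming the connection is still open:

# Count rows, re-run the insert loop from the Load step, then count again
with conn.cursor() as cursor:
    cursor.execute('SELECT COUNT(*) FROM fb;')
    n_before = cursor.fetchone()[0]

# ... re-run the insert snippet from the Load step here ...

with conn.cursor() as cursor:
    cursor.execute('SELECT COUNT(*) FROM fb;')
    n_after = cursor.fetchone()[0]

assert n_before == n_after  # duplicate rows were skipped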

Summary

In this post, we achieved the following:

  1. Developed code to retrieve data from the TD Ameritrade Option Chains API
  2. Developed code to process the data
  3. Developed code to load the data from Python into the PostgreSQL table
  4. Checked that the loading worked as intended

Putting everything together, we have a one-off script that extracts data from an API, transforms it, and loads it into a Postgres database. This is similar in functionality to the solutions by Kleppen [2], BlackArbs [3], and Sauers [4], which persist data to a database or file rather than leaving it in a Pandas dataframe, putting them a level above the other solutions out there. Still, this only captures a single snapshot. We need to run the code repeatedly at different times of the day, and we need a tool to manage these runs.

In the next post, we will set up Apache Airflow, a workflow orchestration tool commonly used for managing data pipelines. It will enable us to schedule ETL jobs for automated data collection.


Credits for image: Kevin Ku on Unsplash

References

  1. D. Littlefield, Windows Subsystem for Linux (2020), The Startup
  2. E. Kleppen, Collecting Stock and Options Data Easily using Python and Ally Financial API - 3 Example Queries (2020), TowardsDataScience
  3. Aggregating Free Options Data with Python (2016), Blackarbs
  4. H. Sauers, How I get options data for free (2019), freeCodeCamp