ETL Pipeline Process for Financial Data

Diagram Flow


  • wsj_format mapping: -1: Null, 1: General, 2: [Reserved, currently unused], 3: Insurance, 4: Banking
  • source mapping: -1: Null, 1: YF, 2: WSJ
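
These mappings can be captured as simple constants in the pipeline code. Below is a minimal sketch; the class names are illustrative, not the actual module:

```python
from enum import IntEnum

class WsjFormat(IntEnum):
    """wsj_format values stored in idx_company_profile."""
    NULL = -1        # format not yet determined
    GENERAL = 1      # general (non-financial) companies
    RESERVED = 2     # reserved, currently unused
    INSURANCE = 3
    BANKING = 4

class Source(IntEnum):
    """current_source values stored in idx_company_profile."""
    NULL = -1        # no source assigned yet
    YF = 1           # Yahoo Finance API
    WSJ = 2          # Wall Street Journal
```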

Steps

Check WSJ Formats

  1. Check companies with wsj_format = -1, as well as those with current_source <= 1 and wsj_format in (3, 4), by scraping the WSJ data source (see the sketch after this list).
    • If there is any change in wsj_format, update current_source in the idx_company_profile table to 2.
    • If there is no change, proceed to the next step.
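
A minimal sketch of this check, assuming a supabase-py client and the columns described above; get_wsj_format is a hypothetical helper standing in for the actual WSJ scraping code:

```python
import os
from supabase import create_client

supabase = create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"])

rows = (
    supabase.table("idx_company_profile")
    .select("symbol, wsj_format, current_source")
    .execute()
    .data
)

# Companies with an unknown format, or on YF but flagged as Insurance/Banking.
candidates = [
    r for r in rows
    if r["wsj_format"] == -1
    or (r["current_source"] is not None and r["current_source"] <= 1 and r["wsj_format"] in (3, 4))
]

for row in candidates:
    new_format = get_wsj_format(row["symbol"])  # hypothetical: scrape the format from WSJ
    if new_format != row["wsj_format"]:
        # The format changed, so switch the company to the WSJ source (2).
        supabase.table("idx_company_profile").update(
            {"wsj_format": new_format, "current_source": 2}
        ).eq("symbol", row["symbol"]).execute()
```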

Check missing data

  1. Check companies with current_source = null against the YF API data source.

    • If data is found, check whether the latest date falls within the retention range.
      • If within range, update current_source in the idx_company_profile table to 1.
      • If not, go to step 3.
  2. If data is not found in the YF API, check the WSJ data source (a sketch of this fallback logic follows the list).

    • If data is found, check whether the latest date falls within the retention range.
      • If within range, update current_source in the idx_company_profile table to 2.
      • If not, do not update the source; keep it as it is.
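
Roughly, the fallback logic above looks like the following sketch; the retention range and the two lookup helpers are illustrative, not the actual implementation:

```python
from datetime import date, timedelta

RETENTION = timedelta(days=400)  # illustrative retention range

def choose_source(symbol: str):
    """Return the new current_source for a company with no source assigned, or None to leave it unchanged."""
    today = date.today()

    yf_latest = latest_yf_report_date(symbol)    # hypothetical: latest report date in the YF API
    if yf_latest is not None:
        if today - yf_latest <= RETENTION:
            return 1   # YF has sufficiently recent data
        return None    # stale YF data: handled by the step referenced above, not sketched here

    wsj_latest = latest_wsj_report_date(symbol)  # hypothetical: latest report date on WSJ
    if wsj_latest is not None and today - wsj_latest <= RETENTION:
        return 2       # fall back to WSJ

    return None        # neither source qualifies; keep the existing value
```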

Scrape and upsert data

  1. Scrape data from the YF API for current_source = 1 and from the WSJ data source for current_source = 2.

  2. Upsert data to the database.

YF Scraping Process

Steps

Initialization

  1. Initialize the Supabase client with the project URL and secret key.
  2. Get symbols from idx_active_company_profile with current_source = 1.
  3. Check for null yf_currency in idx_company_profile and update it if necessary (sketched below).
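
A sketch of these initialization steps, assuming the credentials come from environment variables; reading the currency from yfinance's info dictionary via the financialCurrency key is an assumption, not a confirmed detail of the script:

```python
import os
import yfinance as yf
from supabase import create_client

# 1. Initialize the Supabase client.
supabase = create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"])

# 2. Symbols currently sourced from Yahoo Finance (current_source = 1).
symbols = [
    row["symbol"]
    for row in supabase.table("idx_active_company_profile")
    .select("symbol")
    .eq("current_source", 1)
    .execute()
    .data
]

# 3. Fill in any missing yf_currency values.
missing = (
    supabase.table("idx_company_profile")
    .select("symbol")
    .is_("yf_currency", "null")
    .execute()
    .data
)
for row in missing:
    currency = yf.Ticker(row["symbol"]).info.get("financialCurrency")  # assumed key
    if currency:
        supabase.table("idx_company_profile").update(
            {"yf_currency": currency}
        ).eq("symbol", row["symbol"]).execute()
```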

Scraping

  1. For each symbol, check the latest date in the database.
    a. If there is new data in the source, scrape it.
    b. Otherwise, move on to the next symbol.
  2. If scraping yields new data, proceed to cleaning; if there is no new data or the database already has the latest data, exit the process. A sketch of this loop follows.
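
A rough sketch of the per-symbol check using yfinance's annual statements; latest_db_date is a hypothetical helper for the date already stored in the database:

```python
import pandas as pd
import yfinance as yf

new_frames = []
for symbol in symbols:
    db_latest = latest_db_date(symbol)  # hypothetical: latest report date already in the database

    # Annual statements; quarterly data would use Ticker.quarterly_financials instead.
    financials = yf.Ticker(symbol).financials
    if financials.empty:
        continue
    source_latest = financials.columns.max().date()  # report dates are the column labels

    if db_latest is None or source_latest > db_latest:
        df = financials.T.reset_index().rename(columns={"index": "date"})
        df["symbol"] = symbol
        new_frames.append(df)

if not new_frames:
    raise SystemExit("Database already has the latest data; exiting.")
scraped = pd.concat(new_frames, ignore_index=True)  # handed off to the cleaning step
```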

Cleaning

  1. Make adjustments to some columns based on the wsj_format. Details are available here.
  2. Cast column types for upsert
  3. If the currency (yf_currency) is not IDR, convert all columns containing monetary values to IDR using the conversion rate in effect on the date of the financial report (see the sketch below).
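
A sketch of the conversion step; the monetary column list and the FX-rate lookup helper idr_rate_on are illustrative:

```python
MONETARY_COLUMNS = ["revenue", "gross_income", "net_income", "total_assets"]  # illustrative subset

def convert_to_idr(df, currency):
    """Convert monetary columns to IDR using the rate on each report date."""
    if currency == "IDR":
        return df  # already in IDR, nothing to do
    rates = df["date"].map(lambda d: idr_rate_on(currency, d))  # hypothetical FX lookup per report date
    for col in MONETARY_COLUMNS:
        if col in df.columns:
            df[col] = df[col] * rates
    return df
```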

Upserting

  1. Upsert the data in batches; if a batch fails, retry it until the retry limit is reached (see the sketch below).
  2. If the batch upsert is successful, exit the process.
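
A minimal sketch of the batched upsert with a retry limit; the batch size, retry count, and backoff are illustrative:

```python
import time

BATCH_SIZE = 100   # illustrative
MAX_RETRIES = 3    # illustrative

def upsert_in_batches(supabase, table, records):
    for start in range(0, len(records), BATCH_SIZE):
        batch = records[start:start + BATCH_SIZE]
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                supabase.table(table).upsert(batch).execute()
                break  # this batch succeeded, move on to the next one
            except Exception:
                if attempt == MAX_RETRIES:
                    raise  # retry limit reached, give up
                time.sleep(2 * attempt)  # simple backoff before retrying
```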

Scripts

scrape_data.py

Program to be used for GitHub Actions. Command-line options:

  • -tt, --target_table, help="Target table to update", required=True, type=str
  • -bs, --batch_size, help="Batch size", type=int, default=-1
  • -bn, --batch_number, help="Batch number", type=int, default=1

Example usage: python scrape_data.py -tt idx_financials_annual -bs -1 & python scrape_data.py -tt idx_financials_quarterly -bs -1
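
The options above map to an argparse setup along these lines (a sketch, not the file's exact contents):

```python
import argparse

parser = argparse.ArgumentParser(description="Scrape financial data and upsert it to the target table.")
parser.add_argument("-tt", "--target_table", help="Target table to update", required=True, type=str)
parser.add_argument("-bs", "--batch_size", help="Batch size", type=int, default=-1)
parser.add_argument("-bn", "--batch_number", help="Batch number", type=int, default=1)
args = parser.parse_args()
```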

WSJ Scraping Process

Steps

Initialization

  1. Initialize the Supabase client with the project URL and secret key.
  2. Get symbols from idx_active_company_profile with current_source = 2.

Scraping

  1. For each symbol, check the latest date in the database.
    a. If there is new data in the source, scrape it.
    b. Otherwise, move on to the next symbol.
  2. If scraping yields new data, save the raw data to CSV and proceed to cleaning; if there is no new data or the database already has the latest data, exit the process.

Cleaning

  1. Clean null values.
    a. If successful, proceed.
    b. Otherwise, exit the cleaning.
  2. Enrich columns and cast column types for upsert.
    a. If successful, proceed.
    b. Otherwise, exit the cleaning and save the partially cleaned data to CSV (see the sketch below).
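
A sketch of this cleaning flow with the partial-save fallback; the two step functions and the CSV path are illustrative:

```python
def clean(raw_df):
    # Step 1: clean null values.
    try:
        df = drop_null_rows(raw_df)          # hypothetical helper
    except Exception:
        return None                          # exit the cleaning; nothing usable yet

    # Step 2: enrich columns and cast types for upsert.
    try:
        df = enrich_and_cast(df)             # hypothetical helper
    except Exception:
        df.to_csv("temp/partial_clean.csv", index=False)  # keep partial progress for debugging
        return None

    return df
```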

Upserting

  1. Upsert the data in batches; if a batch fails, retry it until the retry limit is reached. If it still fails, save the cleaned data to CSV.
  2. If the batch upsert is successful, exit the process.

Scripts

wsj_updater.py

Main program to run the updater. Command-line options:

  • Optional:
    • -i, --infile -> The path to a CSV file containing a list of symbols to scrape. (For debugging)
    • -db, --save_to_db -> Specifies whether to save the cleaned data to the DB. Defaults to not saving to the DB; CSV files are always saved.
    • -a, --append -> The path to a CSV file to append to. Used for resuming scraping. (For debugging)
    • --save_every_symbol -> Specifies whether to save a CSV file to /temp every time data is scraped for a symbol. (For debugging)
  • Required:
    • -q, --quarter -> Specifies whether to scrape annual or quarterly financial data. Defaults to annual.

Use -h for help on usage.

wsj_cleaner.py

The cleaner program that cleans the data after scraping. It can be used standalone without the scraper and parses no command-line arguments. It takes either a CSV file or a Supabase client to retrieve the financial data. Saving and upserting data are done in this script.

scrape_financial_data.py

Program to be used for GitHub Actions. Usage: pass -q, --quarter or -a, --annual to scrape quarterly or annual data, respectively.
