
ETL Pipeline Execution Manual – CAGED

This manual provides a step-by-step guide to executing the ETL (Extract, Transform, Load) pipeline for the CAGED dataset. The pipeline is divided into three main stages: (1) downloading files from the bronze layer of the Data Lake (S3), (2) cleaning and standardizing the data locally, and (3) uploading the processed data to the silver layer. The scripts are located in the infraestrutura-de-dados repository, under the notebooks/ folder (commented versions) and the scripts/ folder (automation-ready versions).

The first step involves downloading raw files directly from the S3 bucket where data extracted from the Ministry of Labor's website is stored. This is done using the script 2025_03_18_la_s3_caged_download_bronze.py. The script connects to the observatorio-trabalho-pe-dev-de-bronze bucket, specifically to the ministerio_do_trabalho/UNZIP_FILES/CAGED/NOVO_CAGED/ directory, using AWS credentials provided through environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY). Once connected, the script lists all .parquet files in that path and downloads each one while preserving the original folder structure, saving them locally to downloads/CAGED/. Preserving the hierarchy ensures proper traceability and avoids data duplication. No data processing is performed at this stage; it is solely about locally replicating the raw data stored in the bronze layer.
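
For reference, the snippet below is a minimal sketch of this download step using boto3, with the bucket and prefix named above; the local directory constant and loop structure are illustrative and may differ from the actual script.

```python
from pathlib import Path

import boto3

# Bucket and prefix as described in this manual; LOCAL_DIR is illustrative.
BUCKET = "observatorio-trabalho-pe-dev-de-bronze"
PREFIX = "ministerio_do_trabalho/UNZIP_FILES/CAGED/NOVO_CAGED/"
LOCAL_DIR = Path("downloads/CAGED")

# boto3 reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment.
s3 = boto3.client("s3")

paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        if not key.endswith(".parquet"):
            continue
        # Keep the folder hierarchy below the prefix when saving locally.
        destination = LOCAL_DIR / key[len(PREFIX):]
        destination.parent.mkdir(parents=True, exist_ok=True)
        s3.download_file(BUCKET, key, str(destination))
        print(f"downloaded s3://{BUCKET}/{key} -> {destination}")
```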

The second stage involves processing and standardizing the downloaded .parquet files. This is done using the script 2025_03_18_la_caged_parquet_processing.py, which recursively scans all subfolders of the input directory (e.g., C:/Users/.../Downloads/CAGED). Data reading and transformation are handled using the Polars library, which is highly efficient for this type of task. Unlike Pandas, Polars is implemented in Rust, supports parallel processing and lazy evaluation, and delivers superior performance for reading and writing large files—ideal when working with hundreds of files simultaneously.
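
As a rough illustration of the recursive scan, the sketch below walks the input directory with pathlib and lazily opens one file with Polars; the directory path is an example, not the script's hard-coded location.

```python
from pathlib import Path

import polars as pl

# Example input directory; point this at wherever the bronze files were downloaded.
INPUT_DIR = Path("downloads/CAGED")

# rglob walks every subfolder, mirroring the recursive scan described above.
parquet_files = sorted(INPUT_DIR.rglob("*.parquet"))
print(f"found {len(parquet_files)} parquet files")

if parquet_files:
    # A lazy scan defers reading until collect(), one of the Polars features noted above.
    preview = pl.scan_parquet(parquet_files[0]).head(5).collect()
    print(preview)
```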

During processing, the script performs several key transformations: first, it normalizes column names by removing accents and special characters, converting everything to ASCII. Next, it applies type conversion rules. Columns with numeric values in string format (such as salaries and contractual hours) are cleaned—commas are replaced with dots, whitespace is removed—and converted to floats. Some text columns, like secao, are explicitly cast to strings, while the remaining ones are cast to integers when possible, allowing for nulls and negative values. The processed data overwrites the original .parquet files using Snappy compression. This ensures that all files on the local machine are standardized and ready for analytical use or export.
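
The sketch below illustrates these transformations with Polars. The column lists (FLOAT_COLS, STRING_COLS) and the helper names are hypothetical; the real script defines its own column groups, but the operations mirror the steps described above.

```python
import unicodedata
from pathlib import Path

import polars as pl

# Hypothetical column groups; the real script defines its own lists.
FLOAT_COLS = ["salario", "horascontratuais"]
STRING_COLS = ["secao"]


def to_ascii(name: str) -> str:
    """Drop accents and special characters, keeping plain ASCII."""
    normalized = unicodedata.normalize("NFKD", name)
    return normalized.encode("ascii", "ignore").decode("ascii")


def process_file(path: Path) -> None:
    df = pl.read_parquet(path)

    # 1) Normalize column names to ASCII.
    df = df.rename({col: to_ascii(col) for col in df.columns})

    # 2) Numeric strings: swap decimal commas for dots, drop spaces, cast to float.
    df = df.with_columns(
        [
            pl.col(c)
            .cast(pl.Utf8)
            .str.replace_all(",", ".", literal=True)
            .str.replace_all(" ", "", literal=True)
            .cast(pl.Float64, strict=False)
            for c in FLOAT_COLS
            if c in df.columns
        ]
    )

    # 3) Keep declared text columns as strings.
    df = df.with_columns(
        [pl.col(c).cast(pl.Utf8) for c in STRING_COLS if c in df.columns]
    )

    # 4) Try integers for everything else; unparseable values become null.
    remaining = [c for c in df.columns if c not in FLOAT_COLS + STRING_COLS]
    df = df.with_columns([pl.col(c).cast(pl.Int64, strict=False) for c in remaining])

    # 5) Overwrite the original file with Snappy compression.
    df.write_parquet(path, compression="snappy")
```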

The final step of the pipeline is to upload the processed data to the silver layer of the Data Lake. This is accomplished using the script 2025-03-24_la_upload_caged.py. The script recursively scans the directory where the processed .parquet files are stored and, for each file found, compresses it into .gz format using the gzip library. After compression, the .parquet.gz file is uploaded to the observatorio-trabalho-pe-silver-dev S3 bucket, maintaining the original relative folder structure. For example, a file located at CAGED/NOVO_CAGED/UF=PE/2025_01/data.parquet will be stored in S3 as ministerio_do_trabalho/caged/NOVO_CAGED/UF=PE/2025_01/data.parquet.gz. Compression saves storage space and reduces the amount of data transferred over the network. After upload, the local .gz file is deleted to free up disk space.
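
A minimal sketch of this compress-and-upload step, assuming gzip with shutil.copyfileobj for the compression and boto3 for the upload; the directory and prefix constants are illustrative, while the bucket name and key layout follow the example above.

```python
import gzip
import shutil
from pathlib import Path

import boto3

# Illustrative constants; bucket name and key layout follow the example above.
PROCESSED_DIR = Path("downloads/CAGED")
SILVER_BUCKET = "observatorio-trabalho-pe-silver-dev"
SILVER_PREFIX = "ministerio_do_trabalho/caged"

s3 = boto3.client("s3")

for parquet_path in PROCESSED_DIR.rglob("*.parquet"):
    gz_path = parquet_path.with_name(parquet_path.name + ".gz")

    # Compress data.parquet into data.parquet.gz.
    with open(parquet_path, "rb") as src, gzip.open(gz_path, "wb") as dst:
        shutil.copyfileobj(src, dst)

    # Rebuild the relative folder structure under the silver prefix.
    key = f"{SILVER_PREFIX}/{gz_path.relative_to(PROCESSED_DIR).as_posix()}"
    s3.upload_file(str(gz_path), SILVER_BUCKET, key)
    print(f"uploaded {gz_path} -> s3://{SILVER_BUCKET}/{key}")

    # Remove the local .gz to free disk space.
    gz_path.unlink()
```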

With this, the pipeline is complete: from collecting the raw data stored in the bronze layer, through local standardization using Polars, to final structured and compressed storage in the silver layer of the Data Lake. The modular design of the scripts allows each step to be executed independently, making the pipeline easier to maintain, automate, and audit. To automate execution, it is recommended to use a workflow orchestrator such as Prefect or Airflow, or even simple cron jobs or GitHub Actions.
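
As a lightweight starting point before adopting a full orchestrator, the three scripts can simply be run in sequence; the sketch below assumes they are executed from the repository root and live in the scripts/ folder, as noted at the beginning of this manual.

```python
import subprocess
import sys

# Stage order and script names as listed in this manual; paths assume the
# repository root as the working directory.
STAGES = [
    "scripts/2025_03_18_la_s3_caged_download_bronze.py",
    "scripts/2025_03_18_la_caged_parquet_processing.py",
    "scripts/2025-03-24_la_upload_caged.py",
]

for script in STAGES:
    print(f"running {script}")
    # check=True aborts the pipeline if any stage exits with an error.
    subprocess.run([sys.executable, script], check=True)

print("CAGED pipeline finished")
```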