Components description

1. Ingest Layer Explanation

This document outlines the ingestion process of our climate and traffic data pipeline. It describes the configuration steps, AWS components used, and the flow from data ingestion to triggering downstream processing.

1.1. AWS Services Used

  • Amazon EventBridge
  • AWS Lambda
  • Amazon S3
  • Amazon RDS (MySQL)
  • Amazon DynamoDB

1.2. EventBridge Cron Trigger

To automate the ingestion process, we set up an Amazon EventBridge rule that runs a Lambda function at a scheduled time.

1.2.1. JSON Configuration

{
    "Name": "test-rule",
    "Arn": "arn:aws:events:us-east-1:307291231597:rule/test-rule",
    "ScheduleExpression": "cron(48 18 * * ? *)",
    "State": "ENABLED",
    "EventBusName": "default",
    "CreatedBy": "307291231597"
}

Explanation:

  • "ScheduleExpression": Runs every day at 18:48 UTC.
  • "State": Rule is enabled.
  • "EventBusName": Uses the default EventBridge bus.

1.2.2. Target Lambda

{
    "Targets": [
        {
            "Id": "6d055w00q28im0a1iz",
            "Arn": "arn:aws:lambda:us-east-1:307291231597:function:function-example"
        }
    ]
}

1.3. Lambda for Data Ingestion

The Lambda function defined above ingests data from:

  • Open-Meteo API (weather)
  • Relational database (traffic data)

It uploads the data to the S3 bucket under the raw/ folder.

📎 Lambda Code: GitHub - lambda_function.py
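
Below is a minimal, hypothetical sketch of this ingestion logic. The Open-Meteo request parameters, the raw/ sub-paths, the environment variables, and the traffic table name are illustrative assumptions; the authoritative implementation is the lambda_function.py linked above.

# Hypothetical sketch of the ingestion Lambda; names and credentials are illustrative.
import json
import os
import urllib.request
from datetime import date

import boto3
import pymysql  # assumed to be packaged with the Lambda deployment

s3 = boto3.client("s3")
BUCKET = "eafit-project-3-bucket"


def handler(event, context):
    today = date.today().isoformat()

    # 1. Weather data from the Open-Meteo API (daily max temperature and precipitation).
    url = (
        "https://api.open-meteo.com/v1/forecast"
        "?latitude=6.25&longitude=-75.56"          # illustrative coordinates (Medellin)
        "&daily=temperature_2m_max,precipitation_sum&timezone=UTC"
    )
    with urllib.request.urlopen(url) as resp:
        weather = resp.read()
    s3.put_object(Bucket=BUCKET, Key=f"raw/weather/{today}.json", Body=weather)

    # 2. Traffic data from the RDS MySQL instance.
    conn = pymysql.connect(
        host=os.environ["DB_HOST"], user=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"], database=os.environ["DB_NAME"],
    )
    with conn.cursor(pymysql.cursors.DictCursor) as cur:
        cur.execute("SELECT * FROM traffic_incidents")  # hypothetical table name
        rows = cur.fetchall()
    s3.put_object(
        Bucket=BUCKET,
        Key=f"raw/traffic/{today}.json",
        Body=json.dumps(rows, default=str),
    )
    return {"statusCode": 200}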

1.4. RDS - Relational Data

We use Amazon RDS (MySQL) to store traffic data before ingestion. 365 days of historical data were loaded.


1.4.1. Configuration Summary

{
    "DBInstanceIdentifier": "climate-db",
    "Engine": "mysql",
    "DBInstanceClass": "db.t4g.micro",
    "AllocatedStorage": 20,
    "PubliclyAccessible": true,
    "Endpoint": {
        "Address": "climate-db.c2hyrnbdeokg.us-east-1.rds.amazonaws.com",
        "Port": 3306
    },
    "MultiAZ": false,
    "EngineVersion": "8.0.41"
}

1.5. Amazon S3 - Storage Layer

Our centralized S3 bucket (eafit-project-3-bucket) holds project data in these folders:

  • raw/
  • trusted/
  • refined/
  • scripts/

1.5.1. Bucket Policies

{
    "Statement": [
        {
            "Sid": "AllowAllLambdaPutObject",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::eafit-project-3-bucket/*"
        },
        {
            "Sid": "AllowDeleteObjects",
            "Effect": "Allow",
            "Action": ["s3:DeleteObject", "s3:AbortMultipartUpload"],
            "Resource": "arn:aws:s3:::eafit-project-3-bucket/*"
        },
        {
            "Sid": "AllowPublicRead",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::eafit-project-3-bucket/*"
        },
        {
            "Sid": "AllowListBucket",
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::eafit-project-3-bucket"
        }
    ]
}

1.5.2. Server-Side Encryption

{
    "ServerSideEncryptionConfiguration": {
        "Rules": [
            {
                "ApplyServerSideEncryptionByDefault": {
                    "SSEAlgorithm": "AES256"
                },
                "BucketKeyEnabled": true
            }
        ]
    }
}

1.6. S3 Trigger for EMR Processing

An event notification is configured on the S3 bucket to trigger a processing Lambda when a new file is added under the raw/ folder.

{
    "LambdaFunctionConfigurations": [
        {
            "Id": "f6fba20a-2e7f-4798-9cce-15675689a0ea",
            "LambdaFunctionArn": "arn:aws:lambda:us-east-1:307291231597:function:lambda-project3",
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {
                "Key": {
                    "FilterRules": [
                        {
                            "Name": "Prefix",
                            "Value": "raw/"
                        }
                    ]
                }
            }
        }
    ]
}

1.7. S3 Lifecycle Policy

To ensure daily automatic reprocessing, a lifecycle rule deletes files from raw/, trusted/, and refined/ folders after 1 day.

{
    "Rules": [
        {
            "ID": "DeleteRawObjects",
            "Expiration": { "Days": 1 },
            "Filter": { "Prefix": "raw/" },
            "Status": "Enabled"
        },
        {
            "ID": "DeleteTrustedObjects",
            "Expiration": { "Days": 1 },
            "Filter": { "Prefix": "trusted/" },
            "Status": "Enabled"
        },
        {
            "ID": "DeleteRefinedObjects",
            "Expiration": { "Days": 1 },
            "Filter": { "Prefix": "refined/" },
            "Status": "Enabled"
        }
    ]
}

1.8. DynamoDB Cluster Lock

To avoid multiple EMR clusters being created from multiple S3 uploads, we use a DynamoDB lock.

1.8.1. Table Configuration

{
    "TableName": "EmrClusterLock",
    "KeySchema": [
        { "AttributeName": "LockId", "KeyType": "HASH" }
    ],
    "AttributeDefinitions": [
        { "AttributeName": "LockId", "AttributeType": "S" }
    ],
    "TimeToLiveDescription": {
        "AttributeName": "ExpiresAt",
        "TimeToLiveStatus": "ENABLED"
    }
}

Logic:
When a cluster launch starts, an item is inserted into the table with a TTL of 2 minutes. Any trigger that fires while the lock item exists skips execution; once the lock expires, the next trigger can create a new cluster.
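
A minimal sketch of how this lock can be acquired with a conditional write (the attribute names mirror the table definition above; the 120-second window follows the 2-minute TTL described):

# Sketch of the lock acquisition; error handling is reduced to the essentials.
import time

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client("dynamodb")


def acquire_emr_lock(lock_id="emr-cluster", ttl_seconds=120):
    """Return True if the lock was acquired, False if another run holds it."""
    expires_at = int(time.time()) + ttl_seconds  # consumed by the ExpiresAt TTL attribute
    try:
        dynamodb.put_item(
            TableName="EmrClusterLock",
            Item={
                "LockId": {"S": lock_id},
                "ExpiresAt": {"N": str(expires_at)},
            },
            # Only succeed if no lock item already exists.
            ConditionExpression="attribute_not_exists(LockId)",
        )
        return True
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False
        raise

Because DynamoDB TTL deletions are not instantaneous, a stricter variant could also compare ExpiresAt against the current time inside the condition expression.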

1.9. Summary of Flow

  1. EventBridge triggers a Lambda daily.
  2. Lambda fetches API + RDS data.
  3. Uploads to S3 under raw/.
  4. S3 triggers the processing Lambda.
  5. Lambda checks DynamoDB lock.
  6. If free, creates EMR cluster and processes data.
  7. S3 auto-deletes old files with lifecycle rules.

1.10. Result Files S3 Bucket

(Screenshots of the result files in the S3 bucket.)

2. Processing Layer Explanation

The Data Processing Layer of our project begins by ingesting raw data from the raw/ folder in the S3 bucket. This data is then cleaned, normalized, and transformed into a reliable, structured DataFrame, which is stored in the trusted/ folder. Once the data is trusted, an exploratory data analysis (EDA) is performed to better understand the variables and patterns within the dataset. The results of this analysis are saved in the refined/ folder, making them readily available for visualization and reporting.

In addition, this layer performs advanced analytics by developing predictive models using regression techniques to estimate the target variable—the number of accidents per day. These models are then applied to additional datasets generated within the pipeline to support forecasting.

2.1. Components Used:

  • Amazon S3: Serves as the central storage layer for the entire data pipeline. It holds:

    • Raw and processed datasets used for exploration and model training.

    • Input data used for model inference.

    • Output data including model predictions, results, and evaluation metrics.

  • AWS Lambda: Used to automate and orchestrate key parts of the pipeline. Lambda functions:

    • Trigger and execute the necessary steps for running the predictive model.

    • Handle bootstrap configurations required for initializing the data analysis workflow.

2.2. Steps Description:

2.2.1. Lambda and Trigger Configuration:

To automate the data processing pipeline, we created an AWS Lambda function named lambda-project3, which is triggered whenever a new file is uploaded to the raw/ folder of the eafit-project-3-bucket S3 bucket. The trigger detects S3 object-creation events (s3:ObjectCreated:*) and initiates an EMR cluster only if another job is not already in progress. This is managed using a DynamoDB table (EmrClusterLock) that acts as a simple locking mechanism to prevent multiple concurrent EMR jobs. Once a valid file is detected and the lock is acquired, the Lambda function launches an EMR cluster with predefined steps for data transformation, analysis, and modeling.
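
A condensed, illustrative sketch of what this Lambda does once the lock is acquired (the cluster name, sizing, and release label are assumptions; the bootstrap action and steps it receives are shown in full in the following sections):

# Sketch of the EMR cluster launch performed by lambda-project3 (values are illustrative).
import boto3

emr = boto3.client("emr")


def launch_cluster(steps, bootstrap_actions):
    response = emr.run_job_flow(
        Name="project3-processing-cluster",        # hypothetical cluster name
        ReleaseLabel="emr-7.0.0",                  # assumed EMR release
        Applications=[{"Name": "Spark"}],
        Instances={
            "InstanceGroups": [
                {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
                {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
            ],
            "KeepJobFlowAliveWhenNoSteps": False,  # cluster terminates after the last step
            "TerminationProtected": False,
        },
        LogUri="s3://eafit-project-3-bucket/logs/",  # assumed log location
        ServiceRole="EMR_DefaultRole",
        JobFlowRole="EMR_EC2_DefaultRole",
        BootstrapActions=bootstrap_actions,
        Steps=steps,
    )
    return response["JobFlowId"]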


2.2.2. ETL Processing:

To prepare the data stored in the raw/ folder for analysis, we implement an ETL process using PySpark. This process begins by extracting the climate and the traffic incident data. The climate data is nested, so we normalize it by exploding the relevant arrays to obtain a flat, structured format containing the daily maximum temperature and precipitation. In parallel, we clean and cast fields in the traffic dataset, such as converting the incident date to a standard date format. Once both datasets are processed, we join them on the date column and encode categorical fields like congestion level into numeric values. The resulting unified and cleaned DataFrame is then saved in Parquet format to the trusted/ folder in S3, forming a reliable foundation for subsequent analysis and modeling.
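
The following is a reduced sketch of that transformation under assumed column names (time, temperature_2m_max, precipitation_sum, incident_date, congestion_level); the authoritative logic lives in transform_to_trusted.py, described next.

# Sketch of the raw-to-trusted ETL; paths and column names are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("TransformRawToTrusted").getOrCreate()

# Extract the nested climate JSON and the traffic data from the raw/ zone.
climate_raw = spark.read.json("s3://eafit-project-3-bucket/raw/weather/")
traffic_raw = spark.read.json("s3://eafit-project-3-bucket/raw/traffic/")

# Normalize the nested daily arrays into one row per date.
daily = climate_raw.select(
    F.col("daily.time").alias("time"),
    F.col("daily.temperature_2m_max").alias("max_temp"),
    F.col("daily.precipitation_sum").alias("precipitation"),
)
climate = daily.select(
    F.explode(F.arrays_zip("time", "max_temp", "precipitation")).alias("d")
).select(
    F.to_date(F.col("d.time")).alias("date"),
    F.col("d.max_temp").alias("max_temp"),
    F.col("d.precipitation").alias("precipitation"),
)

# Clean and cast the traffic fields, and encode the congestion level numerically.
traffic = traffic_raw.select(
    F.to_date(F.col("incident_date")).alias("date"),
    F.col("reported_incidents").cast("int").alias("reported_incidents"),
    F.when(F.col("congestion_level") == "low", 1)
     .when(F.col("congestion_level") == "medium", 2)
     .otherwise(3)
     .alias("congestion_level_num"),
)

# Join on date and persist the trusted dataset as Parquet.
trusted = climate.join(traffic, on="date", how="inner")
trusted.write.mode("overwrite").parquet("s3://eafit-project-3-bucket/trusted/joined_data/")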

To execute this transformation, we created a PySpark script called transform_to_trusted.py, stored in the scripts/ folder of our S3 bucket. This script is executed as a step in the EMR cluster from a Lambda function, using the following configuration:

Steps = [
    {
        'Name': 'TransformRawToTrusted',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                's3://eafit-project-3-bucket/scripts/transform_to_trusted.py'
            ]
        }
    },
]

This ensures the ETL logic is executed automatically as part of the cluster lifecycle, enabling a reproducible and automated pipeline from raw data to trusted data.

2.2.3. Analysis Requirements Configuration:

To execute certain parts of the analysis, we need specific Python libraries to be available on the EMR cluster nodes. To ensure these dependencies are installed automatically, we store a bootstrap script called install.sh in the scripts/ folder of our S3 bucket. This script is executed during cluster startup to prepare the environment.

#!/bin/bash
set -ex

sudo yum install -y python3 python3-pip

sudo pip3 install --no-cache-dir pandas seaborn matplotlib boto3 numpy

echo "Paquetes instalados correctamente"

Then, in the Lambda function that provisions the EMR cluster, we attach this script as a bootstrap action, so it's executed automatically during cluster startup:

BootstrapActions=[
    {
        'Name': 'InstallVisualizationPackages',
        'ScriptBootstrapAction': {
            'Path': 's3://eafit-project-3-bucket/scripts/install.sh',
            'Args': []
        }
    }
],

2.2.4. Exploratory Data Analysis:

The analysis step, implemented in the analysis.py script stored in the scripts/ folder of the S3 bucket, performs descriptive analytics and visual exploration on the trusted dataset. It begins by reading the joined Parquet data from trusted/joined_data/ using Spark, computes global statistics (min, max, mean, standard deviation) for reported incidents, temperature, and precipitation, and generates segmented statistics by congestion level. It also calculates Pearson correlations between incidents and climate/congestion variables, saving all results in Parquet format to refined/descriptive_stats/ for querying via Athena. To complement these insights visually, the script converts the Spark DataFrame to Pandas and generates plots (bar charts and heatmaps) using Seaborn and Matplotlib, storing them as PNGs in refined/visualizations/. Metadata for each visualization (dimensions, variables, parameters, and S3 paths) is registered in refined/visualizations_metadata/ as Parquet files. To operationalize this step, it was added to the EMR pipeline through a new Lambda step configuration:

{
    'Name': 'DataAnalysis',
    'ActionOnFailure': 'TERMINATE_CLUSTER',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': [
            'spark-submit',
            '--deploy-mode', 'cluster',
            's3://eafit-project-3-bucket/scripts/analysis.py'
        ]
    }
}
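
For reference, a reduced sketch of the statistics portion of this step (output sub-paths other than congestion_stats/ are assumptions; the full analysis.py also produces the Seaborn/Matplotlib visualizations and their metadata):

# Sketch of the descriptive-statistics part of the analysis step.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("DataAnalysis").getOrCreate()
df = spark.read.parquet("s3://eafit-project-3-bucket/trusted/joined_data/")

# Global statistics for incidents, temperature, and precipitation.
global_stats = df.agg(
    F.min("reported_incidents").alias("min_incidents"),
    F.max("reported_incidents").alias("max_incidents"),
    F.mean("reported_incidents").alias("avg_incidents"),
    F.stddev("reported_incidents").alias("std_incidents"),
    F.mean("max_temp").alias("avg_temp"),
    F.mean("precipitation").alias("avg_precip"),
)

# Statistics segmented by congestion level (this feeds the estadisticas_congestion table).
congestion_stats = df.groupBy("congestion_level_num").agg(
    F.count("*").alias("days_count"),
    F.mean("reported_incidents").alias("avg_incidents"),
    F.mean("max_temp").alias("avg_temp"),
    F.mean("precipitation").alias("avg_precip"),
)

# Pearson correlations between incidents and the climate/congestion variables.
correlations = df.select(
    F.corr("reported_incidents", "max_temp").alias("incidents_vs_temp"),
    F.corr("reported_incidents", "precipitation").alias("incidents_vs_precip"),
    F.corr("reported_incidents", "congestion_level_num").alias("incidents_vs_congestion"),
)

base = "s3://eafit-project-3-bucket/refined/descriptive_stats/"
global_stats.write.mode("overwrite").parquet(base + "global_stats/")
congestion_stats.write.mode("overwrite").parquet(base + "congestion_stats/")
correlations.write.mode("overwrite").parquet(base + "correlations/")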

2.2.5. Regression Model:

In the final step of the workflow, we used SparkML Pipelines to train and evaluate machine learning models. This step was defined in a script named model.py, located in the scripts/ folder of the S3 bucket. The step to execute it was added to the cluster creation flow via the AWS Lambda function.

{
    'Name': 'RegressionModels',
    'ActionOnFailure': 'TERMINATE_CLUSTER',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': [
            'spark-submit',
            '--deploy-mode', 'cluster',
            's3://eafit-project-3-bucket/scripts/model.py'
        ]
    }
}

The job loads the trusted dataset from S3 and renames the target variable (reported_incidents) to label. A feature vector is then assembled from the weather and traffic attributes (max_temp, precipitation, congestion_level_num).

Two regression models—Random Forest and Gradient Boosted Trees—are integrated into separate pipelines. Each pipeline includes a VectorAssembler followed by the respective regression estimator. Hyperparameter tuning is performed using CrossValidator with 5-fold cross-validation and metric-based evaluation using RMSE.

After splitting the data into training and test sets, both models are trained and evaluated, and their evaluation metrics are stored in the bucket. To validate the models, a synthetic dataset representing 60 days of conditions in Medellín is generated and stored. Both pipelines then generate predictions on this dataset, and these predictions are saved to the bucket as well.
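
A condensed sketch of one of the two pipelines (the Random Forest variant; the GBT pipeline is analogous). The split ratio and hyperparameter grid are assumptions; the authoritative code is model.py.

# Sketch of the Random Forest pipeline with 5-fold cross-validated tuning on RMSE.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("RegressionModels").getOrCreate()

data = (
    spark.read.parquet("s3://eafit-project-3-bucket/trusted/joined_data/")
    .withColumnRenamed("reported_incidents", "label")
)
train, test = data.randomSplit([0.8, 0.2], seed=42)  # split ratio is an assumption

assembler = VectorAssembler(
    inputCols=["max_temp", "precipitation", "congestion_level_num"],
    outputCol="features",
)
rf = RandomForestRegressor(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[assembler, rf])

evaluator = RegressionEvaluator(labelCol="label", metricName="rmse")
param_grid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [20, 50])      # illustrative grid values
    .addGrid(rf.maxDepth, [5, 10])
    .build()
)
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)

model = cv.fit(train)
rmse = evaluator.evaluate(model.transform(test))
print(f"Random Forest test RMSE: {rmse}")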

2.3. Expected Results

The Processing Layer generates key outputs stored in the S3 bucket and available for download in the project repository:

  • Trusted Data: Cleaned and transformed dataset combining climate and traffic data, stored in the trusted/ folder.
  • Exploratory Data Analysis (EDA): Descriptive statistics, visualizations, and metadata saved under refined/.
  • Synthetic Test Data: Simulated 60-day dataset for Medellín used for model validation.
  • Model Evaluation: Performance metrics and validation reports for Random Forest and Gradient Boosted Trees models.
  • Model Predictions: Forecasted accident counts generated by both regression models.

All results are downloadable at: Processing Results

3. Data Visualization Layer Explanation

The Data Visualization Layer of our project provides access to analytical results generated from combined datasets (API data and relational database) using Spark (SparkSQL and SparkML). These results are stored in Amazon S3 (Refined Zone) in Parquet format, queried with Amazon Athena, and exposed through Amazon API Gateway.

3.1. Components Used:

  • Amazon Athena: Executes SQL queries directly on data stored in S3.
  • AWS Lambda: Serves as an API handler to trigger Athena queries.
  • Amazon API Gateway: Provides REST endpoints to access Lambda.
  • Amazon S3: Stores the refined datasets used by Athena.

3.2. Components Configuration:

3.2.1. Amazon Athena Configuration

In Athena:

  1. Access the Athena query editor.
  2. Execute CREATE EXTERNAL TABLE statements to define external tables based on Parquet files stored in S3.
  3. Specify the target database (proyecto3_db), column definitions, file format (PARQUET), and the S3 location using the LOCATION clause.
  4. Use the table property 'parquet.compression'='SNAPPY' to define the compression format.
  5. Repeat the process for each table, such as:
    • estadisticas_congestion
    • estadisticas_globales
    • correlaciones
    • model_evaluation
    • predictions_gbt
    • predictions_rf
    • test_data
    • visualizaciones

Here is a query example (creating the estadisticas_congestion table):

CREATE EXTERNAL TABLE proyecto3_db.estadisticas_congestion (
  congestion_level_num INT,
  days_count BIGINT,
  avg_incidents DOUBLE,
  avg_temp DOUBLE,
  avg_precip DOUBLE
)
STORED AS PARQUET
LOCATION 's3://eafit-project-3-bucket/refined/descriptive_stats/congestion_stats/'
TBLPROPERTIES (
  'parquet.compression'='SNAPPY'
);

Note:
This table references a Parquet file generated from a Spark SQL query that grouped incident data by congestion level.
It includes the number of days, average incidents, temperature, and precipitation for each level.
The resulting dataset was saved in the S3 path used in the CREATE EXTERNAL TABLE statement.

3.2.2. AWS Lambda Configuration

  1. Create a Lambda function named, for example, project3-api-handler.
  2. Import the required libraries: boto3, json, and time.
  3. Instantiate the Athena client using boto3.client('athena').
  4. Define a dictionary that maps URL paths (e.g., /congestion-stats) to SQL queries (SELECT * FROM ...).
  5. Extract the request path from the incoming event object (event.get('path', '')) and validate it against the dictionary.
  6. If the path is valid, execute the corresponding Athena query using start_query_execution with:
    • Database: proyecto3_db
    • Result location: an S3 path (s3://.../athena-query-results/)
  7. Poll the query execution status (up to 30 seconds) until it reaches a terminal state (SUCCEEDED, FAILED, or CANCELLED).
  8. If the query is successful, retrieve the results using get_query_results.
  9. Parse the column names and rows, then return a JSON response containing the data and execution ID.
  10. Set response headers including Content-Type: application/json and Access-Control-Allow-Origin: *.

Lambda function source code: Download lambda_function.py
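
The numbered steps above condense into a handler roughly like the following sketch (the query map is abbreviated and the Athena result location is an assumed path; refer to the downloadable lambda_function.py for the full version):

# Sketch of the project3-api-handler flow: map path -> Athena query -> JSON response.
import json
import time

import boto3

athena = boto3.client("athena")

QUERIES = {
    "/congestion-stats": "SELECT * FROM estadisticas_congestion",
    "/correlations": "SELECT * FROM correlaciones",
    # ...one entry per endpoint/table
}


def lambda_handler(event, context):
    path = event.get("path", "")
    if path not in QUERIES:
        return {"statusCode": 404, "body": json.dumps({"error": "Unknown path"})}

    execution = athena.start_query_execution(
        QueryString=QUERIES[path],
        QueryExecutionContext={"Database": "proyecto3_db"},
        ResultConfiguration={"OutputLocation": "s3://example-bucket/athena-query-results/"},  # assumed location
    )
    query_id = execution["QueryExecutionId"]

    # Poll for up to ~30 seconds until the query reaches a terminal state.
    state = "QUEUED"
    for _ in range(30):
        state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if state != "SUCCEEDED":
        return {"statusCode": 500, "body": json.dumps({"error": state})}

    # Parse the column names (first row) and the data rows that follow.
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    columns = [c["VarCharValue"] for c in rows[0]["Data"]]
    data = [
        dict(zip(columns, [c.get("VarCharValue") for c in row["Data"]]))
        for row in rows[1:]
    ]

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json", "Access-Control-Allow-Origin": "*"},
        "body": json.dumps({"data": data, "query_execution_id": query_id}),
    }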

3.2.3. API Gateway Configuration

  1. Create a new REST API named, for example, project3-api-handler.
  2. Create a deployment stage named production.
  3. Add a Lambda authorizer and link it to the project3-api-handler function.
  4. Create the required resources (endpoints) matching the Lambda paths (e.g., /congestion-stats, /correlations, etc.).
  5. For each resource, add a GET method and integrate it with the project3-api-handler Lambda function.
  6. Enable CORS (Cross-Origin Resource Sharing) for each method to allow requests from different origins.

3.3. How It Works

  1. Data Preparation (outside the scope of this layer):

    • Spark jobs perform descriptive and advanced analytics using DataFrame pipelines.
    • Results are exported in Parquet format to s3://eafit-project-3-bucket/refined/....
  2. Table Registration in Athena:

    • External tables are created over the Parquet outputs in the refined zone (see section 3.2.1), making them queryable from the proyecto3_db database.

  3. Lambda Function (project3-api-handler):

    • Receives HTTP requests triggered by API Gateway.
    • Based on the route (/congestion-stats, /correlations, etc.), it:
      • Maps the path to a specific Athena SQL query.
      • Executes the query via the Athena client's start_query_execution call.
      • Waits for completion.
      • Retrieves and formats the results as JSON.
  4. API Gateway Integration:

    • Configured as a trigger for the Lambda function.
    • Each path corresponds to an endpoint (e.g., GET /correlations).
    • Response from Lambda is returned to the client.

3.4. How to Run It

3.4.1. Testing an Endpoint (Example: /congestion-stats)

  1. Send a Request (e.g., using curl or Postman):

    curl https://<api-id>.execute-api.<region>.amazonaws.com/production/congestion-stats

    Replace <api-id> and <region> with your actual API Gateway ID and AWS region.

  2. Expected JSON response:

    {"data": [{"congestion_level_num": "1", "days_count": "20", "avg_incidents": "6.3", "avg_temp": "25.73", "avg_precip": "2.32"}, {"congestion_level_num": "2", "days_count": "137", "avg_incidents": "12.357664233576642", "avg_temp": "25.13576642335768", "avg_precip": "5.966423357664232"}, {"congestion_level_num": "3", "days_count": "208", "avg_incidents": "17.47596153846154", "avg_temp": "25.117307692307705", "avg_precip": "7.20528846153846"}], "query_execution_id": "b200c4e5-9ef0-46b0-ac9b-36ea541c0639"}