Features
- Script to download the data (`data/scripts/getData.py`). It downloads the data from the API and saves it to `data/data/cleaned_gdp_data.csv`, where it can be inspected. Run it with:

```bash
python data/scripts/getData.py
```
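As a rough, hedged illustration of what a download-and-clean step like this can look like (the endpoint URL and the cleaning rule below are placeholders, not the actual values used in `getData.py`), here is a minimal sketch:

```python
# Hypothetical sketch of a download-and-clean step; the endpoint URL and the
# cleaning rule are placeholders, not the real ones used by data/scripts/getData.py.
import csv
import urllib.request

API_URL = "https://example.org/api/gdp.csv"     # placeholder endpoint
OUTPUT_PATH = "data/data/cleaned_gdp_data.csv"

def download_and_clean():
    # Fetch the raw CSV exposed by the API
    with urllib.request.urlopen(API_URL) as resp:
        text = resp.read().decode("utf-8")
    rows = list(csv.reader(text.splitlines()))
    header, data = rows[0], rows[1:]
    # Keep only complete rows (a stand-in for whatever cleaning the real script does)
    cleaned = [r for r in data if all(field.strip() for field in r)]
    with open(OUTPUT_PATH, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(cleaned)
    print(f"Saved {len(cleaned)} rows to {OUTPUT_PATH}")

if __name__ == "__main__":
    download_and_clean()
```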
- Script to upload the data to HDFS from an S3 bucket (`data/scripts/cargar_hdfs.txt`). Run it on the cluster after you have created the S3 bucket and uploaded the data to it:
```bash
#!/bin/bash
# cargar_hdfs.sh
# --- CONFIGURATION ---
BUCKET="proyectotelematica"
OBJECT_KEY="input/cleaned_gdp_data.csv"
LOCAL_FILE="cleaned_gdp_data.csv"
HDFS_DIR="/user/hadoop/entrada"

# 1. Download the file from S3
aws s3 cp "s3://$BUCKET/$OBJECT_KEY" "$LOCAL_FILE"

# 2. Create the HDFS directory if it does not exist
hdfs dfs -mkdir -p "$HDFS_DIR"

# 3. Upload the file to HDFS
hdfs dfs -put -f "$LOCAL_FILE" "$HDFS_DIR"

# 4. Verify the upload
hdfs dfs -ls "$HDFS_DIR"

# 5. Install mrjob if needed
python3 -m ensurepip --upgrade
pip3 install --user mrjob
```
- Results from the MapReduce job in CSV format: `mapreduce/results/output.csv`
- MapReduce job `MRStatsGDPByDepartment` (`mapreduce/scripts/total_gdp_by_department.py`, written with MRJob). The mapper:
```python
# Note: the full script imports csv and, for the reducer below,
# defaultdict from collections at module level.
def mapper(self, _, line):
    if line.startswith("year"):  # skip the header row
        return
    try:
        year, activity, sector, tipo_precio, codigo, department, value = next(csv.reader([line]))
        value = float(value)
        if tipo_precio.strip().lower() != "pib a precios constantes de 2015":  # we take only GDP at constant prices
            return
        if "bogot" in department.lower():  # we exclude Bogotá
            return
        key = (year, department)
        yield key, (value, activity)
    except Exception:
        pass
```
The mapper reads every column from the data; as the yielded key we take the year the data point was taken in together with the name of the department, and as the yielded value the pair (GDP value of the activity, activity name). As a sample we will use this row:

```
a_o,actividad,sector,tipo_de_precios,c_digo_departamento_divipola,departamento,valor_miles_de_millones_de
2020,Construcción,Industria,Constantes,05,Antioquia,123.45
```

The resulting key will look like this:

```
[2020, Antioquia]
```

together with the value `(123.45, "Construcción")`.
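As a side note on the parsing, the mapper uses `next(csv.reader([line]))` instead of a plain `line.split(",")` so that quoted fields containing commas are handled correctly. The standalone sketch below (not part of the repository) applies the same parsing to the sample row and prints the resulting key/value pair:

```python
# Standalone illustration (not part of the repository) of how one CSV line
# is parsed; csv.reader also handles quoted fields that contain commas.
import csv

sample = "2020,Construcción,Industria,Constantes,05,Antioquia,123.45"

year, activity, sector, tipo_precio, codigo, department, value = next(csv.reader([sample]))
key = (year, department)          # -> ("2020", "Antioquia")
pair = (float(value), activity)   # -> (123.45, "Construcción")
print(key, pair)
```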
On its own, though, this does not mean much without the reducer:
```python
def reducer(self, key, values):
    total = 0
    count = 0
    max_value = float('-inf')
    max_activity = ""
    actividad_gdp = defaultdict(float)
    for value, activity in values:
        total += value
        count += 1
        actividad_gdp[activity] += value
        if value > max_value:
            max_value = value
            max_activity = activity
    promedio = total / count if count else 0
    yield key, {
        "PIB total": round(total, 2),
        "Promedio de PIB": round(promedio, 2),
        "Actividad con maximo PIB": {
            "actividad": max_activity,
            "valor": round(max_value, 2)
        },
        "Datos totales": count,
        "PIB de las actividades": {act: round(gdp, 2) for act, gdp in actividad_gdp.items()}
    }
```
The yielded value contains "PIB total", the sum of all the entries for that department and year; "Promedio de PIB", the average GDP across its activities; "Actividad con maximo PIB", a dictionary with the name and value of the activity with the highest GDP; "Datos totales", a count of how many records that department had in that year; and finally "PIB de las actividades", a dictionary mapping each activity to its GDP. Using the sample row as the only input record, we get:

```
[2020, Antioquia],{"PIB total": 123.45, "Promedio de PIB": 123.45, "Actividad con maximo PIB": {"actividad": "Construcción", "valor": 123.45}, "Datos totales": 1, "PIB de las actividades": {"Construcción": 123.45}}
```
It can be run locally with:

```
python .\mapreduce\scripts\total_gdp_by_department.py .\data\data\cleaned_gdp_data.csv > .\mapreduce\results\output.csv
```

(You must run `getData.py` first.)
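For quick local testing without shell redirection, mrjob jobs can also be driven programmatically with the inline runner. The sketch below assumes `MRStatsGDPByDepartment` can be imported from the script (for example by running from the directory that contains it); adjust the import to your layout:

```python
# Programmatic local run using mrjob's inline runner; the import path is an
# assumption about the repository layout and may need adjusting.
from total_gdp_by_department import MRStatsGDPByDepartment

job = MRStatsGDPByDepartment(args=["-r", "inline", "data/data/cleaned_gdp_data.csv"])
with job.make_runner() as runner:
    runner.run()
    # parse_output() decodes the job's output back into (key, value) Python objects
    for key, value in job.parse_output(runner.cat_output()):
        print(key, value)
```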
- Script that fetches the MapReduce job from the S3 bucket, runs it on the cluster, and saves the results back to the same S3 bucket (`mapreduce/scripts/correr_map_reduce.txt`):
```bash
#!/bin/bash
# --- CONFIGURATION ---
S3_BUCKET="proyectotelematica"
SCRIPT_KEY="scripts/total_gdp_by_department.py"
SCRIPT_PY="total_gdp_by_department.py"
INPUT_HDFS_PATH="hdfs:///user/hadoop/entrada/cleaned_gdp_data.csv"
OUTPUT_HDFS_DIR="hdfs:///user/hadoop/salida"
LOCAL_OUTPUT_FILE="resultados.csv"
S3_DEST_PATH="output/resultados.csv"

# Download the job script from S3
if ! aws s3 cp "s3://$S3_BUCKET/$SCRIPT_KEY" "$SCRIPT_PY"; then
    exit 1
fi

# Remove any previous output in HDFS
hdfs dfs -rm -r -f "$OUTPUT_HDFS_DIR"

# Run the MapReduce job
python3 "$SCRIPT_PY" -r hadoop "$INPUT_HDFS_PATH" --output-dir "$OUTPUT_HDFS_DIR"

# Merge the part files into a single result
hdfs dfs -getmerge "$OUTPUT_HDFS_DIR" "$LOCAL_OUTPUT_FILE"

# Upload the final result to S3
aws s3 cp "$LOCAL_OUTPUT_FILE" "s3://$S3_BUCKET/$S3_DEST_PATH"
```
- Flask API that serves the results from the S3 bucket (`mapreduce/API/app.py`); a minimal sketch of this kind of endpoint is shown after this list.
- Web application that presents the results in a friendlier way than a plain JSON response (`web_visualizer/visual_app.py`)
- `README.md` with all the information about the project and how to run it locally and on an AWS instance
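As a hedged illustration of what an endpoint like the one in `mapreduce/API/app.py` can look like (the bucket name, object key, and route below are assumptions taken from the shell scripts above, not the actual implementation), here is a minimal sketch that reads the merged results from S3 and returns them as JSON, assuming mrjob's default output format of a JSON key and a JSON value per line separated by a tab:

```python
# Minimal sketch, NOT the project's actual app.py: the bucket, key, and route
# are assumptions based on the shell scripts above.
import json

import boto3
from flask import Flask, jsonify

app = Flask(__name__)
S3_BUCKET = "proyectotelematica"        # assumed, from the scripts above
RESULTS_KEY = "output/resultados.csv"   # assumed, from correr_map_reduce

@app.route("/results")
def results():
    s3 = boto3.client("s3")
    body = s3.get_object(Bucket=S3_BUCKET, Key=RESULTS_KEY)["Body"].read().decode("utf-8")
    parsed = []
    for line in body.splitlines():
        if not line.strip():
            continue
        # Assumes mrjob's default output protocol: JSON key <tab> JSON value
        raw_key, raw_value = line.split("\t", 1)
        year, department = json.loads(raw_key)
        parsed.append({"year": year, "department": department, "stats": json.loads(raw_value)})
    return jsonify(parsed)

if __name__ == "__main__":
    app.run(debug=True)
```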