Exposing Results - osozzz/BachArchitecture-TT GitHub Wiki
This stage describes how to expose the pipeline's processed results for consumption through:
- 🔎 Amazon Athena (serverless SQL queries)
- 🔗 REST API (FastAPI + Docker on EC2)
Create the Glue database from the console or the CLI:
aws glue create-database --database-input '{"Name":"bacharchitecture_db"}'
📸 Relevant screenshot: Glue > Databases page showing bacharchitecture_db.
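The same database can also be created from Python. The sketch below is an assumed equivalent of the CLI command, built on boto3's Glue client calls `get_database` and `create_database`. The helper name `ensure_glue_database` is hypothetical, and the client is passed in as an argument so the function can be exercised without AWS credentials.

```python
def ensure_glue_database(glue, name):
    """Create the Glue database `name` unless it already exists.

    `glue` is expected to behave like boto3.client("glue").
    Returns True if the database was created, False if it was already there.
    """
    try:
        glue.get_database(Name=name)
        return False
    except glue.exceptions.EntityNotFoundException:
        glue.create_database(DatabaseInput={"Name": name})
        return True

# Usage (requires AWS credentials and the boto3 package):
#   import boto3
#   glue = boto3.client("glue", region_name="us-east-1")
#   ensure_glue_database(glue, "bacharchitecture_db")
```

Checking for the database first makes the step safe to re-run, whereas a bare `create_database` call fails if the database already exists.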
Create the tables from the Athena console or with the create_athena_tables.py script.
CREATE EXTERNAL TABLE IF NOT EXISTS bacharchitecture_db.city_summary (
  city STRING,
  avg_temp_max DOUBLE,
  avg_temp_min DOUBLE,
  avg_precipitation DOUBLE,
  avg_windspeed DOUBLE
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = ',')
LOCATION 's3://bacharchitecture/refined/summary/'
TBLPROPERTIES ('skip.header.line.count'='1');

CREATE EXTERNAL TABLE IF NOT EXISTS bacharchitecture_db.city_clusters (
  city STRING,
  temperature_max DOUBLE,
  temperature_min DOUBLE,
  precipitation DOUBLE,
  windspeed DOUBLE,
  features STRUCT<type:STRING, values:ARRAY<DOUBLE>>,
  prediction INT
)
STORED AS PARQUET
LOCATION 's3://bacharchitecture/refined/clusters/';
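The create_athena_tables.py script itself is not shown on this page. As a rough sketch of what such a script could do, assuming boto3's Athena client, the DDL can be split and submitted one statement at a time, since Athena runs a single statement per `start_query_execution` call. The function names `split_statements` and `submit_statements` are illustrative, not the script's actual contents.

```python
def split_statements(sql_text):
    """Split a SQL script on ';' into individual, non-empty statements.

    Naive on purpose: it assumes no ';' appears inside string literals,
    which holds for the two CREATE EXTERNAL TABLE statements above.
    """
    return [part.strip() for part in sql_text.split(";") if part.strip()]


def submit_statements(athena, sql_text, output_location):
    """Submit each statement separately and return the query execution IDs.

    `athena` is expected to behave like boto3.client("athena").
    """
    query_ids = []
    for statement in split_statements(sql_text):
        response = athena.start_query_execution(
            QueryString=statement,
            ResultConfiguration={"OutputLocation": output_location},
        )
        query_ids.append(response["QueryExecutionId"])
    return query_ids

# Usage (requires AWS credentials and the boto3 package):
#   import boto3
#   athena = boto3.client("athena", region_name="us-east-1")
#   submit_statements(athena, ddl_text, "s3://bacharchitecture/athena-results/")
```

The returned IDs only confirm submission. Each one would still need to be polled with `get_query_execution` to confirm the table was created.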
📸 Relevant screenshot: Tables listed in the Athena console.
SELECT * FROM bacharchitecture_db.city_summary;
SELECT city, prediction, COUNT(*) FROM bacharchitecture_db.city_clusters GROUP BY city, prediction;
📸 Relevant screenshot: Result of a query in the Athena console.
api/
├── main.py
├── services/
│   └── athena_client.py
├── requirements.txt
└── Dockerfile
# main.py
from fastapi import FastAPI

from services.athena_client import query_athena_summary, query_clusters

app = FastAPI(title="BachArchitecture API")

@app.get("/summary")
def get_summary():
    # Rows of the city_summary table as a list of JSON objects
    return query_athena_summary()

@app.get("/clusters")
def get_clusters():
    # Rows of the city_clusters table as a list of JSON objects
    return query_clusters()
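# services/athena_client.py
import time

import boto3
import pandas as pd

athena = boto3.client("athena", region_name="us-east-1")
S3_OUTPUT = "s3://bacharchitecture/athena-results/"

def run_query(query):
    execution = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": S3_OUTPUT},
    )
    qid = execution["QueryExecutionId"]
    while True:
        result = athena.get_query_execution(QueryExecutionId=qid)
        state = result["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {qid} ended in state {state}")
    # Reading an s3:// path with pandas requires the s3fs package
    output = f"{S3_OUTPUT}{qid}.csv"
    return pd.read_csv(output).to_dict(orient="records")

# Assumed helpers: main.py imports these names, but their queries are not shown in the source
def query_athena_summary():
    return run_query("SELECT * FROM bacharchitecture_db.city_summary")

def query_clusters():
    return run_query("SELECT city, prediction FROM bacharchitecture_db.city_clusters")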
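Reading the result CSV with pandas depends on pandas being able to open S3 paths, typically through the s3fs package. An alternative, sketched here under the assumption that result sets are small, is to parse the response of Athena's `get_query_results` call directly. The helper name `rows_to_records` is hypothetical. The response shape follows boto3's documented structure, where the first row of a SELECT result holds the column names and NULL cells lack a `VarCharValue` key.

```python
def rows_to_records(response):
    """Convert one page of a get_query_results response into a list of dicts.

    All values come back as strings (or None for NULL); numeric casting
    is left to the caller.
    """
    rows = response["ResultSet"]["Rows"]
    if not rows:
        return []
    # The first row of the first page carries the column names
    header = [cell.get("VarCharValue") for cell in rows[0]["Data"]]
    return [
        {name: cell.get("VarCharValue") for name, cell in zip(header, row["Data"])}
        for row in rows[1:]
    ]

# Usage (requires AWS credentials and the boto3 package):
#   response = athena.get_query_results(QueryExecutionId=qid)
#   records = rows_to_records(response)
```

Only the first page carries the header row. Larger results need pagination through `NextToken`, which this sketch leaves out.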
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
docker build -t bachapi .
docker run -d -p 8000:8000 --name bachapi_container bachapi
Navigate to:
http://<IP_PUBLICA_EC2>:8000/docs
You will see the Swagger documentation with the /summary and /clusters endpoints.
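Once the container is running, the endpoints can also be checked from a script instead of the browser. The following is a minimal sketch using only the standard library. The helper name `fetch_json` and its injectable `opener` parameter are assumptions made so the function can be tested offline.

```python
import json
import urllib.request


def fetch_json(base_url, path, opener=urllib.request.urlopen, timeout=10):
    """GET base_url + path and decode the JSON response body."""
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    with opener(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))

# Usage, replacing the placeholder with the instance's public IP:
#   summary = fetch_json("http://<IP_PUBLICA_EC2>:8000", "/summary")
#   clusters = fetch_json("http://<IP_PUBLICA_EC2>:8000", "/clusters")
```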
📸 Relevant screenshots:
- Refined data accessible via SQL in Athena
- REST API with Swagger documentation
- Ready to connect to dashboards, web applications, or mobile apps