# How it works
As mentioned previously, this project consists of a simple Dagster pipeline that will fetch data from an API and save it as a CSV file.
Before we dive into the implementation, it's important that we go over a few concepts from Dagster. If you're already familiar with them, feel free to jump to the implementation section.
Everything implemented here has docstrings for better comprehension.
## Dagster concepts

In this section, I'll go over a few core concepts of Dagster. I won't dwell on them for long, and I do recommend the Dagster docs; they're really clear and succinct.
From their own homepage:

> Dagster is a data orchestrator for machine learning, analytics, and ETL
Using my own words, Dagster is a Python framework for writing, executing and debugging data pipelines based on DAGs (directed acyclic graphs). It's similar to other frameworks such as Airflow, Luigi and Prefect but, of course, has its own concepts. In the following subsections, I'll talk about them.
### Solids

The definition of solids in the Dagster documentation is very clear: "Solids are the functional unit of work in Dagster. A solid's responsibility is to read its inputs, perform an action, and emit outputs". You can think of a solid as a task; that analogy really works for me.
Besides inputs, a solid can have a configuration, which is specified by a schema.
Think of a solid that will greet newcomers to this repository:
- Should it say "Hi
<name>
"? - Should it say "Welcome to this repository,
<name>
!"? - What if I wanted to use this solid for another repository? Should I rewrite it?
Configurations allow solids to be more flexible and reusable. Sticking with this example, we could give this solid a configurable `welcome_message` string and a `name` input. This would allow it to be used in any environment by changing only the configuration and the input, without touching the code.
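A minimal sketch of what that could look like, assuming the legacy (pre-1.0) Dagster API that this project uses; the `greet` name and the message template are just my illustration:

```python
from dagster import solid


@solid(config_schema={"welcome_message": str})
def greet(context, name: str) -> str:
    """Greets a newcomer with a configurable message template."""
    # context.solid_config holds the values supplied for the config schema
    return context.solid_config["welcome_message"].format(name=name)
```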
Every solid, when executed, returns a `SolidExecutionResult`. This object can tell you whether the execution succeeded, give you the output data from the solid, and much more.
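For example, executing the hypothetical `greet` solid above in isolation could look like this sketch; `execute_solid` and the `run_config` layout follow the same legacy API:

```python
from dagster import execute_solid

result = execute_solid(
    greet,
    run_config={
        "solids": {
            "greet": {"config": {"welcome_message": "Welcome to this repository, {name}!"}}
        }
    },
    input_values={"name": "gabriel"},
)
assert result.success  # did the execution succeed?
print(result.output_value())  # Welcome to this repository, gabriel!
```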
### Pipelines

As expected, solids can be connected to each other to build a pipeline, which is just a set of solids with explicit dependencies on each other. This is where the DAG concept comes in: a pipeline is nothing but a DAG. Of course, it has awesome features implemented by Dagster on top, but you get the idea.
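As a sketch, wiring the hypothetical `greet` solid into a tiny pipeline could look like this; passing one solid's output as another's input is what declares a DAG edge:

```python
from dagster import pipeline, solid


@solid
def get_newcomer(context) -> str:
    """A hypothetical solid that produces the name of a newcomer."""
    return "gabriel"


@pipeline
def welcome_pipeline():
    # Calling greet with get_newcomer's output declares the dependency
    greet(get_newcomer())
```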
## Implementation

In this section, I'll talk about the implementation of all solids, the test cases, and the pipeline itself.
### fetch_json_data

- Configuration schema:

  ```python
  {
      "api_endpoint": str,
      "api_path": str,
  }
  ```

- Inputs: `None`
- Outputs: `dict` or `list`
- Raised exceptions: when the HTTP response is not OK (status code other than 200)
The following steps are executed (a sketch of this solid follows the list):

- Joins `api_endpoint` and `api_path` into a single URL (example: `https://api.github.com/` + `/users/gabriel-milan` = `https://api.github.com/users/gabriel-milan`)
- Makes a GET request to the URL
- If the response is OK (status code 200), parses the JSON data into a Python dictionary and returns it
- If the response is not OK, logs the status code and response message and raises an exception
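A minimal sketch of such a solid, assuming the legacy API; the exact URL joining and error handling are my assumptions, not necessarily the repository's code:

```python
import requests
from dagster import Failure, solid


@solid(config_schema={"api_endpoint": str, "api_path": str})
def fetch_json_data(context):
    """Fetches JSON data from the configured API endpoint and path."""
    # Join endpoint and path, avoiding a doubled slash between them
    url = context.solid_config["api_endpoint"].rstrip("/") + context.solid_config["api_path"]
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()  # dict or list, depending on the payload
    context.log.error(f"Status {response.status_code}: {response.text}")
    raise Failure(f"GET {url} failed with status {response.status_code}")
```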
### generate_dataframe

- Configuration schema:

  ```python
  {
      "data_key": str,
  }
  ```

- Inputs: `data` (`dict` or `list`)
- Outputs: `pd.DataFrame`
- Raised exceptions: only for unexpected behavior
The following steps are executed (a sketch follows the list):

- Gets `data_key` from the configuration
- If `data_key` is an empty string, assumes that `data` is a list of items
- If `data_key` is not empty, extracts the list of items from `data` at key `data_key`
- Builds a Pandas DataFrame from the list (either from step 2 or 3) and returns it
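A sketch of these steps, again assuming the legacy API; the details are my reading of the list above:

```python
import pandas as pd
from dagster import solid


@solid(config_schema={"data_key": str})
def generate_dataframe(context, data):
    """Builds a DataFrame from data, optionally unwrapping it at data_key."""
    data_key = context.solid_config["data_key"]
    # An empty data_key means data is already a list of items;
    # otherwise, the list of items lives under data[data_key]
    items = data if data_key == "" else data[data_key]
    return pd.DataFrame(items)
```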
### save_dataframe_to_csv

- Configuration schema:

  ```python
  {
      "output_filename": str,
  }
  ```

- Inputs: `df` (`pd.DataFrame`)
- Outputs: `None`
- Raised exceptions: only for unexpected behavior
The following steps are executed (a sketch follows the list):

- Gets `output_filename` from the configuration
- Checks whether `output_filename` ends with `.csv`; if it doesn't, appends it
- Exports the Pandas DataFrame to the `output_filename` CSV file
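A sketch of these steps (the `index=False` export option is my assumption):

```python
import pandas as pd
from dagster import solid


@solid(config_schema={"output_filename": str})
def save_dataframe_to_csv(context, df: pd.DataFrame):
    """Exports the DataFrame to the configured CSV file."""
    filename = context.solid_config["output_filename"]
    # Append the .csv extension if it's missing
    if not filename.endswith(".csv"):
        filename += ".csv"
    df.to_csv(filename, index=False)  # index=False is my assumption
```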
### The pipeline

This is the only pipeline implemented; its functionality is described on the introduction page of this wiki.

Solids (connected to each other in this order):

1. `fetch_json_data`
2. `generate_dataframe`
3. `save_dataframe_to_csv`
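In the legacy Dagster API, that wiring could look like the sketch below; the page doesn't give the pipeline's actual name, so `br_pipeline` is a placeholder:

```python
from dagster import pipeline


@pipeline
def br_pipeline():  # placeholder name; the real one may differ
    # Chaining the calls declares the execution order (the DAG edges)
    save_dataframe_to_csv(generate_dataframe(fetch_json_data()))
```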
### Test cases

Test cases were split into two files: `test_solids.py` and `test_pipelines.py`. Inside them, test names follow this logic: `test_<module>`, where `<module>` can be any solid or pipeline previously described. I'll now describe what each of them tests, one by one.
#### test_fetch_json_data

- Configuration:

  ```python
  {
      "api_endpoint": "http://webapibrt.rio.rj.gov.br/api/v1",
      "api_path": "/brt"
  }
  ```

- Ensures the solid executes
- Ensures the output is an instance of `SolidExecutionResult`
- Ensures it ran without exceptions
- Ensures the output type is either a list or a dictionary
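Putting those checks together, this test could look like the sketch below; the other solid tests follow the same pattern, and the exact assertions are my assumptions:

```python
from dagster import SolidExecutionResult, execute_solid


def test_fetch_json_data():
    result = execute_solid(
        fetch_json_data,
        run_config={
            "solids": {
                "fetch_json_data": {
                    "config": {
                        "api_endpoint": "http://webapibrt.rio.rj.gov.br/api/v1",
                        "api_path": "/brt",
                    }
                }
            }
        },
    )
    assert isinstance(result, SolidExecutionResult)  # expected result type
    assert result.success  # ran without exceptions
    assert isinstance(result.output_value(), (list, dict))  # expected output type
```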
#### test_generate_dataframe

- Configuration:

  ```python
  {
      "data_key": "asd"
  }
  ```

- Input `data`:

  ```python
  {
      "asd": [
          {"a": 123},
          {"b": 456}
      ]
  }
  ```

- Ensures the solid executes
- Ensures the output is an instance of `SolidExecutionResult`
- Ensures it ran without exceptions
- Ensures the output type is a Pandas DataFrame
- Ensures the DataFrame matches the expected `pd.DataFrame([{"a": 123}, {"b": 456}])`
#### test_save_dataframe_to_csv

- Configuration:

  ```python
  {
      "output_filename": "test.csv"
  }
  ```

- Input `df`: `pd.DataFrame([{"a": 1}, {"a": 2}])`
- Ensures the solid executes
- Ensures the output is an instance of `SolidExecutionResult`
- Ensures it ran without exceptions
- Ensures the output is `None`
- Loads the saved file and ensures its content matches the input
#### Pipeline test

- Configuration:

  ```python
  {
      "fetch_json_data": {
          "config": {
              "api_endpoint": "http://webapibrt.rio.rj.gov.br/api/v1",
              "api_path": "/brt"
          }
      },
      "generate_dataframe": {
          "config": {
              "data_key": "veiculos"
          }
      },
      "save_dataframe_to_csv": {
          "config": {
              "output_filename": "veiculos.csv"
          }
      }
  }
  ```

- Ensures the pipeline executes
- Ensures the output is an instance of `PipelineExecutionResult`
- Ensures it ran without exceptions
- Ensures the type of each solid's output matches its expected type
- Ensures the `fetch_json_data` output matches what `generate_dataframe` expects
- Ensures the `generate_dataframe` output matches what `save_dataframe_to_csv` expects
- Ensures the saved CSV file contains the same number of rows as the `fetch_json_data` list
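A sketch of how such a pipeline test might look; again this assumes the legacy API, `br_pipeline` is the placeholder name from the sketch above, and the row-count check is my reading of the last bullet:

```python
import pandas as pd
from dagster import PipelineExecutionResult, execute_pipeline


def test_pipeline():
    result = execute_pipeline(
        br_pipeline,  # placeholder name
        run_config={
            "solids": {
                "fetch_json_data": {
                    "config": {
                        "api_endpoint": "http://webapibrt.rio.rj.gov.br/api/v1",
                        "api_path": "/brt",
                    }
                },
                "generate_dataframe": {"config": {"data_key": "veiculos"}},
                "save_dataframe_to_csv": {"config": {"output_filename": "veiculos.csv"}},
            }
        },
    )
    assert isinstance(result, PipelineExecutionResult)  # expected result type
    assert result.success  # ran without exceptions
    # The saved CSV should have as many rows as the fetched "veiculos" list
    fetched = result.output_for_solid("fetch_json_data")
    assert len(pd.read_csv("veiculos.csv")) == len(fetched["veiculos"])
```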