# Get the Parquet files into staging layer from landing zone

## Pipeline from landing to staging
Create a new pipeline (pl_stg_processing_nyctaxi).
Go to Pipeline activity -> Add -> Copy data activity.
Go to General -> rename it to Copy to staging.
Then go to the Source tab of the activity and, under Connection, select your lakehouse (Project Lakehouse).
Select the Root option, choose Files, then go to File path and enter your file path and file name with its file extension.
I have selected only one file (2024-01); the rest of the files I want to load through an automated process via the pipeline.
Now go to File format and select the file type: Parquet.
Now I want to automate the loading process. Go to File path and, instead of a file name, click **Add dynamic content** below.
A new window will appear; click on Variables and then click the (+) icon to add a variable.
Add a variable v_date of type String.
Now go to Functions, select concat under the string functions, and build this expression:
`@concat('yellow_tripdata_', variables('v_date'), '.parquet')`
(Here I am concatenating the file name with the year and month; the v_date variable stores the date part. For example, with v_date = 2024-01 this evaluates to yellow_tripdata_2024-01.parquet.)
Now set the destination of the activity to your project warehouse: go to Table and enter the schema name (stg) and the table name (nyctaxi_yellow).
Now deselect the Copy activity (click an empty area of the canvas).
Go to the Variables tab below and give v_date a default value of 2024-01 (this will be the starting month).
Now validate the pipeline and run it (Save + Run).
If you go back to your warehouse, you will see the table there.
Now check the maximum and minimum tpep_pickup datetimes to verify whether the data is correct:
```sql
SELECT MAX(tpep_pickup_datetime), MIN(tpep_dropoff_datetime) FROM stg.nyctaxi_yellow;
```
In the output it seems the table has stray data going as far back as 2002, so we need to do some cleaning and filter it.
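To see exactly how the stray rows are distributed, a quick aggregation like the one below can help (just a sketch; any year/month outside the one you loaded is an outlier):

```sql
-- Sketch: distribution of rows by pickup year/month, to spot outliers.
SELECT
    YEAR(tpep_pickup_datetime)  AS pickup_year,
    MONTH(tpep_pickup_datetime) AS pickup_month,
    COUNT(*)                    AS row_count
FROM stg.nyctaxi_yellow
GROUP BY YEAR(tpep_pickup_datetime), MONTH(tpep_pickup_datetime)
ORDER BY pickup_year, pickup_month;
```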
So I am adding an additional step to filter out these records: a stored procedure that removes all records except those for the month we want to process.
```sql
CREATE PROCEDURE stg.data_cleaning_stg
    @end_date   DATETIME2,
    @start_date DATETIME2
AS
DELETE FROM stg.nyctaxi_yellow
WHERE tpep_pickup_datetime < @start_date
   OR tpep_pickup_datetime > @end_date;
```
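To sanity-check the procedure outside the pipeline, you can call it manually. This is just a sketch using the same boundary values the pipeline will later compute for the 2024-01 load:

```sql
-- Manual test of the cleanup procedure; the pipeline will supply
-- these values dynamically through its variables.
EXEC stg.data_cleaning_stg
    @start_date = '2024-01-01',
    @end_date   = '2024-02-01';
```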
Now go to your pipeline pl_stg_processing_nyctaxi and add a Stored procedure activity.
Rename the activity under the General tab to SP Removing outlier dates.
Go to Settings -> Connection -> select the project warehouse.
Under Stored procedure name, click Refresh and select your stored procedure; in my case it was stg.data_cleaning_stg.
Just below that, click Import to bring in the procedure's parameters.
Deactivate both of the existing activities.
Add a new Set variable activity to the pipeline.
Go to Settings -> add a variable with name v_end_date and type String.
Go to Value -> click Add dynamic content -> in the pipeline expression builder, add the expression below to supply the variable's value; after concat, place your previously created v_date variable inside it:
`@addToTime(concat(variables('v_date'),'-01'),1,'Month')`
Now validate the pipeline, Save + Run, and check the activity output.
It should return February 1, 2024 as the date.
Go to General and rename the activity to v_end_date.
Now go back to the Stored procedure activity; under the imported parameters, for end_date go to Value -> Add dynamic content -> Variables -> select v_end_date.
Now go to start_date -> Value -> Add dynamic content -> and in the pipeline expression builder enter this expression:
`@concat(variables('v_date'),'-01')`
Now go to the Copy data activity -> Destination -> Advanced -> Pre-copy script, and enter this statement to clear the staging table every time the pipeline runs:

```sql
DELETE FROM stg.nyctaxi_yellow;
```
I also want to modify the destination in the table options: select Use existing table, pick stg.nyctaxi_yellow from the dropdown below, then validate the pipeline and Save + Run.
Now if you go back to your warehouse and re-execute the query

```sql
SELECT MAX(tpep_pickup_datetime), MIN(tpep_dropoff_datetime) FROM stg.nyctaxi_yellow;
```

you will see the correct minimum and maximum dates.
Next, I want to create a metadata table to store information about pipeline runs: the rows processed by each run, the latest tpep_pickup_datetime, and the processing time.
This query shows the intended structure of the table:
```sql
SELECT
    'placeholder'             AS pipeline_run_id,
    'staging_nyctaxi_yellow'  AS table_processed,
    COUNT(*)                  AS rows_processed,
    MAX(tpep_pickup_datetime) AS latest_processed_pickup,
    CURRENT_TIMESTAMP         AS processed_datetime
FROM stg.nyctaxi_yellow;
```
To hold this table I will first create a separate schema:

```sql
CREATE SCHEMA metadata;
```

and then create the table:
```sql
CREATE TABLE metadata.processing_log (
    pipeline_run_id         VARCHAR(255),
    table_processed         VARCHAR(255),
    rows_processed          INT,
    latest_processed_pickup DATETIME2(6),
    processed_datetime      DATETIME2(6)
);
```
Now I will create a stored procedure to insert data into the metadata.processing_log table:
```sql
CREATE PROCEDURE metadata.insert_staging_metadata
    @pipeline_run_id VARCHAR(255),
    @table_name      VARCHAR(255),
    @processed_date  DATETIME2
AS
INSERT INTO metadata.processing_log
    (pipeline_run_id, table_processed, rows_processed, latest_processed_pickup, processed_datetime)
SELECT
    @pipeline_run_id          AS pipeline_run_id,
    @table_name               AS table_processed,
    COUNT(*)                  AS rows_processed,
    MAX(tpep_pickup_datetime) AS latest_processed_pickup,
    @processed_date           AS processed_datetime
FROM stg.nyctaxi_yellow;
```
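Before wiring this into the pipeline, you can test the procedure with placeholder values. This is just a sketch; in the pipeline the run ID and timestamp will come from system variables:

```sql
-- Manual test; 'manual-test-001' and the timestamp are placeholder
-- values for illustration only.
EXEC metadata.insert_staging_metadata
    @pipeline_run_id = 'manual-test-001',
    @table_name      = 'staging_nyctaxi_yellow',
    @processed_date  = '2024-02-01 00:00:00';
```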
Now go back to the pipeline pl_stg_processing_nyctaxi
and add a Stored procedure activity.
Rename it to SP Loading Staging Metadata. Under Settings -> Connection, select the project warehouse; under Stored procedure name, click Refresh and select metadata.insert_staging_metadata, then click Import.
For pipeline_run_id -> Add dynamic content -> System variables -> select Pipeline run ID.
For processed_date -> Add dynamic content -> System variables -> select Pipeline trigger time.
For table_name -> just type staging_nyctaxi_yellow.
Now validate the activity and run it
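To confirm the metadata row landed, you can query the log table; the newest run should appear first:

```sql
-- Quick verification of the processing log.
SELECT TOP 5 *
FROM metadata.processing_log
ORDER BY processed_datetime DESC;
```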
Now go to the Copy data activity; in the Variables tab you will find that the default value of v_date is hard-coded. We want to update this value dynamically.
Now add a Script activity to the pipeline.
Under General, rename it to Latest Processed Data. In Settings, for the connection select your warehouse (Project warehouse in my case), then go to Query -> edit icon -> and write the query that picks up the latest date:
```sql
SELECT TOP 1 latest_processed_pickup
FROM metadata.processing_log
WHERE table_processed = 'staging_nyctaxi_yellow'
ORDER BY latest_processed_pickup DESC;
```
Now validate and run the activity
If you check the output of that pipeline run, you will see the date returned as output.
Now I want to store this date value in a variable,
so add a **Set variable activity** to the pipeline.
Rename it to v_date. Go to Settings -> Name -> select v_date; under Value, click Add dynamic content, go to Activity outputs -> Latest Processed Data, then in the pipeline expression builder write the code below:
`@formatDateTime(addToTime(activity('Latest Processed Data').output.resultSets[0].rows[0].latest_processed_pickup,1,'Month'),'yyyy-MM')`
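This takes the latest processed pickup date from the Script activity's output, adds one month, and formats it as yyyy-MM (so a January 2024 pickup yields 2024-02). If it helps to reason about the expression, here is a rough SQL equivalent (a sketch only; it is not part of the pipeline):

```sql
-- Rough SQL equivalent of the pipeline expression: add one month to
-- the latest processed pickup and format the result as yyyy-MM.
SELECT LEFT(CONVERT(VARCHAR(23), DATEADD(MONTH, 1, MAX(latest_processed_pickup)), 126), 7) AS next_month
FROM metadata.processing_log
WHERE table_processed = 'staging_nyctaxi_yellow';
```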
Your pipeline is now ready. Click Home and save the pipeline, but do not run it for now, otherwise it will load the data for February too.