Glue Best Practices and performance optimization
- Glue provides a built-in transform, Relationalize, to flatten nested JSON objects.
- Relationalize returns a collection of DynamicFrames: one root DynamicFrame plus a separate DynamicFrame for each array in the JSON object. The result works much like a set of relational tables that can be joined using primary and foreign keys. Please refer to [1] for more detail on the Relationalize transform.
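A minimal Relationalize sketch (the source frame name and staging path below are placeholders, not from an actual job):

from awsglue.transforms import Relationalize

# Flatten the nested JSON: returns a DynamicFrameCollection with one root frame
# plus one frame per nested array
dfc = Relationalize.apply(
    frame = nested_dyf,                    # a DynamicFrame read from the catalog (placeholder)
    staging_path = "s3://<bucket>/tmp/",   # scratch location used by Relationalize (placeholder)
    name = "root",
    transformation_ctx = "relationalize0")

root_dyf = dfc.select("root")              # the flattened root table
print(dfc.keys())                          # lists the extra frames created for each array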
Glue supports the EMRFS S3-optimized committer for Glue ETL jobs. The EMRFS S3-optimized committer is an alternative OutputCommitter implementation, optimized for writing Parquet files to Amazon S3, that was previously available only on EMR. It helps avoid issues that can occur with Amazon S3 eventual consistency during the job and task commit phases, and helps improve job correctness under task failure conditions. To use the optimized S3 committer, supply the key/value pair "--enable-s3-parquet-optimized-committer", "true" as a special job parameter.
Through the Glue APIs via the CLI or SDK:
If you are using the Glue API to create jobs, you can pass the key/value pair "--enable-s3-parquet-optimized-committer": "true" in DefaultArguments to enable this feature when creating the job. The same parameter can also be passed in Arguments with the StartJobRun API.
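For example, with boto3 (the job name, role, and script location below are placeholders):

import boto3

glue = boto3.client("glue")

# Enable the committer for every run of the job via DefaultArguments
glue.create_job(
    Name = "my-etl-job",
    Role = "MyGlueServiceRole",
    Command = {"Name": "glueetl", "ScriptLocation": "s3://<bucket>/scripts/my_job.py"},
    DefaultArguments = {"--enable-s3-parquet-optimized-committer": "true"})

# Or enable it for a single run via StartJobRun
glue.start_job_run(
    JobName = "my-etl-job",
    Arguments = {"--enable-s3-parquet-optimized-committer": "true"})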
Further, you can consider using format = "glueparquet" as the data format; this designates a custom Parquet writer type that is optimized for DynamicFrames.
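A small sketch of a glueparquet write (the output path and compression setting are illustrative):

datasink = glueContext.write_dynamic_frame.from_options(
    frame = mappedframeddf,
    connection_type = "s3",
    connection_options = {"path": "s3://<bucket>/output/"},
    format = "glueparquet",
    format_options = {"compression": "snappy"},   # glueparquet also accepts blockSize and pageSize
    transformation_ctx = "datasink")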
If you want to partition your data based on certain column values, you might want to control the number of files in each of the partitions.
Sample Dataset
+----+-----+---+---------+
|Year|Month|Day| Value|
+----+-----+---+---------+
|2015| 4| 5| g|
|2015| 1| 1| fhwehfo|
|2016| 3| 6| wef|
|2017| 2| 3| fes|
|2017| 4| 4| e|
|2017| 2| 3| wf|
|2016| 4| 2| dgd|
|2015| 3| 4| ge|
|2016| 1| 1|dfjwefwef|
+----+-----+---+---------+
df = df.repartition(3)
+----+-----+---+---------+---+
|Year|Month|Day| Value|pid|
+----+-----+---+---------+---+
|2015| 4| 5| g| 0|
|2015| 1| 1| fhwehfo| 0|
|2016| 3| 6| wef| 0|
|2017| 2| 3| fes| 1|
|2017| 4| 4| e| 1|
|2017| 2| 3| wf| 1|
|2016| 4| 2| dgd| 2|
|2015| 3| 4| ge| 2|
|2016| 1| 1|dfjwefwef| 2|
+----+-----+---+---------+---+
As you can see above, repartitioning by a number divides the rows across that many partitions without regard to their content (the pid column shows each row's Spark partition ID).
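The pid column can be reproduced with spark_partition_id() (a small sketch, assuming df is the DataFrame shown above):

from pyspark.sql.functions import spark_partition_id

df.withColumn("pid", spark_partition_id()).show()   # shows which Spark partition each row landed in

Writing the corresponding DynamicFrame to S3, partitioned by year: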
datasink2 = glueContext.write_dynamic_frame.from_options(frame = mappedframeddf, connection_type = "s3", connection_options = {"path": "s3://", "partitionKeys":["year"]}, format = "glueparquet", transformation_ctx = "datasink2")
I noticed that 5 files were generated in my S3 directory.
Here is what happens: Spark writes each of its partitions to the S3 directory independently.
For partition 0, the rows are split by Year (as specified in the write call), creating two Parquet files: one for year 2016 and another for 2015.
|2015| 4| 5| g| 0|
|2015| 1| 1| fhwehfo| 0|
|2016| 3| 6| wef| 0|
For partition 1, a single Parquet file is generated under the folder for 2017.
|2017| 2| 3| fes| 1|
|2017| 4| 4| e| 1|
|2017| 2| 3| wf| 1|
For partition 2, two files are generated: one for year 2015 and one for 2016.
|2016| 4| 2| dgd| 2|
|2015| 3| 4| ge| 2|
|2016| 1| 1|dfjwefwef| 2|
===================
If I instead repartition by Year:
df = df.repartition("Year")
+----+-----+---+---------+---+
|Year|Month|Day| Value|pid|
+----+-----+---+---------+---+
|2016| 1| 1|dfjwefwef| 8|
|2016| 4| 2| dgd| 8|
|2016| 3| 6| wef| 8|
|2017| 2| 3| wf| 26|
|2017| 4| 4| e| 26|
|2017| 2| 3| fes| 26|
|2015| 1| 1| fhwehfo|160|
|2015| 4| 5| g|160|
|2015| 3| 4| ge|160|
+----+-----+---+---------+---+
So when you write this DataFrame to S3, it creates 3 Parquet files in total. Because each Spark partition now holds only a single key, the write produces exactly one Parquet file per S3 partition.
datasink2 = glueContext.write_dynamic_frame.from_options(frame = mappedframeddf, connection_type = "s3", connection_options = {"path": "s3://", "partitionKeys":["year"]}, format = "glueparquet", transformation_ctx = "datasink2")
So the trick here is to control the number of Spark partitions before writing with partitionKeys. You can use dataframe.coalesce(N) to reduce the number of partitions in a DataFrame without shuffling, or df.repartition(N) to increase or decrease the number of partitions while shuffling data across the network to achieve even load balancing. You will need to estimate the right number of partitions based on your workload and your data.
In general, you should select columns for partitionKeys that are of lower cardinality and are most commonly used to filter or group query results.
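A small sketch of the repartition-then-write pattern, assuming mappedframeddf is the DynamicFrame from the write examples above (the output path is a placeholder):

from awsglue.dynamicframe import DynamicFrame

# One Spark partition per distinct Year before the partitioned write, so each
# year=... prefix in S3 receives a single Parquet file
repartitioned_df = mappedframeddf.toDF().repartition("Year")
repartitioned_dyf = DynamicFrame.fromDF(repartitioned_df, glueContext, "repartitioned_dyf")

glueContext.write_dynamic_frame.from_options(
    frame = repartitioned_dyf,
    connection_type = "s3",
    connection_options = {"path": "s3://<bucket>/output/", "partitionKeys": ["year"]},
    format = "glueparquet")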
Requirements for bookmarks to work
In order to make sure bookmarks are working, the following requirements must be met:
- When creating the job, bookmarks must be enabled in the job definition: Advanced Properties > Job bookmark: Enable
- Script: the "transformationContext" option ("transformation_ctx" in the Python API) must be set on each source.
Python Example:
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "catalog_api_db", table_name = "<your_table>", transformation_ctx = "datasource0")
- Script: Job.init() and Job.commit() must be called.
Job.init()      # everything between init and commit contains the job logic, including reads from the Glue catalog
Job.commit()    # saves the bookmark in the AWS metadata store and finishes the job
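A minimal job skeleton showing where init and commit sit, reusing the catalog source from the example above (this mirrors the boilerplate Glue generates for a new job):

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)          # must run before any bookmarked reads

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "catalog_api_db", table_name = "<your_table>", transformation_ctx = "datasource0")

# ... transformations and writes ...

job.commit()                              # saves the bookmark state once the job succeeds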
Recommendations:
1. OOM exceptions commonly happen when an Apache Spark job reads a large number of small files from Amazon Simple Storage Service (Amazon S3). Resolve driver OOM exceptions with DynamicFrames by using one or more of the following methods, such as useS3ListImplementation, which is explained in [1] (a sketch follows this list).
2. Add more DPUs - take a deeper look at the job execution metrics, which show the required vs. allocated maximum DPUs, and increase the number of DPUs accordingly. How to interpret these metrics is explained well in [4].
3. If you are writing the output datasets in Parquet format, you can definitely use --enable-s3-parquet-optimized-committer, which enables the EMRFS S3-optimized committer for writing Parquet data into Amazon S3. You can supply the parameter/value pair via the AWS Glue console when creating or updating an AWS Glue job. Setting the value to true enables the committer; by default the flag is turned off. The details are provided in [5].
4. Input dataset compression type - if you are using Parquet format for the input datasets, please ensure they are compressed; for example, Snappy compression goes really well with Parquet.
5. Too many small files, or a few very large files, in the input? As a general practice, the input datasets should not consist of a very large number of small files (a few KBs or MBs each) or just a few very large files (many GBs each). The input files should be of a reasonable size and fairly evenly distributed across the partitions.
6. If the source dataset is partitioned, you can also try using a pushdown predicate within Glue; this restricts the amount of data Spark reads in the first place. Explained in [6] and shown in the sketch after this list.
7. Another Spark rule of thumb: when reading the input datasets, read only the data that is required. If certain attributes (columns) in the source are not needed in the output datasets, drop or filter them out up front. Likewise, attributes used only as a lookup/reference can be dropped as soon as they have served their purpose rather than being carried all the way to the final write. This saves overall execution time and memory on both the driver and the executors.
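A small sketch tying items 1, 6, and 7 together (the predicate value and the dropped column name are illustrative, not from an actual job):

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "catalog_api_db",
    table_name = "<your_table>",
    push_down_predicate = "year == '2017'",                    # read only the partitions you need
    additional_options = {"useS3ListImplementation": True},    # incremental S3 listing to protect the driver
    transformation_ctx = "datasource0")

trimmed = datasource0.drop_fields(["lookup_col"])              # drop a reference-only column early (placeholder name)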
Reference documentation:
[1] https://aws.amazon.com/premiumsupport/knowledge-center/glue-oom-java-heap-space-error/
[2] https://docs.aws.amazon.com/glue/latest/dg/monitor-profile-debug-oom-abnormalities.html
[3] https://aws.amazon.com/blogs/big-data/best-practices-to-scale-apache-spark-jobs-and-partition-data-with-aws-glue/
[4] https://docs.aws.amazon.com/glue/latest/dg/monitor-debug-capacity.html
[5] https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
[6] https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html#aws-glue-programming-etl-partitions-pushdowns