DMS Writing Parquet Data to AWS S3 (CDC)

AWS DMS writes data as Parquet files to Amazon S3 whenever there is activity (INSERT/UPDATE/DELETE) in the PostgreSQL RDS source. When this data is queried with Athena, it returns multiple records for a given entity. This is expected, because it is how DMS writes change data to the Parquet files. For example:

Consider a .csv example. When you use AWS DMS to replicate data changes, the first column of the .csv or .parquet output file indicates how the data was changed, as shown in the following .csv file:

I,101,Smith,Bob,4-Jun-14,New York
U,101,Smith,Bob,8-Oct-15,Los Angeles
U,101,Smith,Bob,13-Mar-17,Dallas
D,101,Smith,Bob,13-Mar-17,Dallas

For this example, suppose that there is an EMPLOYEE table in the source database. AWS DMS writes data to the .csv or .parquet file in response to the following events:

A new employee (Bob Smith, employee ID 101) is hired on 4-Jun-14 at the New York office. In the .csv or .parquet file, the I in the first column indicates that a new row was INSERTed into the EMPLOYEE table at the source database.

On 8-Oct-15, Bob transfers to the Los Angeles office. In the .csv or .parquet file, the U indicates that the corresponding row in the EMPLOYEE table was UPDATEd to reflect Bob's new office location. The rest of the line reflects the row in the EMPLOYEE table as it appears after the UPDATE.

On 13-Mar-17, Bob transfers again, to the Dallas office. In the .csv or .parquet file, the U indicates that this row was UPDATEd again. The rest of the line reflects the row in the EMPLOYEE table as it appears after the UPDATE.

After some time working in Dallas, Bob leaves the company. In the .csv or .parquet file, the D indicates that the row was DELETEd in the source table. The rest of the line reflects how the row in the EMPLOYEE table appeared before it was deleted.
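To make the Athena behavior described above concrete, here is a minimal PySpark sketch that reads the DMS output for the EMPLOYEE table and lists every change record for employee 101. The S3 path and the column names (Op, id, and so on) are assumptions for illustration only; the actual schema depends on your DMS task settings.

```python
# Minimal sketch: inspect the DMS CDC Parquet output for one employee.
# The bucket/prefix and column names below are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inspect-dms-cdc").getOrCreate()

# Read every Parquet file DMS has written for the EMPLOYEE table.
cdc = spark.read.parquet("s3://your-bucket/dms-output/public/EMPLOYEE/")

# Querying by employee id returns one row per change, not a single row.
cdc.filter(cdc["id"] == 101).show(truncate=False)
# Expected: four rows for id 101 -- I (insert), U, U, and D -- one per change event.
```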

Therefore, every time there is a change for Bob in the PostgreSQL RDS source, DMS creates a new entry in the Parquet file with the operation flag as the first column. This is the expected behavior for AWS DMS when writing data to Amazon S3. For this specific use case, you need to merge these Parquet files on the target, i.e. Amazon S3, explicitly, using an additional layer of AWS services (such as AWS Glue or AWS Data Pipeline) before you can query the data with Athena. Currently, neither DMS nor Athena can merge these files at rest or while they are being written from AWS DMS to Amazon S3; this is not supported by either service.

Other services, i.e. AWS Glue or AWS Data Pipeline, can be used to merge these Parquet files beforehand. A simple example is described in [1]; it can be executed as an AWS Glue Spark job and can merge the Parquet files according to your use case. Reference [2] is a public article that gives a fair idea of how to create a PySpark job using AWS Glue.
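As an illustration of what such a Glue Spark job might look like, the following is a hedged sketch (not a drop-in solution) that compacts the CDC Parquet files into a current-state snapshot that Athena can query directly. It assumes the primary key is id, the operation flag column is Op, and that a change-ordering column such as cdc_timestamp is available (for example, added through a DMS transformation rule); adjust these names to your own schema.

```python
# Sketch of a Glue-style PySpark job that merges DMS CDC Parquet files
# into a "current state" table. Paths and column names are assumptions.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("compact-dms-cdc").getOrCreate()

cdc = spark.read.parquet("s3://your-bucket/dms-output/public/EMPLOYEE/")

# Rank the change records for each primary key, newest first.
latest_first = Window.partitionBy("id").orderBy(F.col("cdc_timestamp").desc())

current_state = (
    cdc.withColumn("rn", F.row_number().over(latest_first))
       .filter(F.col("rn") == 1)       # keep only the most recent change per key
       .filter(F.col("Op") != "D")     # a trailing D means the row was deleted
       .drop("rn", "Op")
)

# Write the compacted snapshot to a separate prefix and point Athena at it.
current_state.write.mode("overwrite").parquet("s3://your-bucket/compacted/EMPLOYEE/")
```

A job along these lines can be scheduled in AWS Glue to run periodically, so that Athena queries against the compacted prefix always see one row per entity.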

Reference documentation:

[1] https://aws.amazon.com/premiumsupport/knowledge-center/emr-concatenate-parquet-files/
[2] https://www.synerzip.com/blog/a-practical-guide-to-aws-glue/