EMR 004 EMR and Big Data Tips - qyjohn/AWS_Tutorials GitHub Wiki
(1) Spark writing to DynamoDB
- Create a CSV file (test.csv) on HDFS for testing
id,name,value
1,John,100
2,Ellie,99
3,Kaliman,88
4,Walton,98
-
In DynamoDB, create a table with partition key (id, N) and range key (name, S).
-
In Hive, create a table
CREATE EXTERNAL TABLE hive_test (id BIGINT, name STRING, value BIGINT)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES(
"dynamodb.table.name" = "hive_test",
"dynamodb.column.mapping" = "id:id,name:name,value:value");
- Start pyspark
$ pyspark --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar,/usr/share/aws/emr/ddb/lib/emr-ddb-hive.jar
- Insert data
df = spark.read.format("csv").option("header", "true").load("/test.csv")
df.write.mode("append").insertInto("hive_test")
- Look into the DynamODB table to confirm data is now there.
(2) Relational Database to Parquet
The following pyspark code converts CSV on HDFS to Parquet on S3/HDFS:
#
# convert.py
#
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('CSV2Parquet').getOrCreate()
df = spark.read.format("csv").load("/test.csv")
df.write.parquet("s3://331982-syd/test-0009")
The following bash script exports a table in PostgreSQL to CSV on local disk, copies it to HDFS, then call the above-mentioned pyspark code to convert it into Parquet.
#
# import.sh
#
psql -h rds-aurora-postgresql-dns-endpoint -U username dbname -F, --no-align -c "SELECT * FROM table_name" > test.csv
hdfs dfs -copyFromLocal test.csv /test.csv
spark-submit convert.py
(3) AVRO with nested JSON
Below is the content of my data file data01.json
{"id":12345,"name":"Johnny","address":{"city":"Los Angeles", "state":"CA"}}
Below is the content of my AVRO schema test.avsc
{
"type": "record",
"name": "test01",
"namespace": "default",
"fields": [{
"name": "id",
"type": "int"
}, {
"name": "name",
"type": "string"
}, {
"name": "address",
"type": {
"type": "record",
"name": "AddressRecord",
"fields": [{
"name": "city",
"type": "string"
},
{
"name": "state",
"type": "string"
}
]
}
}]
}
Use the following command to create an AVRO data file
java -jar ~/avro-tools-1.8.2.jar fromjson --schema-file test.avsc data01.json > test.avro
Upload my test.avro to my S3 bucket:
aws s3 cp test.avro s3://my-bucket/avro/test.avro
In AWS Glue, create a Crawler to crawler s3://my-bucket/avro/. After running the crawler, you will get a table call "avro" with the following schema:
1 id int
2 name string
3 address struct
In AWS Athena, you can query fields in the nested JSON, as below:
select address.city from avro;
(4) Forget Hue Superuser Password
- SSH into the master node
cd /usr/lib/hue/build/env/bin
sudo ./hue createsuperuser
sudo ./hue changepassword username
(5) Glue ETL from PostgreSQL to RedShift with empty string
When performing ETL from PostgreSQL to RedShift using AWS Glue, if the source data has a VARCHAR() NOT NULL and empty string, the ETL job would fail because the spark-redshift driver was handling it improperly. For example:
- On both PostgreSQL and RedShift, create a test table with NOT NULL constraints, as below:
CREATE TABLE varchar_test (id bigint not null, lastname varchar(255) not null, password_expired boolean not null);
- On PostgreSQL, insert some test data:
INSERT INTO varchar_test VALUES (1, 'Test A', True); INSERT INTO varchar_test VALUES (2, '', False); INSERT INTO varchar_test VALUES (3, 'Test C', True);
-
In AWS Glue, use crawlers to crawl both PostgreSQL and RedShift. Do not modify the schema produced by the crawlers.
-
Create an ETL job to copy from PostgreSQL to RedShift, using the automatically generated script. The ETL job would fail because the empty string '' was treated as NULL.
-
Download the spark-avro driver 4.0.0, upload it to your S3 bucket as s3://your-bucket/spark-avro_2.11-4.0.0.jar.
http://central.maven.org/maven2/com/databricks/spark-avro_2.11/4.0.0/spark-avro_2.11-4.0.0.jar
- The last two lines of the automatically generated Python code look like the following:
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "test", table_name = "test_public_varchar_test_1", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
- Change the above-mentioned two lines to the following:
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = resolvechoice4, catalog_connection = "redshift", connection_options = {"dbtable": "varchar_test", "database": "test", "tempformat": "AVRO"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
Please note that the value for catalog_connect is the name of the connection to your RedShift database in Glue. In the connection_options, dbtable is the name of the table in RedShift, database is the name of the database in RedShift.
-
Save the script. In the AWS Glue UI, click on the job, then "action" -> "Edit Job" -> "Security configuration, script libraries, and job parameters (optional)" -> "Dependent jars path", fill in s3://your-bucket/spark-avro_2.11-4.0.0.jar here.
-
Run the ETL job. The ETL job will run successfully, with all the records ending up in the destination table in RedShift.
(6) Receives "503 Slow down" Error from s3-dist-cp
By default, the AWS S3DistCp (s3-dist-cp) command launches as many reducers as possible to increase speed up the copy. This is usually effective in getting the copy done as soon as possible. However, when the EMR cluster is big, you can quickly reach the API rate limit imposed by S3.
The solution is to manually specify the number of reducers for the copy job, as below:
s3-dist-cp -Dmapreduce.job.reduces=30 --src hdfs:///path/to/dir --dest s3://path/to/dir/
(7) Gracefully remove a core node
- Edit /emr/instance-controller/lib/dfs.hosts.exclude , with the private IP address of the impacted node.
- Run "hdfs dfsadmin -refreshNodes" to decommission the impacted node.
- Run "hdfs dfsadmin -report" to view the status of the nodes. You need to wait until the decommission is completed.
- Run "hdfs fsck / -files -blocks -locations -racks" to ensure that you do not have any corrupted blocks or missing replicas.
- Terminate the impacted node from the EC2 console. A replacement node will automatically be launched by EMR.
(8) Check master node in a bootstrap action script
#!/bin/bash
IS_MASTER=true
if [ -f /mnt/var/lib/info/instance.json ]; then
IS_MASTER=$(cat /mnt/var/lib/info/instance.json | jq .isMaster)
fi
if [ "$IS_MASTER" = true ]; then
instance=$(curl http://169.254.169.254/latest/meta-data/instance-id)
aws ec2 modify-instance-attribute --instance-id $instance --groups sg-xxxxxxxx sg-xxxxxxxx sg-xxxxxxxx sg-xxxxxxxx sg-xxxxxxxx
fi