EMR AWS

Table of contents

  1. Introduction
    1. Overview
    2. S3 data source
    3. Glue catalogue
    4. spark-sql
    5. PySpark in AWS EMR
  2. Notebook
  3. Using Boto3 on EMR's master

Introduction

Overview

  1. Create an EMR cluster with Spark and Hadoop (master, core and task nodes). Link Spark & Hive metadata to Glue (see the boto3 sketch after this list).
  2. Get ghactivity data.
  3. Upload the data to S3.
  4. Try out the CLIs (spark-shell, pyspark) and a notebook (Jupyter).
  5. EMR also provides Notebooks for testing ideas.
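
For step 1, cluster creation can also be scripted with boto3 instead of the console. A minimal sketch, assuming placeholder values for the cluster name, release label, instance types/counts and the default EMR roles; the Configurations block is what links Spark & Hive metadata to Glue:

import boto3

## factory class that points the Hive metastore at the Glue Data Catalog
glue_metastore = {
    'hive.metastore.client.factory.class':
        'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
}

emr = boto3.client('emr')
response = emr.run_job_flow(
    Name='ghactivity-cluster',                  ## assumed name
    ReleaseLabel='emr-6.7.0',                   ## pick a current release
    Applications=[{'Name': 'Spark'}, {'Name': 'Hadoop'}],
    Configurations=[
        {'Classification': 'hive-site', 'Properties': glue_metastore},
        {'Classification': 'spark-hive-site', 'Properties': glue_metastore},
    ],
    Instances={
        'InstanceGroups': [
            {'InstanceRole': 'MASTER', 'InstanceType': 'm5.xlarge', 'InstanceCount': 1},
            {'InstanceRole': 'CORE',   'InstanceType': 'm5.xlarge', 'InstanceCount': 2},
            {'InstanceRole': 'TASK',   'InstanceType': 'm5.xlarge', 'InstanceCount': 2},
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)
print(response['JobFlowId'])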

S3 data source

Source: https://www.gharchive.org/. Target bucket: s3://itv-github/landing/ghactivity. On the master node:

## see bucket
hdfs dfs -ls s3://itv-github/landing/ghactivity
## download files
wget <see on gharchive.org>
## upload to the bucket
hdfs dfs -copyFromLocal * s3://itv-github/landing/ghactivity
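
The upload can also be done with boto3 from any machine with AWS credentials. A small sketch, assuming the gharchive files were downloaded as *.json.gz into the current directory (bucket and prefix as above):

import glob
import os
import boto3

s3 = boto3.client('s3')
for path in glob.glob('*.json.gz'):             ## files fetched with wget
    key = 'landing/ghactivity/' + os.path.basename(path)
    s3.upload_file(path, 'itv-github', key)
    print('uploaded s3://itv-github/' + key)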

Glue catalogue

Glue is a serverless data integration service. It can expose data in S3, RDS, or DynamoDB as tables in a catalog.

When creating a cluster, under the optional AWS Glue Data Catalog settings, check "Use for Spark table metadata". This makes the Glue catalog accessible from EMR:

## launch spark-sql on Yarn, with table metadata served by Glue
spark-sql --master yarn
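
A quick way to check that the Glue integration works is from pyspark on the master node. A sketch, assuming the retail_db database and its orders table already exist in the Glue catalog:

## list databases known to the Glue Data Catalog
spark.sql('SHOW DATABASES').show()
## read a Glue-catalogued table as a DataFrame
spark.table('retail_db.orders').show(10)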

spark-sql

With the EMR cluster running, connect to its master node.

## Launch spark-sql session with Yarn
spark-sql
## Against Glue catalog
SHOW databases;
USE retail_db;
SHOW tables;
DESCRIBE FORMATTED orders;
SELECT * FROM orders LIMIT 10;

PySpark in AWS EMR

With the EMR cluster running, connect to its master node.

## list files from S3
## alternative: hadoop fs -ls 
## alternative: aws s3 ls s3://...
hdfs dfs -ls s3://itv-retail/retail_db_json/orders

## launch CLI in Yarn mode
pyspark

## read via PySpark
spark.read.json('s3://itv-retail/retail_db_json/orders').printSchema()
spark.read.json('s3://itv-retail/retail_db_json/orders').show()
spark.read.json('s3://itv-retail/retail_db_json/orders').count()
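
The same data can also be queried with SQL from pyspark by registering a temporary view. A sketch (the view name orders_v is arbitrary; COMPLETE is one of the status values shown below):

orders = spark.read.json('s3://itv-retail/retail_db_json/orders')
orders.createOrReplaceTempView('orders_v')      ## register a temporary view
spark.sql("SELECT * FROM orders_v WHERE order_status = 'COMPLETE' LIMIT 10").show()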

Group by and count

from pyspark.sql.functions import *
orders = spark.read.json('s3://itv-retail/retail_db_json/orders') ## DataFrame
orders.groupBy('order_status').count().show() ## shows countBy status
+--------------+-----+
|  order_status|count|
+--------------+-----+
|     CANCELLED| 1428|
|      COMPLETE|22899|
+--------------+-----+

orders.groupBy(lower('order_status')).count().show() ## same counts, status shown in lower case
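
To make the grouped output easier to read, the lowered column can be aliased and the result sorted by count. A sketch of the same aggregation:

from pyspark.sql.functions import lower, col

orders.groupBy(lower(col('order_status')).alias('order_status')) \
      .count() \
      .orderBy(col('count').desc()) \
      .show()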

Same with Spark shell (scala)

spark-shell
scala> spark.read.json("s3://itv-retail/retail_db_json/orders").printSchema
scala> spark.read.json("s3://itv-retail/retail_db_json/orders").show

scala> val orders = spark.read.json("s3://itv-retail/retail_db_json/orders") // DataFrame
scala> orders.groupBy("order_status").count.show
scala> orders.groupBy(lower(col("order_status"))).count.show
scala> orders.groupBy(lower($"order_status")).count.show // same as the previous line

Notebook

Create a cluster with JupyterEnterpriseGateway. It is a way for data scientists/engineers to test their ideas.

Set up boto3 (Boto3 makes it easy to integrate your Python application, library, or script with AWS services, including Amazon S3, Amazon EC2, Amazon DynamoDB, and more) in the cluster bootstrap: /usr/bin/pip3 install boto3==1.24.17
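
That command has to live in a script in S3 and be attached to the cluster as a bootstrap action. A sketch of wiring it into the run_job_flow call from the overview (the script path is a placeholder):

## the script at this S3 path just runs: /usr/bin/pip3 install boto3==1.24.17
bootstrap_actions = [{
    'Name': 'install-boto3',
    'ScriptBootstrapAction': {'Path': 's3://itv-github/scripts/install_boto3.sh'},
}]
## pass it when creating the cluster:
## emr.run_job_flow(..., BootstrapActions=bootstrap_actions, ...)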

The ipynb file extension is used for computational notebooks that can be opened with Jupyter Notebook.

Create a new *.ipynb file (Jupyter):

import os
os.environ.setdefault('AWS_PROFILE', 'analytiqs')
import boto3
s3_client = boto3.client('s3')
buckets = s3_client.list_buckets()['Buckets']
for bucket in buckets:
    print(bucket['Name'])

In the shell, export AWS_PROFILE=analytiqs allows running aws commands without specifying a profile, e.g. aws emr list-clusters --active.
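
The profile can also be passed to boto3 directly instead of exporting AWS_PROFILE. A sketch that roughly mirrors aws emr list-clusters --active:

import boto3

session = boto3.Session(profile_name='analytiqs')
emr = session.client('emr')
## RUNNING/WAITING approximates the --active filter
for cluster in emr.list_clusters(ClusterStates=['RUNNING', 'WAITING'])['Clusters']:
    print(cluster['Id'], cluster['Name'])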

Using Boto3 on EMR's master

On the EMR master node, since boto3 is installed and credentials are already set up:

python 
>>> import boto3
>>> s3_client = boto3.client('s3')
>>> s3_client.list_buckets()
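
Beyond listing buckets, the same client can page through the ghactivity files uploaded earlier. A sketch, using the bucket and prefix from the landing section:

import boto3

s3_client = boto3.client('s3')
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='itv-github', Prefix='landing/ghactivity/'):
    for obj in page.get('Contents', []):
        print(obj['Key'], obj['Size'])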