ICP Assignment 8 - MadhuriSarode/BDP GitHub Wiki

Module 2 _ Assignment 1

Student ID : 24 : Madhuri Sarode

Student ID : 4 : Bhargavi

Student ID : 16 : Bhavana


Apache Spark Introduction


Spark programming - Transformations and actions


At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it.Spark can persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

Resilient Distributed Datasets (RDDs) : Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.

val data = Array(1, 2, 3, 4, 5)

val distData = sc.parallelize(data)

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

scala> val distFile = sc.textFile("data.txt")

Transformations and actions : RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently.


1)Transformations and actions on word count program

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode.

val conf = new SparkConf().setAppName(appName).setMaster(master)

new SparkContext(conf)

The below Program shows the code for the 3 transformations and actions used.

Transformation 1 : Filtering the strings having the regex pattern [A-Za-z] and of length > 4

val transformation1 = input.flatMap(x => x.split(" ").filter(x => x.matches("[A-Za-z]+") && x.length > 4))

Transformation 2 : Filtering only for integers. So the input words which matches the pattern 0,1..9 is filtered.

val transformation2 = input.flatMap(x => x.split(" ").filter(x => x.matches("[0-9]+")))

Transformation 3 : Extracting distinct words alone using distinct() function val transformation3 = words.distinct()

The input given to the program is as follows

The program output for transformation 1

The program output for transformation 2

The program output for transformation 3


Spark Actions

Action 1 : Counting and printing the number of words in the input file val action1= words.count()

**Action 2 : ** Printing the first element in the dataset val action2=words.first()

Action 3 : Printing the first 5 elements using take() command and foreach() command val action3=words.take(5).foreach(println)



2) Secondary Sort


Partitioning in Apache Spark One important way to increase parallelism of spark processing is to increase the number of executors on the cluster. However, knowing how the data should be distributed, so that the cluster can process data efficiently is extremely important. The secret to achieve this is partitioning in Spark. Apache Spark manages data through RDDs using partitions which help parallelize distributed data processing with negligible network traffic for sending data between executors. By default, Apache Spark reads data into an RDD from the nodes that are close to it.

Characteristics of Partitions in Apache Spark

  • Every machine in a spark cluster contains one or more partitions.

  • The number of partitions in spark are configurable and having too few or too many partitions is not good.

  • Partitions in Spark do not span multiple machines.

Secondary Sort : The approach involves using the MapReduce framework for sorting the reducer values (this does not require in-reducer sorting of values passed to the reducer). This approach consists of creating a composite key by adding a part of, or the entire value to, the natural key to achieve sorting objectives.

This is a summary of the approach:

  • Use the Value-to-Key Conversion design pattern: form a composite intermediate key, (K, V1), where V1 is the secondary key. Here, K is called a natural key. To inject a value (i.e., V1) into a reducer key, simply create a composite key (for details, see the DateTemperaturePair class). In our example, V1 is the temperature data.

  • Let the MapReduce execution framework do the sorting (rather than sorting in memory, let the framework sort by using the cluster nodes).

  • Preserve state across multiple key-value pairs to handle processing

The following shows the code for secondary sort

The input file is as follows with the temperature readings.

The sorted output is as follows.Output the temperature for every year-month with the values sorted in ascending order