Spark Resilient Distributed Datasets (RDDs) and DataFrames

RDD:

  • Fault-tolerant collection of elements that can be operated on in parallel

  • Immutable

  • Three methods for creating an RDD: parallelizing an existing collection, referencing an external dataset, or transforming an existing RDD

  • Two types of RDD operations: transformations and actions

  • Datasets can come from any storage supported by Hadoop: HDFS, Cassandra, HBase, Amazon S3

  • Types of file supported: text files, SequenceFiles, and any other Hadoop InputFormat

  • Create an RDD: launch the shell with ./bin/spark-shell

  • Create some data: val data = 1 to 1000

  • Parallelize that data (creating the RDD): val distData = sc.parallelize(data)

  • Perform additional transformations or invoke an action on it, e.g. distData.filter(_ < 10)

  • Alternately, create an RDD from an external dataset: val readmeFile = sc.textFile("Readme.md")
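Put together, the steps above can be run in spark-shell. A minimal sketch, assuming the SparkContext `sc` that the shell provides (the `_ < 10` predicate is just an illustrative choice):

```scala
// Inside ./bin/spark-shell, where the SparkContext `sc` is predefined
val data = 1 to 1000                  // a local Scala Range
val distData = sc.parallelize(data)   // distribute it as an RDD
val small = distData.filter(_ < 10)   // lazy transformation; nothing runs yet
small.collect()                       // action: materializes the results on the driver
```

Transformations like filter are only recorded until an action such as collect() forces evaluation.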

RDD operations - basics:

  • Loading a file: val lines = sc.textFile("hdfs://data.txt")

  • Applying a transformation: val lineLengths = lines.map(s => s.length)

  • Invoking an action: val totalLengths = lineLengths.reduce((a, b) => a + b)

  • MapReduce example:

        val wordCounts = textFile.flatMap(line => line.split(" "))
                                 .map(word => (word, 1))
                                 .reduceByKey((a, b) => a + b)
        wordCounts.collect()

Directed Acyclic Graph (DAG): Spark records the chain of operations as a DAG, e.g. map -> filter -> write to text file

What happens when an action is executed:

    // Create RDD
    val logFile = sc.textFile("hdfs://...")

    // Transformations
    val errors = logFile.filter(_.startsWith("ERROR"))
    val messages = errors.map(_.split("\t")).map(r => r(1))

    // Caching
    messages.cache()

    // Actions
    messages.filter(_.contains("mysql")).count()
    messages.filter(_.contains("php")).count()

RDD transformations (return a new RDD): map(func), filter(func), flatMap(func), join(otherDataset, [numTasks]), reduceByKey(func, [numTasks]), sortByKey([ascending], [numTasks])

RDD actions (return a value to the driver): collect(), count(), first(), take(n), foreach(func)
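A small sketch combining a few of these transformations and actions, assuming a live SparkContext `sc` (the sample numbers are arbitrary):

```scala
// Assumes spark-shell, where `sc` is predefined
val nums = sc.parallelize(Seq(3, 1, 4, 1, 5))
val doubled = nums.map(_ * 2)    // transformation: lazy, returns a new RDD
doubled.count()                  // action: number of elements (5)
doubled.first()                  // action: the first element
doubled.take(3)                  // action: Array of the first 3 elements
doubled.foreach(println)         // action: runs on the executors, not the driver
```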

RDD persistence: keeps a dataset in cache across operations. Two methods: persist() and cache(); cache() is essentially just persist() with the MEMORY_ONLY storage level

Which storage level to use:

  • If your RDDs fit with the default storage level (MEMORY_ONLY), leave them that way - it is the most CPU-efficient option

  • Otherwise, use MEMORY_ONLY_SER and select a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access

  • Don't spill to disk unless the functions that compute your datasets are expensive

  • Use replicated storage levels if you want fast fault recovery

  • In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages:

  • It allows multiple executors to share the same pool of memory in Tachyon, significantly reduces garbage collection costs, and cached data is not lost if individual executors crash
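A sketch of the two persistence methods (assumes a SparkContext `sc`; the HDFS paths are placeholders like the ones above):

```scala
import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
val errors = sc.textFile("hdfs://...").filter(_.startsWith("ERROR"))
errors.cache()

// persist() lets you choose the level explicitly, e.g. serialized in-memory storage
val fields = sc.textFile("hdfs://...").map(_.split("\t"))
fields.persist(StorageLevel.MEMORY_ONLY_SER)
```

Note that an RDD's storage level can only be assigned once, which is why two separate RDDs are used here.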

Shared variables and key-value pairs:

  • When a function is passed from the driver to a worker, a separate copy of each variable it uses is normally shipped with it

  • Two types of shared variables:

    -Broadcast variables: a read-only copy cached on each machine, distributed using efficient broadcast algorithms

    -Accumulators: variables that are only "added" to through an associative operation; used to implement counters and sums; only the driver can read an accumulator's value; accumulators of numeric types are supported out of the box and can be extended for new types
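A sketch of both kinds of shared variables, using the Spark 1.x API that matches these notes (assumes a SparkContext `sc`; the lookup map and key names are made up for illustration):

```scala
// Broadcast variable: read-only copy shipped once to each worker
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

// Accumulator: workers can only add to it; only the driver reads its value
val total = sc.accumulator(0, "total")

sc.parallelize(Seq("a", "b", "a")).foreach { k =>
  total += lookup.value.getOrElse(k, 0)  // workers read the broadcast, add to the accumulator
}
println(total.value)                     // reading .value is valid only on the driver
```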

Programming with key-value pairs:

  • Special operations are available on RDDs of key-value pairs, e.g. grouping or aggregating elements by a key

  • Key-value pairs are Tuple2 objects, created simply by writing (a, b)

  • PairRDDFunctions contains the key-value pair operations, e.g. reduceByKey((a, b) => a + b)

  • Using a custom object as the key requires a custom equals() method with a matching hashCode() method

  • Example:

        val textFile = sc.textFile("")
        val readmeCount = textFile.flatMap(line => line.split(" "))
                                  .map(word => (word, 1))
                                  .reduceByKey(_ + _)
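The reduceByKey semantics can be sketched with plain Scala collections (no Spark needed): groupBy plays the role of shuffling by key, and the inner sum plays the role of the merge function. The sample sentence is made up for illustration:

```scala
// Plain-Scala sketch of the word-count pipeline above
val text = "to be or not to be"                      // stand-in for the file contents
val pairs = text.split(" ").map(word => (word, 1))   // the (word, 1) Tuple2 pairs

// reduceByKey(_ + _): merge all values that share a key
val counts = pairs.groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2).sum) }

println(counts)  // contains to -> 2, be -> 2, or -> 1, not -> 1
```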