Python Spark - RatneshKumarSrivastava/Ratnesh GitHub Wiki
What is sqlContext.sql?
SQLContext(sc) creates the entry point into all relational functionality in Spark: the SQLContext class, or one of its descendants. In Scala it is also used to implicitly convert an RDD to a DataFrame, e.g. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); sqlContext.sql(query) then runs a SQL query and returns the result as a DataFrame.
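In PySpark the equivalent entry point is created the same way; a minimal sketch, assuming sc is an existing SparkContext:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)    # entry point for DataFrame and SQL functionality; sqlContext.sql(query) returns a DataFrame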
--------------------------------------------------
https://dzone.com/articles/best-spark-sql-example-in-10-steps
--------------------------------------------------
An RDD is an immutable, distributed collection of data, partitioned across the nodes of the cluster, that can be operated on in parallel with a low-level API offering transformations and actions.
Limitations of RDD -
1) No built-in optimization engine
There is no provision in RDD for automatic optimization (RDDs do not go through the Catalyst optimizer the way DataFrames/Datasets do).
2) No compile-time error checking in PySpark
RDD operations in Python are dynamically typed, so type errors only show up at run time, not before the job runs.
3) Degrades when there is not enough memory
Performance degrades when there is not enough memory to keep the RDD in memory. Partitions that do not fit in RAM can be spilled to disk (a persist example follows this list), and increasing RAM and disk capacity mitigates the issue, but this is slower than fully in-memory processing.
4) Handling structured data
RDD does not provide a schema view of the data; it has no special provision for handling structured data.
Dataset and DataFrame do provide a schema view: a distributed collection of data organized into named columns.
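Related to point 3: when memory is tight, an RDD can be persisted with a storage level that allows spilling to disk. A minimal sketch (the path is a placeholder):
from pyspark import StorageLevel
rdd = sc.textFile("/path/to/data")              # placeholder path
rdd.persist(StorageLevel.MEMORY_AND_DISK)       # partitions that don't fit in RAM are spilled to disk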
--------------------------------------------------
Compare Spark vs Hadoop MapReduce
Criteria: Hadoop MapReduce vs Apache Spark
Disk usage: MapReduce is disk oriented; Spark caches data in memory and ensures low latency.
Processing: MapReduce supports only batch processing; Spark also supports real-time processing through Spark Streaming.
Installation: MapReduce is bound to Hadoop; Spark is not bound to Hadoop.
--------------------------------------------------
How many modes are there for Spark execution?
1. local
2. standalone
3. YARN (often the default on Hadoop clusters where spark-defaults.conf sets it)
4. Mesos
Syntax: pyspark --master local   (without --master, the pyspark shell itself defaults to local[*])
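In a standalone application the mode is chosen the same way through SparkConf; a minimal sketch (the app name is illustrative, and master strings such as "local[2]", "yarn" or "spark://host:7077" pick the mode):
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("mode-example").setMaster("local[2]")   # e.g. "yarn" or "spark://host:7077" instead
sc = SparkContext(conf=conf)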
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
--------------------------------------------------
http://www.secnetix.de/olli/Python/lambda_functions.hawk
from functools import reduce   # needed on Python 3; reduce is a builtin on Python 2
print(reduce(lambda x, y: x + y, map(lambda w: w[1], [(1, 2), (3, 4), (5, 6)])))   # 2 + 4 + 6 = 12
print(reduce(lambda x, y: x + y, map(lambda x: x[1], filter(lambda x: x[0] % 2 == 1, [(1, 2), (3, 4), (5, 6)]))))   # all first elements are odd here, so again 12
For loading a Hive table:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
data = sqlContext.sql("select * from table")
for i in data.collect():
    print(i)                   # the whole row
    print(i.department_id)     # a single column
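On Spark 2.x and later the same query can be run through a SparkSession with Hive support enabled; a minimal sketch, assuming a Hive table named table exists:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("hive-example").enableHiveSupport().getOrCreate()
spark.sql("select * from table").show()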
--------------------------------------------------
For loading a JSON file:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
departmentsJson = sqlContext.jsonFile("")       # Spark 1.x API; on newer versions use sqlContext.read.json(path)
departmentsJson.registerTempTable("jsontable")
for i in sqlContext.sql("select * from jsontable").collect():
    print(i)
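Once loaded, the inferred schema can be inspected and the data can also be queried through the DataFrame API instead of SQL; a small sketch, assuming the JSON records contain a department_id field:
departmentsJson.printSchema()                                        # schema inferred from the JSON
departmentsJson.filter(departmentsJson.department_id > 5).show()     # same idea as a SQL WHERE clause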
--------------------------------------------------
# calculate aggregate statistics, e.g. count of orders per key
orderRdd = sc.textFile("")
orderMap = orderRdd.map(lambda x: (x.split(",")[1], 1))                        # (key, 1) pairs
ordersByStatus = orderMap.groupByKey().map(lambda kv: (kv[0], sum(kv[1])))     # groupByKey takes no function; aggregate afterwards
ordersByStatus = orderMap.reduceByKey(lambda x, y: x + y)                      # preferred: combines values before the shuffle
Difference between reduceByKey and groupByKey:
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
Notice how pairs on the same machine with the same key are combined (by using the lambda function passed into reduceByKey) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result.
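A toy sketch of the two approaches; both give the same per-key sums, but reduceByKey combines values on each partition before the shuffle:
words = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)])
print(words.reduceByKey(lambda x, y: x + y).collect())      # map-side combine, then shuffle
print(words.groupByKey().mapValues(sum).collect())          # shuffles every (key, value) pair first
# both print [('a', 3), ('b', 2)] (key order may vary)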
--------------------------------------------------
Word count program:
ratnesh1 = sc.textFile(" ")
ratnesh1.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y).collect()
--------------------------------------------------
Joining two datasets:
ordersRDD = sc.textFile(" ")
orderItemsRDD = sc.textFile(" ")                                                   # second dataset to join with (assumed here)
orderMapRdd = ordersRDD.map(lambda o: (int(o.split(",")[0]), o.split(",")[1]))     # keep only two fields per record
orderMapRdd = ordersRDD.map(lambda o: (int(o.split(",")[0]), o))                   # or keep the whole record as the value (used below)
orderItemsMapRdd = orderItemsRDD.map(lambda oi: (int(oi.split(",")[1]), oi))       # the other side of the join must also be (key, value) pairs
orderJoinItem = orderMapRdd.join(orderItemsMapRdd)
joinData = orderJoinItem.map(lambda t: (t[1][0].split(",")[1], float(t[1][1].split(",")[4])))   # pull selected fields out of the joined records
orderPerDay = orderJoinItem.map(lambda t: t[1][1].split(",")[1] + "," + str(t[0])).distinct()   # distinct for unique keys
By default Spark SQL (HiveContext/SQLContext) uses 200 shuffle partitions, i.e. 200 tasks for shuffles,
but we can change that:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.sql("set spark.sql.shuffle.partitions=10")
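On Spark 2.x the same setting can be changed through the session configuration; a one-line sketch, assuming an existing SparkSession named spark:
spark.conf.set("spark.sql.shuffle.partitions", "10")     # partitions used when shuffling for joins/aggregations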
--------------------------------------------------
- using Spark native SQL
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
sqlContext.sql("set spark.sql.shuffle.partitions=10")
orderRdd = sc.textFile("")
orderMap = orderRdd.map(lambda x: x.split(",")).map(lambda x: Row(id=x[0], first_name=x[1], gender=x[2]))
orderSchema = sqlContext.inferSchema(orderMap)       # Spark 1.x API; newer versions use sqlContext.createDataFrame(orderMap)
orderSchema.registerTempTable("orders")
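Once the temp table is registered it can be queried like any other table; a small sketch using the columns defined above:
for row in sqlContext.sql("select gender, count(*) as cnt from orders group by gender").collect():
    print(row.gender, row.cnt)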
Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.
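A minimal sketch of both shared-variable types (the lookup dictionary and the counting rule are just illustrative):
lookup = sc.broadcast({1: "CLOSED", 2: "COMPLETE"})      # read-only value cached on every executor
bad_records = sc.accumulator(0)                          # counter that tasks can only add to

def to_status(code):
    if code not in lookup.value:
        bad_records.add(1)
    return lookup.value.get(code, "UNKNOWN")

statuses = sc.parallelize([1, 2, 3]).map(to_status).collect()
print(statuses, bad_records.value)                       # read the accumulator only after an action has run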
Spark documentation important points -
The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
Apart from text files, Spark's Python API also supports several other data formats:
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
RDD.saveAsPickleFile and SparkContext.pickleFile support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.
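A short sketch of both APIs (the paths are placeholders; the output directory must not already exist):
pairs = sc.wholeTextFiles("/path/to/dir")                  # RDD of (filename, whole-file content) pairs
nums = sc.parallelize(range(100))
nums.saveAsPickleFile("/path/to/pickled", batchSize=10)    # pickled Python objects, batched
restored = sc.pickleFile("/path/to/pickled")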
--------------------------------------------------
Passing Functions to Spark
Spark's API relies heavily on passing functions in the driver program to run on the cluster. There are three recommended ways to do this:
Lambda expressions, for simple functions that can be written as an expression. (Lambdas do not support multi-statement functions or statements that do not return a value.)
Local defs inside the function calling into Spark, for longer code.
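A short sketch of both styles applied to a simple map:
rdd = sc.parallelize([1, 2, 3, 4])
squares = rdd.map(lambda x: x * x)     # lambda: fine for a single expression

def shifted_square(x):                 # local def: better for longer, multi-statement logic
    y = x * x
    return y + 1

shifted = rdd.map(shifted_square)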
--------------------------------------------------
groupBy() & groupByKey() Example -- groupByKey() operates on pair RDDs and is used to group all the values related to a given key. groupBy() can be used on both unpaired and paired RDDs; when used with unpaired data, the key for groupBy() is decided by the function literal passed to the method.
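A small sketch of groupBy() on an unpaired RDD, where the passed function decides the key:
nums = sc.parallelize([1, 2, 3, 4, 5])
print(nums.groupBy(lambda x: x % 2).mapValues(list).collect())   # e.g. [(0, [2, 4]), (1, [1, 3, 5])] (order may vary)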
What's the difference between an RDD's map and mapPartitions method?
The method map converts each element of the source RDD into a single element of the result RDD by applying a function. mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none).
And does flatMap behave like map or like mapPartitions?
Neither: flatMap works on a single element (like map) and can produce multiple elements of the result (like mapPartitions).
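A compact sketch of the three on the same data:
rdd = sc.parallelize([1, 2, 3, 4], 2)
print(rdd.map(lambda x: x * 10).collect())                 # one output element per input element: [10, 20, 30, 40]
print(rdd.mapPartitions(lambda it: [sum(it)]).collect())   # zero or more outputs per partition: [3, 7]
print(rdd.flatMap(lambda x: range(x)).collect())           # zero or more outputs per element: [0, 0, 1, 0, 1, 2, 0, 1, 2, 3]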
--------------------------------------------------
A transformation returns an RDD, whereas an action returns a value to the driver or saves output to a file format.
Transformations are lazy: the resulting RDD is not computed immediately.
Actions are eager: their result is computed immediately.
One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
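For example:
data = range(1, 1001)
dist = sc.parallelize(data, 10)      # explicitly cut into 10 partitions
print(dist.getNumPartitions())       # 10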
--------------------------------------------------
Transformations in Spark ::
map() - applies a function to each element of the RDD and returns an RDD with one result element per input element.
flatMap() - applies a function that returns an iterator for each element and flattens the results into a single RDD.
filter() - applies a predicate to each element and returns an RDD of the elements that pass the filter condition.
distinct() - returns an RDD containing only the distinct elements.
union(), subtract(), intersection(), cartesian()
Actions in spark ::
collect(), count(), take(), reduce(), foreach()
takeSample(), takeOrdered(), saveAsTextFile(), saveAsSequenceFile() (a short example of several of these follows below)
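A few of these on a toy RDD (results in the comments may come back in a different order):
a = sc.parallelize([1, 2, 3, 3])
b = sc.parallelize([3, 4, 5])
a.distinct().collect()           # [1, 2, 3]
a.union(b).collect()             # [1, 2, 3, 3, 3, 4, 5]
a.intersection(b).collect()      # [3]
a.subtract(b).collect()          # [1, 2]
a.reduce(lambda x, y: x + y)     # 9
a.take(2)                        # [1, 2]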
--------------------------------------------------
Transformations for pair RDDs ::
groupByKey() : groups all the values that share a key into a single iterable.
--------------------------------------------------
Difference b/w reduceByKey and groupByKey
reduceByKey will aggregate values per key before shuffling, while groupByKey will shuffle all the key/value pairs, as the diagrams in the linked Databricks article show. On large data sets the difference is obvious.
combineByKey can be used when you are combining elements but your return type differs from your input value type.
aggregateByKey() is logically the same as reduceByKey(), but it lets you return the result as a different type. In other words, it lets you take input of type x and produce an aggregated result of type y, for example (1,2),(1,4) as input and (1,"six") as output.
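A sketch of aggregateByKey() where the input values are ints but the aggregated result is a (sum, count) pair, later turned into an average:
pairs = sc.parallelize([("a", 2), ("a", 4), ("b", 6)])
sum_count = pairs.aggregateByKey((0, 0),
                                 lambda acc, v: (acc[0] + v, acc[1] + 1),     # fold a value into the per-partition accumulator
                                 lambda a, b: (a[0] + b[0], a[1] + b[1]))     # merge accumulators across partitions
print(sum_count.mapValues(lambda t: t[0] / float(t[1])).collect())            # [('a', 3.0), ('b', 6.0)] (order may vary)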
--------------------------------------------------
Databricks Spark Knowledge Base
Avoid GroupByKey --
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
Don't copy all elements of a large RDD to the driver --
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html
Gracefully Dealing with Bad Input Data
--------------------------------------------------
Understanding Spark's SparkConf, SparkContext, SQLContext and HiveContext
https://blogs.msdn.microsoft.com/bigdatasupport/2015/09/14/understanding-sparks-sparkconf-sparkcontext-sqlcontext-and-hivecontext/
Important classes of Spark SQL and DataFrames:
- :class:`pyspark.sql.SparkSession`
Main entry point for :class:`DataFrame` and SQL functionality.
- :class:`pyspark.sql.DataFrame`
A distributed collection of data grouped into named columns.
- :class:`pyspark.sql.Column`
A column expression in a :class:`DataFrame`.
- :class:`pyspark.sql.Row`
A row of data in a :class:`DataFrame`.
- :class:`pyspark.sql.GroupedData`
Aggregation methods, returned by :func:`DataFrame.groupBy`.
- :class:`pyspark.sql.DataFrameNaFunctions`
Methods for handling missing data (null values).
- :class:`pyspark.sql.DataFrameStatFunctions`
Methods for statistics functionality.
- :class:`pyspark.sql.functions`
List of built-in functions available for :class:`DataFrame`.
- :class:`pyspark.sql.types`
List of data types available.
- :class:`pyspark.sql.Window`
For working with window functions.
from __future__ import absolute_import
from pyspark.sql.types import Row
from pyspark.sql.context import SQLContext, HiveContext, UDFRegistration
from pyspark.sql.session import SparkSession
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame, DataFrameNaFunctions, DataFrameStatFunctions
from pyspark.sql.group import GroupedData
from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter
from pyspark.sql.window import Window, WindowSpec
__all__ = [
    'SparkSession', 'SQLContext', 'HiveContext', 'UDFRegistration',
    'DataFrame', 'GroupedData', 'Column', 'Row',
    'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
    'DataFrameReader', 'DataFrameWriter'
]
--------------------------------------------------
Pair RDD : Spark provides special types of operations on RDDs containing key/value pairs.
RDD operations:
aggregateByKey()
cogroup [Pair] - multiple pair RDDs can be combined using cogroup(), e.g. rdd.cogroup(rdd1)
groupWith [Pair]
countApproxDistinct
mapPartitions --> map converts each element of the source RDD into a single element of the result RDD by applying a function, while mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none); useful for heavyweight initialization done once per partition (see the sketch after this list).
mapPartitionsWithSplit --> similar to mapPartitions, but also provides func with an integer value representing the index of the split, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. (In current Spark versions this is called mapPartitionsWithIndex.)
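A sketch of the heavyweight-initialization pattern with mapPartitions, using a hypothetical make_connection() helper that would be too expensive to call once per record:
def make_connection():
    # hypothetical expensive setup (e.g. opening a database or HTTP connection)
    return object()

def process_partition(records):
    conn = make_connection()            # done once per partition instead of once per record
    for r in records:
        yield (r, conn is not None)     # placeholder for real per-record work that uses conn

result = sc.parallelize(range(10), 2).mapPartitions(process_partition).collect()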
--------------------------------------------------
How do you drop rows from an RDD in PySpark? Particularly the first row, since that tends to contain column names in my datasets. From perusing the API, I can't seem to find an easy way to do this. Of course I could do this via Bash/HDFS, but I just want to know if this can be done from within PySpark.
Ans ::
header = rdd.first()
dataRdd = rdd.filter(lambda line: line != header)   # filter returns a new RDD without the header line
--------------------------------------------------
How to take a few records (a subset) from an RDD and save them with saveAsTextFile()?
Ans ::
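The answer is missing in the original notes. Assuming the question means saving a filtered subset of an RDD of text lines, one possible sketch (the condition and output path are placeholders, and the output directory must not already exist):
subset = rdd.filter(lambda line: "2014-01" in line)    # placeholder condition for the records of interest
subset.saveAsTextFile("/path/to/output")               # writes one part-NNNNN file per partition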