PySpark Made Easy - mdjibran/PySparkGuide GitHub Wiki

Index

Transformations

  • map()
  • filter()
  • flatMap()
  • intersection()
  • distinct()
  • groupByKey()
  • reduceByKey()
  • aggregateByKey()
  • sortByKey()
  • join()
  • cogroup()
  • cartesian()
  • pipe()
  • coalesce()
  • repartition()
  • repartitionAndSortWithinPartitions()
  • mapPartitions()
  • mapPartitionsWithIndex()
  • sample()
  • union()

Actions

  • reduce()
  • count()
  • first()
  • take()
  • takeSample()
  • takeOrdered()
  • saveAsTextFile()
  • saveAsSequenceFile()
  • saveAsObjectFile()
  • countByKey()
  • foreach()
  • collect()

Content

Transformations

1. map()

Purpose:

Syntax:

Input/Output:

USE when:

NOT to USE when:

Example:

text = sc.textFile('/data/sample.csv')
# Split every line into a list of fields
fields = text.map(lambda line: line.split(','))
fields.first()

2. filter()

Purpose: To obtain a subset of records from an RDD

Syntax: RDD.filter(lambda i: condition) or RDD.filter(func)

Input/Output: Condition or function/RDD

USE when: A large dataset is to be filtered into smaller chunks based on certain criteria

NOT to USE when:

Example:

# 1: filter with a lambda
nums = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
RDD = nums.filter(lambda x: x % 2 == 0)
RDD.take(2)  # [2, 4]

# 2: filter with a named function that returns True for records to keep
def Func(x):
  return x % 2 == 0

nums = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
RDD = nums.filter(Func)
RDD.take(2)  # [2, 4]

reduceByKey()

Purpose:

Syntax:

Input/Output:

USE when:

NOT to USE when:

Example:

orderItems = sc.textFile("/data/data-master/retail_db/order_items")
# Build (key, value) pairs from fields 1 and 4 of each line
orderItemsMap = orderItems.map(lambda i: (int(i.split(',')[1]), float(i.split(',')[4])))
# Sum the values for each key
orderItemsSubTotal = orderItemsMap.reduceByKey(lambda x, y: x + y)

# Get the minimum value for each key
orderItemsMin = orderItemsMap.reduceByKey(lambda x, y: x if x < y else y)
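To see what reduceByKey() computes without a cluster, here is a minimal plain-Python sketch of the same per-key fold. The helper reduce_by_key and the sample pairs are hypothetical and not part of PySpark; real Spark also merges values inside each partition before shuffling, which this sketch ignores.

```python
def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: fold the values of each key with func."""
    result = {}
    for key, value in pairs:
        # The first value seen for a key is kept as-is; later values are merged with func.
        result[key] = func(result[key], value) if key in result else value
    return result

# Hypothetical (key, value) pairs, shaped like orderItemsMap above.
pairs = [(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99), (4, 49.98)]

totals = reduce_by_key(pairs, lambda x, y: x + y)                # sum per key
minimums = reduce_by_key(pairs, lambda x, y: x if x < y else y)  # minimum per key
print(totals)
print(minimums)
```

The function passed in only ever sees two values of the same key at a time, so it has to return something of the same type as its inputs.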

aggregateByKey()

Purpose:

Syntax:

Input/Output:

USE when:

NOT to USE when:

Example:

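orderItems = sc.textFile("/data/data-master/retail_db/order_items")
orderItemsMap = orderItems.map(lambda x: (int(x.split(',')[1]), float(x.split(',')[4])))
# Per key, build a (sum of values, number of values) tuple
orderGroupedCount = orderItemsMap.aggregateByKey(
    (0.0, 0),                                 # zero value: (sum, count)
    lambda x, y: (x[0] + y, x[1] + 1),        # add one value to an accumulator
    lambda x, y: (x[0] + y[0], x[1] + y[1])   # merge two accumulators
)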
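The three arguments of aggregateByKey() are a zero value, a function that folds one value into an accumulator inside a partition, and a function that merges accumulators from different partitions. A rough plain-Python sketch of that flow follows; aggregate_by_key and the two sample partitions are hypothetical stand-ins, not PySpark API.

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Local stand-in for RDD.aggregateByKey over a list of partitions."""
    merged = {}
    for partition in partitions:
        # Step 1: inside one partition, fold each value into a per-key accumulator.
        local = {}
        for key, value in partition:
            local[key] = seq_op(local.get(key, zero), value)
        # Step 2: merge this partition's accumulators into the overall result.
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged

# Hypothetical (key, value) pairs split across two partitions.
partitions = [
    [(1, 100.0), (2, 50.0), (2, 25.0)],
    [(2, 25.0), (3, 10.0)],
]

result = aggregate_by_key(
    partitions,
    (0.0, 0),                                  # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # fold one value into (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # merge two (sum, count) tuples
)
# The (sum, count) tuples make a per-key average a one-line follow-up.
averages = {key: total / count for key, (total, count) in result.items()}
print(result)
print(averages)
```

Unlike reduceByKey(), the accumulator type here (a tuple) differs from the value type (a float), which is the usual reason to reach for aggregateByKey().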

sortByKey()

Purpose:

Syntax:

Input/Output:

USE when:

NOT to USE when:

Example:

products = sc.textFile("/data/data-master/retail_db/products")
# Key = price (field 4, falling back to field 5 when field 4 is empty), value = field 2
productsMap = products.map(lambda x: (float(x.split(',')[4]) if x.split(',')[4] != '' else float(x.split(',')[5]), x.split(',')[2]))
sortedProducts = productsMap.sortByKey()

# Sort data by product category and then by product price, both ascending - sortByKey
products = sc.textFile("/data/data-master/retail_db/products")
productsMap = products\
    .filter(lambda x: x.split(',')[4] != '')\
    .map(lambda x: ((int(x.split(',')[1]), float(x.split(',')[4])), x.split(',')[2]))\
    .sortByKey()

# To sort by a composite key (x, y) with x ascending and y descending, negate y
products = sc.textFile("/data/data-master/retail_db/products")
productsMap = products\
    .filter(lambda x: x.split(',')[4] != '')\
    .map(lambda x: ((int(x.split(',')[1]), -float(x.split(',')[4])), x.split(',')[2]))\
    .sortByKey()
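The composite-key ordering above relies on ordinary tuple comparison, so it can be tried locally with sorted(). This is only a sketch on hypothetical records (the category ids, prices, and one-letter names are made up), not a replacement for sortByKey().

```python
# Hypothetical ((category_id, price), name) records, shaped like productsMap above.
records = [
    ((2, 59.99), "B"),
    ((1, 19.99), "A"),
    ((2, 129.99), "C"),
    ((1, 99.0), "D"),
]

# Ascending on both parts of the key: tuples compare element by element.
asc = sorted(records, key=lambda kv: kv[0])

# Category ascending, price descending: negate the price when comparing.
mixed = sorted(records, key=lambda kv: (kv[0][0], -kv[0][1]))

names_asc = [name for _, name in asc]
names_mixed = [name for _, name in mixed]
print(names_asc)    # ['A', 'D', 'B', 'C']
print(names_mixed)  # ['D', 'A', 'C', 'B']
```

In the RDD version the negated price is stored in the key itself, so negate it again in a later map() if the real price is needed downstream.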

Actions

1. reduce()

Purpose:

Syntax:

Input/Output:

USE when:

NOT to USE when:

Example:

nums = sc.parallelize([1, 2, 3, 4, 5])
# Add all elements together into a single value
nums.reduce(lambda x, y: x + y)
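reduce() collapses all elements of an RDD into a single value using a two-argument function, and Spark requires that function to be commutative and associative because partitions are reduced independently and then combined. A small local sketch with functools.reduce shows why that matters; the sample values and the two-chunk split are hypothetical.

```python
from functools import reduce

values = [3, 1, 4, 1, 5, 9]  # hypothetical data

# Reduce the whole list in one pass.
total = reduce(lambda x, y: x + y, values)
smallest = reduce(lambda x, y: x if x < y else y, values)

# Reduce each "partition" separately, then reduce the partial results.
chunks = [[3, 1, 4], [1, 5, 9]]
partials = [reduce(lambda x, y: x + y, chunk) for chunk in chunks]
total_from_chunks = reduce(lambda x, y: x + y, partials)

print(total, smallest, partials, total_from_chunks)  # 23 1 [8, 15] 23
```

Addition and minimum give the same answer however the data is split; a function such as subtraction would not, so it is not safe to pass to reduce().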

2. count()

Purpose: To get the total number of elements in an RDD

Syntax: RDD.count()

Input/Output: -/int

USE when: You need the total number of elements

NOT to USE when:

Example:

text = sc.textFile('/data/sample.csv')
text.count()

3. first()

Purpose: To get the first element of the RDD

Syntax: RDD.first()

Input/Output: -/RDD[0]

USE when: You need only one element from the RDD

NOT to USE when:

Example:

text = sc.textFile('/data/sample.csv')
text.first()

4. take()

Purpose: To get the first n records from the RDD

Syntax: RDD.take(n), where n is the number of records

Input/Output: int/list

USE when:

  • Dataset is large
  • Only a fraction of data is required

NOT to USE when:

Example:

text = sc.textFile('/data/sample.csv')
for i in text.take(10):
    print(i)

5. top()

Purpose:

Syntax:

Input/Output:

USE when:

NOT to USE when:

Example:

# Get the top records by price - top, takeOrdered
products = sc.textFile("/data/data-master/retail_db/products")
filteredProducts = products.filter(lambda x: x.split(',')[4] != '')
topProducts = filteredProducts.top(5, key=lambda x: float(x.split(',')[4]))
# The call above is equivalent to takeOrdered with the key negated
takeOrderedProducts = filteredProducts.takeOrdered(5, key=lambda x: -float(x.split(',')[4]))
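The relationship between top() and takeOrdered() can be checked locally with heapq, which offers the same largest-n and smallest-n selection on a plain list. This is a sketch on hypothetical prices, not PySpark code.

```python
import heapq

prices = [59.99, 129.99, 19.99, 99.0, 249.5, 5.0]  # hypothetical data

# Like top(3): the three largest values, in descending order.
top3 = heapq.nlargest(3, prices)

# Like takeOrdered(3, key=lambda p: -p): smallest by negated value, i.e. the largest.
ordered3 = heapq.nsmallest(3, prices, key=lambda p: -p)

# Like takeOrdered(3) with no key: the three smallest values, ascending.
cheapest3 = heapq.nsmallest(3, prices)

print(top3)       # [249.5, 129.99, 99.0]
print(ordered3)   # [249.5, 129.99, 99.0]
print(cheapest3)  # [5.0, 19.99, 59.99]
```

Negating the key only works for numeric sort keys; for descending order on strings, top() is the simpler choice.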

6. takeOrdered()

Purpose:

Syntax:

Input/Output:

USE when:

NOT to USE when:

Example:

# Get the top records by price - top, takeOrdered
products = sc.textFile("/data/data-master/retail_db/products")
filteredProducts = products.filter(lambda x: x.split(',')[4] != '')
topProducts = filteredProducts.top(5, key=lambda x: float(x.split(',')[4]))
# The call above is equivalent to takeOrdered with the key negated
takeOrderedProducts = filteredProducts.takeOrdered(5, key=lambda x: -float(x.split(',')[4]))