spark学习笔记(二) - 18965050/learning-spark GitHub Wiki

RDD编程

RDD操作又两种类型: transformation(将一个RDD转换为另一个RDD)和action(计算结果集). 这个有点像JDK8中collection stream中的中间操作和结果操作.

pythonLines = lines.filter(lambda line: "Python" in line)	#transformation

pythonLines.first()	# action

RDD经过action操作后, 所有的计算都需要从头做起, 因此我们可以使用persist来保存中间结果

spark一般编程流程为:

  • 根据外部数据集创建RDD
  • transformation操作进行RDD数据集转换
  • 如果需要复用的话, 可使用persist()/cache()函数
  • action操作获取结果

spark生成RDD有两种方式:

  • 从外部加载数据集
  • 通过驱动程序parallelize一个集合

懒加载

RDD只有在遇到action操作时才会真正执行整个流程

spark中的函数传递

不同语言,函数传递的方式也不同.

python

可通过lamba表达式或函数来传递

word = rdd.filter(lambda s: "error" in s)

def containsError(s):
	return "error" in s
word = rdd.filter(containsError)

注意, 不要传递函数所在的对象引用.这会引起的问题有:

  • 整个对象会被加载到spark的work node中, 有时候对象会很大
  • 对象中的其他数据结构可能会由于加载不到而导致抛出异常

举例如下:

# 错误的方式
class SearchFunctions(object):
	def __init__(self, query):
		self.query = query

	def isMatch(self, s):
		return self.query in s

	def getMatchesFunctionReference(self, rdd):
		# Problem: references all of "self" in "self.isMatch"
		return rdd.filter(self.isMatch)

	def getMatchesMemberReference(self, rdd):
		# Problem: references all of "self" in "self.query"
		return rdd.filter(lambda x: self.query in x)

# 正确的方式
class WordFunctions(object):
	...
	def getMatchesNoReference(self, rdd):
		# Safe: extract only the field we need into a local variable
		query = self.query
		return rdd.filter(lambda x: query in x)

java

java中使用的函数接口来源于org.apache.spark.api.java.function包,可以匿名或实名实例化接口来实现

RDD<String> errors = lines.filter(new Function<String, Boolean>() {
	public Boolean call(String x) { return x.contains("error"); }
});

class ContainsError implements Function<String, Boolean>() {
	public Boolean call(String x) { return x.contains("error"); }
}
RDD<String> errors = lines.filter(new ContainsError());

在jdk8中,也可以使用lamba表达式

RDD<String> errors = lines.filter(s -> s.contains("error"));

通用的transformation和action

transformation

  • map: 将RDD中每个元素进行转换

  • flatMap: 对element再次进行迭代

     lines = sc.parallelize(["hello world", "hi"])
     words = lines.flatMap(lambda line: line.split(" "))
     words.first() # returns "hello"
  • filter: 过滤

  • distinct:去除重复元素

  • union:并集, 不会去重

  • intersection:交集

  • subtract: 补集

  • cartesian: 笛卡尔积乘

rdd-transformation-1 rdd-transformation-2

action

rdd-action-1 rdd-action-2

persist/cache

如前所述, 一个RDD由于其懒加载的特性(直到遇到action操作才会执行之前的transformation操作), 使得其有时候性能很差. 看下面的例子:

val result = input.map(x => x*x)
println(result.count())	# 会执行map
println(result.collect().mkString(","))	#会执行map

这种情况下, 我们可以使用persist/cache来保存中间结果. 存储级别有如下几种: persist-storage-level

当内存使用满时, spark使用LRU(Least Recently Used)策略来踢出RDD对象, 也可以使用unpersist()手动从缓存中删除

⚠️ **GitHub.com Fallback** ⚠️