06. Accessing OSS from PySpark

Parameter configuration

  • First, configure the OSS AccessKey ID, AccessKey secret, and endpoint as described in the documentation:
spark.hadoop.fs.oss.accessKeyId = xxxxxx
spark.hadoop.fs.oss.accessKeySecret = xxxxxx
spark.hadoop.fs.oss.endpoint = oss-xxxxxx-internal.aliyuncs.com
  • [Public cloud] The job needs to reference the hadoop-fs-oss jar; simply add the following configuration:
spark.hadoop.odps.cupid.resources=public.hadoop-fs-oss-shaded.jar 
  • [Private cloud (Apsara Stack)] The job also needs the hadoop-fs-oss jar; upload it as a resource and add the configuration as follows:
(1) Download the hadoop-fs-oss jar from https://odps-repo.oss-cn-hangzhou.aliyuncs.com/spark/hadoop-fs-oss-shaded.jar
(2) Upload the jar as a MaxCompute resource; see https://help.aliyun.com/document_detail/27831.html?spm=a2c4g.27797.0.i1#section-533-s8q-d9w
(3) Add the parameter: spark.hadoop.odps.cupid.resources=<projectname>.hadoop-fs-oss-shaded.jar
  • Note: if spark.hadoop.odps.cupid.resources has already been set, multiple resources must be separated by commas (see the documentation and the sketch after this list).
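
Putting the pieces together, a public-cloud configuration in spark-defaults.conf might look like the following sketch; the AccessKey values, the endpoint region, and the second resource <projectname>.my-archive.tar.gz are placeholders used here only to illustrate the comma-separated form:

spark.hadoop.fs.oss.accessKeyId = <your-access-key-id>
spark.hadoop.fs.oss.accessKeySecret = <your-access-key-secret>
spark.hadoop.fs.oss.endpoint = oss-cn-hangzhou-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources = public.hadoop-fs-oss-shaded.jar,<projectname>.my-archive.tar.gz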

Example 1: Check whether an OSS file exists

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('testoss').getOrCreate()
sc = spark.sparkContext

# set the OSS credentials and the OSS FileSystem implementation on the Hadoop configuration
conf = sc._jsc.hadoopConfiguration()
conf.set("fs.oss.accessKeyId", "xxxx")
conf.set("fs.oss.accessKeySecret", "xxx")
conf.set("fs.oss.endpoint", "xxxx")
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")

# obtain the Hadoop FileSystem for the OSS path through the JVM gateway
path = sc._jvm.org.apache.hadoop.fs.Path("oss://xxxxx")
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(), conf)
# check whether the path exists
exist = fs.exists(path)
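
The same FileSystem handle can be reused for other basic operations. As a minimal sketch (the directory oss://xxxxx/dir is a placeholder), listing the objects under a path might look like this:

# list objects under a directory using the same FileSystem handle
dir_path = sc._jvm.org.apache.hadoop.fs.Path("oss://xxxxx/dir")
if fs.exists(dir_path):
    for status in fs.listStatus(dir_path):
        print(status.getPath().toString(), status.getLen())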

Example 2: Write to OSS

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('testoss').getOrCreate()

# build a small DataFrame of (name, num) rows
data = [i for i in range(0, 100)]
df = spark.sparkContext.parallelize(data, 2).map(lambda s: ("name-%s" % s, s)).toDF("name: string, num: int")
df.show(n=10)

# write to oss
pathout = 'oss://yeshan01/test.csv'
df.write.csv(pathout)
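
To verify the write, the same path can be read back into a DataFrame; a minimal sketch with default CSV options:

# read the CSV back from OSS to verify the write
df_in = spark.read.csv(pathout)
df_in.show(n=10)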