pyspark shell - yaokun123/php-wiki GitHub Wiki

使用pyspark shell进行交互式分析

在进行数据分析的时候,通常需要进行交互式数据探索和数据分析。为此,PySpark提供了一个交互式的工具pyspark shell。通过Spark Shell,用户可以和PySpark进行实时交互,以进行数据探索、数据清洗和整理以及交互式数据分析等工作。

pyspark shell命令格式如下所示:

$ ./bin/pyspark [options]

options 参数

options 示例 解释
--master MASTER_URL spark://host:port,mesos://host:port, yarn,k8s://https://host:port, or local (Default: local[*]). 启动模式
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). 提交运行时部署模式,表示的是Driver Program运行的地方。要么是提交应用的Client,要么是集群中从节点(Standalone:Worker,YARN: NodeManager):cluster
--class CLASS_NAME Your application's main class (for Java / Scala apps). application入口类
--name NAME A name of your application. 给application设置名称
--jars JARS Comma-separated list of jars to include on the driver and executor classpaths 设置jar包

要查看完整的参数选项列表,可以执行“pyspark --help”命令,如下:

$ pyspark --help

一、运行模式--master

Spark/PySpark的运行模式取决于传递给SparkContext的Master URL的值。参数选项“--master”表示当前的pyspark shell要连接到哪个master(即告诉Spark/PySpark使用哪种集群类型)。

如果是local[],就是使用本地模式启动pyspark shell,其中,中括号内的星号()表示需要使用几个CPU核,也就是启动几个线程模拟Spark集群。如果不指定,则默认为local。

当运行pyspark shell命令时,像下面这样定义这个参数:

$ pyspark --master 

根据所使用的集群的类型而变化。Master URL(即--master参数)的值如下表所示

二、启动和退出pyspark shell

以下操作均在终端窗口中进行。

1、启动pyspark shell方式一:local模式

$ ./bin/pyspark

# 退出pyspark shell,使用如下命令:
>>> exit()

pyspark shell在启动时,已经帮我们创建好了SparkSession对象的实例spark(实际上也包括SparkContext对象的实例sc),我们可以在pyspark shell中直接使用sc和spark这两个对象。另外,默认情况下,启动的pyspark shell采用local部署模式。

2、启动Spark Shell方式二:standalone模式

首先要确保启动了Spark集群:

$ cd ~/bigdata/spark-3.1.2
$ ./sbin/start-all.sh

# 使用jps命令查看启动的进程。如果有master和worker进程,说明Spark集群已经启动。

然后启动pyspark shell,并指定--master spark://xueai8:7077参数,以standalone模式运行:

$ ./bin/pyspark --master spark://xueai8:7077
# 在Master URL中指定的xueai8是当前的机器名。

pyspark shell常用命令

1、可以在pyspark shell里面输入python代码进行调试:
2、可以pyspark shell中键入以下命令,查看pyspark shell常用的命令:>>> help()
3、可以help模式下键入模块的名称,查看该模块的使用说明:help> pyspark.sql
4、按q键退出help帮助界面,输入exit()命令,返回shell命令行。

Spark DataFrame数据读写

// 读
spark.read.format("…")[.option("…")].load("…")
# format("…"):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile”。
# load("…"):在"csv"、“jdbc”、“json”、“orc”、"parquet"和"textFile"格式下需要传入加载数据的路径。默认加载的是parquet类型的文件。
# option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable。

// 写
df.write.mode().format().option(K, V).save(PATH)
# mode, 传入模式字符串可选:append 追加,overwriter 覆盖,ignore 忽略,error 重复就报异常(默认)
# format, 传入格式字符串,可选:text,csv,json,parquet,orc,avro,jdbc
# 注意text源只支持单列df写出
# option 设置属性,如:.option("sep", ",")
# save 写出的路径,支持本地文件和HDFS
# Write text 写出,只能写出一个单列数据
df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).write.mode("overwrite").format("text").save("../data/output/sql/text")

# Write CSV写出
df.write.mode("overwrite").format("csv").option("sep", ",").option("header", True).save("../data/output/sql/csv")

# Write Json写出
df.write.mode("overwrite").format("json").save("../data/output/sql/json")

# Write Parquet写出
df.write.mode("overwrite").format("parquet").save("../data/output/sql/parquet")

# 不给format,默认以parquet写出
df.write.mode("overwrite").save("../data/output/sql/default")

# kafka
df.write.format("kafka").option("kafka.bootstrap.servers", "10.8.6.231:9092,10.8.6.230:9092,10.8.6.232:9092").option("topic", "shuidi_sifa_digest_history").save()