Spark SQL - xingzhihe/hello-world GitHub Wiki
#spark cluster:
# 192.168.80.1 node-801 4Core 16G slave
# 192.168.80.10 node-8010 4Core 4G slave
# 192.168.80.20 node-8020 4Core 4G master
# 192.168.80.30 node-8030 4Core 4G slave
# 192.168.80.40 node-8040 4Core 8G slave
## for heartbeat timeout
#spark.network.timeout 10000000
#spark.executor.heartbeatInterval 10000000
## for OOM (note: spark.storage.memoryFraction is the legacy pre-1.6 setting;
## under the unified memory manager in Spark 2.x only spark.memory.fraction applies)
#spark.memory.fraction 0.75
#spark.storage.memoryFraction 0.45
#logging in to the master node of the spark cluster
ssh [email protected]
cd /home/spark-2.1.2-bin-hadoop2.7/bin
#start spark-shell against the standalone master; the MySQL JDBC driver jar is
#shipped to executors via --jars, and heartbeat/network timeouts are raised to
#tolerate the long-running JDBC scans measured later in this page
spark-shell --master spark://192.168.80.20:7077 --jars ../../ebd.api/lib/mysql-connector-java-6.0.5.jar --executor-memory 2G --conf "spark.executor.heartbeatInterval=300000" --conf "spark.network.timeout=300000"
#spark-shell scala>
import org.apache.spark.sql._
// Reuse the SQLContext of the spark-shell's predefined SparkSession (`spark`).
val sqlContext = spark.sqlContext
import sqlContext.implicits._
// Read the `company` table from MySQL over JDBC.
// NOTE(review): credentials are embedded in the URL in plain text — acceptable
// for a throwaway shell session, but do not commit real passwords.
val url = "jdbc:mysql://192.168.16.79:3307/prism1?user=root&password=mysql123&characterEncoding=UTF-8&serverTimezone=GMT"
// partitionColumn/lowerBound/upperBound/numPartitions split the scan into 70
// parallel range queries on `id` (1 .. 140,000,000), one per Spark partition.
val map = Map("url"-> url,"dbtable"->"company","partitionColumn"->"id", "lowerBound"->"1","upperBound"->"140000000","numPartitions"->"70","driver"->"com.mysql.cj.jdbc.Driver" )
val df = sqlContext.read.format("jdbc").options(map).load()
// Sanity check: should report 70 partitions, matching numPartitions above.
df.rdd.partitions.size
// Register the DataFrame so it can be queried with SQL as table `aa`.
df.createOrReplaceTempView("aa")
// query data: count rows for increasing id thresholds; the interleaved REPL
// output (resN) and //duration notes show how cost grows with matched rows.
sqlContext.sql("select id from aa where id < 100").count
res3: Long = 99
//duration 2s
sqlContext.sql("select id from aa where id < 10000").count
res4: Long = 6093
//duration 0.4s
sqlContext.sql("select id from aa where id < 1000000").count
res5: Long = 978349
//duration 10s
sqlContext.sql("select id from aa where id < 10000000").count
res6: Long = 9940023
//duration 5.3 min
sqlContext.sql("select id from aa where id < 50000000").count
res7: Long = 49839203
//duration 29 min
sqlContext.sql("select id from aa where id < 100000000").count
res8: Long = 99699059
//duration 1.2 h
// Full count over the JDBC-backed view — slowest case, scans the whole table.
sqlContext.sql("select count(id) from aa").show
+---------+
|count(id)|
+---------+
|133446481|
+---------+
//duration 1.5 h
// Snapshot the MySQL-backed DataFrame to HDFS as Parquet, so later queries hit
// columnar files instead of re-scanning MySQL over JDBC.
// (The path is a plain string literal — the original `s` interpolator was
// unnecessary since there is no ${} substitution inside it.)
df.write.parquet("hdfs://192.168.80.1:9000/teledata/gs/company")
// Reload the company data from HDFS and check how many partitions it has.
val dfTest = sqlContext.read.parquet("hdfs://192.168.80.1:9000/teledata/gs/company")
println(s"partitions.size=${dfTest.rdd.partitions.size}")
dfTest.createOrReplaceTempView("test")
// Look up companies whose registered address contains "大荔" (a county name).
sqlContext.sql("select id, name, reg_location from test where reg_location like '%大荔%'").collect()
// company data (from the Parquet snapshot on HDFS)
val dfCompany = sqlContext.read.parquet("hdfs://192.168.80.1:9000/teledata/gs/company")
dfCompany.createOrReplaceTempView("gs")
// `sql` is the spark-shell shorthand for running SQL against the session.
sql("select base, count(id) from gs group by base").show
// company_category data
val dfCompanyCategory = sqlContext.read.parquet("hdfs://192.168.80.1:9000/teledata/gs/company_category")
dfCompanyCategory.createOrReplaceTempView("gsCode")
// left join: keep every company row even when it has no category row.
sql("select gs.id, gs.name, gs.reg_location, gs.estiblish_time,gs.reg_capital, gsCode.category_code from gs left join gsCode on gs.id=gsCode.company_id").show
// annual_report data
val dfRptAnnual = sqlContext.read.parquet("hdfs://192.168.80.1:9000/teledata/gs/annual_report")
dfRptAnnual.createOrReplaceTempView("rptAnnual")
sql("select * from rptAnnual").show
// tm_info (trademark) data
val dfTM = sqlContext.read.parquet("hdfs://192.168.80.1:9000/teledata/gs/tm_info")
dfTM.createOrReplaceTempView("tm")
// trademark count per applicant
sql("select applicant_cn, count(id) cnt from tm group by applicant_cn").show
// ent_patent_info (patent) data
val dfPT = sqlContext.read.parquet("hdfs://192.168.80.1:9000/teledata/gs/ent_patent_info")
dfPT.createOrReplaceTempView("pt")
// NOTE(review): `inventroName` looks like a typo for `inventorName` — confirm
// against the actual Parquet schema before renaming (it may really be stored
// misspelled, in which case this query is correct as written).
sql("select id,autoid,appnumber,pubnumber,title,applicantname,inventroName from pt").show
sql("select applicantname, count(id) cnt from pt group by applicantname").show
// applicantname may hold several comma-separated names: split each grouped
// row's name list and repeat the count for every individual applicant.
// NOTE(review): getString(0) returns null for a null applicantname and the
// subsequent .split would NPE — consider filtering nulls first; verify whether
// the column is nullable.
sql("select applicantname, count(id) cnt from pt group by applicantname").rdd.flatMap(row=>row.getString(0).split(",").map((_,row.getLong(1)))).toDF.show(20,false)
// save to mysql
// (The two disabled lines below originally used `#` as the comment marker,
// which is not valid Scala — pasting them into spark-shell raises an error.
// They are now proper `//` comments.)
//val url = "jdbc:mysql://192.168.16.79:3307/prism1?user=root&password=mysql123&characterEncoding=UTF-8&serverTimezone=GMT"
// batchsize controls JDBC insert batching; truncate=true makes overwrite mode
// TRUNCATE the existing table instead of dropping and recreating it.
val opts = Map("url"-> url,"dbtable"->"company_base","driver"->"com.mysql.cj.jdbc.Driver","batchsize"->"10000","truncate"->"true")
sql("select base, count(id) from gs group by base").write.mode("overwrite").format("jdbc").options(opts).save
// Alternative: write through DataFrameWriter.jdbc with a Properties object.
// (Re-declaring `val url` is fine in the REPL; it shadows the earlier binding.)
val url = "jdbc:mysql://192.168.16.79:3307/prism1?user=root&password=mysql123&characterEncoding=UTF-8&serverTimezone=GMT"
val prop = new java.util.Properties
prop.setProperty("user","root")
prop.setProperty("password","mysql123")
// Default SaveMode is ErrorIfExists — this fails if company_base already
// exists; use the mode("append") variant below (or "overwrite") to avoid that.
sql("select base, count(id) from gs group by base").write.jdbc(url,"company_base",prop)
//sql("select base, count(id) from gs group by base").write.mode("append").jdbc(url,"company_base",prop)