Spark SQL
http://apachesparkbook.blogspot.in/2015/11/foreach-example.html
There are several ways to interact with Spark SQL including SQL and the Dataset API.
Hive tables:
However, since Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them automatically. Note that these Hive dependencies must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.
SparkSession: the entry point to programming Spark with the Dataset and DataFrame API.
SparkSession: a unified entry point for manipulating data with Spark. Beyond a time-bounded interaction, SparkSession provides a single point of entry to interact with underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs.
--------------------------------------------------------------
Why Spark SQL? Hive cannot handle encrypted data. Because of in-memory computation, Spark is faster, so Spark SQL is also faster than Hive.
Spark SQL cannot handle unstructured data.
UDF
val dataset = Seq((0, "Hello"), (1, "world")).toDF("id", "text")
// function to convert a string to uppercase
val upper: String => String = _.toUpperCase
import org.apache.spark.sql.functions.{udf, col}
val upperUDF = udf(upper)
dataset.withColumn("upper", upperUDF(col("text"))).show()
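To call the same function from SQL text, it can also be registered on the session. A minimal sketch (the view name dataset_view is made up for illustration):
// register the same function so it can be used inside SQL strings
spark.udf.register("upperUDF", upper)
dataset.createOrReplaceTempView("dataset_view")   // hypothetical view name
spark.sql("SELECT id, upperUDF(text) AS upper FROM dataset_view").show()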
------------------------- Starting Up Spark Shell - SparkSession -------------------------
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark sql basic").config("spark.some.config.option", "some-value").getOrCreate()
// importing the implicits (for toDF, $-syntax, etc.) from our "spark" session
import spark.implicits._
//create dataframe
val df = spark.read.json("")   // path to a JSON file goes here
df.show()
After creating the DataFrame, we create a temp view:
df.createOrReplaceTempView("employee")
val dataframe = spark.sql("select * from employee where age between 19 and 20")
--------------------------------------------------------------
// rdd: an existing RDD of Rows, paired with an explicit schema
val employeeDF = spark.createDataFrame(rdd, schema)
val result = spark.sql("")   // SQL query goes here
result.map(attributes => "Name: " + attributes(0)).show()
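A fuller sketch of the RDD-to-DataFrame path, with a made-up schema and sample rows (the field names and values are assumptions, not from the original snippet):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// sample rows; the column names "name" and "age" are invented for this example
val rowRDD = spark.sparkContext.parallelize(Seq(Row("Alice", 29), Row("Bob", 35)))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val employeeDF = spark.createDataFrame(rowRDD, schema)
employeeDF.createOrReplaceTempView("employee")
spark.sql("SELECT name FROM employee WHERE age > 30").show()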
------------------------- JSON / Parquet File -------------------------
//write
employeeDF.write.parquet("employee.parquet")
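Reading the file back is symmetric; a short sketch, reusing the employee.parquet path from above:
// read the Parquet file back into a DataFrame and query it via a temp view
val parquetDF = spark.read.parquet("employee.parquet")
parquetDF.createOrReplaceTempView("parquetEmployee")
spark.sql("SELECT * FROM parquetEmployee").show()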
------------------------- Hive Tables - case class & SparkSession -------------------------
// importing the Row class and SparkSession into the Spark shell
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
// creating case class "Record" with an Int and a String attribute
case class Record(key: Int, value: String)
// setting the warehouse location for the SparkSession
val warehouseLocation = "spark-warehouse"
// we now build a SparkSession "spark" to demonstrate the Hive example in Spark SQL
val spark = SparkSession.builder().appName("").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
// importing the session's implicits and the sql method into the shell
import spark.implicits._
import spark.sql
val sqlDF = sql("")   // SQL query goes here
// creating dataset using dataframe(sqlDF)
val stringDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" }
stringDS.show()
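To make the elided query above concrete, here is a hedged end-to-end sketch in the spirit of the standard Spark Hive example (the table name src and the sample data path come from the Spark distribution's examples, not from the original notes):
// assumed table name and sample data path (from the Spark distribution's examples)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key").show()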
--------------------------------------------------------------
I've got this JSON file,
which has been obtained with the Python json.dump method. Now, I want to read this file into a DataFrame in Spark, using pyspark. Following the documentation, I'm doing this:
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.read.json("my_file.json")
print df.show()
The print statement spits out this though:
+---------------+
|_corrupt_record|
+---------------+
|              {|
|       "a": 1, |
|         "b": 2|
|              }|
+---------------+
Does anyone know what's going on and why it is not interpreting the file correctly?
Answer: each JSON object must sit on its own single line (JSON Lines format), e.g.
{"a": 1, "b": 2}
{"a": 1, "b": 2}
The file should be in this format.
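If rewriting the file is not possible, Spark 2.2+ can also parse a single multi-line JSON document with the multiLine reader option; a minimal sketch (shown in Scala, file name taken from the question):
// read a JSON document that spans multiple lines (Spark 2.2+)
val df = spark.read.option("multiLine", true).json("my_file.json")
df.show()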
--------------------------------------------------------------
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
# load data from a text file into a DataFrame through an RDD
rdd = sc.textFile("name1/khatak.csv")
rdd1 = rdd.map(lambda x: x.split(","))
people = rdd1.map(lambda x: Row(id=int(x[0]), first_name=x[1], ip_address=x[2], date=x[3]))
schema = sqlContext.createDataFrame(people)
schema.registerTempTable("people")
tee = sqlContext.sql("select * from people where id in (1,2,3,4,5)")
--------------------------------------------------------------
By default saveAsTable will create a βmanaged tableβ, meaning that the location of the data will be controlled by the metastore. Managed tables will also have their data deleted automatically when a table is dropped.
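A hedged sketch of the contrast (table names and path are invented for illustration): saveAsTable alone creates a managed table, while adding a path option creates an external (unmanaged) table whose files survive a DROP TABLE.
// managed table: files live under spark.sql.warehouse.dir and are deleted on DROP TABLE
employeeDF.write.saveAsTable("employees_managed")
// external (unmanaged) table: files stay at the given path even after DROP TABLE
employeeDF.write.option("path", "/tmp/employees_ext").saveAsTable("employees_external")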
--------------------------------------------------------------
Parquet files are self-describing so the schema is preserved.
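Continuing the Parquet example above, printing the schema shows it comes straight from the file's own metadata:
// the schema is recovered from the Parquet file itself, no external metadata needed
spark.read.parquet("employee.parquet").printSchema()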
--------------------------------------------------------------
///// Register the function as a UDF /////
def squared(s):
    return s * s
sqlContext.udf.register("squaredWithPython", squared)
Optionally, you can also explicitly set the return type of your UDF.
from pyspark.sql.types import LongType
def squared_typed(s):
    return s * s
sqlContext.udf.register("squaredWithPython", squared_typed, LongType())
////Call the UDF in Spark SQL
sqlContext.range(1, 20).registerTempTable("test")
sqlContext.sql("select id, squaredWithPython(id) as id_squared from test")
///Use UDF with DataFrames
from pyspark.sql.functions import udf
squared_udf = udf(squared, LongType())
df = sqlContext.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
sqlContext.tableNames()  # lists all registered table names
Repartition vs. Coalesce
Posted on August 11, 2015 by Bo Zhang
Repartition and coalesce have long been available as RDD methods. For DataFrames, however, repartition was introduced in Spark 1.3 and coalesce in Spark 1.4.
Both change the number of partitions in which the data is stored (as an RDD). According to both the RDD and DataFrame documentation, repartition shuffles the original partitions and redistributes the data, while coalesce simply combines original partitions down to the new number of partitions. In that sense, coalesce can only reduce the number of partitions.
Since shuffling can be very costly, if reducing the number of partitions is what you really want, consider using coalesce. For example, as mentioned in an earlier post, you may need to reduce the number of partitions before a join; in that case coalesce is a better choice than repartition, although in that example, since the data itself was quite small, repartition did not cause much delay.
If you are using Spark 1.3, you can still take advantage of coalesce, although it has to be the RDD version. Here is the workaround for Spark 1.3,
val rdd = df.rdd.coalesce(16)
val resDF = df.sqlContext.createDataFrame(rdd, df.schema)
which is equivalent to the following in Spark 1.4,
val resDF = df.coalesce(16)
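A quick way to observe the difference, sketched against a DataFrame df as above (the partition counts are arbitrary, and the exact numbers depend on your data and cluster):
// repartition performs a full shuffle and can increase or decrease the partition count
println(df.repartition(64).rdd.partitions.length)   // 64
// coalesce only merges existing partitions, so it can only reduce the count
println(df.coalesce(8).rdd.partitions.length)       // at most 8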
--------------------------------------------------------------
Global Temporary View
Temporary views in Spark SQL are session-scoped and will disappear if the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. Global temporary views are tied to the system-preserved database global_temp, and we must use the qualified name to refer to them, e.g. SELECT * FROM global_temp.view1.
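A short sketch of the difference, assuming a DataFrame df (the view names are illustrative):
// session-scoped view: disappears when this SparkSession ends
df.createOrReplaceTempView("employee")
// global temp view: registered in the global_temp database and shared across sessions
df.createGlobalTempView("employee_global")
spark.sql("SELECT * FROM global_temp.employee_global").show()
// still visible from a brand-new session of the same application
spark.newSession().sql("SELECT * FROM global_temp.employee_global").show()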