In Spark SQL's `DecimalType`, the default size of a value depends on its precision:

```scala
override def defaultSize: Int = if (precision <= Decimal.MAX_LONG_DIGITS) 8 else 16
```

`Decimal.MAX_LONG_DIGITS` is 18, so for the `Decimal(7,2)` columns of `store_sales` the default size is 8 bytes.
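This can be checked from the public `DataType.defaultSize` API; a small sketch (the byte counts follow directly from the snippet above):

```scala
import org.apache.spark.sql.types.DecimalType

// Precision 7 <= 18, so the value fits in a Long: 8 bytes.
println(DecimalType(7, 2).defaultSize)   // 8
// Precision 20 > 18, so a full decimal representation is needed: 16 bytes.
println(DecimalType(20, 2).defaultSize)  // 16
```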
You can specify the schema of a Parquet file when reading it:

```scala
import org.apache.spark.sql.types._

val sch = new StructType()
  .add("randInt", IntegerType)
  .add("randLong", LongType)
  .add("randDouble", DoubleType)
  .add("randFloat", FloatType)
  .add("randString", StringType)
val ds = spark.read.schema(sch).parquet("file")
```
This stops Spark from scheduling an extra job per Parquet file just to discover its schema. Whether such a job is launched is controlled by `spark.sql.sources.parallelPartitionDiscovery.threshold`: if the number of files is greater than the threshold (default: 32), a new job is launched; otherwise the Spark driver handles the enumeration itself.
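If needed, the threshold can be adjusted at runtime; a minimal sketch, assuming an active `SparkSession` named `spark`:

```scala
// Raise the file-count threshold above which Spark launches a separate
// discovery job (default is 32); 64 here is just an example value.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "64")
```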
Spark does not complain if the schema you specify cannot be matched against the data in the source. For example, I can read data written with the `IntWithPayload` schema as `ParquetExample`; however, the data comes back completely null. The same happens when only the column names differ: I specified `<randInt:Int, payload:Binary>` for data written with the `IntWithPayload` schema (whose integer column is actually called `intIndex`). Here is the output:
```
+-------+--------------------+
|randInt|             payload|
+-------+--------------------+
|   null|[61 6D 39 6A 4B 7...|
|   null|[61 6D 39 6A 4B 7...|
|   null|[61 6D 39 6A 46 7...|
|   null|[61 6D 39 6A 46 7...|
|   null|[50 6D 39 6A 46 7...|
|   null|[50 55 39 6A 2B 7...|
|   null|[50 55 62 2E 2B 7...|
|   null|[50 55 62 2E 2B 7...|
|   null|[50 43 48 6D 2B 7...|
|   null|[3E 43 48 6D 2B 7...|
|   null|[3E 63 48 6D 2B 7...|
|   null|[3E 63 48 53 2B 7...|
|   null|[73 63 48 33 4A 7...|
|   null|[73 63 48 33 4A 7...|
|   null|[73 63 48 33 5C 7...|
|   null|[73 63 50 33 5C 3...|
|   null|[73 63 50 33 5C 3...|
|   null|[73 63 50 33 5C 3...|
|   null|[73 63 73 55 5C 7...|
|   null|[73 63 52 55 48 7...|
+-------+--------------------+
```
versus when I don't specify the schema at all, or specify it as `<intIndex:Int, payload:Binary>`:
```
+----------+--------------------+
|  intIndex|             payload|
+----------+--------------------+
|1686114644|[61 6D 39 6A 4B 7...|
|2042014588|[61 6D 39 6A 4B 7...|
| 414404205|[61 6D 39 6A 46 7...|
|1001952118|[61 6D 39 6A 46 7...|
| 851110484|[50 6D 39 6A 46 7...|
| 717552328|[50 55 39 6A 2B 7...|
|1566615353|[50 55 62 2E 2B 7...|
|1285175887|[50 55 62 2E 2B 7...|
| 275781627|[50 43 48 6D 2B 7...|
| 982670165|[3E 43 48 6D 2B 7...|
| 328403863|[3E 63 48 6D 2B 7...|
| 954097612|[3E 63 48 53 2B 7...|
| 661051566|[73 63 48 33 4A 7...|
|1290825592|[73 63 48 33 4A 7...|
|  33002434|[73 63 48 33 5C 7...|
| 268495412|[73 63 50 33 5C 3...|
|1500530103|[73 63 50 33 5C 3...|
| 223210993|[73 63 50 33 5C 3...|
|1896190602|[73 63 73 55 5C 7...|
| 338448511|[73 63 52 55 48 7...|
+----------+--------------------+
```
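A minimal sketch of the two reads above (the file path is a placeholder, and an active `spark` session is assumed):

```scala
import org.apache.spark.sql.types._

// Column name `randInt` does not exist in the file, so it is read as null.
val wrongSchema = new StructType()
  .add("randInt", IntegerType)
  .add("payload", BinaryType)
spark.read.schema(wrongSchema).parquet("/path/to/int-with-payload.parquet").show()

// Column name `intIndex` matches the file, so the values are read correctly.
val matchingSchema = new StructType()
  .add("intIndex", IntegerType)
  .add("payload", BinaryType)
spark.read.schema(matchingSchema).parquet("/path/to/int-with-payload.parquet").show()
```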
Here is the call stack of who invokes the sort and who consumes the resulting iterator. The sorter is wrapped into an `AbstractScalaRowIterator` by `UnsafeExternalRowSorter` in its `public Iterator<UnsafeRow> sort() throws IOException` method. This is the iterator that is consumed by the join.
The file write-out is done in the `FileFormatWriter` class, where the `execute` function consumes the items produced by the join iterator. In `FileFormatWriter`, the first `hasNext()` call triggers the upstream work: fetching the data over the network, sorting it, and preparing the join iterator. Hence the time spent in this iterator is essentially the total time of the task. Here is an example log line:
```
INFO FileFormatWriter: THE JOIN TOOK : 5306200 usec [1st: 3187147 + 2119052 usec ] processed 800534
```
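The log line above comes from my own instrumentation; a rough sketch of how such timing can be collected with an iterator wrapper (illustrative only, not the actual Spark code):

```scala
// Wraps an iterator and measures the first hasNext() (which, for the join
// iterator, triggers the fetch + sort + join work) separately from the
// total time spent in hasNext().
class TimedIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private var firstUsec: Long = -1L
  private var totalUsec: Long = 0L

  override def hasNext: Boolean = {
    val start = System.nanoTime()
    val result = underlying.hasNext
    val elapsedUsec = (System.nanoTime() - start) / 1000
    if (firstUsec < 0) firstUsec = elapsedUsec
    totalUsec += elapsedUsec
    result
  }

  override def next(): T = underlying.next()

  def report(): String =
    s"1st hasNext(): $firstUsec usec, all hasNext() calls: $totalUsec usec"
}
```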
(needs to be verified) http://stackoverflow.com/questions/37487318/spark-sql-broadcast-hash-join
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

val smallDF: DataFrame = ???
val largeDF: DataFrame = ???

largeDF.join(broadcast(smallDF), Seq("foo"))
```
Also, `smallDF.join(largeDF)` does not result in a broadcast hash join, but `largeDF.join(smallDF)` does. The broadcast is generated at the driver, so check the driver's logs.
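A quick way to verify which join strategy was picked (a sketch reusing the placeholder DataFrames above) is to inspect the physical plan, and, if necessary, tune the automatic broadcast threshold:

```scala
// Look for "BroadcastHashJoin" (rather than "SortMergeJoin") in the plan.
largeDF.join(broadcast(smallDF), Seq("foo")).explain()

// Automatic broadcasting kicks in below this size (in bytes); -1 disables it.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)
```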