
Tips and Tricks

Spark: set master in sparkContext

For setting the master, there are 4 precedence levels (1 being the highest priority):

  1. SparkConf set in the application
  2. Properties set using spark-submit
  3. Properties set in a property file, which is given as argument during the job's submission
  4. Default values
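For example, levels 2 and 3 correspond to submissions like the following (the jar path and properties file name are placeholders; the main class is the one used in this project):

# level 2: properties given directly to spark-submit
spark-submit --master "local[*]" --class bda.lsa.preprocessing.XmlToParquetWriter path/to/project.jar

# level 3: properties read from a file given at submission time
spark-submit --properties-file my-spark.conf --class bda.lsa.preprocessing.XmlToParquetWriter path/to/project.jar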

So it is very important not to hardcode the master in the code with:

 SparkSession.builder().master("local[*]").getOrCreate()

The problem is that, when running the application directly inside the JVM (e.g. from the IDE, without spark-submit), we then get a "master not set" error. To overcome it, use the following JVM option:

-Dspark.master="local[*]"
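The code then simply builds the session without any master and relies on it being provided externally. A minimal sketch (the app name is just an example):

import org.apache.spark.sql.SparkSession

// no .master(...) here: the value comes from -Dspark.master,
// from spark-submit --master or from a properties file
val spark = SparkSession.builder()
  .appName("bda-lsa")
  .getOrCreate()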

SBT assembly

To create fat jars using SBT, one can use the sbt-assembly plugin. Simply create an assembly.sbt file under the project folder with the following content:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")

Useful properties in the build.sbt:

 mainClass in Compile := Some("bda.lsa.preprocessing.XmlToParquetWriter") // set main class
 unmanagedJars in Compile += file("lib/spark-corenlp-0.2.0-s_2.11.jar") // add jar "manually" to the final jar

Creating the jar: sbt assembly
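The resulting fat jar can then be submitted like any other Spark application. A sketch (the master value and jar path are placeholders, adjust them to your cluster, project name and Scala version):

spark-submit \
  --class bda.lsa.preprocessing.XmlToParquetWriter \
  --master "local[*]" \
  target/scala-2.11/bda-lsa-project-assembly-1.0.jar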

SBT OutOfMemoryError: Java heap space

You can increase the heap size for sbt with:

export JAVA_OPTS="-Xms256m -Xmx4g"
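To quickly check that the new limit is actually picked up, one possibility (assuming the standard sbt eval command is available) is:

sbt "eval Runtime.getRuntime.maxMemory / 1024 / 1024"  # prints the max heap in MB, should be in the ~4 GB range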

Extracting just a part of the data

From docTexts: Dataset[(String, String)] containing (title, content) pairs, we can use one of the following:

docTexts.randomSplit(Array(0.01, 0.99))(0) // randomly split into two datasets (1% / 99%) and keep the first one
docTexts.sample(false, 0.001, 234) // sample ~0.1% of the rows without replacement, using a fixed seed (234)
docTexts.limit(100) // take the first 100 documents (here in alphabetical order), so not a very representative sample
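For example, to keep a small random subset around for experiments (a sketch; the fraction and seed are arbitrary):

// destructure the two datasets returned by randomSplit;
// giving a seed makes the split reproducible across runs
val Array(small, rest) = docTexts.randomSplit(Array(0.01, 0.99), seed = 42)
small.cache() // keep the 1% subset in memory for faster iterations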

Adding an id to dataframe rows

The first and cleanest way is to use:

import org.apache.spark.sql.functions._
df.withColumn("id",monotonically_increasing_id)

But be aware that the ids are unique and increasing, yet neither small nor consecutive: 8589934592, 8589934593, 8589934594, 8589934595, ... This is because the partition id is encoded in the upper bits of the value (8589934592 is 2^33, i.e. the first record of partition 1).

To get smaller ids, one can use zipWithUniqueId, but unfortunately it is defined only for RDDs:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{LongType, StructField, StructType}

  val schema = df.schema
  // prepend the unique id to every row
  val rowsWithId = df.rdd.zipWithUniqueId.map {
    case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)
  }
  // rebuild a dataframe with an "id" column in front of the original columns
  spark.createDataFrame(rowsWithId, StructType(StructField("id", LongType, nullable = false) +: schema.fields))

More ways are presented in this article.
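One of the alternatives is RDD.zipWithIndex, which yields consecutive 0-based ids but triggers an extra Spark job to compute per-partition counts. A sketch following the same pattern as above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// zipWithIndex assigns consecutive ids 0, 1, 2, ... but needs an extra job
// to know how many elements each partition holds
val rowsWithIndex = df.rdd.zipWithIndex.map {
  case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)
}
spark.createDataFrame(rowsWithIndex, StructType(StructField("id", LongType, nullable = false) +: df.schema.fields))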

For comparison, we ran the two methods over a dataframe of 5,371,159 rows on the daplab (we used the docTexts dataframe for the full wikipedia set). monotonically_increasing_id took 1.0 min, zipWithUniqueId took 1.8 min, so roughly a 2x factor (even though less than 2 minutes is still acceptable ^^).