How spark reads parquet - animeshtrivedi/notes GitHub Wiki

How Spark reads a parquet file

  1. From DataSourceScanExec we end up in ParquetFileFormat. It is not entirely clear how, but it makes sense. We need to trace the whole-stage code generation path with batch support, not the other (row-at-a-time) path.

  2. ParquetFileFormat has a buildReader function that returns a function of type (PartitionedFile => Iterator[InternalRow]). The iterator in this function is created as val iter = new RecordReaderIterator(parquetReader), where parquetReader is of type VectorizedParquetRecordReader. The RecordReaderIterator class wraps a Scala iterator around a Hadoop-style RecordReader<K,V>, which is implemented by VectorizedParquetRecordReader (and its base class SpecificParquetRecordReaderBase<Object>). How this iterator is consumed is discussed later, in step 4.

  3. What does VectorizedParquetRecordReader do? According to the comment in the file, "A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader." After the VectorizedParquetRecordReader object is allocated, its initialize(split, hadoopAttemptContext) and initBatch(partitionSchema, file.partitionValues) functions are called.

    1. initialize calls the initialize function of the superclass SpecificParquetRecordReaderBase. Here, the file schema is read, the requested schema is inferred, and a reader of type ParquetFileReader is allocated in the base class. By the end of SpecificParquetRecordReaderBase.initialize(...) we know how many rows there are in the input file split. This is stored in the totalRowCount variable.
    2. initBatch mainly allocates the ColumnarBatch. This is an interesting class; more on it later.
  4. The implementation of the RecordReader interface in VectorizedParquetRecordReader demands more attention, because it is what gets called when the iterator wrapper from step 2 is consumed (where it is consumed is covered later). Upon calling nextKeyValue(), the function first calls resultBatch() (a no-op if columnarBatch is already initialized, which it is, see step 3, sub-step 2), and then nextBatch(). Remember, we always operate in batch mode (returnColumnarBatch is set to true). The nextBatch function fills columnarBatch with data (how is covered later), and this variable is returned by the getCurrentValue function. getCurrentKey always returns null and is implemented in the base class SpecificParquetRecordReaderBase.
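The wrapping described in steps 2 and 4 can be sketched without any Spark or Hadoop dependency. This is only a rough sketch of the pattern: SimpleRecordReader, ToyBatchReader and ReaderIterator are hypothetical names made up for illustration, and the "batch" is a plain int array rather than a ColumnarBatch.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, trimmed-down stand-in for Hadoop's RecordReader<K, V>.
interface SimpleRecordReader<V> {
    boolean nextKeyValue();   // advance; returns false once the input is exhausted
    V getCurrentValue();      // value produced by the last successful advance
}

// Toy reader that hands out "batches" (int arrays) of at most batchSize rows.
// Unlike the real reader, it allocates a fresh array per batch instead of reusing one.
class ToyBatchReader implements SimpleRecordReader<int[]> {
    private final int totalRowCount;
    private final int batchSize;
    private int rowsReturned = 0;
    private int[] batch = new int[0];

    ToyBatchReader(int totalRowCount, int batchSize) {
        this.totalRowCount = totalRowCount;
        this.batchSize = batchSize;
    }

    @Override
    public boolean nextKeyValue() {
        if (rowsReturned >= totalRowCount) return false;
        int num = Math.min(batchSize, totalRowCount - rowsReturned);
        batch = new int[num];
        for (int i = 0; i < num; i++) batch[i] = rowsReturned + i;
        rowsReturned += num;
        return true;
    }

    @Override
    public int[] getCurrentValue() { return batch; }
}

// Iterator wrapper in the spirit of RecordReaderIterator: hasNext() advances the
// reader at most once per element, next() returns the current value.
class ReaderIterator<V> implements Iterator<V> {
    private final SimpleRecordReader<V> reader;
    private boolean havePair = false;
    private boolean finished = false;

    ReaderIterator(SimpleRecordReader<V> reader) { this.reader = reader; }

    @Override
    public boolean hasNext() {
        if (!finished && !havePair) {
            finished = !reader.nextKeyValue();
            havePair = !finished;
        }
        return !finished;
    }

    @Override
    public V next() {
        if (!hasNext()) throw new NoSuchElementException("End of stream");
        havePair = false;
        return reader.getCurrentValue();
    }
}
```

With 10 rows and a batch size of 4, iterating over new ReaderIterator<>(new ToyBatchReader(10, 4)) yields three arrays of lengths 4, 4 and 2.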

So now we know which variable is returned by the iterator. From here there are two directions. We first describe how ColumnarBatch is filled with the parquet data. Then we describe who consumes the mysterious iterator generated in the beginning, where the ColumnarBatch is returned, and what happens to it.

How and where is ColumnarBatch filled? In the VectorizedParquetRecordReader.nextBatch() function, if it has not yet read all the rows, it calls the checkEndOfRowGroup() function. checkEndOfRowGroup then reads a row group (? in a parquet class) and allocates a VectorizedColumnReader object for each requested column in requestedSchema. The VectorizedColumnReader constructor takes a ColumnDescriptor (found in the schema) and a PageReader (obtained from the row group (?)). Also, missingColumns is a bitmap of missing columns. Then, in the nextBatch function, readBatch(num, columnarBatch.column(i)) is called on each of the VectorizedColumnReader objects allocated earlier in checkEndOfRowGroup (essentially once per column). So ColumnarBatch and ColumnVector are just raw pieces of memory that are used by the VectorizedColumnReader. readBatch is passed the number of rows and the ColumnVector (stored in the ColumnarBatch). So what is ColumnVector? We can think of it as an array of a given type, indexed by rowId.

/** 
 * This class represents a column of values and provides the main APIs 
 * to access the data values. It supports all the types and contains 
 * get/put APIs as well as their batched versions. The batched versions 
 * are preferable whenever possible.
 * ...
*/
public abstract class ColumnVector implements AutoCloseable
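To make the "array indexed by rowId" picture concrete, here is a toy version restricted to int columns. ToyIntColumn and ToyBatch are hypothetical names invented for this sketch. The real ColumnVector covers all types and is considerably more involved, so treat this only as a mental model.

```java
import java.util.Arrays;

// Hypothetical, heavily simplified int column: one slot per rowId plus a null flag.
class ToyIntColumn {
    private final int[] data;
    private final boolean[] nulls;

    ToyIntColumn(int capacity) {
        data = new int[capacity];
        nulls = new boolean[capacity];
    }

    // Single-value put.
    void putInt(int rowId, int value) {
        data[rowId] = value;
        nulls[rowId] = false;
    }

    // Batched put: copy count values from src[srcIndex..] into rows rowId..rowId+count-1.
    void putInts(int rowId, int count, int[] src, int srcIndex) {
        System.arraycopy(src, srcIndex, data, rowId, count);
        Arrays.fill(nulls, rowId, rowId + count, false);
    }

    void putNull(int rowId) { nulls[rowId] = true; }

    boolean isNullAt(int rowId) { return nulls[rowId]; }

    int getInt(int rowId) { return data[rowId]; }
}

// Hypothetical batch: a fixed set of columns that share one row count.
class ToyBatch {
    private final ToyIntColumn[] columns;
    private int numRows = 0;

    ToyBatch(int numCols, int capacity) {
        columns = new ToyIntColumn[numCols];
        for (int i = 0; i < numCols; i++) columns[i] = new ToyIntColumn(capacity);
    }

    ToyIntColumn column(int ordinal) { return columns[ordinal]; }

    void setNumRows(int n) { numRows = n; }

    int numRows() { return numRows; }
}
```

A column reader in this model would receive batch.column(i) and a row count, write into it with putInts or putNull, and the consumer would later read it back with isNullAt and getInt.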

Anyway, coming back to the point. The raw data is stored in ColumnVectors, which are themselves stored in a ColumnarBatch object. A ColumnVector is what is passed as the storage space to the readBatch function. Inside readBatch, it first calls the readPage() function, which checks which version of the parquet file we are reading (v1 or v2, I don't know the difference), and then initializes a bunch of objects, namely defColumn:VectorizedRleValuesReader, repetitionLevelColumn:ValuesReaderIntIterator, definitionLevelColumn:ValuesReaderIntIterator, and dataColumn:VectorizedRleValuesReader. Of these, ValuesReaderIntIterator is from parquet-mr and VectorizedRleValuesReader is from Spark. From here on, I don't completely understand. There are a bunch of read[Type]Batch() functions that are called, which in turn call defColumn.read[Type]s() functions. In these functions on VectorizedRleValuesReader, data is read, decoded (perhaps from RLE), and then inserted into the ColumnVector that is passed in. So much for this.
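One plausible reading of the defColumn.read[Type]s() step is that run-length-encoded definition levels decide, row by row, whether the next value is taken from the data stream or a null is written. The sketch below illustrates only that idea. ToyDefLevelDecoder is a hypothetical name, the (runLength, level) pair layout is an assumption made for readability, and it is not Parquet's actual hybrid RLE/bit-packed encoding.

```java
// Hypothetical decoder: definition levels arrive as (runLength, level) pairs,
// and values holds only the non-null entries, stored densely.
class ToyDefLevelDecoder {
    // runs = {runLength0, level0, runLength1, level1, ...}
    // A level equal to maxDefLevel means "value present"; anything lower means null.
    static Integer[] decode(int[] runs, int maxDefLevel, int[] values) {
        int total = 0;
        for (int i = 0; i < runs.length; i += 2) total += runs[i];

        Integer[] out = new Integer[total];
        int rowId = 0;      // next row to fill in the output column
        int valueIdx = 0;   // next unread entry in the dense value stream
        for (int i = 0; i < runs.length; i += 2) {
            int runLength = runs[i];
            int level = runs[i + 1];
            for (int j = 0; j < runLength; j++) {
                if (level == maxDefLevel) {
                    out[rowId] = values[valueIdx++];
                } else {
                    out[rowId] = null;
                }
                rowId++;
            }
        }
        return out;
    }
}
```

For example, runs {2, 1, 1, 0, 3, 1} with maxDefLevel 1 and values {10, 20, 30, 40, 50} decode to 10, 20, null, 30, 40, 50.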

How and where is the Scala iterator of ColumnarBatch consumed? Now the second part. The iterator returns two different types depending on whether the reader is in batch mode or not. The code looks something like this:

@Override
public Object getCurrentValue() throws IOException, InterruptedException {
  if (returnColumnarBatch) return columnarBatch;
  return columnarBatch.getRow(batchIdx - 1);
}

columnarBatch is of type ColumnarBatch, and columnarBatch.getRow returns a nested class of type ColumnarBatch.Row. Anyway, coming back to the point, we are tracking batch mode. Hence, the value the iterator returns is of type ColumnarBatch. This iterator is somehow passed to the whole-stage code generation. The code that consumes the iterator and materializes UnsafeRow is the generated code listed under "Spark code gen for parquet reading" further down. So what is happening there? In the scan_nextBatch function we read the next ColumnarBatch by calling next() on the input iterator. Then we acquire the ColumnVector objects (see the scan_colInstance0 and scan_colInstance1 variables). The ColumnarBatch tells how many rows it has via numRows(), and the generated code just calls get[Type](rowId: Int) on the ColumnVector objects to get the final values.

These materialized values are then represented as an UnsafeRow with the help of BufferHolder and UnsafeRowWriter objects, allocated as:

scan_result = new UnsafeRow(2);
this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);

From here on, I am not sure who consumes this iterator of type org.apache.spark.sql.execution.BufferedRowIterator (the parent class of the generated iterator). There is a hint in the WholeStageCodegenExec class:

val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
  rdds.head.mapPartitionsWithIndex { (index, iter) =>
    val clazz = CodeGenerator.compile(cleanedSource)
    val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
    buffer.init(index, Array(iter))
    new Iterator[InternalRow] {
      override def hasNext: Boolean = {
        val v = buffer.hasNext
        if (!v) durationMs += buffer.durationMs()
        v
      }
      override def next: InternalRow = buffer.next()
    }
  }
}
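As far as I can tell, the contract between the wrapper above and the generated class is: hasNext() runs processNext() whenever the internal row buffer is empty, processNext() calls append(row) for each produced row, and shouldStop() tells the generated loop to return as soon as a row is buffered. The sketch below models that contract. ToyBufferedIterator and ToyScanIterator are hypothetical names, Integer stands in for InternalRow, and int[] stands in for ColumnarBatch.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;

// Simplified model of a buffered row iterator; R stands in for InternalRow.
abstract class ToyBufferedIterator<R> {
    private final LinkedList<R> currentRows = new LinkedList<>();

    public boolean hasNext() {
        if (currentRows.isEmpty()) processNext();
        return !currentRows.isEmpty();
    }

    public R next() {
        if (!hasNext()) throw new NoSuchElementException();
        return currentRows.remove();
    }

    // Called by the subclass for every row it produces.
    protected void append(R row) { currentRows.add(row); }

    // True once at least one row is buffered, so the producer loop can yield.
    protected boolean shouldStop() { return !currentRows.isEmpty(); }

    protected abstract void processNext();
}

// Mirrors the shape of the generated scan code: pull a batch, emit it row by row.
class ToyScanIterator extends ToyBufferedIterator<Integer> {
    private final Iterator<int[]> input;
    private int[] batch = null;
    private int batchIdx = 0;

    ToyScanIterator(Iterator<int[]> input) { this.input = input; }

    private void nextBatch() {
        if (input.hasNext()) {
            batch = input.next();
            batchIdx = 0;
        }
    }

    @Override
    protected void processNext() {
        if (batch == null) nextBatch();
        while (batch != null) {
            while (batchIdx < batch.length) {
                append(batch[batchIdx++]);
                if (shouldStop()) return;  // resume from batchIdx on the next call
            }
            batch = null;
            nextBatch();
        }
    }
}
```

Because batch and batchIdx are fields, a call to processNext() that returned early picks up exactly where the previous one stopped, which is why the generated code can return in the middle of a batch.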

If I had to speculate, I would say that the RDD is already laid out by the operators that implement inputRDDs(). For example, in FileSourceScanExec, which implements DataSourceScanExec (a trait):

private lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = outputSchema,
        filters = dataFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    relation.bucketSpec match {
      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
      case _ =>
        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    }
  }
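The readFile function above has the shape "one file in, one iterator of rows out", so a partition holding several files presumably just chains those iterators lazily. A minimal sketch of that chaining, under that assumption: ToyFileScan and scanPartition are hypothetical names, F stands in for PartitionedFile, and R stands in for InternalRow (or ColumnarBatch in batch mode).

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

class ToyFileScan {
    // Lazily applies readFile to each file of one partition and concatenates the results.
    static <F, R> Iterator<R> scanPartition(List<F> files, Function<F, Iterator<R>> readFile) {
        return new Iterator<R>() {
            private final Iterator<F> fileIter = files.iterator();
            private Iterator<R> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Open the next file only when the current one is drained.
                while (!current.hasNext() && fileIter.hasNext()) {
                    current = readFile.apply(fileIter.next());
                }
                return current.hasNext();
            }

            @Override
            public R next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current.next();
            }
        };
    }
}
```

The iterator returned here plays the role of iter in the mapPartitionsWithIndex snippet earlier: it is what gets handed to buffer.init(index, Array(iter)).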

What are Spark's configs for parquet files

https://github.com/animeshtrivedi/notes/wiki/Parquet

See "Parquet options in Spark".

How Spark reads parquet files.

Executor log from reading a dataset with schema <int, byte[1024]>, with 100 rows distributed over 10 files in small.parquet.

  • 17/06/15 15:47:59 2891 INFO ParquetFileReader: Initiating action with parallelism: 5

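    This parallelism should be increased to some sensible value. As far as I can tell, the default of 5 comes from parquet-mr's parquet.metadata.read.parallelism setting.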

Example of Spark parquet benchmark

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala

Spark code gen for parquet reading

public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private scala.collection.Iterator[] inputs;
  private scala.collection.Iterator scan_input;
  private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
  private long scan_scanTime1;
  private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
  private int scan_batchIdx;
  private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
  private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance1;
  private UnsafeRow scan_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator[] inputs) {
    partitionIndex = index;
    this.inputs = inputs;
    scan_input = inputs[0];
    this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    scan_scanTime1 = 0;
    scan_batch = null;
    scan_batchIdx = 0;
    scan_colInstance0 = null;
    scan_colInstance1 = null;
    scan_result = new UnsafeRow(2);
    this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
    this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);

  }

  private void scan_nextBatch() throws java.io.IOException {
    long getBatchStart = System.nanoTime();
    if (scan_input.hasNext()) {
      scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
      scan_numOutputRows.add(scan_batch.numRows());
      scan_batchIdx = 0;
      scan_colInstance0 = scan_batch.column(0);
      scan_colInstance1 = scan_batch.column(1);

    }
    scan_scanTime1 += System.nanoTime() - getBatchStart;
  }

  protected void processNext() throws java.io.IOException {
    if (scan_batch == null) {
      scan_nextBatch();
    }
    while (scan_batch != null) {
      int numRows = scan_batch.numRows();
      while (scan_batchIdx < numRows) {
        int scan_rowIdx = scan_batchIdx++;
        boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
        int scan_value = scan_isNull ? -1 : (scan_colInstance0.getInt(scan_rowIdx));
        boolean scan_isNull1 = scan_colInstance1.isNullAt(scan_rowIdx);
        byte[] scan_value1 = scan_isNull1 ? null : (scan_colInstance1.getBinary(scan_rowIdx));
        scan_holder.reset();

        scan_rowWriter.zeroOutNullBytes();

        if (scan_isNull) {
          scan_rowWriter.setNullAt(0);
        } else {
          scan_rowWriter.write(0, scan_value);
        }

        if (scan_isNull1) {
          scan_rowWriter.setNullAt(1);
        } else {
          scan_rowWriter.write(1, scan_value1);
        }
        scan_result.setTotalSize(scan_holder.totalSize());
        append(scan_result);
        if (shouldStop()) return;
      }
      scan_batch = null;
      scan_nextBatch();
    }
    scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
    scan_scanTime1 = 0;
  }
}

Some useful links

http://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide

https://www.slideshare.net/cloudera/hadoop-summit-36479635
