Parquet
PARQUET-77 allows ByteBuffer in the read code path:
https://issues.apache.org/jira/browse/PARQUET-77
https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
http://ingest.tips/2015/01/31/parquet-row-group-size/
You can control the row group size by setting parquet.block.size, in bytes (default: 128 MB). Parquet buffers data in its final encoded and compressed form, which uses less memory and means that the amount of buffered data is the same as the row group size on disk [1]. That makes the row group size the most important setting: it controls both the amount of memory consumed for each open Parquet file and the layout of column data on disk. The row group size is a trade-off between these two. It is generally better to organize data into larger contiguous column chunks to get better I/O performance, but this comes at the cost of using more memory.
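As a quick sketch of how this is set in practice from Spark (the 64 MB value and the output path are just examples; parquet.block.size is picked up from the Hadoop configuration by the Parquet writer):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("row-group-size")
  .master("local[*]")
  .getOrCreate()

// Shrink the row group (and hence the per-file write buffer) to 64 MB.
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)

val df = spark.range(0, 10L * 1000 * 1000).toDF("id")
df.write.parquet("/tmp/rowgroup-64mb.parquet") // example output path
```

The ParquetOutputFormat javadoc below documents this and the related knobs: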
/**
* OutputFormat to write to a Parquet file
*
* It requires a {@link WriteSupport} to convert the actual records to the underlying format.
* It requires the schema of the incoming records. (provided by the write support)
* It allows storing extra metadata in the footer (for example: for schema compatibility purpose when converting from a different schema language).
*
* The format configuration settings in the job configuration:
* <pre>
* # The block size is the size of a row group being buffered in memory
* # this limits the memory usage when writing
* # Larger values will improve the IO when reading but consume more memory when writing
* parquet.block.size=134217728 # in bytes, default = 128 * 1024 * 1024
*
* # The page size is for compression. When reading, each page can be decompressed independently.
* # A block is composed of pages. The page is the smallest unit that must be read fully to access a single record.
* # If this value is too small, the compression will deteriorate
* parquet.page.size=1048576 # in bytes, default = 1 * 1024 * 1024
*
* # There is one dictionary page per column per row group when dictionary encoding is used.
* # The dictionary page size works like the page size but for dictionary
* parquet.dictionary.page.size=1048576 # in bytes, default = 1 * 1024 * 1024
*
* # The compression algorithm used to compress pages
* parquet.compression=UNCOMPRESSED # one of: UNCOMPRESSED, SNAPPY, GZIP, LZO. Default: UNCOMPRESSED. Supersedes mapred.output.compress*
*
* # The write support class to convert the records written to the OutputFormat into the events accepted by the record consumer
* # Usually provided by a specific ParquetOutputFormat subclass
* parquet.write.support.class= # fully qualified name
*
* # To enable/disable dictionary encoding
* parquet.enable.dictionary=true # false to disable dictionary encoding
*
* # To enable/disable summary metadata aggregation at the end of a MR job
* # The default is true (enabled)
* parquet.enable.summary-metadata=true # false to disable summary aggregation
*
* # Maximum size (in bytes) allowed as padding to align row groups
* # This is also the minimum size of a row group. Default: 0
* parquet.writer.max-padding=2097152 # 2 MB
* </pre>
*
* If parquet.compression is not set, the following properties are checked (FileOutputFormat behavior).
* Note that we explicitly disallow custom Codecs
* <pre>
* mapred.output.compress=true
* mapred.output.compression.codec=org.apache.hadoop.io.compress.SomeCodec # the codec must be one of Snappy, GZip or LZO
* </pre>
*
* if none of those is set the data is uncompressed.
*
* @author Julien Le Dem
*
* @param <T> the type of the materialized records
*/
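For plain MapReduce jobs, the same settings can also be applied with the static helpers on ParquetOutputFormat instead of raw configuration keys. A minimal sketch (values mirror the defaults quoted above, except for the compression codec):

```scala
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

val job = Job.getInstance()

// parquet.block.size: size of a row group buffered in memory (default 128 MB)
ParquetOutputFormat.setBlockSize(job, 128 * 1024 * 1024)
// parquet.page.size and parquet.dictionary.page.size (default 1 MB each)
ParquetOutputFormat.setPageSize(job, 1024 * 1024)
ParquetOutputFormat.setDictionaryPageSize(job, 1024 * 1024)
// parquet.enable.dictionary and parquet.compression
ParquetOutputFormat.setEnableDictionary(job, true)
ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
```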
If you remove all Parquet jars, then when you run the readOnly test Spark still tries to initialize the ParquetOutputCommitter class:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetOutputCommitter
at org.apache.spark.sql.internal.SQLConf$.<init>(SQLConf.scala:243)
at org.apache.spark.sql.internal.SQLConf$.<clinit>(SQLConf.scala)
at org.apache.spark.sql.internal.StaticSQLConf$$anonfun$buildConf$1.apply(SQLConf.scala:930)
at org.apache.spark.sql.internal.StaticSQLConf$$anonfun$buildConf$1.apply(SQLConf.scala:928)
at org.apache.spark.internal.config.TypedConfigBuilder$$anonfun$createWithDefaultString$1.apply(ConfigBuilder.scala:139)
at org.apache.spark.internal.config.TypedConfigBuilder$$anonfun$createWithDefaultString$1.apply(ConfigBuilder.scala:139)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.internal.config.TypedConfigBuilder.createWithDefaultString(ConfigBuilder.scala:139)
at org.apache.spark.internal.config.TypedConfigBuilder.createWithDefault(ConfigBuilder.scala:122)
at org.apache.spark.sql.internal.StaticSQLConf$.<init>(SQLConf.scala:937)
at org.apache.spark.sql.internal.StaticSQLConf$.<clinit>(SQLConf.scala)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$sessionStateClassName(SparkSession.scala:962)
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(SparkSession.scala:878)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(SparkSession.scala:878)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:878)
at com.ibm.crail.benchmarks.Main$.main(Main.scala:37)
at com.ibm.crail.benchmarks.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
A ParquetOutputCommitter can be passed as an argument when writing out. Interesting, see SQLConf.scala:
val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
.doc("The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter.")
.internal()
.stringConf
.createWithDefault(classOf[ParquetOutputCommitter].getName)
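So a different committer can be plugged in via that key when the session is built. A minimal sketch that just sets the default ParquetOutputCommitter explicitly (a custom subclass would go in its place):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-committer")
  .master("local[*]")
  // must be an org.apache.hadoop.mapreduce.OutputCommitter,
  // typically a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.parquet.hadoop.ParquetOutputCommitter")
  .getOrCreate()
```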
Parquet options in Spark
val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")
.booleanConf
.createWithDefault(false)
val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
.doc("When true, we make assumption that all part-files of Parquet are consistent with " +
"summary files and we will ignore them when merging schema. Otherwise, if this is " +
"false, which is the default, we will merge all part-files. This should be considered " +
"as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
.booleanConf
.createWithDefault(false)
val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
.doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
"Spark SQL, do not differentiate between binary data and strings when writing out the " +
"Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
"compatibility with these systems.")
.booleanConf
.createWithDefault(false)
val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
.doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
"Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
"nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
"provide compatibility with these systems.")
.booleanConf
.createWithDefault(true)
val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
.doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
.booleanConf
.createWithDefault(true)
val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
.doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
"uncompressed, snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase())
.checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
.createWithDefault("snappy")
val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
.doc("Enables Parquet filter push-down optimization when set to true.")
.booleanConf
.createWithDefault(true)
val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
.doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa.")
.booleanConf
.createWithDefault(false)
val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
.doc("The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter.")
.internal()
.stringConf
.createWithDefault(classOf[ParquetOutputCommitter].getName)
val PARQUET_VECTORIZED_READER_ENABLED =
SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
.doc("Enables vectorized parquet decoding.")
.booleanConf
.createWithDefault(true)
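A minimal sketch of setting a few of these options at runtime (the input path /sql/f1.parquet is taken from the listing further down and is just an example):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-options")
  .master("local[*]")
  .getOrCreate()

// runtime-settable Parquet options
spark.conf.set("spark.sql.parquet.compression.codec", "gzip") // default is snappy
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

// schema merging can also be requested per read
val df = spark.read.option("mergeSchema", "false").parquet("/sql/f1.parquet")
df.write.parquet("/sql/f1-gzip.parquet")
```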
Interesting classes
public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
/**
*
* Context passed to ReadSupport when initializing for read
*
* @author Julien Le Dem
*
*/
public class InitContext
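ReadSupport.init(InitContext) is where a reader sees the configuration and the file schema. A minimal sketch that exercises this path by reading a file as example Groups through the Hadoop input-format API (ExampleInputFormat wires in GroupReadSupport; the path /f1.parquet and running through a SparkContext are assumptions):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.parquet.example.data.Group
import org.apache.parquet.hadoop.example.ExampleInputFormat
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-inputformat")
  .master("local[*]")
  .getOrCreate()

val job = Job.getInstance()
FileInputFormat.addInputPath(job, new Path("/f1.parquet")) // assumed input file

// (Void, Group) pairs; GroupReadSupport.init(InitContext) runs under the hood
val rdd = spark.sparkContext.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[ExampleInputFormat],
  classOf[Void],
  classOf[Group])

println(rdd.map(_._2.toString).first())
```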
Parquet How-To
Short description: steps to build parquet-tools for working with Parquet files.
Often we need to read a Parquet file, its metadata, or its footer. parquet-tools ships with the parquet-hadoop library and can help us read Parquet. These are simple steps to build parquet-tools and demonstrate its use.
Prerequisites: Maven 3, git, JDK 7/8
// Building a parquet tools
git clone https://github.com/Parquet/parquet-mr.git
cd parquet-mr/parquet-tools/
mvn clean package -Plocal
// know the schema of the parquet file
java -jar parquet-tools-1.6.0.jar schema sample.parquet
// Read parquet file
java -jar parquet-tools-1.6.0.jar cat sample.parquet
// Read few lines in parquet file
java -jar parquet-tools-1.6.0.jar head -n5 sample.parquet
// know the meta information of the parquet file
java -jar parquet-tools-1.6.0.jar meta sample.parquet
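The same information that the schema/meta commands print can be pulled out of the footer programmatically; a minimal sketch with parquet-hadoop (sample.parquet is assumed to exist):

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

val conf = new Configuration()
// read only the footer (metadata), not the data pages
val footer = ParquetFileReader.readFooter(conf, new Path("sample.parquet"),
  ParquetMetadataConverter.NO_FILTER)

println(footer.getFileMetaData.getSchema) // the message type, as `schema` prints
for (rg <- footer.getBlocks.asScala)      // one entry per row group
  println(s"row group: ${rg.getRowCount} rows, ${rg.getTotalByteSize} bytes")
```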
The commands are types that implement the following interface:
public interface Command {
Options getOptions();
boolean supportsExtraArgs();
public String[] getUsageDescription();
void execute(CommandLine options) throws Exception;
}
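So a new sub-command is just another implementation of that interface (plus a registration in parquet-tools' Main). A rough sketch, assuming the interface lives under org.apache.parquet.tools.command; RowCountCommand and its behaviour are made up for illustration:

```scala
import org.apache.commons.cli.{CommandLine, Options}
import org.apache.parquet.tools.command.Command // assumed package, check the parquet-tools sources

class RowCountCommand extends Command {
  // no extra CLI flags for this hypothetical command
  override def getOptions(): Options = new Options()

  override def supportsExtraArgs(): Boolean = false

  override def getUsageDescription(): Array[String] =
    Array("<input>", "where <input> is the Parquet file to inspect")

  override def execute(options: CommandLine): Unit = {
    val input = options.getArgs()(0)
    // here one would open the footer (e.g. ParquetFileReader.readFooter) and print row counts
    println(s"inspecting $input")
  }
}
```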
./bin/hadoop jar /home/demo/parquet-tools-1.8.2.jar schema --debug /f1.parquet
parquet-tools-1.8.2.jar is here: https://github.com/animeshtrivedi/utilities/tree/master/parquet
Documentation about various things is here: https://github.com/apache/parquet-mr/tree/master/parquet-tools
https://github.com/Parquet/parquet-mr/tree/master/parquet-tools
I am trying to run this on Crail. This also works, even with Crail and fully qualified names on the crail/hdfs namespaces:
java -cp /home/demo/crail-deployment/hadoop//etc/hadoop/:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/common/lib/*:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/common/*:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/hdfs:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/hdfs/*:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/yarn/lib/*:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/yarn/*:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/home/demo/crail-deployment/hadoop-2.7.3/share/hadoop/mapreduce/*:/home/demo/crail-deployment/hadoop//contrib/capacity-scheduler/*.jar:/home/demo/crail-deployment/crail//conf:/home/demo/crail-deployment/crail//jars/*:/home/demo/crail-deployment/crail//lib/*:/home/demo/parquet-tools-1.8.2.jar org.apache.parquet.tools.Main cat --debug /f1
To make it work for Crail, add the classpath and java.library.path in ./bin/hadoop (which comes from libexec):
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS -Djava.library.path=$CRAIL_HOME/lib/"
and
+ export CLASSPATH=$CLASSPATH:$CRAIL_HOME/conf:$CRAIL_HOME/jars/*:$CRAIL_HOME/lib/*
then
demo@flex11:~/crail-deployment/hadoop$ ./bin/hadoop fs -ls crail://flex11-40g0:9060/sql/
drwxrwxrwx - stu stu 41984 2017-06-13 09:04 crail://flex11-40g0:9060/sql/f1.parquet
drwxrwxrwx - stu stu 41984 2017-06-13 09:05 crail://flex11-40g0:9060/sql/f2.parquet
-rw-rw-rw- 1 stu stu 34359738368 2017-06-13 09:19 crail://flex11-40g0:9060/sql/local-32gb