Optimization Techniques - ignacio-alorre/Hive GitHub Wiki

Possible Optimization Techniques

  • 1. Execution Engine
  • 2. Usage of Suitable File format
  • 3. Partitioning
  • 4. Bucketing
  • 5. Use of Indexing
  • 6. Enable Compression in Hive
  • 7. Optimize Joins
  • 8. Avoid Global Sorting in Hive
  • 9. Optimize LIMIT operator
  • 10. Enable Parallel Execution
  • 11. Enable Map-Reduce Strict Mode
  • 12. Single Reduce for Multi GROUP BY
  • 13. Controls Parallel Reduce Tasks
  • 14. Enable Vectorization
  • 15. Enable Cost Based Optimization
  • 16. Avoiding unnecessary JOINs
  • Extra. Tuning up Hive Performance from an Administrative level

1. Execution Engine

Setting the execution engine to Tez will increase the performance of your Hive queries. Tez is a client-side library framework built on Hadoop YARN, which can execute complex directed acyclic graphs of general data processing tasks. In many ways, it can be considered a much more flexible and powerful successor of the MapReduce framework.
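
The engine can be switched per session from the Hive shell (assuming Tez is available on the cluster):

set hive.execution.engine=tez;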

To understand better how Tez helps in optimizing jobs, we will first look into the processing sequence of a MapReduce job:

  • The Mapper function reads data from the file system, processes it into key-value pairs, and stores them temporarily on the local disk. These key-value pairs, grouped by key, are sent to the reducers over the network.

  • On the nodes where the reducers are to be run, the data is received and saved on the local disk, waiting for the data from all the mappers to arrive. Then the entire set of values for a key is read into a single reducer and processed, and the output is written out and further replicated based on the configuration.

  • As you can notice, a whole lot of unnecessary read/write overhead is involved in a single MapReduce job. Multiple MapReduce jobs are run to accomplish a single Hive query, and all outputs of the MapReduce jobs are first written to the DFS and then transferred between nodes; the cycle is repeated because there is no coordination between two MapReduce jobs.


Apache Tez avoids this by not breaking a Hive query into multiple MapReduce jobs: since Tez is a client-side library, it can orchestrate the whole processing itself. Tez optimizes the jobs using steps like the following:

  • Skipping the DFS write by the reducers and piping the output of a reducer directly in the subsequent Mapper as input.
  • Cascading a series of Reducers without intervening Mapper steps.
  • Re-use of containers for successive phases of processing.
  • Optimal Resource usage using Pre-warmed containers.
  • Cost-based Optimizations.
  • Vectorized Query Processing.

More About Tez

2. Usage of Suitable File format

The ORC file format is best suited for increasing query performance. It can store data in a more optimized way than other file formats, reducing the size of the original data by up to 75%. As a result, the speed of data processing also increases.

An ORC file contains rows in groups called stripes, along with a file footer. The ORC format improves performance when Hive is processing the data.


Create Tables with ORC File Format

We can create a new Hive table in the ORC file format just by adding the STORED AS ORC clause to the CREATE TABLE command. Optionally, we can specify a compression technique in the TBLPROPERTIES clause.

Converting Existing Tables to ORC

Create a table with the same schema as the source table and STORED AS ORC, then submit the commands below to copy data from the regular table into the new ORC-formatted table:

CREATE TABLE EMP_ORC (id int, name string, age int, address string)
STORED AS ORC TBLPROPERTIES ("orc.compress" = "SNAPPY");
INSERT OVERWRITE TABLE EMP_ORC SELECT * FROM EMP;

3. Partitioning

Without partitioning, Hive reads all the data in the directory and applies the query logic to it, which can be slow and expensive. However, users very often need to query only a subset of the data, for example rows from a particular department/country or from a particular date. To speed up this sort of query, Hive offers partitions.

When to use Hive partitioning: when users want the data contained in a table to be split across multiple sections of the Hive table. The entries for the various columns of the dataset are segregated and stored in their respective partitions. When querying those datasets, only the required partitions of the table will be read, reducing the execution time, as in the sketch below.
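
A minimal sketch, assuming a hypothetical employee table whose dept column is used as the partition key:

CREATE TABLE employee_part (id INT, name STRING, salary FLOAT)
PARTITIONED BY (dept STRING)
STORED AS ORC;

-- Load one partition explicitly (dynamic partitioning is also possible):
INSERT OVERWRITE TABLE employee_part PARTITION (dept='sales')
SELECT id, name, salary FROM employee WHERE dept = 'sales';

-- Only the dept='sales' partition directory is scanned:
SELECT name, salary FROM employee_part WHERE dept = 'sales';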

4. Bucketing

Sometimes partitioning is not enough, and a single partition remains huge and expensive to handle. To overcome this problem, bucketing allows us to divide table data sets into more manageable parts. The bucketing concept is based on a hash function over the bucketing column, which depends on the type of that column; see the sketch below.
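
A minimal sketch, again on a hypothetical employee table; rows are hashed on id into 32 buckets inside each partition:

CREATE TABLE employee_buck (id INT, name STRING, salary FLOAT)
PARTITIONED BY (dept STRING)
CLUSTERED BY (id) INTO 32 BUCKETS
STORED AS ORC;

-- In older Hive versions this flag must be on so that inserts
-- actually distribute rows into the declared buckets:
set hive.enforce.bucketing=true;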

5. Use of Indexing

Indexing creates a separate table, called the index table, which acts as a reference for the original one.

If we want to perform queries on only some columns, then without an index the query will still be executed as a scan over the whole table.

The major advantage: when we perform a query on a table that has an index, the query does not need to scan all the rows in that table. It checks the index first, then goes to the particular column and performs the operation.

So if we maintain indexes, it is easier for a Hive query to look into the index first and then perform the needed operations in less time. A sketch follows.
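
A minimal sketch of a compact index on a hypothetical employee table (note that built-in indexing was removed in Hive 3.0, so this applies to older releases):

CREATE INDEX idx_employee_dept
ON TABLE employee (dept)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD;

-- Populate (and later refresh) the index table:
ALTER INDEX idx_employee_dept ON employee REBUILD;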

6. Enable Compression in Hive

Compression techniques reduce the amount of intermediate data, and so reduce the data transfer between mappers and reducers.

For better results, you need to enable compression at both the mapper and the reducer side separately. Although gzip is considered the best compression format, beware that it is not splittable, so it should be applied with caution.

Other formats are Snappy, LZO, bzip2, etc. You can enable compression on the mapper and reducer side using the settings below:

set mapred.compress.map.output = true;
set mapred.output.compress= true;

Also, the compressed file should not be more than a few hundred MBs, otherwise it may negatively impact the jobs.
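
A sketch of also selecting the codec, assuming the old mapred.* property names (newer Hadoop releases use the mapreduce.* equivalents):

set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;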

More about this

7. Optimize Joins

Auto Map Joins

Auto Map-Join is a very useful feature when joining a big table with a small table (by default the hive.mapjoin.smalltable.filesize threshold is 25MB). If we enable this feature, the small table will be saved in the local cache on each node and then joined with the big table in the Map phase. Enabling Auto Map Join provides two advantages. First, loading a small table into cache saves read time on each data node. Second, it avoids skew joins in the Hive query, since the join operation has already been done in the Map phase for each block of data.

This works together with hive.auto.convert.join.noconditionaltask. When three or more tables are involved in the join condition, using hive.auto.convert.join alone Hive generates three or more map-side joins under the assumption that all tables are small. Using hive.auto.convert.join.noconditionaltask, you can combine three or more map-side joins into a single map-side join if the combined size of the n-1 smaller tables is less than 10 MB (this threshold is defined by hive.auto.convert.join.noconditionaltask.size).

Outer joins are not always converted to map joins

  • Full outer joins are never converted to map-side joins.
  • A left outer join is converted to a map join only if the right table, i.e. the table on the right side of the join condition, is smaller than 25 MB.
  • Similarly, a right outer join is converted to a map join only if the left table is smaller than 25 MB.
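
As an illustration with hypothetical tables (and the flags from the next subsection enabled), a join like the first query below is converted automatically when dim_dept fits under the threshold; the second form forces the conversion with the older explicit hint:

SELECT e.id, e.name, d.dept_name
FROM employee e
JOIN dim_dept d ON e.dept_id = d.dept_id;

SELECT /*+ MAPJOIN(d) */ e.id, e.name, d.dept_name
FROM employee e
JOIN dim_dept d ON e.dept_id = d.dept_id;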

Enabling the Auto Map-Join feature [2 Options]

Setting flags

set hive.auto.convert.join=true
set hive.auto.convert.join.noconditionaltask=true
set hive.auto.convert.join.noconditionaltask.size=10000000

hive-site.xml

  <property>
    <name>hive.auto.convert.join</name>
    <value>true</value>
    <description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size</description>
  </property>
  <property>
    <name>hive.auto.convert.join.noconditionaltask</name>
    <value>true</value>
    <description>
      Whether Hive enables the optimization about converting common join into mapjoin based on the input file size. 
      If this parameter is on, and the sum of size for n-1 of the tables/partitions for a n-way join is smaller than the
      specified size, the join is directly converted to a mapjoin (there is no conditional task).
    </description>
  </property>
  <property>
    <name>hive.auto.convert.join.noconditionaltask.size</name>
    <value>10000000</value>
    <description>
      If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. 
      However, if it is on, and the sum of size for n-1 of the tables/partitions for a n-way join is smaller than this size, 
      the join is directly converted to a mapjoin(there is no conditional task). The default is 10MB
    </description>
  </property>
  <property>
    <name>hive.auto.convert.join.use.nonstaged</name>
    <value>false</value>
    <description>
      For conditional joins, if input stream from a small alias can be directly applied to join operator without 
      filtering or projection, the alias need not to be pre-staged in distributed cache via mapred local task.
      Currently, this is not working with vectorization or tez execution engine.
    </description>
  </property>

Skew Joins

We can enable the optimization of skew joins, i.e. imbalanced joins, by setting the hive.optimize.skewjoin property to true. Below is the list of properties that can be fine-tuned in hive-site.xml to better optimize skew joins.

  <property>
    <name>hive.optimize.skewjoin</name>
    <value>true</value>
    <description>
      Whether to enable skew join optimization. 
      The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
      processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
      job, process those skewed keys. The same key need not be skewed for all the tables, and so,
      the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
      map-join.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.key</name>
    <value>100000</value>
    <description>
      Determine if we get a skew key in join. If we see more than the specified number of rows with the same key in join operator,
      we think the key as a skew join key. 
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.map.tasks</name>
    <value>10000</value>
    <description>
      Determine the number of map task used in the follow up map join job for a skew join.
      It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.min.split</name>
    <value>33554432</value>
    <description>
      Determine the number of map task at most used in the follow up map join job for a skew join by specifying 
      the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.
    </description>
  </property>

Enable Bucketed Map joins

If tables are bucketed by a particular column and those tables are being used in joins, then we can enable bucketed map joins to improve the performance:

  <property>
    <name>hive.optimize.bucketmapjoin</name>
    <value>true</value>
    <description>Whether to try bucket mapjoin</description>
  </property>
  <property>
    <name>hive.optimize.bucketmapjoin.sortedmerge</name>
    <value>true</value>
    <description>Whether to try sorted bucket merge map join</description>
  </property>
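
A sketch with hypothetical tables: both sides must be bucketed (and, for the sorted-merge variant, sorted) on the join key into the same number of buckets.

CREATE TABLE orders_buck (order_id INT, cust_id INT, amount FLOAT)
CLUSTERED BY (cust_id) SORTED BY (cust_id ASC) INTO 32 BUCKETS;

CREATE TABLE customers_buck (cust_id INT, name STRING)
CLUSTERED BY (cust_id) SORTED BY (cust_id ASC) INTO 32 BUCKETS;

set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

SELECT o.order_id, c.name, o.amount
FROM orders_buck o JOIN customers_buck c ON o.cust_id = c.cust_id;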

8. Avoid Global Sorting in Hive

Global sorting can be achieved in Hive with the ORDER BY clause, but this comes with a drawback: ORDER BY produces the result by setting the number of reducers to one, making it very inefficient for large datasets.

When a globally sorted result is not required, we can use the SORT BY clause instead. SORT BY produces a sorted file per reducer.

If we need to control which reducer a particular row goes to, we can use the DISTRIBUTE BY clause, for example:

SELECT id, name, salary, dept FROM employee
DISTRIBUTE BY dept
SORT BY id ASC, name DESC;

Now each dept will be processed separately by a reducer, and records will be sorted by the id and name fields within each dept separately.

9. Optimize LIMIT operator

By default the LIMIT operator still executes the entire query and only then returns the limited results. Because this behavior is generally wasteful, it can be avoided by setting the properties below:

  <property>
    <name>hive.limit.optimize.enable</name>
    <value>true</value>
    <description>Whether to enable to optimization to trying a smaller subset of data for simple LIMIT first.</description>
  </property>
  <property>
    <name>hive.limit.row.max.size</name>
    <value>100000</value>
    <description>When trying a smaller subset of data for simple LIMIT, how much size we need to guarantee each row to have at least.</description>
  </property>
  <property>
    <name>hive.limit.optimize.limit.file</name>
    <value>10</value>
    <description>When trying a smaller subset of data for simple LIMIT, maximum number of files we can sample.</description>
  </property>
  <property>
    <name>hive.limit.optimize.fetch.max</name>
    <value>50000</value>
    <description>
      Maximum number of rows allowed for a smaller subset of data for simple LIMIT, if it is a fetch query. 
      Insert queries are not restricted by this limit.
    </description>
  </property>

10. Enable Parallel Execution

Hive converts a query into one or more stages. Stages could be a MapReduce stage, a sampling stage, a merge stage, or a limit stage. By default, Hive executes these stages one at a time. A particular job may consist of some stages that are not dependent on each other and could be executed in parallel, allowing the overall job to complete more quickly. Parallel execution can be enabled by setting the properties below.

  <property>
    <name>hive.exec.parallel</name>
    <value>true</value>
    <description>Whether to execute jobs in parallel</description>
  </property>
  <property>
    <name>hive.exec.parallel.thread.number</name>
    <value>8</value>
    <description>How many jobs at most can be executed in parallel</description>
  </property>

11. Enable Map-Reduce Strict Mode

We can enable MapReduce strict mode by setting the property below to strict.

<property>  
  <name>hive.mapred.mode</name>
  <value>strict</value>
  <description> 
      The mode in which the Hive operations are being performed. 
      In strict mode, some risky queries are not allowed to run. They include:
        Cartesian Product.
        No partition being picked up for a query.
        Comparing bigints and strings.
        Comparing bigints and doubles.
        Orderby without limit.
   </description>
</property>   
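
For instance, with strict mode on, an unbounded global sort on a hypothetical table is rejected until a LIMIT is added:

-- Rejected in strict mode (ORDER BY without LIMIT):
SELECT id, name FROM employee ORDER BY id;

-- Accepted:
SELECT id, name FROM employee ORDER BY id LIMIT 100;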

12. Single Reduce for Multi GROUP BY

By enabling a single reducer task for multi-GROUP BY operations, we can combine multiple GROUP BY operations in a query into a single MapReduce job.

<property>
    <name>hive.multigroupby.singlereducer</name>
    <value>true</value>
    <description>
      Whether to optimize multi group by query to generate single M/R  job plan. If the multi group by query has 
      common group by keys, it will be optimized to generate single M/R job.
    </description>
</property>
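
A sketch of a query shape that benefits, assuming hypothetical tables and a common group-by key (a Hive multi-insert whose two aggregations share the dept key):

FROM employee
INSERT OVERWRITE TABLE dept_salary SELECT dept, SUM(salary) GROUP BY dept
INSERT OVERWRITE TABLE dept_count SELECT dept, COUNT(1) GROUP BY dept;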

13. Controls Parallel Reduce Tasks

We can control the number of parallel reduce tasks that can be run for a given Hive query with the properties below.

  <property>
    <name>hive.exec.reducers.bytes.per.reducer</name>
    <value>256000000</value>
    <description>size per reducer.The default is 256Mb, i.e if the input size is 1G, it will use 4 reducers.</description>
  </property>
  <property>
    <name>hive.exec.reducers.max</name>
    <value>1009</value>
    <description>
      max number of reducers will be used. If the one specified in the configuration parameter mapred.reduce.tasks is
      negative, Hive will use this one as the max number of reducers when automatically determine number of reducers.
    </description>
  </property>

We can also set the parallel reduce tasks to a fixed value with below property.

set mapred.reduce.tasks=32;

14. Enable Vectorization

With vectorized query execution, we can improve the performance of operations like scans, aggregations, filters, and joins by performing them in batches of 1024 rows at once instead of a single row at a time (note that vectorization works on data stored in the ORC format). We can enable vectorized query execution by setting the three properties below:

set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
set hive.vectorized.execution.reduce.groupby.enabled = true;

15. Enable Cost Based Optimization

Recent Hive releases provide cost-based optimization (CBO), with which one can achieve further optimizations based on query cost, resulting in potentially different decisions: how to order joins, which type of join to perform, the degree of parallelism, and others. Without CBO, Hive optimizes each query's logical and physical execution plan with rule-based transformations before submitting it for final execution; those optimizations are not based on the cost of the query.

Cost-based optimization can be enabled by setting the properties below from the Hive shell:

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

Or in hive-site.xml

  <property>
    <name>hive.cbo.enable</name>
    <value>true</value>
    <description>Flag to control enabling Cost Based Optimizations using Calcite framework.</description>
  </property>
  <property>
    <name>hive.compute.query.using.stats</name>
    <value>true</value>
    <description>
      When set to true Hive will answer a few queries like count(1) purely using stats
      stored in metastore. For basic stats collection turn on the config hive.stats.autogather to true.
      For more advanced stats collection need to run analyze table queries.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.partition.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires partition level basic
      statistics like number of rows, data size and file size. Partition statistics are fetched from
      metastore. Fetching partition statistics for each needed partition can be expensive when the
      number of partitions is high. This flag can be used to disable fetching of partition statistics
      from metastore. When this flag is disabled, Hive will make calls to filesystem to get file sizes
      and will estimate the number of rows from row schema.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.column.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires column statistics.
      Column statistics are fetched from metastore. Fetching column statistics for each needed column
      can be expensive when the number of columns is high. This flag can be used to disable fetching
      of column statistics from metastore.
    </description>
  </property>
  <property>
    <name>hive.stats.autogather</name>
    <value>true</value>
    <description>A flag to gather statistics automatically during the INSERT OVERWRITE command.</description>
  </property>
  <property>
    <name>hive.stats.dbclass</name>
    <value>fs</value>
    <description>
      Expects one of the pattern in [jdbc(:.*), hbase, counter, custom, fs].
      The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'), 
      each task writes statistics it has collected in a file on the filesystem, which will be aggregated 
      after the job has finished. Supported values are fs (filesystem), jdbc:database (where database 
      can be derby, mysql, etc.), hbase, counter, and custom as defined in StatsSetupConst.java.
    </description>
  </property>

Then gather statistics with the commands below in the Hive shell; the first computes basic statistics for all the columns of the employee table, the second only for the id and dept columns.

ANALYZE TABLE employee COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE employee COMPUTE STATISTICS FOR COLUMNS id, dept;

16. Avoiding unnecessary JOINs

While using aggregate functions in Hive (like AVG, SUM, MAX, etc.), we sometimes tend to use JOINs unnecessarily to get the desired output. Instead, we can use windowing functions wherever they apply, as in the sketch below.
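
For example, on a hypothetical employee table, instead of joining the table back to an aggregated copy of itself we can compute the aggregate in a single pass with a window function:

-- Join-based version: aggregate, then join back (two passes over the data):
SELECT e.id, e.salary, a.avg_salary
FROM employee e
JOIN (SELECT dept, AVG(salary) AS avg_salary FROM employee GROUP BY dept) a
  ON e.dept = a.dept;

-- Window-function version: single pass, no join:
SELECT id, salary, AVG(salary) OVER (PARTITION BY dept) AS avg_salary
FROM employee;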

Consider using hints like STREAMTABLE wherever applicable.

Extra. Tuning up Hive Performance from an Administrative level

https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/performance-tuning/hive_performance_tuning.pdf
