
Incremental Updates

As of now, Hive does not provide true row-level INSERT or UPDATE operations, which leaves two options for loading data:

  • The OVERWRITE option requires moving the complete record set from source to Hadoop. While this approach may work for smaller data sets, it may be prohibitive at scale.

  • The APPEND option can limit data movement to new or updated records only. As true Inserts and Updates are not yet available in Hive, we need to consider a process for preventing duplicate records as Updates are appended to the cumulative record set. (Both load semantics are illustrated below.)
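
In HiveQL terms, these two options correspond to INSERT OVERWRITE and INSERT INTO. A minimal illustration; the table names below are placeholders, not part of the workflow described later:

-- OVERWRITE: each load replaces the full contents of the target table
INSERT OVERWRITE TABLE target_table SELECT * FROM staging_table;

-- APPEND: each load adds rows, so duplicates must be reconciled afterwards
INSERT INTO TABLE target_table SELECT * FROM staging_table;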

Hive Table Definition Options: External, Local and View

  • External Tables: Are a combination of a Hive table definition and HDFS-managed folders and files. The table definition exists independently of the data, so if the table is dropped, the HDFS folders and files remain in their original state (the drop behavior of both table types is illustrated after this list).

  • Local Tables: Are Hive tables that are directly tied to the source data. The data is physically tied to the table definition and will be deleted if the table is dropped.

  • Views: Are created on top of Hive tables. They support the same READ interaction as Hive tables, yet they do not store any data of their own.
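
As a quick illustration of the difference in drop behavior (the table names and the HDFS path here are hypothetical, not part of the workflow below):

-- External table: DROP TABLE removes only the metadata;
-- the files under /user/hive/ext_demo remain in HDFS
CREATE EXTERNAL TABLE ext_demo (id string)
LOCATION '/user/hive/ext_demo';
DROP TABLE ext_demo;

-- Local (managed) table: DROP TABLE deletes both the definition and the data files
CREATE TABLE managed_demo (id string);
DROP TABLE managed_demo;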

The following process outlines a workflow that leverages all of the above in four steps:

  • Ingest: Complete record set movement into the Base table (base_table) initially, followed by Change records only (incremental_table) on subsequent runs.

  • Reconcile: Creating a Single View of Base + Change records (reconcile_view) to reflect the most up-to-date record set.

  • Compact: Creating a Reporting table (reporting_table) from the reconciled view

  • Purge: Replacing the Base table with Reporting table contents and deleting any previously processed Change records before the next Data Ingestion cycle.

The tables and views that will be a part of the Incremental Update Workflow are:

base_table: A HIVE Local table that initially holds all records from the source system. After the initial processing cycle, it will maintain a copy of the most up-to-date synchronized record set from the source. At the end of each processing cycle, it is overwritten by the reporting_table (as explained in Step 4: Purge).

incremental_table: A HIVE External table that holds the incremental change records (INSERTS and UPDATES) from the source system. At the end of each processing cycle, it is cleared of content (as explained in Step 4: Purge).

reconcile_view: A HIVE View that combines and reduces the base_table and incremental_table content to show only the most up-to-date records. It is used to populate the reporting_table (as explained in Step 3: Compact).

reporting_table: A HIVE Local table that holds the most up-to-date records for reporting purposes. It is also used to overwrite the base_table at the end of each processing run.

Step 1: Ingest

Depending on whether direct access is available to the RDBMS source system, you may opt for either a File Processing method (when no direct access is available) or RDBMS Processing (when database client access is available). Regardless of the ingest option, the processing workflow in this article requires:

  1. One-time, initial load to move all data from source table to HIVE.
  2. On-going, “Change Only” data loads from the source table to HIVE.

We are going to use Sqoop for both types of load.

Full import [for the initial, historical load]

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail \
--connection-manager org.apache.sqoop.teradata.TeradataConnManager \
--username dbc \
--password dbc \
--table SOURCE_TBL \
--target-dir /user/hive/incremental_table -m 1

Incremental import

After the initial import, subsequent imports can leverage Sqoop's native support for "Incremental Import" by using the "check-column", "incremental" and "last-value" parameters.

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail \
--connection-manager org.apache.sqoop.teradata.TeradataConnManager \
--username dbc \
--password dbc \
--table SOURCE_TBL \
--target-dir /user/hive/incremental_table -m 1 \
--check-column modified_date \
--incremental lastmodified \
--last-value {last_import_date}

As another option for the incremental case, you can leverage the "query" parameter and have a SQL SELECT statement limit the import to new or changed records only:

sqoop import \
--connect jdbc:teradata://{host name or ip address}/Database=retail \
--connection-manager org.apache.sqoop.teradata.TeradataConnManager \
--username dbc \
--password dbc \
--target-dir /user/hive/incremental_table -m 1 \
--query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS'

Step 2: Reconcile

In order to support an on-going reconciliation between current records in HIVE and new change records, two tables should be defined: base_table and incremental_table.

base_table

This table will house the initial, complete record load from the source system. After the first processing run, it will house the on-going, most up-to-date set of records from the source system.

CREATE TABLE base_table (
    id string,
    field1 string,
    field2 string,
    field3 string,
    field4 string,
    field5 string,
    modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/base_table';

incremental_table

This external table will house only the incremental change records (INSERTS and UPDATES) received from the source system since the last processing cycle.

CREATE EXTERNAL TABLE incremental_table (
    id string,
    field1 string,
    field2 string,
    field3 string,
    field4 string,
    field5 string,
    modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/incremental_table';

reconcile_view

This view combines the record sets from both the Base (base_table) and Change (incremental_table) tables and reduces them to only the most recent record for each unique "id".

CREATE VIEW reconcile_view AS
SELECT t1.* FROM
(  SELECT * FROM base_table
   UNION ALL SELECT * FROM incremental_table
)t1
JOIN
(  SELECT id, max(modified_date) max_modified FROM
        (  SELECT * FROM base_table
           UNION ALL SELECT * FROM incremental_table) t2 
   GROUP BY id) s 
ON t1.id = s.id AND t1.modified_date = s.max_modified;
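
For context, a minimal sketch of how the Compact and Purge steps described earlier could consume this view; the exact statements may differ in your environment, particularly around clearing the external incremental_table folder:

-- Step 3 (Compact): materialize the reconciled records into the reporting table
DROP TABLE IF EXISTS reporting_table;
CREATE TABLE reporting_table AS
SELECT * FROM reconcile_view;

-- Step 4 (Purge): refresh the base table from the reporting table
INSERT OVERWRITE TABLE base_table
SELECT * FROM reporting_table;

Since incremental_table is external, its change records then need to be cleared from HDFS (e.g. by removing the files under /user/hive/incremental_table) before the next Data Ingestion cycle.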

Sources