Hive Patterns to Handle Incremental Data
The data ingestion process typically involves one of the following scenarios for adding a new set of data to the Hadoop layer:
- Complete Load: the entire table/partition is truncated and then reloaded with the new full set of data.
- Append Load: new data is appended to the existing data on each batch refresh.
- Insert or Update Ingestion: a record with a new key is inserted into the table, whereas if the key already exists in the partition/table the record is updated with the latest information.
For the demonstration we will use a table named Orders. The Orders table is stored as JSON and contains the fields below:
- ID
- Order_ID
- Product_ID
- Product_Name
- Quantity
- Product_Price
- Customer_Id
- Customer_Name
- Order_Date
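For reference, a single record in the incoming JSON files might look like the following (all values are illustrative):
{"id": "1001", "order_id": "5001", "product_id": "2001", "product_name": "Laptop", "quantity": "2", "product_price": "899.99", "customer_id": "c-77", "customer_name": "Jane Doe", "order_date": "2024-01-15"}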
Step 1: Create Hive Stage Table
The first step in building a data pipeline is preparing and ingesting data into a stage table. Data from the source system is fetched using the Hadoop API and stored in an HDFS stage location, and an external stage table is created pointing to that location.
In the following examples, the stage table for order data is defined on '/user/dks/datalake/orders_stg', the HDFS landing location where the incremental data is stored.
CREATE EXTERNAL TABLE IF NOT EXISTS orders_stg (
id string,
customer_id string,
customer_name string,
product_id string,
product_name string,
product_price string,
quantity string,
order_date string,
src_update_ts string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/dks/datalake/orders_stg'; -- source data location
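Depending on the distribution, the JsonSerDe may require the hive-hcatalog-core jar to be added to the session. Once incremental files land in the stage location, a quick sanity check such as the following confirms the SerDe parses them (the grouping is just illustrative):
select order_date, count(*) as staged_rows
from orders_stg
group by order_date;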
Step 2: Create final table
In this step we will create a Hive managed table which holds the final data. The table is stored in ORC format and partitioned by order_date.
Here is the DDL of the final table:
CREATE TABLE IF NOT EXISTS orders (
id bigint,
customer_id string,
customer_name string,
product_id int,
product_name string,
product_price decimal(12,2),
quantity int,
src_update_ts timestamp,
src_file string,
ingestion_ts timestamp
)
PARTITIONED BY (order_date date)
STORED AS ORC
LOCATION '/user/dks/datalake/orders';
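Because order_date is declared in the PARTITIONED BY clause rather than in the column list, each distinct date is stored as its own HDFS subdirectory, yet it still behaves like a regular column in queries. An equality filter on it prunes directly to the matching directory (the date below is hypothetical):
select id, product_name, quantity
from orders
where order_date = date '2024-01-15';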
Now that the table structures have been defined, let's go through each incremental ingestion option.
Step 3: Incremental Ingestion Types
Once the stage and final tables are created, the next step is to determine the ingestion scenario best suited to your pipeline. You need to consider the source data format, volume, velocity of the data flow, access criteria, etc. to find the ingestion type that meets your requirements.
Note that orders is a partitioned table, so ingestion will be performed at the partition level.
1. Complete/Full Load
In this approach overlapping partitions are dropped completely and recreated with the new set of data. This method is typically used when the incremental stage table holds the full data for the corresponding partitions.
Set the following Hive configurations:
set hive.support.quoted.identifiers=none; -- enables regex column specifications such as `(order_date)?+.+`
set hive.exec.dynamic.partition=true; -- enables dynamic partition inserts
set hive.exec.dynamic.partition.mode=nonstrict; -- no static partition column is required
Query for the complete load:
with new_data as (select * from orders_stg)
insert overwrite table orders partition(order_date)
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data;
-- `(order_date)?+.+` selects every column except order_date
In the above query, the "insert overwrite" command drops each existing partition and recreates it with the new set of data from the stage table.
Hive maintains data integrity by acquiring a shared lock on the orders table and an exclusive lock on each partition being updated.
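After the load, a quick check (not part of the pipeline itself) lists the partitions that now exist:
show partitions orders;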
2. Append Load
In an append load, new data is appended to the existing partition or table. This approach is typically used when the incremental data contains only new records, without any redundancy.
This can be achieved in two ways; let's go through each one. In the first approach the append is done by overwriting the existing partitions with the old and new data merged. The advantage of this method is that it avoids creating small files when there are frequent appends.
Here is the query for this method:
-- Use the same Hive configuration used for the complete load
with new_data as (select * from orders_stg)
insert overwrite table orders partition(order_date)
select * from (
select * from orders where order_date in
(select distinct order_date from new_data) -- existing data
union all
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data -- new data
) merged;
The second approach simply inserts the new data into the existing partitions:
-- Use the same Hive configuration used for the complete load
with new_data as (select * from orders_stg)
insert into table orders partition(order_date)
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data;
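Because insert into writes new files on every run, frequent appends can leave a partition with many small files. Since the final table is stored as ORC, an affected partition can be compacted afterwards (the partition value below is hypothetical):
alter table orders partition (order_date = '2024-01-15') concatenate;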
3. Insert or Update Ingestion
If the requirement is to overwrite or update existing data with the incoming data, then the insert/update method is preferred.
New data is added or updated in the existing partition/table based on a row key: a record is appended to the table if it carries a new key, otherwise the existing record is updated with the latest information.
This option is only possible when a row in the table can be identified uniquely using one or more columns. In the orders table, the ID field is the row key and the src_update_ts field carries the record timestamp.
Let me walk you through two different techniques to apply insert or update ingestion.
Method 1:
Use the row_number() over() window function to find the latest version of each record that needs to be updated/inserted.
with new_data as (select * from orders_stg)
insert overwrite table orders partition(order_date)
select `(rank_no)?+.+` from
(
select *, row_number() over(partition by order_date, id order by src_update_ts desc) rank_no
from (
select * from orders where order_date in (select distinct order_date from new_data)
union all
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data
) update_data
) ranked_data
where rank_no = 1
distribute by order_date sort by id;
-- rank_no = 1 indicates the most recent record for each key
-- `(order_date)?+.+` selects all the columns except order_date
-- `(rank_no)?+.+` selects all the columns except rank_no
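To verify that the upsert left exactly one row per key, a duplicate check such as the following can be run (it should return no rows):
select order_date, id, count(*) as versions
from orders
group by order_date, id
having count(*) > 1;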
Method 2:
In this method, a temporary table is created by merging data from both the original and the incremental tables, keeping for each key only the record with the latest timestamp.
The last step is to drop the original table and rename the temporary table to the original table name.
This approach is more suitable for non-partitioned tables, for example if the orders table were not partitioned. Here is the DDL:
- Step 1 => Reconcile view: this view combines the record sets from the original table and the incremental data.
create view recon_orders as
with new_data as (select * from orders_stg)
select final_data.* from
(
select * from orders where order_date in (select distinct order_date from new_data)
union all
select `(order_date)?+.+`, input__file__name, current_timestamp as ingestion_ts, cast(order_date as date) as part from new_data
) final_data
inner join
(
select id, max(src_update_ts) as max_src_upd_ts
from
(
select id, src_update_ts from orders
union all
select id, src_update_ts from new_data
) all_data
group by id
) max_data
on final_data.id = max_data.id and final_data.src_update_ts = max_data.max_src_upd_ts;
- Step 2 => Insert into temp table:
drop table if exists temp_orders;
create table temp_orders as
select * from recon_orders; -- the view created in step 1
- Step 3 => Purge:
drop view recon_orders; -- the view depends on the original table, so remove it first
drop table orders;
alter table temp_orders rename to orders;
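As an aside, if orders were created as a transactional ORC table (Hive 2.2+ with ACID enabled; exact requirements vary by version), the MERGE statement expresses insert-or-update directly. A minimal sketch, with the stage columns cast to the final types and src_file left null for brevity:
merge into orders t
using (
select cast(id as bigint) as id, customer_id, customer_name,
cast(product_id as int) as product_id, product_name,
cast(product_price as decimal(12,2)) as product_price,
cast(quantity as int) as quantity,
cast(src_update_ts as timestamp) as src_update_ts,
cast(order_date as date) as order_date
from orders_stg
) s
on t.id = s.id and t.order_date = s.order_date
when matched and s.src_update_ts > t.src_update_ts then update set
customer_id = s.customer_id, customer_name = s.customer_name,
product_id = s.product_id, product_name = s.product_name,
product_price = s.product_price, quantity = s.quantity,
src_update_ts = s.src_update_ts, ingestion_ts = current_timestamp
when not matched then insert values
(s.id, s.customer_id, s.customer_name, s.product_id, s.product_name,
s.product_price, s.quantity, s.src_update_ts, null, current_timestamp, s.order_date);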