HiveQL - noonecare/opensourcebigdatatools GitHub Wiki

Both Spark SQL and Hive use HiveQL as their query language. This page introduces HiveQL.

Data Definition

show databases;
create database wangmeng;
alter database financials set dbproperties ('edited-by' = 'Joe Dba');
CREATE TABLE IF NOT EXISTS mydb.employees (
name STRING COMMENT 'Employee name',
salary FLOAT COMMENT 'Employee salary',
subordinates ARRAY<STRING> COMMENT 'Names of subordinates',
deductions MAP<STRING, FLOAT>
COMMENT 'Keys are deductions names, values are percentages',
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
COMMENT 'Home address')
COMMENT 'Description of the table'
TBLPROPERTIES ('creator'='me', 'created_at'='2012-01-02 10:00:00', ...)
LOCATION '/user/hive/warehouse/mydb.db/employees';
CREATE TABLE IF NOT EXISTS mydb.employees2
LIKE mydb.employees;
USE mydb;
SHOW TABLES;
SHOW TABLES 'empl.*';
DESCRIBE EXTENDED mydb.employees;
CREATE EXTERNAL TABLE IF NOT EXISTS stocks (
exchange STRING,
symbol STRING,
ymd STRING,
price_open FLOAT,
price_high FLOAT,
price_low FLOAT,
price_close FLOAT,
volume INT,
price_adj_close FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/stocks';

CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (country STRING, state STRING);

set hive.mapred.mode=strict;
SHOW PARTITIONS employees;
SHOW PARTITIONS employees PARTITION(country='US');
DESCRIBE EXTENDED employees;
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
CREATE EXTERNAL TABLE IF NOT EXISTS log_messages (
hms INT,
severity STRING,
server STRING,
process_id INT,
message STRING)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
ALTER TABLE log_messages ADD PARTITION(year = 2012, month = 1, day = 2)
LOCATION 'hdfs://master_server/data/log_messages/2012/01/02';
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

CREATE TABLE kst
PARTITIONED BY (ds string)
ROW FORMAT SERDE 'com.linkedin.haivvreo.AvroSerDe'
WITH SERDEPROPERTIES ('schema.url'='http://schema_provider/kst.avsc')
STORED AS
INPUTFORMAT 'com.linkedin.haivvreo.AvroContainerInputFormat'
OUTPUTFORMAT 'com.linkedin.haivvreo.AvroContainerOutputFormat';
CREATE EXTERNAL TABLE IF NOT EXISTS stocks (
exchange STRING,
symbol STRING,
ymd STRING,
price_open FLOAT,
price_high FLOAT,
price_low FLOAT,
price_close FLOAT,
volume INT,
price_adj_close FLOAT)
CLUSTERED BY (exchange, symbol)
SORTED BY (ymd ASC)
INTO 96 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/stocks';
DROP TABLE IF EXISTS employees;
ALTER TABLE log_messages RENAME TO logmsgs;
ALTER TABLE log_messages ADD IF NOT EXISTS
PARTITION (year = 2011, month = 1, day = 1) LOCATION '/logs/2011/01/01'
PARTITION (year = 2011, month = 1, day = 2) LOCATION '/logs/2011/01/02'
PARTITION (year = 2011, month = 1, day = 3) LOCATION '/logs/2011/01/03'
...;
ALTER TABLE log_messages DROP IF EXISTS PARTITION(year = 2011, month = 12, day = 2);
ALTER TABLE log_messages
CHANGE COLUMN hms hours_minutes_seconds INT
COMMENT 'The hours, minutes, and seconds part of the timestamp'
AFTER severity;
ALTER TABLE log_messages ADD COLUMNS (
app_name STRING COMMENT 'Application name',
session_id BIGINT COMMENT 'The current session id');
ALTER TABLE log_messages REPLACE COLUMNS (
hours_mins_secs INT COMMENT 'hour, minute, seconds from timestamp',
severity STRING COMMENT 'The message severity',
message STRING COMMENT 'The rest of the message');
ALTER TABLE log_messages SET TBLPROPERTIES (
'notes' = 'The process id is no longer captured; this column is always NULL');

Managed tables vs. external tables: a managed (internal) table stores its data under /user/hive/warehouse/; for an external table, Hive only records a reference to the location where the data actually lives. Dropping a managed table deletes the data; dropping an external table deletes only that reference, and the data files remain.

Partitioned tables: each partition of a partitioned table is a subdirectory of the table directory. Loading data into a partitioned table works in strict or nonstrict mode, chosen with set hive.exec.dynamic.partition.mode=strict or set hive.exec.dynamic.partition.mode=nonstrict. In strict mode, at least one partition column must be given a static value in the insert/load (in the simplest case you spell out all of them); in nonstrict mode, fully dynamic inserts/loads are supported (partition values need not be given explicitly and are taken from the query). When the data being loaded spans many partitions, dynamic inserts/loads save a lot of work. Separately, set hive.mapred.mode=strict (shown above) makes queries against partitioned tables fail unless they include a partition filter.
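A minimal sketch of the drop behavior (table names and the external location are hypothetical):

-- Managed: data is created under /user/hive/warehouse/managed_logs
-- and is deleted together with the table.
CREATE TABLE managed_logs (line STRING);
DROP TABLE managed_logs;

-- External: DROP removes only the metadata; the files under
-- /data/external_logs stay where they are.
CREATE EXTERNAL TABLE external_logs (line STRING)
LOCATION '/data/external_logs';
DROP TABLE external_logs;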

Storage formats: the InputFormat/OutputFormat is responsible for splitting the stream into individual records; the SerDe is responsible for deserializing each record's bytes into typed columns, and for serializing typed columns back into bytes. STORED AS is shorthand for specifying a SerDe together with an InputFormat/OutputFormat pair. The storage format matters a great deal: most obviously, if your data contains embedded \n characters and you store it as TEXTFILE, a single record may be wrongly split into multiple rows.
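For instance, STORED AS TEXTFILE is roughly equivalent to spelling out the default components explicitly (class names as in stock Hive/Hadoop; the table is only an illustration):

CREATE TABLE t_explicit (line STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';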

ALTER TABLE: I really didn't expect that ALTER TABLE could even change a table's storage format.
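For example, using Hive's ALTER TABLE ... SET FILEFORMAT (note this only updates the metadata; existing files are not rewritten):

ALTER TABLE log_messages
PARTITION (year = 2012, month = 1, day = 1)
SET FILEFORMAT SEQUENCEFILE;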

Data Manipulation

LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * FROM staged_employees se
WHERE se.cnty = 'US' AND se.st = 'OR';
FROM staged_employees se
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'CA')
SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'IL')
SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';
INSERT OVERWRITE TABLE employees
PARTITION (country, state)
SELECT ..., se.cnty, se.st
FROM staged_employees se;
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state)
SELECT ..., se.cnty, se.st
FROM staged_employees se
WHERE se.cnty = 'US';
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> set hive.exec.max.dynamic.partitions.pernode=1000;
hive> INSERT OVERWRITE TABLE employees
> PARTITION (country, state)
> SELECT ..., se.cnty, se.st
> FROM staged_employees se;
CREATE TABLE ca_employees
AS SELECT name, salary, address
FROM employees
WHERE state = 'CA';
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees'
SELECT name, salary, address
FROM employees
WHERE state = 'CA';
FROM staged_employees se
INSERT OVERWRITE DIRECTORY '/tmp/or_employees'
SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE DIRECTORY '/tmp/ca_employees'
SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE DIRECTORY '/tmp/il_employees'
SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';

LOAD DATA [LOCAL] INPATH, INSERT ... SELECT, CREATE TABLE ... AS SELECT, and INSERT OVERWRITE [LOCAL] DIRECTORY are all straightforward; only dynamic-partition inserts require setting a couple of properties first.

Queries

-- select columns whose names match a regular expression
SELECT symbol, `price.*` FROM stocks;

Bucket

Creating a bucketed table: in MapReduce you can hash the data by key and split it into a specified number of pieces; in Hive, buckets achieve the same effect. For example, create a bucketed table:

CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING)
COMMENT 'A bucketed copy of user_info'
PARTITIONED BY(ds STRING)
CLUSTERED BY(user_id) INTO 256 BUCKETS;

A record whose user_id is ${user_id} goes into bucket number hash(user_id) % 256, and each bucket is stored as a file.
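To make Hive route rows into the correct buckets on insert, a sketch (hive.enforce.bucketing is the relevant property in pre-2.0 Hive releases; the staging table user_info is hypothetical):

set hive.enforce.bucketing = true;
FROM user_info u
INSERT OVERWRITE TABLE user_info_bucketed
PARTITION (ds = '2012-01-02')
SELECT u.user_id, u.firstname, u.lastname
WHERE u.ds = '2012-01-02';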

Benefits of bucketing:

  • Map-side joins (see the sketch after this list)
  • Sampling
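
A sketch of the map-side join case, assuming a hypothetical second table purchases_bucketed (with an amount column) that is bucketed on the same key; for a bucket map join the two bucket counts must be equal or multiples of each other, and hive.optimize.bucketmapjoin must be enabled:

set hive.optimize.bucketmapjoin = true;
SELECT /*+ MAPJOIN(p) */ u.user_id, u.firstname, p.amount
FROM user_info_bucketed u
JOIN purchases_bucketed p ON u.user_id = p.user_id;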

The most common use of buckets is sampling; a sketch using the standard TABLESAMPLE syntax:
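-- Sampling ON the clustering column lets Hive read just the matching
-- bucket file, here roughly 1/256 of the table.
SELECT * FROM user_info_bucketed
TABLESAMPLE (BUCKET 1 OUT OF 256 ON user_id);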
