Hive_on_Spark - 9dian/Index GitHub Wiki

Hive

Parameter tuning

set mapreduce.job.queuename=long_running;
set io.sort.mb=512;
set hive.exec.compress.intermediate=true;
set mapreduce.map.memory.mb=4096;
set mapreduce.reduce.memory.mb=20480;
set mapreduce.map.java.opts=-Xmx3048m;
set mapreduce.reduce.java.opts=-Xmx16384m;
set hive.auto.convert.join=false;

set hive.execution.engine;

These settings resolve the following error:

Task with the most failures(4):
-----
Task ID:
  task_1525335504172_17955_m_000000

URL:
  http://**101:8088/taskdetails.jsp?jobid=job_1525335504172_17955&tipid=task_1525335504172_17955_m_000000
-----
Diagnostic Messages for this Task:
Error: GC overhead limit exceeded

FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched:


spark2_on_yarn

ApplicationMaster 
mapreduce.map.memory.mb 
mapreduce.reduce.memory.mb

NameNode: the HDFS master node process; it manages the file system metadata. DataNode daemons: the processes that run on the HDFS slave nodes.

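ResourceManager: allocates compute resources to the applications in the cluster. The unit of allocation is a predefined combination of CPU cores and memory called a container. Minimum and maximum container allocations can be configured for the cluster. The ResourceManager tracks the capacity that becomes available when applications finish and release their resources, and it also tracks the status of running applications.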

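NodeManager daemons: the YARN daemons on the slave nodes; each one manages the containers on its host. A container executes a task of an application, and tasks run in containers on the host where the NodeManager process runs. Most containers simply run tasks, but the ApplicationMaster (a special container) has additional, application-specific responsibilities.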

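Running an Application on YARN

YARN schedules and orchestrates applications and tasks in Hadoop. When tasks to be run require data from HDFS, YARN will attempt to schedule these tasks on the node where the data resides, applying the concept of data locality discussed previously. YARN is designed to distribute an application's workload across multiple NodeManagers (worker daemons or processes). NodeManagers are the workers, or agents, that execute the tasks that together make up an application. The ResourceManager (a YARN daemon) is responsible for assigning an ApplicationMaster, a delegate process that manages the execution and status of the application. The ResourceManager (the YARN master node process) tracks the resources available on the NodeManagers, such as CPU and memory. Compute and memory resources are allocated to applications in processing units called containers.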

PS: In Hadoop or YARN terminology, an application or job is a complete set of tasks (individual units of work), for example Map tasks. Figure 6.6

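ApplicationMaster: the first container that the ResourceManager allocates on a NodeManager in order to run an application. Its job is to plan the application, including determining what resources are required (often based on the amount of data to be processed) and working out the resources for each stage of the application (for example, to instantiate the map and reduce stages of a MapReduce job). The ApplicationMaster requests these resources from the ResourceManager on behalf of the application; the ResourceManager then allocates resources, on the same NodeManager or on other NodeManagers, for the application to use during its lifetime. The ApplicationMaster also monitors the progress of tasks, stages (groups of tasks that can run in parallel), and dependencies, and sends summary information to the ResourceManager for display in its UI.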

I cover HDFS in much more detail in Hour 4, “Understanding the Hadoop Distributed File System (HDFS).” YARN will be covered in much more detail in Hour 6, “Understanding Data Processing in Hadoop.”

hour 20

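YARN memory allocation and management mainly involves three concepts: the ResourceManager, the ApplicationMaster, and the NodeManager, and tuning should focus on these three. There is also the concept of a Container; for now, think of it as the container in which a map or reduce task runs. It is described in detail later.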

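ResourceManager memory settings (these configure resource scheduling)

RM1: yarn.scheduler.minimum-allocation-mb, the minimum memory that a single container can request

RM2: yarn.scheduler.maximum-allocation-mb, the maximum memory that a single container can request

Note: the minimum value determines the maximum number of containers on a node. Once set, it cannot be changed dynamically.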

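NodeManager memory settings (these describe the hardware resources)

NM1: yarn.nodemanager.resource.memory-mb, the maximum memory available on the node

NM2: yarn.nodemanager.vmem-pmem-ratio, the virtual-to-physical memory ratio, 2.1 by default

Note: neither RM1 nor RM2 may be larger than NM1. NM1 determines the maximum number of containers on a node: max(Container) = NM1 / RM1. Once set, it cannot be changed dynamically.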
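The relationship max(Container) = NM1 / RM1 can be checked with a few lines of Python. This is only a sketch: the function names are made up for illustration, only memory is considered (vcores can lower the real limit), and the sample values are the ones that appear on this page.

```python
def max_containers_per_node(node_memory_mb: int, min_allocation_mb: int) -> int:
    """Memory-based upper bound on containers per node: floor(NM1 / RM1)."""
    if min_allocation_mb <= 0:
        raise ValueError("min_allocation_mb must be positive")
    return node_memory_mb // min_allocation_mb


def virtual_memory_limit_mb(container_mb: int, vmem_pmem_ratio: float = 2.1) -> float:
    """Virtual memory allowed for a container: physical size times NM2."""
    return container_mb * vmem_pmem_ratio


# NM1 = 102400 and RM1 = 2048 are the tuned values used elsewhere on this page.
tuned = max_containers_per_node(102400, 2048)

# NM1 = 8192 and RM1 = 1024 are the defaults reported by the Hive session.
default = max_containers_per_node(8192, 1024)

print(tuned, default)
print(virtual_memory_limit_mb(4096))
```

With the tuned values the bound is 50 containers per node, and with the defaults it is 8.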

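ApplicationMaster memory settings (these configure the tasks)

AM1: mapreduce.map.memory.mb, the memory allocated to a map container

AM2: mapreduce.reduce.memory.mb, the memory allocated to a reduce container

Note: both values should lie between RM1 and RM2. AM2 is best set to twice AM1. Both values can be changed when a job is launched.

AM3: mapreduce.map.java.opts, the JVM options for map tasks, such as -Xmx and -Xms

AM4: mapreduce.reduce.java.opts, the JVM options for reduce tasks, such as -Xmx and -Xms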
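The JVM heap set through AM3/AM4 has to fit inside the container sized by AM1/AM2, because the container also holds non-heap memory. The following sketch checks that relationship. The helper names and the 80% ceiling are assumptions chosen for illustration; the settings on this page happen to keep the heap at roughly 75% to 80% of the container.

```python
import re

# Multipliers that convert a -Xmx value to bytes; no suffix means bytes.
_UNIT_BYTES = {"": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def xmx_mb(java_opts: str) -> int:
    """Extract the -Xmx heap size from a JVM option string, in whole MB."""
    match = re.search(r"-Xmx(\d+)([kKmMgG]?)", java_opts)
    if match is None:
        raise ValueError("no -Xmx option found in: " + java_opts)
    size = int(match.group(1)) * _UNIT_BYTES[match.group(2).lower()]
    return size // (1024 ** 2)


def heap_fits(container_mb: int, java_opts: str, max_heap_percent: int = 80) -> bool:
    """True if the heap is at most max_heap_percent of the container.

    The 80% default is an assumed rule of thumb, not a YARN setting.
    """
    return xmx_mb(java_opts) * 100 <= container_mb * max_heap_percent


# Values taken from the parameter block at the top of this page.
print(heap_fits(4096, "-Xmx3048m"))    # map container
print(heap_fits(20480, "-Xmx16384m"))  # reduce container
```

A heap equal to the container size, for example `heap_fits(2048, "-Xmx2048m")`, fails the check, which is the situation in which YARN is likely to kill the container for exceeding its memory limit.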

--hiveconf hive.root.logger=debug,console

set io.sort.mb=512;
set hive.exec.compress.intermediate=true;
set mapreduce.map.memory.mb=2048;
set mapreduce.reduce.memory.mb=10240;
set mapreduce.map.java.opts=-Xmx1524m;
set mapreduce.reduce.java.opts=-Xmx8192m;

spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/jars/spark-examples-1.6.0-cdh5.12.1-hadoop2.6.0-cdh5.12.1.jar

set hive.execution.engine=spark;
set hive.prewarm.enabled=true;
set spark.dynamicAllocation.enabled=true;
-- set spark.executor.instances=???;
-- set spark.dynamicAllocation.initialExecutors=?; -- if spark.executor.instances > initialExecutors then use initialExecutors
set yarn.nodemanager.resource.cpu-vcores=28;
set yarn.nodemanager.resource.memory-mb=102400;
-- set yarn.scheduler.maximum-allocation-mb=20480;
-- set yarn.scheduler.minimum-allocation-mb=2048;
-- set spark.executor.cores=4;
set spark.executor.memory=12g;
set spark.executor.memoryOverhead=2g;
-- set hive.spark.client.server.connect.timeout=900000ms;
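To see how the executor settings above relate to the node settings, the following sketch estimates the YARN container size of one executor and how many executors fit on a node. It assumes that the scheduler rounds each request up to a multiple of yarn.scheduler.minimum-allocation-mb, which depends on the scheduler in use, and it borrows the commented-out values (4 cores, 2048 MB minimum allocation) for illustration. The function names are hypothetical.

```python
def round_up(value_mb: int, increment_mb: int) -> int:
    """Round value_mb up to the next multiple of increment_mb."""
    return -(-value_mb // increment_mb) * increment_mb


def executor_container_mb(executor_memory_mb: int, overhead_mb: int,
                          min_allocation_mb: int) -> int:
    """Container size requested for one executor: heap plus overhead,
    rounded up to the allocation increment (an assumption, see above)."""
    return round_up(executor_memory_mb + overhead_mb, min_allocation_mb)


def executors_per_node(node_memory_mb: int, node_vcores: int,
                       container_mb: int, executor_cores: int) -> int:
    """Executors that fit on one node, limited by memory and by vcores."""
    return min(node_memory_mb // container_mb, node_vcores // executor_cores)


# spark.executor.memory=12g, spark.executor.memoryOverhead=2g
container = executor_container_mb(12 * 1024, 2 * 1024, 2048)

# yarn.nodemanager.resource.memory-mb=102400, cpu-vcores=28, executor cores=4
print(container, executors_per_node(102400, 28, container, 4))
```

Under these assumptions each executor needs a 14336 MB container, and both memory and vcores allow 7 executors per node. The ApplicationMaster and driver containers are ignored here.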

default:
hive> set yarn.nodemanager.resource.cpu-vcores;
yarn.nodemanager.resource.cpu-vcores=8
hive> set yarn.nodemanager.resource.memory-mb;
yarn.nodemanager.resource.memory-mb=8192
hive> set yarn.scheduler.maximum-allocation-mb;
yarn.scheduler.maximum-allocation-mb=85579
hive> set yarn.scheduler.minimum-allocation-mb;
yarn.scheduler.minimum-allocation-mb=1024

hive> set spark.driver.memory;
spark.driver.memory=11596411699
hive> set spark.yarn.driver.memoryOverhead;
spark.yarn.driver.memoryOverhead=1228

Hive partition

 
SET hive.exec.dynamic.partition.mode=nonstrict;

CREATE TABLE T (key int, value string) 
PARTITIONED BY (ds string, hr int);

INSERT OVERWRITE TABLE T PARTITION(ds, hr) 
SELECT key, value, ds, hr+1 AS hr 
   FROM srcpart 
   WHERE ds is not null 
   And hr>10;

Impala

impala-shell

set mem_limit=20g;

Time zone

Impala's default configuration does not use the China time zone (conversions are done in UTC), so results of from_unixtime are off by eight hours.

Solution 1: start Impala with -use_local_tz_for_unix_timestamp_conversions=true. In CDH, go to Impala -> Configuration -> Impala Daemon -> Impala Daemon Command Line Argument Advanced Configuration Snippet (Safety Valve) and add:

-use_local_tz_for_unix_timestamp_conversions
-convert_legacy_hive_parquet_utc_timestamps
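The eight-hour difference can be reproduced outside Impala. The sketch below formats the same Unix timestamp once in UTC and once at a fixed UTC+8 offset, which stands in for the China time zone. The helper name and the sample timestamp are chosen for illustration only.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset used as a stand-in for the China time zone.
CHINA_TZ = timezone(timedelta(hours=8))


def format_epoch(epoch_seconds: int, tz: timezone) -> str:
    """Format a Unix timestamp as 'YYYY-MM-DD HH:MM:SS' in the given zone."""
    return datetime.fromtimestamp(epoch_seconds, tz).strftime("%Y-%m-%d %H:%M:%S")


epoch = 1525335504  # illustrative timestamp, in seconds

print(format_epoch(epoch, timezone.utc))  # what a UTC-based conversion returns
print(format_epoch(epoch, CHINA_TZ))      # what a local-time conversion returns
```

The two lines printed differ by exactly eight hours, which matches the gap that the startup flags above are meant to remove.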

Hive CTE

Create table

create table ac.ac_hf_1m_continuous stored as parquet as 
with t_hf_runafteron as (
  select tn.appliance_id, tn.dt, hf.sampling_time, hf.run_time, hf.on_off_flag from tmp.tnext tn 
  inner join (
    select appliance_id, on_off_flag, sampling_time, run_time from (
      SELECT appliance_id, on_off_flag, sampling_time, lead(sampling_time, 1) over(PARTITION BY appliance_id order by sampling_time) run_time
      from ac.ac_hf_interval1m
      where on_off_flag <> 1 or on_off_flag is null
    ) t where run_time is not null and unix_timestamp(cast(run_time as string), 'yyyyMMddHHmmss') - unix_timestamp(cast(sampling_time as string), 'yyyyMMddHHmmss') < 300 
  ) hf on tn.appliance_id = hf.appliance_id  and hf.run_time is not null and unix_timestamp(cast(hf.run_time as string), 'yyyyMMddHHmmss') - unix_timestamp(cast(hf.sampling_time as string), 'yyyyMMddHHmmss') < 300 
  where ( hf.on_off_flag <> 1 or hf.on_off_flag is null ) and tn.dt < hf.sampling_time and hf.sampling_time < tn.dt_next 
  and hf.run_time is not null and unix_timestamp(cast(hf.run_time as string), 'yyyyMMddHHmmss') - unix_timestamp(cast(hf.sampling_time as string), 'yyyyMMddHHmmss') < 300 
  and (unix_timestamp(cast(tn.dt_next as string), 'yyyyMMddHHmmss') - unix_timestamp(cast(tn.dt as string), 'yyyyMMddHHmmss') <= 60*60*24 and tn.dt_next is not null)
)
,
t_hf_onandrun as (
  select hfr.appliance_id, hfr.dt, hfr.sampling_time, hfr.run_time, 1 on_off_flag from (
    select appliance_id, dt, dt sampling_time, sampling_time run_time,
      row_number() over(partition by appliance_id, dt order by sampling_time) as row_num, 
      count(sampling_time) over (partition by appliance_id, dt) as cnt
    from t_hf_runafteron where unix_timestamp(cast(sampling_time as string), 'yyyyMMddHHmmss') - unix_timestamp(cast(dt as string), 'yyyyMMddHHmmss') < 300
  ) hfr where row_num = 1
  union all
  select appliance_id, dt, sampling_time, run_time, on_off_flag from 
  t_hf_runafteron where unix_timestamp(cast(sampling_time as string), 'yyyyMMddHHmmss') - unix_timestamp(cast(dt as string), 'yyyyMMddHHmmss') < 300
)
,
t_hf_onandrun_rnk as (
  select appliance_id, dt, sampling_time, run_time, on_off_flag, dense_rank() over(partition by appliance_id order by dt) as applicance_seqno from t_hf_onandrun
  order by appliance_id, dt, sampling_time 
)

select hfi.ac_type, hfi.appliance_id, hfo.applicance_seqno appliance_seqno, unix_timestamp(cast(hfi.sampling_time as string), 'yyyyMMddHHmmss') dt, 
hfi.sampling_time, hfi.on_off, hfi.on_off_flag, hfi.tset, hfi.tset_flag, hfi.mode, hfi.mode_flag, hfi.wind_speed, hfi.wind_speed_flag, hfi.tin, hfi.tin_flag, hfi.tout, hfi.tout_flag
from ac.ac_hf_interval1m hfi inner join t_hf_onandrun_rnk hfo on hfi.appliance_id = hfo.appliance_id and hfi.sampling_time = hfo.sampling_time 
where hfo.applicance_seqno is not null
order by hfi.appliance_id, hfi.sampling_time, hfo.applicance_seqno
;

-- end of high-frequency data CTE

insert

-- CTE insert example:
with t1 as (
  select * from (
    select appliance_id, dt, dt sampling_time from tmp.tnext tn where exists (
      select 1 from t_hf_runafteron hfr where hfr.appliance_id = tn.appliance_id and hfr.dt = tn.dt )
    union all
    select appliance_id, dt, sampling_time from t_hf_runafteron
  ) t order by appliance_id, dt, sampling_time 
)
from t1
insert overwrite table tmp.t_hf_onandrun
select *;

with t0 as (
select appliance_id, dt, tset, run_status, sleep, tin, tout, humidity, light_ad from ac.a0_concat_a1 t 
where t.year_month =201806 and (t.light_ad > 50 or (light_ad > 0 and light_ad <= 20))
)
insert into tmp.sleep_c1 partition (year_month = 201806)
select * from (
  select t1.appliance_id, t1.dt, t1.tset, t1.tin, t1.light_ad, t2.dt dt2, t2.tset tset2, t2.tin tin2, t2.light_ad strong_lightad,
  first_value(t2.dt) over (partition by t1.appliance_id, t1.dt order by t2.dt desc) dt_pgt1h
  from t0 t1, t0 t2
  where t1.appliance_id = t2.appliance_id and t2.dt <= t1.dt - 3601000 and t2.dt > t1.dt - 7202000
    and t2.light_ad > 50 and (t1.light_ad>0 and t1.light_ad <= 20)
) tt where dt2 = dt_pgt1h 
;
