hadoop - JiyangM/spring GitHub Wiki
Hadoop
- HDFS:分布式文件系统
- MAPREDUCE:分布式运算程序开发框架
- HIVE:基于大数据技术(文件系统+运算框架)的SQL数据仓库工具
- HBASE:基于HADOOP的分布式海量数据库
- ZOOKEEPER:分布式协调服务基础组件
- Flume:日志数据采集框架
集群部署
简要描述如何安装配置一个apache开源版hadoop,描述即可,列出步骤更好
-
安装JDK并配置环境变量(/etc/profile)
-
关闭防火墙
-
配置hosts文件,方便hadoop通过主机名访问(/etc/hosts)
-
设置ssh免密码登录
-
解压缩hadoop安装包,并配置环境变量
-
修改配置文件($HADOOP_HOME/conf)
hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml Slaves
-
格式化hdfs文件系统 (hadoop namenode -format)
-
启动hadoop ($HADOOP_HOME/bin/start-all.sh)
-
使用jps查看进程
- hadoop-env.sh (配置JAVA_HOME)
- core-site.xml (配置nameNode)
- hdfs-site.xml(dfs.namenode.name.dir 、dfs.datanode.data.dir)
- mapred-site.xml(mapreduce.framework.name配置mapreduce资源管理器yarn)
- yarn-site.xml(yarn.resourcemanager.hostname)
HDFS
-
HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M
-
HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
-
目录结构及文件分块位置信息(元数据)的管理由namenode节点承担 ——namenode是HDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(block的id,及所在的datanode服务器)
-
文件的各个block的存储管理由datanode节点承担 ---- datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication,默认是3)
hdfs的工作机制
- 1.HDFS集群分为两大角色:NameNode、DataNode (Secondary Namenode)
- 2.NameNode负责管理整个文件系统的元数据
- 3.DataNode 负责管理用户的文件数据块
- 4.文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode上
- 5.每一个文件块可以有多个副本,并存放在不同的datanode上
- 6.Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量
- 7.HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行
namenode对数据的管理采用了三种存储形式:
- 内存元数据(NameSystem)
- 磁盘元数据镜像文件
- 数据操作日志文件(可通过日志运算出元数据)
当客户端对hdfs中的文件进行新增或者修改操作,操作记录首先被记入edits日志文件中,当客户端操作成功后,相应的元数据会更新到内存meta.data中
HDFS读写流程
写详细步骤:
- 1.客户端向NameNode发出写文件请求。
- 2.检查是否已存在文件、检查权限。若通过检查,直接先将操作写入EditLog,并返回输出流对象。 (注:WAL,write ahead log,先写Log,再写内存,因为EditLog记录的是最新的HDFS客户端执行所有的写操作。如果后续真实写操作失败了,由于在真实写操作之前,操作就被写入EditLog中了,故EditLog中仍会有记录,我们不用担心后续client读不到相应的数据块,因为在第5步中DataNode收到块后会有一返回确认信息,若没写成功,发送端没收到确认信息,会一直重试,直到成功)
- 3.client端按128MB的块切分文件。
- 4.client将NameNode返回的分配的可写的DataNode列表和Data数据一同发送给最近的第一个DataNode节点,此后client端和NameNode分配的多个DataNode构成pipeline管道,client端向输出流对象中写数据。client每向第一个DataNode写入一个packet,这个packet便会直接在pipeline里传给第二个、第三个…DataNode。 (注:并不是写好一个块或一整个文件后才向后分发)
- 5.每个DataNode写完一个块后,会返回确认信息。 (注:并不是每写完一个packet后就返回确认信息,个人觉得因为packet中的每个chunk都携带校验信息,没必要每写一个就汇报一下,这样效率太慢。正确的做法是写完一个block块后,对校验信息进行汇总分析,就能得出是否有块写错的情况发生)
- 6.写完数据,关闭输输出流。
- 7.发送完成信号给NameNode。 (注:发送完成信号的时机取决于集群是强一致性还是最终一致性,强一致性则需要所有DataNode写完后才向NameNode汇报。最终一致性则其中任意一个DataNode写完后就能单独向NameNode汇报,HDFS一般情况下都是强调强一致性)
关于pipline 是一种管道模式,我的理解是类似在redis 里面的pipline ,如果每次执行一条命令 需要这样的过程 请求数据 等待处理 返回结果 这样的来回操作很耗时 采用一种管道的概念一次发送多条命令 交给服务器去优化处理 最终将结果一次返回。
读详细步骤:
- 1.client访问NameNode,查询元数据信息,获得这个文件的数据块位置列表,返回输入流对象。
- 2.就近挑选一台datanode服务器,请求建立输入流 。
- 3.DataNode向输入流中中写数据,以packet为单位来校验。
- 4.关闭输入流
Mapreduce shuffle
shuffle是MR处理流程中的一个过程,它的每一个处理步骤是分散在各个map task和reduce task节点上完成的,整体来看,分为3个操作:
- 1、分区partition
- 2、Sort根据key排序
- 3、Combiner进行局部value的合并
详细过程
- 1、maptask收集我们的map()方法输出的kv对,放到内存缓冲区中
- 2、从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
- 3、多个溢出文件会被合并成大的溢出文件
- 4、在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序
- 5、reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
- 6、reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
- 7、合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快 缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M)
https://blog.csdn.net/bingduanlbd/article/details/51933914
Mapreduce数据倾斜原因和解决方案
https://blog.csdn.net/WYpersist/article/details/80213811 https://blog.csdn.net/wypersist/article/details/79797075
MAPREDUCE中的Combiner
- combiner是MR程序中Mapper和Reducer之外的一种组件
- combiner组件的父类就是Reducer
- combiner和reducer的区别在于运行的位置:
- Combiner是在每一个maptask所在的节点运行Reducer是接收全局所有Mapper的输出结果;
- combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量 具体实现步骤: 1、自定义一个combiner继承Reducer,重写reduce方法 2、在job中设置: job.setCombinerClass(CustomCombiner.class)
- combiner能够应用的前提是不能影响最终的业务逻辑 而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来
Combiner 所做的事情: 每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量; Combiner 的意义: 在MapReduce中,当map生成的数据过大时,带宽就成了瓶颈,当在发送给 Reduce 时对数据进行一次本地合并, 减少数据传输量以提高网络IO性能; Combiner 的时机: Combiner 最基本的是实现本地key的聚合,有本地 Reduce 之称 ,实际上是现实就继承来 Reducer ,本质上就是一个 Reducer。
请写出以下执行命令
1)杀死一个job? hadoop job –kill job_id
2)删除hdfs上的/tmp/aaa目录? hadoop fs -rmr hdfs_path
3)动态添加datanode,不停namenode?
1.修改slaves文件,添加需要增加的节点host或者ip,并将其更新到各个节点
2.在datanode中启动执行启动datanode命令。命令:sh hadoop-daemon.sh start datanode
3.可以通过web界面查看节点添加情况。或使用命令:sh hadoop dfsadmin -report
4.执行hadoop balance命令。(此项为balance集群使用,如果只是添加节点,则此步骤不需要)
请列出你所知道的hadoop调度器,并简要说明其工作方法?
比较流行的三种调度器有:默认调度器FIFO,计算能力调度器CapacityScheduler,公平调度器Fair Scheduler
- 默认调度器FIFO
hadoop中默认的调度器,采用先进先出的原则
- 计算能力调度器CapacityScheduler
选择占用资源小,优先级高的先执行
- 公平调度器FairScheduler
同一队列中的作业公平共享队列中所有资源
https://www.linuxidc.com/Linux/2018-02/150813.htm
hive有哪些方式保存元数据,各有哪些特点?
-
内存数据库derby,较小,不常用
-
本地mysql,较常用
-
远程mysql,不常用
请简述hadoop怎么样实现二级排序?
在Hadoop中,默认情况下是按照key进行排序,如果要按照value进行排序怎么办?
有两种方法进行二次排序,分别为:buffer and in memory sort和 value-to-key conversion。
buffer and in memory sort
主要思想是:在reduce()函数中,将某个key对应的所有value保存下来,然后进行排序。 这种方法最大的缺点是:可能会造成out of memory。
value-to-key conversion
主要思想是:将key和部分value拼接成一个组合key(实现WritableComparable接口或者调setSortComparatorClass函数),这样reduce获取的结果便是先按key排序,后按value排序的结果,需要注意的是,用户需要自己实现Paritioner,以便只按照key进行数据划分。
简述hadoop实现join的几种方法?
- reduce side join
reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
- map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
- SemiJoin
SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。
实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
更多关于半连接的介绍,可参考:半连接介绍:
- reduce side join + BloomFilter
在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。
BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。
因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。
hive有哪些方式保存元数据,各有哪些特点?
-
内存数据库derby,较小,不常用
-
本地mysql,较常用
-
远程mysql,不常用