MapReduce开发经验总结 - ZhaoHongnan/Practise-Of-Hadoop GitHub Wiki

写在前面:根据项目经验整理一些常遇到的功能实现,具体细节没有详细描述,如感兴趣或有疑问欢迎邮件讨论,如有需要我会继续补充。[email protected]

一、关于Hive的使用:

  • 日常开发中,尤其大数据工程师对于Hive颇多接触,Hive作为数据仓库根据分区(partition)存储数据,HiveQL开发高效简洁,这是Hive产生的主要原因,让不熟悉Java等编程语言的数据分析师可以使用擅长的类SQL语句检索数据信息。HiveQL应用可以参考apache文档(https://cwiki.apache.org/confluence/display/Hive),这里着重介绍一下Java Mapreduce直接访问Hive仓库源数据的情况。
  • Hive源数据存储于HDFS的Hive仓库(warehouse)路径下,文件是gz的压缩格式。Mapreduce FileInputFormat类读取文件是默认是不会对原始压缩文件进行切分,如此map的数量就会取决于gz文件的个数。一般而言,Hive仓库大多是按照一定的时间级别定时导入的,gz文件个数不会很多,这样map数量也就会相对较少,显然没有充分利用到Hadoop集群高并行的性能优势。
  • 解决方式,gz文件解析产生文本文件,在mapreduce job中设置map分块大小,在集群性能允许条件下更多map同时处理。

二、关于Hbase的使用:

  • Hbase属于NoSql类型数据库,相较于传统类型的数据库,更适用于行列稀疏的数据存储,并且Hbase做过查询检索的优化,一般可以在秒级别返回数据查询结构。Hbase属于Hadoop生态一部分,Mapreduce可以利用集群性能过滤处理存放于Hbase中的大规模数据,参考文档https://hbase.apache.org/book.html。
  • Hbase数据会根据rowkey进行裂表,将一张表数据分裂成多个region。region是部分数据保存开始和结束rowkey等信息,可以确定自己的数据范围。具体参考org.apache.hadoop.hbase.HRegionInfo源代码。涉及到实际工程中的裂表,对于开发着而言最为重要的当属rowkey的涉及问题。rowkey设计好坏影响到表的分裂,是否会产生热点。Hbase分表算法包括IncreasingToUpperBoundRegionSplitPolicy,ConstantSizeRegionSplitPolicy,DelimitedKeyPrefixRegionSplitPolicy,KeyPrefixRegionSplitPolicy等,具体实现这里不多叙述,可以参看官方文档或者源代码。
  • 我们这里讨论如何设计rowkey,官方(https://hbase.apache.org/book.html#rowkey.design)给出三种设计建议分别是Salting、Hashing和Reversing the Key。Salting是在rowkey前加入随机的数据,将数据随机分配到指定的region中;Reversing the Key将rowkey倒置把经常变化的部分放置在最前面,显然它与Salting可以避免热点产生,但是却牺牲了rowkey的字典顺序,相同开头的rowkey不会分配到相同的region上。Hashing是工程中最常用的处理rowkey的方法,通过对rowkey取哈希值,代替Salting中随机数据,保证相同的rowkey总是有相同的前缀字符,然后选择优化分区算法,相同的rowkey总是位于相同的region中。
  • 在实际关于用户信息系统中,我喜欢使用md5计算用户id和其他必要的信息产生rowkey,这样可以根据自己产品需求快速检索到某个用户符合某个属性的批量信息。

三、Mapreduce

  • Mapreduce是apache根据Google论文“MapReduce: simplified data processing on large clusters”开源实现。利用集群多节点环境分别实现map过程和reduce过程处理数据。简单说,根据Mapreduce编程结构,继承实现Mapper中的map函数和Reducer中的reduce函数即可。数据来源可以是HDFS上文件,Hbase数据库,Hive等数据仓库。对于大数据处理具有极好的性能。

  • 在Mapreduce架构中,输入输出是以键值对传递,Mapper的输出作为Reducer的输入。Mapper的输出是根据键key进行划分传递对应的Reducer,默认的情况下key是文件偏移量。一些项目会用到这个特性,如果想让相同的数据到同一个Reducer中处理,可以设置key值实现。倘若不涉及具体业务必须实现的逻辑,建议使用默认的文件便宜量即可,如此可以让产生的中间结果键值对均匀分配到每个Reducer中,不会让Reducer产生热点。

  • Mapper输出会根据key进行排序这个过程成为shuffle。排序会调用WritableComparable接口具体实现类中的WritableComparator实现的compare函数,具体实现由指定的输出键值对的key确定。如果我们的业务需要根据key的不同规则排序,只需要重写比较器,在job中设置自定义的新比较器即可。例如key是LongWritable类型,默认调用比较器LongWritable.Comparator实现升序排序,我们希望按照降序排序,继承实现LongWritable.Comparator重写compare方法接口,代码如下所示: public class MyComparator extends LongWritable.Comparator{ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } }

  • Mapper输出分区发给Reducer是根据接口Partitioner具体实现类中的getPartition实现逻辑确定。默认情况下调用HashPartitioner根据key取hash值模Reducer数量,这样解释了上文中默认的文件偏移量作为key具有很好的均匀分配效果。实现Partitioner类改写getPartion函数定义自己的分区实现逻辑。例如按照业务分区,假如有三种业务,每种业务使用一个Reducer处理,那么Mapper输出的value中携带业务类型,getPartion函数代码片段如下所示: @Override public int getPartition(LongWritable key, Text value, int numReduceTasks) { int type = parseType(value.toString.trim()); return type % numReduceTasks; }

  • Mapper的HDFS文件读取。Mapper读取文件定义多个实现读取不同的HDFS文件,例如FileInputFormat,SequenceFileInputFormat,TextInputFormat等,顾名思义读取的是文件,二进制序列化文件和文本文件。在文件夹中多个文件同时读取时使用FileInputFormat实现,FileInputFormat默认会对文本文件进行分片,是否对文件分片根据函数isSplitable的返回结果确定,默认返回true,意味着依据map设置的大小可以拆分文件。如果程序中需要使用一个Mapper有序读取一份文件,则继承FileInputFormat类,重写其中的函数isSplitable,重载的函数返回 false即可,不允许对map输入的文件进行拆分。代码函数示例如下: @Override protected boolean isSplitable(JobContext context, Path filename){return false;}

  • Mapper类和Reducer类功能函数,除了上文所提到的map和reduce函数意外,还具有setup和cleanup两个函数可供开发者重写,setup在调用map和reduce函数前执行,可以初始化上下文信息,cleanup正好相反,在map和reduce函数执行之后调用用于处理一些善后工作。

四、Mapreduce过滤无效数据

  • 日常开发中经常会遇到海量数据中过滤无效数据需求,Mapreduce是一个可以较为常用的解决方式。推荐两种解决算法,位图法和布隆过滤器。两种方式共同点都需要上传黑/白名单到Hadoop集群中,这里可以利用Hadoop提供的分布式缓存(Distributed Cache)存储名单。
  • 位图法,根据全量数据和名单构建位图,位图是一串二进制数据,每一位代表一个信息字段,如果信息存在置位为1,不存在置位为0。每次新数据到来,找到代表其的位,如果是1,则证明此数据存在,否则不存在。位图法实现简单,在Java中可以使用长整型long作为位存储的底层实现。如果数据分布均匀可以取最大最小值为位图的边界,所有数据覆盖在位图之上,可以节约内存,一个64bit长整型可以表示64个长整型数据是否存在;但是当数据分布不均匀,可能导致位图中出现大量的0字节,这样反而可能会浪费过多存储空间。
  • 布隆过滤器类似于位图,只是根据多个哈希函数将一个数据映射到多个位上,比较数据时,如果映射的位全为1,证明可能存在,但是如果有一位为0,则一定不存在。因为使用哈希函数可以减少位的使用,比起位图法更省空间。但是布隆过滤器只能完全判断不存在情况,不能确定存在情况,这样会导致一批脏数据没有得到过滤,需要进一步过滤。哈希函数设计直接决定布隆过滤器的过滤效率。推荐参考文档:https://link.springer.com/chapter/10.1007/978-3-540-30494-4_26(Bloom Filters in Probabilistic Verification)和https://link.springer.com/chapter/10.1007/11841036_42(Less Hashing, Same Performance: Building a Better Bloom Filter)