massivelogdataQuestion - juedaiyuer/researchNote GitHub Wiki

海量日志数据,提取出某日访问百度次数最多的那个IP

首先是这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。注意到IP是32位的,最多有个2^32个IP。同样可以采用映射的方法,比如模1000,把整个大文件映射为1000个小文件,再找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率。然后再在这1000个最大的IP中,找出那个频率最大的IP,即为所求。

或者如下阐述(雪域之鹰):
算法思想:分而治之+Hash

  1. IP地址最多有2^32=4G种取值情况,所以不能完全加载到内存中处理;
  2. 可以考虑采用“分而治之”的思想,按照IP地址的Hash(IP)%1024值,把海量IP日志分别存储到1024个小文件中。这样,每个小文件最多包含4MB个IP地址;
  3. 对于每一个小文件,可以构建一个IP为key,出现次数为value的Hash map,同时记录当前出现次数最多的那个IP地址;
  4. 可以得到1024个小文件中的出现次数最多的IP,再依据常规的排序算法得到总体上出现次数最多的IP;

关于本题,还有几个问题,如下:

  • Ipv4地址,count用short表示也得64g内存
  • Hash取模是一种等价映射,不会存在同一个元素分散到不同小文件中的情况,即这里采用的是mod1000算法,那么相同的IP在hash取模后,只可能落在同一个文件中,不可能被分散的。因为如果两个IP相等,那么经过Hash(IP)之后的哈希值是相同的,将此哈希值取模(如模1000),必定仍然相等。
  • 那到底什么是hash映射呢?简单来说,就是为了便于计算机在有限的内存中处理big数据,从而通过一种映射散列的方式让数据均匀分布在对应的内存位置(如大数据通过取余的方式映射成小树存放在内存中,或大文件映射成多个小文件),而这个映射散列方式便是我们通常所说的hash函数,设计的好的hash函数能让数据均匀分布而减少冲突。尽管数据映射到了另外一些不同的位置,但数据还是原来的数据,只是代替和表示这些原始数据的形式发生了变化而已。

hash的个人笔记

python代码

#coding:utf-8 
#py3.x 
import re 
import json 
import time 
import os 
  
def run(logpath, hashcode,topnum=100): 
    for i in range(0, hashcode):  
        with open(str(i), 'w', encoding='utf-8') as f:  
            f.write('{}')  
    h = 50*1024 * 1024#50MB  
    with open(logpath, 'r', encoding='utf-8-sig') as f:  
        lines=f.readlines(h)  
        while lines:  
            lineshandler(lines, hashcode)  
            lines=f.readlines(h)  
    return orderresult(hashcode,topnum)  
  
def lineshandler(lines,hashcode): 
    ipreg=re.compile('\d+\.\d+\.\d+\.\d+')  
    d={}  
    for i in range(0,hashcode):  
        with open(str(i),'r',encoding='utf-8') as f:  
            d[i]=json.loads(f.read())  
    for line in lines:  
        if line.startswith('#'):  
            continue  
        else:  
            ips=re.findall(ipreg,line)  
            if len(ips)<2:  
                continue  
            else:  
                ip=ips[1]  
            hid=hash(ip)%hashcode  
            d[hid].setdefault(ip,0)  
            d[hid][ip]+=1  
    for hid in d.keys():  
        with open(str(hid),'w',encoding='utf-8') as f:  
            f.write(json.dumps(d[hid]))  
  
  
def orderresult(hashcode,topnum): 
    l=[]  
    for i in range(0, hashcode):  
        with open(str(i), 'r', encoding='utf-8') as f:  
            d=json.loads(f.read())  
            for key in d.keys():  
                if len(l)<topnum:  
                    l.append((key,d[key]))  
                    if len(l)==topnum:  
                        l=sorted(l,key=lambda x:x[1],reverse=True)  
                else:  
                    if l[topnum-1][1]<d[key]:  
                        l.pop(topnum-1)  
                        l.append((key,d[key]))  
                        l=sorted(l,key=lambda x:x[1],reverse=True)  
        os.remove(str(i))  
    return l  
  
t1=time.time() 
rs = run('u_ex130702.log',9,10) 
t2=time.time() 
print(rs) 
print('共用时'+str(t2-t1)+"秒")

使用了 readlines 将文件分块读入内存,然后hash并序列化保存(这里为了方便用了JSON序列化缓存到文本,IO开销很大,在真实的使用环境中可以缓存到数据库),在python3.x中使用了readlines后 tell 不能再用了,两个函数冲突。

在py2.7中open函数没有encoding参数,要注明 encoding 必须使用codecs模块的codecs.open。

source