MapReduce_With_AKKA_Actor - wuhaixing/gb921 GitHub Wiki

存储过程?咋扩展嘛

SP只会出现在标题里,你懂的。

代码都是从参考资料里偷的,未经作者允许。

其实,内容基本也是 :D

虽然不懂scala也都能看懂,但可能会比较晕。

基本概念即时贴

Map reduce架构

这个词来自伟大的公司google,是用于大规模数据集(大于1TB)并行运算的软件架构。概念"Map(映射)"和"Reduce(化简)",和他们的主要思想,都是从函数式编程语言借来的,当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(化简)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

Actor 模型

这个概念在1973年提出来。。。。。是一种并行运算的模型。“参与者”是一个并行运算的基本单元:当一个参与者接收到一条消息,它可以做出一些决策、建立更多的参与者、发出更多的消息、决定要如何回答后续消息。

akka

Akka 是为那些事件驱动、扩展性强、错误容忍程度高的架构准备的平台。这个平台最初起源于scala,后来提供了java API。 目前的支持者是个还不是那么伟大的公司typesafe,最近刚刚把play framework收归门下,将来不知他会被谁收归门下。

数数每个词出现多少次,请

这是map reduce的经典案例,解决方案也很经典,一个工头负责任务分解(按行拆),一群工人负责数分给自己那一段,数完交给工头,工头合完收工。工头分工的时候会公平吗。。。。。

actor的经典案例是算π?

沟通第一

首先要解决的就是任务如何清晰准确的传达,工作成果如何反馈。 所以首先定义沟通用的消息,用case类,消息的接收者可以用模式匹配很方便的处理。akka负责维护消息队列,保证任务按到达顺序处理。

// Messages
sealed trait MapReduceMessage
case class CountDocument(document: Iterator[String]) extends MapReduceMessage
case class CountLine(line: String) extends MapReduceMessage
case class Result(values: Map[String, Int]) extends MapReduceMessage 

工头收到 CountDocument 消息后,分解出 CountLine 交给工人,工人干完后返回 Result 。用sealed trait 做超类会确保所有消息都会被处理,不过这样也不能在类外添加消息类型了。

工头!

// Master Actor, creates Worker Actors, distributes work and gathers results
  class Master(nrOfWorkers: Int, latch:CountDownLatch) extends Actor {
 
    val workers = Vector.fill(nrOfWorkers)(actorOf[CountLineWorker].start());
    val router = Routing.loadBalancerActor(CyclicIterator(workers)).start();
 
    val resultMap = new HashMap[String, Int]();
 
    var start : Long = _
    var count : Long = 0
 
    def receive = {
      case CountDocument(lines : Iterator[String]) =>
 
        lines.foreach(line =>
              if (!line.isEmpty) {
                count = count+1;
                router ! CountLine(line)
              })
 
        //shutdown actors
        router ! Broadcast(PoisonPill)
        router ! PoisonPill
 
      case Result(values: Map[String, Int]) =>
 
        for ((key, value) <- values) {
          resultMap.put(key, resultMap.getOrElse(key, 0)+value)
        }
        count = count - 1;
        if (count <= 0) self.stop()
    }
 
    override def preStart() {
      start = System.currentTimeMillis
    }
 
    override def postStop() {
      val end = System.currentTimeMillis-start
      println("Result after %s ms :".format(end))
      for((key, value) <- resultMap.toList.sortBy(_._2).reverse) {
        println("%s: %s".format(value, key))
      }
      latch.countDown()
    }
  }

工头也是acotor,也需要实现Actor接口,提供receive方法。

首先映入眼帘的是一队整装待发的工人,他们意气风发,等着干活

配合工头的是一个框架提供的actor- Routing.LoadbalancingActor ,他负责消息的分发,在上面的例子中用的是round robin模式。

工头收到 CountDocument 消息后,就让router逐行创建 CountLine 消息发送给工人们。完事后,router就广播一下要服毒,然后,就真服毒了,退出历史舞台。

工头等着,来一个Result就给加上,活都干完,他也就 self.stop 收工了。

preStart 和 postStop这两个是覆盖的方法,一个负责开始计时,一个负责结束计时并打印结果

  router ! CountLine(line) 

中的 ! 俗称 砰 方法,放完拉到,啥也不管。还有最终返回的 !! 方法 和 最终返回Future的 !!! 方法。

工人。。。

//Actor that counts the words for a single line
  class CountLineWorker extends Actor {
 
    def receive = {
       case CountLine(line) =>
         self reply Result(countWords(line))
    }
 
    def countWords(line: String):Map[String, Int] = {
      val result = new HashMap[String, Int]
 
      "[^A-Za-z0-9\u0020]".r.replaceAllIn(line, "")
            .split(" ")
            .foreach(word => {
              result.put(word, result.getOrElse(word, 0)+1)
            })
 
      result
    }
  }

这个不言自明,就是收消息,数词,返回结果给消息发送方

远程协作

看看这个网站的高频词呗

demo,这个可以有!


参考资料

http://devcenter.heroku.com/articles/scaling-out-with-scala-and-akka

http://blog.addictivesoftware.net/2011/08/mapreduce-using-actors/

https://gist.github.com/1123718

https://gist.github.com/856112