Sunday, April 8, 2012

Akka 2.0 Event Bus Example


package practice

import akka.actor._
import akka.routing._
import akka.event._

case class FileToProcess(file: java.io.File)
case class FileSize(size: Long)
case class CalculateSize(file: java.io.File)

class DirWalker(system: ActorSystem) extends Actor {
  def receive = {
    case CalculateSize(file) =>
      if (file.isFile()) {
        system.eventStream.publish(FileSize(file.length()))
      } else {
        var size = 0L
        val children = file.listFiles()

        if (children != null) {
          for (child <- children) {
            if (child.isFile()) {
              size += child.length()
            } else {
              system.eventStream.publish(FileToProcess(child))
            }
          }
        }

        system.eventStream.publish(FileSize(size))
      }
  }
}

class SizeAggregator(system: ActorSystem) extends Actor {
  val start = System.nanoTime()

  var totalSize = 0L
  var filesPendingForProcess = 0L

  def receive = {
    case FileToProcess(file) =>
      filesPendingForProcess += 1

      system.eventStream.publish(CalculateSize(file))

    case FileSize(size) =>
      totalSize += size
      filesPendingForProcess -= 1
   
      if (filesPendingForProcess == 0) {
        println("Total Size: " + totalSize)
        println("Time taken (s): " + (System.nanoTime() - start) / 1.0e9)

        system.shutdown()
      }
  }
}

object FileSizeApp {
  def main(args: Array[String]): Unit = {

    val system = ActorSystem("FileSizeApp")

    val sizeAggregator = system.actorOf(Props(new SizeAggregator(system)))
    val dirWalkerRouter = system.actorOf(Props(new DirWalker(system)).withRouter(RoundRobinRouter(15)))

    system.eventStream.subscribe(sizeAggregator, classOf[FileSize])
    system.eventStream.subscribe(sizeAggregator, classOf[FileToProcess])

    system.eventStream.subscribe(dirWalkerRouter, classOf[CalculateSize])

    sizeAggregator ! FileToProcess(new java.io.File(args(0)))
  }
}

No comments: