Wednesday, April 11, 2012

Guava Event Bus Example


package guava;

import java.io.File;

import org.apache.log4j.Logger;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

/**
 * Computes the total size of a file or directory tree by passing events
 * through a Guava {@link EventBus}.
 *
 * <p>Event flow, all handled by this one registered instance:
 * <ol>
 *   <li>{@code ProcessFileEvent} - increments the pending counter and posts a
 *       {@code CalculateSizeEvent} for the same file.</li>
 *   <li>{@code CalculateSizeEvent} - measures a plain file, or sums the plain
 *       files directly inside a directory and posts a new
 *       {@code ProcessFileEvent} for every sub-directory; then posts a
 *       {@code FileSizeEvent} with the measured size.</li>
 *   <li>{@code FileSizeEvent} - adds the size to the total, decrements the
 *       pending counter, and prints the result and exits the JVM once the
 *       counter reaches zero.</li>
 * </ol>
 *
 * <p>NOTE(review): completion detection relies on the bus delivering a
 * directory's child {@code ProcessFileEvent}s before that directory's own
 * {@code FileSizeEvent}; confirm against the EventBus dispatch-order
 * documentation for the Guava version in use.
 */
public class FileSizeApp {
    private static final Logger LOG = Logger.getLogger(FileSizeApp.class);

    private final EventBus eventBus = new EventBus("FileSizeEventBus");

    /** ProcessFileEvents seen so far that have not yet been matched by a FileSizeEvent. */
    private long filesPending;

    /** Running sum of every size reported through a FileSizeEvent. */
    private long totalSize;

    /** Taken when this instance is constructed; basis for the reported elapsed time. */
    private final long start = System.nanoTime();

    /**
     * Registers this instance on the bus and kicks off the walk at {@code file}.
     *
     * @param file the file or directory whose total size is wanted
     */
    private void process(File file) {
        eventBus.register(this);

        eventBus.post(new ProcessFileEvent(file));
    }

    /**
     * Counts one more outstanding entry and requests its size calculation.
     *
     * @param e event carrying the file or directory to process
     */
    @Subscribe
    public void processFile(ProcessFileEvent e) {
        filesPending++;
        eventBus.post(new CalculateSizeEvent(e.getFile()));

        if (LOG.isDebugEnabled()) {
            LOG.debug(filesPending + ": " + e.getFile().getAbsolutePath());
        }
    }

    /**
     * Measures one entry and posts the result as a {@code FileSizeEvent}.
     *
     * <p>For a plain file the size is {@code File.length()}. For anything else
     * the plain files directly inside it are summed and each remaining child
     * is handed back to the bus as a new {@code ProcessFileEvent}.
     *
     * @param e event carrying the file or directory to measure
     */
    @Subscribe
    public void calculateSize(CalculateSizeEvent e) {
        long size = 0;
        File file = e.getFile();

        if (file.isFile()) {
            size = file.length();
        } else {
            File[] children = file.listFiles();

            // listFiles() yields null when the path is not a readable directory;
            // such an entry contributes a size of 0.
            if (children != null) {
                for (File child : children) {
                    if (child.isFile()) {
                        size += child.length();
                    } else {
                        eventBus.post(new ProcessFileEvent(child));
                    }
                }
            }
        }

        eventBus.post(new FileSizeEvent(size));
    }

    /**
     * Folds one measured size into the total and, when nothing is pending any
     * more, prints the total and the elapsed time and terminates the JVM.
     *
     * @param e event carrying the size measured for one entry
     */
    @Subscribe
    public void fileSize(FileSizeEvent e) {
        totalSize += e.getSize();
        filesPending--;

        LOG.info(filesPending + ": " + e.getSize() + ", " + totalSize);

        if (filesPending == 0) {
            System.out.println("Total size: " + totalSize);
            System.out.println("Time taken (s): " + (System.nanoTime() - start) / 1.0e9);

            System.exit(0);
        }
    }

    /** Announces a file or directory that still has to be measured. */
    private static class ProcessFileEvent {
        private final File file;

        public ProcessFileEvent(File file) {
            this.file = file;
        }

        public File getFile() {
            return file;
        }
    }

    /** Requests the size calculation for one file or directory. */
    private static class CalculateSizeEvent {
        private final File file;

        public CalculateSizeEvent(File file) {
            this.file = file;
        }

        public File getFile() {
            return file;
        }
    }

    /** Reports the size measured for one entry. */
    private static class FileSizeEvent {
        private final long size;

        public FileSizeEvent(long size) {
            this.size = size;
        }

        public long getSize() {
            return size;
        }
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code args[0]} is the path of the file or directory to measure
     */
    public static void main(String[] args) {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length == 0) {
            System.err.println("Usage: FileSizeApp <file-or-directory>");
            System.exit(1);
            return;
        }

        final String fileName = args[0];
        final FileSizeApp app = new FileSizeApp();

        System.out.println("Calculating file size for: " + fileName);
        app.process(new File(fileName));
    }
}

Sunday, April 8, 2012

Akka 2.0 Event Bus Example


package practice

import akka.actor._
import akka.routing._
import akka.event._

/**
 * Announces a file or directory that still has to be measured.
 *
 * Published by [[DirWalker]] for every sub-directory it encounters and
 * handled by [[SizeAggregator]], which counts it as pending.
 *
 * @param file the file or directory to process
 */
final case class FileToProcess(file: java.io.File)
/**
 * Reports the size measured for one processed entry.
 *
 * Published by [[DirWalker]] and summed up by [[SizeAggregator]].
 *
 * @param size value obtained from `java.io.File.length()`, summed over the
 *             plain files of a directory when the entry was a directory
 */
final case class FileSize(size: Long)
/**
 * Requests the size calculation for one file or directory.
 *
 * Published by [[SizeAggregator]] and handled by [[DirWalker]].
 *
 * @param file the file or directory to measure
 */
final case class CalculateSize(file: java.io.File)

/**
 * Actor that measures a single file or directory per [[CalculateSize]] request.
 *
 * The result is published on the system event stream as a [[FileSize]];
 * sub-directories found along the way are published as [[FileToProcess]].
 *
 * @param system actor system whose event stream is used for publishing
 */
class DirWalker(system: ActorSystem) extends Actor {
  def receive = {
    case CalculateSize(target) =>
      val bytes =
        if (target.isFile()) target.length()
        else sizeOfDirectFiles(target)

      system.eventStream.publish(FileSize(bytes))
  }

  /**
   * Sums the lengths of the plain files directly inside `dir`, publishing a
   * [[FileToProcess]] for every other child in listing order.
   *
   * Yields 0 when `listFiles()` returns null.
   */
  private def sizeOfDirectFiles(dir: java.io.File): Long =
    Option(dir.listFiles()).map { entries =>
      entries.foldLeft(0L) { (sum, entry) =>
        if (entry.isFile()) {
          sum + entry.length()
        } else {
          system.eventStream.publish(FileToProcess(entry))
          sum
        }
      }
    }.getOrElse(0L)
}

/**
 * Actor that accumulates reported sizes and tracks how many entries are
 * still outstanding.
 *
 * Every [[FileToProcess]] raises the outstanding count and triggers a
 * [[CalculateSize]] on the event stream; every [[FileSize]] adds to the
 * total and lowers the count. When the count returns to zero the total and
 * the elapsed time are printed and the actor system is shut down.
 *
 * @param system actor system whose event stream is used and which is shut
 *               down on completion
 */
class SizeAggregator(system: ActorSystem) extends Actor {
  // Captured when the actor instance is constructed.
  val start = System.nanoTime()

  var totalSize = 0L
  var filesPendingForProcess = 0L

  def receive = {
    case FileToProcess(entry) =>
      filesPendingForProcess += 1
      system.eventStream.publish(CalculateSize(entry))

    case FileSize(bytes) =>
      totalSize += bytes
      filesPendingForProcess -= 1
      if (filesPendingForProcess == 0) reportAndShutdown()
  }

  /** Prints the final total and elapsed seconds, then stops the actor system. */
  private def reportAndShutdown(): Unit = {
    println("Total Size: " + totalSize)
    println("Time taken (s): " + (System.nanoTime() - start) / 1.0e9)

    system.shutdown()
  }
}

/**
 * Entry point: wires a [[SizeAggregator]] and a round-robin pool of
 * [[DirWalker]]s to the system event stream and starts the walk.
 */
object FileSizeApp {
  /**
   * @param args `args(0)` is the path of the file or directory to measure
   */
  def main(args: Array[String]): Unit = {
    // Validate the argument before the ActorSystem exists: the only shutdown()
    // call lives in SizeAggregator, so a failure after the system has been
    // started would leave it running with nothing to stop it.
    val root = args.headOption match {
      case Some(path) => new java.io.File(path)
      case None =>
        Console.err.println("Usage: FileSizeApp <file-or-directory>")
        sys.exit(1)
    }

    val system = ActorSystem("FileSizeApp")

    val sizeAggregator = system.actorOf(Props(new SizeAggregator(system)))
    val dirWalkerRouter = system.actorOf(Props(new DirWalker(system)).withRouter(RoundRobinRouter(15)))

    system.eventStream.subscribe(sizeAggregator, classOf[FileSize])
    system.eventStream.subscribe(sizeAggregator, classOf[FileToProcess])

    system.eventStream.subscribe(dirWalkerRouter, classOf[CalculateSize])

    // The first request goes straight to the aggregator so that it is counted as pending.
    sizeAggregator ! FileToProcess(root)
  }
}