DbLog - laforge49/Asynchronous-Functional-Programming GitHub Wiki

Transaction logs are important, no matter how robust a system. Transaction logs are recorded before an update request is processed. They can be used to build a database in the event of a failure, for a major software update, or when the size of the root block needs to be changed.

Logging is done by the TransactionComponentLog, which takes one parameter:

  • logDirPathname - This is the pathname of the directory where log files are to be written. (A new log file is created each time the program is run.)
  • flushLog - Set to "false" to turn off flushing after writing each journal entry.

##LoggingTest

LoggingTest configures the db subsystem for logging and then does a SetRequest transaction. A small log file is created.

val systemServices = SystemServices(new ServicesRootComponentFactory)
val dbName = "smallLogging.db"
val logDirPathname = "smallLogging"
val file = new java.io.File(dbName)
file.delete
EmptyLogDirectory(logDirPathname)
val properties = new Properties
properties.put("dbPathname", dbName)
properties.put("logDirPathname", logDirPathname)
val db = Subsystem(
  systemServices,
  new SmallComponentFactory,
  properties = properties,
  actorId = ActorId("db"))
val results = new Results
val chain = new Chain(results)
chain.op(systemServices, Register(db))
chain.op(db, SetRequest(db, "/$", IncDesInt(null)))
Future(systemServices, chain)
systemServices.close

LoggingTest

##EmptyLogDirectory

The EmptyLogDirectory companion object deletes all the files in the given directory, if it exists.

object EmptyLogDirectory {
  def apply(logDirPathname: String) {
    val file = new java.io.File(logDirPathname)
    if (!file.exists) return
    val files = file.listFiles
    var i = 0
    while (i < files.size) {
      if (!files(i).delete) throw new IllegalStateException(
        "unable to delete log file " + files(i).getCanonicalPath)
      i += 1
    }
  }
}

EmptyLogDirectory

##SmallComponentFactory

The SmallComponentFactory configures the small datastore to use the TransactionLogComponent.

class SmallComponentFactory
  extends ComponentFactory {
  addDependency(classOf[SmallDataStoreComponentFactory])
  addDependency(classOf[TransactionLogComponentFactory])
  addDependency(classOf[SmallTransactionsComponentFactory])
}

SmallComponentFactory

##TransactionLog

The TransactionLog creates a new log file when the first update request is processed. The name of the log file is a UTC timestamp of the time the program was started, e.g. 2011-10-01_02-58-14_308.

As always, when I/O is involved, it needs to be done with a separate mailbox, which is why TransactionLog messages are forwarded by the TransactionLogComponent to the TransactionLog actor.

class TransactionLogComponentFactory extends ComponentFactory {
  override def instantiate(actor: Actor) = new TransactionLogComponent(actor)
}

class TransactionLogComponent(actor: Actor)
  extends Component(actor) {
  private var transactionLog = new TransactionLog
  bindSafe(classOf[LogTransaction], new SafeForward(transactionLog))
  bindSafe(classOf[LogInfo], new SafeForward(transactionLog))

  override def open {
    super.open
    transactionLog.logDirPathname = GetProperty.required("logDirPathname")
    transactionLog.flushLog = GetProperty.boolean("flushLog", true)
    transactionLog.blockSize = GetProperty.int("logBlockSize", 8 * 1024)
  }

  override def close {
    transactionLog.close
    super.close
  }
}

class TransactionLog
  extends Actor {
  var logDirPathname: String = null
  var flushLog = true
  var blockSize = 8 * 1024
  var logFile: java.io.File = null
  private val logTS = (new org.joda.time.DateTime(org.joda.time.DateTimeZone.UTC)).
    toString("yyyy-MM-dd_HH-mm-ss_SSS")
  private var writer: java.io.DataOutputStream = null
  private var fileChannel: FileChannel = null

  setMailbox(new AsyncReactorMailbox)
  bind(classOf[LogTransaction], logTransaction)
  bind(classOf[LogInfo], logInfo)

  private def logInfo(msg: AnyRef, rf: Any => Unit) {
    val position = if (logFile == null) 0L else fileChannel.size
    rf((logTS, position))
  }

  private def logTransaction(msg: AnyRef, rf: Any => Unit) {
    initialize
    val logTransaction = msg.asInstanceOf[LogTransaction]
    val timestamp = logTransaction.timestamp
    val bytes = logTransaction.bytes
    writer.writeLong(timestamp)
    writer.writeInt(bytes.length)
    writer.write(bytes)
    if (flushLog) {
      writer.flush
      fileChannel.force(false)
    }
    rf(null)
  }

  def initialize {
    if (writer != null) return
    val dir = new java.io.File(logDirPathname)
    if (!dir.exists) dir.mkdirs
    val fileName = dir.getCanonicalPath + java.io.File.separator + logTS + ".jnl"
    logFile = new java.io.File(fileName)
    val fileOutputStream = new java.io.FileOutputStream(logFile)
    fileChannel = fileOutputStream.getChannel
    val bos = new java.io.BufferedOutputStream(fileOutputStream, blockSize)
    writer = new java.io.DataOutputStream(bos)
  }

  override def close {
    try {
      writer.close
    } catch {
      case unknown => {}
    }
    super.close
  }
}

TransactionLog


tutorial