Apr 7, 2012

Trying Out ScalaSTM

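STM (software transactional memory) is a concurrency control mechanism that is safer and simpler than synchronizing with locks. Below, an account-transfer example is implemented with both locking and STM for comparison.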

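Account transfer scenario
Multiple accounts transfer random amounts to one another concurrently. The transfers must be free of race conditions and must avoid deadlock.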




Transfer with locking (coarse granularity)
 
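import java.util.concurrent.{ThreadLocalRandom, TimeUnit}

// Single global lock shared by every account operation
object Lock {}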


case class Account(val name: String, initialBalance: Int) {

  /**
   * Account balance
   */
  private var balance = initialBalance

  /**
   * Number of transactions
   */
  private var transactions = 0

  def withdraw(amount: Int) = Lock.synchronized {
    balance -= amount
    transactions += 1
  }

  def deposit(amount: Int) = Lock.synchronized {
    balance += amount
    transactions += 1
  }

  def transferTo(to: Account, amount: Int) = Lock.synchronized {
    this.withdraw(amount)
    to.deposit(amount)
  }

  def getBalance = Lock.synchronized { balance }

  def getTransactions = Lock.synchronized { transactions }
}


object FundTransferSimulationWithLocking extends App {

  val parameters = if (args.length == 4) args else "5 100 100 1".split("\\s")

  val (numberOfAccounts, initialBalance, rangeOftransferValue, duration) = (parameters(0).toInt, parameters(1).toInt, parameters(2).toInt, parameters(3).toInt)

  val accounts = Range(1, numberOfAccounts + 1).map(x => Account(x.toString(), initialBalance)).toList

  val simulator = new Simulator()

  // Add one transfer worker for every ordered pair of distinct accounts
  for (left <- accounts; right <- accounts) {
    if (left != right) {
      simulator.addWorker(new TransferWorker(left, right, rangeOftransferValue))
    }
  }

  // Monitor each account's balance and transaction count (every second)
  simulator.addWorker(new SnapshotWorker(1000, accounts))

  // Detect deadlock (every second)
  simulator.addWorker(new DeadLockDetectWorker(1000, true))

  simulator.start()
  TimeUnit.MINUTES.sleep(duration)
  simulator.stop()

}


class TransferWorker(from: Account, to: Account, maxValue: Int) extends Worker {

  def doTask = {
    Lock.synchronized {
      val amount = ThreadLocalRandom.current().nextInt(maxValue)
      if (from.getBalance >= amount) {
        from.transferTo(to, amount)
      }
    }
    Thread.`yield`
  }

}


class SnapshotWorker(intervalMilli: Int, accounts: List[Account]) extends Worker {

  def doTask = {
    println(snapshot)
    TimeUnit.MILLISECONDS.sleep(intervalMilli)
  }

  def snapshot = Lock.synchronized {
    val buf = new StringBuilder
    for (account <- accounts) {
      buf ++= "Account:%s balance:%3d transactions:%d\n".format(account.name, account.getBalance, account.getTransactions)
    }

    buf.toString
  }
}
Output

Account:1 balance:  1 transactions:24
Account:2 balance:  6 transactions:21
Account:3 balance:436 transactions:20
Account:4 balance: 12 transactions:12
Account:5 balance: 45 transactions:17
(omitted)

Account:1 balance: 81 transactions:6049189
Account:2 balance: 73 transactions:6063152
Account:3 balance: 60 transactions:6081932
Account:4 balance:165 transactions:6083079
Account:5 balance:121 transactions:6079310


Monitoring data:



Transfer with locking (fine granularity)
 
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}

case class Account(val name: String, init: Int) {

  /**
   * Account balance
   */
  private var balance = init

  /**
   * Number of transactions
   */
  private var transactions = 0

  def withdraw(amount: Int) = synchronized {
    balance -= amount
    transactions += 1
  }

  def deposit(amount: Int) = synchronized {
    balance += amount
    transactions += 1
  }

  def transferTo(to: Account, amount: Int) = synchronized {
    this.withdraw(amount)
    to.deposit(amount)
  }

  def getBalance = synchronized { balance }

  def getTransactions = synchronized { transactions }
}


object FundTransferSimulationWithComplexLocking extends App {

  val parameters = if (args.length == 4) args else "5 100 100 1".split("\\s")

  val (numberOfAccounts, initialBalance, rangeOftransferValue, duration) = (parameters(0).toInt, parameters(1).toInt, parameters(2).toInt, parameters(3).toInt)

  val accounts = Range(1, numberOfAccounts + 1).map(x => Account(x.toString(), initialBalance)).toList

  val simulator = new Simulator()

  // Add one transfer worker for every ordered pair of distinct accounts
  for (left <- accounts; right <- accounts) {
    if (left != right) {
      simulator.addWorker(new TransferWorker(left, right, rangeOftransferValue))
    }
  }

  // Detect deadlock (every second)
  simulator.addWorker(new DeadLockDetectWorker(1000, true))

  simulator.start()
  TimeUnit.MINUTES.sleep(duration)
  simulator.stop()

  println(snapshot(accounts))

  def snapshot(accounts: List[Account]) = {
    val buf = new StringBuilder
    for (account <- accounts) {
      buf ++= "Account:%s balance:%3d transactions:%d\n".format(account.name, account.getBalance, account.getTransactions)
    }

    buf.toString
  }
}


class TransferWorker(from: Account, to: Account, maxValue: Int) extends Worker {

  def doTask = {
    val amount = ThreadLocalRandom.current().nextInt(maxValue)

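    // Always lock the account with the larger identity hash first, so that
    // every worker acquires the two locks in the same order.
    // Note: identity hash codes can collide; on a tie the order depends on the
    // transfer direction, so a tie-breaking lock would be needed for full safety.
    val fromHash = System.identityHashCode(from)
    val toHash = System.identityHashCode(to)

    val (first, second) = if (fromHash > toHash) (from, to) else (to, from)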

    first.synchronized {
      second.synchronized {
        if (from.getBalance >= amount) {
          from.transferTo(to, amount)
        }
      }
    }

    Thread.`yield`
  }

}


//no idea ...
class SnapshotWorker{}
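One possible way to take a consistent snapshot with per-account locks is sketched below. It is not from the original post: `SketchAccount`, `withAllLocked` and `snapshot` are hypothetical names, and the sketch assumes that every other code path acquires account locks in the same order (largest identity hash first, as the TransferWorker above does, and with the same caveat about hash collisions).

```scala
// Hypothetical stand-in for the article's Account, reduced to what the sketch needs.
final class SketchAccount(val name: String, init: Int) {
  private var balance = init
  def deposit(amount: Int): Unit = synchronized { balance += amount }
  def getBalance: Int = synchronized { balance }
}

object OrderedSnapshot {

  // Acquire the monitor of every account in list order, then evaluate body
  // while all of them are held.
  def withAllLocked[A](accounts: List[SketchAccount])(body: => A): A = accounts match {
    case Nil          => body
    case head :: tail => head.synchronized { withAllLocked(tail)(body) }
  }

  // Read all balances while holding every lock, so no transfer can be
  // half-applied in the result. Locks are taken largest identity hash first.
  def snapshot(accounts: List[SketchAccount]): List[(String, Int)] = {
    val ordered = accounts.sortBy(a => System.identityHashCode(a))(Ordering[Int].reverse)
    withAllLocked(ordered) {
      accounts.map(a => (a.name, a.getBalance))
    }
  }
}
```

Holding every lock at once blocks all transfers for the duration of the snapshot, so this gives up most of the benefit of fine-grained locking while it runs.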
Output

Account:1 balance:327 transactions:14443051
Account:2 balance: 16 transactions:5672252
Account:3 balance: 43 transactions:1867237
Account:4 balance: 37 transactions:13944761
Account:5 balance: 77 transactions:625777


Monitoring data





Transfer with STM

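This version uses ScalaSTM 0.5. Internally it works much like concurrent database transactions and provides the first three of the ACID properties (atomicity, consistency, isolation, durability). Each thread records its reads and writes in a log. At commit time the log is validated for conflicts: if there are none, the logged values are written to the protected data; if there is a conflict, the transaction is re-executed.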
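The read, validate, commit-or-retry cycle can be illustrated with a much simpler analogy. The sketch below is not how ScalaSTM is implemented; it only uses the standard `AtomicReference.compareAndSet` on a single immutable state value, and the names `OptimisticRetrySketch` and `State` are made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicReference

object OptimisticRetrySketch {

  // Immutable snapshot of the protected data.
  final case class State(balance: Int, transactions: Int)

  private val state = new AtomicReference(State(100, 0))

  @annotation.tailrec
  def withdraw(amount: Int): Unit = {
    // Read phase: remember the version this attempt is based on.
    val seen = state.get()
    // Buffered write: compute the new value without publishing it.
    val next = State(seen.balance - amount, seen.transactions + 1)
    // Validate and commit: succeeds only if nobody changed the state since the read.
    if (state.compareAndSet(seen, next)) ()
    else withdraw(amount) // conflict detected: re-execute from the start
  }

  def current: State = state.get()
}
```

Because the body of `withdraw` may run several times before it commits, anything it does besides computing `next` would be repeated, which is why re-executed code must be free of side effects.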

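The implementation has to declare the data to protect (e.g. balance and transactions) and the scope of each operation (e.g. the atomic function). Because a transaction may be re-executed, its body must also be idempotent and avoid side effects (e.g. I/O operations).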
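When a side effect cannot be avoided, one common approach is to defer it until the transaction has committed. The following is a minimal sketch, assuming the scala-stm library is on the classpath and using its `Txn.afterCommit` hook; the object `AfterCommitSketch` and its `log` queue are hypothetical and not part of the original example.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.stm._

object AfterCommitSketch {

  // Data protected by the STM.
  private val balance = Ref(100)

  // Stand-in for an external side effect such as writing to a file.
  val log = new ConcurrentLinkedQueue[String]()

  def withdraw(amount: Int): Unit = atomic { implicit txn =>
    balance -= amount
    // Registered per attempt, but run only for the attempt that commits,
    // so a retried transaction does not produce duplicate log entries.
    Txn.afterCommit { _ => log.add(s"withdrew $amount") }
  }

  def currentBalance: Int = balance.single.get
}
```

The handler runs outside the transaction, so it should not assume that the values it captured are still current by the time it executes.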
 
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.concurrent.stm._

case class Account(val name: String, initialBalance: Int) {

  /**
   * Account balance
   */
  private val balance = Ref(initialBalance)

  /**
   * Number of transactions
   */
  private val transactions = Ref(0)

  def withdraw(amount: Int) = atomic { implicit txn =>
    balance -= amount
    transactions += 1
  }

  def deposit(amount: Int) = atomic { implicit txn =>
    balance += amount
    transactions += 1
  }

  def transferTo(to: Account, amount: Int) = atomic { implicit txn =>
    this.withdraw(amount)
    to.deposit(amount)
  }

  def getBalance = balance.single.get

  def getTransactions = transactions.single.get
}


object FundTransferSimulationWithSTM extends App {

  val parameters = if (args.length == 4) args else "5 100 100 1".split("\\s")

  val (numberOfAccounts, initialBalance, rangeOftransferValue, duration) = (parameters(0).toInt, parameters(1).toInt, parameters(2).toInt, parameters(3).toInt)

  val accounts = Range(1, numberOfAccounts + 1).map(x => Account(x.toString(), initialBalance)).toList

  val simulator = new Simulator()

  // Add one transfer worker for every ordered pair of distinct accounts
  for (left <- accounts; right <- accounts) {
    if (left != right) {
      simulator.addWorker(new TransferWorker(left, right, rangeOftransferValue))
    }
  }

  // Monitor each account's balance and transaction count (every second)
  simulator.addWorker(new SnapshotWorker(1000, accounts))

  // Detect deadlock (every second)
  simulator.addWorker(new DeadLockDetectWorker(1000, true))

  simulator.start()
  TimeUnit.MINUTES.sleep(duration)
  simulator.stop()

}


class TransferWorker(from: Account, to: Account, maxValue: Int) extends Worker {

  def doTask =
    atomic { implicit txn =>
      val amount = ThreadLocalRandom.current().nextInt(maxValue)
      if (from.getBalance >= amount) {
        from.transferTo(to, amount)
      } else {
        Thread.`yield`
      }
    }
}


class SnapshotWorker(intervalMilli: Int, accounts: List[Account]) extends Worker {

  def doTask = {
    println(snapshot)
    TimeUnit.MILLISECONDS.sleep(intervalMilli)
  }

  def snapshot = atomic { implicit txn =>
    val buf = new StringBuilder
    for (account <- accounts) {
      buf ++= "Account:%s balance:%3d transactions:%d\n".format(account.name, account.getBalance, account.getTransactions)
    }

    buf.toString
  }
}
Output

Account:1 balance:100 transactions:0
Account:2 balance:100 transactions:0
Account:3 balance:100 transactions:0
Account:4 balance:100 transactions:0
Account:5 balance:100 transactions:0
(omitted)

Account:1 balance:  1 transactions:11733384
Account:2 balance:  3 transactions:11740378
Account:3 balance:310 transactions:11667328
Account:4 balance: 32 transactions:11639929
Account:5 balance:154 transactions:11719617


Monitoring data



Performance comparison
Version                      | Transactions      | CPU usage | Transactions per 1% CPU | GC overhead
Locking (coarse granularity) | 30,356,662 (100%) | 30%       | 1,011,888 (155%)        | 0.01%
Locking (fine granularity)   | 36,553,078 (120%) | 44%       | 830,751 (127%)          | 0.01%
STM                          | 58,500,636 (192%) | 90%       | 650,007 (100%)          | 1.8%
Transactions per 1% CPU = transactions / CPU usage (in percent)
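The per-CPU figures can be reproduced from the transaction counts and CPU usage reported above. The snippet below is only a worked check of that arithmetic; `Result` and `CpuNormalizedThroughput` are illustrative names, and the integer division truncates the fraction, as the reported figures do.

```scala
object CpuNormalizedThroughput {

  // One measured run: total transactions and average CPU usage in percent.
  final case class Result(name: String, transactions: Long, cpuPercent: Int) {
    // Transactions per 1% CPU, truncated to a whole number.
    def perCpuPercent: Long = transactions / cpuPercent
  }

  val results: List[Result] = List(
    Result("Locking (coarse granularity)", 30356662L, 30),
    Result("Locking (fine granularity)", 36553078L, 44),
    Result("STM", 58500636L, 90)
  )

  def main(args: Array[String]): Unit =
    results.foreach(r => println("%-30s %,d".format(r.name, r.perCpuPercent)))
}
```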

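STM achieves the highest transaction count, about 1.9 times that of the coarse-grained locking version, but the lowest number of transactions per 1% CPU, about 0.6 times that of the coarse-grained locking version. Its GC overhead is also the highest, at 1.8%.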


Summary

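For this account-transfer example, STM and coarse-grained locking are about equally hard to implement, but STM offers better concurrency and a higher transaction count. However, STM requires a particular style of operation: operations must be idempotent, side effects must be avoided, execution order is not guaranteed, and performance cost and latency are harder to estimate.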

References
Wikipedia - Software transactional memory

State: You're Doing It Wrong - Alternative Concurrency Paradigms For The JVM

ScalaSTM - Quick Start

ScalaSTM - Benchmarking
