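STM (software transactional memory) is a concurrency control mechanism that is safer and simpler than synchronizing with locks. The following implements an account transfer example to compare locking with STM.
Account transfer example
Transfer with locking (coarse granularity)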
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}

// Simulator, Worker and DeadLockDetectWorker are helper classes not shown here.

// A single global lock shared by every account.
object Lock {}

case class Account(name: String, initialBalance: Int) {
  /** Current balance. */
  private var balance = initialBalance
  /** Number of transactions. */
  private var transactions = 0

  def withdraw(amount: Int) = Lock.synchronized {
    balance -= amount
    transactions += 1
  }

  def deposit(amount: Int) = Lock.synchronized {
    balance += amount
    transactions += 1
  }

  def transferTo(to: Account, amount: Int) = Lock.synchronized {
    this.withdraw(amount)
    to.deposit(amount)
  }

  def getBalance = Lock.synchronized { balance }

  def getTransactions = Lock.synchronized { transactions }
}

object FundTransferSimulationWithLocking extends App {
  val parameters = if (args.length == 4) args else "5 100 100 1".split("\\s")
  val (numberOfAccounts, initialBalance, rangeOftransferValue, duration) =
    (parameters(0).toInt, parameters(1).toInt, parameters(2).toInt, parameters(3).toInt)
  val accounts = Range(1, numberOfAccounts + 1).map(x => Account(x.toString, initialBalance)).toList
  val simulator = new Simulator()
  // Add one transfer worker for every ordered pair of distinct accounts
  for (left <- accounts; right <- accounts) {
    if (left != right) {
      simulator.addWorker(new TransferWorker(left, right, rangeOftransferValue))
    }
  }
  // Monitor each account's balance and transaction count (every second)
  simulator.addWorker(new SnapshotWorker(1000, accounts))
  // Detect deadlock (every second)
  simulator.addWorker(new DeadLockDetectWorker(1000, true))
  simulator.start()
  TimeUnit.MINUTES.sleep(duration)
  simulator.stop()
}

class TransferWorker(from: Account, to: Account, maxValue: Int) extends Worker {
  def doTask = {
    // Hold the global lock across the balance check and the transfer
    Lock.synchronized {
      val amount = ThreadLocalRandom.current().nextInt(maxValue)
      if (from.getBalance >= amount) {
        from.transferTo(to, amount)
      }
    }
    Thread.`yield`()
  }
}

class SnapshotWorker(intervalMilli: Int, accounts: List[Account]) extends Worker {
  def doTask = {
    println(snapshot)
    TimeUnit.MILLISECONDS.sleep(intervalMilli)
  }

  // Holding the global lock gives a consistent view of all accounts
  def snapshot = Lock.synchronized {
    val buf = new StringBuilder
    for (account <- accounts) {
      buf ++= "Account:%s balance:%3d transactions:%d\n".format(account.name, account.getBalance, account.getTransactions)
    }
    buf.toString
  }
}
Output:
Account:1 balance: 1 transactions:24
Account:2 balance: 6 transactions:21
Account:3 balance:436 transactions:20
Account:4 balance: 12 transactions:12
Account:5 balance: 45 transactions:17
(omitted)
Account:1 balance: 81 transactions:6049189
Account:2 balance: 73 transactions:6063152
Account:3 balance: 60 transactions:6081932
Account:4 balance:165 transactions:6083079
Account:5 balance:121 transactions:6079310
Transfer with locking (fine granularity)
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}

// Simulator, Worker and DeadLockDetectWorker are helper classes not shown here.

case class Account(name: String, init: Int) {
  /** Current balance. */
  private var balance = init
  /** Number of transactions. */
  private var transactions = 0

  // Each account is guarded by its own monitor instead of a global lock
  def withdraw(amount: Int) = synchronized {
    balance -= amount
    transactions += 1
  }

  def deposit(amount: Int) = synchronized {
    balance += amount
    transactions += 1
  }

  def transferTo(to: Account, amount: Int) = synchronized {
    this.withdraw(amount)
    to.deposit(amount)
  }

  def getBalance = synchronized { balance }

  def getTransactions = synchronized { transactions }
}

object FundTransferSimulationWithComplexLocking extends App {
  val parameters = if (args.length == 4) args else "5 100 100 1".split("\\s")
  val (numberOfAccounts, initialBalance, rangeOftransferValue, duration) =
    (parameters(0).toInt, parameters(1).toInt, parameters(2).toInt, parameters(3).toInt)
  val accounts = Range(1, numberOfAccounts + 1).map(x => Account(x.toString, initialBalance)).toList
  val simulator = new Simulator()
  // Add one transfer worker for every ordered pair of distinct accounts
  for (left <- accounts; right <- accounts) {
    if (left != right) {
      simulator.addWorker(new TransferWorker(left, right, rangeOftransferValue))
    }
  }
  // Detect deadlock (every second)
  simulator.addWorker(new DeadLockDetectWorker(1000, true))
  simulator.start()
  TimeUnit.MINUTES.sleep(duration)
  simulator.stop()
  println(snapshot(accounts))

  def snapshot(accounts: List[Account]) = {
    val buf = new StringBuilder
    for (account <- accounts) {
      buf ++= "Account:%s balance:%3d transactions:%d\n".format(account.name, account.getBalance, account.getTransactions)
    }
    buf.toString
  }
}

class TransferWorker(from: Account, to: Account, maxValue: Int) extends Worker {
  def doTask = {
    val amount = ThreadLocalRandom.current().nextInt(maxValue)
    // Always lock the two accounts in the same order (by identity hash) to avoid deadlock
    val fromHash = System.identityHashCode(from)
    val toHash = System.identityHashCode(to)
    val (first, second) = if (fromHash > toHash) (from, to) else (to, from)
    first.synchronized {
      second.synchronized {
        if (from.getBalance >= amount) {
          from.transferTo(to, amount)
        }
      }
    }
    Thread.`yield`()
  }
}

// no idea
... class SnapshotWorker {}
Output:
Account:1 balance:327 transactions:14443051
Account:2 balance: 16 transactions:5672252
Account:3 balance: 43 transactions:1867237
Account:4 balance: 37 transactions:13944761
Account:5 balance: 77 transactions:625777
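Transfer with STM
This version uses ScalaSTM 0.5. Internally it works much like concurrent database transactions and provides the first three of the ACID properties (atomicity, consistency, isolation, durability). Each thread records its reads and writes in a log. At commit time the log is validated for conflicts: if there are none, the logged values are written to the protected data; if there is a conflict, the transaction is re-executed.
An implementation must declare the data to protect (e.g. balance and transactions) and the scope of each operation (e.g. the atomic function). Because a transaction may be re-executed, its body should be idempotent and avoid side effects (e.g. I/O operations).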
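The read, validate, commit-or-retry cycle described above can be sketched with only the JDK. This is an illustration of the general optimistic pattern, not ScalaSTM's actual implementation: `AccountState` and `OptimisticUpdate.update` are hypothetical names invented for this sketch, and a single `AtomicReference` stands in for the transaction log and its validation step.

```scala
import java.util.concurrent.atomic.AtomicReference

// Immutable snapshot of the protected data (hypothetical type for this sketch).
final case class AccountState(balance: Int, transactions: Int)

object OptimisticUpdate {
  // Read a snapshot, compute the new state privately, then commit only if
  // nobody else replaced the reference in the meantime; otherwise re-execute.
  @annotation.tailrec
  def update(ref: AtomicReference[AccountState])(f: AccountState => AccountState): AccountState = {
    val seen = ref.get()                    // read phase
    val next = f(seen)                      // tentative result, not yet visible to others
    if (ref.compareAndSet(seen, next)) next // validation and commit in one step
    else update(ref)(f)                     // conflict: run again from a fresh snapshot
  }
}

object OptimisticUpdateDemo extends App {
  val account = new AtomicReference(AccountState(100, 0))
  // Four threads each apply 1000 deposits of 1 without taking any lock.
  val threads = (1 to 4).map { _ =>
    new Thread(() => (1 to 1000).foreach { _ =>
      OptimisticUpdate.update(account)(s => s.copy(balance = s.balance + 1, transactions = s.transactions + 1))
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
  println(account.get())
}
```

Because a failed commit simply re-runs the update function, no deposit is lost: the demo ends with a balance of 4100 and 4000 transactions. A real STM generalizes this to many independent references per transaction.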
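To see why side effects inside a re-executable body are a problem, here is a minimal JDK-only sketch. The `atomically` helper is hypothetical and only mimics the retry behaviour; it is not the ScalaSTM API. A conflicting write is simulated on the first attempt so that the body deterministically runs twice.

```scala
import java.util.concurrent.atomic.AtomicInteger

object RetryDemo {
  // Returns (number of times the body executed, committed balance).
  def run(): (Int, Int) = {
    val balance = new AtomicInteger(100)
    var bodyRuns = 0

    // Hypothetical helper: re-runs `body` until its compare-and-set succeeds.
    def atomically(body: Int => Int): Int = {
      var committed = false
      var result = 0
      while (!committed) {
        val seen = balance.get()
        result = body(seen)
        committed = balance.compareAndSet(seen, result)
      }
      result
    }

    val newBalance = atomically { seen =>
      bodyRuns += 1 // a side effect inside the body repeats on every retry
      // Simulate a concurrent deposit that interferes with the first attempt only.
      if (bodyRuns == 1) balance.addAndGet(50)
      seen - 30     // pure computation from the snapshot
    }
    (bodyRuns, newBalance)
  }

  def main(args: Array[String]): Unit = {
    val (runs, committed) = run()
    // Effects that must happen exactly once belong here, after the commit.
    println(s"body ran $runs times, committed balance $committed")
  }
}
```

The body runs twice although only one withdrawal is committed, so anything like printing or network I/O placed inside it would also happen twice. Keeping the body a pure function of the values it reads, and performing I/O after the commit, avoids this.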
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.concurrent.stm._

// Simulator, Worker and DeadLockDetectWorker are helper classes not shown here.

case class Account(name: String, initialBalance: Int) {
  /** Current balance, protected by STM. */
  private val balance = Ref(initialBalance)
  /** Number of transactions, protected by STM. */
  private val transactions = Ref(0)

  def withdraw(amount: Int) = atomic { implicit txn =>
    balance -= amount
    transactions += 1
  }

  def deposit(amount: Int) = atomic { implicit txn =>
    balance += amount
    transactions += 1
  }

  // The nested atomic blocks join this enclosing transaction
  def transferTo(to: Account, amount: Int) = atomic { implicit txn =>
    this.withdraw(amount)
    to.deposit(amount)
  }

  def getBalance = balance.single.get

  def getTransactions = transactions.single.get
}

object FundTransferSimulationWithSTM extends App {
  val parameters = if (args.length == 4) args else "5 100 100 1".split("\\s")
  val (numberOfAccounts, initialBalance, rangeOftransferValue, duration) =
    (parameters(0).toInt, parameters(1).toInt, parameters(2).toInt, parameters(3).toInt)
  val accounts = Range(1, numberOfAccounts + 1).map(x => Account(x.toString, initialBalance)).toList
  val simulator = new Simulator()
  // Add one transfer worker for every ordered pair of distinct accounts
  for (left <- accounts; right <- accounts) {
    if (left != right) {
      simulator.addWorker(new TransferWorker(left, right, rangeOftransferValue))
    }
  }
  // Monitor each account's balance and transaction count (every second)
  simulator.addWorker(new SnapshotWorker(1000, accounts))
  // Detect deadlock (every second)
  simulator.addWorker(new DeadLockDetectWorker(1000, true))
  simulator.start()
  TimeUnit.MINUTES.sleep(duration)
  simulator.stop()
}

class TransferWorker(from: Account, to: Account, maxValue: Int) extends Worker {
  // The balance check and the transfer run in one transaction
  def doTask = atomic { implicit txn =>
    val amount = ThreadLocalRandom.current().nextInt(maxValue)
    if (from.getBalance >= amount) {
      from.transferTo(to, amount)
    } else {
      Thread.`yield`()
    }
  }
}

class SnapshotWorker(intervalMilli: Int, accounts: List[Account]) extends Worker {
  def doTask = {
    println(snapshot)
    TimeUnit.MILLISECONDS.sleep(intervalMilli)
  }

  // Reading every account inside one transaction gives a consistent view
  def snapshot = atomic { implicit txn =>
    val buf = new StringBuilder
    for (account <- accounts) {
      buf ++= "Account:%s balance:%3d transactions:%d\n".format(account.name, account.getBalance, account.getTransactions)
    }
    buf.toString
  }
}
Output:
Account:1 balance:100 transactions:0
Account:2 balance:100 transactions:0
Account:3 balance:100 transactions:0
Account:4 balance:100 transactions:0
Account:5 balance:100 transactions:0
(omitted)
Account:1 balance: 1 transactions:11733384
Account:2 balance: 3 transactions:11740378
Account:3 balance:310 transactions:11667328
Account:4 balance: 32 transactions:11639929
Account:5 balance:154 transactions:11719617
Monitoring data:
Version | Transactions | CPU usage | Transactions per 1% CPU | GC overhead |
Locking (coarse granularity) | 30,356,662 (100%) | 30% | 1,011,888 (155%) | 0.01% |
Locking (fine granularity) | 36,553,078 (120%) | 44% | 830,751 (127%) | 0.01% |
STM | 58,500,636 (192%) | 90% | 650,007 (100%) | 1.8% |
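STM has the highest transaction count, about 1.9 times that of the coarse-grained locking version, but the lowest number of transactions per 1% of CPU, about 0.6 times that of the coarse-grained locking version. Its GC overhead is also the highest, at 1.8%.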
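The per-CPU figures can be reproduced from the transaction counts and CPU usage in the table. The sketch below assumes the column was computed as transactions divided by the CPU percentage and truncated to an integer, which matches the three reported values; `Run` and `ThroughputPerCpu` are names made up for this illustration.

```scala
object ThroughputPerCpu {
  final case class Run(name: String, transactions: Long, cpuPercent: Int) {
    // Transactions completed per 1% of CPU usage (integer division truncates).
    def perCpuPercent: Long = transactions / cpuPercent
  }

  // Figures copied from the table above.
  val runs = List(
    Run("Locking (coarse granularity)", 30356662L, 30),
    Run("Locking (fine granularity)", 36553078L, 44),
    Run("STM", 58500636L, 90)
  )

  def main(args: Array[String]): Unit = {
    val base = runs.head // the coarse-grained locking version is the baseline
    for (r <- runs) {
      val throughputRatio = r.transactions.toDouble / base.transactions
      val efficiencyRatio = r.perCpuPercent.toDouble / base.perCpuPercent
      println(f"${r.name}%-30s ${r.perCpuPercent}%d per 1%% CPU, throughput x$throughputRatio%.2f, efficiency x$efficiencyRatio%.2f")
    }
  }
}
```

Relative to the coarse-grained baseline, STM comes out at roughly 1.9 times the throughput and roughly 0.64 times the transactions per 1% of CPU.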
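Summary
For this account transfer example, STM is about as easy to implement as coarse-grained locking, yet it offers better concurrency and a higher transaction count. However, STM requires code to follow certain patterns: operations must be idempotent and avoid side effects, execution order is not guaranteed, and performance cost and latency are harder to estimate.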