Risk management in finance is one of the most common case studies for grid computing, and Value-at-Risk is the most widely used risk measure. In this article I’m going to show how to scale out a Value-at-Risk calculation to multiple nodes with the latest Akka middleware. In Part 1 I describe the problem and a single-node solution, and in Part 2 I scale it out to multiple nodes.
[Part 2/2] Scale-out VaR calculation to multiple nodes
Go to Part 1, where the Value-at-Risk calculator is defined.
Akka Cluster
Akka is an amazing library that provides the Actor abstraction for Scala and Java.
Actors are location transparent and distributable by design. This means that you can write your application without hardcoding how it will be deployed and distributed, and then later just configure your actor system against a certain topology …
Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck. It does this using gossip protocols and an automatic failure detector.
It means that it’s very easy to distribute portfolio simulations to multiple nodes.
Messages
Messages that are passed around between backend & calculator nodes:
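The post refers to three messages by name: `WakeUp`, `RegisterBackend` and `RunSimulation`. A minimal sketch of such a protocol, assuming hypothetical fields for `RunSimulation` (a simulation count and a seed) and a hypothetical `SimulationResult` reply type, is a sealed hierarchy of case objects and case classes:

```scala
// Hypothetical sketch: only the names WakeUp, RegisterBackend and RunSimulation
// appear in the post; the fields and SimulationResult are assumptions.
sealed trait SimulationMessage

// Logged by the calculator-side backend nodes manager
case object WakeUp extends SimulationMessage

// Sent by a simulation node to the calculator once it is available
case object RegisterBackend extends SimulationMessage

// One simulation task: how many portfolio values to simulate, and with which seed
final case class RunSimulation(simulations: Int, seed: Long) extends SimulationMessage {
  require(simulations > 0, "simulations must be positive")
}

// Hypothetical reply from a backend: simulated portfolio values for one task
final case class SimulationResult(portfolioValues: Vector[Double]) extends SimulationMessage

object SimulationMessage {
  // Human-readable description, handy for log messages
  def describe(msg: SimulationMessage): String = msg match {
    case WakeUp               => "wake-up"
    case RegisterBackend      => "register-backend"
    case RunSimulation(n, s)  => s"run $n simulations (seed $s)"
    case SimulationResult(vs) => s"result with ${vs.size} values"
  }
}
```

Case classes and case objects are `Serializable` by default, which matters once these messages travel between JVMs with Akka’s default Java serialization.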
The first type of node in the cluster is the SimulationNode, which runs all the ‘heavy’ portfolio price simulations.
After joining the cluster, it subscribes to MemberUp events, and when a new node with the calculator role joins the cluster, it registers itself with that node as an available backend.
The calculator node joins the cluster and receives RegisterBackend messages from the simulation nodes. It keeps track of all available simulation backends, and when it gets a request to calculate market risk, it splits the request into multiple simulation tasks and distributes them across the available backends in round-robin order.
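The registration rule on the simulation side reduces to a filter on member roles. The sketch below models it without Akka, assuming a hypothetical `ClusterMember` case class as a stand-in for `akka.cluster.Member` (which offers a similar `hasRole` check):

```scala
// Hypothetical stand-in for akka.cluster.Member: an address plus a set of roles
final case class ClusterMember(address: String, roles: Set[String]) {
  def hasRole(role: String): Boolean = roles.contains(role)
}

object SimulationNodeRegistration {
  val CalculatorRole = "calculator"

  // Given the members reported as Up, return the addresses of calculator
  // nodes that a simulation node should send RegisterBackend to
  def registrationTargets(upMembers: Seq[ClusterMember]): Seq[String] =
    upMembers.filter(_.hasRole(CalculatorRole)).map(_.address).distinct
}
```

In a real actor the same check would run in the `MemberUp` handler. Akka also delivers a `CurrentClusterState` snapshot as the first message after `subscribe`, so calculator nodes that were already up can be handled with the same filter.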
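The split-and-gather step on the calculator side can be sketched with plain `scala.concurrent` futures. In this sketch the hypothetical `route` function stands in for whatever sends one task to a backend (in the post, the backend nodes manager), and `splitSimulations` divides the total as evenly as possible. Assuming the `MonteCarloMarketRiskCalculator(10000, 10)` parameters used later mean total simulations and split factor, that would be 10 tasks of 1000 simulations each.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ScatterGather {
  // Split `total` simulations into `parts` tasks whose sizes differ by at most one
  def splitSimulations(total: Int, parts: Int): Vector[Int] = {
    require(total >= 0 && parts > 0, "total must be >= 0 and parts > 0")
    val base = total / parts
    val remainder = total % parts
    Vector.tabulate(parts)(i => if (i < remainder) base + 1 else base)
  }

  // Send every task through `route` (hypothetical: e.g. a call to the backend
  // manager) and concatenate the simulated portfolio values once all complete
  def scatterGather(total: Int, parts: Int)(route: Int => Future[Vector[Double]])
                   (implicit ec: ExecutionContext): Future[Vector[Double]] =
    Future.sequence(splitSimulations(total, parts).map(route)).map(_.flatten)
}
```

If any single task fails, `Future.sequence` fails the combined future, so one lost backend surfaces as a failed risk calculation rather than a silently smaller sample.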
Backend Nodes Manager
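```scala
import akka.actor.{Actor, ActorLogging, ActorRef, Status, Terminated}
import akka.pattern.{ask, pipe}
import akka.util.Timeout

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

// WakeUp, RunSimulation and RegisterBackend are the messages
// described in the Messages section above.
class BackendNodesManager extends Actor with ActorLogging {

  private[this] val backendNodes = ListBuffer.empty[ActorRef]

  private[this] var jobCounter = 0

  // Timeout for asking a backend to run a simulation
  private[this] implicit val timeout = Timeout(10.seconds)

  // ExecutionContext required by pipeTo
  import context.dispatcher

  override def receive: Receive = {
    case WakeUp =>
      log.info("Wake up backend nodes manager")

    case _: RunSimulation if backendNodes.isEmpty =>
      // Without this guard, `jobCounter % 0` would throw an ArithmeticException
      sender() ! Status.Failure(new IllegalStateException("No simulation backends registered"))

    case run: RunSimulation =>
      // Round-robin: pick the next backend and pipe its reply back to the requester
      jobCounter += 1
      val backendN = jobCounter % backendNodes.size
      log.debug(s"Pass simulation request to backend: $backendN")
      backendNodes(backendN) ? run pipeTo sender()

    case RegisterBackend if !backendNodes.contains(sender()) =>
      // Watch the backend so a Terminated message arrives if it goes away
      context watch sender()
      backendNodes += sender()
      log.debug(s"Added new backend. " +
        s"Total: ${backendNodes.size}. Node: [${sender()}]")

    case Terminated(backEnd) if backendNodes.contains(backEnd) =>
      backendNodes -= backEnd
      log.debug(s"Removed terminated backend. " +
        s"Total: ${backendNodes.size}. " +
        s"Terminated node: [$backEnd]")
  }
}
```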
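The routing rule in `BackendNodesManager` is plain round-robin over a list that grows and shrinks as backends register and terminate. A small pure-Scala sketch, using a hypothetical `RoundRobin` helper that mirrors the `jobCounter % backendNodes.size` arithmetic without Akka, shows which backend each job lands on and how removing a node simply shrinks the modulus:

```scala
// Hypothetical helper mirroring BackendNodesManager's routing arithmetic
final class RoundRobin[A] {
  private var nodes = Vector.empty[A]
  private var jobCounter = 0

  // Ignore duplicate registrations, like the `!backendNodes.contains` guard
  def register(node: A): Unit =
    if (!nodes.contains(node)) nodes :+= node

  // Called when a node terminates
  def remove(node: A): Unit =
    nodes = nodes.filterNot(_ == node)

  // None instead of a division by zero when no node is registered
  def next(): Option[A] =
    if (nodes.isEmpty) None
    else {
      jobCounter += 1 // incremented before the modulo, as in the actor
      Some(nodes(jobCounter % nodes.size))
    }
}
```

Because the counter is incremented before the modulo, the first job goes to the second registered backend; this is harmless for spreading load evenly.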
Cluster Portfolio Value Simulation
The simulation channel is constructed by asking the backend simulation actors to run a simulation and converting the resulting scala.concurrent.Future into a scalaz.concurrent.Task. Concurrency management and the split factor are defined in the abstract Monte Carlo risk calculator described in Part 1.
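Converting a Future into a Task is the usual callback bridge. With scalaz 7 it is typically done through `Task.async`, whose registration function receives a callback expecting a `Throwable \/ A`. To keep this sketch runnable with the standard library alone, it uses a hypothetical minimal `LazyTask` type (not scalaz’s `Task`) that shows the two properties that matter here: nothing is sent to a backend until the task runs, and the Future’s success or failure is handed to the callback.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.Duration
import scala.util.Try

// Hypothetical minimal task type standing in for scalaz.concurrent.Task:
// it only describes an asynchronous computation until it is run
final class LazyTask[A](register: (Try[A] => Unit) => Unit) {
  // Start the computation and block for its result
  def run(): A = {
    val promise = Promise[A]()
    register(result => promise.complete(result))
    Await.result(promise.future, Duration.Inf)
  }
}

object LazyTask {
  // Callback-based constructor, similar in spirit to scalaz's Task.async
  def async[A](register: (Try[A] => Unit) => Unit): LazyTask[A] =
    new LazyTask(register)

  // Take the Future by name so it is only created (e.g. the ask is only sent)
  // when the task runs, not when the task is built
  def fromFuture[A](future: => Future[A])(implicit ec: ExecutionContext): LazyTask[A] =
    async[A] { callback => future.onComplete(callback) }
}
```

Passing the Future by name is the important design choice: a Future starts running as soon as it is created, so converting an already-created Future would not make the ask lazy.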
In this example I run all nodes in a single JVM for simplicity. Distributed deployment is purely a matter of Akka configuration and doesn’t affect the code at all.
I start three simulation backend nodes in a cluster, then join the calculator node to that cluster and submit a risk calculation task.
```scala
import akka.actor.{ActorSystem, Props}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory
import org.joda.time.LocalDate
import scalaz.NonEmptyList.nels

// Equity, CallOption, Position, Portfolio, MonteCarloMarketRiskCalculator,
// HistoricalMarketFactors, HistoricalMarketData, ClusterPortfolioValueSimulation
// and PortfolioValueSimulationBackend are defined in Part 1 or earlier in this post.
object ClusterMarketRiskCalculation extends App {

  val SystemName = "ClusterMarketRiskCalculation"

  val simulationConfig = ConfigFactory.parseResources("simulation-node.conf")
  val calculatorConfig = ConfigFactory.parseResources("calculator-node.conf")

  // Start 3 simulation nodes; the first one joins its own address to form the cluster
  val system1 = ActorSystem(SystemName, simulationConfig)
  val joinAddress = Cluster(system1).selfAddress
  Cluster(system1).join(joinAddress)
  system1.actorOf(Props[PortfolioValueSimulationBackend], "simulationBackend")

  val system2 = ActorSystem(SystemName, simulationConfig)
  Cluster(system2).join(joinAddress)
  system2.actorOf(Props[PortfolioValueSimulationBackend], "simulationBackend")

  val system3 = ActorSystem(SystemName, simulationConfig)
  Cluster(system3).join(joinAddress)
  system3.actorOf(Props[PortfolioValueSimulationBackend], "simulationBackend")

  // Start Cluster Risk Calculator node
  object RiskCalculator extends MonteCarloMarketRiskCalculator(10000, 10)
    with ClusterPortfolioValueSimulation
    with HistoricalMarketFactors
    with HistoricalMarketData {
    val systemName = SystemName
    val systemConfig = calculatorConfig
  }

  RiskCalculator.join(joinAddress)

  // Give the cluster state some time to converge
  Thread.sleep(2000)

  // Run VaR calculation
  val AMZN = Equity("AMZN")
  val AAPL = Equity("AAPL")
  val IBM = Equity("IBM")
  val GS = Equity("GS")

  // Portfolio evaluation date
  val date = new LocalDate(2014, 1, 3)

  // Options maturity date
  val maturityDate = new LocalDate(2014, 3, 31)

  val portfolio = Portfolio(nels(
    Position(AMZN, 10),
    Position(AAPL, 20),
    Position(IBM, 30),
    Position(CallOption(GS, 180, maturityDate), 10)
  ))

  val start = System.currentTimeMillis()
  val marketRisk = RiskCalculator.marketRisk(portfolio, date)
  val end = System.currentTimeMillis()

  println(s"Calculated market risk in ${end - start} milliseconds; " +
    s"VaR(p = 0.95) = ${marketRisk.VaR(0.95)}, " +
    s"CVaR(p = 0.95) = ${marketRisk.conditionalVaR(0.95)}")

  // Shutdown actor systems
  system1.shutdown()
  system2.shutdown()
  system3.shutdown()
  RiskCalculator.shutdown()

  // and application
  System.exit(0)
}
```
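`VaR(0.95)` and `conditionalVaR(0.95)` are computed by the result of `RiskCalculator.marketRisk`, defined in Part 1. As an illustration only, here is one common way to derive both numbers from a sample of simulated portfolio values, assuming losses are measured against the current portfolio value and using a simple empirical-quantile convention (Part 1 may use a different one):

```scala
object EmpiricalRisk {
  // Losses relative to the current value, sorted ascending (positive = loss)
  def losses(currentValue: Double, simulatedValues: Seq[Double]): Vector[Double] =
    simulatedValues.map(v => currentValue - v).toVector.sorted

  // Empirical VaR at confidence p: a loss exceeded in at most (1 - p) of scenarios
  def valueAtRisk(sortedLosses: Vector[Double], p: Double): Double = {
    require(sortedLosses.nonEmpty && p > 0 && p < 1, "need samples and 0 < p < 1")
    val index = math.min(sortedLosses.size - 1, math.ceil(p * sortedLosses.size).toInt - 1)
    sortedLosses(index)
  }

  // Conditional VaR (expected shortfall): mean of the losses at or beyond VaR
  def conditionalVaR(sortedLosses: Vector[Double], p: Double): Double = {
    val varLevel = valueAtRisk(sortedLosses, p)
    val tail = sortedLosses.filter(_ >= varLevel)
    tail.sum / tail.size
  }
}
```

CVaR is never smaller than VaR at the same confidence level, because it averages the tail that VaR only marks the start of.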
Conclusion
As I showed in this post, moving calculations to a cluster can be an easy and fun task with Akka Cluster.
I use scalaz-stream to abstract over effectful functions with scalaz.stream.Channel. It may be overengineering in this particular case, but it completely hides the implementation details and lets you control concurrency in a very abstract way. scalaz-stream is a very nice and super powerful library, and I strongly encourage you to take a look at it.
Links
Akka http://akka.io/ and http://doc.akka.io/docs/akka/2.3.2/scala/index-network.html
Scalaz-stream
Value at Risk http://en.wikipedia.org/wiki/Value_at_risk