Unit Testing Spark Applications with a Standalone Apache Spark Cluster
Update 2015-10-08: Although the approach described in this post works and is perfectly valid, I would now suggest packaging all tests into a fat jar together with scalatest (or any other test library of your choice) and using the spark-submit command to run it.
The code for this example application can be found on GitHub.
Running Spark Applications
To be able to run Spark jobs, the Spark cluster needs to have all classes used by your application on its classpath. You could copy every jar file required by your application to the Spark nodes by hand, but that doesn't scale. Another option is to specify the jars to distribute to the worker nodes when you create the SparkConf. One way to do that is to package your application as a "fat jar", so that only a single jar needs to be distributed. The industry standard for packaging Spark applications is the sbt-assembly plugin, which is used by Spark itself.
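For the non-fat-jar variant, listing the jars explicitly on the SparkConf looks roughly like the sketch below; the master URL and jar paths are illustrative, not taken from the example project.

// A minimal sketch of listing dependency jars explicitly on the SparkConf;
// the master URL and jar paths here are illustrative.
val conf = new SparkConf().
  setMaster("spark://spark-host:7777").
  setAppName("SparkTestingExample").
  setJars(Seq(
    "target/scala-2.10/my-app.jar",   // your application classes
    "lib/scala-openbook.jar"          // every non-Spark dependency has to be listed too
  ))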
Unit Testing Spark Applications
If you need to test your Spark application, the easiest way is to create a local SparkContext for each test, or perhaps one shared between all tests. When Spark runs in local mode, it runs in the same JVM as your tests, with the same jar files on the classpath.
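A minimal local-mode spec, as a sketch (the suite name and the toy assertion are mine, not from the example project):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

// Minimal sketch of a spec with a local SparkContext shared by all tests in the suite.
class LocalSparkSpec extends FlatSpec with Matchers with BeforeAndAfterAll {

  private val sc = new SparkContext(
    new SparkConf().setMaster("local[2]").setAppName("LocalSparkSpec"))

  override protected def afterAll(): Unit = sc.stop()

  "A local SparkContext" should "run a simple job" in {
    sc.parallelize(1 to 100).reduce(_ + _) shouldBe 5050
  }
}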
If your tests require data that doesn't fit on a single node, for example in integration or acceptance tests, the obvious solution is to run them on a standalone Spark cluster with a sufficient number of nodes. This is where things get more difficult: you now need to package your application together with its tests into a single jar file and submit it to the Spark cluster for each test run.
Example Application
To show how to run and test Spark applications, I prepared a very simple application. It uses the Scala OpenBook library to parse NYSE OpenBook messages (the orders log from the New York Stock Exchange), distributes them to the cluster as an RDD, and counts Buy and Sell orders by ticker. The only purpose of this application is to depend on a library that is definitely not available on the Spark nodes.
class OrdersFunctions(@transient sc: SparkContext, orders: Iterator[OpenBookMsg]) extends Serializable {

  private val ordersRDD = sc.parallelize(orders.toSeq)

  def countBuyOrders(): Map[String, Long] =
    countOrders(OrderFunctions.isBuySide)

  def countSellOrders(): Map[String, Long] =
    countOrders(OrderFunctions.isSellSide)

  private def countOrders(filter: OpenBookMsg => Boolean): Map[String, Long] =
    ordersRDD.filter(filter).
      map(order => (order.symbol, order)).
      countByKey().toMap
}
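For orientation, usage could look like the sketch below; loadOrders() is a hypothetical helper standing in for however you obtain an Iterator[OpenBookMsg] from the OpenBook library, and sc is an existing SparkContext.

// Hypothetical usage sketch: loadOrders() is only a stand-in for whatever
// parses an OpenBook file into an Iterator[OpenBookMsg].
def loadOrders(): Iterator[OpenBookMsg] = ???

val functions = new OrdersFunctions(sc, loadOrders())

val buyCounts: Map[String, Long]  = functions.countBuyOrders()
val sellCounts: Map[String, Long] = functions.countSellOrders()

buyCounts.foreach { case (symbol, n) => println(s"$symbol: $n buy orders") }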
Assembling the Main Application
Add the sbt-assembly plugin in project/plugin.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
Add assembly settings to build.sbt:
// Merge strategy shared between app & test
val sharedMergeStrategy: (String => MergeStrategy) => String => MergeStrategy =
  old => {
    case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
    case x if x.startsWith("META-INF/mailcap")      => MergeStrategy.last
    case x if x.endsWith("plugin.properties")       => MergeStrategy.last
    case x => old(x)
  }

// Load Assembly Settings
assemblySettings

// Assembly App
mainClass in assembly := Some("com.github.ezhulenev.spark.RunSparkApp")

jarName in assembly := "spark-testing-example-app.jar"

mergeStrategy in assembly <<= (mergeStrategy in assembly)(sharedMergeStrategy)
Inside your application you need to create a SparkConf and add the current jar to it:
new SparkConf().
  setMaster("spark://spark-host:7777").
  setJars(SparkContext.jarOfClass(this.getClass).toSeq).
  setAppName("SparkTestingExample")
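Put together, the entry point might look roughly like the sketch below. The object name matches the mainClass configured above, but the body is my illustration rather than the project's actual code.

package com.github.ezhulenev.spark

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch of the entry point; only the object name is taken
// from the mainClass setting above.
object RunSparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().
      setMaster(sys.props.getOrElse("spark.master", "local[2]")). // -Dspark.master from the launch command
      setJars(SparkContext.jarOfClass(this.getClass).toSeq).
      setAppName("SparkTestingExample")

    val sc = new SparkContext(conf)
    try {
      // ... build OrdersFunctions from parsed orders and print the counts ...
    } finally {
      sc.stop()
    }
  }
}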
After that you can use the assembly command and run the assembled application on your Spark cluster:
> sbt assembly
> java -Dspark.master=spark://spark-host:7777 -jar target/scala-2.10/spark-testing-example-app.jar
Assembling Tests
The first step to running tests on a standalone Spark cluster is to package all main and test classes into a single jar that will be transferred to each worker node before the tests run. It's very similar to assembling the main app.
// Assembly Tests
Project.inConfig(Test)(assemblySettings)

jarName in (Test, assembly) := "spark-testing-example-tests.jar"

mergeStrategy in (Test, assembly) <<= (mergeStrategy in assembly)(sharedMergeStrategy)

test in (Test, assembly) := {} // disable tests in assembly
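With these settings in place, the test jar can also be built on its own; assuming the usual sbt 0.13 scoped-task syntax, that should be:

> sbt test:assembly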
I wrote a simple sbt plugin that provides a test-assembled task. This task first assembles a jar file with the test classes and all their dependencies, then stores its location in a system property, and then runs the tests.
object TestWithSparkPlugin extends sbt.Plugin {

  import TestWithSparkKeys._
  import AssemblyKeys._

  object TestWithSparkKeys {
    lazy val testAssembled =
      TaskKey[Unit]("test-assembled", "Run tests with standalone Spark cluster")

    lazy val assembledTestsProp =
      SettingKey[String]("assembled-tests-prop", "Environment variable name used to pass assembled jar name to test")
  }

  lazy val baseTestWithSparkSettings: Seq[sbt.Def.Setting[_]] = Seq(
    testAssembled := TestWithSpark.testWithSparkTask.value,
    assembledTestsProp := "ASSEMBLED_TESTS"
  )

  lazy val testWithSparkSettings: Seq[sbt.Def.Setting[_]] = baseTestWithSparkSettings

  object TestWithSpark {

    def assemblyTestsJarTask: Initialize[Task[File]] = Def.task {
      val assembled = (assembly in Test).value
      sys.props(assembledTestsProp.value) = assembled.getAbsolutePath
      assembled
    }

    private def runTests = Def.task {
      (test in Test).value
    }

    def testWithSparkTask: Initialize[Task[Unit]] = Def.sequentialTask {
      assemblyTestsJarTask.value
      runTests.value
    }
  }
}
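To make the task available, the plugin's settings have to be wired into the build. A sketch, assuming TestWithSparkPlugin is on the build classpath (for example compiled from sources under project/ or pulled in via addSbtPlugin):

// In build.sbt: add the plugin's settings so the test-assembled task
// becomes available (assumes TestWithSparkPlugin is on the build classpath).
TestWithSparkPlugin.testWithSparkSettings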
All Apache Spark tests should extend ConfiguredSparkFlatSpec, which provides a configured SparkContext. If the assembled tests jar is available, it is distributed to the Spark worker nodes; if not, only local mode is supported.
trait ConfiguredSparkFlatSpec extends FlatSpec with BeforeAndAfterAll {

  private val log = LoggerFactory.getLogger(classOf[ConfiguredSparkFlatSpec])

  private val config = ConfigFactory.load()

  private lazy val sparkConf = {
    val master = config.getString("spark.master")
    log.info(s"Create spark context. Master: $master")

    val assembledTests = sys.props.get("ASSEMBLED_TESTS")

    val baseConf = new SparkConf().
      setMaster(master).
      setAppName("SparkTestingExample")

    assembledTests match {
      case None =>
        log.warn(s"Assembled tests jar not found. Standalone Spark mode is not supported")
        baseConf

      case Some(path) =>
        log.info(s"Add assembled tests to Spark Context from: $path")
        baseConf.setJars(path :: Nil)
    }
  }

  lazy val sc = new SparkContext(sparkConf)

  override protected def afterAll(): Unit = {
    super.afterAll()
    sc.stop()
  }
}
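A concrete spec then only has to extend the trait and use sc. The sketch below is mine and deliberately avoids OpenBook data, so it runs in both local and standalone mode:

// Minimal sketch of a spec built on ConfiguredSparkFlatSpec; it only
// exercises the shared SparkContext and needs no external data.
class SharedContextSpec extends ConfiguredSparkFlatSpec {

  "The shared SparkContext" should "count orders by side" in {
    val sides  = sc.parallelize(Seq("BUY", "SELL", "BUY", "BUY"))
    val counts = sides.countByValue()

    assert(counts("BUY") == 3)
    assert(counts("SELL") == 1)
  }
}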
Running Tests
By default, the spark.master property is set to local[2], so you can run the tests in local mode. If you want to run tests on a standalone Apache Spark cluster, you need to override spark.master with the URL of your master node.
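The default is read via Typesafe Config, so it presumably comes from a test application.conf along these lines (the exact file location is my assumption; a -Dspark.master system property wins when ConfigFactory.load() resolves the configuration):

# src/test/resources/application.conf (assumed location)
spark.master = "local[2]"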
If you’ll try to run test
command with standalone cluster it will fail with ClassNotFoundException
> sbt -Dspark.master=spark://spark-host:7777 test
>
> Create spark context. Master: spark://Eugenes-MacBook-Pro.local:7077
> Assembled tests jar not found. Standalone Spark mode is not supported
>
> [error] Failed tests:
>     org.apache.spark.SparkException: Job aborted due to stage failure:
>     Task 2 in stage 1.0 failed 4 times, most recent failure:
>     Lost task 2.3 in stage 1.0 (TID 30, 192.168.0.11):
>     java.lang.ClassNotFoundException: com.scalafi.openbook.OpenBookMsg
However, test-assembled will succeed:
> sbt -Dspark.master=spark://spark-host:7777 test-assembled
>
> Create spark context. Master: spark://Eugenes-MacBook-Pro.local:7077
> Add assembled tests to Spark Context from: /Users/ezhulenev/spark-testing/target/scala-2.10/spark-testing-example-tests.jar
>
> [info] Run completed in 7 seconds, 587 milliseconds.
> [info] Total number of tests run: 2
> [info] Suites: completed 1, aborted 0
> [info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
> [info] All tests passed.
> [success] Total time: 37 s
The code for this example application can be found on GitHub.