Let’s assume that we have some input process and want to run some ‘heavy computation’ on each element. Naturally, we want to utilize all available cores by using a thread pool. However, scalaz-stream is deterministic by default, and in the following example all computation steps run consecutively, one after another, even though each task is submitted to the pool.
    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicInteger
    import org.joda.time.format.DateTimeFormat
    import scalaz.concurrent.Task
    import scalaz.stream.Process
    // Assumed location of `channel` in this scalaz-stream version
    import scalaz.stream.io.channel

    val timeFormat = DateTimeFormat.forPattern("HH:mm:ss:SSS")
    val counter = new AtomicInteger(0)

    // ThreadPool for running effectful functions
    val executor = Executors.newFixedThreadPool(3)

    // channel of effectful functions
    val effectfulChannel = channel[Int, Int] { in =>
      Task {
        val taskN = counter.incrementAndGet()
        println(s"${Thread.currentThread().getName}: " +
          s"Run for $in, " +
          s"TaskN = $taskN " +
          s"(time = ${timeFormat.print(System.currentTimeMillis())})")

        // Long running computation
        val computed = {
          Thread.sleep(1000)
          in * in
        }
        computed
      }(executor)
    }

    val start = System.currentTimeMillis()
    val output = Process.range(1, 11).through(effectfulChannel).runLog.run
    val end = System.currentTimeMillis()
    println(s"Output = $output, in ${end - start} ms")
Deterministic Output
pool-1-thread-1: Run for 1, TaskN = 1 (time = 22:59:14:720)
pool-1-thread-2: Run for 2, TaskN = 2 (time = 22:59:15:811)
pool-1-thread-3: Run for 3, TaskN = 3 (time = 22:59:16:813)
pool-1-thread-3: Run for 4, TaskN = 4 (time = 22:59:17:815)
pool-1-thread-3: Run for 5, TaskN = 5 (time = 22:59:18:817)
pool-1-thread-3: Run for 6, TaskN = 6 (time = 22:59:19:818)
pool-1-thread-3: Run for 7, TaskN = 7 (time = 22:59:20:819)
pool-1-thread-3: Run for 8, TaskN = 8 (time = 22:59:21:821)
pool-1-thread-3: Run for 9, TaskN = 9 (time = 22:59:22:822)
pool-1-thread-3: Run for 10, TaskN = 10 (time = 22:59:23:823)
Output = Vector(1, 4, 9, 16, 25, 36, 49, 64, 81, 100), in 10196 ms
Concurrent Process
To run effectful functions concurrently, with a controlled number of queued tasks, we can use scalaz.stream.merge.mergeN, which is for now available only in snapshot-0.4.
    val P = scalaz.stream.Process

    implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
      /**
       * Run process through channel with given level of concurrency
       */
      def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = {
        val actions = process.zipWith(f)((data, f) => f(data))
        val nestedActions = actions.map(P.eval)
        merge.mergeN(concurrencyLevel)(nestedActions)
      }
    }
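To get a feel for what a concurrency level buys, here is a minimal sketch that deliberately does not use scalaz-stream at all. It swaps in plain `scala.concurrent.Future`s on a fixed thread pool as a rough analogue, so it runs with the standard library alone. The names `BoundedConcurrencySketch`, `slowSquare` and `runWith` are hypothetical, and the sleep is shortened to 200 ms to keep the run quick.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedConcurrencySketch {
  // Hypothetical stand-in for the 'heavy computation' in the article.
  def slowSquare(in: Int): Int = {
    Thread.sleep(200)
    in * in
  }

  // Runs slowSquare over all inputs on a pool of `concurrencyLevel` threads.
  // Returns the results (in input order) and the elapsed time in milliseconds.
  def runWith(concurrencyLevel: Int, inputs: Seq[Int]): (Vector[Int], Long) = {
    val pool = Executors.newFixedThreadPool(concurrencyLevel)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val start = System.nanoTime()
      // Every task is submitted up front; the pool size caps how many run at once.
      val futures = inputs.map(i => Future(slowSquare(i)))
      val results = Await.result(Future.sequence(futures), 30.seconds).toVector
      val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
      (results, elapsedMs)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val inputs = 1 to 10
    val (seqOut, seqMs) = runWith(1, inputs)
    val (parOut, parMs) = runWith(3, inputs)
    println(s"1 thread:  $seqOut in $seqMs ms")
    println(s"3 threads: $parOut in $parMs ms")
  }
}
```

With one thread the ten tasks run back to back, while with three threads they run in about four rounds, so the second run should finish in well under half the time. Two differences from the mergeN approach are worth keeping in mind: `Future.sequence` returns results in input order, whereas mergeN merges non-deterministically, so the output order of a concurrent process is not guaranteed. This sketch also queues all tasks eagerly instead of limiting how many are in flight.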