scala - How to send a process to multiple sinks in scalaz-stream


If I have a simple process that emits values of type String and I wish to send these to multiple sinks (i.e. each sink gets sent every String), how do I do this?

For example, running this program:

    import scalaz.stream._

    object Play extends App {

      // Builds a function that prefixes its argument with s
      def prepend(s: String): String => String = s ++ _
      val out1 = io.stdOutLines.map(prepend("1-") andThen _)
      val out2 = io.stdOutLines.map(prepend("2-") andThen _)

      val p = io.stdInLines to (out1 merge out2)
      p.run.run
    }

The output looks like this:

    a     //input
    1-a
    b     //input
    2-b
    c     //input
    2-c
    d     //input
    1-d

I want the output to be this:

    a     //input
    1-a
    2-a
    b     //input
    2-b
    1-b
    c     //input
    2-c
    1-c
    d     //input
    1-d
    2-d

Edit

I can achieve this as follows:

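    import scalaz.concurrent.Task
    import scalaz.stream._

    implicit class ToBoth[O](p: Process[Task, O]) {
      // Emit every element twice, then feed the doubled stream to the interleaved sinks
      def toBoth(s1: Sink[Task, O], s2: Sink[Task, O]): Process[Task, Unit] = {
        (for (o <- p; n <- Process.emit(o) ++ Process.emit(o)) yield n) to (s1 interleave s2)
      }
    }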

That is, I duplicate the input and interleave the output. This can be generalized:

    // Defined inside the same implicit class, so p and O are in scope
    def toAll(sinks: Sink[Task, O]*): Process[Task, Unit] = {
      (for (o <- p; n <- Process.emitAll(sinks.map(_ => o))) yield n) to
        sinks.reduceLeftOption(_ interleave _).getOrElse(Process.empty)
    }

Edit 2

I realized that the generalization toAll does not work. toBoth does, though.

Is there a better (built-in) way?

You can also use observe and to to attach multiple sinks to a process:

    val p = io.stdInLines.observe(out1).to(out2)

observe is like to, but it echoes what is passed to the sink. So io.stdInLines.observe(out1) still emits the strings that come from stdin (that is, it is of type Process[Task, String]), but it also sends them to the sink out1.
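
Because observe returns the original stream, the same trick can be repeated for any number of sinks by folding over them. The following is only a sketch under assumptions: ObserveAll and toAll are names made up for illustration (they are not part of scalaz-stream), and it targets the same library version as the snippets above.

```scala
import scalaz.concurrent.Task
import scalaz.stream._

object ObserveAll {
  // Hypothetical helper, not a library function: attach every sink with
  // observe, so each element of p is passed to each sink, then discard
  // the echoed elements to get a Process[Task, Unit].
  def toAll[O](p: Process[Task, O])(sinks: Sink[Task, O]*): Process[Task, Unit] =
    sinks.foldLeft(p)((acc, s) => acc.observe(s)).map(_ => ())
}
```

With the sinks from the question, this could be used as ObserveAll.toAll(io.stdInLines)(out1, out2).run.run. With an empty sink list it simply runs the source and sends its elements nowhere.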


As Eric pointed out, it is also possible to zip sinks together. Here is a more elaborate example that sends the individual lines of a log file to different sinks depending on their log level:

    import scalaz.stream._

    sealed trait LogLevel
    case object Info extends LogLevel
    case object Debug extends LogLevel
    case object Warning extends LogLevel

    case class Line(level: LogLevel, line: String)

    // One sink per log level, each adding its own prefix
    val outInfo = io.stdOutLines.contramap((l: Line) => "i: " + l.line)
    val outDebug = io.stdOutLines.contramap((l: Line) => "d: " + l.line)
    val outWarning = io.stdOutLines.contramap((l: Line) => "w: " + l.line)

    // Zip the three sinks into one that dispatches on the level of each line
    val zipped = outInfo.zip(outDebug).zip(outWarning).map {
      case ((fInfo, fDebug), fWarning) =>
        (l: Line) => l.level match {
          case Info    => fInfo(l)
          case Debug   => fDebug(l)
          case Warning => fWarning(l)
        }
    }

    val lines = List(
      Line(Info, "hello"),
      Line(Warning, "oops"),
      Line(Debug, "ui ui"),
      Line(Info, "world"))

    Process.emitAll(lines).liftIO.to(zipped).run.run

Running this will output:

    i: hello
    w: oops
    d: ui ui
    i: world
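
Zipping can also be applied to the original question, where every element should reach both sinks rather than just one of them. A minimal sketch, assuming the same library version as above; ZipSinks and both are hypothetical names chosen for this example:

```scala
import scalaz.concurrent.Task
import scalaz.stream._

object ZipSinks {
  // Hypothetical helper: combine two sinks into a single sink that, for
  // every element it receives, runs the effect of s1 and then that of s2.
  def both[O](s1: Sink[Task, O], s2: Sink[Task, O]): Sink[Task, O] =
    s1.zip(s2).map { case (f, g) =>
      (o: O) => f(o).flatMap(_ => g(o))
    }
}
```

With the sinks from the question, io.stdInLines.to(ZipSinks.both(out1, out2)).run.run would then pass each input line to out1 and to out2.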
