scala - How to send a process to multiple sinks in scalaz-stream -
if have simple process emitting values of type string , wish send these multiple sinks (i.e. each sink gets sent string), how do this?
for example, running program:
object play extends app { def prepend(s: string): string => string = s ++ _ val out1 = io.stdoutlines.map(prepend("1-") andthen _) val out2 = io.stdoutlines.map(prepend("2-") andthen _) val p = io.stdinlines (out1 merge out2) p.run.run } the output looks like:
a //input 1-a b //input 2-b c //input 2-c d //input 1-d i want output this:
a //input 1-a 2-a b //input 2-b 1-b c //input 2-c 1-c d //input 1-d 2-d edit
i can achieve follows:
implicit class toboth[o](p: process[task, o]) { def toboth(s1: sink[task, o], s2: sink[task, o]): process[task, unit] = { (for (o <- p; n <- process.emit(o) ++ process.emit(o)) yield n) (s1 interleave s2) } } that is, duplicate input , interleave output. can generalized:
def toall(sinks: sink[task, o] *): process[task, unit] = { (for (o <- p; n <- process.emitall(sinks.map(_ => o))) yield n) sinks.reduceleftoption(_ interleave _).getorelse(process.empty) } edit 2
i realized generalization toall not work. toboth does, though
is there better (built-in) way?
you use observe , to attach multiple sinks process:
val p = io.stdinlines.observe(out1).to(out2) observe to echoes passed sink. io.stdinlines.observe(out1) still emits strings come stdin (that means of type process[task, string]) sends them sink out1.
as eric pointed out, possible zip sinks together. here more elaborate example sends individual lines of logfile different sinks depending on log level:
sealed trait loglevel case object info extends loglevel case object debug extends loglevel case object warning extends loglevel case class line(level: loglevel, line: string) val outinfo = io.stdoutlines.contramap((l: line) => "i: " + l.line) val outdebug = io.stdoutlines.contramap((l: line) => "d: " + l.line) val outwarning = io.stdoutlines.contramap((l: line) => "w: " + l.line) val zipped = outinfo.zip(outdebug).zip(outwarning).map { case ((finfo, fdebug), fwarning) => (l: line) => l.level match { case info => finfo(l) case debug => fdebug(l) case warning => fwarning(l) } } val lines = list( line(info, "hello"), line(warning, "oops"), line(debug, "ui ui"), line(info, "world")) process.emitall(lines).liftio.to(zipped).run.run running output:
i: hello w: oops d: ui ui i: world
Comments
Post a Comment