scala - How to send a process to multiple sinks in scalaz-stream
If I have a simple process that emits values of type `String` and I wish to send these to multiple sinks (i.e. each sink gets sent the `String`), how do I do this?
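For example, running this program:

    import scalaz.stream._

    object Play extends App {
      // Builds a function that prefixes its argument with s.
      def prepend(s: String): String => String = s ++ _

      // Two stdout sinks that tag each line with "1-" or "2-".
      val out1 = io.stdOutLines.map(prepend("1-") andThen _)
      val out2 = io.stdOutLines.map(prepend("2-") andThen _)

      val p = io.stdInLines to (out1 merge out2)

      p.run.run
    }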
The output looks like:

    a //input
    1-a
    b //input
    2-b
    c //input
    2-c
    d //input
    1-d
I want the output to be this:

    a //input
    1-a
    2-a
    b //input
    2-b
    1-b
    c //input
    2-c
    1-c
    d //input
    1-d
    2-d
EDIT

I can achieve this as follows:

    implicit class ToBoth[O](p: Process[Task, O]) {
      def toBoth(s1: Sink[Task, O], s2: Sink[Task, O]): Process[Task, Unit] = {
        // Emit every element twice, then alternate between the two sinks.
        (for (o <- p; n <- Process.emit(o) ++ Process.emit(o)) yield n) to (s1 interleave s2)
      }
    }
That is, I duplicate the input and interleave the output. This can be generalized:

    // Defined inside the same implicit class, so p and O are in scope.
    def toAll(sinks: Sink[Task, O]*): Process[Task, Unit] = {
      (for (o <- p; n <- Process.emitAll(sinks.map(_ => o))) yield n) to
        sinks.reduceLeftOption(_ interleave _).getOrElse(Process.empty)
    }
EDIT 2

I just realized that the generalization `toAll` does not work. `toBoth` does, though.
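One plausible explanation for why `toAll` breaks with more than two sinks, assuming `interleave` strictly alternates between its left and right operand: folding three sinks with `reduceLeftOption(_ interleave _)` nests the alternation, so the sinks are no longer visited round-robin. The sketch below is a plain-Scala model of that ordering only (Scala 2.13+ for `LazyList`); `InterleaveModel`, `sinkOrder` and `route` are hypothetical names and none of this is scalaz-stream code.

```scala
object InterleaveModel {
  // Strict alternation: left, right, left, right, ...
  // Models the ordering only; not the scalaz-stream implementation.
  def interleave[A](left: LazyList[A], right: LazyList[A]): LazyList[A] =
    if (left.isEmpty) LazyList.empty
    else left.head #:: interleave(right, left.tail)

  // A sink that repeats forever, represented here just by its name.
  def sink(name: String): LazyList[String] = LazyList.continually(name)

  // Order in which sinks are consumed when folded the way toAll folds them.
  def sinkOrder(names: String*): LazyList[String] =
    names.map(n => sink(n))
      .reduceLeftOption[LazyList[String]]((a, b) => interleave(a, b))
      .getOrElse(LazyList.empty)

  // Duplicate each input once per sink, then pair it with the sink whose turn it is.
  def route(inputs: List[String], names: String*): List[(String, String)] = {
    val duplicated = inputs.flatMap(in => names.map(_ => in))
    duplicated.zip(sinkOrder(names: _*))
  }
}
```

In this model, two sinks receive every element once each, while with three sinks the order becomes 1, 3, 2, 3, 1, 3, so the second input never reaches sink 2 and reaches sink 3 twice.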
Is there a better (built-in) way?
You can use `observe` and `to` to attach multiple `Sink`s to a `Process`:

    val p = io.stdInLines.observe(out1).to(out2)
`observe` is like `to`, but it echoes what is passed to the sink. So `io.stdInLines.observe(out1)` still emits the strings that come from stdin (that means it is of type `Process[Task, String]`), but it also sends them to the sink `out1`.
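The difference between the two combinators can be illustrated without the library. The following is a minimal plain-Scala model, not scalaz-stream itself: a sink is assumed to be a simple `A => Unit` function, a process is modelled as an `Iterator`, and `ObserveModel` and `demo` are hypothetical names introduced for illustration.

```scala
import scala.collection.mutable.ListBuffer

object ObserveModel {
  type Sink[A] = A => Unit

  // Like `to`: feed every element to the sink and emit nothing further.
  def to[A](source: Iterator[A], sink: Sink[A]): Unit =
    source.foreach(sink)

  // Like `observe`: feed every element to the sink and still pass it downstream.
  def observe[A](source: Iterator[A], sink: Sink[A]): Iterator[A] =
    source.map { a => sink(a); a }

  // Records what each sink receives, in order, instead of printing to stdout.
  def demo(inputs: List[String]): List[String] = {
    val log = ListBuffer.empty[String]
    val out1: Sink[String] = s => { log += s"1-$s"; () }
    val out2: Sink[String] = s => { log += s"2-$s"; () }
    to(observe(inputs.iterator, out1), out2)
    log.toList
  }
}
```

Because the iterator is lazy, each element passes through `out1` and then `out2` before the next one is read, so `ObserveModel.demo(List("a", "b"))` yields `List("1-a", "2-a", "1-b", "2-b")`.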
As Eric pointed out, it is also possible to `zip` `Sink`s together. Here is a more elaborate example that sends individual lines of a logfile to different sinks depending on their log level:

    sealed trait LogLevel
    case object Info extends LogLevel
    case object Debug extends LogLevel
    case object Warning extends LogLevel

    case class Line(level: LogLevel, line: String)

    // One stdout sink per log level, each with its own prefix.
    val outInfo = io.stdOutLines.contramap((l: Line) => "i: " + l.line)
    val outDebug = io.stdOutLines.contramap((l: Line) => "d: " + l.line)
    val outWarning = io.stdOutLines.contramap((l: Line) => "w: " + l.line)

    // Zip the three sinks into one that dispatches on the line's level.
    val zipped = outInfo.zip(outDebug).zip(outWarning).map {
      case ((fInfo, fDebug), fWarning) =>
        (l: Line) => l.level match {
          case Info    => fInfo(l)
          case Debug   => fDebug(l)
          case Warning => fWarning(l)
        }
    }

    val lines = List(
      Line(Info, "hello"),
      Line(Warning, "oops"),
      Line(Debug, "ui ui"),
      Line(Info, "world"))

    Process.emitAll(lines).liftIO.to(zipped).run.run
Running this will output:

    i: hello
    w: oops
    d: ui ui
    i: world