scala – Splitting the fs2 stream output to two files

Unfortunately, as far as I know, there's no built-in way to split an fs2 stream into two.

What you could do is split your stream by pushing each value onto one of two queues (the first for values above 10, the second for everything else, matching the code below). If we use a NoneTerminatedQueue, the queues stay open until we enqueue a None into them. We can then use dequeue to create a separate stream from each queue; those streams end once the queues are terminated.
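To make the termination mechanics concrete, here is a minimal sketch of the idea in isolation, without any file I/O (assuming fs2 2.x and cats-effect 2, the versions the example below targets; demo and producer are just illustrative names):

import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue

// A producer pushes values wrapped in Some and closes the queue with None;
// the consumer stream (q.dequeue) ends as soon as that None arrives.
def demo(implicit cs: ContextShift[IO]): IO[List[Int]] =
  Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
    val producer = Stream(1, 2, 3)
      .evalMap(i => q.enqueue1(Some(i)))
      .onFinalize(q.enqueue1(None))
    q.dequeue.concurrently(producer)
  }.compile.toList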

The full example solution is below. I split writing to the files and reading into separate methods:

import java.nio.file.Paths
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import fs2.{Stream, io, text}

object FahrenheitToCelsius extends IOApp {

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  // reading is split into its own method
  def read(blocker: Blocker, over: NoneTerminatedQueue[IO, Double], under: NoneTerminatedQueue[IO, Double]): Stream[IO, Unit] =
    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble))
    .evalMap { value =>
      if (value > 10) { // route each value to one of the two queues
        over.enqueue1(Some(value)) // the queues stay open until we enqueue None
      } else {
        under.enqueue1(Some(value))
      }
    }
    .onFinalize(
      over.enqueue1(None) *> under.enqueue1(None) //by putting None we terminate queues
    )

  // write takes a source stream (from one of the queues) and a target file name
  def write(s: Stream[IO, Double], blocker: Blocker, fileName: String): Stream[IO, Unit] = {
    s.map(_.toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get(fileName), blocker))
  }

  val converter: Stream[IO, Unit] = for {
    over <- Stream.eval(Queue.noneTerminated[IO, Double]) //here we create 2 queues
    under <- Stream.eval(Queue.noneTerminated[IO, Double])
    blocker <- Stream.resource(Blocker[IO])
    _ <- write(over.dequeue, blocker, "testdata/celsius-over.txt") //we run reading and writing to both
      .concurrently(write(under.dequeue, blocker, "testdata/celsius-under.txt")) //files concurrently
      .concurrently(read(blocker, over, under)) //the whole stream runs until the queues are terminated
  } yield ()

  override def run(args: List[String]): IO[ExitCode] =
    converter
      .compile
      .drain
      .as(ExitCode.Success)

}

It's also possible to use broadcastThrough, which broadcasts all elements of a stream to multiple pipes.

A full solution to your problem could look like this (using cats-effect 3.3.8 and fs2 3.2.5, which is why it looks a bit different, but the main idea is the same regardless of the versions):

import cats.effect.{IO, IOApp}
import fs2.io.file.{Files, Path}
import fs2.{Pipe, Stream, text}

object Converter extends IOApp.Simple {

  val converter: Stream[IO, Unit] = {
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0 / 9.0)

    def saveFiltered(filename: Path, predicate: Double => Boolean): Pipe[IO, Double, Unit] =
      _.filter(predicate)
        .map(_.toString)
        .through(text.utf8.encode)
        .through(Files[IO].writeAll(filename))

    Files[IO].readAll(Path("testdata/fahrenheit.txt"))
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .broadcastThrough(
        saveFiltered(Path("testdata/celsius_over.txt"), { n => n >= 0 }),
        saveFiltered(Path("testdata/celsius_below.txt"), { n => n < 0 })
      )
  }

  def run: IO[Unit] =
    converter.compile.drain
}

saveFiltered is a function returning a Pipe, built from a filename and a predicate. It is used to build the two arguments for broadcastThrough. I tested it on a small example and, FWIW, it works as expected.

broadcastThrough guarantees that all elements from the stream are sent to all pipes. There's one little caveat that's mentioned in the Scaladoc: the slowest pipe will cause the whole stream to slow down. I don't think this is a problem in this particular case, because I'd guess that both pipes are equally fast.


You could even go a step further and generalize the idea a little bit:

def partition[F[_] : Concurrent, A, B](predicate: A => Boolean, in: Pipe[F, A, B], out: Pipe[F, A, B]): Pipe[F, A, B] =
  _.broadcastThrough[F, B](
    _.filter(predicate).through(in),
    _.filter(a => !predicate(a)).through(out)
  )

With that, you don't have to make sure that the two predicates are mutually exclusive: the out pipe simply receives every element the predicate rejects.

With a slightly adapted saveFiltered:

def saveFiltered2(filename: Path): Pipe[IO, Double, Unit] =
  _.map(_.toString)
    .through(text.utf8.encode)
    .through(Files[IO].writeAll(filename))

the last part of the stream is a bit shorter:

...
.through(
  partition(n => n >= 0,
    saveFiltered2(Path("testdata/celsius_over.txt")),
    saveFiltered2(Path("testdata/celsius_below.txt"))))

I've managed to find another solution. Here it is:

import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import fs2.{io, text, Stream}
import fs2.io.file.WriteCursor
import java.nio.file.Paths

object Converter extends IOApp {

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    def saveFiltered(in: Stream[IO,Double], blocker: cats.effect.Blocker, filename: String, filter: Double => Boolean) = {
      val processed = in.filter(filter).intersperse("\n").map(_.toString).through(text.utf8Encode)

      Stream.resource(WriteCursor.fromPath[IO](Paths.get(filename), blocker)).flatMap(_.writeAll(processed).void.stream)
    }

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))
      .observe( in => saveFiltered(in, blocker, "testdata/celsius_over.txt", {n => n >= 0}) )
      .through( in => saveFiltered(in, blocker, "testdata/celsius_below.txt", {n => n < 0}) )
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}
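
The key here is the combination of observe and through: observe feeds every element to a side pipe while still emitting it downstream, where through hands it to the second pipe. Here is a minimal sketch of that pattern in isolation (again assuming fs2 2.x / cats-effect 2; printAll and demo are just illustrative names):

import cats.effect.{ContextShift, IO}
import fs2.{Pipe, Stream}

// A pipe that prints every element with a prefix.
def printAll(prefix: String): Pipe[IO, Int, Unit] =
  _.evalMap(i => IO(println(s"$prefix$i")))

// observe sends each element to the side pipe and still passes it downstream,
// where through consumes it with the second pipe.
def demo(implicit cs: ContextShift[IO]): Stream[IO, Unit] =
  Stream(1, 2, 3)
    .covary[IO]
    .observe(printAll("side: "))
    .through(printAll("main: "))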

I think it's a bit easier to understand than the answer involving queues (queues appear to be a common solution to similar cases, though).
