Created by Mark Harrison / @markglh
“Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.”
BidirectionalFlow
(two in -> two out)onNext
functiononError
signal... (???)ActorSystem
and Materializer
implicit val system = ActorSystem("actors")
implicit val materializer = ActorMaterializer()
ActorSystem
and Materializer
Source(1 to 5)
.filter(_ < 3) // 1, 2
.map(_ * 2) // 2, 4
.to(Sink.foreach(println))
.run()
//prints 2 4
Composing elements together
val nestedSource = Source(1 to 5)
.map(_ * 2)
val nestedFlow = Flow[Int]
.filter(_ <= 4)
.map(_ + 2)
val sink = Sink.foreach(println)
//link up the Flow to a Sink
val nestedSink = nestedFlow.to(Sink.foreach(println))
// Create a RunnableGraph - and run it! Prints 4 6
nestedSource.to(nestedSink).run()
nestedSource
.via(nestedFlow)
.to(Sink.foreach(println(_)))
Broadcast[T]
– (1 input, N outputs)Balance[T]
– (1 input, N outputs)Merge[In]
– (N inputs , 1 output)groupedWithin(Int, Duration)
val g = FlowGraph.closed() {
implicit builder: FlowGraph.Builder[Unit] =>
//This provides the DSL
import FlowGraph.Implicits._
val in = Source(1 to 3)
val out = Sink.foreach(println)
//2 outputs, 2 inputs
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
}
g.run() //Prints 31 31 32 32 33 33
val kafka = new ReactiveKafka()
val publisher: Publisher[StringKafkaMessage] =
kafka.consume(
ConsumerProperties(...)
)
val subscriber: Subscriber[String] =
kafka.publish(
ProducerProperties(...)
)
Source(publisher).map(_.message().toUpperCase)
.to(Sink(subscriber)).run()
FlowGraph.closed() {
implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val in = Source(kafkaConsumer)
val out = Sink.foreach(println)
val bcast = builder
.add(Broadcast[StringKafkaMessage](2))
val merge = builder
.add(Merge[StringKafkaMessage](2))
val parser1, parser2 = Flow[StringKafkaMessage]
.map(...)
val group = Flow[StringKafkaMessage].grouped(4)
in ~> bcast ~> parser1 ~> merge ~> group ~> out
bcast ~> parser2 ~> merge
}.run()