Monix Connect

Monix Connect

  • API Docs
  • Documentation
  • GitHub

›Documentation

Documentation

  • Overview
  • Akka Streams
  • Apache Parquet
  • AWS DynamoDB
  • AWS S3
  • Elasticsearch
  • Google Cloud Storage
  • HDFS
  • MongoDB
  • Redis

Akka Streams

Introduction

This module makes interoperability with akka streams easier by simply defining implicit extended classes for reactive stream conversions between akka and monix.

The reactive streams specification is an standard to follow for those JVM libraries that aims to provide asynchronous stream processing with non-blocking back pressure. As you might probably know, both Akka Streams and Monix Reactive follows the same standard, meaning that it is possible to convert between their own different stream data types (since both implement the Publisher and Subscriber contract).

So this module aims to provide a nice and easy inter-operability between the two mentioned libraries, and to achieve that it provides extended conversion methods. These implicit extended methods can be imported from: monix.connect.akka.stream.Converters._. Therefore, under the scope of the import, the signatures .asObservable and .asConsumer will be available from Source, Flow, and Sink instances, whereas asSource and asSink would be for the monix Observable and Consumer

The below table shows that in more detail:

AkkaMonixAkka → MonixAkka ← Monix
Source[+In, +Mat]Observable[+In]source.asObservable[In]observable.asSource[In]
Flow[-In, +Out, +Mat]Consumer[-In, +Out]flow.asConsumer[Out]-
Sink[-In, +Out <: Future[Mat]]Consumer[-In, +Mat]sink.asConsumer[Mat]consumer.asSink[In]

Note that when calling the methods it is not needed to pass the type parameter (as it has been explicitly indicated in the example table), the compiler will infer it for you.

Also, in order to perform these conversion it is required to have an implicit instance of akka.stream.Materializer and monix.execution.Scheduler in the scope.

Dependency

Add the following dependency to get started:

libraryDependencies += "io.monix" %% "monix-akka" % "0.5.1"

Getting started

The only three things we will need to perform these conversions would be the implicit conversion, and an instance of the monix Scheduler and the akka ActorMaterializer.

import monix.connect.akka.stream.Converters._
import monix.execution.Scheduler.Implicits.global

val actorSystem: ActorSystem = ActorSystem("Akka-Streams-InterOp")  
implicit val materializer = ActorMaterializer() //setting actorSystem as implicit variable might have ben enough

Akka → Monix

asObservable

Let's see how easy can be converting an Source[+In, +Mat] to Observable[+In]:

//given
val source: Source[Int] = Source.from(1 until 50)

//when
val ob: Observable[Int] = source.asObservable //`asObservable` converter as extended method of source.

//then
ob.toListL.runSyncUnsafe() should contain theSameElementsAs elements

In this case we have not needed to consume the Observable since we directly used an operator that collects to a list .toList, but note that in case you need to use an specific consumer, you can also directly call consumeWith, as a shortcut for source.asObservable.consumeWith(consumer), see an example below:

//given the same `source` as the above example`

//when
val t: Task[List[Int]] = source.consumeWith(Consumer.toList) //`consumeWith` as extended method of `Source`

//then
t.runSyncUnsafe() should contain theSameElementsAs elements

asConsumer

On the other hand, see how to convert an Sink[-In, +Out <: Future[Mat]] into a Consumer[+In, +Mat].

//given
val foldSumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((acc, num) => acc + num)

//when
val consumer: Consumer[Int, Int] = foldSumSink.asConsumer[Int] //`asConsumer` as an extended method of `Sink`

//then
val t: Task[Int] = Observable.fromIterable(Seq(1, 2, 3)).consumeWith(consumer)
t.runSyncUnsafe() should be 6

Finally, you can also convert Flow[-In, +Out, +Mat] into Consumer[+In, +Out] in the same way you did with Sink in the previous example.

//given
val foldSumFlow: Flow[Int, Int, NotUsed] = Flow[Int].fold[Int](0)((acc, num) => acc + num)

//when
val (consumer: Consumer[Int, Int]) = foldSumFlow.asConsumer //`asConsumer` as an extended method of `Flow`
val t: Task[Int] = Observable.fromIterable(Seq(1, 2, 3)).consumeWith(consumer)

t.runSyncUnsafe() should be 6

Notice that this interoperability would allow the Monix user to take advantage of the already pre built integrations from Alpakka or any other Akka Streams implementation.

Akka ← Monix

asSource

On the other hand, for converting from Monix Observable[+In] to Akka Streams Source[+In, NotUsed] we would use the conversion signature asSource.

//given
val ob: Observable[Int] = Observable.range(1, 100)

//when
val f: Future[Seq[Long]] = ob.asSource.runWith(Sink.seq) 

//then eventualy will return a sequence from 1 to 100

asConsumer

Finally, the converter asSink is available for converting from Consumer[-In, +Mat] to Sink[-In, +Out <: Future[Mat]].

//given
val l: List[Int] = List(1, 2, 3)
val headConsumer: Consumer.Sync[Int, Int] = Consumer.head[Int]

//when
val f: Future[Int] = Source(l).runWith(headConsumer.asSink)

//then eventually will materialize to 1 (the head)
← OverviewApache Parquet →
  • Introduction
  • Dependency
  • Getting started
    • Akka → Monix
    • Akka ← Monix

Copyright © 2020-2020 The Monix Connect Developers.