Monix Connect

MongoDB

Introduction

MongoDB is a document database in which data is stored in JSON-like documents, unlike the traditional row/column model. It has a rich and expressive query language that allows you to filter and sort by any field, with support for aggregations and other modern use cases. The Monix MongoDB connector offers a reactive, non-blocking and resource-safe API, which relies on the underlying MongoDB Java Reactive Streams driver. The library's core data type for interoperating with collections is CollectionOperator[Doc], which is composed of the following four main pillars:

  • Database: Used to manage mongo databases and collections.
  • Single: Exposes single operations to delete, insert, replace and update collections.
  • Sink: Implements the same operations as the Single API, but in a streaming fashion.
  • Source: Used to fetch data from the collections with aggregate, count, distinct and find.

Each of these components is explained in detail in the following sub-sections, but first we will see how to configure and connect to the database server.

No Scala3 support

This connector depends on the mongodb-scala-driver, which is not yet published for Scala 3 because it is built with Gradle and there is a lot of code to be ported to the new Scala 3 macros. See the community status request: https://www.mongodb.com/community/forums/t/scala-3-support/115681

Dependency

Add the following dependency to get started:

libraryDependencies += "io.monix" %% "monix-mongodb" % "0.6.0"

Collection Reference

Before creating the connection, we need a reference to the collection that we want to interoperate with and use for storing and reading documents.

Such a reference is identified by the database and collection names, and represented in code by the CollectionRef sealed trait, which is extended by CollectionCodecRef and CollectionDocumentRef.

CollectionCodecRef

The most practical way of representing documents in Scala code is by using a case class, for which a CodecProvider can be auto-derived using the implicit conversions from import org.mongodb.scala.bson.codecs.Macros._.

For didactic purposes we have defined a hypothetical Employee class with its employees collection, which will be used to show how to create a collection reference with our custom Codec that will later be used to create the MongoConnection.

import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import monix.connect.mongodb.client.CollectionCodecRef
import monix.connect.mongodb.client.CollectionRef

case class Employee(name: String, age: Int, city: String, hobbies: List[String] = List.empty)

val employee = Employee("Bob", 29, "Barcelona")
val employeesCol: CollectionRef[Employee] =
  CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())

CollectionDocumentRef

On the other hand, one could also use org.bson.Document as a generic and flexible type for representing a document. In that case CollectionDocumentRef is used, which only requires the database and collection names to be created:

import monix.connect.mongodb.client.{CollectionDocumentRef, CollectionRef}
import org.bson.Document

val sampleDocument: Document = Document.parse("""{"film_name":"Jumanji", "year": 1995 }""")
val documentCol: CollectionRef[Document] = CollectionDocumentRef("myDb", "employees")

Connection

Once we have created the collection reference, we pass it to the method that creates the connection, which also requires either a mongo connection string or client settings. Below you can find an example of connecting to a standalone MongoDB server, but the connection is also configurable for a Replica Set or a Sharded Cluster, with different options such as TLS/SSL. For more details, please refer to the official documentation.

Connection String

The code snippet below shows how to create the connection with a connection string:

import cats.effect.Resource
import monix.connect.mongodb.client.{CollectionCodecRef, MongoConnection}
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import com.mongodb.client.model.Filters

val employeesCol = CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())
val connectionStr = "mongodb://localhost:27017"
val connection = MongoConnection.create1(connectionStr, employeesCol)
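
The same string argument can carry other standard MongoDB URI forms, such as a replica set with TLS. The sketch below only assembles such strings and does not open a connection. The host names, the replica set name `rs0` and the helper `mongoUri` are hypothetical, while `replicaSet` and `tls` are standard connection string options.

```scala
object ConnectionStrings {
  // Hypothetical helper: joins hosts and options into a MongoDB connection string.
  def mongoUri(hosts: Seq[String], options: Map[String, String] = Map.empty): String = {
    val hostPart = hosts.mkString(",")
    val optionPart =
      if (options.isEmpty) ""
      else options.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString("/?", "&", "")
    s"mongodb://$hostPart$optionPart"
  }

  // Standalone server, as in the snippet above.
  val standalone: String = mongoUri(Seq("localhost:27017"))

  // Replica set with TLS enabled (hosts and set name are placeholders).
  val replicaSet: String =
    mongoUri(Seq("host1:27017", "host2:27017"), Map("replicaSet" -> "rs0", "tls" -> "true"))
}
```

Either resulting string could then be passed where `connectionStr` is used above.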

Client Settings

Alternatively, you can build the configuration in a typed way with MongoClientSettings, as shown in the snippet below:

import monix.connect.mongodb.client.{CollectionCodecRef, MongoConnection}
import com.mongodb.{MongoClientSettings, ServerAddress}
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider

import scala.jdk.CollectionConverters._

val clientSettings =
  MongoClientSettings.builder()
    .applyToClusterSettings(builder =>
      builder.hosts(
        List(
          new ServerAddress("host1", 27017),
          new ServerAddress("host2", 27017)
        ).asJava
      )
    ).build()

val employeesCol = CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())
val connection = MongoConnection.create1(clientSettings, employeesCol)

CollectionOperator

Notice that creating the connection returns a cats.effect.Resource, which abstracts the acquisition and release of the resources consumed by creating the MongoClient. Using such a resource provides a CollectionOperator, which is composed of four main components: Database, Source, Single and Sink. Together, they bring all the necessary methods to operate with the specified collections.
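
The acquire, use, release lifecycle can be illustrated without any Monix or cats-effect dependency. The sketch below uses `scala.util.Using` from the standard library with a hypothetical `FakeClient`. It is only an analogy for how a client is closed once its usage completes, not the connector's implementation.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Try, Using}

object ResourceLifecycle {
  // Records the order of lifecycle events.
  val events: ListBuffer[String] = ListBuffer.empty

  // Hypothetical stand-in for a client that must be closed after use.
  final class FakeClient extends AutoCloseable {
    events += "acquired"
    def countAll(): Long = { events += "used"; 42L }
    override def close(): Unit = { events += "released"; () }
  }

  // The client is closed when the block finishes, whether it succeeds or throws.
  def run(): Try[Long] = Using(new FakeClient)(_.countAll())
}
```

Calling `ResourceLifecycle.run()` once records the events in the order acquired, used, released, which is the same guarantee `Resource#use` gives for the real client.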

Multiple Collections

The API is designed to allow using multiple collections (up to 8) in a single connection. This is exposed in the signatures create1, create2, create3, ..., create8, each of which works with a TupleN matching the number of collections. See an example below:

import monix.connect.mongodb.client.{CollectionCodecRef, CollectionDocumentRef, MongoConnection}
import com.mongodb.client.model.Filters
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import org.bson.Document
import monix.execution.Scheduler.Implicits.global

import scala.jdk.CollectionConverters._

val business: Document = Document.parse(s"""{"business_name":"MyBusiness", "year": 1995}""")
val employee1 = Employee("Employee1", 21, "Paris")
val employee2 = Employee("Employee2", 29, "Amsterdam")
val businessCol = CollectionDocumentRef(database = "myDb", collection = "business_collection")
val employeesCol =
  CollectionCodecRef("myDb", "employees_collection", classOf[Employee], createCodecProvider[Employee]()) 

// calling `create2` because we want to operate with two mongodb collections.
MongoConnection.create2("mongodb://host:27017", (employeesCol, businessCol))
  .use { case (employeesOperator, businessOperator) =>
  for {
    //business logic here
    _ <- employeesOperator.single.insertMany(List(employee1, employee2)) >>
      businessOperator.single.insertOne(business)
    parisEmployeesCount <- employeesOperator.source.count(Filters.eq("city", "Paris"))
    myOnlyBusiness <- businessOperator.source.find(Filters.eq("business_name", "MyBusiness")).headL
  } yield (parisEmployeesCount, myOnlyBusiness)
}.runToFuture

Database

Dealing with database operations is a common use case in any driver. For that reason, the CollectionOperator comes with an instance of MongoDatabase, which allows you to create, drop, list and check the existence of both databases and collections.

Exists

The MongoDatabase is just a reference to the target database, which may or may not already exist. In case you want to check that before trying to read or insert any data:

import monix.eval.Task
import monix.connect.mongodb.MongoDb
import monix.connect.mongodb.client.MongoConnection
import monix.connect.mongodb.client.CollectionCodecRef
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import monix.execution.Scheduler.Implicits.global

val employeesCol = CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())
val connection = MongoConnection.create1("mongodb://host:27017", employeesCol)
val isDbPresent = connection.use { colOperator =>
  for {
    isDbPresent <- colOperator.db.existsDatabase("myDb")
    isCollectionPresent <- colOperator.db.existsCollection("my_collection")
  } yield (isDbPresent, isCollectionPresent)
}
// if the db existed you could also check the existence of a particular collection
val isCollectionPresent: Task[Boolean] = MongoDb.existsCollection("myDb", "my_collection")

Create

Create a new collection within a given database.

import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import monix.connect.mongodb.client.{MongoConnection, CollectionCodecRef}

val employeesCol = CollectionCodecRef("myDb", "employeesCol", classOf[Employee], createCodecProvider[Employee]())
val connection = MongoConnection.create1("mongodb://host:27017", employeesCol).use(_.db.createCollection("db123"))

Notice that the collection we are explicitly creating is different from the one we passed to CollectionRef. The reason is that myDb is automatically created with the connection, so we don't need to create it explicitly.

List

List collection and database names.

import monix.connect.mongodb.client.{CollectionCodecRef, CollectionRef, MongoConnection}
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import monix.reactive.Observable

val employeesCol = CollectionCodecRef("myDb", "employeesCol", classOf[Employee], createCodecProvider[Employee]())
val connection = MongoConnection.create1("mongodb://host:27017", employeesCol).use { collOperator =>
  //collection names from all databases
  val collections: Observable[String] = for {
    databaseName <- collOperator.db.listDatabases
    collectionName <- collOperator.db.listCollections(databaseName)
    // calling `listCollections` without arguments, would only
    // return the collections names from the current database 
  } yield collectionName
  collections.count
}

Rename

Rename a collection.

import cats.effect.Resource
import monix.eval.Task
import monix.connect.mongodb.client.{CollectionOperator, MongoConnection}

val connection: Resource[Task, CollectionOperator[Employee]]
val t = connection.use(_.db.renameCollection("oldCollectionName", "newCollectionName"))

Drop

And finally, drop either the entire database or collection.

import cats.effect.Resource
import monix.eval.Task
import monix.connect.mongodb.client.{CollectionOperator, MongoConnection}
import org.bson.Document

val connection: Resource[Task, CollectionOperator[Document]]
connection.use { operator =>
  for {
    _ <- operator.db.dropCollection("db123", "coll123")
    _ <- operator.db.dropDatabase("db123")
  } yield ()
}.runToFuture

Source

The MongoSource class abstracts read-like operations such as aggregate, count, distinct and find. See examples of these in the next subsections:

Aggregate

This source aggregates documents according to the specified aggregation pipeline. You can find below examples for performing group by and unwind aggregations.

Group By

import org.bson.Document
import com.mongodb.client.model.{Accumulators, Aggregates, Filters}
import cats.effect.Resource
import monix.eval.Task
import monix.connect.mongodb.client.CollectionOperator

val connection: Resource[Task, CollectionOperator[Employee]]
val aggregated: Task[Option[Document]] = connection.use {
 operator =>
   val filter = Aggregates.`match`(Filters.eq("city", "Caracas"))
   val group = Aggregates.group("group", Accumulators.avg("average", "$age"))
   // eventually returns a [[Document]] with the field `average` 
   // that contains the age average of Venezuelan employees.
   operator.source.aggregate(Seq(filter, group)).headOptionL
}
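
To make the pipeline's semantics concrete, the following in-memory sketch applies the same two steps (match on `city`, then average `age`) to a plain Scala list. The sample employees are made up and the real aggregation runs on the server, so this only mirrors the value the `average` field would hold.

```scala
object GroupByModel {
  final case class Employee(name: String, age: Int, city: String, hobbies: List[String] = List.empty)

  // Mirrors `match` followed by `group` with an `avg` accumulator.
  // Returns None when no employee matches, like `headOptionL` on an empty result.
  def averageAge(employees: List[Employee], city: String): Option[Double] = {
    val matched = employees.filter(_.city == city)
    if (matched.isEmpty) None
    else Some(matched.map(_.age.toDouble).sum / matched.size)
  }

  // Made-up sample data.
  val sample: List[Employee] = List(
    Employee("Ana", 30, "Caracas"),
    Employee("Luis", 40, "Caracas"),
    Employee("Bob", 29, "Barcelona")
  )
}
```

With this sample, `GroupByModel.averageAge(GroupByModel.sample, "Caracas")` yields `Some(35.0)`, and a city with no employees yields `None`.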

Unwind

Deconstructs an array field from the input documents to output a document for each element. Each output document is a copy of the main document with the value of the array field replaced by the element.

In the example below, the hobbies of the specified employee will be deconstructed:

import com.mongodb.client.model.{Aggregates, Filters}
import monix.connect.mongodb.client.{CollectionCodecRef, MongoConnection}
import monix.execution.Scheduler.Implicits.global
import org.mongodb.scala.bson.codecs.Macros._
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.bson.codecs.configuration.CodecRegistry

case class Person(name: String, age: Int, hobbies: Seq[String])

case class UnwoundPerson(name: String, age: Int, hobbies: String)

val codecRegistry: CodecRegistry = fromRegistries(fromProviders(classOf[Person], classOf[UnwoundPerson]))
val col = CollectionCodecRef("myDb", "persons", classOf[Person], codecRegistry)
MongoConnection.create1("mongodb://host:27017", col).use { operator =>
  for {
    _ <- operator.single.insertOne(Person("Mario", 32, List("reading", "running", "programming")))
    unwound <- {
      val filter = Aggregates.`match`(Filters.gte("age", 32))
      val unwind = Aggregates.unwind("$hobbies")
      operator.source.aggregate(Seq(filter, unwind), classOf[UnwoundPerson]).toListL
      /** Returns ->
        * List(
        * UnwoundPerson("Mario", 32, "reading"), 
        * UnwoundPerson("Mario", 32, "running"),
        * UnwoundPerson("Mario", 32, "programming")
        * )
        */
    }
    //
  } yield unwound
}.runToFuture
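
The effect of the unwind stage can be mimicked with plain Scala collections, which may help when reasoning about the shape of the output. This is only an in-memory model with locally defined case classes, not a call into the connector. As with the default behaviour of `$unwind`, a person whose array is empty produces no output.

```scala
object UnwindModel {
  final case class Person(name: String, age: Int, hobbies: Seq[String])
  final case class UnwoundPerson(name: String, age: Int, hobbies: String)

  // Emits one output element per entry of the `hobbies` array.
  def unwind(people: Seq[Person]): Seq[UnwoundPerson] =
    for {
      person <- people
      hobby  <- person.hobbies
    } yield UnwoundPerson(person.name, person.age, hobby)
}
```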

Count

countAll

Counts all the documents, without filtering.

import cats.effect.Resource
import monix.connect.mongodb.client.{CollectionOperator, MongoConnection}
import org.bson.Document
import monix.eval.Task

val connection: Resource[Task, CollectionOperator[Document]]
val count = connection.use(_.source.countAll)

count

Counts the number of elements that match the given filter.

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val connection: Resource[Task, CollectionOperator[Employee]]
// counts the number of employees based in Sydney
val count = connection.use(_.source.count(Filters.eq("city", "Sydney")))

Distinct

Gets the distinct values of the specified field name.

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val connection: Resource[Task, CollectionOperator[Employee]]
// count of the distinct cities
val distinctCount = connection.use(_.source.distinct("city", classOf[String]).countL)

Find

findAll

Finds all the documents.

import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

val connection: Resource[Task, CollectionOperator[Employee]]
val t = connection.use { operator =>
  val allEmployees: Observable[Employee] = operator.source.findAll
  //business logic here
  allEmployees.completedL
}

find

Finds the documents in the collection that match the query filter.

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  val rioEmployees: Observable[Employee] = operator.source.find(Filters.eq("city", "Rio"))
  //business logic here
  rioEmployees.completedL
}.runToFuture

findOne

Finds the first document encountered in the collection that matches the query filter, if one exists, or returns an empty option if none does.

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  val rioEmployee: Task[Option[Employee]] = operator.source.findOne(Filters.eq("city", "Rio"))
  //business logic here
  rioEmployee
}.runToFuture

findOneAndDelete

Atomically find a document and remove it.

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.{CollectionOperator, MongoConnection}
import monix.eval.Task
import monix.reactive.Observable
import org.bson.conversions.Bson

// finds and deletes one Cracow employee older than 66
val filter = Filters.and(Filters.eq("city", "Cracow"), Filters.gt("age", 66))
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  val retiredEmployee: Task[Option[Employee]] = operator.source.findOneAndDelete(filter)
  //business logic here
  retiredEmployee
}.runToFuture
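
The `Option` result can be understood through a small in-memory model: the first matching element is returned and removed, and `None` is returned when nothing matches. The `Store` class below is hypothetical and uses a synchronized block as a stand-in for the atomicity that the server provides.

```scala
object FindOneAndDeleteModel {
  final case class Employee(name: String, age: Int, city: String)

  // Hypothetical in-memory collection.
  final class Store(initial: List[Employee]) {
    private var docs: List[Employee] = initial

    def all: List[Employee] = synchronized(docs)

    // Finds the first match and removes it in one synchronized step.
    def findOneAndDelete(p: Employee => Boolean): Option[Employee] = synchronized {
      val found = docs.find(p)
      found.foreach(e => docs = docs.diff(List(e)))
      found
    }
  }
}
```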

findOneAndReplace

Atomically find a document and replace it.

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val filter = Filters.and(Filters.eq("city", "Cadiz"), Filters.eq("name", "Gustavo"))
val replacement = Employee(name = "Alberto", city = "Cadiz", age = 33)
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  val replacedEmployee: Task[Option[Employee]] = operator.source.findOneAndReplace(filter, replacement)
  //business logic here
  replacedEmployee
}.runToFuture

findOneAndUpdate

Atomically find a document and update it.

import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val filter = Filters.and(Filters.eq("city", "Moscou"), Filters.eq("name", "Vladimir"))
val update = Updates.inc("age", 1)
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use(_.source.findOneAndUpdate(filter, update)).runToFuture

Single & Sinks

The Single and Sink components are documented together since they both implement exactly the same set of operations: insert, delete, replace and update. The difference is that the former executes the operation as a single task, while the latter pipes the elements of the stream to a Monix Consumer that executes them. Most of the Sink consumers have an alternative method with the suffix Par, which executes requests grouped in sequences in a parallel fashion as batch operations.

The following sub-sections list the different operations and sinks available, with a small example of each one.
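
The Par variants consume batches rather than single elements, so the input is typically grouped first. A minimal sketch of that batching step with plain Scala collections, assuming a batch size of 2 (the size, the helper `batches` and the sample names are arbitrary):

```scala
object BatchingModel {
  // Splits the input into fixed-size batches; the last batch may be smaller.
  def batches[A](elements: List[A], size: Int): List[List[A]] =
    elements.grouped(size).toList

  val names: List[String] = List("Lauren", "Marie", "Simon", "Matt", "Zoe")

  // Five names in batches of two give three batches, the last with one element.
  val grouped: List[List[String]] = batches(names, 2)
}
```

Each inner list corresponds to one element of the stream handed to a Par sink, for instance through `Observable.from(BatchingModel.grouped)`.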

Insert

insertOne

Inserts the provided document to the specified collection.

Single:

import cats.effect.Resource
import monix.connect.mongodb.domain.InsertOneResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val employee: Employee = Employee("Luke", 41, "London")
val connection: Resource[Task, CollectionOperator[Employee]]
val t: Task[InsertOneResult] = connection.use(_.single.insertOne(employee))

Sink:

import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

val employee = List(Employee("Luke", 41, "London"))
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  Observable.from(employee) // Observable[Employee]
    .consumeWith(operator.sink.insertOne())
}.runToFuture

val employees = List(Employee("Luke", 41, "London"), Employee("Matt", 23, "Warsaw"),
  Employee("Jack", 24, "Berlin"), Employee("John", 41, "London"))
connection.use { operator =>
  Observable.from(employees.grouped(2).toList) // Observable[List[Employee]]
    .consumeWith(operator.sink.insertOnePar())
}.runToFuture

insertMany

Inserts the provided list of documents to the specified collection.

Single:

import cats.effect.Resource
import monix.connect.mongodb.domain.InsertManyResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val employees: List[Employee] = List(Employee("Jim", 33, "Vienna"), Employee("Albert", 45, "Vienna"))

val connection: Resource[Task, CollectionOperator[Employee]]
val t: Task[InsertManyResult] = connection.use(_.single.insertMany(employees))

Sink:

import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

val employees: List[Employee] = List(
  Employee("Salomon", 23, "Berlin"),
  Employee("Victor", 54, "Berlin"),
  Employee("Gabriel", 35, "Paris")
)
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  Observable.from(employees) // Observable[Employee]
    .bufferTumbling(3) // Observable[Seq[Employee]]
    .consumeWith(operator.sink.insertMany())
}.runToFuture

Delete

deleteOne

Removes at most one document from the collection that matches the given filter. If no documents match, the collection is not modified.

Single:

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.domain.DeleteResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val filter = Filters.eq("name", "Bob")
val connection: Resource[Task, CollectionOperator[Employee]]
val t: Task[DeleteResult] = connection.use(_.single.deleteOne(filter))

Sink:

import com.mongodb.client.model.Filters
import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  // employees to be deleted from the collection
  Observable.from(List("Lauren", "Marie", "Simon")) // Observable[String] 
    .map(name => Filters.eq("name", name)) // Observable[Bson]
    .consumeWith(operator.sink.deleteOne())
}.runToFuture

connection.use { operator =>
  // employees to be deleted from the collection
  Observable.from(List("Lauren", "Marie", "Simon", "Matt").grouped(2).toList) // Observable[List[String]] 
    .map(names => names.map(name => Filters.eq("name", name))) // Observable[List[Bson]]
    .consumeWith(operator.sink.deleteOnePar())
}.runToFuture

deleteMany

Removes all documents from the collection that match the given query filter. Again, if no documents match, the collection is not modified.

Single:

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.domain.DeleteResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val connection: Resource[Task, CollectionOperator[Employee]]
// delete all employees older than 65 y.o
val filter = Filters.gt("age", 65)
val t: Task[DeleteResult] = connection.use(_.single.deleteMany(filter))

Sink:

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  // all employees from `Germany`, `Italy` or `Turkey` were fired (deleted)
  Observable.from(List("Germany", "Italy", "Turkey")) // Observable[String] 
    .map(city => Filters.eq("city", city)) // Observable[Bson]
    .consumeWith(operator.sink.deleteMany())
}.runToFuture

connection.use { operator =>
  // all employees from `Germany`, `Italy` or `Turkey` were fired (deleted)
  Observable.from(List("Germany", "Italy", "Turkey").grouped(2).toList) // Observable[List[String]] 
    .map(cities => cities.map(city => Filters.eq("city", city))) // Observable[List[Bson]]
    .consumeWith(operator.sink.deleteManyPar())
}.runToFuture

Replace

replaceOne

Replaces a document in the collection according to the specified arguments.

Single:

import com.mongodb.client.model.Filters
import monix.connect.mongodb.domain.UpdateResult
import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val connection: Resource[Task, CollectionOperator[Employee]]
// assuming that `Bob` exists in the collection, he gets
// retired and replaced by `Alice`
val filter = Filters.eq("name", "Bob")
val replacement = Employee("Alice", 43, "France")
val t: Task[UpdateResult] = connection.use(_.single.replaceOne(filter, replacement))

Sink:

import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
import org.bson.conversions.Bson

val connection: Resource[Task, CollectionOperator[Employee]]
// Assuming the collection has employees 1 and 2, they are replaced by 3 and 4
val replacements: Seq[(Bson, Employee)] =
  List(
    (Filters.eq("name", "Employee1"), Employee("Employee3", 43, "Rio")),
    (Filters.eq("name", "Employee2"), Employee("Employee4", 37, "Rio"))
  )
connection.use { operator =>
  Observable.from(replacements) // Observable[(Bson, Employee)]
    .consumeWith(operator.sink.replaceOne())
}.runToFuture

val replacements2: Seq[(Bson, Employee)] =
  List(
    (Filters.eq("name", "Employee1"), Employee("Employee5", 43, "Rio")),
    (Filters.eq("name", "Employee2"), Employee("Employee6", 37, "Rio")),
    (Filters.eq("name", "Employee3"), Employee("Employee7", 43, "Rio")),
    (Filters.eq("name", "Employee4"), Employee("Employee8", 37, "Rio"))
  )
connection.use { operator =>
  Observable.from(replacements2.grouped(2).toList) // Observable[List[(Bson, Employee)]]
    .consumeWith(operator.sink.replaceOnePar())
}.runToFuture

Update

updateOne

Update a single document in the collection according to the specified arguments.

Single:

import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.domain.UpdateResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val connection: Resource[Task, CollectionOperator[Employee]]
/** Updates Bob's employee record for his anniversary; if no record
  * matches the given filter, nothing is modified. */
val filter = Filters.eq("name", "Bob")
val anniversaryUpdate = Updates.inc("age", 1)
val t: Task[UpdateResult] = connection.use(_.single.updateOne(filter, anniversaryUpdate))

Sink:

import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
import org.bson.conversions.Bson

val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  val filter = Filters.eq("city", "Porto")
  val update = Updates.set("city", "Lisbon")
  val updateElements: Seq[(Bson, Bson)] = List.fill(4)((filter, update))
  // imagine that a company wants to send four of its employees from Porto to Lisbon
  Observable.from(updateElements) // Observable[(Bson, Bson)]
    .consumeWith(operator.sink.updateOne())
}.runToFuture

connection.use { operator =>
  val filter = Filters.eq("city", "Porto")
  val update = Updates.set("city", "Lisbon")
  val updateElements: List[List[(Bson, Bson)]] = List.fill(4)((filter, update)).grouped(2).toList
  // imagine that a company wants to send four of its employees from Porto to Lisbon
  Observable.from(updateElements) // Observable[List[(Bson, Bson)]]
    .consumeWith(operator.sink.updateOnePar())
}.runToFuture

updateMany

Update all documents in the collection according to the specified arguments.

Single:

import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.domain.UpdateResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val connection: Resource[Task, CollectionOperator[Employee]]
val filter = Filters.eq("city", "Spain")
val update = Updates.push("hobbies", "Football")
val t: Task[UpdateResult] = connection.use(_.single.updateMany(filter, update))

Sink:

import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
import org.bson.conversions.Bson

val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  val cities: Set[String] = Set("Seattle", "Nairobi", "Dakar")
  Observable.from(cities) // Observable[String]
    .map(city => (Filters.eq("city", city), Updates.pull("hobbies", "Table Tennis"))) // Observable[(Bson, Bson)]
    .consumeWith(operator.sink.updateMany())
}.runToFuture

connection.use { operator =>
  val cities: Set[String] = Set("Seattle", "Nairobi", "Dakar")
  Observable.from(cities.grouped(2).toList) // Observable[Set[String]]
    .map(cities => cities.map(city => (
      Filters.eq("city", city),
      Updates.pull("hobbies", "Table Tennis")
    )).toList) // Observable[List[(Bson, Bson)]]
    .consumeWith(operator.sink.updateManyPar())
}.runToFuture
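
The `push` and `pull` update operators used in this section act on array fields: `push` appends a value and `pull` removes every occurrence of it. The following in-memory sketch models that behaviour on the `hobbies` field. The helper functions and sample data are made up for illustration and do not call the driver.

```scala
object ArrayUpdateModel {
  final case class Employee(name: String, age: Int, city: String, hobbies: List[String] = List.empty)

  // Mirrors Updates.push("hobbies", hobby): appends the value to the array.
  def push(e: Employee, hobby: String): Employee =
    e.copy(hobbies = e.hobbies :+ hobby)

  // Mirrors Updates.pull("hobbies", hobby): removes all occurrences of the value.
  def pull(e: Employee, hobby: String): Employee =
    e.copy(hobbies = e.hobbies.filterNot(_ == hobby))
}
```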

Indexing

createIndex

Creates an index on the collection according to the specified arguments.

Single:

import cats.effect.Resource
import com.mongodb.client.model.{IndexModel, Indexes}
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

val connection: Resource[Task, CollectionOperator[Employee]]
val key = new IndexModel(Indexes.ascending("name")).getKeys
val t = connection.use(_.single.createIndex(key))

createIndexes

Create multiple indexes on the collection according to the specified arguments.

Single:

import cats.effect.Resource
import com.mongodb.client.model.{CreateIndexOptions, IndexModel, IndexOptions, Indexes}
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task

import java.util.concurrent.TimeUnit

val connection: Resource[Task, CollectionOperator[Employee]]
val indexes = List(
  new IndexModel(Indexes.descending("name")),
  new IndexModel(Indexes.ascending("city"), new IndexOptions().background(true).unique(false))
)
val options = new CreateIndexOptions().maxTime(5, TimeUnit.SECONDS)
val t = connection.use(_.single.createIndexes(indexes, options))

Copyright © 2020-2022 The Monix Connect Developers.