MongoDB
Introduction
MongoDB is a document database that stores data in JSON-like documents, unlike the traditional
row/column model. It has a rich and expressive query language that lets you filter and sort by any field, with
support for aggregations and other modern use cases. The Monix MongoDB connector offers a reactive, non-blocking
and resource-safe API, which relies on the underlying MongoDB Java Reactive Streams driver.
The library's core data type for interoperating with collections is CollectionOperator[Doc], which is composed of the following four main pillars:
- Database: Used to manage MongoDB databases and collections.
- Single: Exposes single operations to delete, insert, replace and update collections.
- Sink: Implements the same operations as the Single API, but in a streaming fashion.
- Source: Used to fetch data from the collections with aggregate, count, distinct and find.
Each of these components is explained in detail in the following sub-sections, but first we will see how to configure and connect to the database server.
No Scala 3 support
This connector depends on the mongodb-scala-driver, which is not yet published for Scala 3 because it is built with Gradle and a lot of code still has to be ported to the new Scala 3 macros. See the community status request: https://www.mongodb.com/community/forums/t/scala-3-support/115681
Dependency
Add the following dependency to get started:
libraryDependencies += "io.monix" %% "monix-mongodb" % "0.6.0"
Collection Reference
Before creating the connection, we need a reference to the collection that we want to interoperate with and use for storing and reading documents.
Such a reference is identified by the database and collection names, and is represented in code by the CollectionRef sealed trait, which is extended by CollectionCodecRef and CollectionDocumentRef.
CollectionCodecRef
The most practical way of representing documents in Scala code is with a case class, for which a CodecProvider can be derived automatically using the implicit conversions from import org.mongodb.scala.bson.codecs.Macros._.
For didactic purposes we have defined a hypothetical Employee class with its employees collection, which will be used to show how to create a collection reference with our custom codec that will later be used to create the MongoConnection.
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import monix.connect.mongodb.client.CollectionCodecRef
import monix.connect.mongodb.client.CollectionRef
case class Employee(name: String, age: Int, city: String, hobbies: List[String] = List.empty)
val employee = Employee("Bob", 29, "Barcelona")
val employeesCol: CollectionRef[Employee] =
CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())
CollectionDocumentRef
Alternatively, one could use org.bson.Document as a generic and flexible type for representing a document.
In that case CollectionDocumentRef is used, which only requires the database and collection names to be created:
import monix.connect.mongodb.client.{CollectionDocumentRef, CollectionRef}
import org.bson.Document
val sampleDocument: Document = Document.parse("""{"film_name":"Jumanji", "year": 1995 }""")
val documentCol: CollectionRef[Document] = CollectionDocumentRef("myDb", "employees")
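As a small sketch of what working with the generic org.bson.Document type looks like, the snippet below parses the same sample document, adds a field, and reads values back. The extra "genre" field is a hypothetical addition for illustration only; no database connection is involved.

```scala
import org.bson.Document

// Parse a JSON string into a generic BSON document.
val film: Document = Document.parse("""{"film_name": "Jumanji", "year": 1995}""")

// `append` mutates the document in place and returns it, so calls can be chained.
// The "genre" field is hypothetical and only used for illustration.
film.append("genre", "adventure")

// Typed getters read values back; a missing key yields null.
val title: String = film.getString("film_name")
val year: Int = film.getInteger("year")
val hasGenre: Boolean = film.containsKey("genre")
```

Because the document is untyped, a misspelled key is only detected at runtime, which is the trade-off against the case class approach shown earlier.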
Connection
Once we have created the collection reference, we pass it to the method that creates the connection, which also requires either a MongoDB connection string or client settings. Below is an example of connecting to a standalone MongoDB server; the connection can also be configured for a replica set or a sharded cluster, with options such as TLS/SSL. For more details, please refer to the official documentation.
Connection String
The following snippet shows how to create the connection with a connection string:
import cats.effect.Resource
import monix.connect.mongodb.client.{CollectionCodecRef, MongoConnection}
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import com.mongodb.client.model.Filters
val employeesCol = CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())
val connectionStr = "mongodb://localhost:27017"
val connection = MongoConnection.create1(connectionStr, employeesCol)
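For the replica set and TLS options mentioned above, the connection string itself carries the configuration. The sketch below uses the Java driver's com.mongodb.ConnectionString class only to parse and inspect such a string locally; the host names and the replica set name rs0 are hypothetical, and nothing is connected.

```scala
import com.mongodb.ConnectionString
import scala.jdk.CollectionConverters._

// Hypothetical replica set with two members and TLS enabled.
val replicaSetStr = "mongodb://host1:27017,host2:27017/myDb?replicaSet=rs0&tls=true"

// Parsing validates the syntax and throws IllegalArgumentException on malformed input.
val parsed = new ConnectionString(replicaSetStr)

val hosts: List[String] = parsed.getHosts.asScala.toList
val replicaSet: String = parsed.getRequiredReplicaSetName
val database: String = parsed.getDatabase
val tlsEnabled: Boolean = parsed.getSslEnabled
```

Validating the string this way before handing it to MongoConnection.create1 can surface typos early, since the connection itself is only established when the resource is used.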
Client Settings
Alternatively, you can build the configuration in a typed way with MongoClientSettings, as in the snippet below:
import monix.connect.mongodb.client.{CollectionCodecRef, MongoConnection}
import com.mongodb.{MongoClientSettings, ServerAddress}
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import scala.jdk.CollectionConverters._
val clientSettings =
MongoClientSettings.builder()
.applyToClusterSettings(builder =>
builder.hosts(
List(
new ServerAddress("host1", 27017),
new ServerAddress("host2", 27017)
).asJava
)
).build()
val employeesCol = CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())
val connection = MongoConnection.create1(clientSettings, employeesCol)
CollectionOperator
Notice that creating the connection returns a cats.effect.Resource, which abstracts the acquisition and release of the resources consumed by creating the MongoClient.
Using that resource provides a CollectionOperator, which is composed of four main components: Database, Source, Single and Sink.
Together they bring all the necessary methods to operate with the specified collections.
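A minimal sketch of how the components are reached from a single operator is shown below. It only uses calls that appear elsewhere on this page (db.existsDatabase, single.insertOne, source.count) and assumes a MongoDB server reachable at localhost:27017. Building the value does not open a connection; that only happens when the resulting Task is run.

```scala
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.{CollectionCodecRef, MongoConnection}
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider

case class Employee(name: String, age: Int, city: String, hobbies: List[String] = List.empty)

val employeesCol =
  CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())

// The client is acquired when the task starts and released when `use` completes,
// whether it succeeds or fails.
val program = MongoConnection.create1("mongodb://localhost:27017", employeesCol).use { operator =>
  for {
    dbExists   <- operator.db.existsDatabase("myDb")                        // Database
    _          <- operator.single.insertOne(Employee("Bob", 29, "Barcelona")) // Single
    inBcnCount <- operator.source.count(Filters.eq("city", "Barcelona"))    // Source
  } yield (dbExists, inBcnCount)
}
```

The operator should not be returned from `use` and kept for later, since the underlying client is closed as soon as the block finishes.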
Multiple Collections
The API is designed to allow using multiple collections (up to 8) in a single connection. This is exposed in the
signatures create1, create2, create3, ..., create8, which take a TupleN of collection references and provide a TupleN of operators, sized to the number of
collections. See an example below:
import monix.connect.mongodb.client.{CollectionCodecRef, CollectionDocumentRef, MongoConnection}
import com.mongodb.client.model.Filters
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import org.bson.Document
import monix.execution.Scheduler.Implicits.global
import scala.jdk.CollectionConverters._
val business: Document = Document.parse(s"""{"business_name":"MyBusiness", "year": 1995}""")
val employee1 = Employee("Employee1", 21, "Paris")
val employee2 = Employee("Employee2", 29, "Amsterdam")
val businessCol = CollectionDocumentRef(database = "myDb", collection = "business_collection")
val employeesCol =
CollectionCodecRef("myDb", "employees_collection", classOf[Employee], createCodecProvider[Employee]())
// calling `create2` because we want to operate with two MongoDB collections.
MongoConnection.create2("mongodb://host:27017", (employeesCol, businessCol))
.use { case (employeesOperator, businessOperator) =>
for {
//business logic here
_ <- employeesOperator.single.insertMany(List(employee1, employee2)) >>
businessOperator.single.insertOne(business)
parisEmployeesCount <- employeesOperator.source.count(Filters.eq("city", "Paris"))
myOnlyBusiness <- businessOperator.source.find(Filters.eq("business_name", "MyBusiness")).headL
} yield (parisEmployeesCount, myOnlyBusiness)
}.runToFuture
Database
Dealing with database operations is a common use case in any driver. Accordingly, the CollectionOperator comes with
an instance of MongoDatabase, which allows you to create, drop, list and check the existence of both databases
and collections.
Exists
The MongoDatabase is just a reference to the target database, which may or may not already exist. You can
check whether it exists before trying to read or insert any data:
import monix.eval.Task
import monix.connect.mongodb.MongoDb
import monix.connect.mongodb.client.MongoConnection
import monix.connect.mongodb.client.CollectionCodecRef
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import monix.execution.Scheduler.Implicits.global
val employeesCol = CollectionCodecRef("myDb", "employees", classOf[Employee], createCodecProvider[Employee]())
val connection = MongoConnection.create1("mongodb://host:27017", employeesCol)
val isDbPresent = connection.use { colOperator =>
for {
isDbPresent <- colOperator.db.existsDatabase("myDb")
isCollectionPresent <- colOperator.db.existsCollection("my_collection")
} yield (isDbPresent, isCollectionPresent)
}
// if the db existed you could also check the existence of a particular collection
val isCollectionPresent: Task[Boolean] = MongoDb.existsCollection("myDb", "my_collection")
Create
Create a new collection within a given database.
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import monix.connect.mongodb.client.{MongoConnection, CollectionCodecRef}
val employeesCol = CollectionCodecRef("myDb", "employeesCol", classOf[Employee], createCodecProvider[Employee]())
val connection = MongoConnection.create1("mongodb://host:27017", employeesCol).use(_.db.createCollection("db123"))
Notice that the collection we are explicitly creating is different from the one we passed to CollectionRef. The reason
is that myDb is created automatically with the connection, so we don't need to create it explicitly.
List
List collection and database names.
import monix.connect.mongodb.client.{CollectionCodecRef, CollectionRef, MongoConnection}
import org.mongodb.scala.bson.codecs.Macros.createCodecProvider
import monix.reactive.Observable
val employeesCol = CollectionCodecRef("myDb", "employeesCol", classOf[Employee], createCodecProvider[Employee]())
val connection = MongoConnection.create1("mongodb://host:27017", employeesCol).use { collOperator =>
//collection names from all databases
val collections: Observable[String] = for {
databaseName <- collOperator.db.listDatabases
collectionName <- collOperator.db.listCollections(databaseName)
// calling `listCollections` without arguments, would only
// return the collections names from the current database
} yield collectionName
collections.count
}
Rename
Renames a collection.
import cats.effect.Resource
import monix.eval.Task
import monix.connect.mongodb.client.{CollectionOperator, MongoConnection}
val connection: Resource[Task, CollectionOperator[Employee]]
val t = connection.use(_.db.renameCollection("oldCollectionName", "newCollectionName"))
Drop
And finally, drop either the entire database or collection.
import cats.effect.Resource
import monix.eval.Task
import monix.connect.mongodb.client.{CollectionOperator, MongoConnection}
val connection: Resource[Task, CollectionOperator[Document]]
connection.use { operator =>
for {
_ <- operator.db.dropCollection("db123", "coll123")
_ <- operator.db.dropDatabase("db123")
} yield ()
}.runToFuture
Source
The MongoSource class abstracts the read-like operations, such as aggregate, count, distinct and find. See
examples of these in the next subsections:
Aggregate
This source aggregates documents according to the specified aggregation pipeline. You can find below examples for performing group by and unwind aggregations.
Group By
import org.bson.Document
import com.mongodb.client.model.{Accumulators, Aggregates, Filters}
import cats.effect.Resource
import monix.eval.Task
import monix.connect.mongodb.client.CollectionOperator
val connection: Resource[Task, CollectionOperator[Employee]]
val aggregated: Task[Option[Document]] = connection.use {
operator =>
val filter = Aggregates.`match`(Filters.eq("city", "Caracas"))
val group = Aggregates.group("group", Accumulators.avg("average", "$age"))
// eventually returns a [[Document]] with the field `average`
// that contains the age average of Venezuelan employees.
operator.source.aggregate(Seq(filter, group)).headOptionL
}
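To make the semantics of the pipeline above concrete, the following sketch models the same two stages over an in-memory list using plain Scala collections. It is only an illustration of what `$match` followed by `$group` with `$avg` computes, not how the driver or the server executes it; the sample employees are hypothetical.

```scala
case class Employee(name: String, age: Int, city: String, hobbies: List[String] = List.empty)

// Hypothetical sample data.
val employees = List(
  Employee("Ana", 30, "Caracas"),
  Employee("Luis", 40, "Caracas"),
  Employee("Bob", 29, "Barcelona")
)

// Equivalent of Aggregates.`match`(Filters.eq("city", "Caracas")).
val matched = employees.filter(_.city == "Caracas")

// Equivalent of grouping every matched document under one constant key
// and averaging the `age` field. No group is produced when nothing matches.
val average: Option[Double] =
  if (matched.isEmpty) None
  else Some(matched.map(_.age).sum.toDouble / matched.size)
// average == Some(35.0)
```

The empty case mirrors why the aggregation result is read with headOptionL: when no document passes the match stage, the pipeline yields no group at all.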
Unwind
Deconstructs an array field from the input documents to output a document for each element. Each output document is a copy of the main document with the value of the array field replaced by the element.
In the example below, the hobbies of the specified employee are deconstructed:
import com.mongodb.client.model.{Aggregates, Filters}
import monix.connect.mongodb.client.{CollectionCodecRef, MongoConnection}
import monix.execution.Scheduler.Implicits.global
import org.mongodb.scala.bson.codecs.Macros._
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.bson.codecs.configuration.CodecRegistry
case class Person(name: String, age: Int, hobbies: Seq[String])
case class UnwoundPerson(name: String, age: Int, hobbies: String)
val codecRegistry: CodecRegistry = fromRegistries(fromProviders(classOf[Person], classOf[UnwoundPerson]))
val col = CollectionCodecRef("myDb", "persons", classOf[Person], codecRegistry)
MongoConnection.create1("mongodb://host:27017", col).use { operator =>
for {
_ <- operator.single.insertOne(Person("Mario", 32, List("reading", "running", "programming")))
unwound <- {
val filter = Aggregates.`match`(Filters.gte("age", 32))
val unwind = Aggregates.unwind("$hobbies")
operator.source.aggregate(Seq(filter, unwind), classOf[UnwoundPerson]).toListL
/** Returns ->
* List(
* UnwoundPerson("Mario", 32, "reading"),
* UnwoundPerson("Mario", 32, "running"),
* UnwoundPerson("Mario", 32, "programming")
* )
*/
}
} yield unwound
}.runToFuture
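The effect of the unwind stage can also be modelled locally. The sketch below is a plain Scala approximation of the pipeline above (a filter on age followed by an unwind of hobbies), using the same case classes; the second person is hypothetical and is there to show that non-matching documents are dropped before unwinding.

```scala
case class Person(name: String, age: Int, hobbies: Seq[String])
case class UnwoundPerson(name: String, age: Int, hobbies: String)

// One output element per hobby; a person with no hobbies produces no output,
// which matches the default behaviour of `$unwind` for empty arrays.
def unwind(person: Person): Seq[UnwoundPerson] =
  person.hobbies.map(hobby => UnwoundPerson(person.name, person.age, hobby))

val people = List(
  Person("Mario", 32, List("reading", "running", "programming")),
  Person("Ana", 25, List("chess")) // hypothetical, filtered out by age
)

val unwound: List[UnwoundPerson] =
  people.filter(_.age >= 32).flatMap(unwind)
// List(UnwoundPerson(Mario,32,reading), UnwoundPerson(Mario,32,running), UnwoundPerson(Mario,32,programming))
```

This is also why a second case class is needed for the result: after unwinding, hobbies holds a single String rather than a sequence.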
Count
countAll
Counts all the documents, without filtering.
import cats.effect.Resource
import monix.connect.mongodb.client.{CollectionOperator, MongoConnection}
import monix.eval.Task
val connection: Resource[Task, CollectionOperator[Document]]
val count = connection.use(_.source.countAll)
count
Counts the number of documents that match the given filter.
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val connection: Resource[Task, CollectionOperator[Employee]]
// counts the number of employees based in Sydney
val count = connection.use(_.source.count(Filters.eq("city", "Sydney")))
Distinct
Gets the distinct values of the specified field name.
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val connection: Resource[Task, CollectionOperator[Employee]]
// count of the distinct cities
val distinctCount = connection.use(_.source.distinct("city", classOf[String]).countL)
Find
findAll
Finds all the documents.
import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
val connection: Resource[Task, CollectionOperator[Employee]]
val t = connection.use { operator =>
val allEmployees: Observable[Employee] = operator.source.findAll
//business logic here
allEmployees.completedL
}
find
Finds the documents in the collection that match the query filter.
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
val rioEmployees: Observable[Employee] = operator.source.find(Filters.eq("city", "Rio"))
//business logic here
rioEmployees.completedL
}.runToFuture
findOne
Finds the first document in the collection that matches the query filter, returning an empty option if there is none.
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
val rioEmployee: Task[Option[Employee]] = operator.source.findOne(Filters.eq("city", "Rio"))
//business logic here
rioEmployee
}.runToFuture
findOneAndDelete
Atomically find a document and remove it.
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.{CollectionOperator, MongoConnection}
import monix.eval.Task
import monix.reactive.Observable
import org.bson.conversions.Bson
// finds and deletes one Cracow employee older than 66
val filter = Filters.and(Filters.eq("city", "Cracow"), Filters.gt("age", 66))
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
val retiredEmployee: Task[Option[Employee]] = operator.source.findOneAndDelete(filter)
//business logic here
retiredEmployee
}.runToFuture
findOneAndReplace
Atomically find a document and replace it.
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val filter = Filters.and(Filters.eq("city", "Cadiz"), Filters.eq("name", "Gustavo"))
val replacement = Employee(name = "Alberto", city = "Cadiz", age = 33)
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
val replacedEmployee: Task[Option[Employee]] = operator.source.findOneAndReplace(filter, replacement)
//business logic here
replacedEmployee
}.runToFuture
findOneAndUpdate
Atomically find a document and update it.
import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val filter = Filters.and(Filters.eq("city", "Moscow"), Filters.eq("name", "Vladimir"))
val update = Updates.inc("age", 1)
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use(_.source.findOneAndUpdate(filter, update)).runToFuture
Single & Sinks
The Single and Sink components are documented together since they both implement exactly the same set of
operations: insert, delete, replace and update. The difference is that the former executes the
operation in a single task, while the latter pipes the elements of a stream to a Monix Consumer that executes them.
Most of the Sink consumers have an alternative method with the suffix Par, which executes requests grouped
in sequences in a parallel fashion as batch operations.
The following sub-sections represent the list of different operations and sinks available to use, with a small example on each one.
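The difference in input shape between a plain sink and its Par variant can be sketched without a database. The snippet below only uses Monix's Observable to turn an element-wise stream into a stream of batches with bufferTumbling; it illustrates the shape of the data fed to the consumers, not how the Par sinks are implemented. The names are hypothetical sample data.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

val names = List("Luke", "Matt", "Jack", "John", "Anna")

// Element-wise stream: the shape consumed by sinks such as `insertOne()`.
val single: Observable[String] = Observable.fromIterable(names)

// Batched stream: each emitted element is a group of up to two items.
// The last batch is smaller when the input does not divide evenly.
val batched: Observable[Seq[String]] = single.bufferTumbling(2)

val batches: List[List[String]] = batched.map(_.toList).toListL.runSyncUnsafe()
// List(List(Luke, Matt), List(Jack, John), List(Anna))
```

For a collection that is already in memory, `grouped(n).toList` gives the same batching, which is the form used in the Par examples in the sub-sections below.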
Insert
insertOne
Inserts the provided document to the specified collection.
Single:
import cats.effect.Resource
import monix.connect.mongodb.domain.InsertOneResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val employee: Employee = Employee("Luke", 41, "London")
val connection: Resource[Task, CollectionOperator[Employee]]
val t: Task[InsertOneResult] = connection.use(_.single.insertOne(employee))
Sink:
import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
val employee = List(Employee("Luke", 41, "London"))
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
Observable.from(employee) // Observable[Employee]
.consumeWith(operator.sink.insertOne())
}.runToFuture
val employees = List(Employee("Luke", 41, "London"), Employee("Matt", 23, "Warsaw"),
Employee("Jack", 24, "Berlin"), Employee("John", 41, "London"))
connection.use { operator =>
Observable.from(employees.grouped(2).toList) // Observable[List[Employee]]
.consumeWith(operator.sink.insertOnePar())
}.runToFuture
insertMany
Inserts the provided list of documents to the specified collection.
Single:
import cats.effect.Resource
import monix.connect.mongodb.domain.InsertManyResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val employees: List[Employee] = List(Employee("Jim", 33, "Vienna"), Employee("Albert", 45, "Vienna"))
val connection: Resource[Task, CollectionOperator[Employee]]
val t: Task[InsertManyResult] = connection.use(_.single.insertMany(employees))
Sink:
import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
val employees: List[Employee] = List(
Employee("Salomon", 23, "Berlin"),
Employee("Victor", 54, "Berlin"),
Employee("Gabriel", 35, "Paris")
)
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
Observable.from(employees) // Observable[Employee]
.bufferTumbling(3) // Observable[Seq[Employee]]
.consumeWith(operator.sink.insertMany())
}.runToFuture
Delete
deleteOne
Removes at most one document from the collection that matches the given filter. If no documents match, the collection is not modified.
Single:
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.domain.DeleteResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val filter = Filters.eq("name", "Bob")
val connection: Resource[Task, CollectionOperator[Employee]]
val t: Task[DeleteResult] = connection.use(_.single.deleteOne(filter))
Sink:
import com.mongodb.client.model.Filters
import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
// employees to be deleted from the collection
Observable.from(List("Lauren", "Marie", "Simon")) // Observable[String]
.map(name => Filters.eq("name", name)) // Observable[Bson]
.consumeWith(operator.sink.deleteOne())
}.runToFuture
connection.use { operator =>
// employees to be deleted from the collection
Observable.from(List("Lauren", "Marie", "Simon", "Matt").grouped(2).toList) // Observable[List[String]]
.map(names => names.map(name => Filters.eq("name", name))) // Observable[List[Bson]]
.consumeWith(operator.sink.deleteOnePar())
}.runToFuture
deleteMany
Removes all documents from the collection that match the given query filter. Again, if no documents match, the collection is not modified.
Single:
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.domain.DeleteResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val connection: Resource[Task, CollectionOperator[Employee]]
// delete all employees older than 65 y.o
val filter = Filters.gt("age", 65)
val t: Task[DeleteResult] = connection.use(_.single.deleteMany(filter))
Sink:
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
// all employees from `Germany`, `Italy` or `Turkey` were fired (deleted)
Observable.from(List("Germany", "Italy", "Turkey")) // Observable[String]
.map(city => Filters.eq("city", city)) // Observable[Bson]
.consumeWith(operator.sink.deleteMany())
}.runToFuture
connection.use { operator =>
// all employees from `Germany`, `Italy` or `Turkey` were fired (deleted)
Observable.from(List("Germany", "Italy", "Turkey").grouped(2).toList) // Observable[List[String]]
.map(cities => cities.map(city => Filters.eq("city", city))) // Observable[List[Bson]]
.consumeWith(operator.sink.deleteManyPar())
}.runToFuture
Replace
replaceOne
Replaces a document in the collection according to the specified arguments.
Single:
import com.mongodb.client.model.Filters
import monix.connect.mongodb.domain.UpdateResult
import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val connection: Resource[Task, CollectionOperator[Employee]]
// assuming that `Bob` exists in the collection it gets
// retired and replaced by `Alice`
val filter = Filters.eq("name", "Bob")
val replacement = Employee("Alice", 43, "France")
val t: Task[UpdateResult] = connection.use(_.single.replaceOne(filter, replacement))
Sink:
import cats.effect.Resource
import com.mongodb.client.model.Filters
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
import org.bson.conversions.Bson
val connection: Resource[Task, CollectionOperator[Employee]]
// Presumably the collection has employees 1 and 2, which are then replaced by 3 and 4
val replacements: Seq[(Bson, Employee)] =
List(
(Filters.eq("name", "Employee1"), Employee("Employee3", 43, "Rio")),
(Filters.eq("name", "Employee2"), Employee("Employee4", 37, "Rio"))
)
connection.use { operator =>
Observable.from(replacements) // Observable[(Bson, Employee)]
.consumeWith(operator.sink.replaceOne())
}.runToFuture
val replacements2: Seq[(Bson, Employee)] =
List(
(Filters.eq("name", "Employee1"), Employee("Employee5", 43, "Rio")),
(Filters.eq("name", "Employee2"), Employee("Employee6", 37, "Rio")),
(Filters.eq("name", "Employee3"), Employee("Employee7", 43, "Rio")),
(Filters.eq("name", "Employee4"), Employee("Employee8", 37, "Rio"))
)
connection.use { operator =>
Observable.from(replacements2.grouped(2).toList) // Observable[List[(Bson, Employee)]]
.consumeWith(operator.sink.replaceOnePar())
}.runToFuture
Update
updateOne
Update a single document in the collection according to the specified arguments.
Single:
import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.domain.UpdateResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val connection: Resource[Task, CollectionOperator[Employee]]
/** Updates Bob's employee record for his anniversary; the returned UpdateResult
* reports whether any record matched the given filter. */
val filter = Filters.eq("name", "Bob")
val anniversaryUpdate = Updates.inc("age", 1)
val t: Task[UpdateResult] = connection.use(_.single.updateOne(filter, anniversaryUpdate))
Sink:
import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
import org.bson.conversions.Bson
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
val filter = Filters.eq("city", "Porto")
val update = Updates.set("city", "Lisbon")
val updateElements: Seq[(Bson, Bson)] = List.fill(4)((filter, update))
// imagine that a company wants to send four of its employees from Porto to Lisbon
Observable.from(updateElements) // Observable[(Bson, Bson)]
.consumeWith(operator.sink.updateOne())
}.runToFuture
connection.use { operator =>
val filter = Filters.eq("city", "Porto")
val update = Updates.set("city", "Lisbon")
val updateElements: List[List[(Bson, Bson)]] = List.fill(4)((filter, update)).grouped(2).toList
// imagine that a company wants to send four of its employees from Porto to Lisbon
Observable.from(updateElements) // Observable[List[(Bson, Bson)]]
.consumeWith(operator.sink.updateOnePar())
}.runToFuture
updateMany
Update all documents in the collection according to the specified arguments.
Single:
import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.domain.UpdateResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val connection: Resource[Task, CollectionOperator[Employee]]
val filter = Filters.eq("city", "Spain")
val update = Updates.push("hobbies", "Football")
val t: Task[UpdateResult] = connection.use(_.single.updateMany(filter, update))
Sink:
import cats.effect.Resource
import com.mongodb.client.model.{Filters, Updates}
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable
import org.bson.conversions.Bson
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
val cities: Set[String] = Set("Seattle", "Nairobi", "Dakar")
Observable.from(cities) // Observable[String]
.map(city => (Filters.eq("city", city), Updates.pull("hobbies", "Table Tennis"))) // Observable[(Bson, Bson)]
.consumeWith(operator.sink.updateMany())
}.runToFuture
connection.use { operator =>
val cities: Set[String] = Set("Seattle", "Nairobi", "Dakar")
Observable.from(cities.grouped(2).toList) // Observable[Set[String]]
.map(cities => cities.map(city => (
Filters.eq("city", city),
Updates.pull("hobbies", "Table Tennis")
)).toList) // Observable[List[(Bson, Bson)]]
.consumeWith(operator.sink.updateManyPar())
}.runToFuture
Indexing
createIndex
Creates an index on the collection according to the specified arguments.
Single:
import cats.effect.Resource
import com.mongodb.client.model.{Filters, IndexModel, Indexes, Updates}
import monix.connect.mongodb.domain.UpdateResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
val connection: Resource[Task, CollectionOperator[Employee]]
val key = new IndexModel(Indexes.ascending("name")).getKeys
val t: Task[UpdateResult] = connection.use(_.single.createIndex(key))
createIndexes
Create multiple indexes on the collection according to the specified arguments.
Single:
import cats.effect.Resource
import com.mongodb.client.model.{CreateIndexOptions, Filters, IndexModel, IndexOptions, Indexes, Updates}
import monix.connect.mongodb.domain.UpdateResult
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import java.util.concurrent.TimeUnit
val connection: Resource[Task, CollectionOperator[Employee]]
val indexes = List(
new IndexModel(Indexes.descending("name")),
new IndexModel(Indexes.ascending("city"), new IndexOptions().background(true).unique(false))
)
val options = new CreateIndexOptions().maxTime(5, TimeUnit.SECONDS)
val t: Task[UpdateResult] = connection.use(_.single.createIndexes(indexes, options))
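The index models passed to createIndexes are plain values from the Java driver and can be built and inspected before any connection exists. The sketch below assembles a compound index with a custom name; the index name city_age_idx and the choice of fields are hypothetical, and whether such an index suits a workload depends on the queries being run.

```scala
import com.mongodb.client.model.{IndexModel, IndexOptions, Indexes}

// Compound key: ascending on `city`, then descending on `age`.
val cityAgeKeys = Indexes.compoundIndex(
  Indexes.ascending("city"),
  Indexes.descending("age")
)

// Hypothetical index name; uniqueness is left disabled explicitly.
val cityAgeIndex = new IndexModel(
  cityAgeKeys,
  new IndexOptions().name("city_age_idx").unique(false)
)

val indexName: String = cityAgeIndex.getOptions.getName
val isUnique: Boolean = cityAgeIndex.getOptions.isUnique
```

A model built this way can be added to the list given to createIndexes alongside the single-field indexes shown above.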