Monix Connect

Monix Connect

  • API Docs
  • Documentation
  • GitHub

›Documentation

Documentation

  • Overview
  • Apache Parquet
  • AWS DynamoDB
  • AWS S3
  • AWS SQS
  • Elasticsearch
  • Google Cloud Storage
  • HDFS
  • MongoDB
  • Redis

Redis

Introduction

Redis is an open source, in-memory data structure store, used as a database, cache and message broker. providing high availability, scalability and a outstanding performance. sorted sets, and a set of commands that can run atomically on these, like appending to a string; incrementing the value in a hash; pushing an element It supports data structures such as string, hashes, lists, to inter-operate with, and most of them are also available from the java api.

This connector has been built on top of lettuce, the most popular java library for operating with a non blocking Redis client.

Dependency

Add the following dependency:

libraryDependencies += "io.monix" %% "monix-redis" % "0.6.0"

Redis Connection

The first step is to create a RedisConnection a simple, scalable and pure interface that allows to communicate to a Redis Standalone or Cluster servers.

Remember that the created connection is an expensive resource, as it is made with the underlying lettuce which also uses netty and holds a set of io.netty.channel.EventLoopGroup that use multiple threads. So, reuse the connection as much as possible!

Standalone

In order to create a standalone connection, we will use the companion object's signature RedisConnection.standalone, which returns a connection instance.

In order to create the connection, first we would just need a single monix.connect.redis.client.RedisUri relative to the redis standalone server:

import monix.connect.redis.client.{RedisConnection, RedisUri}

// RedisUri has an overloaded `apply` which also allows host and port to be passed separately
// like RedisUri("localhost", 6379) 
val redisUri = RedisUri("redis://localhost:6379")

// then we create the connection
val redisConn: RedisConnection = RedisConnection.standalone(redisUri)

Cluster

Creating a cluster connection is seamlessly to the standalone one, they both end up encoded the same parent class RedisConnection, but for the fact that its creation requires multiple RedisUris that represent the set of redis servers in the cluster.

import monix.connect.redis.client.{RedisConnection, RedisUri}

val redisNode1 = RedisUri("my.redis.node.1", 7000)
val redisNode2 = RedisUri("my.redis.node.2", 7001)
val redisNode3 = RedisUri("my.redis.node.3", 7002)

val redisClusterConn: RedisConnection = RedisConnection.cluster(List(redisNode1, redisNode2, redisNode3))

RedisCmd

Once we got a RedisConnection, we can start using the RedisCmd, a case class that contains all the redis commands for server, key, list, set, sorted set and hash.
The RedisCmd is actually accessible through using a cats.effect.Resource with monix.eval.Task, it actually abstracts the logic of acquiring and releasing the connection with its associated resources.

In the following example we will create a connection that by default encodes Keys and Values as Strings, persisting them into UTF format in Redis.

import cats.effect.Resource
import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

val redisUri = RedisUri("redis://localhost:6379")

val redisConn: Resource[Task, RedisCmd[String, String]] = RedisConnection.standalone(redisUri).connectUtf

val k1: String = "key1"
val value: String = "a"
val k2: String = "key2"
val values: List[String] = List("b", "c", "d")

// from there on, we can start using the connection
// and since `RedisCmd` is a case class, we can apply 
// pattern matching against it, which will nicely allow us
// to de-compose the different RedisCommands its different api.
// alternatively you can also do:  redisConn.use { redisCmd => redisCmd.string.get("k1") }
redisConn.use { case RedisCmd(hash, keys, list, server, set, sortedSet, string) =>
  for {
    _ <- server.flushAll
    _ <- keys.touch(k1)
    _ <- string.set(k1, value)
    _ <- keys.rename(k1, k2)
    _ <- list.lPush(k1, values: _*)
    v <- string.get(k2)
    _ <- v match {
      case Some(value) => list.lPush(k1, value)
      case None => Task.unit
    }
    _ <- keys.del(k2)
    len <- list.lLen(k1)
  } yield (len)
}.runToFuture

Custom Codecs

In the previous sections it was shown how to create a connection to redis and to start using the RedisCmd with its different redis modules. The created connection was exposed within a cats resource as RedisCmd[String, String], meaning that it expects Strings for both Keys and Values. In order to decide how do we want our redis connection to encode and decode k and v, we would need to pass a custom Codec both for key and value. A Codec is a sealed trait conformed by UTFCodec and ByteArrayCodec, in which you can create instances of those from its companion object with the respective signatures utf and byteArray, see below snippet:

package monix.connect.redis.client

object Codec {
  def utf[T](encoder: T => String, decoder: String => T) = ???
  def byteArray[T](encoder: T => Array[Byte], decoder: Array[Byte] => T) = ???
}

You will find some already predefined Codec for Int, Float, Double, BigInt and BigDecimal under the package object monix.connect.redis._.

The next subsections are an example of creating custom codec that mixes UTFCodec and ByteArrayCodec:

UTFCodec

In this case we will create two custom UTFCodec[T], one for keys as Int and the other for Double which will represent the redis values, resulting in RedisCmd[Int, Double].

These two will be passed as parameters when connecting to redis with connectUtf.

import monix.connect.redis.client.{Codec, RedisCmd, RedisConnection, RedisUri, UtfCodec}
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global

import scala.util.{Failure, Try}

// there is already a predefined int utf codec under `monix.connect.redis._`
implicit val intUtfCodec: UtfCodec[Int] = Codec.utf(_.toString, //serializes int to str
  //deserializes str back to int
  str => Try(str.toInt)
    .failed.flatMap { ex =>
    logger.info("Failed to deserialize from Redis to `Int`")
    Failure(ex)
  }.getOrElse(0)
)

// there is already a predefined double utf codec under `monix.connect.redis._`
implicit val doubleUtfCodec: UtfCodec[Double] = Codec.utf(_.toString, //serializes double to str
  //deserializes str back to double
  str => Try(str.toDouble)
    .failed.flatMap { ex =>
    logger.info("Failed to deserialize from Redis to `Double`")
    Failure(ex)
  }.getOrElse(0.0)
)

val redisUri = RedisUri("redis://localhost:6379")

val f: CancelableFuture[Option[Double]] = 
 RedisConnection.standalone(redisUri)
  .connectUtf(intUtfCodec, doubleUtfCodec) //this can be passed implicitly but is explicit for didactic purposes
  .use { redisCmd: RedisCmd[Int, Double] =>
    //your business logic here
    redisCmd.list.lPush(11, 123.134) >> redisCmd.list.rPop(11) //Some(123.134) 
  }.runToFuture

BytesCodec

On the other hand, there is also a BytesCodec[T], which des/serializes from/to Array[Byte]. In this case we will show an example of using Protobuf serialization format to dealing with redis keys and values:

In below snippet we defined our proto objects, in which PersonPK will represent the redis key and Person the value.

syntax = "proto3";

package monix.connect.redis.test;

message PersonPk {
    string id = 1;
}

message Person {
    string name = 1;
    int64 age = 2;
    repeated string hobbies = 3;
}

With the generated proto scala sources, we can proceed to creating a BytesCodec[PersonPk] and BytesCodec[Person]:

import monix.connect.redis.client.{BytesCodec, Codec}
implicit val personPkCodec: BytesCodec[PersonPk] = 
  Codec.byteArray(pk => PersonPk.toByteArray(pk), bytes => PersonPk.parseFrom(bytes))
implicit val personCodec: BytesCodec[Person] =
  Codec.byteArray(person => Person.toByteArray(person), bytes => Person.parseFrom(bytes))

Finally, we are ready to start creating the connection using the previously defined protobuf codecs:

import monix.connect.redis.client.{Codec, RedisCmd, RedisConnection, RedisUri, UtfCodec}
import monix.connect.redis.test.protobuf.{Person, PersonPk}
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global

val redisUri = RedisUri("redis://localhost:6379")

val personPk = PersonPk("personId123")
val hobbies = List("Snowboarding", "Programming")
val person = Person("Alice", 25, hobbies)

val f: CancelableFuture[Option[Person]] =
  RedisConnection.standalone(redisUri)
    .connectUtf(personPkCodec, personCodec)
    .use{ redisCmd: RedisCmd[PersonPk, Person] =>
      for {
        _ <-redisCmd.string.set(personPk, person)
        person <- redisCmd.string.get(personPk)
      } yield person
    }.runToFuture

Commands

This redis connector implementation provides a wide range of commands to perform a different operations, for the most common used modules and types: : (Keys, Hashes , List, Server , Sets, SortedSets and Strings). See an example on how to use each of them in the following sub-sections:

Keys

The below snippet shows a simple example of using key commands.

import monix.connect.redis.client.{RedisConnection, RedisUri}
import scala.concurrent.duration._

val k: String // assuming that the key already exists
val redisUri = RedisUri("redis://localhost:6379")

RedisConnection.standalone(redisUri)
  .connectUtf
  .use(cmd =>
    for {
      randomKey <- cmd.key.randomKey() //returns a random key from the db
      _ <- cmd.key.expire(k, 100 seconds) //specifies an expiration timeout for the k1
      ttl <- cmd.key.ttl(k) //returns the time to live as `FiniteDuration`
    } yield (randomKey, ttl)
  )

Hashes

The following example uses the redis hash api RedisHash to insert a single element into a hash and read it back from the hash.

import monix.connect.redis.client.{RedisConnection, RedisUri, RedisCmd}
import scala.concurrent.duration._

val key: String 
val field: String 
val value: String 
val redisUri = RedisUri("redis://localhost:6379")
val prefix = "dummy-prefix-"

RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      _ <- cmd.hash.hSet(key, field, value)
      //adds a prefix to all values in a hash 
      _ <- cmd.hash.hGetAll(key).mapEval { case (f, v) => cmd.hash.hSet(key, f, prefix + v) }.completedL
      prefixedValue <- cmd.hash.hGet(key, field)
    } yield prefixedValue
  }.runToFuture

Lists

The following example uses the redis list api RedisList to insert elements into a redis list and reading them back with limited size.

import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.eval.Task

import scala.concurrent.duration._

val key1: String
val key2: String
val values: List[String]
val redisUri = RedisUri("redis://localhost:6379")
val prefix = "dummy-prefix-"

RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      initialSize <- cmd.list.lPush(key1, values)
      //copies all values from `key1` to `key2` adding a static prefix to each element 
      _ <- cmd.list.lGetAll(key1)
        .mapEval(v => cmd.list.lPush(key2, prefix + v)).completedL
      //checks if key1 and key2 have the same size
      haveSameSize <- cmd.list.lLen(key1).map(_ == initialSize)
    } yield haveSameSize
  }.runToFuture

Server

The following code shows how to remove all keys from all dbs in redis using the server api RedisServer a very basic but also common use case:

import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.eval.Task

val redisUri = RedisUri("redis://localhost:6379")
val prefix = "dummy-prefix-"

val f = RedisConnection.standalone(redisUri)
  .connectUtf.use(_.server.flushAll).runToFuture

Sets

The Redis Set commands api provides operations to work with sets, see a practical example in below code snippet.

import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.eval.Task
val k1: String
val k2: String
val redisUri = RedisUri("redis://localhost:6379")

val f = RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      _ <- cmd.set.sAdd(k1, "a", "b", "c") *>
        cmd.set.sAdd(k2, "c", "d") 
      finalSize <- cmd.set.sUnionStore(k1, k2)
    } yield finalSize //4 = ["a", "b", "c", "d"]
  }.runToFuture

SortedSets

The Redis SortedSet commands api provides operations to work with sorted sets, see a practical example in below code snippet, where three scored elements (akka VScore), are inserted into a sorted set and then incrementing the score of the middle one.

import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.connect.redis.domain.{VScore, ZRange}

val k: String
val redisUri: RedisUri

val f = RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      _ <- cmd.sortedSet.zAdd(k, VScore("Bob", 1)) >> 
        cmd.sortedSet.zAdd(k, VScore("Alice", 2)) >> 
        cmd.sortedSet.zAdd(k, VScore("Jamie", 5))
      //increments middle one by `increment` so it becomes the highest score of the set
      _ <- cmd.sortedSet.zIncrBy(k, 6, "Bob") 
      //returns those members with score higher than 4 ["Bob", "Jamie"]
      zRange <- cmd.sortedSet.zRangeByScore(k, ZRange.gt(5)).toListL
      min <- cmd.sortedSet.zPopMin(k) // Alice
      max <- cmd.sortedSet.zPopMax(k) // Bob
    } yield (min, max, zRange)
  }.runToFuture

Strings

The Redis Strings commands api provides operations to work with strings, see a practical example in below code snippet, where we insert a string into the given key and get its size.

import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}

val k: String
val v: String
val redisUri: RedisUri

val f = RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      _ <- cmd.string.set(k, v)
      size <- cmd.string.strLen(k)
    } yield size
  }.runToFuture

Local testing

The local tests will use the redis docker image from docker hub.

Standalone server

Add the following service description to your docker-compose.yml file:

 redis:
   image: redis
   ports:
     - 6379:6379

Run the following command to build and start the redis server:

docker-compose -f ./docker-compose.yml up -d redis

Check out that the service has started correctly.

Finally, following code shows how you can create the redis connection to the local server, but you would have to modify that to fit your use case - i.e it will be different to connect to a redis cluster or if authenticating to the server is needed using with key and secret, etc.)

import monix.connect.redis.client
import monix.connect.redis.client.{RedisConnection, RedisUri}

val redisUri = RedisUri("redis://host:port")
val standaloneConn = RedisConnection.standalone(redisUri)

Now you are ready to run your application!

Cluster

On the other hand, if you want to test how your application will behave running with a redis cluster, you can use grokzen/redis-cluster

  redisCluster:
    restart: always
    image: grokzen/redis-cluster:6.0.5
    ports:
      - "7000:7000"
      - "7001:7001"
      - "7002:7002"
      - "7003:7003"
      - "7004:7004"
      - "7005:7005"
    environment:
      - STANDALONE=true
      - IP=0.0.0.0

And then from the application side you would do:

import monix.connect.redis.client
import monix.connect.redis.client.{RedisConnection, RedisUri}

val redisUris: Seq[RedisUri] = (0 to 5).map(n => RedisUri(s"redis://localhost:${(7000 + n)}"))
val clusterConn = RedisConnection.cluster(redisUris)

Yet to come

  • Master Replica connection.
  • Pub/sub, Streams, Transactions, HyperLogLog, Geolocation, Scripting commands.
← MongoDB
  • Introduction
  • Dependency
  • Redis Connection
    • Standalone
    • Cluster
  • RedisCmd
  • Custom Codecs
    • UTFCodec
    • BytesCodec
  • Commands
    • Keys
    • Hashes
    • Lists
    • Server
    • Sets
    • SortedSets
    • Strings
  • Local testing
    • Standalone server
    • Cluster
  • Yet to come

Copyright © 2020-2022 The Monix Connect Developers.