Redis
Introduction
Redis is an open source, in-memory data structure store, used as a database, cache and message broker, providing high availability, scalability and outstanding performance. It supports data structures such as strings, hashes, lists, sets and sorted sets, and a set of commands that can run atomically on these, like appending to a string, incrementing the value in a hash or pushing an element to a list. Redis has a wide range of commands to inter-operate with, and most of them are also available from the Java API.
This connector is built on top of lettuce, a popular Java library that provides a non-blocking Redis client.
Dependency
Add the following dependency:
libraryDependencies += "io.monix" %% "monix-redis" % "0.6.0"
Redis Connection
The first step is to create a RedisConnection, a simple, scalable and pure interface that allows communicating with Redis Standalone or Cluster servers.
Remember that the created connection is an expensive resource, as it is built on the underlying lettuce client, which in turn uses netty and holds a set of io.netty.channel.EventLoopGroup instances that use multiple threads. So, reuse the connection as much as possible!
Standalone
In order to create a standalone connection, we will use the companion object's signature RedisConnection.standalone, which returns a connection instance.
To create the connection, we only need a single monix.connect.redis.client.RedisUri pointing to the redis standalone server:
import monix.connect.redis.client.{RedisConnection, RedisUri}
// RedisUri has an overloaded `apply` which also allows host and port to be passed separately
// like RedisUri("localhost", 6379)
val redisUri = RedisUri("redis://localhost:6379")
// then we create the connection
val redisConn: RedisConnection = RedisConnection.standalone(redisUri)
Cluster
Creating a cluster connection is very similar to creating a standalone one, since both are represented by the same parent class RedisConnection. The only difference is that its creation requires multiple RedisUris that represent the set of redis servers in the cluster.
import monix.connect.redis.client.{RedisConnection, RedisUri}
val redisNode1 = RedisUri("my.redis.node.1", 7000)
val redisNode2 = RedisUri("my.redis.node.2", 7001)
val redisNode3 = RedisUri("my.redis.node.3", 7002)
val redisClusterConn: RedisConnection = RedisConnection.cluster(List(redisNode1, redisNode2, redisNode3))
RedisCmd
Once we have a RedisConnection, we can start using the RedisCmd, a case class that contains all the redis commands for server, key, list, set, sorted set, hash and string.
The RedisCmd is accessed through a cats.effect.Resource with monix.eval.Task, which abstracts the logic of acquiring and releasing the connection with its associated resources.
In the following example we will create a connection that by default encodes keys and values as Strings, persisting them in UTF format in Redis.
import cats.effect.Resource
import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
val redisUri = RedisUri("redis://localhost:6379")
val redisConn: Resource[Task, RedisCmd[String, String]] = RedisConnection.standalone(redisUri).connectUtf
val k1: String = "key1"
val value: String = "a"
val k2: String = "key2"
val values: List[String] = List("b", "c", "d")
// from here on, we can start using the connection
// and since `RedisCmd` is a case class, we can apply
// pattern matching against it, which nicely allows us
// to decompose it into its different command apis.
// alternatively you can also do: redisConn.use { redisCmd => redisCmd.string.get("k1") }
redisConn.use { case RedisCmd(hash, keys, list, server, set, sortedSet, string) =>
  for {
    _ <- server.flushAll
    _ <- keys.touch(k1)
    _ <- string.set(k1, value)
    _ <- keys.rename(k1, k2)
    _ <- list.lPush(k1, values: _*)
    v <- string.get(k2)
    _ <- v match {
      case Some(value) => list.lPush(k1, value)
      case None => Task.unit
    }
    _ <- keys.del(k2)
    len <- list.lLen(k1)
  } yield len
}.runToFuture
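The acquire, use and release behaviour that the Resource provides can be illustrated with a plain loan pattern. This is only a minimal sketch using the standard library: FakeConnection and the use helper are hypothetical stand-ins, not part of the connector, and the real Resource additionally deals with asynchrony and cancellation.

```scala
object LoanExample {
  // Hypothetical stand-in for an expensive connection; not part of the connector.
  final class FakeConnection {
    var open: Boolean = true
    def get(key: String): Option[String] =
      if (open) Some(s"value-of-$key") else None
    def close(): Unit = open = false
  }

  // Lends the acquired connection to `f` and always releases it afterwards,
  // even when `f` throws.
  def use[A](acquire: => FakeConnection)(f: FakeConnection => A): A = {
    val connection = acquire
    try f(connection)
    finally connection.close()
  }

  val conn = new FakeConnection
  // The connection is only usable inside the function passed to `use`.
  val result: Option[String] = use(conn)(_.get("key1"))
}
```

Once use returns, the connection has been closed, which is why all commands should be issued inside the function passed to it.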
Custom Codecs
The previous sections showed how to create a connection to redis and start using the RedisCmd with its different redis modules. The created connection was exposed within a cats resource as RedisCmd[String, String], meaning that it expects Strings for both keys and values.
In order to decide how the redis connection encodes and decodes keys and values, we need to pass a custom Codec for both key and value.
A Codec is a sealed trait implemented by UtfCodec and BytesCodec. Instances of these can be created from its companion object with the respective signatures utf and byteArray, see the snippet below:
package monix.connect.redis.client
object Codec {
  def utf[T](encoder: T => String, decoder: String => T) = ???
  def byteArray[T](encoder: T => Array[Byte], decoder: Array[Byte] => T) = ???
}
You will find some predefined Codecs for Int, Float, Double, BigInt and BigDecimal under the package object monix.connect.redis._.
The next subsections show examples of creating custom codecs, first with UtfCodec and then with BytesCodec:
UtfCodec
In this case we will create two custom UtfCodec[T], one for keys as Int and the other for Double, which will represent the redis values, resulting in RedisCmd[Int, Double].
These two will be passed as parameters when connecting to redis with connectUtf.
import monix.connect.redis.client.{Codec, RedisCmd, RedisConnection, RedisUri, UtfCodec}
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global
import scala.util.Try
// there is already a predefined int utf codec under `monix.connect.redis._`
implicit val intUtfCodec: UtfCodec[Int] = Codec.utf(
  _.toString, // serializes int to str
  // deserializes str back to int, falling back to 0 if the stored value is not a valid int
  // (a real application would also log the failure here)
  str => Try(str.toInt).getOrElse(0)
)
// there is already a predefined double utf codec under `monix.connect.redis._`
implicit val doubleUtfCodec: UtfCodec[Double] = Codec.utf(
  _.toString, // serializes double to str
  // deserializes str back to double, falling back to 0.0 if the stored value is not a valid double
  str => Try(str.toDouble).getOrElse(0.0)
)
val redisUri = RedisUri("redis://localhost:6379")
val f: CancelableFuture[Option[Double]] =
  RedisConnection.standalone(redisUri)
    .connectUtf(intUtfCodec, doubleUtfCodec) // this can be passed implicitly but is explicit for didactic purposes
    .use { redisCmd: RedisCmd[Int, Double] =>
      // your business logic here
      redisCmd.list.lPush(11, 123.134) >> redisCmd.list.rPop(11) // Some(123.134)
    }.runToFuture
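Before wiring an encoder and decoder pair into Codec.utf, it can help to check in isolation that they round-trip. The following is a minimal sketch that uses only plain functions with the same shapes as the ones passed to Codec.utf. The names encodeInt and decodeInt are hypothetical, and the fallback to 0 is just one possible way of handling invalid input.

```scala
import scala.util.Try

object UtfRoundTrip {
  // Hypothetical encoder: same shape as the `T => String` argument of `Codec.utf`.
  val encodeInt: Int => String = _.toString

  // Hypothetical decoder: same shape as the `String => T` argument of `Codec.utf`.
  // Falls back to 0 when the stored string is not a valid integer.
  val decodeInt: String => Int = str => Try(str.toInt).getOrElse(0)

  // Decoding an encoded value should give back the original value.
  val roundTripped: Int = decodeInt(encodeInt(42))

  // Invalid input hits the fallback instead of throwing.
  val fallback: Int = decodeInt("not-a-number")
}
```

Note that a silent fallback makes a corrupted value indistinguishable from a legitimate 0, so whether to fall back or to fail is a design choice that depends on the use case.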
BytesCodec
On the other hand, there is also a BytesCodec[T], which serializes to and deserializes from Array[Byte].
In this case we will show an example of using the Protobuf serialization format to deal with redis keys and values.
In the snippet below we define our proto objects, in which PersonPk will represent the redis key and Person the value.
syntax = "proto3";
package monix.connect.redis.test;
message PersonPk {
  string id = 1;
}
message Person {
  string name = 1;
  int64 age = 2;
  repeated string hobbies = 3;
}
With the generated proto scala sources, we can proceed to create a BytesCodec[PersonPk] and a BytesCodec[Person]:
import monix.connect.redis.client.{BytesCodec, Codec}
implicit val personPkCodec: BytesCodec[PersonPk] =
  Codec.byteArray(pk => PersonPk.toByteArray(pk), bytes => PersonPk.parseFrom(bytes))
implicit val personCodec: BytesCodec[Person] =
  Codec.byteArray(person => Person.toByteArray(person), bytes => Person.parseFrom(bytes))
Finally, we are ready to start creating the connection using the previously defined protobuf codecs:
import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.connect.redis.test.protobuf.{Person, PersonPk}
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global
val redisUri = RedisUri("redis://localhost:6379")
val personPk = PersonPk("personId123")
val hobbies = List("Snowboarding", "Programming")
val person = Person("Alice", 25, hobbies)
val f: CancelableFuture[Option[Person]] =
  RedisConnection.standalone(redisUri)
    // `BytesCodec`s are passed to `connectByteArray`, whereas `connectUtf` expects `UtfCodec`s
    .connectByteArray(personPkCodec, personCodec)
    .use { redisCmd: RedisCmd[PersonPk, Person] =>
      for {
        _ <- redisCmd.string.set(personPk, person)
        person <- redisCmd.string.get(personPk)
      } yield person
    }.runToFuture
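The same round-trip check applies to byte array codecs. The sketch below does not use Protobuf: as an assumption for illustration it encodes a hypothetical UserKey case class as UTF-8 bytes, so that it runs with the standard library alone. The encode and decode functions have the same shapes as the ones passed to Codec.byteArray.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object BytesRoundTrip {
  // Hypothetical key type, standing in for the generated `PersonPk`.
  final case class UserKey(id: String)

  // Same shape as the `T => Array[Byte]` argument of `Codec.byteArray`.
  val encode: UserKey => Array[Byte] = key => key.id.getBytes(UTF_8)

  // Same shape as the `Array[Byte] => T` argument of `Codec.byteArray`.
  val decode: Array[Byte] => UserKey = bytes => UserKey(new String(bytes, UTF_8))

  val original: UserKey = UserKey("personId123")
  val restored: UserKey = decode(encode(original))
}
```

The check compares the decoded values rather than the byte arrays, because arrays on the JVM are compared by reference and two equal encodings would still not be `==`.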
Commands
This redis connector provides a wide range of commands to perform different operations on the most commonly used modules and types: Keys, Hashes, Lists, Server, Sets, SortedSets and Strings. See an example of how to use each of them in the following sub-sections:
Keys
The below snippet shows a simple example of using key commands.
import monix.connect.redis.client.{RedisConnection, RedisUri}
import scala.concurrent.duration._
val k: String // assuming that the key already exists
val redisUri = RedisUri("redis://localhost:6379")
RedisConnection.standalone(redisUri)
  .connectUtf
  .use(cmd =>
    for {
      randomKey <- cmd.key.randomKey() // returns a random key from the db
      _ <- cmd.key.expire(k, 100.seconds) // specifies an expiration timeout for `k`
      ttl <- cmd.key.ttl(k) // returns the time to live as `FiniteDuration`
    } yield (randomKey, ttl)
  )
Hashes
The following example uses the redis hash api RedisHash to insert a single element into a hash and read it back from the hash.
import monix.connect.redis.client.{RedisConnection, RedisUri, RedisCmd}
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val key: String
val field: String
val value: String
val redisUri = RedisUri("redis://localhost:6379")
val prefix = "dummy-prefix-"
RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      _ <- cmd.hash.hSet(key, field, value)
      // adds a prefix to all values in a hash
      _ <- cmd.hash.hGetAll(key).mapEval { case (f, v) => cmd.hash.hSet(key, f, prefix + v) }.completedL
      prefixedValue <- cmd.hash.hGet(key, field)
    } yield prefixedValue
  }.runToFuture
Lists
The following example uses the redis list api RedisList to insert elements into a redis list, copy them into a second list with a prefix, and check that both lists have the same size.
import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val key1: String
val key2: String
val values: List[String]
val redisUri = RedisUri("redis://localhost:6379")
val prefix = "dummy-prefix-"
RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      initialSize <- cmd.list.lPush(key1, values)
      // copies all values from `key1` to `key2` adding a static prefix to each element
      _ <- cmd.list.lGetAll(key1)
        .mapEval(v => cmd.list.lPush(key2, prefix + v)).completedL
      // checks if key1 and key2 have the same size
      haveSameSize <- cmd.list.lLen(key2).map(_ == initialSize)
    } yield haveSameSize
  }.runToFuture
Server
The following code shows how to remove all keys from all dbs in redis using the server api RedisServer, a very basic but also common use case:
import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
val redisUri = RedisUri("redis://localhost:6379")
val f = RedisConnection.standalone(redisUri)
  .connectUtf.use(_.server.flushAll).runToFuture
Sets
The Redis Set commands api provides operations to work with sets, see a practical example in the code snippet below.
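import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
val k1: String
val k2: String
val k3: String // destination key for the union
val redisUri = RedisUri("redis://localhost:6379")
val f = RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      _ <- cmd.set.sAdd(k1, "a", "b", "c") *>
        cmd.set.sAdd(k2, "c", "d")
      // stores the union of `k1` and `k2` in `k3`, assuming `sUnionStore(destination, keys*)`
      // as in the Redis `SUNIONSTORE` command
      finalSize <- cmd.set.sUnionStore(k3, k1, k2)
    } yield finalSize // 4 = ["a", "b", "c", "d"]
  }.runToFuture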
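The semantics of the Redis SUNIONSTORE command can be modelled with plain immutable collections. This is a hedged, in-memory sketch and not the connector's implementation: the Db type and this sUnionStore function are hypothetical, and they only mirror the documented Redis behaviour, where the destination is overwritten with the union of the given source sets.

```scala
object SetUnionModel {
  // Hypothetical in-memory stand-in for a redis db holding sets.
  type Db = Map[String, Set[String]]

  // Stores the union of the sets at `keys` under `destination`,
  // replacing whatever was there, and returns the size of the stored set.
  def sUnionStore(db: Db, destination: String, keys: String*): (Db, Long) = {
    val union: Set[String] = keys.flatMap(k => db.getOrElse(k, Set.empty[String])).toSet
    (db.updated(destination, union), union.size.toLong)
  }

  val db: Db = Map("k1" -> Set("a", "b", "c"), "k2" -> Set("c", "d"))
  val (updated, finalSize) = sUnionStore(db, "k3", "k1", "k2")
}
```

The shared element "c" is counted once, so the stored set has four members, and the source sets are left untouched.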
SortedSets
The Redis SortedSet commands api provides operations to work with sorted sets, see a practical example in the code snippet below, where three scored elements (a.k.a. VScore) are inserted into a sorted set and the score of the lowest one is then incremented.
import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.connect.redis.domain.{VScore, ZRange}
import monix.execution.Scheduler.Implicits.global
val k: String
val redisUri: RedisUri
val f = RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      _ <- cmd.sortedSet.zAdd(k, VScore("Bob", 1)) >>
        cmd.sortedSet.zAdd(k, VScore("Alice", 2)) >>
        cmd.sortedSet.zAdd(k, VScore("Jamie", 5))
      // increments the score of "Bob" by 6, so it goes from the lowest to the highest score of the set
      _ <- cmd.sortedSet.zIncrBy(k, 6, "Bob")
      // returns those members with score higher than 4: "Jamie" and "Bob"
      zRange <- cmd.sortedSet.zRangeByScore(k, ZRange.gt(4)).toListL
      min <- cmd.sortedSet.zPopMin(k) // Alice
      max <- cmd.sortedSet.zPopMax(k) // Bob
    } yield (min, max, zRange)
  }.runToFuture
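To make the expected results of the example easier to follow, the same steps can be traced with an in-memory model. This is only a sketch built on an immutable Map from member to score: the helper names zIncrBy and zRangeGt are hypothetical and merely imitate the corresponding Redis commands, they are not the connector's API.

```scala
object SortedSetModel {
  // member -> score, mirroring the three elements inserted in the example
  val initial: Map[String, Double] = Map("Bob" -> 1.0, "Alice" -> 2.0, "Jamie" -> 5.0)

  // Imitates ZINCRBY: adds `increment` to the score of `member`.
  def zIncrBy(set: Map[String, Double], increment: Double, member: String): Map[String, Double] =
    set.updated(member, set.getOrElse(member, 0.0) + increment)

  // Members with a score strictly greater than `min`, ordered by ascending score.
  def zRangeGt(set: Map[String, Double], min: Double): List[String] =
    set.toList.filter(_._2 > min).sortBy(_._2).map(_._1)

  val updated: Map[String, Double] = zIncrBy(initial, 6, "Bob") // Bob: 1 + 6 = 7
  val above4: List[String] = zRangeGt(updated, 4)
  val min: String = updated.minBy(_._2)._1
  val max: String = updated.maxBy(_._2)._1
}
```

After the increment Bob holds the highest score, so the lowest member is Alice, the highest is Bob, and the members scoring above 4 are Jamie and Bob in ascending order.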
Strings
The Redis Strings commands api provides operations to work with strings, see a practical example in the code snippet below, where we insert a string into the given key and get its size.
import monix.connect.redis.client.{RedisCmd, RedisConnection, RedisUri}
import monix.execution.Scheduler.Implicits.global
val k: String
val v: String
val redisUri: RedisUri
val f = RedisConnection.standalone(redisUri)
  .connectUtf
  .use { cmd =>
    for {
      _ <- cmd.string.set(k, v)
      size <- cmd.string.strLen(k)
    } yield size
  }.runToFuture
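Keep in mind that the Redis STRLEN command reports the length of the stored value in bytes, which can differ from the number of characters. The sketch below assumes that values are stored UTF-8 encoded, as the UTF connection described earlier suggests. The helper utf8Length is hypothetical and only uses the standard library.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object StrLenModel {
  // Number of bytes of the UTF-8 encoding of `value`,
  // which is what a byte-based length would report.
  def utf8Length(value: String): Int = value.getBytes(UTF_8).length

  // Five ASCII characters take one byte each.
  val ascii: Int = utf8Length("hello")

  // The accented character (written as an escape) takes two bytes in UTF-8.
  val accented: Int = utf8Length("h\u00e9llo")
}
```

Both strings have five characters, but the second one occupies six bytes.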
Local testing
The local tests will use the redis docker image from docker hub.
Standalone server
Add the following service description to your docker-compose.yml file:
redis:
  image: redis
  ports:
    - 6379:6379
Run the following command to build and start the redis server:
docker-compose -f ./docker-compose.yml up -d redis
Check that the service has started correctly.
Finally, the following code shows how to create the redis connection to the local server. You will have to modify it to fit your use case, since it will be different when connecting to a redis cluster or when the server requires authentication with key and secret, etc.
import monix.connect.redis.client
import monix.connect.redis.client.{RedisConnection, RedisUri}
val redisUri = RedisUri("redis://host:port")
val standaloneConn = RedisConnection.standalone(redisUri)
Now you are ready to run your application!
Cluster
On the other hand, if you want to test how your application behaves when running against a redis cluster, you can use grokzen/redis-cluster:
redisCluster:
  restart: always
  image: grokzen/redis-cluster:6.0.5
  ports:
    - "7000:7000"
    - "7001:7001"
    - "7002:7002"
    - "7003:7003"
    - "7004:7004"
    - "7005:7005"
  environment:
    - STANDALONE=true
    - IP=0.0.0.0
And then from the application side you would do:
import monix.connect.redis.client
import monix.connect.redis.client.{RedisConnection, RedisUri}
val redisUris: Seq[RedisUri] = (0 to 5).map(n => RedisUri(s"redis://localhost:${(7000 + n)}"))
val clusterConn = RedisConnection.cluster(redisUris)
Yet to come
- Master Replica connection.
- Pub/sub, Streams, Transactions, HyperLogLog, Geolocation, Scripting commands.