AWS DynamoDB
Introduction
Amazon DynamoDB is a key-value and document database that delivers single-digit millisecond performance at any scale. It is a key component of many platforms, including those of some of the world's fastest-growing enterprises, which depend on it to support their mission-critical workloads.
The DynamoDB API provides a large list of operations: create, describe, delete, get, put, batch, scan, list and more. All of them share the same generic superclasses for their input and output: every request class extends DynamoDbRequest, and every response class extends DynamoDbResponse.
Because all request types extend the same superclass, it is possible to build an abstraction layer on top of it that executes any operation in the same way.
Credits: this design pattern is similar to the one used internally by alpakka-dynamodb, from which this connector took inspiration.
The connector therefore provides three generic methods, single, transformer and sink, which implement this pattern and allow any DynamoDB operation to be executed in different circumstances.
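To make the pattern concrete, the following is a minimal, library-free sketch of the same idea. It uses a toy in-memory table instead of DynamoDB. All names in it (Request, Response, Op, Implicits, Client) are hypothetical stand-ins, not the connector's or the AWS SDK's API. Unlike the real connector, it runs synchronously and uses Iterator in place of Monix Observable.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the AWS SDK hierarchy: every request extends one
// superclass (like DynamoDbRequest) and every response another (like DynamoDbResponse).
sealed trait Request
sealed trait Response
final case class PutItem(key: String, value: String) extends Request
final case class PutItemResult() extends Response
final case class GetItem(key: String) extends Request
final case class GetItemResult(value: Option[String]) extends Response

// A type class describing how to execute one request type and which response type it yields.
trait Op[In <: Request, Out <: Response] {
  def execute(in: In, table: mutable.Map[String, String]): Out
}

// One instance per operation, grouped so they can be imported selectively or all at once.
object Implicits {
  implicit val putItemOp: Op[PutItem, PutItemResult] = new Op[PutItem, PutItemResult] {
    def execute(in: PutItem, table: mutable.Map[String, String]): PutItemResult = {
      table.update(in.key, in.value)
      PutItemResult()
    }
  }
  implicit val getItemOp: Op[GetItem, GetItemResult] = new Op[GetItem, GetItemResult] {
    def execute(in: GetItem, table: mutable.Map[String, String]): GetItemResult =
      GetItemResult(table.get(in.key))
  }
}

// Generic entry points: written once, they work for any request whose Op is in scope.
final class Client(table: mutable.Map[String, String]) {
  def single[In <: Request, Out <: Response](in: In)(implicit op: Op[In, Out]): Out =
    op.execute(in, table)

  def transformer[In <: Request, Out <: Response](implicit op: Op[In, Out]): Iterator[In] => Iterator[Out] =
    requests => requests.map(op.execute(_, table))
}
```

The generic methods never mention a concrete request type. Supporting a new operation only requires a new Op instance in scope, and the compiler infers the response type from that instance.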
Dependency
Add the following dependency to get started:
libraryDependencies += "io.monix" %% "monix-dynamodb" % "0.6.0"
Async Client
This connector uses the underlying DynamoDbAsyncClient, which allows us to authenticate and create a non-blocking channel between our application and AWS DynamoDB.
There are different ways to create the connection, all available from the singleton object monix.connect.dynamodb.DynamoDb. They are explained in more detail in the following subsections:
From config
The laziest and recommended way to create the DynamoDB connection is from a configuration file.
To do so, you just need to create an application.conf following the template from reference.conf, which also represents the default config in case no other is provided.
The snippet below shows an example configuration file that authenticates via StaticCredentialsProvider in region EU-WEST-1. As you can see, the http client settings are commented out since they are optional. However, they could also be specified for a more fine-grained configuration of the underlying NettyNioAsyncHttpClient.
monix-aws: {
  credentials {
    // [anonymous, default, environment, instance, system, profile, static]
    provider: "static"
    // optional - only required with static credentials
    static {
      access-key-id: "TESTKEY"
      secret-access-key: "TESTSECRET"
      session-token: ""
    }
  }
  // [ap-south-1, us-gov-east-1, af-south-1, eu-west-2, ...]
  region: "eu-west-1"
  // optional
  #endpoint: ""
  // optional
  # http-client: {
  #   max-concurrency: 10
  #   max-pending-connection-acquires: 1000
  #   connection-acquisition-timeout: 2 minutes
  #   connection-time-to-live: 1 minute
  #   use-idle-connection-reaper: false
  #   read-timeout: 100 seconds
  #   write-timeout: 100 seconds
  # }
}
This config file should be placed in the resources folder so that it is automatically picked up by the method call DynamoDb.fromConfig, which returns a cats.effect.Resource[Task, DynamoDb].
The resource is responsible for the creation and release of the DynamoDB client.
Try to reuse the created DynamoDb client as much as possible in your application, because creating it multiple times wastes precious resources. See the code snippet below to understand the concept:
import monix.connect.dynamodb.DynamoDb
import monix.connect.dynamodb.DynamoDbOp.Implicits._
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import pureconfig.KebabCase
import software.amazon.awssdk.services.dynamodb.model._
def runDynamoDbApp(dynamoDb: DynamoDb): Task[Unit] = {
  val tableName = "my-table"
  val createTableRequest: CreateTableRequest = ??? // e.g. built with CreateTableRequest.builder using `tableName`
  val putItemRequest: PutItemRequest = ???
  for {
    _ <- dynamoDb.single(createTableRequest)
    _ <- dynamoDb.single(putItemRequest)
  } yield ()
}
// `fromConfig` accepts a [[pureconfig.NamingConvention]] as argument,
// by default it uses [[pureconfig.KebabCase]].
val f = DynamoDb.fromConfig(KebabCase).use(runDynamoDbApp).runToFuture
// the connection gets created and released within the `use` method, and the `DynamoDb`
// instance is passed directly to our application for easier interoperability
There is an alternative way to use fromConfig, which is to load the config first and then pass it to the method. The difference is that this way we can override specific settings from code, whereas before the config was read and the client was created straight after.
import monix.connect.aws.auth.MonixAwsConf
import monix.connect.dynamodb.DynamoDb
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import pureconfig.KebabCase
import software.amazon.awssdk.regions.Region
// `runDynamoDbApp` is the function defined in the previous snippet
val f = MonixAwsConf.load(KebabCase).memoizeOnSuccess.flatMap { awsConf =>
  // override the region that was read from the config file
  val updatedAwsConf = awsConf.copy(region = Region.EU_CENTRAL_1)
  DynamoDb.fromConfig(updatedAwsConf).use(runDynamoDbApp)
}.runToFuture
Create
Alternatively, one can pass the AWS configuration by parameters. This is also safe, since the method handles the acquisition and release of the connection with Resource. The example below produces the same result as the previous config file example:
import cats.effect.Resource
import monix.connect.dynamodb.DynamoDb
import monix.eval.Task
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
val accessKey: String = "TESTKEY"
val secretKey: String = "TESTSECRET"
val basicAWSCredentials = AwsBasicCredentials.create(accessKey, secretKey)
val staticCredProvider = StaticCredentialsProvider.create(basicAWSCredentials)
val dynamoDbResource: Resource[Task, DynamoDb] = DynamoDb.create(staticCredProvider, Region.EU_WEST_1)
Create Unsafe
Another alternative is to pass an already created instance of software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient. In that case the return type is DynamoDb directly, so there is no need to deal with Resource.
As the title suggests, this is not a pure way of creating a DynamoDb instance, since it is possible to pass a malformed instance by parameter, or one that was already released (closed).
An example:
import monix.connect.dynamodb.DynamoDb
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.regions.Region
val asyncClient: DynamoDbAsyncClient =
DynamoDbAsyncClient
.builder
.credentialsProvider(DefaultCredentialsProvider.create())
.region(Region.EU_CENTRAL_1)
.build
val dynamoDb: DynamoDb = DynamoDb.createUnsafe(asyncClient)
Notice that createUnsafe is an overloaded method, with another variant that accepts the settings values directly:
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import monix.connect.dynamodb.DynamoDb
val dynamoDb: DynamoDb = DynamoDb.createUnsafe(DefaultCredentialsProvider.create(), Region.AF_SOUTH_1)
Implicit operation instances
As mentioned in previous sections, the connector uses generic implementations for all types of requests.
Because of that, the generic methods require the implicit DynamoDB operation instance to be in scope at the call site.
The full list of implicit operations can be found under monix.connect.dynamodb.DynamoDbOp.Implicits._. From there you can import only the instances of the operations you want to use, or import them all to be able to execute any DynamoDbRequest.
The next sections show examples not only of executing a single request, but also of transforming and consuming streams of DynamoDB requests.
Single Operation
There are cases where we only want to execute a single request. The snippet below shows an example of how to create a table:
import monix.eval.Task
import monix.connect.dynamodb.DynamoDb
import software.amazon.awssdk.services.dynamodb.model._
// our dummy table will contain citizens' data
case class Citizen(citizenId: String, city: String, age: Int)
val tableName = "citizens"
// it is partitioned by `citizenId` and sorted by `city`
val keySchema: List[KeySchemaElement] =
  List(
    KeySchemaElement.builder.attributeName("citizenId").keyType(KeyType.HASH).build,
    KeySchemaElement.builder.attributeName("city").keyType(KeyType.RANGE).build
  )
val tableDefinition: List[AttributeDefinition] =
  List(
    AttributeDefinition.builder.attributeName("citizenId").attributeType(ScalarAttributeType.S).build(),
    AttributeDefinition.builder.attributeName("city").attributeType(ScalarAttributeType.S).build()
  )
val createTableRequest: CreateTableRequest =
  CreateTableRequest
    .builder
    .tableName(tableName)
    .keySchema(keySchema: _*)
    .attributeDefinitions(tableDefinition: _*)
    .billingMode(BillingMode.PAY_PER_REQUEST)
    .build()
// here is where we actually define the execution of creating a table
import monix.connect.dynamodb.DynamoDbOp.Implicits.createTableOp
val t: Task[CreateTableResponse] = DynamoDb.fromConfig.use(_.single(createTableRequest))
Notice that in this case single was used to create the table, but it actually accepts any subtype of DynamoDbRequest, as long as its respective operation from monix.connect.dynamodb.DynamoDbOp.Implicits is in scope.
The signature also accepts a retry strategy, which is the number of retries after a failure and the delay between them. The following example shows how to use it to get an item from the table:
import monix.eval.Task
import monix.connect.dynamodb.DynamoDb
import monix.connect.dynamodb.DynamoDbOp.Implicits.getItemOp // required to execute a get operation
import monix.connect.dynamodb.domain.RetryStrategy
import software.amazon.awssdk.services.dynamodb.model.{GetItemRequest, GetItemResponse}
import scala.concurrent.duration._
val getItemRequest: GetItemRequest = ???
val retryStrategy = RetryStrategy(retries = 5, backoffDelay = 600.milliseconds)
val t: Task[GetItemResponse] =
DynamoDb.fromConfig.use(_.single(getItemRequest, retryStrategy))
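To illustrate what the retry settings mean, the sketch below implements a blocking retry loop with a fixed delay. It is an assumption-based illustration, not the connector's implementation. Retry and withRetries are hypothetical names. The sketch also assumes that retries = 5 means up to five additional attempts after the first failure, which makes six attempts in total. The connector itself works with non-blocking Monix Tasks, so its scheduling details may differ.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical mirror of the retry settings shown above (not the connector's RetryStrategy).
final case class Retry(retries: Int, backoffDelay: FiniteDuration)

// Runs `action`; after each failure it waits `backoffDelay` and tries again,
// up to `retries` extra times, then rethrows the last error.
def withRetries[A](strategy: Retry)(action: () => A): A = {
  @tailrec
  def loop(remaining: Int): A =
    Try(action()) match {
      case Success(a) => a
      case Failure(_) if remaining > 0 =>
        Thread.sleep(strategy.backoffDelay.toMillis) // blocking wait, for illustration only
        loop(remaining - 1)
      case Failure(e) => throw e
    }
  loop(strategy.retries)
}
```

For example, an action that is throttled twice and then succeeds returns its result after three attempts. With retries = 0, the first failure is propagated immediately.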
Consumer
The connector provides a pre-built Monix Consumer[DynamoDbRequest, DynamoDbResponse] that offers a safe implementation for executing any subtype of DynamoDbRequest and materializes to its respective response.
It also allows you to specify the retry strategy, which is the number of times to retry failed operations and the delay between them. By default, failed operations are not retried.
import monix.connect.dynamodb.DynamoDb
import monix.connect.dynamodb.DynamoDbOp.Implicits.putItemOp
import monix.reactive.Observable
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest}
import scala.jdk.CollectionConverters._ // provides .asJava (Scala 2.13)
val strAttr: String => AttributeValue = value => AttributeValue.builder.s(value).build
val numAttr: Int => AttributeValue = value => AttributeValue.builder.n(value.toString).build
// `Citizen` is the case class defined in the Single Operation example
def putItemRequest(tableName: String, citizen: Citizen): PutItemRequest =
  PutItemRequest
    .builder
    .tableName(tableName)
    .item(Map("citizenId" -> strAttr(citizen.citizenId), "city" -> strAttr(citizen.city), "age" -> numAttr(citizen.age)).asJava)
    .build
val citizen1 = Citizen("citizen1", "Rome", 52)
val citizen2 = Citizen("citizen2", "Rome", 43)
val putItemRequests: List[PutItemRequest] = List(citizen1, citizen2).map(putItemRequest("citizens", _))
val t = DynamoDb.fromConfig.use { dynamoDb =>
  Observable
    .fromIterable(putItemRequests)
    .consumeWith(dynamoDb.sink())
}
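The helpers strAttr and numAttr above reflect how DynamoDB types item attributes. Strings use the S type, while numbers use the N type and are sent as their decimal string representation, hence value.toString. The following simplified model sketches that encoding without the AWS SDK. Attr, S, N and toItem are hypothetical names, and the sketch only covers the two attribute types used in this example.

```scala
// Hypothetical, simplified model of DynamoDB attribute values:
// S holds a string, N holds a number in its string form.
sealed trait Attr
final case class S(value: String) extends Attr
final case class N(value: String) extends Attr

final case class Citizen(citizenId: String, city: String, age: Int)

// Mirrors the item map built in `putItemRequest`: key attributes as strings, age as a number.
def toItem(c: Citizen): Map[String, Attr] =
  Map(
    "citizenId" -> S(c.citizenId),
    "city"      -> S(c.city),
    "age"       -> N(c.age.toString)
  )
```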
Transformer
Finally, the connector also provides a transformer for Observable that describes the execution of any type of DynamoDB request and returns its respective response: Observable[DynamoDbRequest] => Observable[DynamoDbResponse].
The code below shows an example of a stream that transforms incoming get requests into their respective responses:
import monix.connect.dynamodb.DynamoDb
import monix.connect.dynamodb.DynamoDbOp.Implicits.getItemOp
import monix.reactive.Observable
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, GetItemRequest, GetItemResponse}
import scala.jdk.CollectionConverters._ // provides .asJava (Scala 2.13)
val strAttr: String => AttributeValue = value => AttributeValue.builder.s(value).build
def getItemRequest(tableName: String, citizenId: String, city: String): GetItemRequest =
  GetItemRequest.builder
    .tableName(tableName)
    .key(Map("citizenId" -> strAttr(citizenId), "city" -> strAttr(city)).asJava)
    .attributesToGet("age")
    .build
val getItemRequests: List[GetItemRequest] =
  List("citizen1", "citizen2").map(citizenId => getItemRequest("citizens", citizenId, "Rome"))
// `dynamoDb` is an existing instance, e.g. the one provided by `DynamoDb.fromConfig.use`
def citizenAges(dynamoDb: DynamoDb): Observable[GetItemResponse] =
  Observable
    .fromIterable(getItemRequests)
    .transform(dynamoDb.transformer())
The transformer signature also accepts a retry strategy, which is the number of retries after a failure and the delay between them.
Local testing
Localstack provides a fully functional local AWS cloud stack that you can use to develop and test your application's integration with DynamoDB locally.
Add the following service description to your docker-compose.yml file:
dynamodb:
  image: localstack/localstack:latest
  ports:
    - '4569:4569'
  environment:
    - SERVICES=dynamodb
Run the following command to start the DynamoDB service:
docker-compose -f docker-compose.yml up -d dynamodb
Check that the service has started correctly.
Finally, create the DynamoDB connection:
import monix.connect.dynamodb.DynamoDb
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
val staticCredentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
val dynamoDb = DynamoDb.create(staticCredentialsProvider, Region.AWS_GLOBAL, "http://localhost:4569")
Alternatively, you can create the client from a configuration file. In this case, place it under the test resources folder as src/test/resources/application.conf:
{
  monix-aws: {
    credentials {
      provider: "static"
      static {
        access-key-id: "x"
        secret-access-key: "x"
      }
    }
    endpoint: "http://localhost:4569"
    region: "us-west-2"
  }
}
As seen in previous sections, to create the connection from config:
import monix.connect.dynamodb.DynamoDb
val dynamoDb = DynamoDb.fromConfig
You are now ready to run your application!