Monix Connect

AWS SQS

Introduction

The Simple Queue Service (SQS) is a managed message queue service offered by AWS. It provides an HTTP API over which applications can produce messages to, and consume messages from, different queues. Because the queues themselves are fully managed by AWS, SQS is an easy way to pass messages between the parts of a software system that runs in the cloud. It is commonly used as a message broker between systems, providing backpressure, availability, fault tolerance and more.

Dependency

Add the following dependency to get started:

libraryDependencies += "io.monix" %% "monix-sqs" % "0.6.0"

Async Client

This connector uses the underlying SqsAsyncClient from the AWS SDK for Java, which allows the application to authenticate and interact with AWS SQS through a non-blocking HTTP connection.

This library offers different ways to create the connection, all available from the singleton object monix.connect.sqs.Sqs and explained in more detail in the following sub-sections.

From config

The laziest and recommended way to create the Sqs connection is from a configuration file. It returns a cats-effect Resource with Task, which takes care of acquiring the AWS client and releasing it after use. To do so, you just need to create an application.conf following the template from reference.conf, which also provides the default configuration used when no additional one is provided.

The snippet below shows an example configuration file that authenticates via StaticCredentialsProvider and uses the region eu-west-1. The http client settings are commented out since they are optional; they could also be specified for a more fine-grained configuration of the underlying NettyNioAsyncHttpClient.

  monix-aws: {
    credentials {
      // [anonymous, default, environment, instance, system, profile, static]
      provider: "static" 

      // optional - only required with static credentials
      static {
        access-key-id: "TESTKEY"      
        secret-access-key: "TESTSECRET"
        session-token: ""
      }
    }
  
    // [ap-south-1, us-gov-east-1, af-south-1, eu-west-2, ...]
    region: "eu-west-1"

    // optional
    #endpoint: ""

    // optional
    # http-client: {
    #   max-concurrency: 10
    #   max-pending-connection-acquires: 1000
    #   connection-acquisition-timeout: 2 minutes
    #   connection-time-to-live: 1 minute
    #   use-idle-connection-reaper: false
    #   read-timeout: 100 seconds
    #   write-timeout: 100 seconds
    # }
  }

This config file should be placed in the resources folder, where it will be picked up automatically by the method call Sqs.fromConfig, which returns a cats.effect.Resource[Task, Sqs]. The resource is responsible for the creation and release of the Sqs client.

Try to reuse the created Sqs client as much as possible in your application; creating it multiple times wastes resources. The code snippet below illustrates the concept:

 import monix.connect.sqs.Sqs
 import monix.connect.sqs.consumer.DeletableMessage
 import monix.connect.sqs.domain.QueueName
 import monix.connect.sqs.producer.StandardMessage
 import monix.eval.Task
 import monix.execution.Scheduler.Implicits.global
 import pureconfig.KebabCase

 import scala.concurrent.duration._
 
 def runSqsApp(sqs: Sqs): Task[List[DeletableMessage]] = {
   val queueName = QueueName("my-queue")
   for {
     queueUrl <- sqs.operator.createQueue(queueName)
     standardMessage = StandardMessage("sampleBody")
     _ <- sqs.producer.sendSingleMessage(standardMessage, queueUrl)
     // a single receive request that returns the messages read from the queue
     receivedMessages <- sqs.consumer.receiveSingleManualDelete(queueUrl, waitTimeSeconds = 3.seconds)
   } yield receivedMessages
 }

  // The [[pureconfig.NamingConvention]] can be specified as an argument,
  // by default it uses the [[KebabCase]].  
  val f = Sqs.fromConfig(KebabCase).use(runSqsApp).runToFuture
  // The connection gets created and released within the use method and the `Sqs`
  // instance is directly passed and should be reused across our application
There is an alternative way to use fromConfig, which is to load the config first and then pass it to the method. The difference is that in this case we are able to override a specific configuration value from code, whereas before we were reading the config and creating the client straight after.

 import monix.connect.aws.auth.MonixAwsConf
 import monix.connect.sqs.Sqs
 import monix.eval.Task
 import monix.execution.Scheduler.Implicits.global
 import pureconfig.KebabCase
 import software.amazon.awssdk.regions.Region

 // loads the config once, overrides the region and reuses `runSqsApp` from the previous snippet
 val f = MonixAwsConf.load(KebabCase).memoizeOnSuccess.flatMap { awsConf =>
   val updatedAwsConf = awsConf.copy(region = Region.EU_CENTRAL_1)
   Sqs.fromConfig(updatedAwsConf).use(runSqsApp)
 }.runToFuture

Create

Alternatively, the AWS configuration can be passed as parameters. This is also safe, since the method handles the acquisition and release of the connection with Resource. The example below produces the same result as the previous one that used the config file:

import cats.effect.Resource
import monix.connect.sqs.Sqs
import monix.eval.Task
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region

val accessKey: String = "TESTKEY"
val secretKey: String = "TESTSECRET"
val basicAWSCredentials = AwsBasicCredentials.create(accessKey, secretKey)
val staticCredProvider = StaticCredentialsProvider.create(basicAWSCredentials)

val sqs: Resource[Task, Sqs] = Sqs.create(staticCredProvider, Region.EU_WEST_1)

Create Unsafe

The last and least recommended alternative is to pass an already created instance of software.amazon.awssdk.services.sqs.SqsAsyncClient, in which case the method returns the Sqs directly, without having to deal with Resource. As the title suggests, this is not a safe way of creating an Sqs, since it makes the developer responsible for closing the client correctly. That makes it less idiomatic and prone to errors such as closing the connection too early or never releasing it, thus wasting resources.

An example:

import monix.connect.sqs.Sqs
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.regions.Region

val sqsAsyncClient: SqsAsyncClient = SqsAsyncClient
 .builder()
 .credentialsProvider(DefaultCredentialsProvider.create())
 .region(Region.EU_CENTRAL_1)
 .build()

val sqs: Sqs = Sqs.createUnsafe(sqsAsyncClient)

Notice that createUnsafe is an overloaded method that also accepts the settings values separately:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import monix.connect.sqs.Sqs

val sqs: Sqs = Sqs.createUnsafe(DefaultCredentialsProvider.create(), Region.AF_SOUTH_1)

Core Components

Once the connection is created, we are ready to start sending messages to and receiving messages from sqs queues. Notice that the Sqs instance we created previously is a case class composed of three basic components: Operator, Producer and Consumer.

Operator

This component aggregates the utility operations in the SQS domain, such as creating, deleting and listing queues, getting the url of a queue from its name, creating tags and permissions, and so on.

Create queue

Standard

Creating a standard queue only requires the queue name and returns its generated url, which later operations use to refer to this queue instead of its name. You can optionally pass queue attributes such as delaySeconds, visibilityTimeout, etc.; otherwise their default values are used.

import monix.connect.sqs.{Sqs, SqsOperator}
import monix.connect.sqs.domain.{QueueName, QueueUrl}
import monix.eval.Task

// the resulting task yields the url of the newly created queue
val queueUrl: Task[QueueUrl] = Sqs.fromConfig.use { case Sqs(operator: SqsOperator, _, _) =>
  val queueName = QueueName("my-standard-queue")
  operator.createQueue(queueName)
}

Fifo

On the other hand, if the application or business logic requires a FIFO queue, the FIFO_QUEUE queue attribute must be set to true and the queue name must end with .fifo.

import monix.connect.sqs.{Sqs, SqsOperator}
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import monix.connect.sqs.domain.QueueName

Sqs.fromConfig.use { case Sqs(operator: SqsOperator, _, _) =>
  val fifoQueueName = QueueName("my-fifo-queue.fifo") //important that it ends with .fifo
  val queueAttributes = Map(QueueAttributeName.FIFO_QUEUE -> "true")
  operator.createQueue(fifoQueueName, attributes = queueAttributes)
}
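
The naming rule can be checked before the queue is created. The sketch below is a dependency-free illustration: QueueNames and its methods are hypothetical helpers that are not part of monix-connect, and the 80 character limit and allowed character set are the constraints AWS documents for SQS queue names.

```scala
object QueueNames {
  // Alphanumeric characters, hyphens and underscores are allowed.
  private val Allowed = "[A-Za-z0-9_-]+".r
  val FifoSuffix = ".fifo"
  val MaxLength = 80

  // A standard queue name contains only allowed characters.
  def isValidStandardName(name: String): Boolean =
    name.length <= MaxLength && Allowed.pattern.matcher(name).matches()

  // A FIFO queue name must end with ".fifo"; the suffix counts towards the limit.
  def isValidFifoName(name: String): Boolean =
    name.length <= MaxLength &&
      name.endsWith(FifoSuffix) &&
      Allowed.pattern.matcher(name.stripSuffix(FifoSuffix)).matches()
}
```

For instance, QueueNames.isValidFifoName("my-fifo-queue.fifo") holds, whereas the same name without the suffix is rejected.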

Get queue url

The queue url is required by all operations, including producing and consuming messages. This action returns the QueueUrl for a given QueueName, or a failed task if the queue name does not exist.

import monix.connect.sqs.{Sqs, SqsOperator}
import monix.connect.sqs.domain.QueueName

Sqs.fromConfig.use { case Sqs(operator: SqsOperator, _, _) =>
  val queueName = QueueName("my-queue")
  // the queue must already exist, otherwise the task fails
  operator.getQueueUrl(queueName)
}.runToFuture

Delete queue

Permanently deletes the queue identified by the given QueueUrl, which is resolved here from its QueueName.

import monix.connect.sqs.Sqs
import monix.connect.sqs.domain.QueueName

Sqs.fromConfig.use { sqs =>
  val queueName = QueueName("my-queue")
  for {
    queueUrl <- sqs.operator.getQueueUrl(queueName)
    _ <- sqs.operator.deleteQueue(queueUrl)
  } yield ()
}.runToFuture

Purge queue

Use this operation if you don't want to delete a queue but only remove all of its messages. The process might take up to 60 seconds regardless of the queue's size.

import monix.connect.sqs.Sqs
import monix.connect.sqs.domain.QueueName

Sqs.fromConfig.use { sqs =>
  val queueName = QueueName("my-queue")
  for {
    queueUrl <- sqs.operator.getQueueUrl(queueName)
    _ <- sqs.operator.purgeQueue(queueUrl)
  } yield ()
}.runToFuture

List queues

Emits the queue urls in the current region. The list operation can be filtered with a queuePrefix, in which case only the queues whose names start with the given string are returned. Additionally, the number of elements returned can be limited with the maxResults argument.

import monix.connect.sqs.Sqs
import monix.connect.sqs.domain.QueueUrl
import monix.reactive.Observable

Sqs.fromConfig.use { sqs =>
  val maxResults = 10 //lists at most 10 queues
  val queuePrefix = "some" //only list queues that start with this prefix
  val queuesObs: Observable[QueueUrl] = sqs.operator.listQueueUrls(Some(queuePrefix), Some(maxResults))
  //business logic here
  queuesObs.countL
}.runToFuture

Producer

This library makes things easier for the user by splitting the message abstraction into two types, FifoMessage and StandardMessage, each to be used with the corresponding destination queue type.

package monix.connect.sqs.producer
final case class FifoMessage(
  body: String,
  groupId: String,
  deduplicationId: Option[String] = Option.empty,
  messageAttributes: Map[String, MessageAttribute] = Map.empty,
  awsTraceHeader: Option[MessageAttribute] = Option.empty)
  extends Message(body, groupId = Some(groupId), deduplicationId = deduplicationId, messageAttributes, awsTraceHeader)

package monix.connect.sqs.producer
final case class StandardMessage(
  body: String,
  messageAttributes: Map[String, MessageAttribute] = Map.empty,
  awsTraceHeader: Option[MessageAttribute] = None,
  delayDuration: Option[FiniteDuration] = None)
  extends Message(body, groupId = None, deduplicationId = None, messageAttributes, awsTraceHeader, delayDuration)

As the snippets above show, groupId and deduplicationId are specific to FifoMessage, whereas StandardMessage is the only one that has delayDuration.

These slight differences are the reason why they are kept in separate classes; otherwise they would be confusing to work with, and you would have to take care not to send an invalid message to the sqs queue.

Each message type must be used with its respective queue type: when producing a new message, we must use FifoMessage for fifo queues and StandardMessage for standard ones.

Next, let's see the different signatures available to produce messages to Sqs queues. All of them can be used with either FifoMessage or StandardMessage, since both extend the same class, monix.connect.sqs.producer.Message.

Send single message

This is the most basic produce operation, in which only one message is sent to the queue.

import monix.connect.sqs.Sqs
import monix.connect.sqs.domain.QueueName
import monix.connect.sqs.producer.StandardMessage

Sqs.fromConfig.use { case Sqs(operator, producer, _) =>
  val queueName = QueueName("my-queue")
  for {
    queueUrl <- operator.getQueueUrl(queueName)
    message = StandardMessage("Dummy content")
    response <- producer.sendSingleMessage(message, queueUrl)
  } yield response
}.runToFuture

Send parallel batch

Instead of a single message, this operation expects a list of messages, which is split into groups of ten that are sent as batches in parallel.

import monix.connect.sqs.Sqs
import monix.connect.sqs.domain.QueueName
import monix.connect.sqs.producer.StandardMessage

Sqs.fromConfig.use { case Sqs(operator, producer, _) =>
  val queueName = QueueName("my-queue")
  for {
    queueUrl <- operator.getQueueUrl(queueName)
    messages = List(StandardMessage("1"), StandardMessage("2"), StandardMessage("3"))
    response <- producer.sendParBatch(messages, queueUrl)
  } yield response
}.runToFuture
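
The grouping behaviour described above can be pictured with plain Scala futures. This is only a sketch of the idea, not the library's implementation: ParBatchSketch, sendBatch and run are hypothetical names, and sendBatch merely stands in for one batch request.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParBatchSketch {
  // Maximum number of entries per batch request.
  val MaxBatchSize = 10

  // Hypothetical stand-in for one batch request; it returns the number of entries sent.
  def sendBatch(batch: List[String])(implicit ec: ExecutionContext): Future[Int] =
    Future {
      require(batch.size <= MaxBatchSize, "a batch accepts at most 10 entries")
      batch.size
    }

  // Splits the messages into groups of ten and sends all groups concurrently.
  def sendParBatch(messages: List[String])(implicit ec: ExecutionContext): Future[List[Int]] =
    Future.sequence(messages.grouped(MaxBatchSize).toList.map(batch => sendBatch(batch)))

  // Blocking helper, only meant for trying the sketch out.
  def run(messages: List[String]): List[Int] = {
    import ExecutionContext.Implicits.global
    Await.result(sendParBatch(messages), 10.seconds)
  }
}
```

With 25 messages, ParBatchSketch.run produces three batches of sizes 10, 10 and 5, and the results keep the order of the groups.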

Send sink

A basic sink (aka monix Consumer) that listens to incoming Messages and produces them one by one to the specified queue.

import monix.connect.sqs.{Sqs, SqsOperator}
import monix.connect.sqs.domain.QueueName
import monix.connect.sqs.producer.FifoMessage
import monix.reactive.Observable
import software.amazon.awssdk.services.sqs.model.QueueAttributeName

Sqs.fromConfig.use { case Sqs(operator, producer, _) =>
  val groupId = "group-id123"
  val fifoQueueName = QueueName("my-fifo-queue.fifo")
  val queueAttributes = Map(QueueAttributeName.FIFO_QUEUE -> "true", QueueAttributeName.CONTENT_BASED_DEDUPLICATION -> "true")
  for {
    queueUrl <- operator.createQueue(fifoQueueName, attributes = queueAttributes)
    messages = List(FifoMessage("1", groupId), FifoMessage("2", groupId))
    _ <- Observable.fromIterable(messages).consumeWith(producer.sendSink(queueUrl))
  } yield ()
}.runToFuture

Send par batch sink

A more advanced and performant version of sendParBatch, which listens for lists of Messages and splits them into groups of at most ten messages that are sent as batches in parallel.

import monix.connect.sqs.Sqs
import monix.connect.sqs.domain.QueueName
import monix.connect.sqs.producer.StandardMessage
import monix.reactive.Observable
import software.amazon.awssdk.services.sqs.model.QueueAttributeName

Sqs.fromConfig.use { case Sqs(operator, producer, _) =>
  val queueName = QueueName("my-queue")
  // each batch can be of any size
  // the library takes care of splitting them in groups of 10
  // as it is the maximum size allowed in sqs
  val batch1 = List(StandardMessage("1"), StandardMessage("2"))
  val batch2 = List(StandardMessage("3"), StandardMessage("4"))

  for {
    queueUrl <- operator.createQueue(queueName)
    _ <- Observable.fromIterable(List(batch1, batch2)).consumeWith(producer.sendParBatchSink(queueUrl))
  } yield ()
}.runToFuture

Consumer

Messages from sqs queues can be consumed either as DeletableMessage or as plain ConsumedMessage; both are explained in the following sub-sections.

Deletable message

As the name implies, a deletable message gives the user the ability to delete the message from its source queue. This is normally done once the message is considered processed, which makes it the choice for applications with at-least-once messaging semantics.

import monix.connect.sqs.Sqs
import monix.connect.sqs.consumer.DeletableMessage
import monix.connect.sqs.domain.QueueName
import monix.eval.Task

// placeholder for the real processing logic, e.g. a database insert
def fakeDbInsert(deletableMessage: DeletableMessage): Task[Unit] = Task.unit

Sqs.fromConfig.use { case Sqs(operator, _, consumer) =>
  val queueName = QueueName("my-queue")
  for {
    queueUrl <- operator.getQueueUrl(queueName)
    _ <- consumer.receiveManualDelete(queueUrl) //: Observable[DeletableMessage]
      .doOnNext(fakeDbInsert)
      // only after the message is inserted in the db we delete the message from the queue 
      .mapEval(deletableMessage => deletableMessage.deleteFromQueue()) 
      .completedL
  } yield ()
}.runToFuture

If you prefer to only perform a single receive request, there is also a method called receiveSingleManualDelete, which will return a list of as many messages as specified in the maxMessages argument (max 10), wrapped in a Task.

Auto deleted message

On the other hand, there might be cases where it is preferable to implement the application with at-most-once semantics, for instance when it must not process the same message twice, or when it can afford to lose messages.

In these cases we can use the auto-delete signatures, which emit messages that have already been deleted from the queue, so that the user does not have to worry about processing them multiple times or handling their deletion.

There are two signatures for consuming auto-deleted messages. The first one, receiveSingleAutoDelete, performs a single receive request and returns a list of at most 10 messages wrapped in a Task. The second one, receiveAutoDelete, is the streaming version and returns an Observable that keeps emitting plain ConsumedMessages.
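
The difference between the two consuming styles can be made concrete with a small in-memory simulation. It is only a sketch under simplifying assumptions: FakeQueue, consumeManualDelete, consumeAutoDelete and failOn are hypothetical names, and the fake queue ignores visibility timeouts and other details of a real sqs queue.

```scala
import scala.collection.mutable
import scala.util.Try

object DeliverySemanticsSketch {
  // Hypothetical in-memory queue: a received message stays stored until it is deleted.
  final class FakeQueue(initial: List[String]) {
    private val stored = mutable.ListBuffer.empty[String] ++= initial
    def receive(): List[String] = stored.toList
    def delete(body: String): Unit = stored -= body
    def remaining: List[String] = stored.toList
  }

  // Manual delete: process first and delete only on success (at-least-once).
  def consumeManualDelete(queue: FakeQueue)(process: String => Unit): Unit =
    queue.receive().foreach { body =>
      if (Try(process(body)).isSuccess) queue.delete(body)
    }

  // Auto delete: the message is deleted before it is processed (at-most-once).
  def consumeAutoDelete(queue: FakeQueue)(process: String => Unit): Unit =
    queue.receive().foreach { body =>
      queue.delete(body)
      Try(process(body)) // a failure here means the message is lost
    }

  // Processing function that fails for one specific body.
  def failOn(bad: String): String => Unit =
    body => if (body == bad) throw new IllegalStateException(s"cannot process $body")
}
```

When processing "b" fails, the manual-delete consumer leaves "b" in the queue so it can be received again, while the auto-delete consumer leaves the queue empty and the message is gone.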

Local testing

The recommended way to test an application that relies on Sqs is to use docker. Two common options for getting started are shown below:

Localstack

Localstack provides a fully functional local AWS cloud stack, which in this case can be used to develop and test the integration of the application with SQS locally and offline.

Add the following service description to your docker-compose.yml file:

services:
  localstack:
    image: localstack/localstack:0.11.1
    ports:
      - '4576:4576'
    environment:
      - SERVICES=sqs # avoids starting the rest of the aws services

Elasticmq

Elasticmq is another alternative that also implements a fully asynchronous, Sqs-compatible interface, and it is based on akka actors.

services:
  elasticmq:
    image: softwaremill/elasticmq-native:latest
    ports:
      - '9324:9324'

Run the docker command to build and start the sqs service:

docker-compose -f docker-compose.yml up -d 

Check that the service has started correctly.
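
One possible way to do this check from Scala is to test whether the mapped port accepts TCP connections. The helper below is a hypothetical sketch built only on the standard library (scala.util.Using requires Scala 2.13+); it only confirms that something is listening on the port, not that it is a working sqs endpoint.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Using

object LocalSqsCheck {
  // Returns true when a TCP connection to host:port succeeds within the timeout,
  // and false when the connection is refused or times out.
  def isPortOpen(host: String, port: Int, timeoutMillis: Int = 1000): Boolean =
    Using(new Socket()) { socket =>
      socket.connect(new InetSocketAddress(host, port), timeoutMillis)
      true
    }.getOrElse(false)
}
```

With the compose files above, LocalSqsCheck.isPortOpen("localhost", 9324) would target Elasticmq and port 4576 would target Localstack.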

The snippet below shows a sample of the settings used to connect to Sqs locally using a HOCON config file, which should be placed under your test resources folder src/test/resources.

{
 monix-aws: {
   credentials {
     provider: "static"
     static {
       access-key-id: "TESTKEY"
       secret-access-key: "TESTSECRET"
     }
   }
   endpoint: "http://localhost:9324"
   region: "us-west-2"
 }
}

Alternatively, you can create the connection directly by passing parameters:

import monix.connect.sqs.Sqs
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}

import java.net.URI

val endPoint: String = "http://localhost:9324"

val awsAccessKey: String = "TESTKEY" 
val awsSecretKey: String = "TESTSECRET" 

val basicAWSCredentials = AwsBasicCredentials.create(awsAccessKey, awsSecretKey)
val staticCredentialsProvider = StaticCredentialsProvider.create(basicAWSCredentials)

val sqs = Sqs.create(credentialsProvider = staticCredentialsProvider, region = Region.EU_WEST_1, endpoint = Some(endPoint))

Now you are ready to have fun!


Copyright © 2020-2022 The Monix Connect Developers.