Monix Connect

Monix Connect

  • API Docs
  • Documentation
  • GitHub

›Documentation

Documentation

  • Overview
  • Apache Parquet
  • AWS DynamoDB
  • AWS S3
  • AWS SQS
  • Elasticsearch
  • Google Cloud Storage
  • HDFS
  • MongoDB
  • Redis

AWS S3

Introduction

AWS Simple Storage Service (S3) is an object storage service that offers industry leading scalability, availability, security and performance. It allows data storage of any amount of data, commonly used as a data lake for big data applications which can now be easily integrated with monix.

This module exposes a wide range of methods for interacting with S3 buckets and objects, where the most exciting ones are the multipart download and upload, since they use the power of reactive streams to deal with objects of any size in smaller parts (chunks).

Dependency

Add the following dependency in your build.sbt:

libraryDependencies += "io.monix" %% "monix-s3" % "0.6.0"

Async Client

This connector uses the underlying S3AsyncClient from the java aws sdk, it will allow us to authenticate and create a non blocking channel between our application and the AWS S3 service.

There are different ways to create the connection, all available from the singleton object monix.connect.s3.S3. It is explained in more detail in the following sub-sections:

From config

The laziest and recommended way to create the S3 connection is to do it from configuration file. To do so, you'd just need to create an application.conf following the template from reference.conf, which also represents the default config in case no additional is provided.

Below snippet shows an example of the configuration file to authenticate via StaticCredentialsProvider and region EU-WEST-1, as you can appreciate the http client settings are commented out since they are optional, however they could have been specified too for a more fine grained configuration of the underlying NettyNioAsyncHttpClient.

  monix-aws: {
    credentials {
      // [anonymous, default, environment, instance, system, profile, static]
      provider: "static" 

      // optional - only required with static credentials
      static {
        access-key-id: "TESTKEY"      
        secret-access-key: "TESTSECRET"
        session-token: ""
      }
    }
  
    // [ap-south-1, us-gov-east-1, af-south-1, eu-west-2, ...]
    region: "eu-west-1"

    // optional
    #endpoint: ""

    // optional
    # http-client: {
    #   max-concurrency: 10
    #   max-pending-connection-acquires: 1000
    #   connection-acquisition-timeout: 2 minutes
    #   connection-time-to-live: 1 minute
    #   use-idle-connection-reaper: false
    #   read-timeout: 100 seconds
    #   write-timeout: 100 seconds
    # }
  }

This config file should be placed in the resources folder, therefore it will be automatically picked up from the method call S3.fromConfig, which will return a cats.effect.Resource[Task, S3]. The resource is responsible of the creation and release of the S3 client.

Try to reuse the created S3 client as much as possible in your application multiple times in your application. Otherwise, creating it multiple times will waste precious resources... See below code snippet to understand the concept:

 import monix.connect.s3.S3
 import monix.eval.Task
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException
 import pureconfig.KebabCase

 val bucket = "my-bucket"
 val key = "my-key"
 val content = "my-content"
 
 def runS3App(s3: S3): Task[Array[Byte]] = {
   for {
     _ <- s3.createBucket(bucket)
     _ <- s3.upload(bucket, key, content.getBytes)
     existsObject <- s3.existsObject(bucket, key)
     download <- {
       if(existsObject) s3.download(bucket, key)
       else Task.raiseError(NoSuchKeyException.builder().build()) 
     }
   } yield download
 }


  // It allows to specify the [[pureconfig.NamingConvention]]
  // from its argument, by default it uses the [[KebabCase]].  
  val f = S3.fromConfig(KebabCase).use(s3 => runS3App(s3)).runToFuture
  // The connection got created and released within the use method and the `S3`
  // instance is directly passed and should be reused across our application

There is an alternative way to use fromConfig which is to load the config first and then passing it to the method. The difference is that in this case we will be able to override a specific configuration from the code, whereas before we were reading it and creating the client straight after.

 import monix.connect.aws.auth.MonixAwsConf
 import monix.connect.s3.S3
 import monix.eval.Task
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException
 import pureconfig.KebabCase
 import software.amazon.awssdk.regions.Region

 val f = MonixAwsConf.load(KebabCase).memoizeOnSuccess.flatMap{ awsConf =>
   val updatedAwsConf = awsConf.copy(region = Region.EU_CENTRAL_1)
   S3.fromConfig(updatedAwsConf).use(s3 => runS3App(s3))
 }.runToFuture

Create

On the other hand, one can pass the AWS configurations by parameters, and that is safe since the method also handles the acquisition and release of the connection with Resource. The example below produce exactly the same result as previously using the config file:

import cats.effect.Resource
import monix.connect.s3.S3
import monix.eval.Task
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region

val s3AccessKey: String = "TESTKEY"
val s3SecretKey: String = "TESTSECRET"
val basicAWSCredentials = AwsBasicCredentials.create(s3AccessKey, s3SecretKey)
val staticCredProvider = StaticCredentialsProvider.create(basicAWSCredentials)

val s3: Resource[Task, S3] = S3.create(staticCredProvider, Region.AWS_GLOBAL)   

Create Unsafe

Another different alternatively is to just pass an already created instance of a software.amazon.awssdk.services.s3.S3AsyncClient, which in that case, the return type would be directly S3, so there won't be no need to deal with Resource. As the same title suggests, this is not a pure way of creating an S3 since it might be possible to pass a malformed instance by parameter or just one that was already released (closed).

An example:

import monix.connect.s3.S3
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.regions.Region

val s3AsyncClient: S3AsyncClient = S3AsyncClient
 .builder()
 .credentialsProvider(DefaultCredentialsProvider.create())
 .region(Region.EU_CENTRAL_1)
 .build()

val s3: S3 = S3.createUnsafe(s3AsyncClient)

Notice that createUnsafe an overloaded method that also has just another variant that excepts settings values:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import monix.connect.s3.S3

val bucket: String = "my-bucket" 
val s3: S3 = S3.createUnsafe(DefaultCredentialsProvider.create(), Region.AF_SOUTH_1)

Create bucket

Once you have configured the client, you would probably need to create a bucket:

import software.amazon.awssdk.services.s3.model.CreateBucketResponse
import monix.connect.s3.S3

val bucket: String = "my-bucket" 
val s3: S3

val t: Task[CreateBucketResponse] = s3.createBucket(bucket)

Delete bucket

On the other hand if you want to remove the bucket:

import software.amazon.awssdk.services.s3.model.DeleteBucketResponse
import monix.connect.s3.S3

val bucket: String = "my-bucket" 
val s3: S3

val t: Task[DeleteBucketResponse] = s3.deleteBucket(bucket)

Delete object

You can also delete an object within the specified bucket with:

import software.amazon.awssdk.services.s3.model.{DeleteObjectResponse, ListObjectsResponse}
import monix.connect.s3.S3

val bucket: String = "my-bucket" 
val key: String = "path/to/my/object.txt" 
val s3: S3

val t: Task[DeleteObjectResponse] = s3.deleteObject(bucket, key)

Exists bucket

Check whether the specified bucket exists or not.

import monix.connect.s3.S3

val bucket: String = "my-bucket" 
val s3: S3

val t: Task[Boolean] = s3.existsBucket(bucket)

Exists object

Check whether the specified object exists or not.

import monix.connect.s3.S3

val bucket: String = "my-bucket" 
val key: String = "path/to/my/object.txt" 
val s3: S3

val t: Task[Boolean] = s3.existsObject(bucket, key))

List objects

Lists the objects within a bucket and prefix.

import monix.connect.s3.S3
import monix.eval.Task
import monix.reactive.Consumer
import monix.reactive.Observable
import software.amazon.awssdk.services.s3.model.{DeleteObjectResponse, ListObjectsResponse}
import software.amazon.awssdk.services.s3.model.S3Object

val bucket = "my-bucket"
val prefix = "prefix/to/list/keys/"

// example to calculate the total object's size
val totalSize: Task[Long] = S3.fromConfig.use{ s3 =>
 s3.listObjects(bucket, prefix = Some(prefix))
   .consumeWith(Consumer.foldLeft(0)((acc, s3Object) => acc + s3Object.size))
}

// use `maxTotalKeys` to set a limit to the number of keys you want to fetch
val s3ObjectsWithLimit: Observable[S3Object] = 
 S3.createUnsafe(credentialsProvider, region)
   .listObjects(bucket, maxTotalKeys = Some(1011), prefix = Some(prefix))

Listing a very long list of objects can take long, and it might cause the underlying netty connection to error due to a request timeout. In order avoid that, you could set a smaller maximum total of keys fetched, or alternatively increase the timeouts setting on software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient, which will be added to the software.amazon.awssdk.services.s3.S3AsyncClient, you can also change that from the .conf file.

List latest and oldest objects

This connector also exposes a variant for listing the latest and oldest objects under a given path. They might be handy in those cases where for instance, there is a list of files under the same path prefix , but we are only interested in the latest updated one, in such case you could just make use of listLatestObject or listLatestNObjects. On the other hand if the interest is on the oldest objects, listOldestObject or listOldestNObjects would be the appropriate choice.

Download

Downloads the specified object in a single request as byte array.

import software.amazon.awssdk.services.s3.model.PutObjectResponse
import monix.connect.s3.S3
import monix.eval.Task

val bucket = "my-bucket"
val key = "sample/download/path/file.txt"
val s3: S3

val t: Task[Array[Byte]] = s3.download("my-bucket", key)

Note that this operation is only suitable to be used for small objects, for large ones downloadMultipart should be used.

On contrast, it might be useful in those cases in which the user only wants to download the first n bytes from an object:

  // only downloads the first 10 bytes
  s3.download("my-bucket", key, firstNBytes = Some(10))

Download multipart

A method that safely downloads objects of any size by performing partial download requests. The number of bytes to get per each request is specified by the chunkSize.

import monix.connect.s3.S3
import monix.reactive.Observable
import monix.reactive.Consumer
val bucket = "my-bucket"
val key = "sample/path/file.json"
val s3: S3

// the minimum configurable chunksize is 5MB, although the last chunk might be of smaller size
val ob: Observable[Array[Byte]] = s3.downloadMultipart("my-bucket", key, chunkSize = 5242880)

Upload

You can also easily upload an object into an S3 with the upload signature. Note that if you need to update large amount of data you should not be using this method, see instead multipartUpload`.

import software.amazon.awssdk.services.s3.model.PutObjectResponse
import monix.connect.s3.S3
import monix.eval.Task

val bucket = "my-bucket"
val key = "sample/upload/path/file.txt"
val content: Array[Byte] = "Dummy file content".getBytes()
val s3: S3

val t: Task[PutObjectResponse] = s3.upload("my-bucket", key, content)

Upload multipart

When dealing with large files of data you should use the multipartUpload operation. It is used for consuming an observable of bytes that sends partial upload request of an specific minimum size. When the emitted bytes are smallaller than the minimumb request chunk size, they will keep being accumulated until reaching the minimum size mentioned previously.

In comparison with the single upload, it reduces substantially the risk on getting OOM errors or failed http requests due to the size of the body.

import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse
import monix.connect.s3.S3
import monix.reactive.Observable
import monix.reactive.Consumer
import monix.eval.Task

val bucket = "my-bucket"
val key = "sample/upload/path/file.txt"
val ob = Observable[Array[Byte]] // preasumably we have a byte array `Observable`
val s3: S3

val f = ob.consumeWith(s3.uploadMultipart(bucket, key)).runToFuture

Note that the chunksize can be specified on the method call, in which the default and minimum value is 5MB (5242880 bytes). It also accepts specific aws configurations such as acl, requestPayer, etc.

Local testing

There is actually a very good support on regards to testing AWS S3 locally, the following sections describes different popular alternatives:

Localstack

A fully functional local AWS cloud stack available as a docker image.

You would just need to define it as a service in your docker-compose.yml:

localstack:
   image: localstack/localstack:0.11.0
   hostname: localstack
   container_name: localstack
   ports:
     - '4566:4566'
   environment:
     - SERVICES=s3
# very important to specify `s3` on `SERVICES` env var, it would prevent to spin up the rest of the AWS services.

Then, execute the following command to build and run the localstack image:

docker-compose -f ./docker-compose.yml up -d localstack

A good point on favor to using localstack is that it provides support for AWS Anonymous Credentials, meaning that you can easily connect to your local S3 service with no required authentication.

Below snippet is an example on how to set up the config to be used from S3.create():

{
 monix-aws: {
   credentials {
     provider: "anonymous"  
   }
   endpoint: "http://localhost:4566"
   region: "us-west-2"
 }
}

On the other hand, if you prefer to create the client from your app:

import monix.connect.s3.S3
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import java.net.URI

val localStackEndpoint = "http://localhost:4566"

val s3: S3 = S3.createUnsafe(new AnonymousCredentialsProvider, Region.EU_WEST_1, Some(URI.create(localStackEndpoint)))

Creating local buckets in localstack

Whenever you create a bucket on localstack, you would better set its ACL (Access Control List) as public-read since it might prevent you to encounter 403 access denied when reading it back. If you set the container_name field to localstack in docker-compose.yaml you can create the bucket and specify the right ACL like:

docker exec localstack awslocal s3 mb s3://my-bucket
docker exec localstack awslocal s3api put-bucket-acl --bucket my-bucket --acl public-read

On the other hand, if prefer to do that from code:

import monix.connect.s3.S3
import org.scalatest.BeforeAndAfterAll

override def beforeAll() = {
  super.beforeAll()
  S3.createUnsafe(s3AsyncClient).createBucket("my-bucket", acl = Some("public-read")).runSyncUnsafe()
}

Minio

Minio is another well known docker image that emulates AWS S3.

The advantages of using minio over localstack is that it provides a nice UI that allows you to quickly visualize and manage the objects and buckets stored in your local, instead of having to exec in your local image.

On the other hand, a disadvantage could be that it does not support Anonymous Credentials, so you have to specify key and secret to create the connection.

Add the following service description to your docker-compose.yaml file:

minio:
  image: minio/minio
  ports:
    - "9000:9000"
  volumes:
    - ./minio/data:/data
  environment:
    - MINIO_ACCESS_KEY=TESTKEY
    - MINIO_SECRET_KEY=TESTSECRET
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
    interval: 35s
    timeout: 20s
    retries: 3
  command: server --compat /data

Then, run the following command to build and start the minio image:

docker-compose -f ./docker-compose.yml up -d minio

Check out that the service has started correctly, notice that there is a healthcheck on the definition of the minio service, that's because it is a heavy image and sometimes it takes bit long to start or it even fails, so by adding it will prevent that to happen.

Finally, now you can already create the connection to AWS S3, notice that minio does not support Anonymous credentials, instead you'll have to use the Static Credentials Provider and specify the key and secret corresponding respectively to the defined environment variables MINIO_ACCESS_KEY and MINIO_SECRET_KEY.

Below snippet represents the settings that would be needed to locally connect to minio using config file, which should be placed under your test resources folder src/test/resources.

{
 monix-aws: {
   credentials {
     provider: "static"  
     access-key-id: "TESTKEY"
     secret-access-key: "TESTSECRET"    
   }
   endpoint: "http://localhost:9000"
   region: "us-west-2"
 }
}

Alternatively you can statically create connection by parameters:

import monix.connect.s3.S3
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import java.net.URI

val minioEndPoint: String = "http://localhost:9000"

val s3AccessKey: String = "TESTKEY" //equal to the env var `MINIO_ACCESS_KEY`  
val s3SecretKey: String = "TESTSECRET" //equal to the `env var `MINIO_SECRET_KEY`

val basicAWSCredentials = AwsBasicCredentials.create(s3AccessKey, s3SecretKey)
val staticCredentialsProvider = StaticCredentialsProvider.create(basicAWSCredentials)

val s3Resource = S3.create(basicAWSCredentials, Region.EU_WEST_1, Some(URI.create(minioEndPoint)))

JVM S3 Mock library

In case you prefer to start and stop the S3 service from the code of same test and therefore not depending on docker but just on a JVM library dependency, you can refer to findify/s3Mock to find out more.

← AWS DynamoDBAWS SQS →
  • Introduction
  • Dependency
  • Async Client
    • From config
  • Create
  • Create Unsafe
  • Create bucket
  • Delete bucket
  • Delete object
  • Exists bucket
  • Exists object
  • List objects
    • List latest and oldest objects
  • Download
  • Download multipart
  • Upload
  • Upload multipart
  • Local testing
    • Localstack
    • Minio
    • JVM S3 Mock library

Copyright © 2020-2022 The Monix Connect Developers.