AWS S3
Introduction
AWS Simple Storage Service (S3) is an object storage service that offers industry-leading scalability, availability, security and performance. It allows storing any amount of data and is commonly used as a data lake for big data applications, which can now be easily integrated with Monix.
This module exposes a wide range of methods for interacting with S3 buckets and objects. The most exciting ones are the multipart download and upload, since they use the power of reactive streams to deal with objects of any size in smaller parts (chunks).
Dependency
Add the following dependency to your build.sbt:
libraryDependencies += "io.monix" %% "monix-s3" % "0.6.0"
Async Client
This connector uses the underlying S3AsyncClient from the Java AWS SDK, which allows us to authenticate and create a non-blocking channel between our application and the AWS S3 service.
There are different ways to create the connection, all available from the singleton object monix.connect.s3.S3, and explained in more detail in the following sub-sections:
From config
The laziest and recommended way to create the S3 connection is to do it from a configuration file.
To do so, you just need to create an application.conf following the template from reference.conf, which also represents the default config in case no additional one is provided.
The snippet below shows an example of a configuration file that authenticates via StaticCredentialsProvider in region eu-west-1. Notice that the http client settings are commented out, since they are optional; they could have been specified too, for a more fine-grained configuration of the underlying NettyNioAsyncHttpClient.
monix-aws: {
  credentials {
    # [anonymous, default, environment, instance, system, profile, static]
    provider: "static"
    # optional - only required with static credentials
    static {
      access-key-id: "TESTKEY"
      secret-access-key: "TESTSECRET"
      session-token: ""
    }
  }
  # [ap-south-1, us-gov-east-1, af-south-1, eu-west-2, ...]
  region: "eu-west-1"
  # optional
  # endpoint: ""
  # optional
  # http-client: {
  #   max-concurrency: 10
  #   max-pending-connection-acquires: 1000
  #   connection-acquisition-timeout: 2 minutes
  #   connection-time-to-live: 1 minute
  #   use-idle-connection-reaper: false
  #   read-timeout: 100 seconds
  #   write-timeout: 100 seconds
  # }
}
This config file should be placed in the resources folder, so that it is automatically picked up by the method call S3.fromConfig, which returns a cats.effect.Resource[Task, S3].
The resource is responsible for the creation and release of the S3 client.
Try to reuse the created S3 client as much as possible in your application; otherwise, creating it multiple times will waste precious resources. See the code snippet below to understand the concept:
import monix.connect.s3.S3
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import software.amazon.awssdk.services.s3.model.NoSuchKeyException
import pureconfig.KebabCase
val bucket = "my-bucket"
val key = "my-key"
val content = "my-content"
def runS3App(s3: S3): Task[Array[Byte]] = {
for {
_ <- s3.createBucket(bucket)
_ <- s3.upload(bucket, key, content.getBytes)
existsObject <- s3.existsObject(bucket, key)
download <- {
if(existsObject) s3.download(bucket, key)
else Task.raiseError(NoSuchKeyException.builder().build())
}
} yield download
}
// `fromConfig` allows specifying the [[pureconfig.NamingConvention]]
// as an argument; by default it uses [[KebabCase]].
val f = S3.fromConfig(KebabCase).use(s3 => runS3App(s3)).runToFuture
// The connection is created and released within the `use` method, and the `S3`
// instance passed to it should be reused across our application.
There is an alternative way of using fromConfig, which is to load the config first and then pass it to the method. The difference is that this way we are able to override specific configuration values from the code, whereas before the client was created straight after reading the config file.
import monix.connect.aws.auth.MonixAwsConf
import monix.connect.s3.S3
import monix.eval.Task
import pureconfig.KebabCase
import software.amazon.awssdk.regions.Region
val f = MonixAwsConf.load(KebabCase).memoizeOnSuccess.flatMap{ awsConf =>
val updatedAwsConf = awsConf.copy(region = Region.EU_CENTRAL_1)
S3.fromConfig(updatedAwsConf).use(s3 => runS3App(s3))
}.runToFuture
Create
On the other hand, one can pass the AWS configuration by parameters, which is also safe since the method handles the acquisition and release of the connection with Resource. The example below produces exactly the same result as previously using the config file:
import cats.effect.Resource
import monix.connect.s3.S3
import monix.eval.Task
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
val s3AccessKey: String = "TESTKEY"
val s3SecretKey: String = "TESTSECRET"
val basicAWSCredentials = AwsBasicCredentials.create(s3AccessKey, s3SecretKey)
val staticCredProvider = StaticCredentialsProvider.create(basicAWSCredentials)
val s3: Resource[Task, S3] = S3.create(staticCredProvider, Region.AWS_GLOBAL)
Create Unsafe
Another alternative is to pass an already created instance of a software.amazon.awssdk.services.s3.S3AsyncClient, in which case the return type is directly S3, so there is no need to deal with Resource.
As the title suggests, this is not a pure way of creating an S3, since it is possible to pass a malformed instance by parameter, or one that was already released (closed).
An example:
import monix.connect.s3.S3
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.regions.Region
val s3AsyncClient: S3AsyncClient = S3AsyncClient
.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.region(Region.EU_CENTRAL_1)
.build()
val s3: S3 = S3.createUnsafe(s3AsyncClient)
Notice that createUnsafe is an overloaded method that also has another variant that accepts settings values:
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import monix.connect.s3.S3
val s3: S3 = S3.createUnsafe(DefaultCredentialsProvider.create(), Region.AF_SOUTH_1)
Create bucket
Once you have configured the client, you would probably need to create a bucket:
import monix.connect.s3.S3
import monix.eval.Task
import software.amazon.awssdk.services.s3.model.CreateBucketResponse

val bucket: String = "my-bucket"
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
val t: Task[CreateBucketResponse] = s3.createBucket(bucket)
Delete bucket
On the other hand, if you want to remove the bucket:
import monix.connect.s3.S3
import monix.eval.Task
import software.amazon.awssdk.services.s3.model.DeleteBucketResponse

val bucket: String = "my-bucket"
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
val t: Task[DeleteBucketResponse] = s3.deleteBucket(bucket)
Delete object
You can also delete an object within the specified bucket with:
import monix.connect.s3.S3
import monix.eval.Task
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse

val bucket: String = "my-bucket"
val key: String = "path/to/my/object.txt"
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
val t: Task[DeleteObjectResponse] = s3.deleteObject(bucket, key)
Exists bucket
Check whether the specified bucket exists or not.
import monix.connect.s3.S3
import monix.eval.Task

val bucket: String = "my-bucket"
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
val t: Task[Boolean] = s3.existsBucket(bucket)
Exists object
Check whether the specified object exists or not.
import monix.connect.s3.S3
import monix.eval.Task

val bucket: String = "my-bucket"
val key: String = "path/to/my/object.txt"
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
val t: Task[Boolean] = s3.existsObject(bucket, key)
List objects
Lists the objects within a bucket and prefix.
import monix.connect.s3.S3
import monix.eval.Task
import monix.reactive.{Consumer, Observable}
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.S3Object

val bucket = "my-bucket"
val prefix = "prefix/to/list/keys/"

// example to calculate the total size of all the objects under the prefix
val totalSize: Task[Long] = S3.fromConfig.use { s3 =>
  s3.listObjects(bucket, prefix = Some(prefix))
    .consumeWith(Consumer.foldLeft(0L)((acc, s3Object) => acc + s3Object.size))
}

// use `maxTotalKeys` to set a limit to the number of keys you want to fetch
val credentialsProvider = DefaultCredentialsProvider.create()
val s3ObjectsWithLimit: Observable[S3Object] =
  S3.createUnsafe(credentialsProvider, Region.EU_WEST_1)
    .listObjects(bucket, maxTotalKeys = Some(1011), prefix = Some(prefix))
Listing a very long list of objects can take a while, and it might cause the underlying netty connection to fail with a request timeout.
In order to avoid that, you could set a smaller maximum total of keys to fetch, or alternatively increase the timeout settings of the software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient that backs the software.amazon.awssdk.services.s3.S3AsyncClient; you can also change that from the .conf file.
List latest and oldest objects
This connector also exposes variants for listing the latest and oldest objects under a given path.
They come in handy when, for instance, there is a list of files under the same path prefix but we are only interested in the most recently updated one; in such a case you could just make use of listLatestObject or listLatestNObjects.
On the other hand, if the interest is in the oldest objects, listOldestObject or listOldestNObjects would be the appropriate choice.
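As a minimal sketch of how these could be used (the exact signatures and return types below are assumptions, check the API documentation):
import monix.connect.s3.S3
import monix.eval.Task
import monix.reactive.Observable
import software.amazon.awssdk.services.s3.model.S3Object

val bucket = "my-bucket"
val prefix = "prefix/to/list/keys/"
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections

// assumed signature: the most recently updated object under the prefix, if any
val latest: Task[Option[S3Object]] = s3.listLatestObject(bucket, prefix = Some(prefix))
// assumed signature: the three oldest objects under the prefix
val oldestThree: Observable[S3Object] = s3.listOldestNObjects(bucket, 3, prefix = Some(prefix))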
Download
Downloads the specified object in a single request, as a byte array.
import monix.connect.s3.S3
import monix.eval.Task

val bucket = "my-bucket"
val key = "sample/download/path/file.txt"
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
val t: Task[Array[Byte]] = s3.download(bucket, key)
Note that this operation is only suitable for small objects; for large ones, downloadMultipart should be used.
In contrast, it might be useful in those cases in which the user only wants to download the first n bytes of an object:
// only downloads the first 10 bytes
s3.download(bucket, key, firstNBytes = Some(10))
Download multipart
A method that safely downloads objects of any size by performing partial download requests.
The number of bytes to fetch per request is specified by the chunkSize.
import monix.connect.s3.S3
import monix.reactive.Observable

val bucket = "my-bucket"
val key = "sample/path/file.json"
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
// the minimum configurable chunk size is 5MB (5242880 bytes), although the last chunk might be smaller
val ob: Observable[Array[Byte]] = s3.downloadMultipart(bucket, key, chunkSize = 5242880)
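Since the result is a plain Observable of byte chunks, it can be consumed with the usual monix operators; for example, to compute the total number of downloaded bytes:
import monix.eval.Task

// folds over the emitted chunks, accumulating their sizes
val totalBytes: Task[Long] = ob.foldLeftL(0L)((acc, chunk) => acc + chunk.length)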
Upload
You can also easily upload an object to S3 with the upload signature. Note that if you need to upload a large amount of data you should not be using this method; see uploadMultipart instead.
import software.amazon.awssdk.services.s3.model.PutObjectResponse
import monix.connect.s3.S3
import monix.eval.Task
val bucket = "my-bucket"
val key = "sample/upload/path/file.txt"
val content: Array[Byte] = "Dummy file content".getBytes()
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
val t: Task[PutObjectResponse] = s3.upload(bucket, key, content)
Upload multipart
When dealing with large amounts of data you should use the uploadMultipart operation.
It consumes an observable of bytes, sending partial upload requests of a specific minimum size.
When the emitted byte arrays are smaller than the minimum chunk size, they keep being accumulated until that minimum is reached.
In comparison with the single upload, it substantially reduces the risk of getting OOM errors or failed http requests due to the size of the body.
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse
import monix.connect.s3.S3
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

val bucket = "my-bucket"
val key = "sample/upload/path/file.txt"
// presumably we already have a byte array `Observable`
val ob: Observable[Array[Byte]] = Observable.now("dummy content".getBytes)
val s3: S3 = ??? // an `S3` instance, created as shown in the previous sections
val f: CancelableFuture[CompleteMultipartUploadResponse] =
  ob.consumeWith(s3.uploadMultipart(bucket, key)).runToFuture
Note that the chunk size can be specified on the method call, where its default and minimum value is 5MB (5242880 bytes).
It also accepts specific aws configurations such as acl, requestPayer, etc.
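As an illustrative sketch only, since the exact parameter name is an assumption here (check the uploadMultipart signature), passing a bigger chunk size could look like:
// `minChunkSize` is a hypothetical parameter name, check the actual `uploadMultipart` signature
ob.consumeWith(s3.uploadMultipart(bucket, key, minChunkSize = 10 * 1024 * 1024))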
Local testing
There is actually very good support with regards to testing AWS S3 locally; the following sections describe different popular alternatives:
Localstack
A fully functional local AWS cloud stack available as a docker image.
You just need to define it as a service in your docker-compose.yml:
localstack:
  image: localstack/localstack:0.11.0
  hostname: localstack
  container_name: localstack
  ports:
    - '4566:4566'
  environment:
    # important: specifying `s3` in `SERVICES` prevents spinning up the rest of the AWS services
    - SERVICES=s3
Then, execute the following command to build and run the localstack image:
docker-compose -f ./docker-compose.yml up -d localstack
A good point in favor of using localstack is that it supports AWS Anonymous Credentials, meaning that you can easily connect to your local S3 service with no authentication required.
The snippet below shows an example of the config to be used from S3.fromConfig:
{
  monix-aws: {
    credentials {
      provider: "anonymous"
    }
    endpoint: "http://localhost:4566"
    region: "us-west-2"
  }
}
On the other hand, if you prefer to create the client from your app:
import monix.connect.s3.S3
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import java.net.URI

val localStackEndpoint = "http://localhost:4566"
val s3: S3 = S3.createUnsafe(AnonymousCredentialsProvider.create(), Region.EU_WEST_1, Some(URI.create(localStackEndpoint)))
Creating local buckets in localstack
Whenever you create a bucket on localstack, you'd better set its ACL (Access Control List) to public-read, since that prevents 403 access denied errors when reading it back.
If you set the container_name field to localstack in docker-compose.yaml, you can create the bucket and specify the right ACL like:
docker exec localstack awslocal s3 mb s3://my-bucket
docker exec localstack awslocal s3api put-bucket-acl --bucket my-bucket --acl public-read
On the other hand, if you prefer to do that from code:
import monix.connect.s3.S3
import monix.execution.Scheduler.Implicits.global
import org.scalatest.BeforeAndAfterAll

// inside a test suite that mixes in `BeforeAndAfterAll`,
// reusing the `s3AsyncClient` created in the previous sections
override def beforeAll() = {
  super.beforeAll()
  S3.createUnsafe(s3AsyncClient).createBucket("my-bucket", acl = Some("public-read")).runSyncUnsafe()
}
Minio
Minio is another well-known docker image that emulates AWS S3.
An advantage of using minio over localstack is that it provides a nice UI that lets you quickly visualize and manage the objects and buckets stored locally, instead of having to exec into the container.
On the other hand, a disadvantage is that it does not support Anonymous Credentials, so you have to specify a key and secret to create the connection.
Add the following service description to your docker-compose.yaml
file:
minio:
  image: minio/minio
  ports:
    - "9000:9000"
  volumes:
    - ./minio/data:/data
  environment:
    - MINIO_ACCESS_KEY=TESTKEY
    - MINIO_SECRET_KEY=TESTSECRET
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
    interval: 35s
    timeout: 20s
    retries: 3
  command: server --compat /data
Then, run the following command to build and start the minio image:
docker-compose -f ./docker-compose.yml up -d minio
Check that the service has started correctly. Notice that there is a healthcheck in the definition of the minio service; that's because it is a heavy image that sometimes takes a while to start, or even fails to, and the healthcheck guards against that.
Finally, you can now create the connection to AWS S3. Notice that since minio does not support Anonymous credentials, you'll have to use the Static Credentials Provider instead, specifying the key and secret that correspond respectively to the environment variables MINIO_ACCESS_KEY and MINIO_SECRET_KEY.
The snippet below represents the settings needed to locally connect to minio using the config file, which should be placed under your test resources folder src/test/resources.
{
  monix-aws: {
    credentials {
      provider: "static"
      static {
        access-key-id: "TESTKEY"
        secret-access-key: "TESTSECRET"
      }
    }
    endpoint: "http://localhost:9000"
    region: "us-west-2"
  }
}
Alternatively, you can statically create the connection by parameters:
import monix.connect.s3.S3
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import java.net.URI

val minioEndPoint: String = "http://localhost:9000"
val s3AccessKey: String = "TESTKEY" // equal to the env var `MINIO_ACCESS_KEY`
val s3SecretKey: String = "TESTSECRET" // equal to the env var `MINIO_SECRET_KEY`
val basicAWSCredentials = AwsBasicCredentials.create(s3AccessKey, s3SecretKey)
val staticCredentialsProvider = StaticCredentialsProvider.create(basicAWSCredentials)
val s3Resource = S3.create(staticCredentialsProvider, Region.EU_WEST_1, Some(URI.create(minioEndPoint)))
JVM S3 Mock library
In case you prefer to start and stop the S3 service from the code of the test itself, therefore depending not on docker but just on a JVM library dependency, you can refer to findify/s3mock to find out more.
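A minimal sketch, following the findify/s3mock README (the port and directory below are illustrative):
import io.findify.s3mock.S3Mock

// starts a fake S3 API on localhost:8001, backed by the given directory
val api = S3Mock(port = 8001, dir = "/tmp/s3")
api.start
// ... point the S3AsyncClient endpoint at http://localhost:8001 and run your tests ...
api.shutdown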