A set of useful libraries for Micronaut. All the libraries are available in Maven Central.
- AWS SDK for Micronaut - integration for DynamoDB, Kinesis, Simple Storage Service (S3), Simple Email Service (SES), Simple Notification Service (SNS), Simple Queue Service (SQS) and WebSockets for API Gateway
- Micronaut for API Gateway Proxy - develop API Gateway Proxy Lambda functions using Micronaut HTTP server capabilities (currently superseded by the official library)
AWS SDK for Micronaut
AWS SDK for Micronaut is the successor to the Grails AWS SDK Plugin. If you are a Grails AWS SDK Plugin user, you should find many of the services familiar.
Provided integrations:
Note: Micronaut for API Gateway Proxy is handled separately in its own library.
Key concepts of the AWS SDK for Micronaut:
- Fully leveraging Micronaut best practices
  - Low-level API clients such as AmazonDynamoDB available for dependency injection
  - Declarative clients and services such as @KinesisClient where applicable
  - Configuration-driven named service beans
  - Sensible defaults
  - Conditional beans based on the presence of classes on the classpath or on the presence of specific properties
- Fully leveraging existing AWS SDK configuration chains (e.g. default credential provider chain, default region provider chain)
- Strong focus on the ease of testing
  - Low-level API clients such as AmazonDynamoDB injected by Micronaut and overridable in the tests
  - All high-level services hidden behind an interface for easy mocking in the tests
  - Declarative clients and services for easy mocking in the tests
- Java-enabled but Groovy is a first-class citizen
In this documentation, the high-level approaches will be discussed first before the lower-level services.
Installation
Use artefacts starting with micronaut-amazon-awssdk.
Since version 1.2.8, see the individual subprojects for installation instructions.
CloudWatch Logs
This library provides support for reading the latest CloudWatch Logs for a given log group, usually when testing Lambda functions.
Installation
implementation 'com.agorapulse:micronaut-amazon-awssdk-cloudwatchlogs:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-cloudwatchlogs</artifactId>
<version>{project-version}</version>
</dependency>
CloudWatch Logs Service
There is a bean CloudWatchLogsService which can be used to read the latest log events.
package com.agorapulse.micronaut.amazon.awssdk.lambda;

import com.agorapulse.micronaut.amazon.awssdk.cloudwatchlogs.CloudWatchLogsService;
import jakarta.inject.Singleton;

@Singleton
public class LogCheckService {

    private final CloudWatchLogsService logsService;                    // (1)

    public LogCheckService(CloudWatchLogsService logsService) {
        this.logsService = logsService;
    }

    public boolean contains(String logGroup, String text) {
        return logsService.getLogEvents(logGroup)
            .anyMatch(e -> e.message().contains(text));                 // (2)
    }
}
(1) Inject CloudWatchLogsService into the bean
(2) Use getLogEvents(String) to obtain a stream of the latest log events
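Because LogCheckService only consumes a stream of events, the matching logic can be exercised without AWS. The following is a minimal stand-alone sketch of that idea: LogSource and LogEvent are hypothetical stand-ins for CloudWatchLogsService and the SDK's log event type, introduced here only for illustration and not part of the library.

```java
import java.util.stream.Stream;

// Hypothetical stand-in for the SDK's log event type (only the message is modelled).
final class LogEvent {
    private final String message;

    LogEvent(String message) {
        this.message = message;
    }

    String message() {
        return message;
    }
}

// Hypothetical stand-in for CloudWatchLogsService: log group name -> latest events.
interface LogSource {
    Stream<LogEvent> getLogEvents(String logGroup);
}

// Same matching logic as LogCheckService above, written against the stand-in.
class LogCheck {
    private final LogSource source;

    LogCheck(LogSource source) {
        this.source = source;
    }

    boolean contains(String logGroup, String text) {
        return source.getLogEvents(logGroup)
            .anyMatch(e -> e.message().contains(text));
    }
}

class LogCheckDemo {
    public static void main(String[] args) {
        // A lambda is enough to stub the single-method interface in a test.
        LogSource stub = group -> Stream.of(
            new LogEvent("START RequestId: 1"),
            new LogEvent("Hello from Lambda")
        );
        LogCheck check = new LogCheck(stub);
        System.out.println(check.contains("/aws/lambda/demo", "Hello"));
        System.out.println(check.contains("/aws/lambda/demo", "Goodbye"));
    }
}
```

The stub returns a fresh stream on every call, which matters because a Java stream can be consumed only once.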
Testing
You can very easily create a Lambda function locally with Testcontainers and LocalStack using the micronaut-amazon-awssdk-integration-testing module.
You need to add the following dependencies to your build file to get the service connected to LocalStack automatically:
testImplementation 'com.agorapulse:micronaut-amazon-awssdk-integration-testing:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-integration-testing</artifactId>
<version>{project-version}</version>
</dependency>
DynamoDB
Amazon DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.
This library provides two approaches to work with DynamoDB tables and entities:
- High-level Declarative Services with @Service
- Middle-level DynamoDB Service
Installation
annotationProcessor 'com.agorapulse:micronaut-amazon-awssdk-dynamodb-annotation-processor:{project-version}'
implementation 'com.agorapulse:micronaut-amazon-awssdk-dynamodb:{project-version}'
// for Kotlin Query DSL
implementation 'com.agorapulse:micronaut-amazon-awssdk-dynamodb-kotlin:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-dynamodb</artifactId>
<version>{project-version}</version>
</dependency>
<!-- for Kotlin Query DSL -->
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-dynamodb-kotlin</artifactId>
<version>{project-version}</version>
</dependency>
Note: For Kotlin, use the kapt configuration instead of annotationProcessor.
Entity Class
An entity class is a class whose instances represent the items in DynamoDB.
For AWS SDK v2 you don’t need to use the native annotations; you can use their counterparts in the com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation package instead. The only requirement is that the class is annotated with either @Introspected or @DynamoDbBean. There is a replacement for the @DynamoDBTypeConvertedJson annotation as well: you can use the @ConvertedJson annotation instead.
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.PartitionKey
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.Projection
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SecondaryPartitionKey
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SecondarySortKey
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SortKey
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.TimeToLive
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import io.micronaut.core.annotation.Introspected
import software.amazon.awssdk.services.dynamodb.model.ProjectionType
import java.time.Instant
@Canonical
@Introspected // (1)
@CompileStatic
class DynamoDBEntity {
public static final String DATE_INDEX = 'date'
public static final String RANGE_INDEX = 'rangeIndex'
public static final String GLOBAL_INDEX = 'globalIndex'
@PartitionKey String parentId // (2)
@SortKey String id // (3)
@SecondarySortKey(indexNames = RANGE_INDEX) // (4)
String rangeIndex
@Projection(ProjectionType.ALL) // (5)
@SecondarySortKey(indexNames = DATE_INDEX)
Date date
Integer number = 0
@Projection(ProjectionType.ALL)
@SecondaryPartitionKey(indexNames = GLOBAL_INDEX) // (6)
String getGlobalIndex() {
return "$parentId:$id"
}
@TimeToLive('365d') // (7)
Instant created
}
(1) The entity must be annotated with @Introspected or @DynamoDbBean
(2) The entity must provide the partition key using the @PartitionKey annotation
(3) The sort key is optional
(4) The secondary indices are generated automatically if not present
(5) If the secondary indices are generated then the projection type must be specified (the default is KEYS_ONLY)
(6) The secondary indices can be read-only if you derive them from the other attributes
(7) You can use the @TimeToLive annotation to specify the attribute which will be used for TTL. If the annotation is used at the class level, Instant.now() is used as the reference time.
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.ConvertedJson;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.PartitionKey;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.Projection;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SecondaryPartitionKey;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SecondarySortKey;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SortKey;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.TimeToLive;
import io.micronaut.core.annotation.Introspected;
import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
import java.time.Instant;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@Introspected // (1)
public class DynamoDBEntity implements PlaybookAware {
public static final String DATE_INDEX = "date";
public static final String RANGE_INDEX = "rangeIndex";
public static final String GLOBAL_INDEX = "globalIndex";
private String parentId;
private String id;
private String rangeIndex;
private Date date;
private Integer number = 0;
private Instant created;
private Map<String, List<String>> mapProperty = new LinkedHashMap<>();
private Set<String> stringSetProperty = new HashSet<>();
private Options options = new Options();
@PartitionKey // (2)
public String getParentId() {
return parentId;
}
public void setParentId(String parentId) {
this.parentId = parentId;
}
@SortKey // (3)
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
@SecondarySortKey(indexNames = RANGE_INDEX) // (4)
public String getRangeIndex() {
return rangeIndex;
}
public void setRangeIndex(String rangeIndex) {
this.rangeIndex = rangeIndex;
}
@Projection(ProjectionType.ALL) // (5)
@SecondarySortKey(indexNames = DATE_INDEX)
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
public Integer getNumber() {
return number;
}
public void setNumber(Integer number) {
this.number = number;
}
@Projection(ProjectionType.ALL)
@SecondaryPartitionKey(indexNames = GLOBAL_INDEX) // (6)
public String getGlobalIndex() {
return parentId + ":" + id;
}
@TimeToLive("365d") // (7)
public Instant getCreated() {
return created;
}
public void setCreated(Instant created) {
this.created = created;
}
public Map<String, List<String>> getMapProperty() {
return mapProperty;
}
public void setMapProperty(Map<String, List<String>> mapProperty) {
this.mapProperty = mapProperty;
}
public Set<String> getStringSetProperty() {
return stringSetProperty;
}
public void setStringSetProperty(Set<String> stringSetProperty) {
this.stringSetProperty = stringSetProperty;
}
@ConvertedJson
public Options getOptions() {
return options;
}
public void setOptions(Options options) {
this.options = options;
}
//CHECKSTYLE:OFF
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DynamoDBEntity that = (DynamoDBEntity) o;
return Objects.equals(parentId, that.parentId) &&
Objects.equals(id, that.id) &&
Objects.equals(rangeIndex, that.rangeIndex) &&
Objects.equals(date, that.date) &&
Objects.equals(number, that.number) &&
Objects.equals(mapProperty, that.mapProperty) &&
Objects.equals(created, that.created) &&
Objects.equals(stringSetProperty, that.stringSetProperty);
}
@Override
public int hashCode() {
return Objects.hash(parentId, id, rangeIndex, date, number, mapProperty, stringSetProperty);
}
@Override
public String toString() {
return "DynamoDBEntity{" +
"parentId='" + parentId + '\'' +
", id='" + id + '\'' +
", rangeIndex='" + rangeIndex + '\'' +
", date=" + date +
", number=" + number +
", mapProperty=" + mapProperty +
", created=" + created +
", stringSetProperty=" + stringSetProperty +
'}';
}
//CHECKSTYLE:ON
}
(1) The entity must be annotated with @Introspected or @DynamoDbBean
(2) The entity must provide the partition key using the @PartitionKey annotation
(3) The sort key is optional
(4) The secondary indices are generated automatically if not present
(5) If the secondary indices are generated then the projection type must be specified (the default is KEYS_ONLY)
(6) The secondary indices can be read-only if you derive them from the other attributes
(7) You can use the @TimeToLive annotation to specify the attribute which will be used for TTL. If the annotation is used at the class level, Instant.now() is used as the reference time.
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.*
import io.micronaut.core.annotation.Introspected
import software.amazon.awssdk.services.dynamodb.model.ProjectionType
import java.time.Instant
import java.util.*
@Introspected // (1)
class DynamoDBEntity {
@PartitionKey // (2)
var parentId: String? = null
@SortKey // (3)
var id: String? = null
@SecondarySortKey(indexNames = [RANGE_INDEX]) // (4)
var rangeIndex: String? = null
@SecondarySortKey(indexNames = [DATE_INDEX]) // (5)
@Projection(ProjectionType.ALL)
var date: Date? = null
var number = 0
var map: Map<String, List<String>>? = null
@SecondaryPartitionKey(indexNames = [GLOBAL_INDEX]) // (6)
@Projection(ProjectionType.ALL)
fun getGlobalIndex(): String {
return "$parentId:$id"
}
@TimeToLive("365d") // (7)
var created: Instant? = null
companion object {
const val DATE_INDEX = "date"
const val RANGE_INDEX = "rangeIndex"
const val GLOBAL_INDEX = "globalIndex"
}
}
(1) The entity must be annotated with @Introspected or @DynamoDbBean
(2) The entity must provide the partition key using the @PartitionKey annotation
(3) The sort key is optional
(4) The secondary indices are generated automatically if not present
(5) If the secondary indices are generated then the projection type must be specified (the default is KEYS_ONLY)
(6) The secondary indices can be read-only if you derive them from the other attributes
(7) You can use the @TimeToLive annotation to specify the attribute which will be used for TTL. If the annotation is used at the class level, Instant.now() is used as the reference time.
Warning: The @TimeToLive annotation only adds a read-only attribute to the entity. You need to enable TTL on the table manually.
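DynamoDB's TTL feature expects the expiry time to be stored as a number of Unix epoch seconds. The sketch below illustrates how a value such as "365d" relative to a reference instant could translate into such a number. The parser is a deliberately simplified, hypothetical one that handles only whole days; it is not the library's actual parsing code, and TtlSketch is an illustrative name.

```java
import java.time.Duration;
import java.time.Instant;

class TtlSketch {

    // Hypothetical parser: supports only values such as "365d" (whole days).
    static Duration parseDays(String value) {
        if (!value.endsWith("d")) {
            throw new IllegalArgumentException("Only the 'd' suffix is handled in this sketch: " + value);
        }
        return Duration.ofDays(Long.parseLong(value.substring(0, value.length() - 1)));
    }

    // DynamoDB TTL attributes hold the expiry time as Unix epoch seconds.
    static long expiryEpochSeconds(Instant reference, String ttl) {
        return reference.plus(parseDays(ttl)).getEpochSecond();
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(expiryEpochSeconds(created, "365d"));
    }
}
```

In the entity examples above the reference instant would be the created attribute; for a class-level annotation it would be the current time.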
Immutable Entities
For immutable entities, such as those declared as records, use @Introspected with a builder configuration:
@Immutable
@Introspected(builder = @Introspected.IntrospectionBuilder(builderClass = Customer.Builder.class))
public record Customer(
    @PartitionKey Long id,
    @SortKey String name
) {
    public static class Builder {
        public Builder id(Long id) { /* ... */ }
        public Builder name(String name) { /* ... */ }
        public Customer build() { /* ... */ }
    }
}
Records do not actually require the @Immutable annotation because they are immutable by design. They only require the @Introspected annotation with a builder configuration.
For some versions of Micronaut the builder is detected automatically, so you may only need to use the @Introspected annotation.
Note: Alternatively, use @DynamoDbImmutable, which is mapped to the @Introspected annotation by the DynamoDB annotation processor.
Declarative Services with @Service
Declarative services are very similar to Grails GORM Data Services.
If you place the @Service annotation on an interface, then methods matching a predefined pattern are implemented automatically.
Note: Use packages starting with com.agorapulse.micronaut.amazon.awssdk.dynamodb.
Method Signatures
The following example shows many of the available method signatures:
@Service(DynamoDBEntity)
interface DynamoDBItemDBService {
DynamoDBEntity get(String hash, String rangeKey)
DynamoDBEntity load(String hash, String rangeKey)
List<DynamoDBEntity> getAll(String hash, List<String> rangeKeys)
List<DynamoDBEntity> getAll(String hash, String... rangeKeys)
List<DynamoDBEntity> loadAll(String hash, List<String> rangeKeys)
List<DynamoDBEntity> loadAll(String hash, String... rangeKeys)
DynamoDBEntity save(DynamoDBEntity entity)
List<DynamoDBEntity> saveAll(DynamoDBEntity... entities)
List<DynamoDBEntity> saveAll(Iterable<DynamoDBEntity> entities)
int count(String hashKey)
int count(String hashKey, String rangeKey)
@Query({
query(DynamoDBEntity) {
hash hashKey
range {
eq DynamoDBEntity.RANGE_INDEX, rangeKey
}
}
})
int countByRangeIndex(String hashKey, String rangeKey)
@Query({
query(DynamoDBEntity) {
hash hashKey
range { between DynamoDBEntity.DATE_INDEX, after, before }
}
})
int countByDates(String hashKey, Date after, Date before)
Publisher<DynamoDBEntity> query(String hashKey)
Publisher<DynamoDBEntity> query(String hashKey, String rangeKey)
@Query({
query(DynamoDBEntity) {
hash hashKey
range {
eq DynamoDBEntity.RANGE_INDEX, rangeKey
}
only {
rangeIndex
}
}
})
Publisher<DynamoDBEntity> queryByRangeIndex(String hashKey, String rangeKey)
@Query({
query(DynamoDBEntity) {
hash hashKey
range { between DynamoDBEntity.DATE_INDEX, after, before }
}
})
List<DynamoDBEntity> queryByDates(String hashKey, Date after, Date before)
void delete(DynamoDBEntity entity)
void delete(String hashKey, String rangeKey)
@Query({
query(DynamoDBEntity) {
hash hashKey
range {
eq DynamoDBEntity.RANGE_INDEX, rangeKey
}
}
})
int deleteByRangeIndex(String hashKey, String rangeKey)
@Query({
query(DynamoDBEntity) {
hash hashKey
range { between DynamoDBEntity.DATE_INDEX, after, before }
}
})
int deleteByDates(String hashKey, Date after, Date before)
@Update({
update(DynamoDBEntity) {
hash hashKey
range rangeKey
add 'number', 1
returnUpdatedNew { number }
}
})
Number increment(String hashKey, String rangeKey)
@Update({
update(DynamoDBEntity) {
hash hashKey
range rangeKey
add 'number', -1
returnUpdatedNew { number }
}
})
Number decrement(String hashKey, String rangeKey)
@Scan({
scan(DynamoDBEntity) {
filter {
eq DynamoDBEntity.RANGE_INDEX, foo
}
}
})
Publisher<DynamoDBEntity> scanAllByRangeIndex(String foo)
}
@Service(DynamoDBEntity.class)
public interface DynamoDBEntityService {
class EqRangeIndex implements Function<Map<String, Object>, DetachedQuery> {
public DetachedQuery apply(Map<String, Object> arguments) {
return Builders.query(DynamoDBEntity.class)
.hash(arguments.get("hashKey"))
.range(r -> r.eq(DynamoDBEntity.RANGE_INDEX, arguments.get("rangeKey")));
}
}
class EqRangeProjection implements Function<Map<String, Object>, DetachedQuery> {
public DetachedQuery apply(Map<String, Object> arguments) {
return Builders.query(DynamoDBEntity.class)
.hash(arguments.get("hashKey"))
.range(r ->
r.eq(DynamoDBEntity.RANGE_INDEX, arguments.get("rangeKey"))
)
.only(DynamoDBEntity.RANGE_INDEX);
}
}
class EqRangeScan implements Function<Map<String, Object>, DetachedScan> {
public DetachedScan apply(Map<String, Object> arguments) {
return Builders.scan(DynamoDBEntity.class)
.filter(f -> f.eq(DynamoDBEntity.RANGE_INDEX, arguments.get("foo")));
}
}
class BetweenDateIndex implements Function<Map<String, Object>, DetachedQuery> {
public DetachedQuery apply(Map<String, Object> arguments) {
return Builders.query(DynamoDBEntity.class)
.hash(arguments.get("hashKey"))
.range(r -> r.between(DynamoDBEntity.DATE_INDEX, arguments.get("after"), arguments.get("before")));
}
}
class IncrementNumber implements Function<Map<String, Object>, DetachedUpdate> {
public DetachedUpdate apply(Map<String, Object> arguments) {
return Builders.update(DynamoDBEntity.class)
.hash(arguments.get("hashKey"))
.range(arguments.get("rangeKey"))
.add("number", 1)
.returnUpdatedNew(DynamoDBEntity::getNumber);
}
}
class DecrementNumber implements Function<Map<String, Object>, DetachedUpdate> {
public DetachedUpdate apply(Map<String, Object> arguments) {
return Builders.update(DynamoDBEntity.class)
.hash(arguments.get("hashKey"))
.range(arguments.get("rangeKey"))
.add("number", -1)
.returnUpdatedNew(DynamoDBEntity::getNumber);
}
}
DynamoDBEntity get(String hash, String rangeKey);
DynamoDBEntity load(String hash, String rangeKey);
List<DynamoDBEntity> getAll(String hash, List<String> rangeKeys);
List<DynamoDBEntity> getAll(String hash, String... rangeKeys);
List<DynamoDBEntity> loadAll(String hash, List<String> rangeKeys);
List<DynamoDBEntity> loadAll(String hash, String... rangeKeys);
DynamoDBEntity save(DynamoDBEntity entity);
List<DynamoDBEntity> saveAll(DynamoDBEntity... entities);
List<DynamoDBEntity> saveAll(Iterable<DynamoDBEntity> entities);
int count(String hashKey);
int count(String hashKey, String rangeKey);
@Query(EqRangeIndex.class)
int countByRangeIndex(String hashKey, String rangeKey);
@Query(BetweenDateIndex.class)
int countByDates(String hashKey, Date after, Date before);
Publisher<DynamoDBEntity> query(String hashKey);
Publisher<DynamoDBEntity> query(String hashKey, String rangeKey);
@Query(EqRangeProjection.class)
Publisher<DynamoDBEntity> queryByRangeIndex(String hashKey, String rangeKey);
@Query(BetweenDateIndex.class)
List<DynamoDBEntity> queryByDates(String hashKey, Date after, Date before);
void delete(DynamoDBEntity entity);
void delete(String hashKey, String rangeKey);
@Query(EqRangeIndex.class)
int deleteByRangeIndex(String hashKey, String rangeKey);
@Query(BetweenDateIndex.class)
int deleteByDates(String hashKey, Date after, Date before);
@Update(IncrementNumber.class)
Number increment(String hashKey, String rangeKey);
@Update(DecrementNumber.class)
Number decrement(String hashKey, String rangeKey);
@Scan(EqRangeScan.class)
Publisher<DynamoDBEntity> scanAllByRangeIndex(String foo);
}
@Service(value = DynamoDBEntity::class, tableName = "DynamoDBJava")
interface DynamoDBEntityService {
fun get(@PartitionKey parentId: String, @SortKey id: String): DynamoDBEntity
fun load(@PartitionKey parentId: String, @SortKey id: String): DynamoDBEntity
fun getAll(hash: String, rangeKeys: List<String>): List<DynamoDBEntity>
fun getAll(hash: String, vararg rangeKeys: String): List<DynamoDBEntity>
fun loadAll(hash: String, rangeKeys: List<String>): List<DynamoDBEntity>
fun loadAll(hash: String, vararg rangeKeys: String): List<DynamoDBEntity>
fun save(entity: DynamoDBEntity): DynamoDBEntity
fun saveAll(vararg entities: DynamoDBEntity): List<DynamoDBEntity?>?
fun saveAll(entities: Iterable<DynamoDBEntity>): List<DynamoDBEntity>
fun count(hashKey: String): Int
fun count(hashKey: String, rangeKey: String): Int
class EqRangeIndex : QueryFunction<DynamoDBEntity>({ args: Map<String, Any> ->
partitionKey(args.get("hashKey"))
index(DynamoDBEntity.RANGE_INDEX)
sortKey {
eq(args["rangeKey"])
}
})
@Query(EqRangeIndex::class)
fun countByRangeIndex(hashKey: String, rangeKey: String): Int
class BetweenDateIndex : QueryFunction<DynamoDBEntity>({ args: Map<String, Any> ->
index(DynamoDBEntity.DATE_INDEX)
partitionKey(args["hashKey"])
sortKey { between(args["after"], args["before"]) }
page(1)
})
@Query(BetweenDateIndex::class)
fun countByDates(hashKey: String, after: Date, before: Date): Int
fun query(hashKey: String): Publisher<DynamoDBEntity>
fun query(hashKey: String, rangeKey: String): Publisher<DynamoDBEntity>
class EqRangeProjection : QueryFunction<DynamoDBEntity>({ args: Map<String, Any> ->
partitionKey(args["hashKey"])
index(DynamoDBEntity.RANGE_INDEX)
sortKey { eq(args["rangeKey"]) }
only(DynamoDBEntity.RANGE_INDEX)
})
@Query(EqRangeProjection::class)
fun queryByRangeIndex(hashKey: String, rangeKey: String): Publisher<DynamoDBEntity>
@Query(BetweenDateIndex::class)
fun queryByDates(hashKey: String, after: Date, before: Date): List<DynamoDBEntity>
class BetweenDateIndexScroll : QueryFunction<DynamoDBEntity>({ args: Map<String, Any> ->
index(DynamoDBEntity.DATE_INDEX)
partitionKey(args["hashKey"])
lastEvaluatedKey(args["lastEvaluatedKey"])
sortKey { between(args["after"], args["before"]) }
})
@Query(BetweenDateIndexScroll::class)
fun queryByDatesScroll(
hashKey: String,
after: Date,
before: Date,
lastEvaluatedKey: DynamoDBEntity
): List<DynamoDBEntity>
fun delete(entity: DynamoDBEntity)
fun delete(hashKey: String, rangeKey: String)
@Query(EqRangeIndex::class)
fun deleteByRangeIndex(hashKey: String, rangeKey: String): Int
@Query(BetweenDateIndex::class)
fun deleteByDates(hashKey: String, after: Date, before: Date): Int
class IncrementNumber : UpdateFunction<DynamoDBEntity, Int>({ args: Map<String, Any> ->
partitionKey(args["hashKey"])
sortKey(args["rangeKey"])
add("number", 1)
returnUpdatedNew(DynamoDBEntity::number)
})
@Update(IncrementNumber::class)
fun increment(hashKey: String, rangeKey: String): Number
class DecrementNumber : UpdateFunction<DynamoDBEntity, Int>({ args: Map<String, Any> ->
partitionKey(args["hashKey"])
sortKey(args["rangeKey"])
add("number", -1)
returnUpdatedNew(DynamoDBEntity::number)
})
@Update(DecrementNumber::class)
fun decrement(hashKey: String, rangeKey: String): Number
class EqRangeScan : ScanFunction<DynamoDBEntity>({ args: Map<String, Any> ->
filter {
eq(DynamoDBEntity.RANGE_INDEX, args["foo"])
}
})
@Scan(EqRangeScan::class)
fun scanAllByRangeIndex(foo: String): Publisher<DynamoDBEntity>
}
The following table summarizes the supported method signatures:
| Return Type | Method Name | Arguments | Example | Description |
|---|---|---|---|---|
| | | An entity, array of entities or iterable of entities | | Persists the entity or a list of entities and returns self |
| | | Hash key and optional range key, array of range keys or iterable of range keys annotated with | | Loads a single entity or a list of entities from the table. The range key is required for tables that define a range key |
| | | Hash key and optional range key annotated with | | Counts the items in the database. Beware, this can be a very expensive operation in DynamoDB. See Advanced Queries for advanced use cases |
| | | Entity or hash key and optional range key annotated with | | Deletes an item which can be specified with a hash key and an optional range key. See Advanced Queries for advanced use cases |
| | | Entity or hash key and optional range key annotated with | | Queries for all entities with the given hash key and/or range key |
| (contextual) | (none of above) | Any arguments, which will be translated into the arguments map | (see below) | Query, scan or update. See Advanced Queries, Scanning and Updates for advanced use cases |
Tip: Calling any of the declarative service methods will create the DynamoDB table automatically if it does not already exist.
Advanced Queries
The DynamoDB integration does not support the feature known as dynamic finders.
Instead, you can annotate any method with the @Query annotation to make it
- a counting method if its name begins with count
- a batch delete method if its name begins with delete
- otherwise an advanced query method
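The naming rule above amounts to a prefix check on the method name. The following sketch illustrates the rule as described in this section; it is not the library's actual dispatch code, and QueryMethodKind and Kind are hypothetical names used only here.

```java
import java.util.List;

class QueryMethodKind {

    enum Kind { COUNT, BATCH_DELETE, QUERY }

    // Classifies a @Query-annotated method by its name prefix.
    static Kind classify(String methodName) {
        if (methodName.startsWith("count")) {
            return Kind.COUNT;
        }
        if (methodName.startsWith("delete")) {
            return Kind.BATCH_DELETE;
        }
        return Kind.QUERY;
    }

    public static void main(String[] args) {
        // Method names taken from the service examples in this document.
        for (String name : List.of("countByDates", "deleteByRangeIndex", "queryByDates")) {
            System.out.println(name + " -> " + classify(name));
        }
    }
}
```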
For DynamoDB v2, you can use the @Index, @Consistent, @Descending, @Filter, @Limit, @Page and @LastEvaluatedKey annotations to further customize the query without creating a custom query class and using it with the @Query annotation.
@Consistent // (1)
@Descending // (2)
@Index(DynamoDBEntity.DATE_INDEX) // (3)
List<DynamoDBEntity> findAllByNumber(
@PartitionKey String parentId,
Integer number // (4)
);
int countAllByOptionalNumber(
@PartitionKey String parentId,
@Nullable Integer number // (5)
);
List<DynamoDBEntity> findAllByNumberGreaterThan(
@PartitionKey String parentId,
@Filter( // (6)
value = Filter.Operator.GT,
name = "number" // (7)
) Integer theNumber
);
@Index(DynamoDBEntity.RANGE_INDEX)
List<DynamoDBEntity> findAllByRangeBeginsWith(
@PartitionKey String parentId,
@SortKey // (8)
@Filter(
value = Filter.Operator.BEGINS_WITH, // (9)
name = "rangeIndex" // (10)
)
String rangeIndexPrefix
);
@Index(DynamoDBEntity.RANGE_INDEX)
int deleteAllByRangeBeginsWith( // (11)
@PartitionKey String parentId,
@SortKey
@Filter(
value = Filter.Operator.BEGINS_WITH,
name = "rangeIndex"
)
String rangeIndexPrefix
);
List<DynamoDBEntity> findAllByNumberNot(
@PartitionKey String parentId,
@Filter(Filter.Operator.NE) Integer number,
@LastEvaluatedKey DynamoDBEntity lastEvaluatedKey, // (12)
@Page int page, // (13)
@Limit int limit // (14)
);
(1) You can use the @Consistent annotation to make the query consistent
(2) You can use the @Descending annotation to sort the results in descending order
(3) You can use the @Index annotation to specify the index to use to query the table
(4) Any parameters that are not partition or sort keys are used as filter conditions
(5) If the parameter is annotated with @Nullable then the filter condition is only applied when the parameter is not null
(6) You can use the @Filter annotation to specify the filter condition operator
(7) You can use the @Filter annotation to specify the attribute name
(8) You can combine the @SortKey and @Filter annotations to specify the sort key condition
(9) Only the EQ, LE, LT, GE, GT, BETWEEN and BEGINS_WITH operators are supported
(10) You can also use the @Filter annotation to specify the sort key name
(11) If you use any customization annotations on a delete method, then the method will be used as a batch delete method
(12) You can pass the last evaluated key to the query. It must be the same type as the entity type.
(13) You can use the @Page annotation to give the query the pagination hint
(14) You can use the @Limit annotation to specify the maximum number of items to return
Tip: The EQ operator is used by default if the @Filter annotation is not present. This makes it special: the service introduction tries to find the appropriate operation based on the actual value. For collections or arrays, the inList operation is used. If the actual value is null, the isNull operation is used. For other types, the eq operation is used. For sort keys, the eq operation is always used.
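The default-operator rule from the tip can be written down as a small decision function. This is a sketch of the rule as stated, assuming the operation names inList, isNull and eq from the tip; DefaultOperatorSketch and resolve are hypothetical names and do not mirror the library's internals.

```java
import java.util.Collection;
import java.util.List;

class DefaultOperatorSketch {

    // Picks the operation for a parameter that has no explicit @Filter operator.
    static String resolve(Object value, boolean sortKey) {
        if (sortKey) {
            return "eq";        // sort keys always use eq
        }
        if (value == null) {
            return "isNull";    // null values are matched with isNull
        }
        if (value instanceof Collection<?> || value.getClass().isArray()) {
            return "inList";    // collections and arrays are matched with inList
        }
        return "eq";            // every other type uses eq
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of(1, 2, 3), false));
        System.out.println(resolve(null, false));
        System.out.println(resolve(42, false));
        System.out.println(resolve("abc", true));
    }
}
```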
For more complex queries, you can use the @Query annotation with a class implementing the QueryFunction interface:
import static com.agorapulse.micronaut.amazon.awssdk.dynamodb.groovy.GroovyBuilders.* // (1)

@Service(DynamoDBEntity)                                                // (2)
interface DynamoDBItemDBService {

    @Query({                                                            // (3)
        query(DynamoDBEntity) {
            partitionKey hashKey                                        // (4)
            index DynamoDBEntity.RANGE_INDEX
            range {
                eq rangeKey                                             // (5)
            }
            only {                                                      // (6)
                rangeIndex                                              // (7)
            }
        }
    })
    Publisher<DynamoDBEntity> queryByRangeIndex(String hashKey, String rangeKey) // (8)

}
(1) GroovyBuilders class provides all necessary factory methods and keywords
(2) Annotate an interface with @Service with the type of the entity as its value
(3) @Query annotation accepts a closure which returns a query builder (see QueryBuilder for full reference)
(4) Specify a partition key with partitionKey method and method’s hashKey argument
(5) Specify some range key criteria with the method’s rangeKey argument (see RangeConditionCollector for full reference)
(6) You can limit which properties are returned from the query
(7) Only rangeIndex property will be populated in the entities returned
(8) The arguments have no special meaning but you can use them in the query. The method must return either Publisher, Stream or Iterable of entities.
@Service(value = DynamoDBEntity.class, tableName = "${test.table.name:DynamoDBJava}") // (1)
public interface DynamoDBEntityService {

    class EqRangeProjection implements QueryFunction<DynamoDBEntity> {  // (2)
        public QueryBuilder<DynamoDBEntity> query(Map<String, Object> arguments) {
            return builder().partitionKey(arguments.get("hashKey"))     // (3)
                .index(DynamoDBEntity.RANGE_INDEX)
                .sortKey(r ->
                    r.eq(arguments.get("rangeKey"))                     // (4)
                )
                .only(DynamoDBEntity.RANGE_INDEX);                      // (5)
        }
    }

    @Query(EqRangeProjection.class)                                     // (6)
    Publisher<DynamoDBEntity> queryByRangeIndex(String hashKey, String rangeKey); // (7)

}
(1) Annotate an interface with @Service with the type of the entity as its value
(2) Define class which implements QueryFunction
(3) Specify a partition key with partitionKey method and method’s hashKey argument
(4) Specify some range key criteria with the method’s rangeKey argument (see RangeConditionCollector for full reference)
(5) Only rangeIndex property will be populated in the entities returned
(6) @Query annotation accepts a class which implements Function<Map<String, Object>, DetachedQuery>
(7) The arguments have no special meaning but you can use them in the query using arguments map. The method must return either Publisher, Stream or List of entities.
@Service(value = DynamoDBEntity::class, tableName = "DynamoDBJava")     // (1)
interface DynamoDBEntityService {

    class EqRangeProjection : QueryFunction<DynamoDBEntity>({ args: Map<String, Any> -> // (2)
        partitionKey(args["hashKey"])                                   // (3)
        index(DynamoDBEntity.RANGE_INDEX)
        sortKey { eq(args["rangeKey"]) }                                // (4)
        only(DynamoDBEntity.RANGE_INDEX)                                // (5)
    })

    @Query(EqRangeProjection::class)                                    // (6)
    fun queryByRangeIndex(hashKey: String, rangeKey: String): Publisher<DynamoDBEntity> // (7)

}
(1) Annotate an interface with @Service with the type of the entity as its value
(2) Create a class that extends com.agorapulse.micronaut.amazon.awssdk.dynamodb.kotlin.QueryFunction and uses the DSL constructor
(3) Specify a partition key with partitionKey method and method’s hashKey argument
(4) Specify some range key criteria with the method’s rangeKey argument (see RangeConditionCollector for full reference)
(5) Only rangeIndex property will be populated in the entities returned
(6) @Query annotation accepts a class which implements Function<Map<String, Object>, DetachedQuery>
(7) The arguments have no special meaning but you can use them in the query using arguments map. The method must return either Publisher, Stream or List of entities.
Scanning
The DynamoDB integration does not support the feature known as dynamic finders.
If you need to scan the table by unindexed attributes, you can annotate any method with the @Scan annotation to make it
-
a counting method if its name begins with count
-
otherwise an advanced query method
import static com.agorapulse.micronaut.amazon.awssdk.dynamodb.groovy.GroovyBuilders.* // (1)
@Service(DynamoDBEntity) // (2)
interface DynamoDBItemDBService {
@Scan({ // (3)
scan(DynamoDBEntity) {
filter {
eq DynamoDBEntity.RANGE_INDEX, foo // (4)
}
only {
rangeIndex
}
}
})
Publisher<DynamoDBEntity> scanAllByRangeIndex(String foo) // (5)
}
-
The GroovyBuilders class provides all necessary factory methods and keywords
-
Annotate an interface with @Service with the type of the entity as its value
-
The @Scan annotation accepts a closure which returns a scan builder (see ScanBuilder for full reference)
-
Specify some filter criteria with the method’s foo argument (see RangeConditionCollector for full reference)
-
The arguments have no special meaning but you can use them in the scan definition. The method must return either Publisher, Stream or List of entities.
@Service(value = DynamoDBEntity.class, tableName = "${test.table.name:DynamoDBJava}") // (1)
public interface DynamoDBEntityService {
class EqRangeScan implements ScanFunction<DynamoDBEntity> { // (2)
@Override
public ScanBuilder<DynamoDBEntity> scan(Map<String, Object> args) {
return builder().filter(f ->
f.eq(DynamoDBEntity.RANGE_INDEX, args.get("foo")) // (3)
);
}
}
@Scan(EqRangeScan.class) // (4)
Publisher<DynamoDBEntity> scanAllByRangeIndex(String foo); // (5)
}
-
Annotate an interface with @Service with the type of the entity as its value
-
Define a class which implements ScanFunction
-
Specify some filter criteria with the method’s foo argument (see RangeConditionCollector for full reference)
-
The @Scan annotation accepts a class which implements Function<Map<String, Object>, DetachedScan>
-
The arguments have no special meaning but you can use them in the scan definition. The method must return either Publisher, Stream or List of entities.
@Service(value = DynamoDBEntity::class, tableName = "DynamoDBJava") // (1)
interface DynamoDBEntityService {
class EqRangeScan : ScanFunction<DynamoDBEntity>({ args: Map<String, Any> -> // (2)
filter {
eq(DynamoDBEntity.RANGE_INDEX, args["foo"]) // (3)
}
})
@Scan(EqRangeScan::class) // (4)
fun scanAllByRangeIndex(foo: String): Publisher<DynamoDBEntity> // (5)
}
-
Annotate an interface with @Service with the type of the entity as its value
-
Define a class which implements ScanFunction
-
Specify some filter criteria with the method’s foo argument (see RangeConditionCollector for full reference)
-
The @Scan annotation accepts a class which implements Function<Map<String, Object>, DetachedScan>
-
The arguments have no special meaning but you can use them in the scan definition. The method must return either Publisher, Stream or List of entities.
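The naming rule stated at the start of this section (a method whose name begins with count is a counting method, anything else is a query method) can be pictured with a short sketch. It is an illustration only, not the library's implementation: ScanMethodKind, Kind and classify are hypothetical names, and countByRangeIndex is an assumed method name used for contrast with scanAllByRangeIndex from the examples above.

```java
import java.util.List;

class ScanMethodKind {
    enum Kind { COUNT, QUERY }

    // Assumption: only the method name prefix decides the kind, as the text describes.
    static Kind classify(String methodName) {
        return methodName.startsWith("count") ? Kind.COUNT : Kind.QUERY;
    }

    public static void main(String[] args) {
        // countByRangeIndex is a made-up name; scanAllByRangeIndex comes from the examples.
        for (String name : List.of("countByRangeIndex", "scanAllByRangeIndex")) {
            System.out.println(name + " -> " + classify(name));
        }
    }
}
```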
Updates
Declarative services allow you to execute fine-grained updates. Any method annotated with @Update will perform the update in the DynamoDB table.
import static com.agorapulse.micronaut.amazon.awssdk.dynamodb.groovy.GroovyBuilders.* // (1)
@Service(DynamoDBEntity) // (2)
interface DynamoDBItemDBService {
@Update({ // (3)
update(DynamoDBEntity) {
partitionKey hashKey // (4)
sortKey rangeKey // (5)
add 'number', 1 // (6)
returnUpdatedNew { number } // (7)
}
})
Number increment(String hashKey, String rangeKey) // (8)
}
-
The GroovyBuilders class provides all necessary factory methods and keywords
-
Annotate an interface with @Service with the type of the entity as its value
-
The @Update annotation accepts a closure which returns an update builder (see UpdateBuilder for full reference)
-
Specify a partition key with the partitionKey method and the method’s hashKey argument
-
Specify a sort key with the sortKey method and the method’s rangeKey argument
-
Specify the update operation, here incrementing the number attribute (see UpdateBuilder for full reference). You may have multiple update operations.
-
Specify what should be returned from the method (see UpdateBuilder for full reference).
-
The arguments have no special meaning but you can use them in the update definition. The method’s return value depends on the value returned from the returnUpdatedNew mapper.
@Service(value = DynamoDBEntity.class, tableName = "${test.table.name:DynamoDBJava}") // (1)
public interface DynamoDBEntityService {
class IncrementNumber implements UpdateFunction<DynamoDBEntity, Integer> { // (2)
@Override
public UpdateBuilder<DynamoDBEntity, Integer> update(Map<String, Object> args) {
return builder().partitionKey(args.get("hashKey")) // (3)
.sortKey(args.get("rangeKey")) // (4)
.add("number", 1) // (5)
.returnUpdatedNew(DynamoDBEntity::getNumber); // (6)
}
}
@Update(IncrementNumber.class) // (7)
Number increment(String hashKey, String rangeKey); // (8)
}
-
Annotate an interface with @Service with the type of the entity as its value
-
Define a class which implements UpdateFunction
-
Specify a partition key with the partitionKey method and the method’s hashKey argument
-
Specify a sort key with the sortKey method and the method’s rangeKey argument
-
Specify the update operation, here incrementing the number attribute (see UpdateBuilder for full reference). You may have multiple update operations.
-
Specify what should be returned from the method (see UpdateBuilder for full reference).
-
The @Update annotation accepts a class which implements Function<Map<String, Object>, DetachedUpdate>
-
The arguments have no special meaning but you can use them in the update definition. The method’s return value depends on the value returned from the returnUpdatedNew mapper.
@Service(value = DynamoDBEntity::class, tableName = "DynamoDBJava") // (1)
interface DynamoDBEntityService {
class IncrementNumber : UpdateFunction<DynamoDBEntity, Int>({ args: Map<String, Any> ->// (2)
partitionKey(args["hashKey"]) // (3)
sortKey(args["rangeKey"]) // (4)
add("number", 1) // (5)
returnUpdatedNew(DynamoDBEntity::number) // (6)
})
@Update(IncrementNumber::class) // (7)
fun increment(hashKey: String, rangeKey: String): Number // (8)
}
-
Annotate an interface with @Service with the type of the entity as its value
-
Define a class which extends com.agorapulse.micronaut.amazon.awssdk.dynamodb.kotlin.UpdateFunction and use the DSL constructor
-
Specify a partition key with the partitionKey method and the method’s hashKey argument
-
Specify a sort key with the sortKey method and the method’s rangeKey argument
-
Specify the update operation, here incrementing the number attribute (see UpdateBuilder for full reference). You may have multiple update operations.
-
Specify what should be returned from the method (see UpdateBuilder for full reference).
-
The @Update annotation accepts a class which implements Function<Map<String, Object>, DetachedUpdate>
-
The arguments have no special meaning, but you can use them in the update definition. The method’s return value depends on the value returned from the returnUpdatedNew mapper.
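To make the effect of the increment examples above more concrete, here is a minimal in-memory sketch of what adding to a numeric attribute and returning the updated value amounts to. It does not talk to DynamoDB and does not use the library; IncrementSketch and its map are hypothetical stand-ins. The sketch relies on the documented behaviour of the DynamoDB ADD action, which treats a missing numeric attribute as 0 before adding.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class IncrementSketch {
    // In-memory stand-in for the table: "hashKey#rangeKey" -> value of the number attribute.
    private final Map<String, Integer> numbers = new ConcurrentHashMap<>();

    // Mirrors add("number", delta) followed by returning the updated value.
    // A missing attribute behaves as 0, so the first call stores the delta itself.
    int increment(String hashKey, String rangeKey, int delta) {
        return numbers.merge(hashKey + "#" + rangeKey, delta, Integer::sum);
    }

    public static void main(String[] args) {
        IncrementSketch table = new IncrementSketch();
        System.out.println(table.increment("1", "1", 1));
        System.out.println(table.increment("1", "1", 1));
    }
}
```

The merge call is atomic for a single key, which loosely corresponds to the update being applied on the server side rather than as a read followed by a write.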
DynamoDB Service
DynamoDBService provides a middle-level API for working with DynamoDB tables and entities. You can obtain an instance of DynamoDBService from
DynamoDBServiceProvider, which can be injected into any bean.
DynamoDBServiceProvider provider = context.getBean(DynamoDBServiceProvider)
DynamoDBService<DynamoDBEntity> service = provider.findOrCreate(DynamoDBEntity) // (1)
service.createTable() // (2)
service.save(new DynamoDBEntity( // (3)
parentId: '1',
id: '1',
rangeIndex: 'foo',
number: 1,
date: Date.from(REFERENCE_DATE)
))
service.get('1', '1') // (4)
service.query { // (5)
partitionKey '1'
index DynamoDBEntity.DATE_INDEX
range { between from, to }
}
service.update { // (6)
partitionKey '1001'
sortKey '1'
add 'number', 13
returns allNew
}
service.delete('1001', '1') // (7)
-
Obtain the instance of DynamoDBService from DynamoDBServiceProvider (provider can be injected)
-
Create table for the entity
-
Save an entity
-
Load the entity by its hash and range keys
-
Query the table using the date index for entities whose date lies between from and to
-
Increment a property for entity specified by hash and range keys
-
Delete an entity
DynamoDBServiceProvider provider = context.getBean(DynamoDBServiceProvider);
DynamoDBService<DynamoDBEntity> service = provider.findOrCreate(DynamoDBEntity.class); // (1)
service.createTable(); // (2)
DynamoDBEntity entity = new DynamoDBEntity();
entity.setParentId("1");
entity.setId("1");
entity.setRangeIndex("foo");
entity.setNumber(1);
entity.setDate(new Date());
service.save(entity); // (3)
service.get("1", "1"); // (4)
service.query(query -> // (5)
query.partitionKey("1")
.index(DynamoDBEntity.DATE_INDEX)
.range(r -> r.between(from, to))
);
service.update(update -> // (6)
update.partitionKey("1001")
.sortKey("1")
.add("number", 13)
.returns(ReturnValue.ALL_NEW)
);
service.delete("1001", "1"); // (7)
-
Obtain the instance of DynamoDBService from DynamoDBServiceProvider (provider can be injected)
-
Create table for the entity
-
Save an entity
-
Load the entity by its hash and range keys
-
Query the table using the date index for entities whose date lies between from and to
-
Increment a property for entity specified by hash and range keys
-
Delete an entity
Please see DynamoDBService for full reference.
Testing
You can very easily mock any of the interfaces and declarative services, but if you need close-to-production behaviour, the
DynamoDB integration works well with Testcontainers and LocalStack using the micronaut-amazon-awssdk-integration-testing module.
You need to add the following dependencies into your build file:
testImplementation 'com.agorapulse:micronaut-amazon-awssdk-integration-testing:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-integration-testing</artifactId>
<version>{project-version}</version>
</dependency>
Then you can set up your tests like this:
@MicronautTest // (1)
class DefaultDynamoDBServiceSpec extends Specification {
@Inject DynamoDBServiceProvider dynamoDBServiceProvider // (2)
DynamoDBService<DynamoDBEntity> dbs
void setup() {
dbs = dynamoDBServiceProvider.findOrCreate(DynamoDBEntity) // (3)
}
// test methods
}
-
Annotate the specification with @MicronautTest to let Micronaut handle the application context lifecycle
-
Use @Inject to let Micronaut inject the beans into your tests
-
Create the low-level service using DynamoDBServiceProvider
@MicronautTest // (1)
@Property(name = "test.table.name", value = "DynamoDBDeclarativeJava")
public class DeclarativeServiceTest {
@Inject DynamoDBServiceProvider provider; // (2)
@Test
public void testSomething() {
DynamoDBService<DynamoDBEntity> s = provider.findOrCreate(DynamoDBEntity.class);// (3)
// test code
}
}
-
Annotate the test class with @MicronautTest to let Micronaut handle the application context lifecycle
-
Use @Inject to let Micronaut inject the beans into your tests
-
Create the low-level service using DynamoDBServiceProvider
|
Tip
|
You can save time creating the new Localstack container by sharing it between the tests. application-test.yml
Alternatively you can use different container than Localstack, for example Amazon DynamoDB Local container. application-test.yml
You can also use MiniStack as a drop-in alternative to LocalStack by enabling the application-test.yml
The same per-service container override mechanism is available via |
Data Loader
You can load data exported as CSV from the AWS DynamoDB Console.
These data can be loaded using the DynamoDbLoader bean. The following test shows how to load the data from the CSV file into your DynamoDB tables. Use the integration testing library as described above to have the automatic local DynamoDB for testing.
@MicronautTest // (1)
@Property(name = "aws.dynamodb.create-tables", value = "true") // (2)
class DynamoDbLoaderTest {
private static final Fixt FIXT = Fixt.create(DynamoDbLoaderTest.class); // (3)
@Inject DynamoDbLoader loader;
@Inject DynamoDBServiceProvider provider;
@AfterEach
void cleanUp() {
var service = provider.findOrCreate(TestEntity.class);
service.deleteAll(service.findAll("1"));
}
@Test
void loadIntoDynamoDb() {
Map<Class<?>, Iterable<String>> mappings = Map.of(
TestEntity.class, List.of("test-entity.csv") // (4)
);
loader.loadAll(FIXT::readText, mappings); // (5)
TestEntity fromDb = provider.findOrCreate(TestEntity.class).get("1", null); // (6)
TestEntity referenceEntity = getReferenceEntity();
assertEquals(referenceEntity, fromDb);
}
@Test
void skipLoadingTo() {
List<TestEntity> loaded = loader.readAll( // (7)
FIXT::readText, TestEntity.class, List.of("test-entity.csv")
).toList();
assertEquals(1, loaded.size());
TestEntity fromDb = provider.findOrCreate(TestEntity.class).get("1", null);
assertNull(fromDb);
}
private static TestEntity getReferenceEntity() {
TestEntity referenceEntity = new TestEntity();
referenceEntity.setId("1");
referenceEntity.setName("test-one");
referenceEntity.setActive(true);
referenceEntity.setCreated(Instant.parse("2019-01-01T00:00:00Z"));
referenceEntity.setCount(2);
referenceEntity.setValue(3.4);
referenceEntity.setData(Map.of("string", "text"));
referenceEntity.setTags(Set.of("one", "two", "three"));
return referenceEntity;
}
}
-
The test must be annotated with @MicronautTest to allow loader injection
-
The property aws.dynamodb.create-tables guarantees that the tables are created automatically
-
Fixt is a very convenient way to keep your test fixtures organized and can be easily used with the loader
-
The mapping is specified as a map with the entity types as keys and iterables of the CSV files as values
-
Load the data from the CSV file
-
After loading the data, the entities according to the rows in the CSV file are available in the database
-
Alternatively you can load the data without saving them to DynamoDB using the read methods
This is how the files are laid out for this particular example:

Kinesis
Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information.
This library provides three approaches to work with Kinesis streams:
-
High-level Publishing with @KinesisClient
-
High-level Listening with @KinesisListener
-
Middle-level Kinesis Service
Installation
// for Kinesis client
annotationProcessor 'com.agorapulse:micronaut-amazon-awssdk-kinesis-annotation-processor:{project-version}'
implementation 'com.agorapulse:micronaut-amazon-awssdk-kinesis:{project-version}'
// for Kinesis listener
implementation 'com.agorapulse:micronaut-amazon-awssdk-kinesis-worker:{project-version}'
<!-- for Kinesis client -->
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-kinesis</artifactId>
<version>{project-version}</version>
</dependency>
<!-- for Kinesis listener -->
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-kinesis-worker</artifactId>
<version>{project-version}</version>
</dependency>
|
Note
|
For Kotlin use kapt instead of annotationProcessor configuration.
|
Configuration
You need no configuration at all but some of the configuration may be useful for you.
aws:
  kinesis:
    stream: DefaultStream # (1)
    streams:
      other: # (2)
        stream: OtherStream # (3)
    listener:
      application: my-application # (4)
      worker-id: my-worker-id # (5)
      stream: ListenerStream # (6)
    listeners:
      other: # (7)
        stream: OtherListenerStream
-
You can specify the default stream for KinesisService and @KinesisClient
-
You can define multiple configurations
-
Each of the configurations can be accessed using the @Named('other') KinesisService qualifier, or you can define the configuration as the value of @KinesisClient('other')
-
For Kinesis listeners you should provide an application name, which defaults to micronaut.application.name if not present
-
You can also provide the worker ID of the Kinesis worker
-
This is the default stream to listen to
-
You can listen to multiple Kinesis streams by declaring the name of the configuration in the annotation such as @KinesisListener("other")
Publishing with @KinesisClient
If you place the KinesisClient annotation on an interface then methods
matching a predefined pattern will be automatically implemented. Every method of a KinesisClient puts new records into
the stream.
Use packages starting with com.agorapulse.micronaut.amazon.awssdk.kinesis.
The following example shows many of available method signatures for publishing records:
@KinesisClient // (1)
interface DefaultClient {
void putRecordString(String record); // (2)
PutRecordResponse putRecord(String partitionKey, String record); // (3)
void putRecordAnno(@PartitionKey String id, String record); // (4)
void putRecord(String partitionKey, String record, String sequenceNumber); // (5)
void putRecordAnno( // (6)
@PartitionKey String id,
String record,
@SequenceNumber String sqn
);
void putRecordAnnoNumbers( // (7)
@PartitionKey Long id,
String record,
@SequenceNumber int sequenceNumber
);
}
-
The @KinesisClient annotation makes the interface a Kinesis client
-
You can put a String into the stream with a generated UUID as the partition key
-
You can use a predefined partition key
-
If the name of the argument does not contain the word partition then the @PartitionKey annotation must be used
-
You can put a String into the stream with a predefined partition key and a sequence number
-
If the name of the sequence number argument does not contain the word sequence then the @SequenceNumber annotation must be used
-
The type of the partition key and the sequence number does not matter as the value will always be converted to a string
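The partition key rules listed above (name-based detection, the @PartitionKey annotation as a fallback, a generated UUID when nothing matches, and conversion of any type to a string) can be summarised in a small sketch. This is not the library's code: PartitionKeySketch and Arg are hypothetical, the annotation is modelled as a plain boolean flag, and the case-insensitive name check is an assumption.

```java
import java.util.List;
import java.util.UUID;

class PartitionKeySketch {
    // Hypothetical description of one method argument.
    static final class Arg {
        final String name;
        final boolean annotatedWithPartitionKey;
        final Object value;

        Arg(String name, boolean annotatedWithPartitionKey, Object value) {
            this.name = name;
            this.annotatedWithPartitionKey = annotatedWithPartitionKey;
            this.value = value;
        }
    }

    static String resolve(List<Arg> args) {
        for (Arg arg : args) {
            // Assumption: the name check ignores case.
            boolean namedLikeKey = arg.name.toLowerCase().contains("partition");
            if (arg.annotatedWithPartitionKey || namedLikeKey) {
                return String.valueOf(arg.value); // any type is converted to a string
            }
        }
        return UUID.randomUUID().toString(); // no candidate argument: generate a key
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of(new Arg("partitionKey", false, "abc"), new Arg("record", false, "data"))));
        System.out.println(resolve(List.of(new Arg("id", true, 42L), new Arg("record", false, "data"))));
        System.out.println(resolve(List.of(new Arg("record", false, "data"))));
    }
}
```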
@KinesisClient // (1)
interface DefaultClient {
void putRecordBytes(byte[] record); // (2)
void putRecordDataByteArray(@PartitionKey String id, byte[] value); // (3)
PutRecordsResponse putRecords(Iterable<PutRecordsRequestEntry> entries); // (4)
PutRecordsResponse putRecords(PutRecordsRequestEntry... entries); // (5)
PutRecordsResponse putRecord(PutRecordsRequestEntry entry); // (6)
}
-
The @KinesisClient annotation makes the interface a Kinesis client
-
You can put a byte array into the stream; a UUID will be generated as the partition key
-
If the name of the argument does not contain the word partition then the @PartitionKey annotation must be used
-
You can put several records wrapped into an iterable of PutRecordsRequestEntry
-
You can put several records wrapped into an array of PutRecordsRequestEntry
-
If the single argument is of type PutRecordsRequestEntry then a PutRecordsResponse object is returned from the method even though only a single record has been published
@KinesisClient // (1)
interface DefaultClient {
void putRecordObject(Pogo pogo); // (2)
PutRecordsResponse putRecordObjects(Pogo... pogo); // (3)
PutRecordsResponse putRecordObjects(Iterable<Pogo> pogo); // (4)
void putRecordDataObject(@PartitionKey String id, Pogo value); // (5)
}
-
The @KinesisClient annotation makes the interface a Kinesis client
-
You can put any object into the stream; a UUID will be generated as the partition key and the object will be serialized to JSON
-
You can put an array of any objects into the stream; a UUID will be generated as the partition key for each record and each object will be serialized to JSON
-
You can put an iterable of any objects into the stream; a UUID will be generated as the partition key for each record and each object will be serialized to JSON
-
You can put any object into the stream with a predefined partition key; if the name of the argument does not contain the word partition then the @PartitionKey annotation must be used
@KinesisClient // (1)
interface DefaultClient {
PutRecordResponse putEvent(MyEvent event); // (2)
PutRecordsResponse putEventsIterable(Iterable<MyEvent> events); // (3)
void putEventsArrayNoReturn(MyEvent... events); // (4)
@Stream("OtherStream") PutRecordResponse putEventToStream(MyEvent event); // (5)
}
-
The @KinesisClient annotation makes the interface a Kinesis client
-
You can put an object implementing Event into the stream
-
You can put an iterable of objects implementing Event into the stream
-
You can put an array of objects implementing Event into the stream
-
Without any parameters @KinesisClient publishes to the default stream of the default configuration, but you can change it using the @Stream annotation on the method
|
Note
|
The return value of the method is PutRecordResponse or PutRecordsResponse, but it can be always omitted and replaced with void.
|
By default, KinesisClient publishes records into the default stream defined by aws.kinesis.stream property.
You can switch to different configuration by changing the value of the annotation such as @KinesisClient("other") or
by setting the stream property of the annotation such as @KinesisClient(stream = "MyStream"). You can change stream
used by particular method using @Stream annotation as mentioned above.
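The choice of the target stream described in the paragraph above can be read as a simple fallback chain. The sketch below is hypothetical (StreamResolutionSketch is not part of the library) and assumes the order of precedence is: the @Stream value on the method, then the stream property of @KinesisClient, then the stream from the selected configuration. The text states that the method-level annotation overrides the client; the relative order of the other two is an assumption.

```java
class StreamResolutionSketch {
    // Returns the first value that is set, following the assumed order of precedence.
    static String resolve(String methodStream, String annotationStream, String configuredStream) {
        if (isSet(methodStream)) {
            return methodStream;      // @Stream on the method
        }
        if (isSet(annotationStream)) {
            return annotationStream;  // stream property of @KinesisClient
        }
        return configuredStream;      // e.g. the value of aws.kinesis.stream
    }

    private static boolean isSet(String value) {
        return value != null && !value.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, "DefaultStream"));
        System.out.println(resolve(null, "MyStream", "DefaultStream"));
        System.out.println(resolve("OtherStream", "MyStream", "DefaultStream"));
    }
}
```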
Listening with @KinesisListener
|
Tip
|
Before you start implementing your service with @KinesisListener you may consider implementing a Lambda function instead.
|
If you place KinesisListener annotation on the method in the bean then the method will be triggered with the new records in the stream.
@Singleton // (1)
public class KinesisListenerTester {
@KinesisListener
public void listenString(String string) { // (2)
String message = "EXECUTED: listenString(" + string + ")";
logExecution(message);
}
@KinesisListener
public void listenRecord(KinesisClientRecord record) { // (3)
logExecution("EXECUTED: listenRecord(" + record + ")");
}
@KinesisListener
public void listenStringRecord(String string, KinesisClientRecord record) { // (4)
logExecution("EXECUTED: listenStringRecord(" + string + ", " + record + ")");
}
@KinesisListener
public void listenObject(MyEvent event) { // (5)
logExecution("EXECUTED: listenObject(" + event + ")");
}
@KinesisListener
public void listenObjectRecord(MyEvent event, KinesisClientRecord record) { // (6)
logExecution("EXECUTED: listenObjectRecord(" + event + ", " + record + ")");
}
@KinesisListener
public void listenPogoRecord(Pogo event) { // (7)
logExecution("EXECUTED: listenPogoRecord(" + event + ")");
}
public List<String> getExecutions() {
return executions;
}
public void setExecutions(List<String> executions) {
this.executions = executions;
}
private void logExecution(String message) {
executions.add(message);
System.err.println(message);
}
private List<String> executions = new CopyOnWriteArrayList<>();
}
-
A @KinesisListener method must be declared in a bean, e.g. @Singleton
-
You can listen to just plain string records
-
You can listen to KinesisClientRecord objects
-
You can listen to both string and KinesisClientRecord objects
-
You can listen to objects implementing the Event interface
-
You can listen to both Event and KinesisClientRecord objects
-
You can listen to any object as long as it can be unmarshalled from the record payload
You can listen to different than default configuration by changing the value of the annotation such as @KinesisListener("other").
Multiple methods in a single application can listen to the same configuration (stream). In that case, every method will be executed with the incoming payload.
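The statement that every listener method on the same configuration receives the incoming payload describes a fan-out. A minimal sketch of that behaviour, using plain Java consumers instead of the library's listener infrastructure (ListenerFanOutSketch is a hypothetical name), might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ListenerFanOutSketch {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    // Stands in for discovering one listener method.
    void register(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Every registered listener receives the same payload, in registration order.
    void dispatch(String payload) {
        for (Consumer<String> listener : listeners) {
            listener.accept(payload);
        }
    }

    public static void main(String[] args) {
        ListenerFanOutSketch stream = new ListenerFanOutSketch();
        stream.register(payload -> System.out.println("first listener: " + payload));
        stream.register(payload -> System.out.println("second listener: " + payload));
        stream.dispatch("hello");
    }
}
```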
Kinesis Service
KinesisService provides middle-level API for creating, describing, and deleting streams. You can manage shards as well as read records
from particular shards.
Instance of KinesisService is created for the default Kinesis configuration and each stream configuration in aws.kinesis.streams map.
You should always use @Named qualifier when injecting KinesisService if you have more than one stream configuration present, e.g. @Named("other") KinesisService otherService.
Please, see KinesisService for the full reference.
Testing
You can very easily mock any of the interfaces and declarative services, but if you need close-to-production behaviour, the
Kinesis integration works well with Testcontainers and LocalStack using the micronaut-amazon-awssdk-integration-testing module.
You need to add the following dependencies into your build file:
testImplementation 'com.agorapulse:micronaut-amazon-awssdk-integration-testing:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-integration-testing</artifactId>
<version>{project-version}</version>
</dependency>
Then you can set up your tests like this:
@MicronautTest // (1)
class KinesisDemoSpec extends Specification {
@Inject KinesisService service // (2)
@Retry
void 'new default stream'() {
when:
CreateStreamResponse stream = service.createStream('NewStream')
then:
stream
}
}
-
Annotate the specification with @MicronautTest to let Micronaut handle the application context lifecycle
-
Use @Inject to let Micronaut inject the beans into your tests
@MicronautTest // (1)
public class KinesisJavaDemoTest {
@Inject KinesisService service; // (2)
@Test
public void testJavaService() {
assertNotNull(service.createStream("TestStream"));
}
}
-
Annotate the test class with @MicronautTest to let Micronaut handle the application context lifecycle
-
Use @Inject to let Micronaut inject the beans into your tests
|
Tip
|
You can save time creating the new Localstack container by sharing it between the tests. application-test.yml
|
Lambda
With AWS Lambda, you can run code without provisioning or managing servers. You pay only for the compute time that you consume—there’s no charge when your code isn’t running.
This library provides support for function invocation using @LambdaClient introduction.
Installation
implementation 'com.agorapulse:micronaut-amazon-awssdk-lambda:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-lambda</artifactId>
<version>{project-version}</version>
</dependency>
Configuration
You can configure the function name in the configuration
aws:
  lambda:
    functions:
      hello: # (1)
        function: HelloFunction # (2)
-
The name of the configuration to be used with the interface such as @LambdaClient("hello")
-
The name of the function to execute
Invocation using @LambdaClient
If you place LambdaClient annotation on the interface then any of its methods will invoke the function. Methods that return void will be invoked with Event invocation type - the client won’t wait until the invocation is finished.
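The rule about void methods can be illustrated with plain reflection. The sketch below is an assumption-laden illustration, not the library's implementation: InvocationTypeSketch and SampleClient are hypothetical, and mapping non-void methods to the synchronous RequestResponse type is an assumption (the text only states that void methods use the Event type). Event and RequestResponse are the names AWS Lambda uses for asynchronous and synchronous invocation.

```java
import java.lang.reflect.Method;

class InvocationTypeSketch {
    // Hypothetical client interface used only for this illustration.
    interface SampleClient {
        String hello(String name);

        void notifyOnly(String name);
    }

    // void return type -> fire-and-forget; anything else -> wait for the result (assumed).
    static String invocationType(Method method) {
        return method.getReturnType() == void.class ? "Event" : "RequestResponse";
    }

    // Convenience lookup by method name that avoids checked exceptions.
    static String invocationTypeOf(Class<?> type, String methodName) {
        for (Method method : type.getMethods()) {
            if (method.getName().equals(methodName)) {
                return invocationType(method);
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println("hello -> " + invocationTypeOf(SampleClient.class, "hello"));
        System.out.println("notifyOnly -> " + invocationTypeOf(SampleClient.class, "notifyOnly"));
    }
}
```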
Use packages starting com.agorapulse.micronaut.amazon.awssdk.lambda.
The following example shows typical Lambda client interface:
package com.agorapulse.micronaut.amazon.awssdk.lambda;
import com.agorapulse.micronaut.amazon.awssdk.lambda.annotation.LambdaClient;
@LambdaClient("hello") // (1)
public interface HelloConfigurationClient {
HelloResponse hello(String name); // (2)
}
-
This @LambdaClient will be invoked against the function defined in the aws.lambda.functions.hello.function property
-
The function will be invoked with an object containing the property name with the actual argument
package com.agorapulse.micronaut.amazon.awssdk.lambda;
import com.agorapulse.micronaut.amazon.awssdk.lambda.annotation.Body;
import com.agorapulse.micronaut.amazon.awssdk.lambda.annotation.LambdaClient;
@LambdaClient(function = "HelloFunction") // (1)
public interface HelloBodyClient {
HelloResponse hello(@Body HelloRequest request); // (2)
}
-
You can specify the name of the function directly in the annotation using the function property
-
You can use the @Body annotation to use the whole argument object as a payload of the function
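The two payload styles shown in the examples above differ only in how the request object is assembled. A rough sketch follows, with the caveat that PayloadSketch is hypothetical, the @Body marker is modelled as an index, and the real client also serializes the result to JSON, which is omitted here:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PayloadSketch {
    // bodyIndex is the position of the argument marked as the body, or -1 when there is none.
    static Object payload(String[] names, Object[] values, int bodyIndex) {
        if (bodyIndex >= 0) {
            return values[bodyIndex]; // the whole argument object becomes the payload
        }
        // Otherwise each argument becomes a property named after the parameter.
        Map<String, Object> properties = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            properties.put(names[i], values[i]);
        }
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(payload(new String[] {"name"}, new Object[] {"Vlad"}, -1));
        System.out.println(payload(new String[] {"request"}, new Object[] {"whole request object"}, 0));
    }
}
```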
Testing
You can very easily create a Lambda function locally with Testcontainers and LocalStack using micronaut-amazon-awssdk-integration-testing module.
You need to add following dependencies into your build file:
testImplementation 'com.agorapulse:micronaut-amazon-awssdk-integration-testing:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-integration-testing</artifactId>
<version>{project-version}</version>
</dependency>
Then you can set up your tests like this:
package com.agorapulse.micronaut.amazon.awssdk.lambda;
import com.agorapulse.testing.fixt.Fixt;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.zeroturnaround.zip.ZipUtil;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.Runtime;
import jakarta.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Collections;
@MicronautTest // (1)
public class HelloClientTest {
private static final Fixt FIXT = Fixt.create(AbstractClientSpec.class); // (2)
@TempDir private File tmp;
@Inject private LambdaClient lambda; // (3)
@Inject private HelloClient client; // (4)
@BeforeEach
public void setupSpec() {
prepareHelloFunction(); // (5)
}
@Test
public void invokeFunction() {
HelloResponse result = client.hello("Vlad"); // (6)
Assertions.assertEquals("Hello Vlad", result.getMessage()); // (7)
}
private void prepareHelloFunction() {
boolean alreadyExists = lambda.listFunctions()
.functions()
.stream()
.anyMatch(fn -> "HelloFunction".equals(fn.functionName()));
if (alreadyExists) {
return;
}
File functionDir = new File(tmp, "HelloFunction");
functionDir.mkdirs();
FIXT.copyTo("HelloFunction", functionDir);
File functionArchive = new File(tmp, "function.zip");
ZipUtil.pack(functionDir, functionArchive);
lambda.createFunction(create -> create.functionName("HelloFunction")
.runtime(Runtime.NODEJS16_X)
.role("HelloRole")
.handler("index.handler")
.environment(e ->
e.variables(Collections.singletonMap("MICRONAUT_ENVIRONMENTS", "itest"))//(8)
)
.code(code -> {
try {
InputStream archiveStream = Files.newInputStream(functionArchive.toPath());
SdkBytes archiveBytes = SdkBytes.fromInputStream(archiveStream);
code.zipFile(archiveBytes);
} catch (IOException e) {
throw new IllegalStateException(
"Failed to create function from archive " + functionArchive, e
);
}
})
.build());
}
}
-
Annotate the test class with @MicronautTest to let Micronaut handle the application context lifecycle
-
Fixt is used to organize the function fixture
-
The LambdaClient is populated automatically, pointing to the Localstack test container
-
The function client can be injected as well
-
The function is created in Localstack if not present yet
-
The function is invoked
-
The result of the invocation is compared to the expected value
-
Set the Micronaut environment for the AWS Lambda function
|
Caution
|
If your Lambda function under test itself integrates with some other AWS services then you need to set them up in Localstack and set the endpoints correctly to point to the Localstack mocks. application-itest.yml
|
|
Tip
|
You can save time creating the new Localstack container by sharing it between the tests. application-test.yml
|
Integration Testing
You can verify proper serialization and deserialization of the function arguments and results using the micronaut-amazon-awssdk-lambda-integration-testing module.
You need to add the following dependencies into your build file:
testImplementation 'com.agorapulse:micronaut-amazon-awssdk-lambda-integration-testing:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-lambda-integration-testing</artifactId>
<version>{project-version}</version>
</dependency>
You can use the TestAwsLambdaRuntime runner to test the function execution, including the serialization.
import io.micronaut.json.JsonMapper
import io.micronaut.test.annotation.MockBean
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import spock.lang.Specification
@MicronautTest // (1)
class TestAwsLambdaRuntimeSpec extends Specification {
private static final String RESPONSE_TEXT = 'olleH'
private static final String REQUEST = '{ "message": "Hello" }'
@Inject JsonMapper mapper
@Inject TestAwsLambdaRuntime aws // (2)
@MockBean(DefaultSomeService) SomeService someService() { // (3)
return Mock(SomeService) {
transform(_ as String) >> { String msg -> msg.reverse() }
}
}
void 'handle request'() { // (4)
expect:
aws.handleRequest(PojoHandler, REQUEST).text == RESPONSE_TEXT
aws.handleRequest(PojoHandler, new ByteArrayInputStream(REQUEST.bytes)).text == RESPONSE_TEXT
aws.apply(FunctionHandler, REQUEST).text == RESPONSE_TEXT
aws.apply(FunctionHandler, new ByteArrayInputStream(REQUEST.bytes)).text == RESPONSE_TEXT
aws.<Response>invoke(PojoHandler.name, REQUEST).text == RESPONSE_TEXT
aws.<Response>invoke(PojoHandler.name, new ByteArrayInputStream(REQUEST.bytes)).text == RESPONSE_TEXT
aws.<Response>invoke(SimpleHandler.name + '::execute', REQUEST).text == RESPONSE_TEXT
mapper.readValue(aws.stream(PojoStreamHandler, REQUEST).toString(), Response).text == RESPONSE_TEXT
mapper.readValue(aws.stream(PojoStreamHandler, new ByteArrayInputStream(REQUEST.bytes)).toString(), Response).text == RESPONSE_TEXT
mapper.readValue(aws.stream(PojoStreamHandler.name, REQUEST).toString(), Response).text == RESPONSE_TEXT
mapper.readValue(aws.stream(PojoStreamHandler.name, new ByteArrayInputStream(REQUEST.bytes)).toString(), Response).text == RESPONSE_TEXT
}
}
-
Use @MicronautTest to let Micronaut handle the application context lifecycle
-
Inject the TestAwsLambdaRuntime runner
-
You can mock any services as in any other Micronaut test
-
Use one of the methods to execute the function and verify the result
import io.micronaut.json.JsonMapper;
import io.micronaut.test.annotation.MockBean;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
@MicronautTest // (1)
public class TestAwsLambdaRuntimeTest {
private static final String RESPONSE_TEXT = "olleH";
private static final String REQUEST = "{ \"message\": \"Hello\" }";
@Inject JsonMapper mapper;
@Inject TestAwsLambdaRuntime aws; // (2)
@MockBean(DefaultSomeService.class) SomeService someService() { // (3)
SomeService mock = Mockito.mock(SomeService.class);
when(mock.transform("Hello")).thenReturn(RESPONSE_TEXT);
return mock;
}
@Test
void handleRequest() throws Exception { // (4)
assertEquals(
RESPONSE_TEXT,
aws.handleRequest(PojoHandler.class, REQUEST).getText()
);
assertEquals(
RESPONSE_TEXT,
aws.handleRequest(PojoHandler.class, new ByteArrayInputStream(REQUEST.getBytes(StandardCharsets.UTF_8))).getText()
);
assertEquals(
RESPONSE_TEXT,
aws.apply(FunctionHandler.class, REQUEST).getText()
);
assertEquals(
RESPONSE_TEXT,
aws.apply(FunctionHandler.class, new ByteArrayInputStream(REQUEST.getBytes(StandardCharsets.UTF_8))).getText()
);
assertEquals(
RESPONSE_TEXT,
aws.<Response>invoke(PojoHandler.class.getName(), REQUEST).getText()
);
assertEquals(
RESPONSE_TEXT,
aws.<Response>invoke(PojoHandler.class.getName(), new ByteArrayInputStream(REQUEST.getBytes(StandardCharsets.UTF_8))).getText()
);
assertEquals(
RESPONSE_TEXT,
aws.<Response>invoke(SimpleHandler.class.getName() + "::execute", REQUEST).getText()
);
assertEquals(
RESPONSE_TEXT,
mapper.readValue(aws.stream(PojoStreamHandler.class, REQUEST).toString(), Response.class).getText()
);
assertEquals(
RESPONSE_TEXT,
mapper.readValue(aws.stream(PojoStreamHandler.class, new ByteArrayInputStream(REQUEST.getBytes(StandardCharsets.UTF_8))).toString(), Response.class).getText()
);
assertEquals(
RESPONSE_TEXT,
mapper.readValue(aws.stream(PojoStreamHandler.class.getName(), REQUEST).toString(), Response.class).getText()
);
assertEquals(
RESPONSE_TEXT,
mapper.readValue(aws.stream(PojoStreamHandler.class.getName(), new ByteArrayInputStream(REQUEST.getBytes(StandardCharsets.UTF_8))).toString(), Response.class).getText()
);
}
}
-
Use @MicronautTest to let Micronaut handle the application context lifecycle
-
Inject the TestAwsLambdaRuntime runner
-
You can mock any services as in any other Micronaut test
-
Use one of the methods to execute the function and verify the result
Simple Storage Service (S3)
Amazon Simple Storage Service (Amazon S3) is an object storage service that offers industry-leading scalability, data availability, security, and performance.
This library provides basic support for Amazon S3 using Simple Storage Service
Installation
implementation 'com.agorapulse:micronaut-amazon-awssdk-s3:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-s3</artifactId>
<version>{project-version}</version>
</dependency>
Configuration
You can store the name of the bucket in the configuration using the aws.s3.bucket property. You can create additional configurations
by providing the aws.s3.buckets configuration map.
aws:
s3:
bucket: DefaultBucket # (1)
use-path-style-urls: true # (2)
buckets:
test: # (3)
bucket: SomeOtherBucket # (4)
-
You can define the default bucket for the service
-
Force path-style URL usage
-
You can define multiple configurations
-
Each configuration can be accessed using the @Named('test') SimpleStorageService qualifier
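The use-path-style-urls flag changes where the bucket name appears in request URLs. As a rough illustration only (this is not the library's code; the class and method names are hypothetical, and the host pattern assumes the standard regional S3 endpoint), the two addressing styles differ as follows:

```java
// Hypothetical helper, for illustration only; not part of the library.
final class S3UrlStyles {

    // Path style: the bucket is the first path segment.
    static String pathStyle(String region, String bucket, String key) {
        return "https://s3." + region + ".amazonaws.com/" + bucket + "/" + key;
    }

    // Virtual-hosted style: the bucket is part of the host name.
    static String virtualHostedStyle(String region, String bucket, String key) {
        return "https://" + bucket + ".s3." + region + ".amazonaws.com/" + key;
    }
}
```

Path-style addressing is often needed with local emulators or custom endpoints where bucket-specific host names do not resolve.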
Simple Storage Service
SimpleStorageService provides a middle-level API for managing buckets and for uploading and downloading files.
An instance of SimpleStorageService is created for the default S3 configuration and for each bucket configuration in the aws.s3.buckets map.
You should always use the @Named qualifier when injecting SimpleStorageService if you have more than one bucket configuration present, e.g. @Named("test") SimpleStorageService service.
The following example shows some of the most common use cases for working with S3 buckets.
service.createBucket(MY_BUCKET); // (1)
assertTrue(service.listBucketNames().contains(MY_BUCKET)); // (2)
-
Create new bucket of given name
-
The bucket is present within the list of all bucket names
File sampleContent = createFileWithSampleContent();
service.storeFile(TEXT_FILE_PATH, sampleContent); // (1)
assertTrue(service.exists(TEXT_FILE_PATH)); // (2)
Publisher<S3Object> summaries = service.listObjectSummaries("foo"); // (3)
assertEquals(Long.valueOf(0L), Flux.from(summaries).count().block());
-
Upload file
-
File is uploaded
-
File is present in the summaries of all files
service.storeInputStream( // (1)
KEY,
new ByteArrayInputStream(SAMPLE_CONTENT.getBytes()),
metadata -> {
metadata.contentLength((long) SAMPLE_CONTENT.length())
.contentType("text/plain")
.contentDisposition("bar.baz");
}
);
Publisher<S3Object> fooSummaries = service.listObjectSummaries("foo"); // (2)
assertEquals(KEY, Flux.from(fooSummaries).blockFirst().key());
-
Upload data from stream
-
Stream is uploaded
String url = service.generatePresignedUrl(KEY, TOMORROW); // (1)
assertEquals(SAMPLE_CONTENT, download(url)); // (2)
-
Generate presigned URL
-
Downloaded content corresponds with the expected content
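The download helper used above is not shown in this documentation. A minimal sketch, assuming the presigned URL can be fetched with a plain unauthenticated GET and that the content is UTF-8 text (the class name is hypothetical):

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical test helper; the original download(...) implementation is not shown in the docs.
final class PresignedUrlDownloader {

    // Opens the URL and reads the whole response body as UTF-8 text.
    static String download(String url) throws IOException {
        try (InputStream in = URI.create(url).toURL().openStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}
```

No AWS credentials are involved in the download itself, which is the point of a presigned URL: the signature in the query string authorizes the request until the expiration time.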
File file = new File(tmp, "bar.baz"); // (1)
service.getFile(KEY, file); // (2)
assertTrue(file.exists());
assertEquals(SAMPLE_CONTENT, new String(Files.readAllBytes(Paths.get(file.toURI()))));
-
Prepare a destination file
-
Download the file locally
service.deleteFile(TEXT_FILE_PATH); // (1)
assertFalse(service.exists(TEXT_FILE_PATH)); // (2)
-
Delete file
-
The file is no longer present
service.deleteBucket(); // (1)
assertFalse(service.listBucketNames().contains(MY_BUCKET)); // (2)
-
Delete bucket
-
The Bucket is no longer present
Please, see SimpleStorageService for the full reference.
Testing
You can very easily mock any of the interfaces and declarative services, but if you need a close-to-production setup, the S3 integration works well with Testcontainers and LocalStack using the micronaut-amazon-awssdk-integration-testing module.
You need to add the following dependency to your build file:
testImplementation 'com.agorapulse:micronaut-amazon-awssdk-integration-testing:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-integration-testing</artifactId>
<version>{project-version}</version>
</dependency>
Then you can set up your tests like this:
@MicronautTest // (1)
@Property(name = 'aws.s3.bucket', value = MY_BUCKET) // (2)
class SimpleStorageServiceSpec extends Specification {
@Inject SimpleStorageService service // (3)
// test methods
}
-
Annotate the specification with @MicronautTest to let Micronaut handle the application context lifecycle
-
Annotate the specification with @Property to set the required Micronaut properties
-
Use @Inject to let Micronaut inject the beans into your tests
@MicronautTest // (1)
@Property(name = "aws.s3.bucket", value = SimpleStorageServiceTest.MY_BUCKET) // (2)
public class SimpleStorageServiceTest {
@Inject SimpleStorageService service; // (3)
// test methods
}
-
Annotate the test class with @MicronautTest to let Micronaut handle the application context lifecycle
-
Annotate the test class with @Property to set the required Micronaut properties
-
Use @Inject to let Micronaut inject the beans into your tests
|
Tip
|
You can save time creating the new Localstack container by sharing it between the tests. application-test.yml
|
Simple Email Service (SES)
Amazon Simple Email Service (Amazon SES) is a cloud-based email sending service designed to help digital marketers and application developers send marketing, notification, and transactional emails. It is a reliable, cost-effective service for businesses of all sizes that use email to keep in contact with their customers.
This library provides basic support for Amazon SES using Simple Email Service
Installation
implementation 'com.agorapulse:micronaut-amazon-awssdk-ses:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-ses</artifactId>
<version>{project-version}</version>
</dependency>
Simple Email Service
SimpleEmailService provides a DSL for creating and sending simple emails with attachments. Like the other services,
it uses the default credentials chain to obtain the credentials.
The following example shows how to send an email with an attachment.
EmailDeliveryStatus status = service.send { // (1)
subject 'Hi Paul' // (2)
from 'subscribe@groovycalamari.com' // (3)
to 'me@sergiodelamo.com' // (4)
htmlBody '<p>This is an example body</p>' // (5)
configurationSetName 'configuration-set' // (6)
tags mapOfTags // (7)
attachment { // (8)
filepath thePath // (9)
filename 'test.pdf' // (10)
mimeType 'application/pdf' // (11)
description 'An example pdf' // (12)
}
}
-
Start building an email
-
Define subject of the email
-
Define the from address
-
Define one or more recipients
-
Define HTML body (alternatively you can declare plain text body as well)
-
Define configuration set for the email (https://docs.aws.amazon.com/ses/latest/dg/using-configuration-sets.html)
-
Define tags for the email, they will be included in SES events
-
Build an attachment
-
Define the location of the file to be sent
-
Define the file name (optional - deduced from the file)
-
Define the mime type (usually optional - deduced from the file)
-
Define the description of the file (optional)
EmailDeliveryStatus status = service.send(e -> // (1)
e.subject("Hi Paul") // (2)
.from("subscribe@groovycalamari.com") // (3)
.to("me@sergiodelamo.com") // (4)
.htmlBody("<p>This is an example body</p>") // (5)
.configurationSetName("configuration-set") // (6)
.tags(mapOfTags) // (7)
.attachment(a -> // (8)
a.filepath(filepath) // (9)
.filename("test.pdf") // (10)
.mimeType("application/pdf") // (11)
.description("An example pdf") // (12)
)
);
-
Start building an email
-
Define subject of the email
-
Define the from address
-
Define one or more recipients
-
Define HTML body (alternatively you can declare plain text body as well)
-
Define configuration set for the email (https://docs.aws.amazon.com/ses/latest/dg/using-configuration-sets.html)
-
Define tags for the email, they will be included in SES events
-
Build an attachment
-
Define the location of the file to be sent
-
Define the file name (optional - deduced from the file)
-
Define the mime type (usually optional - deduced from the file)
-
Define the description of the file (optional)
Please, see SimpleEmailService for the full reference.
Configuration
You can configure the SES client using the aws.ses prefix in your application.yml file.
Use aws.ses.use-base64-encoding-for-multipart-emails to enable base64 encoding for the HTML body of multipart emails.
aws:
ses:
use-base64-encoding-for-multipart-emails: true
Testing
It is recommended to simply mock SimpleEmailService in your tests, as it contains only a single abstract method.
Simple Notification Service (SNS)
Amazon Simple Notification Service (SNS) is a highly available, durable, secure, fully managed pub/sub messaging service that enables you to decouple microservices, distributed systems, and serverless applications.
This library provides two approaches to work with Simple Notification Service topics:
-
High-level Publishing with @NotificationClient
-
Middle-level Simple Notification Service
Installation
annotationProcessor 'com.agorapulse:micronaut-amazon-awssdk-sns-annotation-processor:{project-version}'
implementation 'com.agorapulse:micronaut-amazon-awssdk-sns:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-sns</artifactId>
<version>{project-version}</version>
</dependency>
|
Note
|
For Kotlin use kapt instead of annotationProcessor configuration.
|
Configuration
No configuration is required but some of the configuration properties may be useful for you.
aws:
sns:
topic: TestTopic # (1)
ios:
arn: 'arn:aws:sns:eu-west-1:123456789:...' # (2)
android:
arn: 'arn:aws:sns:eu-west-1:123456789:...' # (3)
amazon:
arn: 'arn:aws:sns:eu-west-1:123456789:...' # (4)
topics:
test: # (5)
topic: SomeOtherTopic # (6)
-
You can specify the default topic for SimpleNotificationService and @NotificationClient
-
Amazon Resource Name for the iOS application mobile push
-
Amazon Resource Name for the Android application mobile push
-
Amazon Resource Name for the Amazon application mobile push
-
You can define multiple configurations
-
Each configuration can be accessed using the @Named('test') SimpleNotificationService qualifier, or you can define the configuration as the value of @NotificationClient('test')
Publishing with @NotificationClient
If you place the NotificationClient annotation on an interface, then methods
matching a predefined pattern will be automatically implemented. Methods containing the word sms will send text messages.
Other methods of NotificationClient will publish new messages into the topic.
Use packages starting with com.agorapulse.micronaut.amazon.awssdk.sns.
The following example shows many of the available method signatures for publishing messages:
package com.agorapulse.micronaut.amazon.awssdk.sns;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.MessageDeduplicationId;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.MessageGroupId;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.NotificationClient;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.Topic;
import java.util.Map;
@NotificationClient // (1)
interface DefaultClient {
String OTHER_TOPIC = "OtherTopic";
@Topic("OtherTopic") String publishMessageToDifferentTopic(Pogo pogo); // (2)
String publishMessage(Pogo message); // (3)
String publishMessage(Pogo message, @MessageGroupId String groupId, @MessageDeduplicationId String deduplicationId); // (4)
String publishMessage(String subject, Pogo message); // (5)
String publishMessage(String subject, Pogo message, Map<String, String> attributes);
String publishMessage(String message); // (6)
String publishMessage(String subject, String message);
String publishMessage(String subject, String message, Map<String, String> attributes);
String sendSMS(String phoneNumber, String message); // (7)
String sendSms(String phoneNumber, String message, Map attributes); // (8)
}
-
The @NotificationClient annotation makes the interface an SNS client
-
You can specify the topic to which the message is published using the @Topic annotation
-
You can publish any object which can be converted into JSON.
-
For FIFO topics, the annotations @MessageGroupId and @MessageDeduplicationId can be added to method parameters to forward these attributes when publishing
-
You can add an additional subject to the published message (only useful for a few protocols, e.g. email)
-
You can publish a string message
-
You can send SMS using the word SMS in the name of the method. One argument must be the phone number and its name must contain the word number
-
You can provide additional attributes for the SMS message
|
Note
|
The return value of the methods is message id returned by AWS. |
|
Tip
|
You can add Map<String, String> attributes argument to send message attributes.
|
By default, NotificationClient publishes messages into the default topic defined by the aws.sns.topic property.
You can switch to a different configuration by changing the value of the annotation, such as @NotificationClient("other"), or
by setting the topic property of the annotation, such as @NotificationClient(topic = "SomeTopic"). You can change the topic
used by a particular method using the @Topic annotation as mentioned above.
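The naming convention described above (methods containing the word sms send text messages, and the phone number parameter has a name containing the word number) can be pictured with a small sketch. This is not the library's implementation; the class, enum and method names are hypothetical, and case-insensitive matching is an assumption based on the sendSMS and sendSms examples.

```java
import java.util.Locale;

// Hypothetical illustration of the naming convention; not the library's code.
final class NotificationMethodConvention {

    enum Kind { SMS, PUBLISH }

    // Methods whose name contains "sms" (in any case) send text messages; all others publish to the topic.
    static Kind classify(String methodName) {
        return methodName.toLowerCase(Locale.ROOT).contains("sms") ? Kind.SMS : Kind.PUBLISH;
    }

    // The phone number parameter is recognized by a name containing "number".
    static boolean isPhoneNumberParameter(String parameterName) {
        return parameterName.toLowerCase(Locale.ROOT).contains("number");
    }
}
```

A practical consequence of such a convention is that renaming a method or a parameter can change what the generated client does, so the names in a client interface are part of its contract.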
Simple Notification Service
SimpleNotificationService provides a middle-level API for creating, describing, and deleting topics. You can manage applications, endpoints and devices.
You can send messages and notifications.
An instance of SimpleNotificationService is created for the default SNS configuration and for each topic configuration in the aws.sns.topics map.
You should always use the @Named qualifier when injecting SimpleNotificationService if you have more than one topic configuration present, e.g. @Named("other") SimpleNotificationService otherService.
The following example shows some of the most common use cases for working with Amazon SNS.
Working with Topics
String topicArn = service.createTopic(TEST_TOPIC); // (1)
Topic found = Flux.from(service.listTopics()).filter(t -> // (2)
t.topicArn().endsWith(TEST_TOPIC)
).blockFirst();
-
Create new topic of given name
-
The topic is present within the list of all topics' names
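The filter above matches on the ARN suffix, which would also match a topic whose name merely ends with the same text (for example, a hypothetical NotMyTopic when looking for MyTopic). If an exact match matters, one option is to compare the last colon-separated segment of the ARN, which for SNS topics is the topic name. A small sketch with a hypothetical helper class:

```java
// Hypothetical helper; SNS topic ARNs have the form arn:aws:sns:<region>:<account-id>:<topic-name>.
final class TopicArns {

    // Returns the text after the last colon, which is the topic name for a topic ARN.
    static String topicName(String topicArn) {
        return topicArn.substring(topicArn.lastIndexOf(':') + 1);
    }

    // Exact comparison of the topic name instead of a suffix match on the whole ARN.
    static boolean isTopic(String topicArn, String expectedName) {
        return topicName(topicArn).equals(expectedName);
    }
}
```

With this helper, the filter could be written as filter(t -> TopicArns.isTopic(t.topicArn(), TEST_TOPIC)).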
String subArn = service.subscribeTopicWithEmail(topicArn, EMAIL); // (1)
String messageId = service.publishMessageToTopic( // (2)
topicArn,
"Test Email",
"Hello World"
);
service.unsubscribeTopic(subArn); // (3)
-
Subscribe to the topic with an email (there are more variants of this method to subscribe to most common protocols such as HTTP(S) endpoints, SQS, …)
-
Publish message to the topic
-
Use the subscription ARN to unsubscribe from the topic
service.deleteTopic(topicArn); // (1)
Long zero = Flux.from(service.listTopics()).filter(t -> // (2)
t.topicArn().endsWith(TEST_TOPIC)
).count().block();
-
Delete the topic
-
The topic is no longer present within the list of all topics' names
Working with Applications
String appArn = service.createPlatformApplication( // (1)
"my-app",
APNS,
null,
API_KEY
);
String endpoint = service.createPlatformEndpoint(appArn, DEVICE_TOKEN, DATA); // (2)
String jsonMessage = "{\"data\", \"{\"foo\": \"some bar\"}\", \"notification\", \"{\"title\": \"some title\", \"body\": \"some body\"}\"}";
String msgId = service.sendNotification(endpoint, APNS, jsonMessage); // (3)
service.validateDeviceToken(appArn, endpoint, DEVICE_TOKEN, DATA); // (4)
service.unregisterDevice(endpoint); // (5)
-
Create a new platform application (APNS in this example, more platforms available)
-
Register a device (more platforms available)
-
Send a notification (more platforms available)
-
Validate the device token
-
Unregister the device
Sending SMS
Map<String, MessageAttributeValue> attrs = Collections.emptyMap();
String msgId = service.sendSMSMessage(PHONE_NUMBER, "Hello World", attrs); // (1)
-
Send a message to the phone number
Please, see SimpleNotificationService for the full reference.
Testing
You can very easily mock any of the interfaces and declarative services, but if you need a close-to-production setup, the SNS integration works well with Testcontainers and LocalStack using the micronaut-amazon-awssdk-integration-testing module.
You need to add the following dependency to your build file:
testImplementation 'com.agorapulse:micronaut-amazon-awssdk-integration-testing:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-integration-testing</artifactId>
<version>{project-version}</version>
</dependency>
Then you can set up your tests like this:
@MicronautTest // (1)
@Property(name = 'aws.sns.topic', value = TEST_TOPIC) // (2)
@Property(name = 'aws.sns.ios.arn', value = IOS_APP_ARN)
@Property(name = 'aws.sns.ios-sandbox.arn', value = IOS_SANDBOX_APP_ARN)
@Property(name = 'aws.sns.android.arn', value = ANDROID_APP_ARN)
@Property(name = 'aws.sns.amazon.arn', value = AMAZON_APP_ARN)
@Property(name = 'aws.sqs.queue', value = TEST_QUEUE)
class SimpleNotificationServiceSpec extends Specification {
@Inject SimpleNotificationService service // (3)
// tests
}
-
Annotate the specification with @MicronautTest to let Micronaut handle the application context lifecycle
-
Annotate the specification with @Property to set the required Micronaut properties
-
Use @Inject to let Micronaut inject the beans into your tests
@MicronautTest // (1)
@Property(name = "aws.sns.topic", value = SimpleNotificationServiceTest.TEST_TOPIC) // (2)
public class SimpleNotificationServiceTest {
@Inject SimpleNotificationService service; // (3)
// tests
}
-
Annotate the test class with @MicronautTest to let Micronaut handle the application context lifecycle
-
Annotate the test class with @Property to set the required Micronaut properties
-
Use @Inject to let Micronaut inject the beans into your tests
|
Tip
|
You can save time creating the new Localstack container by sharing it between the tests. application-test.yml
|
Simple Queue Service (SQS)
Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. SQS eliminates the complexity and overhead associated with managing and operating message oriented middleware, and empowers developers to focus on differentiating work.
This library provides two approaches to work with Simple Queue Service queues:
-
High-level Publishing with @QueueClient
-
Middle-level Simple Queue Service
Installation
annotationProcessor 'com.agorapulse:micronaut-amazon-awssdk-sqs-annotation-processor:{project-version}'
implementation 'com.agorapulse:micronaut-amazon-awssdk-sqs:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-sqs</artifactId>
<version>{project-version}</version>
</dependency>
|
Note
|
For Kotlin use kapt instead of annotationProcessor configuration.
|
Configuration
No configuration is required but some of the configuration properties may be useful for you.
aws:
sqs:
queue-name-prefix: dev_ # (1)
auto-create-queue: true # (2)
cache-queue-urls: true # (3)
queue: DefaultQueue # (4)
fifo: false # (5)
content-based-deduplication: false # (6)
delay-seconds: 0 # (7)
message-retention-period: 345600 # (8)
maximum-message-size: 262144 # (9)
visibility-timeout: 30 # (10)
queues:
test: # (11)
queue: SomeOtherQueue # (12)
-
Queue prefix is prepended to every queue name (may be useful for local development)
-
Whether to create any missing queue automatically (default false)
-
Whether to first fetch all queues and set up the queue-to-URL cache the first time the service is asked for a queue URL (default false)
-
You can specify the default queue for SimpleQueueService and @QueueClient
-
Whether the newly created queues are supposed to be FIFO queues (default false)
-
Enable content-based deduplication for FIFO queues (default false)
-
The length of time, in seconds, for which the delivery of all messages in the queue is delayed. Valid values: an integer from 0 to 900 (15 minutes). Default: 0.
-
The length of time, in seconds, for which Amazon SQS retains a message. Valid values: an integer representing seconds, from 60 (1 minute) to 1,209,600 (14 days). Default: 345,600 (4 days).
-
The limit of how many bytes a message can contain before Amazon SQS rejects it. Valid values: an integer from 1,024 bytes (1 KiB) up to 262,144 bytes (256 KiB). Default: 262,144 (256 KiB).
-
The visibility timeout for the queue, in seconds. Valid values: an integer from 0 to 43,200 (12 hours). Default: 30.
-
You can define multiple configurations
-
Each configuration can be accessed using the @Named('test') SimpleQueueService qualifier, or you can define the configuration as the value of @QueueClient('test')
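The valid ranges listed above can be checked before the application starts. The following is a minimal sketch, not part of the library; the class and method names are hypothetical and the limits are copied from the list above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check for the SQS settings described above; not part of the library.
final class SqsSettingsCheck {

    // Returns one message per value that falls outside the documented range.
    static List<String> validate(int delaySeconds, int messageRetentionPeriod,
                                 int maximumMessageSize, int visibilityTimeout) {
        List<String> problems = new ArrayList<>();
        check(problems, "delay-seconds", delaySeconds, 0, 900);
        check(problems, "message-retention-period", messageRetentionPeriod, 60, 1_209_600);
        check(problems, "maximum-message-size", maximumMessageSize, 1_024, 262_144);
        check(problems, "visibility-timeout", visibilityTimeout, 0, 43_200);
        return problems;
    }

    // Adds a problem description when the value is outside the inclusive range.
    private static void check(List<String> problems, String name, int value, int min, int max) {
        if (value < min || value > max) {
            problems.add(name + " must be between " + min + " and " + max + " but was " + value);
        }
    }
}
```

Calling SqsSettingsCheck.validate(0, 345600, 262144, 30) with the values from the configuration example returns an empty list.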
Publishing with @QueueClient
If you place the QueueClient annotation on an interface, then methods
matching a predefined pattern will be automatically implemented. Methods containing the word delete will delete queue messages.
Other methods of QueueClient will publish new records into the queue.
Use packages starting with com.agorapulse.micronaut.amazon.awssdk.sqs.
The following example shows many of the available method signatures for publishing records:
package com.agorapulse.micronaut.amazon.awssdk.sqs;
import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.Queue;
import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.QueueClient;
import org.reactivestreams.Publisher;
import java.util.List;
@QueueClient // (1)
interface DefaultClient {
@Queue(value = "OtherQueue", group = "SomeGroup")
String sendMessageToQueue(String message); // (2)
String sendMessage(Pogo message); // (3)
String sendMessage(byte[] record); // (4)
String sendMessage(String record); // (5)
String sendMessage(String record, int delay); // (6)
String sendMessage(String record, String group); // (7)
String sendMessage(String record, int delay, String group); // (8)
void sendStringMessages(Publisher<String> messages); // (9)
Publisher<String> sendMessages(Publisher<Pogo> messages); // (10)
void deleteMessage(String messageId); // (11)
List<String> sendMessages(List<Pogo> messages); // (12)
List<String> sendStringMessages(List<String> messages); // (13)
List<String> sendMessages(Pogo[] messages); // (14)
List<String> sendStringMessages(String[] messages); // (15)
void sendMessagesVoid(List<Pogo> messages); // (16)
String OTHER_QUEUE = "OtherQueue";
}
-
The @QueueClient annotation makes the interface an SQS client
-
You can specify the queue to which the message is published using the @Queue annotation; you can also specify the group for FIFO queues
-
You can publish any record object which can be converted into JSON.
-
You can publish a byte array record
-
You can publish a string record
-
You can publish a string with a custom delay
-
You can publish a string with a custom FIFO queue group
-
You can publish a string with a custom delay and FIFO queue group
-
You can send multiple messages at once when the argument is a Publisher
-
If the return type is also a publisher type, then you need to subscribe to the publisher to actually send the messages
-
You can delete a published message using the message ID
-
You can send multiple messages at once when the argument is a List and return a list of message IDs
-
You can send multiple string messages at once when the argument is a List and return a list of message IDs
-
You can send multiple messages at once when the argument is an array and return a list of message IDs
-
You can send multiple string messages at once when the argument is an array and return a list of message IDs
-
You can send multiple messages at once and return void
|
Note
|
The return value of the publishing methods is message id returned by AWS. |
By default, QueueClient publishes records into the default queue defined by the aws.sqs.queue property.
You can switch to a different configuration by changing the value of the annotation, such as @QueueClient("other"), or
by setting the queue property of the annotation, such as @QueueClient(queue = "SomeQueue"). You can change the queue
used by a particular method using the @Queue annotation as mentioned above.
Simple Queue Service
SimpleQueueService provides a middle-level API for creating, describing, and deleting queues. It lets you publish, receive and delete records.
An instance of SimpleQueueService is created for the default SQS configuration and for each queue configuration in the aws.sqs.queues map.
You should always use the @Named qualifier when injecting SimpleQueueService if you have more than one queue configuration present, e.g. @Named("other") SimpleQueueService otherService.
The following example shows some of the most common use cases for working with Amazon SQS.
String queueUrl = service.createQueue(TEST_QUEUE); // (1)
assertTrue(service.listQueueUrls().contains(queueUrl)); // (2)
-
Create new queue of given name
-
The queue URL is present within the list of all queues' URLs
Map<QueueAttributeName, String> queueAttributes = service
.getQueueAttributes(TEST_QUEUE); // (1)
assertEquals("0", queueAttributes
.get(QueueAttributeName.DELAY_SECONDS)); // (2)
-
Fetch queue’s attributes
-
You can read the queue’s attributes from the map
service.deleteQueue(TEST_QUEUE); // (1)
assertFalse(service.listQueueUrls().contains(queueUrl)); // (2)
-
Delete queue
-
The queue URL is no longer present within the list of all queues' URLs
String msgId = service.sendMessage(DATA); // (1)
assertNotNull(msgId);
List<Message> messages = service.receiveMessages(); // (2)
Message first = messages.get(0);
assertEquals(DATA, first.body()); // (3)
assertEquals(msgId, first.messageId());
assertEquals(1, messages.size());
service.deleteMessage(first.receiptHandle()); // (4)
-
Send a message
-
Receive messages from the queue (in another application)
-
Read message body
-
Developers are responsible to delete the message from the queue themselves
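Because messages are not removed automatically, a consumer usually deletes each message only after it has been handled successfully, so that a failed message becomes visible again once the visibility timeout expires. The sketch below shows that pattern with plain functional interfaces standing in for the service calls. The class and parameter names are hypothetical; in real code the functions would delegate to receiveMessages(), Message.receiptHandle() and deleteMessage(...) shown above.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration of the process-then-delete pattern; not part of the library.
final class ProcessThenDelete {

    // Handles every received message and deletes only those handled without an exception.
    // Returns the number of deleted messages.
    static <M> int drain(Supplier<List<M>> receive,
                         Consumer<M> handler,
                         Function<M, String> receiptHandle,
                         Consumer<String> delete) {
        int deleted = 0;
        for (M message : receive.get()) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                // Leave the message in the queue so it can be redelivered later.
                continue;
            }
            delete.accept(receiptHandle.apply(message));
            deleted++;
        }
        return deleted;
    }
}
```

Deleting before handling would be simpler, but a crash in the handler would then lose the message.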
|
Tip
|
Try to use AWS Lambda functions triggered by SQS messages to handle incoming SQS messages instead of implementing complex message handling logic yourselves. |
Please, see SimpleQueueService for the full reference.
Testing
You can very easily mock any of the interfaces and declarative services, but if you need a close-to-production setup, the SQS integration works well with Testcontainers and LocalStack using the micronaut-amazon-awssdk-integration-testing module.
You need to add the following dependency to your build file:
testImplementation 'com.agorapulse:micronaut-amazon-awssdk-integration-testing:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-integration-testing</artifactId>
<version>{project-version}</version>
</dependency>
Then you can set up your tests like this:
@MicronautTest // (1)
@Property(name = 'aws.sqs.queue', value = TEST_QUEUE) // (2)
class SimpleQueueServiceSpec extends Specification {
@Inject SimpleQueueService service // (3)
// tests
}
-
Annotate the specification with @MicronautTest to let Micronaut handle the application context lifecycle
-
Annotate the specification with @Property to set the required Micronaut properties
-
Use @Inject to let Micronaut inject the beans into your tests
@MicronautTest // (1)
@Property(name = "aws.sqs.queue", value = SimpleQueueServiceTest.TEST_QUEUE) // (2)
public class SimpleQueueServiceTest {
@Inject SimpleQueueService service; // (3)
// tests
}
-
Annotate the test class with @MicronautTest to let Micronaut handle the application context lifecycle
-
Annotate the test class with @Property to set the required Micronaut properties
-
Use @Inject to let Micronaut inject the beans into your tests
|
Tip
|
You can save time creating the new Localstack container by sharing it between the tests. application-test.yml
|
Security Token Service (STS)
The AWS Security Token Service (STS) is a web service that enables you to request temporary, limited-privilege credentials for AWS Identity and Access Management (IAM) users or for users that you authenticate (federated users).
This library provides basic support for Amazon STS using Security Token Service
Installation
implementation 'com.agorapulse:micronaut-amazon-awssdk-sts:{project-version}'
<dependency>
<groupId>com.agorapulse</groupId>
<artifactId>micronaut-amazon-awssdk-sts</artifactId>
<version>{project-version}</version>
</dependency>
Security Token Service
SecurityTokenService provides only one method (with multiple variations), which creates credentials
for an assumed IAM role.
The following example shows how to create credentials for an assumed role.
service.assumeRole('session', 'arn:::my-role', 360) {
    externalId '123456789'
}
See SecurityTokenService for the full reference.
Testing
We recommend simply mocking SecurityTokenService in your tests, as it contains only a single abstract method.
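Because the service has a single abstract method, a test double can be as small as a lambda. The following is a minimal sketch of that idea, not the library's API: the nested `TokenService`, `Credentials`, `Options` and `ReportUploader` types are hypothetical stand-ins defined only for this illustration, and the method signature is an assumption modelled on the `assumeRole` call shown earlier. Adapt it to the real SecurityTokenService signature from the reference.

```java
import java.util.function.Consumer;

class StsMockSketch {

    // Hypothetical stand-in for the credentials returned by the real service
    static final class Credentials {
        final String accessKeyId;

        Credentials(String accessKeyId) {
            this.accessKeyId = accessKeyId;
        }
    }

    // Hypothetical stand-in for the additional request parameters (e.g. externalId)
    static final class Options {
        String externalId;
    }

    // Assumed shape: exactly one abstract method plus a convenience overload
    @FunctionalInterface
    interface TokenService {
        Credentials assumeRole(String sessionName, String roleArn, int durationInSeconds,
                               Consumer<Options> options);

        default Credentials assumeRole(String sessionName, String roleArn, int durationInSeconds) {
            return assumeRole(sessionName, roleArn, durationInSeconds, options -> { });
        }
    }

    // Hypothetical class under test that depends only on the interface
    static final class ReportUploader {
        private final TokenService tokens;

        ReportUploader(TokenService tokens) {
            this.tokens = tokens;
        }

        String uploadAs(String roleArn) {
            Credentials credentials = tokens.assumeRole("report", roleArn, 360);
            return "uploaded with " + credentials.accessKeyId;
        }
    }

    static String demo() {
        // The lambda implements the single abstract method, so no mocking framework is needed
        TokenService fake = (session, role, duration, options) -> new Credentials("TEST_KEY");
        return new ReportUploader(fake).uploadAs("arn:::my-role");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "uploaded with TEST_KEY"
    }
}
```

In a Micronaut test, a double like this could replace the real bean so that no call to AWS is made. Whether you use a lambda or a mocking framework is a matter of taste.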
Configuration
See the configuration sections of the particular services.
The following services support configuring the region and endpoint:
-
CloudWatch
-
DynamoDB
-
Lambda
-
Kinesis
-
S3
-
SES
-
SNS
-
SQS
-
STS
For example, to configure the region and endpoint for DynamoDB, add the following settings:
aws:
  dynamodb:
    region: us-east-1
    endpoint: http://localhost:8000
When using AWS SDK 2.x, the same service can also be configured with different HTTP client settings:
aws:
  dynamodb:
    # can be url-connection or aws-crt, apache is the default
    client: url-connection
    # can be aws-crt, netty is the default
    async-client: aws-crt
The client libraries must be added to the classpath. For example, with Gradle, add the following dependencies:
dependencies {
    runtimeOnly "software.amazon.awssdk:aws-crt-client:$awsSdk2Version"
    runtimeOnly "software.amazon.awssdk:url-connection-client:$awsSdk2Version"
    runtimeOnly "software.amazon.awssdk:netty-nio-client:$awsSdk2Version"
    runtimeOnly "software.amazon.awssdk:apache-client:$awsSdk2Version"
}
The particular client builder can then be customized using BeanCreatedEventListener beans:
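import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import io.micronaut.core.annotation.NonNull;
import jakarta.inject.Singleton; // javax.inject.Singleton on Micronaut 2.x
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import java.time.Duration;

@Singleton
public class NettyClientCustomizer implements BeanCreatedEventListener<NettyNioAsyncHttpClient.Builder> {
    @Override
    public NettyNioAsyncHttpClient.Builder onCreated(@NonNull BeanCreatedEvent<NettyNioAsyncHttpClient.Builder> event) {
        // Adjust the builder bean right after it is created
        event.getBean().readTimeout(Duration.ofSeconds(10));
        return event.getBean();
    }
}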
Micronaut for API Gateway Proxy
API Gateway Lambda Proxy support for Micronaut has been replaced by the official Micronaut AWS API Gateway Support.
Micronaut Grails
The Micronaut Grails package has been moved into its own repository.
