1. Introduction
Micronaut Worker is a library for advanced scheduling and work distribution in Micronaut. Compared to the Micronaut @Scheduled annotation, it adds the following capabilities:
- Custom annotations for particular use cases: @Cron, @FixedDelay, @FixedRate, and @InitialDelay
- Runtime configuration via worker.jobs properties
- Queue consumers and producers via @QueueConsumer and @QueueProducer annotations
- Distributed processing support with optional queues and leader election
- Job execution events: JobExecutionStartedEvent, JobExecutionFinishedEvent, JobExecutionResultEvent, and JobExecutorEvent
- Built-in support for Micronaut Snitch
- Built-in support for Micronaut Console
- Ability to execute a single job from the CLI (e.g., for https://aws.amazon.com/batch/ or https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definitions.html)
Unlike the @Scheduled annotation, Micronaut Worker annotations are not repeatable, but they can be combined in meaningful ways. For example, a method annotated with @FixedRate("10m") @InitialDelay("1m") executes every ten minutes but waits one minute before the first execution.
All jobs are disabled by default in the function and cli environments, but they can be triggered individually by calling jobManager.forceRun("job-name").
2. Installation
Micronaut Worker is available in Maven Central.
repositories {
    mavenCentral()
}

dependencies {
    // minimal dependency with local queue and executor
    implementation("com.agorapulse:micronaut-worker:2.7.3")

    // select some of the following dependencies to enable more features

    // to enable /jobs endpoint
    implementation("com.agorapulse:micronaut-worker-management:2.7.3")
    // to enable AWS SDK v1 SQS queues integration
    implementation("com.agorapulse:micronaut-worker-queues-sqs-v1:2.7.3")
    // to enable AWS SDK v2 SQS queues integration
    implementation("com.agorapulse:micronaut-worker-queues-sqs-v2:2.7.3")
    // to enable running jobs as CLI apps
    implementation("com.agorapulse:micronaut-worker-runner:2.7.3")
    // to enable Redis leader/follower capabilities
    implementation("com.agorapulse:micronaut-worker-executor-redis:2.7.3")
    // to enable Redis queues integration
    implementation("com.agorapulse:micronaut-worker-queues-redis:2.7.3")
}
3. Usage
3.1. Scheduling
The basic usage of this library is to provide an easy way to run scheduled tasks in a distributed environment.
3.1.1. Cron Jobs
To create a scheduled CRON job, annotate a bean method with @Cron containing the CRON definition string.
@Cron("0 0 0/1 ? * *")
public void job() {
    // your code here
}
3.1.2. Fixed Delay Jobs
To create a fixed delay job, annotate a bean method with @FixedDelay containing the duration string.
@FixedDelay("10m")
public void job() {
    // your code here
}
3.1.3. Fixed Rate Jobs
To create a fixed rate job, annotate a bean method with @FixedRate containing the duration string.
@FixedRate("10m")
public void job() {
    // your code here
}
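The difference between the two triggers is easy to overlook. Assuming the annotations follow the usual JDK scheduling semantics (the text above does not spell this out), a fixed rate measures the time between the starts of consecutive runs, while a fixed delay measures the pause between the end of one run and the start of the next. The sketch below only computes the expected start times; TriggerTimeline and its methods are hypothetical helpers and not part of Micronaut Worker.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Micronaut Worker: computes when runs would start
// under the usual JDK fixed-rate and fixed-delay semantics.
final class TriggerTimeline {

    // Fixed rate: the n-th run is due at initialDelay + n * period,
    // no matter how long each run takes (assuming a run is shorter than the period).
    static List<Duration> fixedRateStarts(Duration initialDelay, Duration period, int runs) {
        List<Duration> starts = new ArrayList<>();
        for (int n = 0; n < runs; n++) {
            starts.add(initialDelay.plus(period.multipliedBy(n)));
        }
        return starts;
    }

    // Fixed delay: the next run starts `delay` after the previous run has finished.
    static List<Duration> fixedDelayStarts(Duration initialDelay, Duration delay, Duration runTime, int runs) {
        List<Duration> starts = new ArrayList<>();
        Duration next = initialDelay;
        for (int n = 0; n < runs; n++) {
            starts.add(next);
            next = next.plus(runTime).plus(delay);
        }
        return starts;
    }
}
```

For a job that takes two minutes, a ten-minute fixed rate yields starts at 0, 10, and 20 minutes, whereas a ten-minute fixed delay yields starts at 0, 12, and 24 minutes.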
3.1.4. Initial Delay Jobs
To create a job with an initial delay, annotate a bean method with @InitialDelay containing the duration string. This annotation can be combined with the @FixedRate and @FixedDelay annotations.
@InitialDelay("10m")
public void initialDelay() {
    // your code here
}

@InitialDelay("10m") @FixedRate("5m")
public void fixedRate() {
    // your code here
}

@InitialDelay("10m") @FixedDelay("15m")
public void fixedDelay() {
    // your code here
}
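The duration strings used by these annotations, such as 10m or 100ms, are shorthand values. The following sketch shows one way such strings could map to java.time.Duration. It assumes the shorthand is a whole number followed by one of ms, s, m, h, or d; the actual conversion is handled by the framework and may accept other forms. DurationStrings is a hypothetical helper and not part of the library.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Micronaut Worker: an approximation of how
// shorthand duration strings such as "10m" could be interpreted.
final class DurationStrings {

    // Assumed shorthand: digits followed by a unit suffix.
    private static final Pattern SHORTHAND = Pattern.compile("(\\d+)(ms|s|m|h|d)");

    static Duration parse(String value) {
        String trimmed = value.trim();
        Matcher matcher = SHORTHAND.matcher(trimmed);
        if (!matcher.matches()) {
            // Fall back to the ISO-8601 form understood by the JDK, e.g. PT10M.
            return Duration.parse(trimmed);
        }
        long amount = Long.parseLong(matcher.group(1));
        switch (matcher.group(2)) {
            case "ms": return Duration.ofMillis(amount);
            case "s":  return Duration.ofSeconds(amount);
            case "m":  return Duration.ofMinutes(amount);
            case "h":  return Duration.ofHours(amount);
            default:   return Duration.ofDays(amount); // only "d" remains
        }
    }
}
```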
3.2. Concurrency Management
You can limit the number of parallel executions with the @Concurrency annotation. The @Consecutive annotation is an alias for @Concurrency(1) and disables parallel execution completely.
@Concurrency(5)
@FixedRate("10m")
public void job() {
    // your code here
}
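A limit on parallel executions is commonly built on a counting semaphore. The sketch below illustrates the idea in plain Java. It assumes that a run arriving while the limit is reached is skipped rather than queued, which the text above does not specify; ConcurrencyLimiter is a hypothetical class and not the library's implementation.

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration, not part of Micronaut Worker: caps the number of
// executions that may be in progress at the same time.
final class ConcurrencyLimiter {

    private final Semaphore permits;

    ConcurrencyLimiter(int maxParallel) {
        this.permits = new Semaphore(maxParallel);
    }

    // Runs the task if a permit is free; otherwise skips it and returns false.
    boolean tryRun(Runnable task) {
        if (!permits.tryAcquire()) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            // Always hand the permit back, even if the task throws.
            permits.release();
        }
    }
}
```

With a limit of 1, which corresponds to what the text calls consecutive execution, a second run attempted while the first is still in progress is rejected.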
3.3. Fork Options
You can spawn the method execution multiple times in parallel in a single server instance with the @Fork annotation.
@Fork(5)
@FixedRate("10m")
public void job() {
    // your code here
}
You should consider using your own scheduler with a pool of the matching size:
micronaut:
  executors:
    test-job: (1)
      core-pool-size: 5 (2)
worker:
  jobs:
    test-job: (3)
      scheduler: test-job (4)
(1) The name of the new custom scheduler
(2) The size of the pool should be the same as the fork value
(3) The name of the job
(4) The scheduler definition using the name of the executor declared above
Each job can define its own scheduler name using the scheduler annotation or configuration value. However, to prevent multiple jobs from blocking each other’s execution, it is recommended to keep the default value, which creates a separate executor for each job.
For consumer jobs, messages are consumed synchronously, so if you want to benefit from @Fork execution, keep maxMessages at its default value of 1.
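The relationship between the fork value and the pool size can be pictured with plain JDK executors: the same job body is submitted once per fork to a pool that has one thread per fork, so no copy has to wait for a free thread. This is only a sketch of the idea. ForkSketch and runForked are hypothetical names, and the library's own scheduling is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical illustration, not part of Micronaut Worker.
final class ForkSketch {

    // Submits the same job `fork` times to a pool with `fork` threads and waits for all copies.
    static <T> List<T> runForked(int fork, Supplier<T> job) {
        ExecutorService pool = Executors.newFixedThreadPool(fork);
        try {
            List<Callable<T>> copies = new ArrayList<>();
            for (int i = 0; i < fork; i++) {
                copies.add(job::get);
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : pool.invokeAll(copies)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

If the pool were smaller than the fork value, some copies would run one after another instead of in parallel, which is why the pool size should match.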
3.4. Distributed Jobs
The worker library excels in the distributed jobs area. You can run jobs only on the leader server or only on the followers or with the given concurrency level.
3.4.1. Leaders and Followers Jobs
Micronaut Worker can help you to run jobs in distributed environments. You can choose to run the code only on the leader server or only on the followers.
@LeaderOnly
@Cron("0 0 0/1 ? * *")
public void job() {
    // your code here
}

@FollowerOnly
@FixedRate("10m")
public void job() {
    // your code here
}
3.4.2. Jobs using Queue
In a distributed environment, tasks often process messages using queues. Micronaut Worker has three types of queue-related jobs: producers, consumers, and pipes. The type depends on the number of arguments the job method has and on its return value.
Quick Start
You can use two conventional annotations, @QueueProducer and @QueueConsumer, to publish and consume messages from a queue.
public record Message(String message) { } (1)

@Cron("0 0 0/1 ? * *") (2)
@QueueProducer("my-queue") (3)
public Flux<Message> produceToMyQueue() { (4)
    return Flux.just("Hello", "World").map(Message::new);
}

@QueueConsumer("my-queue") (5)
public void listenToMyQueue(Message message) { (6)
    // your code here
}
(1) Two jobs will communicate with each other using the given record object
(2) The producer jobs must have some scheduled trigger associated with them, for example cron or fixed rate
(3) Use the @QueueProducer annotation with the name of the queue to publish messages
(4) The producer job must return some value, ideally a Publisher of given messages
(5) Use the @QueueConsumer annotation with the name of the queue to consume messages
(6) The consumer job must have a single parameter of the same type as the producer job returns
The value for @Fork is the same as the number of maxMessages for the @QueueConsumer annotation. The value of waitingTime in @QueueConsumer is the same as the associated @FixedRate value.
Advanced Usage
The name of the queue can be customised using the @Consumes and @Produces annotations. Otherwise, the name is derived from the simple name of the class with any Job, Consumer, or Producer suffix removed (in this order, recursively). If the micronaut.application.name property is set, the extracted name is prefixed with the application name followed by an underscore. For example, with micronaut.application.name set to myapp, MyOwnConsumerJob will have the default queue name myapp_MyOwn.
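The naming rule can be sketched in a few lines of plain Java. This is one reading of "in this order, recursively": the suffixes are tried in the order Job, Consumer, Producer, and the pass is repeated until nothing more can be removed. QueueNames and defaultQueueName are hypothetical names used only for illustration, and the library's own derivation may differ in edge cases.

```java
// Hypothetical illustration, not part of Micronaut Worker.
final class QueueNames {

    private static final String[] SUFFIXES = {"Job", "Consumer", "Producer"};

    static String defaultQueueName(String applicationName, String simpleClassName) {
        String name = simpleClassName;
        boolean stripped = true;
        // Repeat the pass until no suffix can be removed any more.
        while (stripped) {
            stripped = false;
            for (String suffix : SUFFIXES) {
                // Never strip the whole name away.
                if (name.length() > suffix.length() && name.endsWith(suffix)) {
                    name = name.substring(0, name.length() - suffix.length());
                    stripped = true;
                }
            }
        }
        if (applicationName == null || applicationName.isEmpty()) {
            return name;
        }
        return applicationName + "_" + name;
    }
}
```

Calling defaultQueueName("myapp", "MyOwnConsumerJob") reproduces the example from the text and returns myapp_MyOwn.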
Producer
Producer jobs return a value, usually a Publisher, a collection of objects, or a single object. Producer jobs are always run only on the leader server.
@InitialDelay("50ms")
public Flux<String> hello() {
    return Flux.just("Hello", "World");
}
If the producer job returns a Publisher, the messages can be batched if the underlying implementation supports it. At the moment, only the AWS SDK v2 SQS implementation supports batching.
Consumer
Consumer jobs take a single parameter. They are usually @FixedRate jobs waiting for a message from a queue. Messages can be sent into the queue from external systems, from a producer job, or with the JobManager#enqueue method.
@FixedRate("100ms") @InitialDelay("100ms")
public void listen(String word) {
    words.add(word);
}
Pipe
Pipe jobs are a combination of producer and consumer jobs. They usually use @Consumes and @Produces to specify the names of the source and destination queues.
@Named("my-pipe")
@FixedRate("50ms")
@Consumes("AnyWords")
@Produces("UpperWords")
public String pipe(String message) {
    return message.toUpperCase();
}
As with a consumer job, you can use JobManager#enqueue to send messages to the job.
jobManager.enqueue("my-pipe", "hello");
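The flow of a message through a pipe job can be pictured with two in-memory queues: one execution takes a message from the source queue, applies the job method, and publishes the result to the destination queue. The sketch below is a plain-Java illustration of that flow. LocalPipeSketch and its methods are hypothetical and unrelated to the library's local queue implementation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical illustration, not part of Micronaut Worker.
final class LocalPipeSketch {

    private final Map<String, Queue<String>> queues = new HashMap<>();

    void enqueue(String queueName, String message) {
        queues.computeIfAbsent(queueName, name -> new ArrayDeque<>()).add(message);
    }

    // Returns the next message or null when the queue is empty or unknown.
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    // One execution of a pipe job: consume one message, transform it, publish the result.
    boolean runPipeOnce(String source, String destination, Function<String, String> job) {
        String message = poll(source);
        if (message == null) {
            return false; // nothing to consume this time
        }
        enqueue(destination, job.apply(message));
        return true;
    }
}
```

Enqueuing hello into AnyWords and running the pipe once with String::toUpperCase leaves HELLO in UpperWords, mirroring the pipe example above.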
3.5. Configuration
3.5.1. General Settings
You can disable all jobs by setting worker.enabled to false:
worker:
  enabled: false
All jobs are disabled for the test and function environments.
You can set the default queue type using the worker.queue-type configuration property. This is convenient for local development to ensure your application is running against the local implementation.
worker:
  queue-type: local
You can override the default scheduler (TaskExecutors.SCHEDULED) by setting the worker.scheduler property.
worker:
  scheduler: virtual
You can have your jobs executed on virtual threads by using the virtual executor.
3.5.2. Job Configuration
Anything you can configure using annotations can be configured externally.
For classes with just a single job method, the name of the job is the hyphenated name of the class, e.g., sample-job for a class SampleJob. If the class contains more than one job method, a job is created for each method, and the name contains both the simple class name and the name of the method, e.g., sample-job-method-one for SampleJob#methodOne. You can override the default name by using @Named("another-name"). The custom name must already be hyphenated.
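The default naming convention can be approximated with a small helper. The sketch assumes simple camel-case names and inserts a hyphen before each upper-case letter that follows a lower-case letter or digit. JobNames is a hypothetical class, and the library's own hyphenation may treat acronyms or other edge cases differently.

```java
import java.util.Locale;

// Hypothetical illustration, not part of Micronaut Worker.
final class JobNames {

    // SampleJob -> sample-job, methodOne -> method-one
    static String hyphenate(String camelCase) {
        return camelCase
            .replaceAll("([a-z0-9])([A-Z])", "$1-$2")
            .toLowerCase(Locale.ROOT);
    }

    // The method name is appended only when the class declares more than one job method.
    static String defaultJobName(String simpleClassName, String methodName, boolean singleJobMethod) {
        String className = hyphenate(simpleClassName);
        return singleJobMethod ? className : className + "-" + hyphenate(methodName);
    }
}
```

The resulting name is the key to use under worker.jobs when configuring the job externally.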
You can individually disable specific jobs:
worker:
  jobs:
    sample-job:
      enabled: false
You can even change the type of the job:
worker:
  jobs:
    sample-job:
      enabled: true
      cron: '0 0 0/1 ? * *'
You can only use one of the cron, fixed-delay, and fixed-rate settings. If more than one is used, the first of cron, fixed-delay, or fixed-rate is selected, in this particular order. You can use initial-delay either individually or with the fixed-delay or fixed-rate settings.
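The precedence rule can be written down as a simple chain of checks. The sketch below takes the raw setting values, returns the name of the setting that takes effect, and treats null or blank values as not set. TriggerSelection is a hypothetical helper that only mirrors the order stated above; it is not the library's configuration code.

```java
// Hypothetical illustration, not part of Micronaut Worker.
final class TriggerSelection {

    // Returns the name of the winning setting, following the order cron, fixed-delay, fixed-rate.
    static String effectiveTrigger(String cron, String fixedDelay, String fixedRate, String initialDelay) {
        if (isSet(cron)) {
            return "cron";
        }
        if (isSet(fixedDelay)) {
            return "fixed-delay";
        }
        if (isSet(fixedRate)) {
            return "fixed-rate";
        }
        // initial-delay may also be used on its own
        return isSet(initialDelay) ? "initial-delay" : "none";
    }

    private static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }
}
```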
You can change the concurrency level and leader/follower execution:
worker:
  jobs:
    sample-job:
      enabled: true
      follower-only: true
      concurrency: 10
You can configure the consumer and producer queues for distributed jobs.
worker:
  jobs:
    sample-job:
      enabled: true
      consumer:
        queue-name: OtherQueue
        queue-type: local
        max-messages: 100
        waiting-time: 100ms
      producer:
        queue-name: Firehose
        queue-type: local
3.6. Events
There are currently three events being fired:
- JobExecutionStartedEvent - fired before executing the job
- JobExecutionFinishedEvent - fired after execution
- JobExecutionResultEvent - fired after execution of a producer or pipe job
The JobExecutionResultEvent contains a reference to the result of the execution. Any modifications of the result may cause unexpected behavior.
The basic example (already present in the codebase) is simple logging:
package com.agorapulse.worker.event;

import com.agorapulse.worker.Job;
import io.micronaut.runtime.event.annotation.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import jakarta.inject.Singleton;
import java.util.Optional;

@Singleton
public class JobEventsLogger {

    private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);

    @EventListener
    void onJobExecutionStarted(JobExecutionStartedEvent event) {
        if (LOGGER.isDebugEnabled()) {
            Optional<Object> message = event.getMessage();
            if (message.isPresent()) {
                LOGGER.debug("Starting job {}#{} with message {}", event.getName(), event.getId(), message.get());
            } else {
                LOGGER.debug("Starting job {}#{}", event.getName(), event.getId());
            }
        }
    }

    @EventListener
    void onJobExecutionResult(JobExecutionResultEvent event) {
        if (LOGGER.isDebugEnabled()) {
            Object result = event.getResult();
            if (result != null) {
                LOGGER.debug("Job {}#{} emitted result {}", event.getName(), event.getId(), result);
            } else if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("No results emitted from job {}#{}", event.getName(), event.getId());
            }
        }
    }

    @EventListener
    void onJobExecutionFinished(JobExecutionFinishedEvent event) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Finished executing job {}#{} in {}", event.getName(), event.getStatus().getId(), event.getStatus().getHumanReadableDuration());
        }
    }
}
If Micronaut Snitch is present on the classpath and configured with the name of the job, the snitch method is called automatically after successful execution.
3.7. CLI Runner
You can run a single job from the command line using the com.agorapulse.worker.runner.JobRunner class as the main class. The arguments are the names of the jobs to run. All other jobs are disabled, even when enabled in the configuration (see corner cases below). The application will run until all jobs are finished.
java -cp myapp-shadow.jar com.agorapulse.worker.runner.JobRunner sample-job other-job
In some corner cases, unrelated jobs can still be executed if they have a very short delay or frequency and are manually enabled in the configuration. To avoid this issue, prefer annotation-driven jobs over configuring them manually in the configuration.
3.8. Management
You can use the jobs management endpoint, located at /jobs by default, to see the status of all the jobs in the application.
[
{
"configuration": {
"name": "sample-job",
"enabled": true,
"concurrency": 0,
"leaderOnly": false,
"followerOnly": false,
"initialDelay": "10m",
"scheduler": "scheduled",
"fork": 1,
"consumer": {
"queueName": "Sample",
"queueType": null,
"maxMessages": 10,
"waitingTime": "20s"
},
"producer": {
"queueName": "Sample",
"queueType": null
}
},
"status": {
"executionCount": 0,
"lastTriggered": null,
"lastFinished": null,
"lastDuration": null,
"lastException": null,
"name": "sample-job",
"lastId": null
},
"source": "method com.agorapulse.worker.management.SampleJob#run",
"name": "sample-job"
}
]
3.9. Console Integration
For security reasons, Micronaut Worker does not provide any management endpoint, but it integrates with Micronaut Console to monitor the jobs and even to trigger them manually.
An instance of JobManager aliased as jobs is added to the script bindings. A variable is also added for each job. The name is the lower-camel-case version of the job name, e.g., sampleJob for the sample-job job.
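The mapping from a hyphenated job name to its console variable name can be sketched as follows. The helper simply drops each hyphen and upper-cases the character after it. ConsoleVariableNames is a hypothetical name, and the console integration may handle unusual job names differently.

```java
// Hypothetical illustration, not part of Micronaut Worker.
final class ConsoleVariableNames {

    // sample-job -> sampleJob
    static String toLowerCamelCase(String jobName) {
        StringBuilder result = new StringBuilder();
        boolean upperNext = false;
        for (char c : jobName.toCharArray()) {
            if (c == '-') {
                // Drop the hyphen and remember to capitalize the next character.
                upperNext = true;
            } else if (upperNext) {
                result.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                result.append(c);
            }
        }
        return result.toString();
    }
}
```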
Available Variables:
ctx: io.micronaut.context.DefaultApplicationContext
jobs: com.agorapulse.worker.console.ConsoleJobManager
request: io.micronaut.http.server.netty.NettyHttpRequest
sampleJob: com.agorapulse.worker.console.JobAccessor
user: com.agorapulse.micronaut.console.User
The job variables are instances of JobAccessor, which also provides the methods run() and enqueue(message) to let you easily trigger jobs from the console. You can also use the reconfigure(consumer) method, which changes the in-memory configuration of the job and reschedules it.
A simple script containing just the jobs variable prints the status of the current job executions. Depending on which console endpoint you choose, you get either a text or a JSON summary.
jobs
|==============================================================================================================================================|
| Name | Running | Last Triggered | Last Finished | Took |
|==============================================================================================================================================|
| sample-job | 0 | null | null | |
|==============================================================================================================================================|
{
"result": [
{
"configuration": {
"name": "sample-job",
"enabled": true,
"concurrency": 0,
"leaderOnly": false,
"followerOnly": false,
"initialDelay": "10m",
"scheduler": "scheduled",
"fork": 1,
"consumer": {
"queueName": "Sample",
"queueType": null,
"maxMessages": 10,
"waitingTime": "20s"
},
"producer": {
"queueName": "Sample",
"queueType": null
}
},
"status": {
"executionCount": 0,
"lastTriggered": null,
"lastFinished": null,
"lastDuration": null,
"lastException": null,
"name": "sample-job",
"lastId": null
},
"source": "method com.agorapulse.worker.console.SampleJob#run",
"name": "sample-job"
}
]
}
Reconfiguring the job will try to change the configuration of the job and reschedule it if it’s still enabled.
import java.time.Duration

sampleJob.reconfigure {
    enabled true
    initialDelay Duration.ofMillis(1)
}
Returning a job variable from the script will render details for that job.
sampleJob
|==============================================================================================================================================|
| Name | Running | Last Triggered | Last Finished | Took |
|==============================================================================================================================================|
| sample-job | 0 | null | null | |
|==============================================================================================================================================|
{
"result": {
"configuration": {
"name": "sample-job",
"enabled": true,
"concurrency": 0,
"leaderOnly": false,
"followerOnly": false,
"initialDelay": "10m",
"scheduler": "scheduled",
"fork": 1,
"consumer": {
"queueName": "Sample",
"queueType": null,
"maxMessages": 10,
"waitingTime": "20s"
},
"producer": {
"queueName": "Sample",
"queueType": null
}
},
"status": {
"executionCount": 0,
"lastTriggered": null,
"lastFinished": null,
"lastDuration": null,
"lastException": null,
"name": "sample-job",
"lastId": null
},
"source": "method com.agorapulse.worker.console.SampleJob#run",
"name": "sample-job"
}
}