1. Introduction

Micronaut Worker is a library for advanced scheduling and work distribution in Micronaut. Compared to the Micronaut @Scheduled annotation, it adds the following capabilities:

  • concurrency management - limit how many executions of a job can run in parallel
  • fork options - run a job multiple times in parallel on a single server instance
  • leader/follower awareness - run a job only on the leader or only on the followers in a cluster
  • queue-based jobs - producer, consumer, and pipe jobs backed by local, Redis, or AWS SQS queues
  • external configuration - override any annotation setting using configuration properties
  • job execution events and a /jobs management endpoint

Unlike the @Scheduled annotation, Micronaut Worker annotations are not repeatable, but they can be combined in meaningful ways. For example, a method annotated with @FixedRate("10m") @InitialDelay("1m") executes every ten minutes, but it will wait one minute before the first execution.

All jobs are disabled by default in the function and cli environments, but can be triggered individually by calling jobManager.forceRun("job-name").
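
For example, a minimal sketch of forcing such a job to run from another bean; the JobManager import path, the bean name, and the job name are assumptions for illustration:

Force Run from Code
import com.agorapulse.worker.JobManager;   // assumed package, alongside com.agorapulse.worker.Job

import jakarta.inject.Singleton;

@Singleton
public class SampleJobTrigger {

    private final JobManager jobManager;

    public SampleJobTrigger(JobManager jobManager) {
        this.jobManager = jobManager;
    }

    public void triggerSampleJob() {
        // forces the job named "sample-job" to run even where jobs are disabled by default
        jobManager.forceRun("sample-job");
    }
}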

2. Installation

Micronaut Worker is available in Maven Central.

Gradle Installation
repositories {
    mavenCentral()
}

dependencies {
    // minimal dependency with local queue and executor
    // select some of the following dependencies to enable more features
    implementation("com.agorapulse:micronaut-worker:2.7.3")

    // to enable /jobs endpoint
    implementation("com.agorapulse:micronaut-worker-management:2.7.3")

    // to enable AWS SDK v1 SQS queues integration
    implementation("com.agorapulse:micronaut-worker-queues-sqs-v1:2.7.3")

    // to enable AWS SDK v2 SQS queues integration
    implementation("com.agorapulse:micronaut-worker-queues-sqs-v2:2.7.3")

    // to enable running jobs as CLI apps
    implementation("com.agorapulse:micronaut-worker-runner:2.7.3")

    // to enable Redis leader/follower capabilities
    implementation("com.agorapulse:micronaut-worker-executor-redis:2.7.3")

    // to enable Redis queues integration
    implementation("com.agorapulse:micronaut-worker-queues-redis:2.7.3")
}

3. Usage

3.1. Scheduling

The basic usage of this library is to provide an easy way to run scheduled tasks in a distributed environment.

3.1.1. Cron Jobs

To create a scheduled CRON job, annotate a bean method with @Cron containing the CRON definition string.

Cron Job
@Cron("0 0 0/1 ? * *")
public void job() {
    // your code here
}

3.1.2. Fixed Delay Jobs

To create a fixed delay job, annotate a bean method with @FixedDelay containing the duration string.

Fixed Delay Job
@FixedDelay("10m")
public void job() {
    // your code here
}

3.1.3. Fixed Rate Jobs

To create a fixed rate job, annotate a bean method with @FixedRate containing the duration string.

Fixed Rate Job
@FixedRate("10m")
public void job() {
    // your code here
}

3.1.4. Initial Delay Jobs

To create a job with an initial delay, annotate a bean method with @InitialDelay containing the duration string. This annotation can be combined with @FixedRate and @FixedDelay annotations.

Initial Delay Job
@InitialDelay("10m")
public void initialDelay() {
    // your code here
}

@InitialDelay("10m") @FixedRate("5m")
public void fixedRate() {
    // your code here
}

@InitialDelay("10m") @FixedDelay("15m")
public void fixedDelay() {
    // your code here
}

3.2. Concurrency Management

You can limit the number of parallel executions with the @Concurrency annotation. The @Consecutive annotation is an alias for @Concurrency(1) and disables parallel execution completely.

Fixed Rate Job Running at Most Five Parallel Tasks
@Concurrency(5)
@FixedRate("10m")
public void job() {
    // your code here
}
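
For comparison, the same job using the @Consecutive alias mentioned above, so at most one execution runs at a time:

Consecutive Fixed Rate Job
@Consecutive
@FixedRate("10m")
public void job() {
    // your code here
}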

3.3. Fork Options

You can spawn the method execution multiple times in parallel in a single server instance with the @Fork annotation.

Fixed Rate Job Running at Five Parallel Tasks
@Fork(5)
@FixedRate("10m")
public void job() {
    // your code here
}

You should consider using your own scheduler with a pool of matching size:

application.yml
micronaut:
  executors:
    test-job:                                                                           (1)
      core-pool-size: 5                                                                 (2)
worker:
  jobs:
    test-job:                                                                           (3)
      scheduler: test-job                                                               (4)
1 The name of the new custom scheduler
2 The size of the pool should be the same as the fork value
3 The name of the job
4 The scheduler definition using the name of the executor declared above
Each job can define its own scheduler name using the scheduler annotation or configuration value, but to prevent multiple jobs from blocking each other's execution, it is recommended to keep the default value. Keeping the default value creates a separate executor for each job.
For consumer jobs, messages are consumed synchronously, so if you want to benefit from @Fork execution, keep maxMessages at its default value of 1.

3.4. Distributed Jobs

The worker library excels in the distributed jobs area. You can run jobs only on the leader server, only on the followers, or with a given concurrency level.

3.4.1. Leaders and Followers Jobs

Micronaut Worker can help you to run jobs in distributed environments. You can choose to run the code only on the leader server or only on the followers.

Leader Only Job
@LeaderOnly
@Cron("0 0 0/1 ? * *")
public void job() {
    // your code here
}
Follower Only Job
@FollowerOnly
@FixedRate("10m")
public void job() {
    // your code here
}

3.4.2. Jobs using Queue

In a distributed environment, tasks often process messages using queues. In Micronaut Worker, there are three types of queue-related jobs - producers, consumers, and pipes - depending on the number of arguments they have and their return value.

Quick Start

You can use the two convenience annotations @QueueProducer and @QueueConsumer to publish messages to and consume messages from a queue.

Queue Producer and Consumer
public record Message(String message) { }                                           (1)

@Cron("0 0 0/1 ? * *")                                                              (2)
@QueueProducer("my-queue")                                                          (3)
public Flux<Message> produceToMyQueue() {                                           (4)
    return Flux.just("Hello", "World").map(Message::new);
}

@QueueConsumer("my-queue")                                                          (5)
public void listenToMyQueue(Message message) {                                      (6)
    // your code here
}
1 Two jobs will communicate to each other using the given record object
2 The producer jobs must have some scheduled trigger associated with them, for example cron or fixed rate
3 Use the @QueueProducer annotation with the name of the queue to publish messages
4 The producer job must return some value, ideally a Publisher of given messages
5 Use the @QueueConsumer annotation with the name of the queue to consume messages
6 The consumer job must have a single parameter of the same type as the producer job returns
The value of @Fork is the same as the maxMessages value of the @QueueConsumer annotation, and the value of waitingTime in @QueueConsumer is the same as the associated @FixedRate value.
Advanced Usage

The name of the queue can be customised using the @Consumes and @Produces annotations; otherwise, the name is derived from the simple name of the class, stripping any Job, Consumer, or Producer suffix (in this order, recursively). If the micronaut.application.name property is set, the derived name is prefixed with the application name followed by an underscore. For example, with micronaut.application.name set to myapp, MyOwnConsumerJob will have the default queue name myapp_MyOwn.
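
As an illustration of this convention, a minimal sketch of a hypothetical consumer class relying on the derived queue name (the class, queue, and application names are examples only):

Derived Queue Name
@Singleton
public class MyOwnConsumerJob {

    // with micronaut.application.name set to myapp, this job consumes from the myapp_MyOwn queue;
    // adding @Consumes("OtherQueue") would override the derived name
    @FixedRate("10s")
    public void consume(String message) {
        // your code here
    }
}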

Producer

Producer jobs return a value, usually a Publisher, collection of objects, or a single object. Producer jobs are always run only on the leader server.

Producer Job
@InitialDelay("50ms")
public Flux<String> hello() {
    return Flux.just("Hello", "World");
}
If the producer job returns a Publisher, the messages can be batched if the underlying implementation supports it. At the moment, only the AWS SQS v2 implementation supports batching.
Consumer

Consumer jobs take a single parameter. They are usually a @FixedRate job waiting for a message from a queue. Messages can be sent into the queue from external systems, a producer job, or using JobManager#enqueue method.

Consumer Job
@FixedRate("100ms") @InitialDelay("100ms")
public void listen(String word) {
    words.add(word);
}
Pipe

Pipe jobs are a combination of producer and consumer jobs. They usually use @Consumes and @Produces to specify the source and destination queue names.

Pipe Job
@Named("my-pipe")
@FixedRate("50ms")
@Consumes("AnyWords")
@Produces("UpperWords")
public String pipe(String message) {
    return message.toUpperCase();
}

As with a consumer job, you can use JobManager#enqueue to send messages to the job.

Send Message to Job
jobManager.enqueue("my-pipe", "hello");

3.5. Configuration

3.5.1. General Settings

You can disable all jobs by setting worker.enabled to false:

Disabling All Jobs
worker:
  enabled: false
All jobs are disabled for the test and function environments.

You can set the default queue type using the worker.queue-type configuration property. This is convenient for local development to ensure your application is running against the local implementation.

Setting the Default Queue Type
worker:
  queue-type: local

You can override the default scheduler (TaskExecutors.SCHEDULED) by setting the worker.scheduler property.

Setting the Default Scheduler
worker:
  scheduler: virtual
You can let your jobs execute on virtual threads by using the virtual executor.

3.5.2. Job Configuration

Anything you can configure using annotations can be configured externally. For classes with just a single job method, the job name is the hyphenated class name, e.g., sample-job for a class SampleJob. If the class contains more than one job method, a job is created for each method, and the name contains both the simple class name and the method name, e.g., sample-job-method-one for SampleJob#methodOne. You can override the default name using @Named("another-name"). The custom name must already be hyphenated.
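
To illustrate the naming rules, a sketch of a class with two job methods and the names they would receive (the overriding name is just an example):

Job Naming Example
@Singleton
public class SampleJob {

    @FixedRate("10m")
    public void methodOne() {
        // job name: sample-job-method-one
    }

    @Named("sample-two")
    @FixedRate("10m")
    public void methodTwo() {
        // default name sample-job-method-two, overridden to sample-two
    }
}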

You can individually disable specific jobs:

Disabling Single Job
worker:
  jobs:
    sample-job:
      enabled: false

You can even change the type of the job:

Switch to Cron Job
worker:
  jobs:
    sample-job:
      enabled: true
      cron: '0 0 0/1 ? * *'
You can only use one of the cron, fixed-delay, and fixed-rate settings. If more than one is used, the first of cron, fixed-delay, or fixed-rate is selected, in this particular order. You can use initial-delay either on its own or together with the fixed-delay or fixed-rate settings.
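
For example, a sketch for the hypothetical sample-job combining initial-delay with fixed-rate:

Combining Initial Delay and Fixed Rate
worker:
  jobs:
    sample-job:
      enabled: true
      initial-delay: 1m
      fixed-rate: 10m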

You can change the concurrency level and leader/follower execution:

Concurrency Selection
worker:
  jobs:
    sample-job:
      enabled: true
      follower-only: true
      concurrency: 10

You can configure the consumer and producer queues for distributed jobs.

Queues Customisation
worker:
  jobs:
    sample-job:
      enabled: true
      consumer:
        queue-name: OtherQueue
        queue-type: local
        max-messages: 100
        waiting-time: 100ms
      producer:
        queue-name: Firehose
        queue-type: local

3.6. Events

There are currently three events being fired:

  • JobExecutionStartedEvent - fired before executing the job

  • JobExecutionFinishedEvent - fired after execution

  • JobExecutionResultEvent - fired after execution of a producer or pipe job

The JobExecutionResultEvent contains a reference to the result of the execution. Any modifications of the result may cause unexpected behavior.

The basic example (already present in the codebase) is simple logging:

Logging Job Execution using Events
package com.agorapulse.worker.event;

import com.agorapulse.worker.Job;
import io.micronaut.runtime.event.annotation.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Singleton;

import java.util.Optional;

@Singleton
public class JobEventsLogger {

    private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);

    @EventListener
    void onJobExecutionStarted(JobExecutionStartedEvent event) {
        if (LOGGER.isDebugEnabled()) {
            Optional<Object> message = event.getMessage();
            if (message.isPresent()) {
                LOGGER.debug("Starting job {}#{} with message {}", event.getName(), event.getId(), message.get());
            } else {
                LOGGER.debug("Starting job {}#{}", event.getName(), event.getId());
            }
        }
    }

    @EventListener
    void onJobExecutionResult(JobExecutionResultEvent event) {
        if (LOGGER.isDebugEnabled()) {
            Object result = event.getResult();
            if (result != null) {
                LOGGER.debug("Job {}#{} emitted result {}", event.getName(), event.getId(), result);
            } else if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("No results emitted from job {}#{}", event.getName(), event.getId());
            }
        }
    }

    @EventListener
    void onJobExecutionFinished(JobExecutionFinishedEvent event) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Finished executing job {}#{} in {}", event.getName(), event.getStatus().getId(), event.getStatus().getHumanReadableDuration());
        }
    }

}
If Micronaut Snitch is present on the classpath and configured with the name of the job, the snitch method is called automatically after successful execution.

3.7. CLI Runner

You can run a single job from the command line using the com.agorapulse.worker.runner.JobRunner class as the main class. The arguments are the names of the jobs to run. All other jobs are disabled, even when enabled in the configuration (see corner cases below). The application will run until all jobs are finished.

Run Job from CLI
java -cp myapp-shadow.jar com.agorapulse.worker.runner.JobRunner sample-job other-job
In some corner cases, unrelated jobs can still be executed if they have a very short delay or frequency and are manually enabled in the configuration. Prefer annotation-driven jobs over manual configuration to avoid this issue.

3.8. Management

You can use the jobs management endpoint, located at /jobs by default, to see the status of all the jobs in the application.
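
For example, assuming the micronaut-worker-management dependency is on the classpath and the application runs locally on port 8080, you can fetch the status with a plain HTTP request:

Fetching the Jobs Status
curl http://localhost:8080/jobs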

Jobs Response
[
    {
        "configuration": {
            "name": "sample-job",
            "enabled": true,
            "concurrency": 0,
            "leaderOnly": false,
            "followerOnly": false,
            "initialDelay": "10m",
            "scheduler": "scheduled",
            "fork": 1,
            "consumer": {
                "queueName": "Sample",
                "queueType": null,
                "maxMessages": 10,
                "waitingTime": "20s"
            },
            "producer": {
                "queueName": "Sample",
                "queueType": null
            }
        },
        "status": {
            "executionCount": 0,
            "lastTriggered": null,
            "lastFinished": null,
            "lastDuration": null,
            "lastException": null,
            "name": "sample-job",
            "lastId": null
        },
        "source": "method com.agorapulse.worker.management.SampleJob#run",
        "name": "sample-job"
    }
]

3.9. Console Integration

For security reasons, Micronaut Worker does not provide any management endpoint for triggering jobs, but it integrates with Micronaut Console to monitor the jobs and even to trigger them manually.

An instance of JobManager aliased as jobs is added to the script bindings. There is also a variable added for each job. The name is the lower-camel-case version of the job name, e.g., sampleJob for the sample-job job.

Additional Bindings in Micronaut Console
Available Variables:

    ctx: io.micronaut.context.DefaultApplicationContext
    jobs: com.agorapulse.worker.console.ConsoleJobManager
    request: io.micronaut.http.server.netty.NettyHttpRequest
    sampleJob: com.agorapulse.worker.console.JobAccessor
    user: com.agorapulse.micronaut.console.User
The job variables are instances of JobAccessor, which also provides the methods run() and enqueue(message) to let you easily trigger jobs from the console. You can also use the reconfigure(consumer) method, which changes the in-memory configuration of the job and reschedules it.
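
For example, a short console script (the variable name matches the hypothetical sample-job above) that triggers the job and enqueues a message using these methods:

Job Accessor Script
sampleJob.run()
sampleJob.enqueue("hello")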

A simple script with just variable jobs will print the status of the current job execution. Depending on which console endpoint you choose, you get either a text or JSON summary.

Job Manager Script
jobs
Job Manager Script - Text Result
|==============================================================================================================================================|
| Name                                               | Running  | Last Triggered            | Last Finished             | Took                 |
|==============================================================================================================================================|
| sample-job                                         | 0        | null                      | null                      |                      |
|==============================================================================================================================================|
Job Manager Script - JSON Result
{
    "result": [
        {
            "configuration": {
                "name": "sample-job",
                "enabled": true,
                "concurrency": 0,
                "leaderOnly": false,
                "followerOnly": false,
                "initialDelay": "10m",
                "scheduler": "scheduled",
                "fork": 1,
                "consumer": {
                    "queueName": "Sample",
                    "queueType": null,
                    "maxMessages": 10,
                    "waitingTime": "20s"
                },
                "producer": {
                    "queueName": "Sample",
                    "queueType": null
                }
            },
            "status": {
                "executionCount": 0,
                "lastTriggered": null,
                "lastFinished": null,
                "lastDuration": null,
                "lastException": null,
                "name": "sample-job",
                "lastId": null
            },
            "source": "method com.agorapulse.worker.console.SampleJob#run",
            "name": "sample-job"
        }
    ]
}

Reconfiguring a job attempts to change its configuration and reschedules it if it is still enabled.

Job Manager Script - Reconfigure
import java.time.Duration

sampleJob.reconfigure {
    enabled true
    initialDelay Duration.ofMillis(1)
}

Returning a job variable from the script will render details for that job.

Job Detail Script
sampleJob
Job Detail Script - Text Result
|==============================================================================================================================================|
| Name                                               | Running  | Last Triggered            | Last Finished             | Took                 |
|==============================================================================================================================================|
| sample-job                                         | 0        | null                      | null                      |                      |
|==============================================================================================================================================|
Job Detail Script - JSON Result
{
    "result": {
        "configuration": {
            "name": "sample-job",
            "enabled": true,
            "concurrency": 0,
            "leaderOnly": false,
            "followerOnly": false,
            "initialDelay": "10m",
            "scheduler": "scheduled",
            "fork": 1,
            "consumer": {
                "queueName": "Sample",
                "queueType": null,
                "maxMessages": 10,
                "waitingTime": "20s"
            },
            "producer": {
                "queueName": "Sample",
                "queueType": null
            }
        },
        "status": {
            "executionCount": 0,
            "lastTriggered": null,
            "lastFinished": null,
            "lastDuration": null,
            "lastException": null,
            "name": "sample-job",
            "lastId": null
        },
        "source": "method com.agorapulse.worker.console.SampleJob#run",
        "name": "sample-job"
    }
}