QuarkusEventExecutor.java

package com.github.jonasrutishauser.transactional.event.quarkus.concurrent;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

import com.github.jonasrutishauser.transactional.event.api.Events;
import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor;

import io.quarkus.arc.DefaultBean;
import io.quarkus.virtual.threads.VirtualThreads;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;

@ApplicationScoped
class QuarkusEventExecutor implements EventExecutor {

    private final ExecutorService executor;
    private final ScheduledExecutorService scheduler;

    QuarkusEventExecutor() {
        this(null, null);
    }

    @Inject
    QuarkusEventExecutor(@Events ExecutorService executor, ScheduledExecutorService scheduler) {
        this.executor = executor;
        this.scheduler = scheduler;
    }

    @Override
    public void execute(Runnable command) {
        executor.execute(command);
    }

    @Override
    public Task schedule(Runnable command, LongSupplier intervalInMillis) {
        ScheduledTask task = new ScheduledTask(command, intervalInMillis);
        task.start();
        return task;
    }

    private class ScheduledTask implements Task {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicReference<Future<?>> future = new AtomicReference<>();
        private final Runnable command;
        private final LongSupplier intervalInMillis;

        public ScheduledTask(Runnable command, LongSupplier intervalInMillis) {
            this.command = command;
            this.intervalInMillis = intervalInMillis;
        }

        public void start() {
            if (running.get()) {
                CompletableFuture<Void> commandRun = CompletableFuture.runAsync(command,
                        runnable -> scheduler.schedule(() -> {
                            if (running.get()) {
                                runnable.run();
                            }
                        }, intervalInMillis.getAsLong(), MILLISECONDS));
                future.set(commandRun);
                commandRun.thenRun(this::start);
            }
        }

        @Override
        public void cancel() {
            running.set(false);
            future.get().cancel(true);
        }
    }

    @Dependent
    static class DefaultExecutorService {
        private final ExecutorService executor;

        @Inject
        DefaultExecutorService(@VirtualThreads ExecutorService executor) {
            this.executor = executor;
        }

        @Events
        @Produces
        @DefaultBean
        ExecutorService getExecutor() {
            return executor;
        }
    }
}