DispatcherImpl.java
package com.github.jonasrutishauser.transactional.event.core.store;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.eclipse.microprofile.metrics.MetricUnits.NONE;
import static org.eclipse.microprofile.metrics.MetricUnits.SECONDS;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.microprofile.metrics.annotation.Gauge;
import com.github.jonasrutishauser.transactional.event.api.Configuration;
import com.github.jonasrutishauser.transactional.event.core.PendingEvent;
import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
class DispatcherImpl implements Dispatcher {
private static final Logger LOGGER = LogManager.getLogger();
private final Configuration configuration;
private final WorkProcessorImpl processor;
private final EventExecutor executor;
private final PendingEventStore store;
private final AtomicInteger dispatchedRunning = new AtomicInteger();
private final BlockingQueue<String> eventsToDispatch = new LinkedBlockingQueue<>();
private volatile int intervalSeconds = 30;
DispatcherImpl() {
this(null, null, null, null);
}
@Inject
DispatcherImpl(Configuration configuration, WorkProcessorImpl dispatcher, EventExecutor executor,
PendingEventStore store) {
this.configuration = configuration;
this.processor = dispatcher;
this.executor = executor;
this.store = store;
}
@PostConstruct
void initIntervalSeconds() {
intervalSeconds = configuration.getInitialDispatchInterval();
}
@Override
public void processDirect(EventsPublished events) {
for (PendingEvent event : events.getEvents()) {
if (event.getDelayedUntil().isEmpty()) {
processDirect(event);
}
}
}
private void processDirect(PendingEvent event) {
String eventId = event.getId();
if (dispatchable() > 0) {
try {
executeCounting(eventId);
} catch (RejectedExecutionException e) {
LOGGER.warn("Failed to submit event {} for processing: {}", eventId, e.getMessage());
}
} else if (eventsToDispatch.size() < 8 * configuration.getMaxAquire()) {
if (!eventsToDispatch.offer(eventId)) {
LOGGER.warn("Failed to submit event {} for processing", eventId);
}
} else {
LOGGER.warn("There are already too many events to process event {}", eventId);
}
}
@Override
public long dispatchInterval() {
if (dispatchable() <= 0) {
return configuration.getAllInUseInterval();
}
return intervalSeconds * 1000l;
}
public void schedule() {
try {
scheduleImpl();
} catch (RuntimeException e) {
intervalSeconds = min(configuration.getMaxDispatchInterval(), max(intervalSeconds * 2, 1));
throw e;
}
}
private synchronized void scheduleImpl() {
for (boolean empty = false; !empty && eventsToDispatch.size() < configuration.getMaxConcurrentDispatching();) {
Set<String> events = store.aquire(configuration.getMaxAquire());
events.forEach(eventsToDispatch::offer);
empty = events.isEmpty();
}
if (!eventsToDispatch.isEmpty()) {
intervalSeconds = 0;
} else {
intervalSeconds = min(configuration.getMaxDispatchInterval(), max(intervalSeconds * 2, 1));
}
while (dispatchable() > 0 && !eventsToDispatch.isEmpty()) {
try {
executeCounting(eventsToDispatch.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (RejectedExecutionException e) {
LOGGER.warn("Failed to dispatch events: {}", e.getMessage());
}
}
}
@Gauge(name = "com.github.jonasrutishauser.transaction.event.dispatched.processing",
description = "Number of dispatched events being processed", unit = NONE, absolute = true)
public int getDispatchedRunning() {
return dispatchedRunning.get();
}
@Gauge(name = "com.github.jonasrutishauser.transaction.event.dispatch.interval",
description = "Interval between lookups for events to process", unit = SECONDS, absolute = true)
public int getIntervalSeconds() {
return intervalSeconds;
}
private int dispatchable() {
return configuration.getMaxConcurrentDispatching() - dispatchedRunning.get();
}
private void executeCounting(String eventId) {
Callable<Boolean> supplier = processor.get(eventId);
try {
executor.execute(counting(() -> {
try {
if (!Boolean.TRUE.equals(supplier.call())) {
intervalSeconds = 0;
}
} catch (Exception e) {
LOGGER.catching(e);
}
}));
} catch (RejectedExecutionException e) {
dispatchedRunning.decrementAndGet();
throw e;
}
}
private Runnable counting(Runnable task) {
dispatchedRunning.incrementAndGet();
return () -> {
try {
task.run();
} finally {
dispatchedRunning.decrementAndGet();
}
};
}
}