TransactionalWorker.java

package com.github.jonasrutishauser.transactional.event.core.store;

import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.github.jonasrutishauser.transactional.event.api.EventTypeResolver;
import com.github.jonasrutishauser.transactional.event.api.context.ContextualProcessor;
import com.github.jonasrutishauser.transactional.event.api.handler.Handler;
import com.github.jonasrutishauser.transactional.event.core.PendingEvent;
import com.github.jonasrutishauser.transactional.event.core.handler.EventHandlers;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

@ApplicationScoped
class TransactionalWorker {

    private static final Logger LOGGER = LogManager.getLogger();

    private final PendingEventStore store;
    private final HandlerProvider handlerProvider;
    private final ContextualProcessor processor;

    TransactionalWorker() {
        this(null, null, null, null, null);
    }

    @Inject
    TransactionalWorker(PendingEventStore store, @Any Instance<Handler> handlers, EventHandlers handlerExtension,
            EventTypeResolver typeResolver, ContextualProcessor processor) {
        this.store = store;
        this.handlerProvider = new HandlerProvider(handlers, handlerExtension, typeResolver);
        this.processor = processor;
    }

    @Transactional
    public void process(String eventId) {
        PendingEvent event = store.getAndLockEvent(eventId);
        processor.process(event.getId(), event.getType(), getContextProperties(event.getContext()), event.getPayload(),
                handlerProvider.handler(event.getType()));
        store.delete(event);
    }

    @Transactional
    public void processFailed(String eventId) {
        PendingEvent event = store.getAndLockEvent(eventId);
        store.updateForRetry(event);
    }

    private Properties getContextProperties(String context) {
        Properties properties = new Properties();
        if (context != null) {
            try {
                properties.load(new StringReader(context));
            } catch (IOException e) {
                LOGGER.warn("unexpected IOException while reading context", e);
            }
        }
        return properties;
    }

    private static class HandlerProvider {
        private final ConcurrentMap<String, Handler> handlerMap = new ConcurrentHashMap<>();
        private final Instance<Handler> handlers;
        private final EventHandlers handlerExtension;
        private final EventTypeResolver typeResolver;

        public HandlerProvider(Instance<Handler> handlers, EventHandlers handlerExtension,
                EventTypeResolver typeResolver) {
            this.handlers = handlers;
            this.handlerExtension = handlerExtension;
            this.typeResolver = typeResolver;
        }

        public Handler handler(String eventType) {
            return handlerMap.computeIfAbsent(eventType, this::getHandler);
        }

        private synchronized Handler getHandler(String eventType) {
            if (handlerMap.containsKey(eventType)) {
                // because this method is synchronized we can ensure that an instance is only
                // created once
                return handlerMap.get(eventType);
            }
            return handlers.select(handlerExtension.getHandlerQualifier(typeResolver, eventType)).get();
        }
    }
}