TransactionalPublisher.java

package com.github.jonasrutishauser.transactional.event.core;

import static jakarta.transaction.Transactional.TxType.MANDATORY;
import static java.time.LocalDateTime.now;

import java.io.IOException;
import java.io.StringWriter;
import java.time.Instant;
import java.util.Properties;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.github.jonasrutishauser.transactional.event.api.context.ContextualPublisher;
import com.github.jonasrutishauser.transactional.event.api.monitoring.PublishingEvent;

import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.event.Event;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

@Dependent
class TransactionalPublisher implements ContextualPublisher {

    static final String DELAYED_UNTIL_KEY = "___delayed_until___";

    private static final Logger LOGGER = LogManager.getLogger();

    private final PublishedEvents publishedEvents;
    private final Event<PublishingEvent> publishingEvent;

    TransactionalPublisher() {
        this(null, null);
    }

    @Inject
    TransactionalPublisher(PublishedEvents publishedEvents, Event<PublishingEvent> publishingEvent) {
        this.publishedEvents = publishedEvents;
        this.publishingEvent = publishingEvent;
    }

    @Override
    @Transactional(MANDATORY)
    public void publish(String id, String type, Properties context, String payload) {
        Instant delayedUntil = getAndRemoveDelayedUntil(context);
        PendingEvent pendingEvent = new PendingEvent(id, type, getContextString(context), payload, now(), delayedUntil);
        publishedEvents.add(pendingEvent);
        publishingEvent.fire(new PublishingEvent(id));
    }

    private Instant getAndRemoveDelayedUntil(Properties context) {
        Object delayedUntil = context.remove(DELAYED_UNTIL_KEY);
        if (delayedUntil instanceof Instant) {
            return (Instant) delayedUntil;
        }
        return null;
    }

    private String getContextString(Properties context) {
        if (context.isEmpty()) {
            return null;
        }
        try (StringWriter writer = new StringWriter()) {
            context.store(writer, null);
            return writer.getBuffer().substring(writer.getBuffer().indexOf("\n"));
        } catch (IOException e) {
            LOGGER.warn("unexpected IOException while writing context", e);
            return null;
        }
    }

}