TransactionalEventPublisher.java
package com.github.jonasrutishauser.transactional.event.core;
import static java.time.Instant.now;
import static com.github.jonasrutishauser.transactional.event.core.TransactionalPublisher.DELAYED_UNTIL_KEY;
import static com.github.jonasrutishauser.transactional.event.core.random.Random.randomId;
import static jakarta.transaction.Transactional.TxType.MANDATORY;
import java.time.temporal.TemporalAmount;
import java.util.Properties;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.github.jonasrutishauser.transactional.event.api.EventPublisher;
import com.github.jonasrutishauser.transactional.event.api.EventTypeResolver;
import com.github.jonasrutishauser.transactional.event.api.context.ContextualPublisher;
@Dependent
public class TransactionalEventPublisher implements EventPublisher {
private static final Logger LOGGER = LogManager.getLogger();
private final EventTypeResolver typeResolver;
private final Serializer eventSerializer;
private final ContextualPublisher publisher;
TransactionalEventPublisher() {
this(null, null, null);
}
@Inject
TransactionalEventPublisher(EventTypeResolver typeResolver, Serializer eventSerializer,
ContextualPublisher publisher) {
this.typeResolver = typeResolver;
this.eventSerializer = eventSerializer;
this.publisher = publisher;
}
@Override
@Transactional(MANDATORY)
public void publish(Object event) {
String id = randomId();
String type = typeResolver.resolve(event.getClass());
String payload = eventSerializer.serialize(event);
publisher.publish(id, type, new Properties(), payload);
LOGGER.debug("enqueued event '{}' with type '{}' (payload '{}')", id, type, payload);
}
@Override
@Transactional(MANDATORY)
public void publishDelayed(Object event, TemporalAmount delay) {
String id = randomId();
String type = typeResolver.resolve(event.getClass());
String payload = eventSerializer.serialize(event);
Properties context = new Properties();
context.put(DELAYED_UNTIL_KEY, now().plus(delay));
publisher.publish(id, type, context, payload);
LOGGER.debug("enqueued event '{}' with type '{}' (payload '{}') and delay {}", id, type, payload, delay);
}
}