LockOwner.java
package com.github.jonasrutishauser.transactional.event.core.store;
import static com.github.jonasrutishauser.transactional.event.core.random.Random.randomId;
import java.time.Clock;
import java.time.Instant;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.github.jonasrutishauser.transactional.event.api.monitoring.ProcessingBlockedEvent;
import com.github.jonasrutishauser.transactional.event.api.ProcessingStrategy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Event;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@ApplicationScoped
class LockOwner {
private static final Logger LOGGER = LogManager.getLogger();
private final Clock clock;
private final String id;
private final Event<ProcessingBlockedEvent> processingBlockedEvent;
private final ProcessingStrategy processingStrategy;
LockOwner() {
this(null, null);
}
@Inject
public LockOwner(Event<ProcessingBlockedEvent> processingBlockedEvent, ProcessingStrategy processingStrategy) {
this(Clock.systemUTC(), randomId(), processingBlockedEvent, processingStrategy);
}
LockOwner(Clock clock, String id, Event<ProcessingBlockedEvent> processingBlockedEvent,
ProcessingStrategy processingStrategy) {
LOGGER.info("using lock id: {}", id);
this.id = id;
this.clock = clock;
this.processingBlockedEvent = processingBlockedEvent;
this.processingStrategy = processingStrategy;
}
@Produces
@Named("lockOwner.id")
public String getId() {
return id;
}
public long getUntilToProcess() {
return Instant.now(clock).plus(processingStrategy.processingLockDuration()).toEpochMilli();
}
public long getUntilForRetry(int tries, String eventId) {
if (tries > processingStrategy.maxTries()) {
maxAttemptsReached(eventId);
return Long.MAX_VALUE;
}
return Instant.now(clock).plus(processingStrategy.waitDurationForRetry(tries)).toEpochMilli();
}
protected void maxAttemptsReached(String eventId) {
LOGGER.info("max attempts used, event with id '{}' will be blocked", eventId);
processingBlockedEvent.fire(new ProcessingBlockedEvent(eventId));
}
public long getMinUntilForAquire() {
return Instant.now(clock).toEpochMilli();
}
public boolean isOwningForProcessing(String owner, long lockedUntil) {
return id.equals(owner) && lockedUntil > Instant.now(clock).toEpochMilli();
}
}