PendingEventStore.java
package com.github.jonasrutishauser.transactional.event.core.store;
import static jakarta.enterprise.event.TransactionPhase.BEFORE_COMPLETION;
import static jakarta.interceptor.Interceptor.Priority.LIBRARY_AFTER;
import static jakarta.transaction.Transactional.TxType.MANDATORY;
import static java.lang.Math.min;
import static java.sql.Statement.SUCCESS_NO_INFO;
import static java.sql.Types.VARCHAR;
import static java.util.Collections.emptySet;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.IntPredicate;
import javax.sql.DataSource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.github.jonasrutishauser.transactional.event.api.Configuration;
import com.github.jonasrutishauser.transactional.event.api.Events;
import com.github.jonasrutishauser.transactional.event.api.monitoring.ProcessingDeletedEvent;
import com.github.jonasrutishauser.transactional.event.api.monitoring.ProcessingUnblockedEvent;
import com.github.jonasrutishauser.transactional.event.api.store.BlockedEvent;
import com.github.jonasrutishauser.transactional.event.api.store.EventStore;
import com.github.jonasrutishauser.transactional.event.api.store.QueryAdapter;
import com.github.jonasrutishauser.transactional.event.core.PendingEvent;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
@ApplicationScoped
class PendingEventStore implements EventStore {
private static final Logger LOGGER = LogManager.getLogger();
private final Clock clock;
private final Configuration configuration;
private final DataSource dataSource;
private final QueryAdapterFactory queryAdapterFactory;
private final LockOwner lockOwner;
private String insertSQL;
private String readSQL;
private String deleteSQL;
private String deleteBlockedSQL;
private String updateSQL;
private String updateSQLwithLockOwner;
private String aquireSQL;
private String readBlockedSQL;
private String readBlockedForUpdateSQL;
private final Event<ProcessingUnblockedEvent> unblockedEvent;
private final Event<ProcessingDeletedEvent> deletedEvent;
PendingEventStore() {
this(null, null, null, null, null, null);
}
@Inject
PendingEventStore(Configuration configuration, @Events DataSource dataSource,
QueryAdapterFactory queryAdapterFactory, LockOwner lockOwner,
Event<ProcessingUnblockedEvent> unblockedEvent, Event<ProcessingDeletedEvent> deletedEvent) {
this(Clock.systemUTC(), configuration, dataSource, queryAdapterFactory, lockOwner, unblockedEvent,
deletedEvent);
}
PendingEventStore(Clock clock, Configuration configuration, @Events DataSource dataSource,
QueryAdapterFactory queryAdapterFactory, LockOwner lockOwner,
Event<ProcessingUnblockedEvent> unblockedEvent, Event<ProcessingDeletedEvent> deletedEvent) {
this.clock = clock;
this.configuration = configuration;
this.dataSource = dataSource;
this.queryAdapterFactory = queryAdapterFactory;
this.lockOwner = lockOwner;
this.unblockedEvent = unblockedEvent;
this.deletedEvent = deletedEvent;
}
@PostConstruct
void initSqlQueries() {
QueryAdapter adapter = queryAdapterFactory.getQueryAdapter();
insertSQL = "INSERT INTO " + configuration.getTableName()
+ " (id, event_type, context, payload, published_at, tries, lock_owner, locked_until) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
readSQL = "SELECT * FROM " + configuration.getTableName() + " WHERE id=? FOR UPDATE";
deleteSQL = "DELETE FROM " + configuration.getTableName() + " WHERE id=? AND lock_owner=?";
deleteBlockedSQL = "DELETE FROM " + configuration.getTableName() + " WHERE id=? AND locked_until="
+ Long.MAX_VALUE;
updateSQL = "UPDATE " + configuration.getTableName() + " SET tries=?, lock_owner=?, locked_until=? WHERE id=?";
updateSQLwithLockOwner = updateSQL + " AND lock_owner=?";
aquireSQL = adapter.fixLimits(adapter.addSkipLocked("SELECT id, tries FROM " + configuration.getTableName()
+ " WHERE locked_until<=? {LIMIT ?} FOR UPDATE", configuration.getMaxAquire()));
String readBlocked = "SELECT * FROM " + configuration.getTableName() + " WHERE locked_until=" + Long.MAX_VALUE;
readBlockedSQL = adapter.fixLimits(readBlocked + " {LIMIT ?}");
readBlockedForUpdateSQL = adapter.addSkipLocked(readBlocked + " AND id=? FOR UPDATE");
}
@Override
@Transactional
public boolean unblock(String eventId) {
boolean result;
try (Connection connection = dataSource.getConnection();
PreparedStatement readStatement = connection.prepareStatement(readBlockedForUpdateSQL);
ResultSet resultSet = executeQuery(readStatement, eventId);
PreparedStatement updateStatement = connection.prepareStatement(updateSQL)) {
if (resultSet.next()) {
updateStatement.setInt(1, 0);
updateStatement.setNull(2, VARCHAR);
updateStatement.setLong(3, Instant.now(clock).toEpochMilli());
updateStatement.setString(4, eventId);
result = updateStatement.executeUpdate() > 0;
} else {
result = false;
}
} catch (SQLException exception) {
LOGGER.error("failed to unblock event '{}'", eventId, exception);
result = false;
}
if (result) {
unblockedEvent.fire(new ProcessingUnblockedEvent(eventId));
}
return result;
}
@Override
@Transactional
public boolean delete(String eventId) {
boolean result;
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(deleteBlockedSQL)) {
statement.setString(1, eventId);
statement.execute();
result = statement.getUpdateCount() > 0;
} catch (SQLException exception) {
LOGGER.error("failed to unblock event '{}'", eventId, exception);
result = false;
}
if (result) {
deletedEvent.fire(new ProcessingDeletedEvent(eventId));
}
return result;
}
@Override
@Transactional
public Collection<BlockedEvent> getBlockedEvents(int maxElements) {
Collection<BlockedEvent> result = new ArrayList<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(readBlockedSQL);
ResultSet resultSet = executeQuery(statement, maxElements)) {
while (resultSet.next()) {
result.add(new BlockedEvent(resultSet.getString("id"), //
resultSet.getString("event_type"), //
resultSet.getString("payload"), //
resultSet.getTimestamp("published_at").toLocalDateTime()));
}
} catch (SQLException exception) {
LOGGER.error("failed to read blocked events", exception);
}
return result;
}
@Transactional(MANDATORY)
void store(@Observes(during = BEFORE_COMPLETION) @Priority(LIBRARY_AFTER) EventsPublished events) {
String errorMessage = "failed to insert pending events";
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(insertSQL)) {
for (PendingEvent event : events.getEvents()) {
statement.setString(1, event.getId());
statement.setString(2, event.getType());
statement.setString(3, event.getContext());
statement.setString(4, event.getPayload());
statement.setTimestamp(5, Timestamp.valueOf(event.getPublishedAt()));
statement.setInt(6, event.getTries());
statement.setString(7, lockOwner.getId());
statement.setLong(8,
event.getDelayedUntil().map(Instant::toEpochMilli).orElseGet(lockOwner::getUntilToProcess));
statement.addBatch();
}
int[] result = statement.executeBatch();
if (result.length != events.getEvents().size() || Arrays.stream(result).anyMatch(updateCountIsNot(1))) {
LOGGER.error("failed to insert pending events (results: {})", result);
throw new IllegalStateException(errorMessage);
}
} catch (SQLException exception) {
LOGGER.error(errorMessage, exception);
throw new IllegalStateException(errorMessage, exception);
}
}
@Transactional(value = MANDATORY, dontRollbackOn = NoSuchElementException.class)
public PendingEvent getAndLockEvent(String id) {
String errorMessage = "failed to read pending event with id " + id;
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(readSQL);
ResultSet resultSet = executeQueryWithTimeout(statement, id, 1)) {
if (!resultSet.next()) {
throw new NoSuchElementException(errorMessage);
}
String owner = resultSet.getString("lock_owner");
long lockedUntil = resultSet.getLong("locked_until");
if (!lockOwner.isOwningForProcessing(owner, lockedUntil)) {
throw new ConcurrentModificationException("No longer the owner");
}
return new PendingEvent(id, resultSet.getString("event_type"), resultSet.getString("context"),
resultSet.getString("payload"), resultSet.getTimestamp("published_at").toLocalDateTime(),
null, resultSet.getInt("tries"));
} catch (SQLException exception) {
LOGGER.error(errorMessage, exception);
throw new IllegalStateException(errorMessage);
}
}
@Transactional(MANDATORY)
public void delete(PendingEvent event) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(deleteSQL)) {
statement.setString(1, event.getId());
statement.setString(2, lockOwner.getId());
statement.setQueryTimeout(10);
if (statement.executeUpdate() < 1) {
throw new NoSuchElementException("failed to delete pending event with id " + event.getId());
}
} catch (SQLException exception) {
String errorMessage = "failed to delete pending event with id " + event.getId();
LOGGER.error(errorMessage, exception);
throw new IllegalStateException(errorMessage);
}
}
@Transactional(MANDATORY)
public void updateForRetry(PendingEvent event) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(updateSQLwithLockOwner)) {
statement.setInt(1, event.getTries() + 1);
statement.setNull(2, VARCHAR);
statement.setLong(3, lockOwner.getUntilForRetry(event.getTries(), event.getId()));
statement.setString(4, event.getId());
statement.setString(5, lockOwner.getId());
statement.setQueryTimeout(10);
if (statement.executeUpdate() < 1) {
throw new NoSuchElementException("failed to update pending event with id " + event.getId());
}
} catch (SQLException exception) {
String errorMessage = "failed to update pending event with id " + event.getId();
LOGGER.error(errorMessage, exception);
throw new IllegalStateException(errorMessage);
}
}
@Transactional
public Set<String> aquire(int maxAquire) {
Set<String> result = new HashSet<>();
int limit = min(maxAquire, configuration.getMaxAquire());
if (limit < 1) {
return emptySet();
}
try (Connection connection = dataSource.getConnection();
PreparedStatement aquireStatement = connection.prepareStatement(aquireSQL);
ResultSet resultSet = executeQuery(aquireStatement, lockOwner.getMinUntilForAquire(), limit);
PreparedStatement updateStatement = connection.prepareStatement(updateSQL)) {
while (limit-- > 0 && resultSet.next()) {
result.add(resultSet.getString("id"));
updateStatement.setInt(1, resultSet.getInt("tries"));
updateStatement.setString(2, lockOwner.getId());
updateStatement.setLong(3, lockOwner.getUntilToProcess());
updateStatement.setString(4, resultSet.getString("id"));
updateStatement.addBatch();
}
if (!result.isEmpty()) {
int[] res = updateStatement.executeBatch();
if (res.length != result.size() || Arrays.stream(res).anyMatch(updateCountIsNot(1))) {
LOGGER.warn("failed to aquire pending events (update failed; results: {})", res);
result = emptySet();
}
}
} catch (SQLException exception) {
LOGGER.warn("failed to aquire pending events", exception);
result = emptySet();
}
return result;
}
private IntPredicate updateCountIsNot(int expected) {
return count -> count != expected && count != SUCCESS_NO_INFO;
}
private ResultSet executeQuery(PreparedStatement statement, String stringParam) throws SQLException {
statement.setString(1, stringParam);
return statement.executeQuery();
}
private ResultSet executeQueryWithTimeout(PreparedStatement statement, String stringParam, int seconds)
throws SQLException {
statement.setString(1, stringParam);
statement.setQueryTimeout(seconds);
return statement.executeQuery();
}
private ResultSet executeQuery(PreparedStatement statement, int intParam) throws SQLException {
statement.setInt(1, intParam);
return statement.executeQuery();
}
private ResultSet executeQuery(PreparedStatement statement, long param1, int param2) throws SQLException {
statement.setLong(1, param1);
if (statement.getParameterMetaData().getParameterCount() > 1) {
statement.setInt(2, param2);
} else {
statement.setFetchSize(1);
}
return statement.executeQuery();
}
}