InstrumentedProcessor.java
package com.github.jonasrutishauser.transactional.event.core.opentelemetry;
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.api.trace.StatusCode.ERROR;
import static java.nio.charset.StandardCharsets.UTF_8;
import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE;
import java.util.Properties;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import com.github.jonasrutishauser.transactional.event.api.Events;
import com.github.jonasrutishauser.transactional.event.api.context.ContextualProcessor;
import com.github.jonasrutishauser.transactional.event.api.handler.Handler;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
@Decorator
@Priority(LIBRARY_BEFORE)
public class InstrumentedProcessor implements ContextualProcessor {
private final ContextualProcessor delegate;
private final Tracer tracer;
private final TextMapPropagator propagator;
private final String lockOwnerId;
private final TextMapGetter<Properties> getter = new TextMapGetter<Properties>() {
@Override
public String get(Properties carrier, String key) {
return carrier.getProperty(key);
}
@Override
public Iterable<String> keys(Properties carrier) {
return carrier.stringPropertyNames();
}
};
@Inject
InstrumentedProcessor(@Delegate @Any ContextualProcessor delegate, @Events Tracer tracer,
@Events TextMapPropagator propagator, @Named("lockOwner.id") String lockOwnerId) {
this.delegate = delegate;
this.tracer = tracer;
this.propagator = propagator;
this.lockOwnerId = lockOwnerId;
}
@Override
public void process(String id, String type, Properties context, String payload, Handler handler) {
Context extractedContext = propagator.extract(Context.current(), context, getter);
SpanBuilder spanBuilder = tracer.spanBuilder(type + " process") //
.setSpanKind(CONSUMER) //
.setAttribute("messaging.system", "transactional-event") //
.setAttribute("messaging.destination", type) //
.setAttribute("messaging.message_id", id) //
.setAttribute("messaging.message_payload_size_bytes", payload.getBytes(UTF_8).length) //
.setAttribute("messaging.operation", "process") //
.setAttribute("messaging.consumer_id", lockOwnerId);
if (extractedContext != context) {
spanBuilder.addLink(Span.fromContext(extractedContext).getSpanContext());
}
Span span = spanBuilder.startSpan();
try (Scope unused = span.makeCurrent()) {
delegate.process(id, type, context, payload, handler);
} catch (RuntimeException e) {
span.setStatus(ERROR, "Processing failed");
span.recordException(e, Attributes.builder().put("exception.escaped", true).build());
throw e;
} finally {
span.end();
}
}
}