package org.springframework.cloud.sleuth.instrument.messaging;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;
import org.aspectj.weaver.model.AsmRelationshipUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.cloud.stream.binder.BinderTypeRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-instrumentation-3.0.4.jar:org/springframework/cloud/sleuth/instrument/messaging/TracingChannelInterceptor.class */
public final class TracingChannelInterceptor implements ExecutorChannelInterceptor, ApplicationContextAware {
    /** Fully qualified name of the Spring Cloud Stream direct channel class that is looked up reflectively in the static initializer. */
    public static final String STREAM_DIRECT_CHANNEL = "org.springframework.cloud.stream.messaging.DirectWithAttributesChannel";
    /** Remote service name used when the mapper function yields no text for any candidate key. */
    private static final String REMOTE_SERVICE_NAME = "broker";
    /** Resolved {@link #STREAM_DIRECT_CHANNEL} class, or {@code null} when it is not on the classpath (assigned in the static initializer). */
    private static final Class<?> directWithAttributesChannelClass;
    /** Holder for the span/scope pair currently tracked by this interceptor. */
    private final ThreadLocalSpan threadLocalSpan = new ThreadLocalSpan();
    /** Tracer used to create spans and put them in scope. */
    private final Tracer tracer;
    /** Writes propagation fields into message headers. */
    private final Propagator.Setter<MessageHeaderAccessor> injector;
    /** Reads propagation fields from message headers. */
    private final Propagator.Getter<MessageHeaderAccessor> extractor;
    /** Hook applied to span builders for send, receive and handle operations. */
    private final MessageSpanCustomizer messageSpanCustomizer;
    /** Propagator used for both extraction and injection of trace context. */
    private final Propagator propagator;
    /** Maps a header name or binder type name to a remote service name; a result without text means "no match". */
    private final Function<String, String> remoteServiceNameMapper;
    /** Application context supplied through {@link #setApplicationContext}; may still be {@code null} if that callback was never invoked. */
    private ApplicationContext applicationContext;
    private static final LogAccessor log = new LogAccessor((Class<?>) TracingChannelInterceptor.class);
    /** Whether Spring Integration's DirectChannel is on the classpath; guards every reference to {@code DirectChannel.class}. */
    private static final boolean hasDirectChannelClass = ClassUtils.isPresent("org.springframework.integration.channel.DirectChannel", null);
    /** Whether Spring Cloud Stream's BinderTypeRegistry is on the classpath; guards the registry lookup. */
    private static final boolean hasBinderTypeRegistry = ClassUtils.isPresent("org.springframework.cloud.stream.binder.BinderTypeRegistry", null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-instrumentation-3.0.4.jar:org/springframework/cloud/sleuth/instrument/messaging/TracingChannelInterceptor$SpanAndScope.class */
    /**
     * Immutable pair of a span and the scope that was opened when the span was put in scope.
     * Both are kept together so that the span can be ended and the scope closed in one place.
     */
    public static class SpanAndScope {
        /** The span being tracked. */
        final Span span;
        /** The scope associated with {@link #span}. */
        final Tracer.SpanInScope scope;

        /**
         * @param span the span to hold
         * @param spanInScope the scope that belongs to the span
         */
        SpanAndScope(Span span, Tracer.SpanInScope spanInScope) {
            this.span = span;
            this.scope = spanInScope;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-instrumentation-3.0.4.jar:org/springframework/cloud/sleuth/instrument/messaging/TracingChannelInterceptor$ThreadLocalSpan.class */
    /**
     * Tracks the "current" {@link SpanAndScope} in a thread local and keeps previously set
     * values in a deque so that they can be restored when the current one is removed.
     *
     * <p>NOTE(review): the deque of previous values is an instance field rather than a
     * thread local, so values pushed by one thread can be popped by another. Confirm this is
     * the intended behavior before relying on strict per-thread nesting.
     */
    public static class ThreadLocalSpan {
        private static final LogAccessor log = new LogAccessor((Class<?>) ThreadLocalSpan.class);
        private final ThreadLocal<SpanAndScope> threadLocalSpan = new ThreadLocal<>();
        private final LinkedBlockingDeque<SpanAndScope> spans = new LinkedBlockingDeque<>();

        ThreadLocalSpan() {
        }

        /**
         * Makes the given value the current one for the calling thread. A value that was
         * already current is pushed to the front of the deque first.
         */
        void set(SpanAndScope spanAndScope) {
            SpanAndScope previous = this.threadLocalSpan.get();
            if (previous != null) {
                this.spans.addFirst(previous);
            }
            this.threadLocalSpan.set(spanAndScope);
        }

        /** Returns the value that is current for the calling thread, or {@code null} if none. */
        SpanAndScope get() {
            return this.threadLocalSpan.get();
        }

        /**
         * Clears the current value for the calling thread and, when the deque holds earlier
         * values, restores the most recently pushed one as the new current value.
         */
        void remove() {
            this.threadLocalSpan.remove();
            if (!this.spans.isEmpty()) {
                try {
                    SpanAndScope restored = this.spans.removeFirst();
                    log.debug(() -> "Took span [" + restored + "] from thread local");
                    this.threadLocalSpan.set(restored);
                } catch (NoSuchElementException e) {
                    // The deque was emptied between the isEmpty() check and removeFirst().
                    log.trace(e, () -> "Failed to remove a span from the queue");
                }
            }
        }
    }

    /**
     * Creates the interceptor from its collaborators.
     *
     * @param tracer tracer used to create spans and put them in scope
     * @param propagator propagator used to extract and inject trace context
     * @param setter writes propagation fields into message headers
     * @param getter reads propagation fields from message headers
     * @param function maps a header or binder type name to a remote service name
     * @param messageSpanCustomizer customizes span builders for send, receive and handle
     */
    public TracingChannelInterceptor(Tracer tracer, Propagator propagator, Propagator.Setter<MessageHeaderAccessor> setter, Propagator.Getter<MessageHeaderAccessor> getter, Function<String, String> function, MessageSpanCustomizer messageSpanCustomizer) {
        // Tracing core.
        this.tracer = tracer;
        this.messageSpanCustomizer = messageSpanCustomizer;
        this.remoteServiceNameMapper = function;
        // Propagation.
        this.propagator = propagator;
        this.extractor = getter;
        this.injector = setter;
    }

    /**
     * Stores the application context so that it can later be consulted when resolving the
     * remote service name.
     *
     * @param applicationContext the context this interceptor runs in
     */
    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Starts a producer span before a message is sent and returns a copy of the message whose
     * headers carry the new span's context.
     *
     * <p>The trace context is extracted from the (possibly unwrapped) message, existing trace
     * headers are removed, and the freshly started span is put in scope and injected back into
     * the headers. For direct channels {@link #beforeHandle} is invoked as well, since no
     * separate handle callback is expected there.
     *
     * @param message the message about to be sent
     * @param messageChannel the channel the message is sent to
     * @return the message to send, carrying the propagation headers
     */
    public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
        Message<?> retrieved = getMessage(message);
        log.debug(() -> "Received a message in pre-send " + retrieved);

        MessageHeaderAccessor headers = mutableHeaderAccessor(retrieved);
        Span.Builder extracted = this.propagator.extract(headers, this.extractor);
        MessageHeaderPropagatorSetter.removeAnyTraceHeaders(headers, this.propagator.fields());

        Span.Builder producer = extracted.kind(Span.Kind.PRODUCER);
        Span.Builder customized = this.messageSpanCustomizer.customizeSend(producer, message, messageChannel);
        String remoteServiceName = toRemoteServiceName(headers, this.remoteServiceNameMapper, this.applicationContext);
        Span span = customized.remoteServiceName(remoteServiceName).start();
        log.debug(() -> "Extracted result from headers " + span);

        setSpanInScope(span);
        this.propagator.inject(span.context(), headers, this.injector);
        log.debug(() -> "Created a new span in pre send " + span);

        Message<?> result = outputMessage(message, retrieved, headers);
        if (isDirectChannel(messageChannel)) {
            beforeHandle(result, messageChannel, null);
        }
        return result;
    }

    /**
     * Puts the span in scope through the tracer and records the span together with its scope
     * as the current entry of {@link #threadLocalSpan}.
     */
    private void setSpanInScope(Span span) {
        Tracer.SpanInScope scope = this.tracer.withSpan(span);
        SpanAndScope entry = new SpanAndScope(span, scope);
        this.threadLocalSpan.set(entry);
        log.debug(() -> "Put span in scope " + span);
    }

    /**
     * Resolves the remote service name for a span.
     *
     * <p>The mapper is applied to every header name first; the first result containing text
     * wins. If none matches and both the binder type registry class and an application
     * context are available, the mapper is applied to every registered binder type name.
     * Otherwise, or when nothing matches, {@link #REMOTE_SERVICE_NAME} is returned.
     *
     * @param messageHeaderAccessor accessor whose header names are offered to the mapper
     * @param function mapper from a candidate key to a remote service name
     * @param applicationContext context used to look up the binder type registry, may be {@code null}
     * @return the resolved remote service name, never without text
     */
    private static String toRemoteServiceName(MessageHeaderAccessor messageHeaderAccessor, Function<String, String> function, ApplicationContext applicationContext) {
        for (String headerName : messageHeaderAccessor.getMessageHeaders().keySet()) {
            String fromHeader = function.apply(headerName);
            if (StringUtils.hasText(fromHeader)) {
                return fromHeader;
            }
        }
        if (hasBinderTypeRegistry && applicationContext != null) {
            BinderTypeRegistry registry = applicationContext.getBean(BinderTypeRegistry.class);
            for (String binderName : registry.getAll().keySet()) {
                String fromBinder = function.apply(binderName);
                if (StringUtils.hasText(fromBinder)) {
                    return fromBinder;
                }
            }
        }
        return REMOTE_SERVICE_NAME;
    }

    /**
     * Builds the message returned from {@link #preSend}.
     *
     * <p>For an {@link ErrorMessage} only the propagation headers are copied onto the original
     * message's headers and the original message reference is preserved. For any other message
     * all headers from the accessor are copied and the payload of {@code message2} is used.
     * When the headers look like WebSocket/STOMP headers the accessor's own header instance is
     * reused; otherwise a new {@link MessageHeaders} copy is created.
     *
     * @param message the message originally passed to the interceptor
     * @param message2 the message whose payload is used for non-error messages
     * @param messageHeaderAccessor accessor holding the headers with the injected trace context
     * @return the new outgoing message
     */
    private Message<?> outputMessage(Message<?> message, Message<?> message2, MessageHeaderAccessor messageHeaderAccessor) {
        MessageHeaderAccessor target = mutableHeaderAccessor(message);
        boolean error = message instanceof ErrorMessage;
        if (error) {
            target.copyHeaders(MessageHeaderPropagatorSetter.propagationHeaders(messageHeaderAccessor.getMessageHeaders(), this.propagator.fields()));
        }
        else {
            target.copyHeaders(messageHeaderAccessor.getMessageHeaders());
        }

        MessageHeaders resultHeaders;
        if (isWebSockets(target)) {
            resultHeaders = target.getMessageHeaders();
        }
        else {
            resultHeaders = new MessageHeaders(target.getMessageHeaders());
        }

        if (error) {
            ErrorMessage errorMessage = (ErrorMessage) message;
            return new ErrorMessage(errorMessage.getPayload(), resultHeaders, errorMessage.getOriginalMessage());
        }
        return new GenericMessage<>(message2.getPayload(), resultHeaders);
    }

    /**
     * Tells whether the headers contain either of the keys {@code stompCommand} or
     * {@code simpMessageType}.
     */
    private static boolean isWebSockets(MessageHeaderAccessor messageHeaderAccessor) {
        if (messageHeaderAccessor.getMessageHeaders().containsKey("stompCommand")) {
            return true;
        }
        return messageHeaderAccessor.getMessageHeaders().containsKey("simpMessageType");
    }

    /**
     * Tells whether the channel's target class is a Spring Integration {@code DirectChannel}
     * that is not the Spring Cloud Stream {@link #STREAM_DIRECT_CHANNEL} variant.
     */
    private static boolean isDirectChannel(MessageChannel messageChannel) {
        Class<?> channelType = AopUtils.getTargetClass(messageChannel);
        boolean streamDirectChannel = directWithAttributesChannelClass != null
                && directWithAttributesChannelClass.isAssignableFrom(channelType);
        if (streamDirectChannel) {
            return false;
        }
        // DirectChannel.class must only be touched once its presence on the classpath is known.
        if (!hasDirectChannelClass) {
            return false;
        }
        return DirectChannel.class.isAssignableFrom(channelType);
    }

    /**
     * Finishes the span(s) started for a send operation.
     *
     * <p>For direct channels the handle span opened from {@link #preSend} is finished first
     * via {@link #afterMessageHandled}; afterwards the current span is finished.
     *
     * @param message the message that was sent
     * @param messageChannel the channel the message was sent to
     * @param z whether the send was reported as successful (not used here)
     * @param exc the exception raised during sending, or {@code null}
     */
    public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean z, Exception exc) {
        boolean direct = isDirectChannel(messageChannel);
        if (direct) {
            afterMessageHandled(message, messageChannel, null, exc);
        }
        log.debug(() -> "Will finish the current span after completion " + this.tracer.currentSpan());
        finishSpan(exc);
    }

    /**
     * Starts a consumer span after a message has been received and returns a copy of the
     * message whose headers carry the consumer span's context.
     *
     * <p>NOTE(review): the span built from the extracted context is started here and only its
     * context is used as the parent of the consumer span; nothing in this class ends it.
     * Confirm whether that is intended.
     *
     * @param message the received message
     * @param messageChannel the channel the message was received from
     * @return a new message with immutable headers containing the propagation fields
     */
    public Message<?> postReceive(Message<?> message, MessageChannel messageChannel) {
        MessageHeaderAccessor headers = mutableHeaderAccessor(message);
        log.debug(() -> "Received a message in post-receive " + message);

        Span extracted = this.propagator.extract(headers, this.extractor).start();
        log.debug(() -> "Extracted result from headers " + extracted);

        Span consumerSpan = consumerSpanReceive(message, messageChannel, headers, extracted);
        setSpanInScope(consumerSpan);
        log.debug(() -> "Created a new span that will be injected in the headers " + consumerSpan);

        this.propagator.inject(consumerSpan.context(), headers, this.injector);
        log.debug(() -> "Created a new span in post receive " + consumerSpan);

        headers.setImmutable();
        if (message instanceof ErrorMessage) {
            ErrorMessage errorMessage = (ErrorMessage) message;
            return new ErrorMessage(errorMessage.getPayload(), headers.getMessageHeaders(), errorMessage.getOriginalMessage());
        }
        return new GenericMessage<>(message.getPayload(), headers.getMessageHeaders());
    }

    /**
     * Creates and starts the consumer span for a received message as a child of the given
     * span, after stripping existing trace headers from the accessor.
     *
     * @param message the received message
     * @param messageChannel the channel the message was received from
     * @param messageHeaderAccessor mutable headers of the message
     * @param span the span whose context becomes the parent
     * @return the started consumer span
     */
    private Span consumerSpanReceive(Message<?> message, MessageChannel messageChannel, MessageHeaderAccessor messageHeaderAccessor, Span span) {
        Span.Builder child = this.tracer.spanBuilder().setParent(span.context());
        MessageHeaderPropagatorSetter.removeAnyTraceHeaders(messageHeaderAccessor, this.propagator.fields());
        Span.Builder consumer = child.kind(Span.Kind.CONSUMER);
        Span.Builder customized = this.messageSpanCustomizer.customizeReceive(consumer, message, messageChannel);
        String remoteServiceName = toRemoteServiceName(messageHeaderAccessor, this.remoteServiceNameMapper, this.applicationContext);
        return customized.remoteServiceName(remoteServiceName).start();
    }

    /**
     * Finishes the current span once a receive operation has completed.
     *
     * @param message the received message (not used here)
     * @param messageChannel the channel the message was received from (not used here)
     * @param exc the exception raised during receive, or {@code null}
     */
    public void afterReceiveCompletion(Message<?> message, MessageChannel messageChannel, Exception exc) {
        log.debug(() -> "Will finish the current span after receive completion " + this.tracer.currentSpan());
        finishSpan(exc);
    }

    /**
     * Starts the span that covers the handling of a message and returns a copy of the message
     * with trace headers removed.
     *
     * <p>A short-lived consumer span is created (and immediately ended) from the incoming
     * headers; the handling span is then created as its successor, customized, started and
     * put in scope.
     *
     * <p>For an {@link ErrorMessage} the original message reference is carried over to the
     * returned instance, matching what {@link #postReceive} and the send path do; previously
     * it was dropped here.
     *
     * @param message the message about to be handled
     * @param messageChannel the channel the message arrived on
     * @param messageHandler the handler that will process the message (not used here)
     * @return the message to hand to the handler
     */
    public Message<?> beforeHandle(Message<?> message, MessageChannel messageChannel, MessageHandler messageHandler) {
        MessageHeaderAccessor headers = mutableHeaderAccessor(message);
        log.debug(() -> "Received a message in before handle " + message);

        Span consumerSpan = consumerSpan(message, messageChannel, headers);
        Span handleSpan = this.messageSpanCustomizer.customizeHandle(this.tracer.nextSpan(consumerSpan), message, messageChannel).start();
        log.debug(() -> "Created consumer span " + handleSpan);

        setSpanInScope(handleSpan);
        MessageHeaderPropagatorSetter.removeAnyTraceHeaders(headers, this.propagator.fields());
        log.debug(() -> "Created a new span in before handle " + handleSpan);

        if (message instanceof ErrorMessage) {
            ErrorMessage errorMessage = (ErrorMessage) message;
            // Keep the link to the original message so downstream error handling can still reach it.
            return new ErrorMessage(errorMessage.getPayload(), headers.getMessageHeaders(), errorMessage.getOriginalMessage());
        }
        headers.setImmutable();
        return new GenericMessage<>(message.getPayload(), headers.getMessageHeaders());
    }

    /**
     * Creates the consumer span for a message that is about to be handled, starts it and ends
     * it right away; the returned (already ended) span serves as the parent reference for the
     * handling span.
     *
     * <p>The builder is started exactly once, after it has been fully configured. Earlier the
     * builder was additionally started right after setting the kind and that result was
     * discarded, so a span could be started without ever being ended.
     *
     * @param message the message about to be handled
     * @param messageChannel the channel the message arrived on
     * @param messageHeaderAccessor headers from which the trace context is extracted
     * @return the consumer span, already ended
     */
    private Span consumerSpan(Message<?> message, MessageChannel messageChannel, MessageHeaderAccessor messageHeaderAccessor) {
        Span.Builder builder = this.propagator.extract(messageHeaderAccessor, this.extractor);
        log.debug(() -> "Extracted result from headers - will finish it immediately " + builder);
        // Configure the builder in place; it is only started once below.
        builder.kind(Span.Kind.CONSUMER);
        builder.remoteServiceName(REMOTE_SERVICE_NAME);
        Span consumerSpan = this.messageSpanCustomizer.customizeHandle(builder, message, messageChannel).start();
        consumerSpan.end();
        return consumerSpan;
    }

    /**
     * Finishes the current span once a message has been handled.
     *
     * @param message the handled message (not used here)
     * @param messageChannel the channel the message arrived on (not used here)
     * @param messageHandler the handler that processed the message (not used here)
     * @param exc the exception raised while handling, or {@code null}
     */
    public void afterMessageHandled(Message<?> message, MessageChannel messageChannel, MessageHandler messageHandler, Exception exc) {
        log.debug(() -> "Will finish the current span after message handled " + this.tracer.currentSpan());
        finishSpan(exc);
    }

    /**
     * Ends the span currently tracked by {@link #threadLocalSpan} and closes its scope.
     *
     * <p>Does nothing when no span is tracked. A no-op span only has its scope closed. When an
     * exception is given, its message (or its simple class name when the message is
     * {@code null}) is recorded as a tag before the span is ended.
     *
     * @param exc the exception to record on the span, or {@code null}
     */
    void finishSpan(Exception exc) {
        SpanAndScope current = getSpanFromThreadLocal();
        if (current == null) {
            return;
        }
        Span span = current.span;
        Tracer.SpanInScope scope = current.scope;

        if (span.isNoop()) {
            log.debug(() -> "Span " + span + " is noop - will stop the scope");
            scope.close();
            return;
        }

        if (exc != null) {
            String rawMessage = exc.getMessage();
            String description = (rawMessage != null) ? rawMessage : exc.getClass().getSimpleName();
            // NOTE(review): the tag key is taken from an AspectJ constant, which looks like a
            // decompiler substitution for a string literal - confirm the intended key.
            span.tag(AsmRelationshipUtils.DECLARE_ERROR, description);
        }

        log.debug(() -> "Will finish the and its corresponding scope " + span);
        span.end();
        scope.close();
    }

    /**
     * Returns the currently tracked span/scope pair and removes it from
     * {@link #threadLocalSpan}, which in turn may restore a previously tracked pair.
     *
     * @return the pair that was current, or {@code null} if none was tracked
     */
    private SpanAndScope getSpanFromThreadLocal() {
        SpanAndScope current = this.threadLocalSpan.get();
        log.debug(() -> "Took span [" + current + "] from thread local");
        this.threadLocalSpan.remove();
        return current;
    }

    /**
     * Returns a mutable header accessor for the message: the accessor already attached to the
     * message when it exists and is mutable, otherwise a new mutable accessor that is
     * configured to stay mutable.
     */
    private static MessageHeaderAccessor mutableHeaderAccessor(Message<?> message) {
        MessageHeaderAccessor existing = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
        if (existing == null || !existing.isMutable()) {
            MessageHeaderAccessor fresh = MessageHeaderAccessor.getMutableAccessor(message);
            fresh.setLeaveMutable(true);
            return fresh;
        }
        return existing;
    }

    /**
     * Unwraps the failed message from a {@link MessagingException} payload.
     *
     * @param message the message to inspect
     * @return the failed message carried by the payload when the payload is a
     * {@link MessagingException} with a non-null failed message; otherwise {@code message}
     */
    private static Message<?> getMessage(Message<?> message) {
        Object payload = message.getPayload();
        if (!(payload instanceof MessagingException)) {
            return message;
        }
        Message<?> failed = ((MessagingException) payload).getFailedMessage();
        return (failed != null) ? failed : message;
    }

    // Resolve the Spring Cloud Stream direct channel class only when it is on the classpath.
    static {
        if (ClassUtils.isPresent(STREAM_DIRECT_CHANNEL, null)) {
            directWithAttributesChannelClass = ClassUtils.resolveClassName(STREAM_DIRECT_CHANNEL, null);
        }
        else {
            directWithAttributesChannelClass = null;
        }
    }
}
