package org.springframework.cloud.sleuth.instrument.messaging;

import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.instrument.messaging.SleuthMessagingSpan;
import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-instrumentation-3.1.9.jar:org/springframework/cloud/sleuth/instrument/messaging/TraceFunctionAroundWrapper.class */
public class TraceFunctionAroundWrapper extends FunctionAroundWrapper implements ApplicationListener<RefreshScopeRefreshedEvent> {
    // Commons Logging logger for this class; only debug-level messages are emitted in this file.
    private static final Log log = LogFactory.getLog(TraceFunctionAroundWrapper.class);
    // Source of the "spring.cloud.stream..." properties read when resolving destinations.
    private final Environment environment;
    // Creates spans and puts them in scope around function invocations.
    private final Tracer tracer;
    // Handed to TraceMessageHandler.forNonSpringIntegration in the constructor; not used directly in this file.
    private final Propagator propagator;
    // Handed to TraceMessageHandler.forNonSpringIntegration in the constructor; not used directly in this file.
    private final Propagator.Setter<MessageHeaderAccessor> injector;
    // Handed to TraceMessageHandler.forNonSpringIntegration in the constructor; not used directly in this file.
    private final Propagator.Getter<MessageHeaderAccessor> extractor;
    // Wraps input/output messages with spans and finishes spans after handling.
    private final TraceMessageHandler traceMessageHandler;
    // Callbacks invoked by customizedInputMessageSpan / customizedOutputMessageSpan.
    private final List<FunctionMessageSpanCustomizer> customizers;
    // Cache of destination names resolved by inputDestination/outputDestination; cleared in onApplicationEvent.
    final Map<String, String> functionToDestinationCache;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-instrumentation-3.1.9.jar:org/springframework/cloud/sleuth/instrument/messaging/TraceFunctionAroundWrapper$MessageAndSpansAndScope.class */
    /**
     * Mutable holder for the tracing state of one reactive function invocation.
     * <p>
     * Instances are placed in the Reactor context by the enclosing class and populated once the
     * input message has been wrapped. All members are optional: {@link #error(Throwable)} and
     * {@link #end()} silently skip whatever has not been set yet.
     */
    public static class MessageAndSpansAndScope {
        /** Result of wrapping the input message; set by the enclosing class. */
        MessageAndSpans messageAndSpans;
        /** Span covering the function invocation; may be {@code null} until populated. */
        Span span;
        /** Scope opened for {@link #span}; may be {@code null} until populated. */
        Tracer.SpanInScope scope;
        /** Flag raised by {@link #handle()}. */
        boolean handled;

        MessageAndSpansAndScope() {
        }

        /**
         * Reports a failure on the held span, if there is one.
         *
         * @param throwable the error to record
         */
        void error(Throwable throwable) {
            if (span == null) {
                return;
            }
            span.error(throwable);
        }

        /** Marks this holder as handled. */
        void handle() {
            handled = true;
        }

        /**
         * @return {@code true} once {@link #handle()} has been called
         */
        boolean isHandled() {
            return handled;
        }

        /**
         * Ends the held span (if any) and then closes the held scope (if any).
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public void end() {
            if (span != null) {
                span.end();
            }
            if (scope != null) {
                scope.close();
            }
        }
    }

    /**
     * Creates a wrapper without any span customizers.
     *
     * @param environment source of the binding properties used to resolve destinations
     * @param tracer tracer used to create spans and scopes
     * @param propagator propagator handed to the message handler
     * @param setter header setter handed to the message handler
     * @param getter header getter handed to the message handler
     */
    public TraceFunctionAroundWrapper(Environment environment, Tracer tracer, Propagator propagator, Propagator.Setter<MessageHeaderAccessor> setter, Propagator.Getter<MessageHeaderAccessor> getter) {
        this(environment, tracer, propagator, setter, getter, Collections.emptyList());
    }

    /**
     * Creates a wrapper with the given span customizers.
     *
     * @param environment source of the binding properties used to resolve destinations
     * @param tracer tracer used to create spans and scopes
     * @param propagator propagator handed to the message handler
     * @param setter header setter handed to the message handler
     * @param getter header getter handed to the message handler
     * @param list customizers applied to input/output message spans
     */
    public TraceFunctionAroundWrapper(Environment environment, Tracer tracer, Propagator propagator, Propagator.Setter<MessageHeaderAccessor> setter, Propagator.Getter<MessageHeaderAccessor> getter, List<FunctionMessageSpanCustomizer> list) {
        // Diamond operator instead of the raw type so the map is created as Map<String, String>.
        this.functionToDestinationCache = new ConcurrentHashMap<>();
        this.environment = environment;
        this.tracer = tracer;
        this.propagator = propagator;
        this.injector = setter;
        this.extractor = getter;
        this.customizers = list;
        this.traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(this.tracer, this.propagator, this.injector, this.extractor, this.customizers);
    }

    /**
     * Entry point called around every function invocation.
     * <p>
     * Functions whose output type is a collection of messages are invoked untouched. Otherwise the
     * input is routed to the reactive or the imperative instrumentation depending on the function's
     * declared input/output types. Inputs that are neither {@code null} nor of the expected carrier
     * type ({@link Publisher} or {@link Message}) are logged at debug level and invoked untouched.
     *
     * @param obj the function input, possibly {@code null}
     * @param functionInvocationWrapper the target function
     * @return the function result, instrumented where supported
     */
    @Override // org.springframework.cloud.function.context.catalog.FunctionAroundWrapper
    protected Object doApply(Object obj, SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper) {
        if (FunctionTypeUtils.isCollectionOfMessage(functionInvocationWrapper.getOutputType())) {
            return functionInvocationWrapper.apply(obj);
        }
        boolean reactive = functionInvocationWrapper.isInputTypePublisher() || functionInvocationWrapper.isOutputTypePublisher();
        if (reactive) {
            if (obj == null || obj instanceof Publisher) {
                return reactorStream((Publisher) obj, functionInvocationWrapper);
            }
        } else if (obj == null || obj instanceof Message) {
            return nonReactorStream((Message) obj, functionInvocationWrapper);
        }
        // Unsupported input type: explain why at debug level and call the function as-is.
        logDebugAboutMessageTypes(obj);
        return functionInvocationWrapper.apply(obj);
    }

    /**
     * Logs, at debug level, that the given input is not a {@code Message} and therefore is not traced.
     *
     * @param obj the unsupported function input; must not be {@code null}
     */
    private void logDebugAboutMessageTypes(Object obj) {
        if (!log.isDebugEnabled()) {
            return;
        }
        String typeName = obj.getClass().getName();
        log.debug("We only support tracing for Message types. You need to wrap your function type [" + typeName + "] into [Message<" + typeName + ">]");
    }

    /**
     * Dispatches a reactive invocation to the matching instrumentation.
     * <p>
     * A {@code null} input on a supplier goes to the supplier instrumentation. Otherwise the function
     * is instrumented only when the generic type of its input is {@link Message}; in that case the
     * Mono or Flux variant is chosen from the declared input type. Any other input type is invoked
     * without instrumentation.
     *
     * @param publisher the input publisher, possibly {@code null}
     * @param functionInvocationWrapper the target function
     * @return the function result
     */
    private Object reactorStream(Publisher publisher, SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper) {
        if (publisher == null && functionInvocationWrapper.isSupplier()) {
            return reactorStreamSupplier(publisher, functionInvocationWrapper);
        }
        Type itemType = FunctionTypeUtils.getGenericType(functionInvocationWrapper.getInputType());
        if (!FunctionTypeUtils.getRawType(itemType).equals(Message.class)) {
            if (log.isDebugEnabled()) {
                log.debug("Target function [" + functionInvocationWrapper.getFunctionDefinition() + "] has raw input type [" + itemType + "] and should be [" + Message.class + "]. Will not wrap it.");
            }
            return functionInvocationWrapper.apply(publisher);
        }
        if (FunctionTypeUtils.isMono(functionInvocationWrapper.getInputType())) {
            return reactorMonoStream(functionInvocationWrapper, publisher);
        }
        return reactorFluxStream(functionInvocationWrapper, publisher);
    }

    /**
     * Instruments a function whose declared input is a {@code Mono} of messages.
     * <p>
     * The input is decorated so that each message is wrapped by
     * {@code traceMessageHandler.wrapInputMessage}, and the resulting child span is named, tagged,
     * put in scope and recorded on the {@link MessageAndSpansAndScope} holder read from the Reactor
     * context. That holder is written into the context by {@code reactorStreamConsumer},
     * {@code messageMono} or {@code messageFlux}, one of which always decorates the stream below.
     *
     * @param functionInvocationWrapper the target function
     * @param publisher the input publisher
     * @return for a consumer, whatever the function returns; otherwise the function's output
     * publisher decorated by {@code messageMono} or {@code messageFlux}
     */
    private Object reactorMonoStream(SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper, Publisher<Message> publisher) {
        if (log.isDebugEnabled()) {
            log.debug("Will instrument a stream Mono function");
        }
        Mono flatMap = Mono.from(publisher).doOnNext(message -> {
            // The returned scope is discarded. NOTE(review): presumably this clears the current span
            // before the message is wrapped - confirm against the Tracer contract.
            this.tracer.withSpan(null);
        }).map(message2 -> {
            return this.traceMessageHandler.wrapInputMessage(message2, inputDestination(functionInvocationWrapper.getFunctionDefinition()));
        }).flatMap(messageAndSpans -> {
            return Mono.deferContextual(contextView -> {
                // Populate the shared holder so the output side can finish the span and close the scope.
                MessageAndSpansAndScope messageAndSpansAndScope = (MessageAndSpansAndScope) contextView.get(MessageAndSpansAndScope.class);
                messageAndSpansAndScope.messageAndSpans = messageAndSpans;
                messageAndSpansAndScope.span = messageAndSpans.childSpan;
                setNameAndTag(functionInvocationWrapper, messageAndSpans.childSpan);
                messageAndSpansAndScope.scope = this.tracer.withSpan(messageAndSpans.childSpan);
                return Mono.just(messageAndSpans.msg);
            });
        });
        if (functionInvocationWrapper.isConsumer()) {
            // Consumers produce no output messages, so the input stream itself carries the cleanup hooks.
            return functionInvocationWrapper.apply(reactorStreamConsumer(flatMap));
        }
        Publisher publisher2 = (Publisher) functionInvocationWrapper.apply(flatMap);
        // Anything that is not a Mono is cast to Flux.
        return publisher2 instanceof Mono ? messageMono(functionInvocationWrapper, (Mono) publisher2) : messageFlux(functionInvocationWrapper, (Flux) publisher2);
    }

    /**
     * Decorates a function's {@code Mono} output with span handling.
     * <p>
     * A fresh {@link MessageAndSpansAndScope} holder is written into the Reactor context. When the
     * output message arrives the holder is ended and marked handled, then the message is wrapped
     * against the parent span and the resulting output span is finished. Errors are reported on the
     * holder, and on termination the holder is ended only if it was never marked handled.
     *
     * @param functionInvocationWrapper the target function
     * @param mono the function's output
     * @return the decorated output
     */
    private Mono<Message> messageMono(SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper, Mono<Message> mono) {
        return Mono.deferContextual(view -> {
            MessageAndSpansAndScope holder = view.get(MessageAndSpansAndScope.class);
            return mono
                    .doOnNext(ignored -> {
                        holder.end();
                        holder.handle();
                    })
                    .map(output -> {
                        MessageAndSpan wrapped = this.traceMessageHandler.wrapOutputMessage(output, holder.messageAndSpans.parentSpan, outputDestination(functionInvocationWrapper.getFunctionDefinition()));
                        this.traceMessageHandler.afterMessageHandled(wrapped.span, null);
                        return wrapped.msg;
                    })
                    .doOnError(holder::error)
                    .doFinally(signal -> {
                        if (!holder.isHandled()) {
                            holder.end();
                        }
                    });
        }).contextWrite(context -> context.put(MessageAndSpansAndScope.class, new MessageAndSpansAndScope()));
    }

    /**
     * Instruments a function whose declared input is not a {@code Mono} (treated as a {@code Flux}
     * of messages).
     * <p>
     * The input is decorated so that each message is wrapped by
     * {@code traceMessageHandler.wrapInputMessage}, and the resulting child span is named, tagged,
     * put in scope and recorded on the {@link MessageAndSpansAndScope} holder read from the Reactor
     * context. The same holder instance is overwritten for every element of the stream.
     *
     * @param functionInvocationWrapper the target function
     * @param publisher the input publisher
     * @return for a consumer, whatever the function returns; otherwise the function's output
     * publisher decorated by {@code messageMono} or {@code messageFlux}
     */
    private Object reactorFluxStream(SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper, Publisher<Message> publisher) {
        if (log.isDebugEnabled()) {
            log.debug("Will instrument a stream Flux function");
        }
        Flux flatMap = Flux.from(publisher).doOnNext(message -> {
            // The returned scope is discarded. NOTE(review): presumably this clears the current span
            // before the message is wrapped - confirm against the Tracer contract.
            this.tracer.withSpan(null);
        }).map(message2 -> {
            return this.traceMessageHandler.wrapInputMessage(message2, inputDestination(functionInvocationWrapper.getFunctionDefinition()));
        }).flatMap(messageAndSpans -> {
            return Flux.deferContextual(contextView -> {
                // Populate the shared holder so the output side can finish the span and close the scope.
                MessageAndSpansAndScope messageAndSpansAndScope = (MessageAndSpansAndScope) contextView.get(MessageAndSpansAndScope.class);
                messageAndSpansAndScope.messageAndSpans = messageAndSpans;
                messageAndSpansAndScope.span = messageAndSpans.childSpan;
                setNameAndTag(functionInvocationWrapper, messageAndSpans.childSpan);
                messageAndSpansAndScope.scope = this.tracer.withSpan(messageAndSpans.childSpan);
                return Mono.just(messageAndSpans.msg);
            });
        });
        if (functionInvocationWrapper.isConsumer()) {
            // Consumers produce no output messages, so the input stream itself carries the cleanup hooks.
            return functionInvocationWrapper.apply(reactorStreamConsumer(flatMap));
        }
        Publisher publisher2 = (Publisher) functionInvocationWrapper.apply(flatMap);
        // Anything that is not a Mono is cast to Flux.
        return publisher2 instanceof Mono ? messageMono(functionInvocationWrapper, (Mono) publisher2) : messageFlux(functionInvocationWrapper, (Flux) publisher2);
    }

    /**
     * Decorates a function's {@code Flux} output with span handling.
     * <p>
     * A fresh {@link MessageAndSpansAndScope} holder is written into the Reactor context. For every
     * output message the holder is ended and marked handled, then the message is wrapped against
     * the parent span and the resulting output span is finished. Errors are reported on the holder,
     * and on termination the holder is ended only if it was never marked handled.
     *
     * @param functionInvocationWrapper the target function
     * @param flux the function's output
     * @return the decorated output
     */
    private Flux<Message> messageFlux(SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper, Flux<Message> flux) {
        return Flux.deferContextual(view -> {
            MessageAndSpansAndScope holder = view.get(MessageAndSpansAndScope.class);
            return flux
                    .doOnNext(ignored -> {
                        holder.end();
                        holder.handle();
                    })
                    .map(output -> {
                        MessageAndSpan wrapped = this.traceMessageHandler.wrapOutputMessage(output, holder.messageAndSpans.parentSpan, outputDestination(functionInvocationWrapper.getFunctionDefinition()));
                        this.traceMessageHandler.afterMessageHandled(wrapped.span, null);
                        return wrapped.msg;
                    })
                    .doOnError(holder::error)
                    .doFinally(signal -> {
                        if (!holder.isHandled()) {
                            holder.end();
                        }
                    });
        }).contextWrite(context -> context.put(MessageAndSpansAndScope.class, new MessageAndSpansAndScope()));
    }

    /**
     * Decorates the input stream handed to a reactive consumer.
     * <p>
     * A fresh {@link MessageAndSpansAndScope} holder is written into the Reactor context; errors are
     * reported on it and it is ended when the stream terminates.
     *
     * @param obj the input stream; a {@code Mono}, otherwise treated as a {@code Flux}
     * @return the decorated stream
     */
    private Object reactorStreamConsumer(Object obj) {
        if (obj instanceof Mono) {
            return Mono.deferContextual(view -> {
                MessageAndSpansAndScope holder = view.get(MessageAndSpansAndScope.class);
                return ((Mono<?>) obj).doOnError(holder::error).doFinally(signal -> holder.end());
            }).contextWrite(context -> context.put(MessageAndSpansAndScope.class, new MessageAndSpansAndScope()));
        }
        return Flux.deferContextual(view -> {
            MessageAndSpansAndScope holder = view.get(MessageAndSpansAndScope.class);
            return ((Flux<?>) obj).doOnError(holder::error).doFinally(signal -> holder.end());
        }).contextWrite(context -> context.put(MessageAndSpansAndScope.class, new MessageAndSpansAndScope()));
    }

    /**
     * Instruments a reactive supplier.
     * <p>
     * The publisher is obtained from {@code functionInvocationWrapper.get()} and wrapped with
     * {@code ReactorSleuth.tracedMono} or {@code ReactorSleuth.tracedFlux}. Each emitted element is
     * then converted to a {@code Message}, wrapped as an output message against the current span
     * (after naming and tagging it), passed to the output span customizers, and its output span is
     * finished before the message is emitted downstream.
     *
     * @param publisher not read by this method
     * @param functionInvocationWrapper the supplier to invoke
     * @return the instrumented publisher
     */
    private Object reactorStreamSupplier(Publisher<?> publisher, SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper) {
        Publisher map;
        Publisher publisher2 = (Publisher) functionInvocationWrapper.get();
        if (publisher2 instanceof Mono) {
            if (log.isDebugEnabled()) {
                log.debug("Will instrument a stream Mono supplier");
            }
            Mono mono = (Mono) publisher2;
            map = ReactorSleuth.tracedMono(this.tracer, this.tracer.currentTraceContext(), functionInvocationWrapper.getFunctionDefinition(), () -> {
                return mono;
            }, (obj, span) -> {
                // Customizers receive the element only when it already is a Message.
                customizedInputMessageSpan(span, obj instanceof Message ? (Message) obj : null);
            }).map(obj2 -> {
                return toMessage(obj2);
            }).map(obj3 -> {
                return getMessageAndSpans((Message) obj3, functionInvocationWrapper.getFunctionDefinition(), setNameAndTag(functionInvocationWrapper, this.tracer.currentSpan()));
            }).doOnNext(obj4 -> {
                customizedOutputMessageSpan(((MessageAndSpan) obj4).span, ((MessageAndSpan) obj4).msg);
            }).doOnNext(obj5 -> {
                this.traceMessageHandler.afterMessageHandled(((MessageAndSpan) obj5).span, null);
            }).map(obj6 -> {
                return ((MessageAndSpan) obj6).msg;
            });
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Will instrument a stream Flux supplier");
            }
            // Anything that is not a Mono is cast to Flux.
            Flux flux = (Flux) publisher2;
            map = ReactorSleuth.tracedFlux(this.tracer, this.tracer.currentTraceContext(), functionInvocationWrapper.getFunctionDefinition(), () -> {
                return flux;
            }, (obj7, span2) -> {
                // Customizers receive the element only when it already is a Message.
                customizedInputMessageSpan(span2, obj7 instanceof Message ? (Message) obj7 : null);
            }).map(obj8 -> {
                return toMessage(obj8);
            }).map(obj9 -> {
                return getMessageAndSpans((Message) obj9, functionInvocationWrapper.getFunctionDefinition(), setNameAndTag(functionInvocationWrapper, this.tracer.currentSpan()));
            }).doOnNext(obj10 -> {
                customizedOutputMessageSpan(((MessageAndSpan) obj10).span, ((MessageAndSpan) obj10).msg);
            }).doOnNext(obj11 -> {
                this.traceMessageHandler.afterMessageHandled(((MessageAndSpan) obj11).span, null);
            }).map(obj12 -> {
                return ((MessageAndSpan) obj12).msg;
            });
        }
        return map;
    }

    /**
     * Names the span after the function definition and tags it with the same value.
     *
     * @param functionInvocationWrapper the function supplying the definition
     * @param span the span to decorate
     * @return the span returned by the final {@code tag} call
     */
    private Span setNameAndTag(SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper, Span span) {
        Span named = span.name(functionInvocationWrapper.getFunctionDefinition());
        return named.tag(SleuthMessagingSpan.Tags.FUNCTION_NAME.getKey(), functionInvocationWrapper.getFunctionDefinition());
    }

    /**
     * Instruments an imperative (non-reactive) function invocation.
     * <p>
     * For a supplier called without input a new span is created. Otherwise the input message is
     * wrapped by the message handler and its child span is used. The function runs with that span
     * in scope. A non-null result is converted to a {@code Message}, wrapped as an output message
     * and its output span is finished.
     * <p>
     * The invocation span is always finished through {@code afterMessageHandled}. Any failure raised
     * while invoking the function or wrapping its output is passed along so the span records the
     * error instead of being finished as if the call had succeeded.
     *
     * @param message the input message, or {@code null}
     * @param functionInvocationWrapper the target function
     * @return the instrumented output message, or {@code null} when the function returned {@code null}
     */
    private Object nonReactorStream(Message<byte[]> message, SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper) {
        MessageAndSpans wrappedInput = null;
        Span invocationSpan;
        if (message == null && functionInvocationWrapper.isSupplier()) {
            if (log.isDebugEnabled()) {
                log.debug("Creating a span for a supplier");
            }
            invocationSpan = setNameAndTag(functionInvocationWrapper, this.tracer.nextSpan());
            customizedInputMessageSpan(invocationSpan, null);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Will retrieve the tracing headers from the message");
            }
            wrappedInput = this.traceMessageHandler.wrapInputMessage(message, inputDestination(functionInvocationWrapper.getFunctionDefinition()));
            if (log.isDebugEnabled()) {
                log.debug("Wrapped input msg " + wrappedInput);
            }
            invocationSpan = setNameAndTag(functionInvocationWrapper, wrappedInput.childSpan);
        }
        Throwable failure = null;
        try {
            Object result;
            // The scope only needs to cover the function call itself; try-with-resources closes it
            // (and tolerates a null scope) on both the normal and the exceptional path.
            try (Tracer.SpanInScope ignored = this.tracer.withSpan(invocationSpan.start())) {
                result = wrappedInput == null ? functionInvocationWrapper.get() : functionInvocationWrapper.apply(wrappedInput.msg);
            }
            if (result == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Returned message is null - we have a consumer");
                }
                return null;
            }
            Message<?> outputMessage = toMessage(result);
            if (log.isDebugEnabled()) {
                log.debug("Will instrument the output message");
            }
            MessageAndSpan wrappedOutput;
            if (wrappedInput != null) {
                wrappedOutput = this.traceMessageHandler.wrapOutputMessage(outputMessage, wrappedInput.parentSpan, outputDestination(functionInvocationWrapper.getFunctionDefinition()));
            } else {
                wrappedOutput = getMessageAndSpans(outputMessage, functionInvocationWrapper.getFunctionDefinition(), invocationSpan);
            }
            if (log.isDebugEnabled()) {
                log.debug("Wrapped output msg " + wrappedOutput);
            }
            this.traceMessageHandler.afterMessageHandled(wrappedOutput.span, null);
            return wrappedOutput.msg;
        } catch (Throwable th) {
            // Remember the failure so the finally block can report it on the span, then rethrow unchanged.
            failure = th;
            throw th;
        } finally {
            this.traceMessageHandler.afterMessageHandled(invocationSpan, failure);
        }
    }

    /**
     * Wraps an output message against the given span, using the output destination resolved for
     * the function.
     *
     * @param message the output message
     * @param str the function definition
     * @param span the span passed to the message handler
     * @return the wrapped output message together with its span
     */
    MessageAndSpan getMessageAndSpans(Message<?> message, String str, Span span) {
        String destination = outputDestination(str);
        return this.traceMessageHandler.wrapOutputMessage(message, span, destination);
    }

    /**
     * Applies every registered customizer to an input message span.
     *
     * @param span the span to customize
     * @param message the input message, or {@code null} when there is none
     */
    private void customizedInputMessageSpan(Span span, Message<?> message) {
        for (FunctionMessageSpanCustomizer customizer : this.customizers) {
            customizer.customizeInputMessageSpan(span, message);
        }
    }

    /**
     * Applies every registered customizer to an output message span.
     *
     * @param span the span to customize
     * @param message the output message
     */
    private void customizedOutputMessageSpan(Span span, Message<?> message) {
        for (FunctionMessageSpanCustomizer customizer : this.customizers) {
            customizer.customizeOutputMessageSpan(span, message);
        }
    }

    /**
     * Returns the argument as a {@code Message}, building one around it when it is not already one.
     *
     * @param obj a message or a bare payload
     * @return {@code obj} itself when it is a {@code Message}, otherwise a new message with
     * {@code obj} as payload
     */
    private Message<?> toMessage(Object obj) {
        if (obj instanceof Message) {
            return (Message) obj;
        }
        return MessageBuilder.withPayload(obj).build();
    }

    /**
     * Resolves, and caches, the input destination of a function.
     * <p>
     * The binding name is taken from {@code spring.cloud.stream.function.bindings.<fn>-in-0} when
     * that property exists, otherwise it defaults to {@code <fn>-in-0}. The destination is then read
     * from {@code spring.cloud.stream.bindings.<binding>.destination}, falling back to the function
     * definition itself.
     *
     * @param str the function definition
     * @return the resolved input destination
     */
    String inputDestination(String str) {
        // The cache is shared with output destinations, so the key carries the direction;
        // keying by the bare function definition would hand back whichever direction was resolved first.
        return this.functionToDestinationCache.computeIfAbsent(str + "-in-0", cacheKey -> {
            String bindingMappingProperty = "spring.cloud.stream.function.bindings." + str + "-in-0";
            String bindingName = this.environment.containsProperty(bindingMappingProperty) ? this.environment.getProperty(bindingMappingProperty) : str + "-in-0";
            return this.environment.getProperty("spring.cloud.stream.bindings." + bindingName + ".destination", str);
        });
    }

    /**
     * Resolves, and caches, the output destination of a function.
     * <p>
     * The binding name is taken from {@code spring.cloud.stream.function.bindings.<fn>-out-0} when
     * that property exists, otherwise it defaults to {@code <fn>-out-0}. The destination is then
     * read from {@code spring.cloud.stream.bindings.<binding>.destination}, falling back to the
     * function definition itself.
     *
     * @param str the function definition
     * @return the resolved output destination
     */
    String outputDestination(String str) {
        // The cache is shared with input destinations, so the key carries the direction;
        // keying by the bare function definition would hand back whichever direction was resolved first.
        return this.functionToDestinationCache.computeIfAbsent(str + "-out-0", cacheKey -> {
            String bindingMappingProperty = "spring.cloud.stream.function.bindings." + str + "-out-0";
            String bindingName = this.environment.containsProperty(bindingMappingProperty) ? this.environment.getProperty(bindingMappingProperty) : str + "-out-0";
            return this.environment.getProperty("spring.cloud.stream.bindings." + bindingName + ".destination", str);
        });
    }

    /**
     * Drops all cached destinations when the refresh scope has been refreshed, so they are resolved
     * again from the environment on next use.
     *
     * @param refreshScopeRefreshedEvent the refresh event; not inspected
     */
    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(RefreshScopeRefreshedEvent refreshScopeRefreshedEvent) {
        boolean debug = log.isDebugEnabled();
        if (debug) {
            log.debug("Context refreshed, will reset the cache");
        }
        this.functionToDestinationCache.clear();
    }
}
