package org.springframework.cloud.sleuth.instrument.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import io.rsocket.metadata.CompositeMetadataCodec;
import io.rsocket.metadata.RoutingMetadata;
import io.rsocket.metadata.TracingMetadataCodec;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.util.RSocketProxy;
import java.util.HashSet;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.docs.AssertingSpan;
import org.springframework.cloud.sleuth.docs.AssertingSpanBuilder;
import org.springframework.cloud.sleuth.instrument.rsocket.SleuthRSocketSpan;
import org.springframework.cloud.sleuth.internal.EncodingUtils;
import org.springframework.cloud.sleuth.propagation.Propagator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-instrumentation-3.1.5.jar:org/springframework/cloud/sleuth/instrument/rsocket/TracingRequesterRSocketProxy.class */
/**
 * Requester-side {@link RSocketProxy} that wraps every outgoing interaction in a
 * {@link Span.Kind#PRODUCER} span and injects the span's trace context into the
 * outgoing payload metadata through the configured {@link Propagator}.
 *
 * <p>For {@code fireAndForget} and {@code requestResponse} the span is named
 * {@code "<frame type> <route>"} and tagged with the route and request type; when
 * Zipkin propagation is enabled, the RSocket Zipkin tracing metadata entry is added
 * as well. For {@code requestStream} and {@code requestChannel} the span is named
 * after the route only and no Zipkin entry is written.
 */
public class TracingRequesterRSocketProxy extends RSocketProxy {
    private static final Log log = LogFactory.getLog((Class<?>) TracingRequesterRSocketProxy.class);
    private final Propagator propagator;
    private final Propagator.Setter<CompositeByteBuf> setter;
    private final Tracer tracer;
    private final boolean isZipkinPropagationEnabled;

    /**
     * @param rSocket the delegate every (re-written) request is forwarded to
     * @param propagator propagator used to inject the trace context into the metadata
     * @param setter setter handed to the propagator for writing into the metadata buffer
     * @param tracer tracer used to build spans and to look up the current span
     * @param z whether the RSocket Zipkin tracing metadata entry should also be written
     */
    public TracingRequesterRSocketProxy(RSocket rSocket, Propagator propagator, Propagator.Setter<CompositeByteBuf> setter, Tracer tracer, boolean z) {
        super(rSocket);
        this.propagator = propagator;
        this.setter = setter;
        this.tracer = tracer;
        this.isZipkinPropagationEnabled = z;
    }

    /** Sends a traced fire-and-forget request through the delegate. */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return setSpan(super::fireAndForget, payload, FrameType.REQUEST_FNF);
    }

    /** Sends a traced request-response request through the delegate. */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return setSpan(super::requestResponse, payload, FrameType.REQUEST_RESPONSE);
    }

    /**
     * Starts a requester span for a single-response interaction, injects its context
     * into a cleaned copy of the payload and invokes {@code function} with that copy.
     *
     * <p>Everything is deferred until subscription so that the parent context can be
     * read from the subscriber's Reactor context.
     *
     * @param function the actual delegate call, applied to the traced payload
     * @param payload the original outgoing payload
     * @param frameType the interaction type, used for the span name and a tag
     * @return the delegate's result; the span records any error signal and is ended
     *         when the sequence terminates or is cancelled
     */
    <T> Mono<T> setSpan(Function<Payload, Mono<T>> function, Payload payload, FrameType frameType) {
        return Mono.deferContextual(contextView -> {
            Span.Builder builder = spanBuilder(contextView);
            String route = extractRoute(payload);
            String requestType = frameType.name();
            AssertingSpan span = AssertingSpanBuilder.of(SleuthRSocketSpan.RSOCKET_REQUESTER_SPAN, builder.kind(Span.Kind.PRODUCER))
                    .name(requestType + StringUtils.SPACE + route)
                    .tag(SleuthRSocketSpan.Tags.ROUTE, route)
                    .tag(SleuthRSocketSpan.Tags.REQUEST_TYPE, requestType)
                    .start();
            if (log.isDebugEnabled()) {
                log.debug("Extracted result from context or thread local " + span);
            }
            Payload tracedPayload = PayloadUtils.cleanTracingMetadata(payload, new HashSet<>(this.propagator.fields()));
            TraceContext context = span.context();
            if (this.isZipkinPropagationEnabled) {
                injectDefaultZipkinRSocketHeaders(tracedPayload, context);
            }
            this.propagator.inject(context, tracedPayload.metadata(), this.setter);
            Mono<T> result = function.apply(tracedPayload);
            return result.doOnError(span::error).doFinally(signalType -> span.end());
        });
    }

    /**
     * Encodes the trace context in the RSocket Zipkin tracing format and appends it to
     * the payload's composite metadata.
     *
     * <p>The encoded buffer is handed over to the composite metadata. If the payload's
     * metadata is not a {@link CompositeByteBuf} there is nowhere to append it, so the
     * buffer is released instead of being leaked.
     *
     * @param payload the outgoing payload whose metadata receives the entry
     * @param traceContext the context of the requester span
     */
    private void injectDefaultZipkinRSocketHeaders(Payload payload, TraceContext traceContext) {
        Boolean sampled = traceContext.sampled();
        TracingMetadataCodec.Flags flags;
        if (sampled == null) {
            flags = TracingMetadataCodec.Flags.UNDECIDED;
        } else {
            flags = sampled.booleanValue() ? TracingMetadataCodec.Flags.SAMPLE : TracingMetadataCodec.Flags.NOT_SAMPLE;
        }
        long[] traceId = EncodingUtils.fromString(traceContext.traceId());
        long spanId = EncodingUtils.fromString(traceContext.spanId())[0];
        // A span without a parent reports a null parent id; parsing it would throw,
        // so the codec overloads that omit the parent are used in that case.
        String parentIdText = traceContext.parentId();
        boolean traceId128Bit = traceId.length == 2;
        ByteBuf metadata = payload.metadata();

        ByteBuf encoded;
        if (parentIdText == null) {
            encoded = traceId128Bit
                    ? TracingMetadataCodec.encode128(metadata.alloc(), traceId[0], traceId[1], spanId, flags)
                    : TracingMetadataCodec.encode64(metadata.alloc(), traceId[0], spanId, flags);
        } else {
            long parentId = EncodingUtils.fromString(parentIdText)[0];
            encoded = traceId128Bit
                    ? TracingMetadataCodec.encode128(metadata.alloc(), traceId[0], traceId[1], spanId, parentId, flags)
                    : TracingMetadataCodec.encode64(metadata.alloc(), traceId[0], spanId, parentId, flags);
        }

        if (metadata instanceof CompositeByteBuf) {
            CompositeMetadataCodec.encodeAndAddMetadata((CompositeByteBuf) metadata, metadata.alloc(),
                    WellKnownMimeType.MESSAGE_RSOCKET_TRACING_ZIPKIN, encoded);
        } else {
            encoded.release();
        }
    }

    /**
     * Creates a span builder whose parent is the {@link TraceContext} stored in the
     * Reactor context if present, otherwise the tracer's current span if there is one;
     * with neither, no parent is set.
     */
    private Span.Builder spanBuilder(ContextView contextView) {
        Span.Builder builder = this.tracer.spanBuilder();
        if (contextView.hasKey(TraceContext.class)) {
            return builder.setParent(contextView.get(TraceContext.class));
        }
        // Read the current span once so the null check and the dereference see the same value.
        Span current = this.tracer.currentSpan();
        if (current != null) {
            return builder.setParent(current.context());
        }
        return builder;
    }

    /**
     * Returns the first entry of the payload's routing metadata.
     *
     * <p>Like the inlined code it replaces, this fails with the iterator's exception
     * when the routing metadata contains no entry.
     */
    private static String extractRoute(Payload payload) {
        RoutingMetadata routing = new RoutingMetadata(
                CompositeMetadataUtils.extract(payload.sliceMetadata(), WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString()));
        return (String) routing.iterator().next();
    }

    /** Sends a traced request-stream request through the delegate. */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return Flux.deferContextual(contextView -> setSpan(super::requestStream, payload, contextView));
    }

    /**
     * Sends a traced request-channel request through the delegate. The first payload of
     * the publisher carries the tracing metadata; if the first signal has no payload
     * the incoming flux is returned as is and no span is created.
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, flux) -> {
            Payload first = signal.get();
            if (first == null) {
                return flux;
            }
            // Replace the first element with its traced copy and forward the rest untouched.
            return setSpan(traced -> super.requestChannel(flux.skip(1L).startWith(traced)), first, signal.getContextView());
        });
    }

    /**
     * Starts a requester span for a multi-response interaction, injects its context
     * into a cleaned copy of the payload and invokes {@code function} with that copy.
     *
     * @param function the actual delegate call, applied to the traced payload
     * @param payload the original (first) outgoing payload
     * @param contextView Reactor context used to resolve the parent trace context
     * @return the delegate's result; the span records any error signal and is ended
     *         when the sequence terminates or is cancelled
     */
    <T> Flux<Payload> setSpan(Function<Payload, Flux<Payload>> function, Payload payload, ContextView contextView) {
        Span.Builder builder = spanBuilder(contextView);
        AssertingSpan span = AssertingSpanBuilder.of(SleuthRSocketSpan.RSOCKET_REQUESTER_SPAN, builder.kind(Span.Kind.PRODUCER))
                .name(extractRoute(payload))
                .start();
        if (log.isDebugEnabled()) {
            log.debug("Extracted result from context or thread local " + span);
        }
        Payload tracedPayload = PayloadUtils.cleanTracingMetadata(payload, new HashSet<>(this.propagator.fields()));
        this.propagator.inject(span.context(), tracedPayload.metadata(), this.setter);
        Flux<Payload> result = function.apply(tracedPayload);
        return result.doOnError(span::error).doFinally(signalType -> span.end());
    }
}
