package org.springframework.messaging.tcp.reactor;

import io.netty5.buffer.Buffer;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.group.ChannelGroup;
import io.netty5.channel.group.DefaultChannelGroup;
import io.netty5.handler.codec.ByteToMessageDecoder;
import io.netty5.util.concurrent.ImmediateEventExecutor;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty5.NettyInbound;
import reactor.netty5.NettyOutbound;
import reactor.netty5.resources.ConnectionProvider;
import reactor.netty5.resources.LoopResources;
import reactor.netty5.tcp.TcpClient;
import reactor.util.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/spring-messaging-6.1.5.jar:org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient.class */
public class ReactorNetty2TcpClient<P> implements TcpOperations<P> {
    private static final int PUBLISH_ON_BUFFER_SIZE = 16;
    private final TcpClient tcpClient;
    private final TcpMessageCodec<P> codec;

    @Nullable
    private final ChannelGroup channelGroup;

    @Nullable
    private final LoopResources loopResources;

    @Nullable
    private final ConnectionProvider poolResources;
    private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler");
    private Log logger = LogFactory.getLog(ReactorNetty2TcpClient.class);
    private volatile boolean stopping;

    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-6.1.5.jar:org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient$ReactorNettyHandler.class */
    private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
        private final TcpConnectionHandler<P> connectionHandler;

        ReactorNettyHandler(TcpConnectionHandler<P> tcpConnectionHandler) {
            this.connectionHandler = tcpConnectionHandler;
        }

        @Override // java.util.function.BiFunction
        public Publisher<Void> apply(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
            nettyInbound.withConnection(connection -> {
                if (ReactorNetty2TcpClient.this.logger.isDebugEnabled()) {
                    ReactorNetty2TcpClient.this.logger.debug("Connected to " + connection.address());
                }
            });
            Sinks.Empty empty = Sinks.empty();
            ReactorNetty2TcpConnection reactorNetty2TcpConnection = new ReactorNetty2TcpConnection(nettyInbound, nettyOutbound, ReactorNetty2TcpClient.this.codec, empty);
            ReactorNetty2TcpClient.this.scheduler.schedule(() -> {
                this.connectionHandler.afterConnected(reactorNetty2TcpConnection);
            });
            nettyInbound.withConnection(connection2 -> {
                connection2.addHandlerFirst(new StompMessageDecoder(ReactorNetty2TcpClient.this.codec));
            });
            Flux publishOn = nettyInbound.receiveObject().cast(Message.class).publishOn(ReactorNetty2TcpClient.this.scheduler, 16);
            TcpConnectionHandler<P> tcpConnectionHandler = this.connectionHandler;
            Objects.requireNonNull(tcpConnectionHandler);
            Consumer consumer = tcpConnectionHandler::handleMessage;
            TcpConnectionHandler<P> tcpConnectionHandler2 = this.connectionHandler;
            Objects.requireNonNull(tcpConnectionHandler2);
            Consumer<? super Throwable> consumer2 = tcpConnectionHandler2::handleFailure;
            TcpConnectionHandler<P> tcpConnectionHandler3 = this.connectionHandler;
            Objects.requireNonNull(tcpConnectionHandler3);
            publishOn.subscribe(consumer, consumer2, tcpConnectionHandler3::afterConnectionClosed);
            return empty.asMono();
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-6.1.5.jar:org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient$StompMessageDecoder.class */
    private static class StompMessageDecoder<P> extends ByteToMessageDecoder {
        private final TcpMessageCodec<P> codec;

        StompMessageDecoder(TcpMessageCodec<P> tcpMessageCodec) {
            this.codec = tcpMessageCodec;
        }

        protected void decode(ChannelHandlerContext channelHandlerContext, Buffer buffer) throws Exception {
            ByteBuffer allocate = ByteBuffer.allocate(buffer.readableBytes());
            buffer.readBytes(allocate);
            allocate.flip();
            Iterator<Message<P>> it = this.codec.decode(allocate).iterator();
            while (it.hasNext()) {
                channelHandlerContext.fireChannelRead(it.next());
            }
        }
    }

    public ReactorNetty2TcpClient(String str, int i, TcpMessageCodec<P> tcpMessageCodec) {
        Assert.notNull(str, "host is required");
        Assert.notNull(tcpMessageCodec, "ReactorNettyCodec is required");
        this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        this.loopResources = LoopResources.create("tcp-client-loop");
        this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000);
        this.codec = tcpMessageCodec;
        this.tcpClient = TcpClient.create(this.poolResources).host(str).port(i).runOn(this.loopResources, false).doOnConnected(connection -> {
            this.channelGroup.add(connection.channel());
        });
    }

    public ReactorNetty2TcpClient(Function<TcpClient, TcpClient> function, TcpMessageCodec<P> tcpMessageCodec) {
        Assert.notNull(tcpMessageCodec, "ReactorNettyCodec is required");
        this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        this.loopResources = LoopResources.create("tcp-client-loop");
        this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000);
        this.codec = tcpMessageCodec;
        this.tcpClient = function.apply(TcpClient.create(this.poolResources).runOn(this.loopResources, false).doOnConnected(connection -> {
            this.channelGroup.add(connection.channel());
        }));
    }

    public ReactorNetty2TcpClient(TcpClient tcpClient, TcpMessageCodec<P> tcpMessageCodec) {
        Assert.notNull(tcpClient, "TcpClient is required");
        Assert.notNull(tcpMessageCodec, "ReactorNettyCodec is required");
        this.tcpClient = tcpClient;
        this.codec = tcpMessageCodec;
        this.channelGroup = null;
        this.loopResources = null;
        this.poolResources = null;
    }

    public void setLogger(Log log) {
        this.logger = log;
    }

    public Log getLogger() {
        return this.logger;
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> tcpConnectionHandler) {
        Assert.notNull(tcpConnectionHandler, "TcpConnectionHandler is required");
        if (this.stopping) {
            return handleShuttingDownConnectFailure(tcpConnectionHandler);
        }
        Mono connect = extendTcpClient(this.tcpClient, tcpConnectionHandler).handle(new ReactorNettyHandler(tcpConnectionHandler)).connect();
        Objects.requireNonNull(tcpConnectionHandler);
        return connect.doOnError(tcpConnectionHandler::afterConnectFailure).then().toFuture();
    }

    protected TcpClient extendTcpClient(TcpClient tcpClient, TcpConnectionHandler<P> tcpConnectionHandler) {
        return tcpClient;
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> tcpConnectionHandler, ReconnectStrategy reconnectStrategy) {
        Assert.notNull(tcpConnectionHandler, "TcpConnectionHandler is required");
        Assert.notNull(reconnectStrategy, "ReconnectStrategy is required");
        if (this.stopping) {
            return handleShuttingDownConnectFailure(tcpConnectionHandler);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        Mono doOnNext = extendTcpClient(this.tcpClient, tcpConnectionHandler).handle(new ReactorNettyHandler(tcpConnectionHandler)).connect().doOnNext(connection -> {
            completableFuture.complete(null);
        });
        Objects.requireNonNull(completableFuture);
        Mono doOnError = doOnNext.doOnError(completableFuture::completeExceptionally);
        Objects.requireNonNull(tcpConnectionHandler);
        doOnError.doOnError(tcpConnectionHandler::afterConnectFailure).flatMap((v0) -> {
            return v0.onDispose();
        }).retryWhen(Retry.from(flux -> {
            return flux.map(retrySignal -> {
                return Integer.valueOf((int) retrySignal.totalRetriesInARow());
            }).flatMap(num -> {
                return reconnect(num, reconnectStrategy);
            });
        })).repeatWhen(flux2 -> {
            return flux2.scan(1, (num, l) -> {
                Integer.valueOf(num.intValue() + 1);
                return num;
            }).flatMap(num2 -> {
                return reconnect(num2, reconnectStrategy);
            });
        }).subscribe();
        return completableFuture;
    }

    private CompletableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> tcpConnectionHandler) {
        IllegalStateException illegalStateException = new IllegalStateException("Shutting down.");
        tcpConnectionHandler.afterConnectFailure(illegalStateException);
        return Mono.error(illegalStateException).toFuture();
    }

    private Publisher<? extends Long> reconnect(Integer num, ReconnectStrategy reconnectStrategy) {
        Long timeToNextAttempt = reconnectStrategy.getTimeToNextAttempt(num.intValue());
        return timeToNextAttempt != null ? Mono.delay(Duration.ofMillis(timeToNextAttempt.longValue()), this.scheduler) : Mono.empty();
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public CompletableFuture<Void> shutdownAsync() {
        Mono<Void> stopScheduler;
        if (this.stopping) {
            return CompletableFuture.completedFuture(null);
        }
        this.stopping = true;
        if (this.channelGroup != null) {
            Sinks.Empty empty = Sinks.empty();
            this.channelGroup.close().addListener(future -> {
                empty.tryEmitEmpty();
            });
            Mono asMono = empty.asMono();
            if (this.loopResources != null) {
                asMono = asMono.onErrorComplete().then(this.loopResources.disposeLater());
            }
            if (this.poolResources != null) {
                asMono = asMono.onErrorComplete().then(this.poolResources.disposeLater());
            }
            stopScheduler = asMono.onErrorComplete().then(stopScheduler());
        } else {
            stopScheduler = stopScheduler();
        }
        return stopScheduler.toFuture();
    }

    private Mono<Void> stopScheduler() {
        return Mono.fromRunnable(() -> {
            this.scheduler.dispose();
            for (int i = 0; i < 20 && !this.scheduler.isDisposed(); i++) {
                try {
                    Thread.sleep(100L);
                } catch (Throwable th) {
                    return;
                }
            }
        });
    }

    public String toString() {
        return "ReactorNetty2TcpClient[" + this.tcpClient + "]";
    }
}
