package io.r2dbc.mssql.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.mssql.client.ssl.SslConfiguration;
import io.r2dbc.mssql.client.ssl.TdsSslHandler;
import io.r2dbc.mssql.message.ClientMessage;
import io.r2dbc.mssql.message.Message;
import io.r2dbc.mssql.message.TransactionDescriptor;
import io.r2dbc.mssql.message.header.PacketIdProvider;
import io.r2dbc.mssql.message.tds.ProtocolException;
import io.r2dbc.mssql.message.tds.Redirect;
import io.r2dbc.mssql.message.token.AbstractInfoToken;
import io.r2dbc.mssql.message.token.Attention;
import io.r2dbc.mssql.message.token.EnvChangeToken;
import io.r2dbc.mssql.message.token.FeatureExtAckToken;
import io.r2dbc.mssql.message.token.LoginAckToken;
import io.r2dbc.mssql.message.type.Collation;
import io.r2dbc.mssql.util.Assert;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.NettyOutbound;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpSslContextSpec;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient.class */
public final class ReactorNettyClient implements Client {
    private static final Logger logger = Loggers.getLogger(ReactorNettyClient.class);
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
    private static final Supplier<MssqlConnectionClosedException> UNEXPECTED = () -> {
        return new MssqlConnectionClosedException("Connection unexpectedly closed");
    };
    private static final Supplier<MssqlConnectionClosedException> EXPECTED = () -> {
        return new MssqlConnectionClosedException("Connection closed");
    };
    private static final Supplier<MssqlConnectionClosedException> CLOSED = () -> {
        return new MssqlConnectionClosedException("Cannot exchange messages because the connection is closed");
    };
    private final ConnectionContext context;
    private final ByteBufAllocator byteBufAllocator;
    private final Connection connection;
    private final TdsEncoder tdsEncoder;
    private final Consumer<EnvChangeToken> handleEnvChange;
    private final RequestQueue requestQueue;
    private final Consumer<FeatureExtAckToken> featureAckChange = featureExtAckToken -> {
        Iterator<FeatureExtAckToken.FeatureToken> it = featureExtAckToken.getFeatureTokens().iterator();
        while (it.hasNext()) {
            if (it.next() instanceof FeatureExtAckToken.ColumnEncryption) {
                this.encryptionSupported = true;
            }
        }
    };
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicLong attentionPropagation = new AtomicLong();
    private final AtomicLong outstandingRequests = new AtomicLong();
    private final Sinks.Many<ClientMessage> requestSink = Sinks.many().unicast().onBackpressureBuffer();
    private final Sinks.Many<Message> responseProcessor = Sinks.many().multicast().onBackpressureBuffer(512, false);
    private final TransactionListener transactionListener = new TransactionListener();
    private final CollationListener collationListener = new CollationListener();
    private final RedirectListener redirectListener = new RedirectListener();
    private ConnectionState state = ConnectionState.PRELOGIN;
    private MessageDecoder decodeFunction = ConnectionState.PRELOGIN.decoder(this);
    private boolean encryptionSupported = false;
    private volatile Optional<Collation> databaseCollation = Optional.empty();
    private Optional<String> databaseVersion = Optional.empty();
    private volatile Optional<Redirect> redirect = Optional.empty();
    private volatile TransactionDescriptor transactionDescriptor = TransactionDescriptor.empty();
    private volatile TransactionStatus transactionStatus = TransactionStatus.AUTO_COMMIT;

    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$CollationListener.class */
    class CollationListener implements EnvironmentChangeListener {
        CollationListener() {
        }

        @Override // io.r2dbc.mssql.client.EnvironmentChangeListener
        public void onEnvironmentChange(EnvironmentChangeEvent environmentChangeEvent) {
            if (environmentChangeEvent.getToken().getChangeType() == EnvChangeToken.EnvChangeType.SQLCollation) {
                Collation decode = Collation.decode(Unpooled.wrappedBuffer(environmentChangeEvent.getToken().getNewValue()));
                ReactorNettyClient.this.databaseCollation = Optional.of(decode);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$ExchangeRequest.class */
    public static class ExchangeRequest {
        private static final AtomicIntegerFieldUpdater<ExchangeRequest> COMPLETED = AtomicIntegerFieldUpdater.newUpdater(ExchangeRequest.class, "completed");
        private static final AtomicIntegerFieldUpdater<ExchangeRequest> SUBMITTED = AtomicIntegerFieldUpdater.newUpdater(ExchangeRequest.class, "submitted");
        private volatile int completed = 0;
        private volatile int submitted = 0;

        ExchangeRequest() {
        }

        public void complete() {
            COMPLETED.set(this, 1);
        }

        public boolean isComplete() {
            return COMPLETED.get(this) == 1;
        }

        void submit(RequestQueue requestQueue, final MonoSink<Flux<Message>> monoSink, final Flux<Message> flux) {
            if (!SUBMITTED.compareAndSet(this, 0, 1)) {
                throw new IllegalStateException("Client exchange can be subscribed only once");
            }
            requestQueue.submit(new Sinkable() { // from class: io.r2dbc.mssql.client.ReactorNettyClient.ExchangeRequest.1
                @Override // io.r2dbc.mssql.client.ReactorNettyClient.Sinkable
                public void onSuccess() {
                    monoSink.success(flux);
                }

                @Override // io.r2dbc.mssql.client.ReactorNettyClient.Sinkable
                public void onError(Throwable th) {
                    monoSink.error(th);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$MssqlConnectionClosedException.class */
    public static class MssqlConnectionClosedException extends R2dbcNonTransientResourceException {
        public MssqlConnectionClosedException(String str) {
            super(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$MssqlConnectionException.class */
    public static class MssqlConnectionException extends R2dbcNonTransientResourceException {
        public MssqlConnectionException(Throwable th) {
            super(th);
        }
    }

    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$RedirectListener.class */
    class RedirectListener implements EnvironmentChangeListener {
        RedirectListener() {
        }

        @Override // io.r2dbc.mssql.client.EnvironmentChangeListener
        public void onEnvironmentChange(EnvironmentChangeEvent environmentChangeEvent) {
            if (environmentChangeEvent.getToken().getChangeType() == EnvChangeToken.EnvChangeType.Routing) {
                Redirect decode = Redirect.decode(Unpooled.wrappedBuffer(environmentChangeEvent.getToken().getNewValue()));
                ReactorNettyClient.this.redirect = Optional.of(decode);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$RequestQueue.class */
    public static class RequestQueue implements Runnable {
        private final Queue<Sinkable> requestQueue = (Queue) Queues.small().get();
        private final AtomicBoolean active = new AtomicBoolean();
        private final ConnectionContext context;

        RequestQueue(ConnectionContext connectionContext) {
            this.context = connectionContext;
        }

        @Nullable
        public Sinkable poll() {
            return this.requestQueue.poll();
        }

        @Override // java.lang.Runnable
        public void run() {
            Sinkable poll = this.requestQueue.poll();
            if (poll != null) {
                if (ReactorNettyClient.DEBUG_ENABLED) {
                    ReactorNettyClient.logger.debug(this.context.getMessage("Initiating queued exchange"));
                }
                poll.onSuccess();
            } else {
                if (ReactorNettyClient.DEBUG_ENABLED) {
                    ReactorNettyClient.logger.debug(this.context.getMessage("Conversation complete"));
                }
                this.active.compareAndSet(true, false);
            }
        }

        void submit(Sinkable sinkable) {
            if (this.active.compareAndSet(false, true)) {
                if (ReactorNettyClient.DEBUG_ENABLED) {
                    ReactorNettyClient.logger.debug(this.context.getMessage("Initiating exchange"));
                }
                sinkable.onSuccess();
            } else {
                if (ReactorNettyClient.DEBUG_ENABLED) {
                    ReactorNettyClient.logger.debug(this.context.getMessage("Queueing exchange"));
                }
                if (!this.requestQueue.offer(sinkable)) {
                    throw new IllegalStateException("Request queue is full");
                }
                drainRequestQueue();
            }
        }

        void drainRequestQueue() {
            if (this.active.compareAndSet(false, true)) {
                Sinkable poll = this.requestQueue.poll();
                if (poll != null) {
                    poll.onSuccess();
                } else {
                    this.active.compareAndSet(true, false);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$Sinkable.class */
    public interface Sinkable {
        void onSuccess();

        void onError(Throwable th);
    }

    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$TransactionListener.class */
    class TransactionListener implements EnvironmentChangeListener {
        TransactionListener() {
        }

        @Override // io.r2dbc.mssql.client.EnvironmentChangeListener
        public void onEnvironmentChange(EnvironmentChangeEvent environmentChangeEvent) {
            EnvChangeToken token = environmentChangeEvent.getToken();
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.BeginTx || token.getChangeType() == EnvChangeToken.EnvChangeType.EnlistDTC) {
                byte[] newValue = token.getNewValue();
                if (newValue.length != 8) {
                    throw ProtocolException.invalidTds("Transaction descriptor length mismatch");
                }
                if (ReactorNettyClient.DEBUG_ENABLED) {
                    ReactorNettyClient.logger.debug(String.format(ReactorNettyClient.this.context.getMessage("Transaction %s"), token.getChangeType() == EnvChangeToken.EnvChangeType.BeginTx ? "started" : "enlisted"));
                }
                updateStatus(TransactionStatus.STARTED, TransactionDescriptor.from(newValue));
            }
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.CommitTx) {
                if (ReactorNettyClient.DEBUG_ENABLED) {
                    ReactorNettyClient.logger.debug(ReactorNettyClient.this.context.getMessage("Transaction committed"));
                }
                updateStatus(TransactionStatus.EXPLICIT, TransactionDescriptor.empty());
            }
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.RollbackTx) {
                if (ReactorNettyClient.DEBUG_ENABLED) {
                    ReactorNettyClient.logger.debug(ReactorNettyClient.this.context.getMessage("Transaction rolled back"));
                }
                updateStatus(TransactionStatus.EXPLICIT, TransactionDescriptor.empty());
            }
        }

        private void updateStatus(TransactionStatus transactionStatus, TransactionDescriptor transactionDescriptor) {
            ReactorNettyClient.this.transactionStatus = transactionStatus;
            ReactorNettyClient.this.transactionDescriptor = transactionDescriptor;
        }
    }

    private ReactorNettyClient(Connection connection, TdsEncoder tdsEncoder, ConnectionContext connectionContext) {
        Assert.requireNonNull(connection, "Connection must not be null");
        Assert.state(this.responseProcessor.asFlux() instanceof Subscriber, (Supplier<String>) () -> {
            return "Response processor " + this.responseProcessor + " is not a Subscriber. Cannot proceed.";
        });
        this.context = connectionContext;
        StreamDecoder streamDecoder = new StreamDecoder();
        this.handleEnvChange = envChangeToken -> {
            EnvironmentChangeEvent environmentChangeEvent = new EnvironmentChangeEvent(envChangeToken);
            try {
                tdsEncoder.onEnvironmentChange(environmentChangeEvent);
                this.transactionListener.onEnvironmentChange(environmentChangeEvent);
                this.collationListener.onEnvironmentChange(environmentChangeEvent);
                this.redirectListener.onEnvironmentChange(environmentChangeEvent);
            } catch (Exception e) {
                logger.warn(this.context.getMessage("Failed onEnvironmentChange() in {}"), new Object[]{"", e});
            }
        };
        this.byteBufAllocator = connection.outbound().alloc();
        this.connection = connection;
        this.tdsEncoder = tdsEncoder;
        this.requestQueue = new RequestQueue(this.context);
        final Consumer consumer = message -> {
            if (message.getClass() == LoginAckToken.class) {
                this.databaseVersion = Optional.of(((LoginAckToken) message).getVersion().toString());
            }
            ConnectionState connectionState = this.state;
            if (connectionState.canAdvance(message)) {
                ConnectionState next = connectionState.next(message, connection);
                this.state = next;
                this.decodeFunction = next.decoder(this);
            }
        };
        final AtomicReference atomicReference = new AtomicReference();
        final SynchronousSink<Message> synchronousSink = new SynchronousSink<Message>() { // from class: io.r2dbc.mssql.client.ReactorNettyClient.1
            public void complete() {
                throw new UnsupportedOperationException();
            }

            @Deprecated
            public Context currentContext() {
                return Context.empty();
            }

            public ContextView contextView() {
                return Context.empty();
            }

            /* JADX WARN: Multi-variable type inference failed */
            /* JADX WARN: Type inference failed for: r0v6, types: [io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException] */
            public void error(Throwable th) {
                Throwable th2 = th;
                if (!(th2 instanceof R2dbcException)) {
                    th2 = new MssqlConnectionException(th2);
                }
                ReactorNettyClient.this.responseProcessor.emitError(th2, Sinks.EmitFailureHandler.FAIL_FAST);
            }

            /* JADX WARN: Code restructure failed: missing block: B:11:0x0059, code lost:
            
                if (io.r2dbc.mssql.message.token.AbstractDoneToken.isAttentionAck(r9) != false) goto L13;
             */
            /* JADX WARN: Code restructure failed: missing block: B:12:0x005c, code lost:
            
                r0 = r8.this$0.attentionPropagation.get();
             */
            /* JADX WARN: Code restructure failed: missing block: B:13:0x006a, code lost:
            
                if (r0 != 0) goto L22;
             */
            /* JADX WARN: Code restructure failed: missing block: B:15:0x00a9, code lost:
            
                if (r8.this$0.attentionPropagation.compareAndSet(r0, r0 - 1) == false) goto L39;
             */
            /* JADX WARN: Code restructure failed: missing block: B:20:0x0070, code lost:
            
                if (io.r2dbc.mssql.client.ReactorNettyClient.DEBUG_ENABLED == false) goto L18;
             */
            /* JADX WARN: Code restructure failed: missing block: B:21:0x0073, code lost:
            
                io.r2dbc.mssql.client.ReactorNettyClient.logger.debug(r8.this$0.context.getMessage("Swallowing attention acknowledged, no pending requests: {}. "), new java.lang.Object[]{r9});
             */
            /* JADX WARN: Code restructure failed: missing block: B:23:0x0090, code lost:
            
                if (r0 == null) goto L40;
             */
            /* JADX WARN: Code restructure failed: missing block: B:24:0x0093, code lost:
            
                r0.request(1);
             */
            /* JADX WARN: Code restructure failed: missing block: B:25:0x009a, code lost:
            
                return;
             */
            /* JADX WARN: Code restructure failed: missing block: B:27:?, code lost:
            
                return;
             */
            /* JADX WARN: Code restructure failed: missing block: B:29:0x00ba, code lost:
            
                if (r8.this$0.attentionPropagation.get() <= 0) goto L35;
             */
            /* JADX WARN: Code restructure failed: missing block: B:31:0x00c1, code lost:
            
                if (io.r2dbc.mssql.message.token.AbstractDoneToken.isAttentionAck(r9) != false) goto L35;
             */
            /* JADX WARN: Code restructure failed: missing block: B:33:0x00c7, code lost:
            
                if (io.r2dbc.mssql.client.ReactorNettyClient.DEBUG_ENABLED == false) goto L31;
             */
            /* JADX WARN: Code restructure failed: missing block: B:34:0x00ca, code lost:
            
                io.r2dbc.mssql.client.ReactorNettyClient.logger.debug(r8.this$0.context.getMessage("Discard message {}. Draining frames until attention acknowledgement."), new java.lang.Object[]{r9});
             */
            /* JADX WARN: Code restructure failed: missing block: B:36:0x00e7, code lost:
            
                if (r0 == null) goto L41;
             */
            /* JADX WARN: Code restructure failed: missing block: B:37:0x00ea, code lost:
            
                r0.request(1);
             */
            /* JADX WARN: Code restructure failed: missing block: B:38:0x00f1, code lost:
            
                return;
             */
            /* JADX WARN: Code restructure failed: missing block: B:39:?, code lost:
            
                return;
             */
            /* JADX WARN: Code restructure failed: missing block: B:40:0x00f2, code lost:
            
                r8.this$0.responseProcessor.emitNext(r9, reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST);
             */
            /* JADX WARN: Code restructure failed: missing block: B:41:0x0102, code lost:
            
                return;
             */
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public void next(io.r2dbc.mssql.message.Message r9) {
                /*
                    Method dump skipped, instructions count: 259
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: io.r2dbc.mssql.client.ReactorNettyClient.AnonymousClass1.next(io.r2dbc.mssql.message.Message):void");
            }
        };
        connection.inbound().receiveObject().concatMapIterable(obj -> {
            if (obj instanceof ByteBuf) {
                return streamDecoder.decode((ByteBuf) obj, this.decodeFunction);
            }
            if (obj instanceof Message) {
                return Collections.singleton((Message) obj);
            }
            throw ProtocolException.unsupported(String.format("Unexpected protocol message: [%s]", obj));
        }).onErrorResume(this::resumeError).subscribe(new CoreSubscriber<Message>() { // from class: io.r2dbc.mssql.client.ReactorNettyClient.2
            public void onSubscribe(Subscription subscription) {
                atomicReference.set(subscription);
                ReactorNettyClient.this.responseProcessor.asFlux().onSubscribe(subscription);
            }

            public void onNext(Message message2) {
                synchronousSink.next(message2);
            }

            public void onError(Throwable th) {
                synchronousSink.error(th);
            }

            public void onComplete() {
                ReactorNettyClient.this.handleClose();
            }
        });
        this.requestSink.asFlux().concatMap(clientMessage -> {
            Object encodeForSend = encodeForSend(clientMessage);
            return encodeForSend instanceof Publisher ? connection.outbound().sendObject((Publisher) encodeForSend) : connection.outbound().sendObject(encodeForSend);
        }).onErrorResume(this::resumeError).doAfterTerminate(this::handleClose).subscribe();
    }

    private Object encodeForSend(ClientMessage clientMessage) {
        if (DEBUG_ENABLED) {
            logger.debug(this.context.getMessage("Request: {}"), new Object[]{clientMessage});
        }
        return clientMessage.encode(this.connection.outbound().alloc(), this.tdsEncoder.getPacketSize());
    }

    private <T> Mono<T> resumeError(Throwable th) {
        handleConnectionError(th);
        this.requestSink.emitComplete((signalType, emitResult) -> {
            if (!emitResult.isFailure()) {
                return false;
            }
            logger.error(this.context.getMessage("Error: {}"), new Object[]{emitResult});
            return false;
        });
        logger.error(this.context.getMessage("Error: {}"), new Object[]{th.getMessage(), th});
        return (Mono<T>) close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onInfoToken(Message message) {
        logger.debug(this.context.getMessage("Response: {}"), new Object[]{message});
        if (message instanceof AbstractInfoToken) {
            AbstractInfoToken abstractInfoToken = (AbstractInfoToken) message;
            if (abstractInfoToken.getClassification() == AbstractInfoToken.Classification.INFORMATIONAL) {
                logger.debug(this.context.getMessage("Info: Code [{}] Severity [{}]: {}"), new Object[]{Long.valueOf(abstractInfoToken.getNumber()), abstractInfoToken.getClassification(), abstractInfoToken.getMessage()});
            } else {
                logger.debug(this.context.getMessage("Warning: Code [{}] Severity [{}]: {}"), new Object[]{Long.valueOf(abstractInfoToken.getNumber()), abstractInfoToken.getClassification(), abstractInfoToken.getMessage()});
            }
        }
    }

    public static Mono<ReactorNettyClient> connect(String str, int i) {
        Assert.requireNonNull(str, "host must not be null");
        return connect(str, i, Duration.ofSeconds(30L));
    }

    public static Mono<ReactorNettyClient> connect(final String str, final int i, final Duration duration) {
        Assert.requireNonNull(duration, "connect timeout must not be null");
        Assert.requireNonNull(str, "host must not be null");
        return connect(new ClientConfiguration() { // from class: io.r2dbc.mssql.client.ReactorNettyClient.3
            @Override // io.r2dbc.mssql.client.ClientConfiguration
            public String getHost() {
                return str;
            }

            @Override // io.r2dbc.mssql.client.ClientConfiguration
            public int getPort() {
                return i;
            }

            @Override // io.r2dbc.mssql.client.ClientConfiguration
            public Duration getConnectTimeout() {
                return duration;
            }

            @Override // io.r2dbc.mssql.client.ClientConfiguration
            public boolean isTcpKeepAlive() {
                return false;
            }

            @Override // io.r2dbc.mssql.client.ClientConfiguration
            public boolean isTcpNoDelay() {
                return true;
            }

            @Override // io.r2dbc.mssql.client.ClientConfiguration
            public ConnectionProvider getConnectionProvider() {
                return ConnectionProvider.newConnection();
            }

            @Override // io.r2dbc.mssql.client.ssl.SslConfiguration
            public boolean isSslEnabled() {
                return false;
            }

            @Override // io.r2dbc.mssql.client.ssl.SslConfiguration
            public SslContext getSslContext() {
                return SslProvider.builder().sslContext(TcpSslContextSpec.forClient()).build().getSslContext();
            }
        }, (String) null, (UUID) null);
    }

    public static Mono<ReactorNettyClient> connect(ClientConfiguration clientConfiguration, @Nullable String str, @Nullable UUID uuid) {
        Assert.requireNonNull(clientConfiguration, "configuration must not be null");
        ConnectionContext connectionContext = new ConnectionContext(str, uuid);
        logger.debug(connectionContext.getMessage("connect()"));
        PacketIdProvider atomic = PacketIdProvider.atomic();
        TdsEncoder tdsEncoder = new TdsEncoder(atomic);
        return TcpClient.create(clientConfiguration.getConnectionProvider()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(Math.toIntExact(clientConfiguration.getConnectTimeout().toMillis()))).option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(clientConfiguration.isTcpKeepAlive())).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(clientConfiguration.isTcpNoDelay())).host(clientConfiguration.getHost()).port(clientConfiguration.getPort()).connect().doOnNext(connection -> {
            SslConfiguration sslTunnelConfiguration = clientConfiguration.getSslTunnelConfiguration();
            ChannelPipeline pipeline = connection.channel().pipeline();
            if (sslTunnelConfiguration.isSslEnabled()) {
                logger.debug(connectionContext.getMessage("Enabling SSL tunnel"));
                try {
                    pipeline.addFirst("sslTunnel", createSslTunnelHandler(connection.channel().alloc(), sslTunnelConfiguration));
                    pipeline.addAfter("sslTunnel", tdsEncoder.getClass().getName(), tdsEncoder);
                } catch (GeneralSecurityException e) {
                    connection.channel().close();
                    throw new IllegalStateException("Cannot configure SSL tunnel", e);
                }
            } else {
                pipeline.addFirst(tdsEncoder.getClass().getName(), tdsEncoder);
            }
            TdsSslHandler tdsSslHandler = new TdsSslHandler(atomic, clientConfiguration, connectionContext.withChannelId(connection.channel().toString()));
            pipeline.addAfter(tdsEncoder.getClass().getName(), tdsSslHandler.getClass().getName(), tdsSslHandler);
            if (InternalLoggerFactory.getInstance(ReactorNettyClient.class).isTraceEnabled()) {
                pipeline.addBefore(tdsEncoder.getClass().getName(), LoggingHandler.class.getSimpleName(), new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
            }
        }).map(connection2 -> {
            return new ReactorNettyClient(connection2, tdsEncoder, connectionContext.withChannelId(connection2.channel().toString()));
        });
    }

    private static SslHandler createSslTunnelHandler(ByteBufAllocator byteBufAllocator, SslConfiguration sslConfiguration) throws GeneralSecurityException {
        return new SslHandler(sslConfiguration.getSslContext().newEngine(byteBufAllocator));
    }

    @Override // io.r2dbc.mssql.client.Client
    public Mono<Void> attention() {
        return Mono.defer(() -> {
            return Mono.fromFuture(send(Mono.just(Attention.create(1, getTransactionDescriptor()))).toFuture());
        });
    }

    @Override // io.r2dbc.mssql.client.Client
    public Mono<Void> close() {
        logger.debug(this.context.getMessage("close()"));
        return Mono.defer(() -> {
            logger.debug(this.context.getMessage("close(subscribed)"));
            if (!this.isClosed.compareAndSet(false, true)) {
                return Mono.empty();
            }
            this.connection.dispose();
            return this.connection.onDispose();
        });
    }

    @Override // io.r2dbc.mssql.client.Client
    public ByteBufAllocator getByteBufAllocator() {
        return this.byteBufAllocator;
    }

    @Override // io.r2dbc.mssql.client.Client
    public ConnectionContext getContext() {
        return this.context;
    }

    @Override // io.r2dbc.mssql.client.Client
    public Optional<Collation> getDatabaseCollation() {
        return this.databaseCollation;
    }

    @Override // io.r2dbc.mssql.client.Client
    public Optional<String> getDatabaseVersion() {
        return this.databaseVersion;
    }

    @Override // io.r2dbc.mssql.client.Client
    public Optional<Redirect> getRedirect() {
        return this.redirect;
    }

    @Override // io.r2dbc.mssql.client.Client
    public TransactionDescriptor getTransactionDescriptor() {
        return this.transactionDescriptor;
    }

    @Override // io.r2dbc.mssql.client.Client
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus;
    }

    @Override // io.r2dbc.mssql.client.Client
    public boolean isColumnEncryptionSupported() {
        return this.encryptionSupported;
    }

    @Override // io.r2dbc.mssql.client.Client
    public boolean isConnected() {
        if (this.isClosed.get()) {
            return false;
        }
        return this.connection.channel().isOpen();
    }

    @Override // io.r2dbc.mssql.client.Client
    public Flux<Message> exchange(Publisher<? extends ClientMessage> publisher, Predicate<Message> predicate) {
        Assert.requireNonNull(predicate, "takeUntil must not be null");
        Assert.requireNonNull(publisher, "Requests must not be null");
        if (DEBUG_ENABLED) {
            logger.debug(this.context.getMessage("exchange()"));
        }
        ExchangeRequest exchangeRequest = new ExchangeRequest();
        return Mono.create(monoSink -> {
            if (DEBUG_ENABLED) {
                logger.debug(this.context.getMessage("exchange(subscribed)"));
            }
            if (!isConnected()) {
                monoSink.error((Throwable) CLOSED.get());
            }
            try {
                exchangeRequest.submit(this.requestQueue, monoSink, this.responseProcessor.asFlux().doOnSubscribe(subscription -> {
                    this.outstandingRequests.incrementAndGet();
                    Flux.from(publisher).subscribe(clientMessage -> {
                        if (isConnected()) {
                            this.requestSink.emitNext(clientMessage, Sinks.EmitFailureHandler.FAIL_FAST);
                        } else {
                            monoSink.error((Throwable) CLOSED.get());
                        }
                    }, th -> {
                        this.requestSink.emitError(th, Sinks.EmitFailureHandler.FAIL_FAST);
                    }, () -> {
                        if (isConnected()) {
                            return;
                        }
                        monoSink.error((Throwable) CLOSED.get());
                    });
                }));
            } catch (Exception e) {
                monoSink.error(e);
            }
        }).flatMapMany(Function.identity()).handle((message, synchronousSink) -> {
            synchronousSink.next(message);
            if (predicate.test(message)) {
                exchangeRequest.complete();
                synchronousSink.complete();
            }
        }).doAfterTerminate(this.requestQueue).doFinally(signalType -> {
            this.outstandingRequests.decrementAndGet();
        }).doOnCancel(() -> {
            if (exchangeRequest.isComplete()) {
                return;
            }
            logger.error("Exchange cancelled while exchange is active. This is likely a bug leading to unpredictable outcome.");
        });
    }

    private Mono<Void> send(Publisher<? extends ClientMessage> publisher) {
        return Flux.from(publisher).concatMap(clientMessage -> {
            NettyOutbound sendObject = this.connection.outbound().sendObject(encodeForSend(clientMessage));
            return (!(clientMessage instanceof Attention) || this.outstandingRequests.longValue() == 0) ? sendObject : Mono.from(sendObject).doOnSuccess(r4 -> {
                this.attentionPropagation.incrementAndGet();
            });
        }).then();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleClose() {
        if (!this.isClosed.compareAndSet(false, true)) {
            drainError(EXPECTED);
        } else {
            logger.warn(this.context.getMessage("Connection has been closed by peer"));
            drainError(UNEXPECTED);
        }
    }

    private void handleConnectionError(Throwable th) {
        drainError(() -> {
            return new MssqlConnectionException(th);
        });
    }

    private void drainError(Supplier<? extends Throwable> supplier) {
        while (true) {
            Sinkable poll = this.requestQueue.poll();
            if (poll == null) {
                this.responseProcessor.emitError(supplier.get(), Sinks.EmitFailureHandler.FAIL_FAST);
                return;
            }
            poll.onError(supplier.get());
        }
    }
}
