package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.message.DcpCloseStreamRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosRequest;
import com.couchbase.client.dcp.message.DcpOpenStreamRequest;
import com.couchbase.client.dcp.message.DcpOpenStreamResponse;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.ResponseStatus;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.VbucketState;
import com.couchbase.client.dcp.metrics.DcpChannelMetrics;
import com.couchbase.client.dcp.metrics.MetricsContext;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.transport.netty.ChannelUtils;
import com.couchbase.client.dcp.transport.netty.DcpMessageHandler;
import com.couchbase.client.dcp.transport.netty.DcpPipeline;
import com.couchbase.client.dcp.transport.netty.DcpResponse;
import com.couchbase.client.dcp.transport.netty.DcpResponseListener;
import com.couchbase.client.dcp.util.AdaptiveDelay;
import com.couchbase.client.dcp.util.AtomicBooleanArray;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import com.couchbase.client.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.buffer.UnpooledByteBufAllocator;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.deps.io.netty.channel.ChannelFutureListener;
import com.couchbase.client.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.deps.io.netty.util.concurrent.ImmediateEventExecutor;
import io.micrometer.core.instrument.Tags;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscription;
import rx.functions.Action4;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/DcpChannel.class */
public class DcpChannel extends AbstractStateMachine<LifecycleState> {
    // Class-wide logger.
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) DcpChannel.class);
    // Flow controller whose ack methods do nothing. It is handed to the control event handler
    // together with messages this class builds itself (failover log / rollback events in openStream).
    private static final ChannelFlowController dummyFlowController = new ChannelFlowController() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.1
        @Override // com.couchbase.client.dcp.transport.netty.ChannelFlowController
        public void ack(ByteBuf byteBuf) {
            // intentionally empty
        }

        @Override // com.couchbase.client.dcp.transport.netty.ChannelFlowController
        public void ack(int i) {
            // intentionally empty
        }
    };
    // Control handler created for this channel in the constructor and passed to the DcpPipeline on connect.
    private final DcpChannelControlHandler controlHandler;
    // Set to true by disconnect(); checked before connecting and before scheduling a reconnect.
    private volatile boolean isShutdown;
    // The connected Netty channel; assigned when a connect attempt succeeds and reset to null by the close listener.
    private volatile Channel channel;
    // Future of the most recent connect attempt; disconnect() waits on it when no channel is set yet.
    private volatile ChannelFuture connectFuture;
    // Metrics tracker used for connect/disconnect futures and handed to the pipeline.
    private final DcpChannelMetrics metrics;
    // Computes the initial delay applied before a reconnect attempt (see dispatchReconnect()).
    private final AdaptiveDelay reconnectDelay;
    // Environment providing configuration, event loop group, event handlers and the stream event buffer.
    final ClientEnvironment env;
    // Remote address this channel connects to; also the basis for equals/hashCode.
    final InetSocketAddress inetAddress;
    // Per-partition flag (indexed by vbucket id) recording whether a stream is open on this channel.
    final AtomicBooleanArray streamIsOpen;
    // Owning conductor; supplies the config provider and is asked to move partitions after a reconnect.
    final Conductor conductor;
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.client.dcp.conductor.DcpChannel$2, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/client/dcp/conductor/DcpChannel$2.class */
    /**
     * Subscription logic backing {@link DcpChannel#connect()}.
     *
     * <p>On subscribe, a new socket connection to {@code inetAddress} is bootstrapped unless the
     * channel has been shut down or is not currently {@code DISCONNECTED}; in those cases the
     * subscriber is completed immediately without doing anything.
     *
     * <p>When the connect attempt finishes:
     * <ul>
     *   <li>failure: state returns to {@code DISCONNECTED} and the subscriber receives the cause;</li>
     *   <li>success but shutdown was requested meanwhile: the channel is disconnected again and the
     *       subscriber is notified with the outcome of that disconnect;</li>
     *   <li>success: state becomes {@code CONNECTED}, a close listener is installed that cleans up
     *       and schedules a reconnect, and the subscriber is completed.</li>
     * </ul>
     */
    public class AnonymousClass2 implements Completable.OnSubscribe {
        AnonymousClass2() {
        }

        @Override // rx.functions.Action1
        public void call(final CompletableSubscriber completableSubscriber) {
            if (DcpChannel.this.isShutdown || DcpChannel.this.state() != LifecycleState.DISCONNECTED) {
                completableSubscriber.onCompleted();
                return;
            }
            Bootstrap group = new Bootstrap()
                    .option(ChannelOption.ALLOCATOR, DcpChannel.this.env.poolBuffers() ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf((int) DcpChannel.this.env.socketConnectTimeout()))
                    .remoteAddress(DcpChannel.this.inetAddress)
                    .channel(ChannelUtils.channelForEventLoopGroup(DcpChannel.this.env.eventLoopGroup()))
                    .handler(new DcpPipeline(DcpChannel.this.env, DcpChannel.this.controlHandler, DcpChannel.this.conductor.configProvider(), DcpChannel.this.metrics))
                    .group(DcpChannel.this.env.eventLoopGroup());
            DcpChannel.this.transitionState(LifecycleState.CONNECTING);
            DcpChannel.this.connectFuture = (ChannelFuture) DcpChannel.this.metrics.trackConnect(group.connect());
            DcpChannel.this.connectFuture.addListener2((GenericFutureListener<? extends Future<? super Void>>) new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.2.1
                @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (!channelFuture.isSuccess()) {
                        DcpChannel.LOGGER.info("Connect attempt to {} failed.", RedactableArgument.system(DcpChannel.this.inetAddress), channelFuture.cause());
                        DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                        completableSubscriber.onError(channelFuture.cause());
                        return;
                    }
                    DcpChannel.this.channel = channelFuture.channel();
                    DcpChannel.this.metrics.trackDisconnect(DcpChannel.this.channel.closeFuture());
                    if (DcpChannel.this.isShutdown) {
                        DcpChannel.LOGGER.info("Connected Node {}, but got instructed to disconnect in the meantime.", RedactableArgument.system(DcpChannel.this.inetAddress));
                        DcpChannel.this.disconnect().subscribe(new CompletableSubscriber() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.2.1.1
                            @Override // rx.CompletableSubscriber
                            public void onCompleted() {
                                completableSubscriber.onCompleted();
                            }

                            @Override // rx.CompletableSubscriber
                            public void onError(Throwable th) {
                                DcpChannel.LOGGER.warn("Got error during disconnect.", th);
                                // Always terminate the connect subscriber; only logging here would
                                // leave it waiting forever for a terminal event.
                                completableSubscriber.onError(th);
                            }

                            @Override // rx.CompletableSubscriber
                            public void onSubscribe(Subscription subscription) {
                            }
                        });
                        return;
                    }
                    DcpChannel.this.transitionState(LifecycleState.CONNECTED);
                    DcpChannel.LOGGER.info("Connected to Node {}", RedactableArgument.system(DcpChannel.this.inetAddress));
                    DcpChannel.this.channel.closeFuture().addListener2((GenericFutureListener<? extends Future<? super Void>>) new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.2.1.2
                        @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                        public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                            DcpChannel.LOGGER.debug("Got notified of channel close on Node {}", DcpChannel.this.inetAddress);
                            if (DcpChannel.this.env.persistencePollingEnabled()) {
                                // Clear buffered stream events for every partition that was streaming over this channel.
                                for (short vbid = 0; vbid < DcpChannel.this.streamIsOpen.length(); vbid++) {
                                    if (DcpChannel.this.streamIsOpen.get(vbid)) {
                                        DcpChannel.this.env.streamEventBuffer().clear(vbid);
                                    }
                                }
                            }
                            DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                            // Forget the dead channel BEFORE scheduling the reconnect, so this
                            // assignment can never overwrite a channel installed by the reconnect.
                            DcpChannel.this.channel = null;
                            if (!DcpChannel.this.isShutdown) {
                                DcpChannel.this.dispatchReconnect();
                            }
                        }
                    });
                    completableSubscriber.onCompleted();
                }
            });
        }
    }

    public DcpChannel(InetSocketAddress inetSocketAddress, ClientEnvironment clientEnvironment, Conductor conductor) {
        super(LifecycleState.DISCONNECTED);
        this.reconnectDelay = new AdaptiveDelay(Delay.exponential(TimeUnit.MILLISECONDS, 4096L, 32L), Duration.ofSeconds(10L));
        this.streamIsOpen = new AtomicBooleanArray(1024);
        this.inetAddress = inetSocketAddress;
        this.env = clientEnvironment;
        this.conductor = conductor;
        this.controlHandler = new DcpChannelControlHandler(this);
        this.isShutdown = false;
        this.metrics = new DcpChannelMetrics(new MetricsContext("dcp", Tags.of("remote", inetSocketAddress.toString())));
    }

    /**
     * Sends a request over this channel's {@link DcpMessageHandler}.
     *
     * <p>If no channel is currently set, the request buffer is released and a failed future
     * carrying a {@link NotConnectedException} is returned instead.
     *
     * @param byteBuf the encoded request; it is released here when the channel is not available
     * @return a future for the response, or an already-failed future when not connected
     */
    public Future<DcpResponse> sendRequest(ByteBuf byteBuf) {
        // Snapshot the volatile field once: the close listener may null it concurrently, and a
        // second read after the null check could otherwise throw a NullPointerException.
        final Channel activeChannel = this.channel;
        if (activeChannel != null) {
            return ((DcpMessageHandler) activeChannel.pipeline().get(DcpMessageHandler.class)).sendRequest(byteBuf);
        }
        ReferenceCountUtil.safeRelease(byteBuf);
        return ImmediateEventExecutor.INSTANCE.newFailedFuture(new NotConnectedException("Failed to issue request; channel is not active."));
    }

    /**
     * Returns a {@link Completable} that, on each subscription, runs the connect logic in
     * {@link AnonymousClass2}. Nothing happens until the result is subscribed to.
     *
     * @return a completable representing one connect attempt
     */
    public Completable connect() {
        final Completable.OnSubscribe connectLogic = new AnonymousClass2();
        return Completable.create(connectLogic);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules a reconnect to the node unless the channel has been shut down.
     *
     * <p>The attempt starts after the delay computed by {@code reconnectDelay}; failed connect
     * attempts are retried with exponential backoff. Once connected, the conductor is asked to
     * (maybe) move every partition whose stream flag is still set on this channel.
     */
    public void dispatchReconnect() {
        if (isShutdown) {
            LOGGER.debug("Ignoring reconnect on {} because already shutdown.", inetAddress);
            return;
        }

        LOGGER.info("Node {} socket closed, initiating reconnect.", RedactableArgument.system(inetAddress));

        final long initialDelayMillis = reconnectDelay.calculate().toMillis();
        if (initialDelayMillis > 0) {
            LOGGER.info("Delaying reconnection attempt by {}ms", Long.valueOf(initialDelayMillis));
        }

        // Connect attempt that keeps retrying (with exponential backoff) on any error.
        final Completable connectWithRetry = connect().retryWhen(RetryBuilder.any()
                .max(Integer.MAX_VALUE)
                .delay(Delay.exponential(TimeUnit.MILLISECONDS, 4096L, 32L))
                .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.4
                    @Override // rx.functions.Action4
                    public void call(Integer attempt, Throwable cause, Long delay, TimeUnit unit) {
                        LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", inetAddress);
                    }
                })
                .build());

        Completable.timer(initialDelayMillis, TimeUnit.MILLISECONDS)
                .andThen(connectWithRetry)
                .subscribe(new CompletableSubscriber() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.3
                    @Override // rx.CompletableSubscriber
                    public void onCompleted() {
                        LOGGER.debug("Completed Node connect for DCP channel {}", inetAddress);
                        for (short vbid = 0; vbid < DcpChannel.this.streamIsOpen.length(); vbid++) {
                            if (DcpChannel.this.streamIsOpen.get(vbid)) {
                                conductor.maybeMovePartition(vbid);
                            }
                        }
                    }

                    @Override // rx.CompletableSubscriber
                    public void onError(Throwable error) {
                        LOGGER.warn("Got error during connect (maybe retried) for node {}", RedactableArgument.system(inetAddress), error);
                    }

                    @Override // rx.CompletableSubscriber
                    public void onSubscribe(Subscription subscription) {
                        // nothing to do
                    }
                });
    }

    /**
     * @return {@code true} once {@link #disconnect()} has been subscribed to
     */
    public boolean isShutdown() {
        final boolean shutdownRequested = isShutdown;
        return shutdownRequested;
    }

    /**
     * Returns a {@link Completable} that shuts this channel down when subscribed to.
     *
     * <p>Subscribing marks the channel as shut down (which also suppresses reconnects) and then:
     * <ul>
     *   <li>if a channel is set, closes it and completes (or errors) with the close result;</li>
     *   <li>otherwise, if a connect attempt was started, waits for it: a failed connect completes
     *       immediately, a successful one completes once that channel's close future fires;</li>
     *   <li>otherwise completes immediately.</li>
     * </ul>
     *
     * @return a completable signalling the end of the disconnect
     */
    public Completable disconnect() {
        return Completable.create(new Completable.OnSubscribe() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.5
            @Override // rx.functions.Action1
            public void call(final CompletableSubscriber completableSubscriber) {
                DcpChannel.this.isShutdown = true;

                // Read each volatile field exactly once; the close listener can null `channel`
                // between a null check and a subsequent use of the field.
                final Channel activeChannel = DcpChannel.this.channel;
                if (activeChannel != null) {
                    DcpChannel.this.transitionState(LifecycleState.DISCONNECTING);
                    ((ChannelFuture) DcpChannel.this.metrics.trackDisconnect(activeChannel.close())).addListener2((GenericFutureListener<? extends Future<? super Void>>) new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.5.1
                        @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                            DcpChannel.LOGGER.info("Disconnected from Node {}", RedactableArgument.system(DcpChannel.this.address()));
                            if (channelFuture.isSuccess()) {
                                completableSubscriber.onCompleted();
                            } else {
                                DcpChannel.LOGGER.debug("Error during channel close.", channelFuture.cause());
                                completableSubscriber.onError(channelFuture.cause());
                            }
                        }
                    });
                    return;
                }

                final ChannelFuture pendingConnect = DcpChannel.this.connectFuture;
                if (pendingConnect == null) {
                    // Never attempted to connect: nothing to tear down.
                    completableSubscriber.onCompleted();
                    return;
                }

                pendingConnect.addListener2((GenericFutureListener<? extends Future<? super Void>>) new ChannelFutureListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.5.2
                    @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (!channelFuture.isSuccess()) {
                            // The connect attempt failed, so there is no channel to close.
                            completableSubscriber.onCompleted();
                            return;
                        }
                        // The connect succeeded; wait for that channel to be closed.
                        channelFuture.channel().closeFuture().addListener2((GenericFutureListener<? extends Future<? super Void>>) new ChannelFutureListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.5.2.1
                            @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                            public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                                if (channelFuture2.isSuccess()) {
                                    completableSubscriber.onCompleted();
                                } else {
                                    completableSubscriber.onError(channelFuture2.cause());
                                }
                            }
                        });
                    }
                });
            }
        });
    }

    /**
     * @return the remote socket address this channel was created for
     */
    public InetSocketAddress address() {
        return inetAddress;
    }

    /**
     * Returns a {@link Completable} that opens a DCP stream for one partition when subscribed to.
     *
     * <p>If the given snapshot start equals {@code startSeqno + 1}, the snapshot range is
     * disregarded and both snapshot bounds are replaced by the start sequence number.
     *
     * <p>Outcome by response status:
     * <ul>
     *   <li>{@code KEY_EXISTS}: the stream is already open; completes.</li>
     *   <li>success: marks the stream open, completes, and forwards a failover log message built
     *       from the response to the control event handler.</li>
     *   <li>{@code ROLLBACK_REQUIRED}: errors with {@link RollbackException} and forwards a
     *       rollback message to the control event handler.</li>
     *   <li>{@code NOT_MY_VBUCKET}: errors with {@code NotMyVbucketException}.</li>
     *   <li>anything else: errors with {@link IllegalStateException}.</li>
     * </ul>
     * Errors with {@link NotConnectedException} if the channel is not {@code CONNECTED}.
     *
     * @param s partition (vbucket) id
     * @param j vbucket uuid
     * @param j2 start sequence number
     * @param j3 end sequence number
     * @param j4 snapshot start sequence number
     * @param j5 snapshot end sequence number
     * @return a completable signalling the result of the open-stream request
     */
    public Completable openStream(final short s, final long j, final long j2, final long j3, final long j4, final long j5) {
        return Completable.create(new Completable.OnSubscribe() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.6
            @Override // rx.functions.Action1
            public void call(final CompletableSubscriber completableSubscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    completableSubscriber.onError(new NotConnectedException());
                    return;
                }

                final long snapshotStartSeqno;
                final long snapshotEndSeqno;
                if (j4 == j2 + 1) {
                    DcpChannel.LOGGER.debug("Disregarding snapshot marker from the future.");
                    snapshotStartSeqno = j2;
                    snapshotEndSeqno = j2;
                } else {
                    snapshotStartSeqno = j4;
                    snapshotEndSeqno = j5;
                }

                DcpChannel.LOGGER.debug("Opening Stream against {} with vbid: {}, vbuuid: {}, startSeqno: {}, endSeqno: {},  snapshotStartSeqno: {}, snapshotEndSeqno: {}", DcpChannel.this.inetAddress, Short.valueOf(s), Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j3), Long.valueOf(snapshotStartSeqno), Long.valueOf(snapshotEndSeqno));

                ByteBuf request = Unpooled.buffer();
                DcpOpenStreamRequest.init(request, s);
                DcpOpenStreamRequest.vbuuid(request, j);
                DcpOpenStreamRequest.startSeqno(request, j2);
                DcpOpenStreamRequest.endSeqno(request, j3);
                DcpOpenStreamRequest.snapshotStartSeqno(request, snapshotStartSeqno);
                DcpOpenStreamRequest.snapshotEndSeqno(request, snapshotEndSeqno);

                DcpChannel.this.sendRequest(request).addListener2(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.6.1
                    @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        if (!future.isSuccess()) {
                            DcpChannel.LOGGER.debug("Failed open Stream against {} with vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));
                            DcpChannel.this.streamIsOpen.set(s, false);
                            completableSubscriber.onError(future.cause());
                            return;
                        }

                        DcpResponse response = future.getNow();
                        ByteBuf responseBuf = response.buffer();
                        try {
                            ResponseStatus status = response.status();

                            if (status == ResponseStatus.KEY_EXISTS) {
                                DcpChannel.LOGGER.debug("Stream already open against {} with vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));
                                completableSubscriber.onCompleted();
                                // No explicit release here: the finally block below releases the
                                // response exactly once. Releasing in both places is a double release.
                                return;
                            }

                            if (status.isSuccess()) {
                                DcpChannel.LOGGER.debug("Opened Stream against {} with vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));
                                DcpChannel.this.streamIsOpen.set(s, true);
                                completableSubscriber.onCompleted();

                                // Re-package the response content as a failover log message, with
                                // the partition id appended, and hand it to the control event handler.
                                ByteBuf failoverLog = Unpooled.buffer();
                                DcpFailoverLogResponse.init(failoverLog);
                                DcpFailoverLogResponse.vbucket(failoverLog, DcpOpenStreamResponse.vbucket(responseBuf));
                                ByteBuf content = MessageUtil.getContent(responseBuf).copy().writeShort(s);
                                MessageUtil.setContent(content, failoverLog);
                                content.release();
                                DcpChannel.this.env.controlEventHandler().onEvent(DcpChannel.dummyFlowController, failoverLog);
                                return;
                            }

                            DcpChannel.LOGGER.debug("Failed open Stream against {} with vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));
                            DcpChannel.this.streamIsOpen.set(s, false);

                            if (status == ResponseStatus.ROLLBACK_REQUIRED) {
                                completableSubscriber.onError(new RollbackException());
                                ByteBuf rollback = Unpooled.buffer();
                                RollbackMessage.init(rollback, s, DcpOpenStreamResponse.rollbackSeqno(responseBuf));
                                DcpChannel.this.env.controlEventHandler().onEvent(DcpChannel.dummyFlowController, rollback);
                            } else if (status == ResponseStatus.NOT_MY_VBUCKET) {
                                completableSubscriber.onError(new NotMyVbucketException());
                            } else {
                                completableSubscriber.onError(new IllegalStateException("Unhandled Status: " + status));
                            }
                        } finally {
                            responseBuf.release();
                        }
                    }
                });
            }
        });
    }

    /**
     * Returns a {@link Completable} that asks the server to close the stream of one partition.
     *
     * <p>The partition's stream flag is cleared as soon as the request finishes, whether it
     * succeeded or failed. Errors with {@link NotConnectedException} if the channel is not
     * {@code CONNECTED}, or with the request's failure cause.
     *
     * @param s partition (vbucket) id
     * @return a completable signalling the result of the close-stream request
     */
    public Completable closeStream(final short s) {
        return Completable.create(new Completable.OnSubscribe() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.7
            @Override // rx.functions.Action1
            public void call(final CompletableSubscriber completableSubscriber) {
                final boolean connected = DcpChannel.this.state() == LifecycleState.CONNECTED;
                if (!connected) {
                    completableSubscriber.onError(new NotConnectedException());
                    return;
                }

                DcpChannel.LOGGER.debug("Closing Stream against {} with vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));

                final ByteBuf request = Unpooled.buffer();
                DcpCloseStreamRequest.init(request);
                DcpCloseStreamRequest.vbucket(request, s);

                final Future<DcpResponse> pending = DcpChannel.this.sendRequest(request);
                pending.addListener2(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.7.1
                    @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        // The flag is cleared regardless of the outcome.
                        DcpChannel.this.streamIsOpen.set(s, false);

                        if (future.isSuccess()) {
                            future.getNow().buffer().release();
                            DcpChannel.LOGGER.debug("Closed Stream against {} with vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));
                            completableSubscriber.onCompleted();
                            return;
                        }

                        DcpChannel.LOGGER.debug("Failed close Stream against {} with vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));
                        completableSubscriber.onError(future.cause());
                    }
                });
            }
        });
    }

    /**
     * Returns a {@link Single} that requests the partition sequence numbers for vbuckets in the
     * {@code ACTIVE} state and emits a copy of the response content.
     *
     * <p>The response buffer is released here; the emitted copy is passed on to the subscriber.
     * Errors with {@link NotConnectedException} if the channel is not {@code CONNECTED}, or with
     * the request's failure cause.
     *
     * @return a single emitting the copied response content
     */
    public Single<ByteBuf> getSeqnos() {
        return Single.create(new Single.OnSubscribe<ByteBuf>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.8
            @Override // rx.functions.Action1
            public void call(final SingleSubscriber<? super ByteBuf> singleSubscriber) {
                final boolean connected = DcpChannel.this.state() == LifecycleState.CONNECTED;
                if (!connected) {
                    singleSubscriber.onError(new NotConnectedException());
                    return;
                }

                final ByteBuf request = Unpooled.buffer();
                DcpGetPartitionSeqnosRequest.init(request);
                DcpGetPartitionSeqnosRequest.vbucketState(request, VbucketState.ACTIVE);

                final Future<DcpResponse> pending = DcpChannel.this.sendRequest(request);
                pending.addListener2(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.8.1
                    @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        if (future.isSuccess()) {
                            final ByteBuf response = future.getNow().buffer();
                            try {
                                final ByteBuf seqnos = MessageUtil.getContent(response).copy();
                                singleSubscriber.onSuccess(seqnos);
                            } finally {
                                response.release();
                            }
                        } else {
                            singleSubscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    /**
     * Returns a {@link Single} that requests the failover log of one partition and emits it as a
     * newly built failover log message.
     *
     * <p>The emitted buffer carries the vbucket from the response and the response content with
     * the partition id appended. The response buffer itself is released here. Errors with
     * {@link NotConnectedException} if the channel is not {@code CONNECTED}, or with the request's
     * failure cause.
     *
     * @param s partition (vbucket) id
     * @return a single emitting the failover log message
     */
    public Single<ByteBuf> getFailoverLog(final short s) {
        return Single.create(new Single.OnSubscribe<ByteBuf>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.9
            @Override // rx.functions.Action1
            public void call(final SingleSubscriber<? super ByteBuf> singleSubscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    singleSubscriber.onError(new NotConnectedException());
                    return;
                }
                ByteBuf request = Unpooled.buffer();
                DcpFailoverLogRequest.init(request);
                DcpFailoverLogRequest.vbucket(request, s);
                DcpChannel.LOGGER.debug("Asked for failover log on {} for vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));
                DcpChannel.this.sendRequest(request).addListener2(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.9.1
                    @Override // com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        if (!future.isSuccess()) {
                            DcpChannel.LOGGER.debug("Failed to ask for failover log on {} for vbid: {}", DcpChannel.this.inetAddress, Short.valueOf(s));
                            singleSubscriber.onError(future.cause());
                            return;
                        }
                        ByteBuf response = future.getNow().buffer();
                        try {
                            ByteBuf failoverLog = Unpooled.buffer();
                            DcpFailoverLogResponse.init(failoverLog);
                            DcpFailoverLogResponse.vbucket(failoverLog, DcpFailoverLogResponse.vbucket(response));
                            ByteBuf content = MessageUtil.getContent(response).copy().writeShort(s);
                            MessageUtil.setContent(content, failoverLog);
                            content.release();
                            DcpChannel.LOGGER.debug("Failover log for vbid {} is {}", Short.valueOf(s), DcpFailoverLogResponse.toString(failoverLog));
                            singleSubscriber.onSuccess(failoverLog);
                        } finally {
                            // Single release point: releasing in both the try body and a
                            // catch-all handler would release twice if the first release throws.
                            response.release();
                        }
                    }
                });
            }
        });
    }

    /**
     * @param s partition (vbucket) id
     * @return whether the stream flag for the given partition is currently set on this channel
     */
    public boolean streamIsOpen(short s) {
        final boolean open = streamIsOpen.get(s);
        return open;
    }

    /**
     * Compares this channel by its remote address.
     *
     * <p>A channel is equal to another {@code DcpChannel} with an equal address, and also to a
     * bare {@link InetSocketAddress} equal to its own address, so a channel can be matched
     * against an address directly. Note that the latter comparison is not symmetric:
     * {@code address.equals(channel)} is still {@code false}.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} denotes the same remote address
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        // The address is an InetSocketAddress; a check against InetAddress can never match it.
        if (obj instanceof InetSocketAddress) {
            return this.inetAddress.equals(obj);
        }
        if (obj instanceof DcpChannel) {
            return this.inetAddress.equals(((DcpChannel) obj).inetAddress);
        }
        return false;
    }

    /**
     * @return the hash code of the remote address, matching the address-based {@code equals}
     */
    @Override
    public int hashCode() {
        return inetAddress.hashCode();
    }

    /**
     * @return a short description containing the remote address
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("DcpChannel{inetAddress=");
        text.append(inetAddress).append('}');
        return text.toString();
    }
}
