package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.dcp.buffer.BucketConfigHelper;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.events.FailedToAddNodeEvent;
import com.couchbase.client.dcp.events.FailedToMovePartitionEvent;
import com.couchbase.client.dcp.events.FailedToRemoveNodeEvent;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.internal.ConcurrentSet;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Single;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Action4;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/Conductor.class */
public class Conductor {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) Conductor.class);
    private final ConfigProvider configProvider;
    private final ClientEnvironment env;
    private final boolean ownsConfigProvider;
    private final Set<DcpChannel> channels = new ConcurrentSet();
    private volatile boolean stopped = true;
    private final AtomicReference<CouchbaseBucketConfig> currentConfig = new AtomicReference<>();
    private final SessionState sessionState = new SessionState();

    public Conductor(ClientEnvironment clientEnvironment, ConfigProvider configProvider) {
        this.env = clientEnvironment;
        this.configProvider = configProvider == null ? new HttpStreamingConfigProvider(clientEnvironment) : configProvider;
        this.ownsConfigProvider = configProvider == null;
        this.configProvider.configs().forEach(new Action1<CouchbaseBucketConfig>() { // from class: com.couchbase.client.dcp.conductor.Conductor.1
            @Override // rx.functions.Action1
            public void call(CouchbaseBucketConfig couchbaseBucketConfig) {
                Conductor.LOGGER.trace("Applying new configuration, new rev is {}.", Long.valueOf(couchbaseBucketConfig.rev()));
                Conductor.this.currentConfig.set(couchbaseBucketConfig);
                Conductor.this.reconfigure(couchbaseBucketConfig);
            }
        });
    }

    public SessionState sessionState() {
        return this.sessionState;
    }

    public Completable connect() {
        this.stopped = false;
        return this.configProvider.start().timeout(this.env.connectTimeout(), TimeUnit.SECONDS).doOnError(new Action1<Throwable>() { // from class: com.couchbase.client.dcp.conductor.Conductor.3
            @Override // rx.functions.Action1
            public void call(Throwable th) {
                Conductor.LOGGER.warn("Cannot connect configuration provider.");
            }
        }).concatWith(this.configProvider.configs().first().toCompletable().timeout(this.env.bootstrapTimeout(), TimeUnit.SECONDS).doOnError(new Action1<Throwable>() { // from class: com.couchbase.client.dcp.conductor.Conductor.2
            @Override // rx.functions.Action1
            public void call(Throwable th) {
                Conductor.LOGGER.warn("Did not receive initial configuration from provider.");
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConfigProvider configProvider() {
        return this.configProvider;
    }

    public boolean disconnected() {
        if (!this.configProvider.isState(LifecycleState.DISCONNECTED)) {
            return false;
        }
        Iterator<DcpChannel> it = this.channels.iterator();
        while (it.hasNext()) {
            if (!it.next().isState(LifecycleState.DISCONNECTED)) {
                return false;
            }
        }
        return true;
    }

    public Completable stop() {
        LOGGER.debug("Instructed to shutdown.");
        this.stopped = true;
        Completable completable = Observable.from(this.channels).flatMapCompletable(new Func1<DcpChannel, Completable>() { // from class: com.couchbase.client.dcp.conductor.Conductor.4
            @Override // rx.functions.Func1
            public Completable call(DcpChannel dcpChannel) {
                return dcpChannel.disconnect();
            }
        }).toCompletable();
        if (this.ownsConfigProvider) {
            completable = completable.andThen(this.configProvider.stop());
        }
        return completable.doOnCompleted(new Action0() { // from class: com.couchbase.client.dcp.conductor.Conductor.5
            @Override // rx.functions.Action0
            public void call() {
                Conductor.LOGGER.info("Shutdown complete.");
            }
        });
    }

    public int numberOfPartitions() {
        return this.currentConfig.get().numberOfPartitions();
    }

    public Observable<ByteBuf> getSeqnos() {
        return Observable.from(this.channels).flatMap(new Func1<DcpChannel, Observable<ByteBuf>>() { // from class: com.couchbase.client.dcp.conductor.Conductor.6
            @Override // rx.functions.Func1
            public Observable<ByteBuf> call(DcpChannel dcpChannel) {
                return Conductor.this.getSeqnosForChannel(dcpChannel);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<ByteBuf> getSeqnosForChannel(final DcpChannel dcpChannel) {
        return Observable.just(dcpChannel).flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() { // from class: com.couchbase.client.dcp.conductor.Conductor.8
            @Override // rx.functions.Func1
            public Single<ByteBuf> call(DcpChannel dcpChannel2) {
                return dcpChannel2.getSeqnos();
            }
        }).retryWhen(RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { // from class: com.couchbase.client.dcp.conductor.Conductor.7
            @Override // rx.functions.Action4
            public void call(Integer num, Throwable th, Long l, TimeUnit timeUnit) {
                Conductor.LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", dcpChannel);
            }
        }).build());
    }

    public Single<ByteBuf> getFailoverLog(final short s) {
        return Observable.just(Short.valueOf(s)).map(new Func1<Short, DcpChannel>() { // from class: com.couchbase.client.dcp.conductor.Conductor.11
            @Override // rx.functions.Func1
            public DcpChannel call(Short sh) {
                return Conductor.this.masterChannelByPartition(s);
            }
        }).flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() { // from class: com.couchbase.client.dcp.conductor.Conductor.10
            @Override // rx.functions.Func1
            public Single<ByteBuf> call(DcpChannel dcpChannel) {
                return dcpChannel.getFailoverLog(s);
            }
        }).retryWhen(RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { // from class: com.couchbase.client.dcp.conductor.Conductor.9
            @Override // rx.functions.Action4
            public void call(Integer num, Throwable th, Long l, TimeUnit timeUnit) {
                Conductor.LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", Short.valueOf(s));
            }
        }).build()).toSingle();
    }

    public Completable startStreamForPartition(final short s, final long j, final long j2, final long j3, final long j4, final long j5) {
        return Observable.just(Short.valueOf(s)).map(new Func1<Short, DcpChannel>() { // from class: com.couchbase.client.dcp.conductor.Conductor.14
            @Override // rx.functions.Func1
            public DcpChannel call(Short sh) {
                return Conductor.this.masterChannelByPartition(s);
            }
        }).flatMapCompletable(new Func1<DcpChannel, Completable>() { // from class: com.couchbase.client.dcp.conductor.Conductor.13
            @Override // rx.functions.Func1
            public Completable call(DcpChannel dcpChannel) {
                return dcpChannel.openStream(s, j, j2, j3, j4, j5);
            }
        }).retryWhen(RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { // from class: com.couchbase.client.dcp.conductor.Conductor.12
            @Override // rx.functions.Action4
            public void call(Integer num, Throwable th, Long l, TimeUnit timeUnit) {
                Conductor.LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", Short.valueOf(s));
            }
        }).build()).toCompletable();
    }

    public Completable stopStreamForPartition(short s) {
        return streamIsOpen(s) ? masterChannelByPartition(s).closeStream(s) : Completable.complete();
    }

    public boolean streamIsOpen(short s) {
        return masterChannelByPartition(s).streamIsOpen(s);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DcpChannel masterChannelByPartition(short s) {
        CouchbaseBucketConfig couchbaseBucketConfig = this.currentConfig.get();
        NodeInfo nodeAtIndex = couchbaseBucketConfig.nodeAtIndex(couchbaseBucketConfig.nodeIndexForMaster(s, false));
        InetSocketAddress inetSocketAddress = new InetSocketAddress(nodeAtIndex.hostname().nameOrAddress(), (this.env.sslEnabled() ? nodeAtIndex.sslServices() : nodeAtIndex.services()).get(ServiceType.BINARY).intValue());
        for (DcpChannel dcpChannel : this.channels) {
            if (dcpChannel.address().equals(inetSocketAddress)) {
                return dcpChannel;
            }
        }
        throw new IllegalStateException("No DcpChannel found for partition " + ((int) s));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reconfigure(CouchbaseBucketConfig couchbaseBucketConfig) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        BucketConfigHelper bucketConfigHelper = new BucketConfigHelper(couchbaseBucketConfig, this.env.sslEnabled());
        boolean z = !this.env.persistencePollingEnabled();
        for (NodeInfo nodeInfo : bucketConfigHelper.getDataNodes()) {
            if (!z || couchbaseBucketConfig.hasPrimaryPartitionsOnNode(nodeInfo.hostname())) {
                InetSocketAddress address = bucketConfigHelper.getAddress(nodeInfo);
                boolean z2 = false;
                Iterator<DcpChannel> it = this.channels.iterator();
                while (true) {
                    if (it.hasNext()) {
                        if (it.next().address().equals(address)) {
                            z2 = true;
                            break;
                        }
                    } else {
                        break;
                    }
                }
                if (!z2) {
                    arrayList.add(address);
                    LOGGER.debug("Planning to add {}", address);
                }
            }
        }
        for (DcpChannel dcpChannel : this.channels) {
            boolean z3 = false;
            Iterator<NodeInfo> it2 = couchbaseBucketConfig.nodes().iterator();
            while (true) {
                if (it2.hasNext()) {
                    if (bucketConfigHelper.getAddress(it2.next()).equals(dcpChannel.address())) {
                        z3 = true;
                        break;
                    }
                } else {
                    break;
                }
            }
            if (!z3) {
                LOGGER.debug("Planning to remove {}", dcpChannel);
                arrayList2.add(dcpChannel);
            }
        }
        Iterator it3 = arrayList.iterator();
        while (it3.hasNext()) {
            add((InetSocketAddress) it3.next());
        }
        Iterator it4 = arrayList2.iterator();
        while (it4.hasNext()) {
            remove((DcpChannel) it4.next());
        }
    }

    private void add(final InetSocketAddress inetSocketAddress) {
        if (this.channels.contains(inetSocketAddress)) {
            return;
        }
        LOGGER.debug("Adding DCP Channel against {}", inetSocketAddress);
        DcpChannel dcpChannel = new DcpChannel(inetSocketAddress, this.env, this);
        this.channels.add(dcpChannel);
        dcpChannel.connect().retryWhen(RetryBuilder.anyMatches(new Func1<Throwable, Boolean>() { // from class: com.couchbase.client.dcp.conductor.Conductor.17
            @Override // rx.functions.Func1
            public Boolean call(Throwable th) {
                return Boolean.valueOf(!Conductor.this.stopped);
            }
        }).max(this.env.dcpChannelsReconnectMaxAttempts()).delay(this.env.dcpChannelsReconnectDelay()).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { // from class: com.couchbase.client.dcp.conductor.Conductor.16
            @Override // rx.functions.Action4
            public void call(Integer num, Throwable th, Long l, TimeUnit timeUnit) {
                Conductor.LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", inetSocketAddress);
            }
        }).build()).subscribe(new CompletableSubscriber() { // from class: com.couchbase.client.dcp.conductor.Conductor.15
            @Override // rx.CompletableSubscriber
            public void onCompleted() {
                Conductor.LOGGER.debug("Completed Node connect for DCP channel {}", inetSocketAddress);
            }

            @Override // rx.CompletableSubscriber
            public void onError(Throwable th) {
                Conductor.LOGGER.warn("Got error during connect (maybe retried) for node {}", RedactableArgument.system(inetSocketAddress), th);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish(new FailedToAddNodeEvent(inetSocketAddress, th));
                }
            }

            @Override // rx.CompletableSubscriber
            public void onSubscribe(Subscription subscription) {
            }
        });
    }

    private void remove(final DcpChannel dcpChannel) {
        if (this.channels.remove(dcpChannel)) {
            LOGGER.debug("Removing DCP Channel against {}", dcpChannel);
            dcpChannel.disconnect().subscribe(new CompletableSubscriber() { // from class: com.couchbase.client.dcp.conductor.Conductor.18
                @Override // rx.CompletableSubscriber
                public void onCompleted() {
                    Conductor.LOGGER.debug("Channel remove notified as complete for {}", dcpChannel.address());
                }

                @Override // rx.CompletableSubscriber
                public void onError(Throwable th) {
                    Conductor.LOGGER.warn("Got error during Node removal for node {}", RedactableArgument.system(dcpChannel.address()), th);
                    if (Conductor.this.env.eventBus() != null) {
                        Conductor.this.env.eventBus().publish(new FailedToRemoveNodeEvent(dcpChannel.address(), th));
                    }
                }

                @Override // rx.CompletableSubscriber
                public void onSubscribe(Subscription subscription) {
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void maybeMovePartition(final short s) {
        Observable.timer(50L, TimeUnit.MILLISECONDS).filter(new Func1<Long, Boolean>() { // from class: com.couchbase.client.dcp.conductor.Conductor.21
            @Override // rx.functions.Func1
            public Boolean call(Long l) {
                PartitionState partitionState = Conductor.this.sessionState.get(s);
                boolean isAtEnd = partitionState.isAtEnd();
                if (isAtEnd) {
                    Conductor.LOGGER.debug("Reached desired high seqno {} for vbucket {}, not reopening stream.", Long.valueOf(partitionState.getEndSeqno()), Short.valueOf(s));
                }
                return Boolean.valueOf(!isAtEnd);
            }
        }).flatMapCompletable(new Func1<Long, Completable>() { // from class: com.couchbase.client.dcp.conductor.Conductor.20
            @Override // rx.functions.Func1
            public Completable call(Long l) {
                PartitionState partitionState = Conductor.this.sessionState.get(s);
                return Conductor.this.startStreamForPartition(s, partitionState.getLastUuid(), partitionState.getStartSeqno(), partitionState.getEndSeqno(), partitionState.getSnapshotStartSeqno(), partitionState.getSnapshotEndSeqno()).retryWhen(RetryBuilder.anyOf(NotMyVbucketException.class).max(Integer.MAX_VALUE).delay(Delay.fixed(200L, TimeUnit.MILLISECONDS)).build());
            }
        }).toCompletable().subscribe(new CompletableSubscriber() { // from class: com.couchbase.client.dcp.conductor.Conductor.19
            @Override // rx.CompletableSubscriber
            public void onCompleted() {
                Conductor.LOGGER.trace("Completed Partition Move for partition {}", Short.valueOf(s));
            }

            @Override // rx.CompletableSubscriber
            public void onError(Throwable th) {
                Conductor.LOGGER.warn("Error during Partition Move for partition " + ((int) s), th);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish(new FailedToMovePartitionEvent(s, th));
                }
            }

            @Override // rx.CompletableSubscriber
            public void onSubscribe(Subscription subscription) {
                Conductor.LOGGER.debug("Subscribing for Partition Move for partition {}", Short.valueOf(s));
            }
        });
    }
}
