package org.apache.iotdb.session.subscription.consumer;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.class */
/**
 * Registry of the {@link SubscriptionProvider}s used by a subscription consumer, keyed by data
 * node id and kept in ascending key order.
 *
 * <p>The registry owns a fair {@link ReentrantReadWriteLock}. {@link #heartbeat} and {@link #sync}
 * take the write lock themselves; every other method performs no locking of its own.
 * NOTE(review): other callers are presumably expected to bracket their calls with the
 * {@code acquire*Lock}/{@code release*Lock} pairs exposed here -- confirm against the consumer.
 */
public final class SubscriptionProviders {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionProviders.class);

    /** Providers by data node id; the sorted order drives the round-robin cursor below. */
    private final SortedMap<Integer, SubscriptionProvider> subscriptionProviders = new ConcurrentSkipListMap<>();

    /** Round-robin cursor: data node id to try next. Stays at -1 until {@link #openProviders} succeeds. */
    private int nextDataNodeId = -1;

    /** Fair lock ({@code new ReentrantReadWriteLock(true)}) exposed through the acquire/release methods. */
    private final ReentrantReadWriteLock subscriptionProvidersLock = new ReentrantReadWriteLock(true);

    /** Endpoints used to bootstrap connections in {@link #openProviders}. Stored by reference, not copied. */
    private final Set<TEndPoint> initialEndpoints;

    /**
     * Creates an empty registry.
     *
     * @param set the endpoints {@link #openProviders} will try to connect to
     */
    public SubscriptionProviders(Set<TEndPoint> set) {
        this.initialEndpoints = set;
    }

    /** Acquires the read lock of this registry's lock; blocks until it is available. */
    public void acquireReadLock() {
        this.subscriptionProvidersLock.readLock().lock();
    }

    /** Releases the read lock previously taken with {@link #acquireReadLock()}. */
    public void releaseReadLock() {
        this.subscriptionProvidersLock.readLock().unlock();
    }

    /** Acquires the write lock of this registry's lock; blocks until it is available. */
    public void acquireWriteLock() {
        this.subscriptionProvidersLock.writeLock().lock();
    }

    /** Releases the write lock previously taken with {@link #acquireWriteLock()}. */
    public void releaseWriteLock() {
        this.subscriptionProvidersLock.writeLock().unlock();
    }

    /**
     * Closes all current providers, then rebuilds the registry from the initial endpoints.
     *
     * <p>For every initial endpoint a provider is constructed and registered under the data node id
     * it reports; that provider is then asked for all cluster endpoints and a provider is created
     * for each data node that is not registered yet. Individual connection failures are logged and
     * skipped. On success the round-robin cursor is reset to the smallest registered data node id.
     *
     * @param subscriptionConsumer the consumer used to construct and handshake providers
     * @throws SubscriptionException (as {@link SubscriptionConnectionException}) if no provider is
     *     available after all initial endpoints have been tried
     */
    public void openProviders(SubscriptionConsumer subscriptionConsumer) throws SubscriptionException {
        closeProviders();
        for (TEndPoint initialEndpoint : this.initialEndpoints) {
            try {
                SubscriptionProvider seedProvider = subscriptionConsumer.constructProviderAndHandshake(initialEndpoint);
                // NOTE(review): if an earlier initial endpoint already registered this data node id,
                // put() replaces that provider without closing it. Left unchanged because the
                // server-side effect of closing either duplicate is not visible from this class.
                addProvider(seedProvider.getDataNodeId(), seedProvider);
                connectToDiscoveredEndpoints(subscriptionConsumer, initialEndpoint, seedProvider);
            } catch (Exception e) {
                LOGGER.warn("Failed to create connection with {}", initialEndpoint, e);
            }
        }
        if (hasNoAvailableProviders()) {
            throw new SubscriptionConnectionException(String.format("Cluster has no available subscription providers to connect with initial endpoints %s", this.initialEndpoints));
        }
        // At least one provider is available here, so the map is non-empty and firstKey() is safe.
        this.nextDataNodeId = this.subscriptionProviders.firstKey();
    }

    /**
     * Asks {@code seedProvider} for all endpoints and registers a provider for every data node that
     * does not have one yet. Failures are logged and never propagated.
     */
    private void connectToDiscoveredEndpoints(SubscriptionConsumer subscriptionConsumer, TEndPoint seedEndpoint, SubscriptionProvider seedProvider) {
        try {
            for (Map.Entry<Integer, TEndPoint> entry : seedProvider.getSessionConnection().fetchAllEndPoints().entrySet()) {
                int dataNodeId = entry.getKey();
                // Skips the seed's own node (just registered) and any node registered via an earlier
                // initial endpoint, so no second connection is opened and no provider is orphaned.
                if (containsProvider(dataNodeId)) {
                    continue;
                }
                try {
                    addProvider(dataNodeId, subscriptionConsumer.constructProviderAndHandshake(entry.getValue()));
                } catch (Exception e) {
                    LOGGER.warn("Failed to create connection with {}, will retry later...", entry.getValue(), e);
                }
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to fetch all endpoints from {}, will retry later...", seedEndpoint, e);
        }
    }

    /**
     * Closes every registered provider (best effort: failures are logged with their cause) and then
     * empties the registry.
     */
    public void closeProviders() {
        for (SubscriptionProvider provider : getAllProviders()) {
            try {
                provider.close();
            } catch (Exception e) {
                LOGGER.warn("Failed to close subscription provider {}", provider, e);
            }
        }
        this.subscriptionProviders.clear();
    }

    /**
     * Registers {@code subscriptionProvider} under data node id {@code i}, replacing any provider
     * already stored under that id.
     */
    void addProvider(int i, SubscriptionProvider subscriptionProvider) {
        LOGGER.info("add new subscription provider {}", subscriptionProvider);
        this.subscriptionProviders.put(i, subscriptionProvider);
    }

    /**
     * Closes the provider registered under data node id {@code i} and removes it from the registry.
     * The provider is removed even when closing it fails; the failure is then rethrown. Does
     * nothing if no provider is registered under {@code i}.
     *
     * @throws SubscriptionException if closing the provider fails
     * @throws IoTDBConnectionException if closing the provider fails
     */
    void closeAndRemoveProvider(int i) throws SubscriptionException, IoTDBConnectionException {
        Integer key = i;
        // ConcurrentSkipListMap never stores null values, so null means "not registered".
        SubscriptionProvider staleProvider = this.subscriptionProviders.get(key);
        if (staleProvider == null) {
            return;
        }
        try {
            staleProvider.close();
        } finally {
            LOGGER.info("close and remove stale subscription provider {}", staleProvider);
            this.subscriptionProviders.remove(key);
        }
    }

    /** Returns {@code true} if no provider is registered at all. */
    boolean hasNoProviders() {
        return this.subscriptionProviders.isEmpty();
    }

    /** Returns a new list holding a snapshot of all registered providers, in data node id order. */
    List<SubscriptionProvider> getAllProviders() {
        return new ArrayList<>(this.subscriptionProviders.values());
    }

    /**
     * Returns the provider registered under data node id {@code i}, or {@code null} if there is
     * none.
     */
    public SubscriptionProvider getProvider(int i) {
        return this.subscriptionProviders.get(i);
    }

    /** Returns {@code true} if no registered provider reports itself available (also when empty). */
    boolean hasNoAvailableProviders() {
        return this.subscriptionProviders.values().stream().noneMatch(SubscriptionProvider::isAvailable);
    }

    /** Returns {@code true} if a provider is registered under data node id {@code i}. */
    boolean containsProvider(int i) {
        return this.subscriptionProviders.containsKey(i);
    }

    /** Returns a new list of the providers that currently report themselves available. */
    public List<SubscriptionProvider> getAllAvailableProviders() {
        return this.subscriptionProviders.values().stream()
                .filter(SubscriptionProvider::isAvailable)
                .collect(Collectors.toList());
    }

    /**
     * Advances the round-robin cursor to the smallest registered data node id greater than the
     * current one, wrapping around to the smallest id overall. {@code firstKey()} throws
     * {@link java.util.NoSuchElementException} if the registry is empty.
     */
    void updateNextDataNodeId() {
        SortedMap<Integer, SubscriptionProvider> remaining = this.subscriptionProviders.tailMap(this.nextDataNodeId + 1);
        this.nextDataNodeId = remaining.isEmpty() ? this.subscriptionProviders.firstKey() : remaining.firstKey();
    }

    /**
     * Returns the next available provider in round-robin order and advances the cursor past it.
     *
     * @return an available provider, or {@code null} if none is available
     */
    public SubscriptionProvider getNextAvailableProvider() {
        if (hasNoAvailableProviders()) {
            return null;
        }
        // One probe for the current cursor (which may point at an id that is no longer registered)
        // plus one per registered provider covers a full cycle. Bounding the loop prevents spinning
        // forever if every provider turns unavailable after the check above.
        int maxProbes = this.subscriptionProviders.size() + 1;
        for (int probe = 0; probe < maxProbes; probe++) {
            SubscriptionProvider candidate = getProvider(this.nextDataNodeId);
            updateNextDataNodeId();
            if (candidate != null && candidate.isAvailable()) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Sends a heartbeat to every registered provider while holding the write lock. Does nothing if
     * the consumer is closed.
     *
     * @param subscriptionConsumer the consumer whose {@code subscribedTopics} is refreshed
     */
    public void heartbeat(SubscriptionConsumer subscriptionConsumer) {
        if (subscriptionConsumer.isClosed()) {
            return;
        }
        acquireWriteLock();
        try {
            heartbeatInternal(subscriptionConsumer);
        } finally {
            releaseWriteLock();
        }
    }

    /** Heartbeats each provider of a snapshot of the registry; see {@link #heartbeatProvider}. */
    private void heartbeatInternal(SubscriptionConsumer subscriptionConsumer) {
        for (SubscriptionProvider provider : getAllProviders()) {
            heartbeatProvider(subscriptionConsumer, provider);
        }
    }

    /**
     * Sends one heartbeat: on success stores the result in the consumer's {@code subscribedTopics}
     * and marks the provider available; on any exception logs it and marks the provider
     * unavailable.
     */
    private static void heartbeatProvider(SubscriptionConsumer subscriptionConsumer, SubscriptionProvider provider) {
        try {
            subscriptionConsumer.subscribedTopics = provider.heartbeat();
            provider.setAvailable();
        } catch (Exception e) {
            LOGGER.warn("something unexpected happened when sending heartbeat to subscription provider {}, set subscription provider unavailable", provider, e);
            provider.setUnavailable();
        }
    }

    /**
     * Reconciles the registry with the endpoints currently reported through the consumer, while
     * holding the write lock. Does nothing if the consumer is closed.
     *
     * @param subscriptionConsumer the consumer used to fetch endpoints and construct providers
     */
    public void sync(SubscriptionConsumer subscriptionConsumer) {
        if (subscriptionConsumer.isClosed()) {
            return;
        }
        acquireWriteLock();
        try {
            syncInternal(subscriptionConsumer);
        } finally {
            releaseWriteLock();
        }
    }

    /**
     * Reconciliation steps:
     *
     * <ol>
     *   <li>If no provider is available, reopen from the initial endpoints; give up on failure.
     *   <li>For every fetched endpoint: create a provider if the data node has none, otherwise
     *       heartbeat the existing one and drop it if it ends up unavailable.
     *   <li>Drop providers whose data node id is no longer among the fetched endpoints.
     * </ol>
     *
     * Any exception escaping steps 2-3 is logged and swallowed.
     */
    private void syncInternal(SubscriptionConsumer subscriptionConsumer) {
        if (hasNoAvailableProviders()) {
            try {
                openProviders(subscriptionConsumer);
            } catch (Exception e) {
                LOGGER.warn("something unexpected happened when syncing subscription endpoints...", e);
                return;
            }
        }
        try {
            Map<Integer, TEndPoint> allEndPoints = subscriptionConsumer.fetchAllEndPointsWithRedirection();
            for (Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
                int dataNodeId = entry.getKey();
                SubscriptionProvider provider = getProvider(dataNodeId);
                if (provider == null) {
                    TEndPoint endPoint = entry.getValue();
                    try {
                        addProvider(dataNodeId, subscriptionConsumer.constructProviderAndHandshake(endPoint));
                    } catch (Exception e) {
                        LOGGER.warn("Failed to create connection with endpoint {}, will retry later...", endPoint, e);
                    }
                    continue;
                }
                heartbeatProvider(subscriptionConsumer, provider);
                if (!provider.isAvailable()) {
                    closeAndRemoveProviderQuietly(dataNodeId);
                }
            }
            // Iterate over a snapshot because entries are removed from the registry inside the loop.
            for (SubscriptionProvider provider : getAllProviders()) {
                int dataNodeId = provider.getDataNodeId();
                if (!allEndPoints.containsKey(dataNodeId)) {
                    closeAndRemoveProviderQuietly(dataNodeId);
                }
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to fetch all endpoints, will retry later...", e);
        }
    }

    /** Calls {@link #closeAndRemoveProvider(int)} and logs, instead of propagating, any exception. */
    private void closeAndRemoveProviderQuietly(int dataNodeId) {
        try {
            closeAndRemoveProvider(dataNodeId);
        } catch (Exception e) {
            LOGGER.warn("Exception occurred when closing and removing subscription provider with data node id {}", dataNodeId, e);
        }
    }

    /**
     * Renders the providers as {@code SubscriptionProviders{p1, p2, ...}} in data node id order.
     * Built in a single pass so the separators stay correct even if the map changes meanwhile.
     */
    @Override
    public String toString() {
        return this.subscriptionProviders.values().stream()
                .map(Object::toString)
                .collect(Collectors.joining(", ", "SubscriptionProviders{", "}"));
    }
}
