package org.apache.iotdb.session.subscription.consumer.base;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URLEncoder;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionPollTimeoutException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeNonCriticalException;
import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.apache.iotdb.session.subscription.util.PollTimer;
import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
import org.apache.iotdb.session.subscription.util.SetPartitioner;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.class */
public abstract class AbstractSubscriptionConsumer implements AutoCloseable {
    /** Logger shared by this class and its nested helpers. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSubscriptionConsumer.class);
    // NOTE(review): presumably the scale (ms) of the randomized back-off in singlePoll,
    // which multiplies Math.random() by 100 - confirm against the original source.
    private static final long SLEEP_MS = 100;
    /** Fixed part (ms) added to the randomized back-off between empty polls in singlePoll. */
    private static final long SLEEP_DELTA_MS = 50;
    /** Margin (ms) passed to PollTimer.notExpired when singlePoll decides whether to poll again. */
    private static final long TIMER_DELTA_MS = 250;
    /** User name handed to every provider built by constructProviderAndHandshake. */
    private final String username;
    /** Password handed to every provider built by constructProviderAndHandshake. */
    private final String password;
    /** Consumer id; when null it is filled in from the provider after a successful handshake. */
    protected String consumerId;
    /** Consumer group id; when null it is filled in from the provider after a successful handshake. */
    protected String consumerGroupId;
    /** Copied from the builder; its consumer is outside the visible part of this file. */
    private final long heartbeatIntervalMs;
    /** Copied from the builder; its consumer is outside the visible part of this file. */
    private final long endpointsSyncIntervalMs;
    /** Provider registry; accessed under its own read/write lock (acquire*/release* calls). */
    private final AbstractSubscriptionProviders providers;
    /** Starts as true; set to false by open() and back to true by close(). */
    private final AtomicBoolean isClosed;
    /** Set to true by close(); once set, open() is rejected by checkIfHasBeenClosed(). */
    private final AtomicBoolean isReleased;
    /** Root directory under which getFileDir creates group/consumer/topic sub-directories. */
    private final String fileSaveDir;
    /** Copied from the builder; its consumer is outside the visible part of this file. */
    private final boolean fileSaveFsync;
    /**
     * Commit contexts of file transfers considered in flight: getFilePath reuses an existing file
     * when its context is in this set, and pollFile removes the context on a non-timeout failure.
     */
    private final Set<SubscriptionCommitContext> inFlightFilesCommitContextSet;
    /** Thrift frame size limit handed to every provider built by constructProviderAndHandshake. */
    private final int thriftMaxFrameSize;
    /** Upper bound on the number of topic partitions polled in parallel by multiplePoll. */
    private final int maxPollParallelism;
    /** Topic name to config map; read by allTopicMessagesHaveBeenConsumed. */
    protected volatile Map<String, TopicConfig> subscribedTopics;
    /** Dispatch table from response type to the handler that turns a response into a message. */
    private final Map<SubscriptionPollResponseType, BiFunction<SubscriptionPollResponse, PollTimer, Optional<SubscriptionMessage>>> responseTransformer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionConsumer$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer$2.class */
    /**
     * Compiler-synthesised lookup table for a {@code switch} over
     * {@link SubscriptionPollResponseType}.
     *
     * <p>The array is indexed by enum ordinal and holds a dense case number (1..4) for
     * FILE_PIECE, FILE_SEAL, ERROR and TABLETS; every other ordinal keeps the default 0.
     * NOTE(review): the switch that reads this table is presumably in one of the methods the
     * decompiler could not reconstruct - confirm before removing this class.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType = new int[SubscriptionPollResponseType.values().length];

        static {
            // Each constant is registered in its own try block so that a NoSuchFieldError for one
            // constant does not prevent the remaining constants from being registered.
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.FILE_PIECE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.FILE_SEAL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollResponseType[SubscriptionPollResponseType.TABLETS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer$AsyncCommitWorker.class */
    /**
     * Runnable that acknowledges a batch of messages on behalf of the enclosing consumer and
     * reports the outcome through an {@link AsyncCommitCallback}.
     */
    private class AsyncCommitWorker implements Runnable {
        private final Iterable<SubscriptionMessage> messages;
        private final AsyncCommitCallback callback;

        /**
         * @param iterable messages to acknowledge
         * @param asyncCommitCallback callback notified of success or failure
         */
        public AsyncCommitWorker(Iterable<SubscriptionMessage> iterable, AsyncCommitCallback asyncCommitCallback) {
            this.callback = asyncCommitCallback;
            this.messages = iterable;
        }

        /**
         * Acks the messages unless the consumer is closed. When the consumer is closed nothing is
         * done and the callback is not invoked at all. Any exception raised by the ack or by
         * {@code onComplete} is routed to {@code onFailure}.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (!isClosed()) {
                try {
                    ack(messages);
                    callback.onComplete();
                } catch (Exception failure) {
                    callback.onFailure(failure);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer$PollTask.class */
    /**
     * Callable wrapper that runs {@code singlePoll} of the enclosing consumer for a fixed set of
     * topics and a fixed timeout.
     */
    private class PollTask implements Callable<List<SubscriptionMessage>> {
        private final Set<String> topicNames;
        private final long timeoutMs;

        /**
         * @param set topic names this task polls
         * @param j timeout in milliseconds forwarded to {@code singlePoll}
         */
        public PollTask(Set<String> set, long j) {
            this.timeoutMs = j;
            this.topicNames = set;
        }

        /** Delegates to {@code singlePoll} with the topics and timeout captured at construction. */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public List<SubscriptionMessage> call() {
            final List<SubscriptionMessage> polled = singlePoll(topicNames, timeoutMs);
            return polled;
        }
    }

    /**
     * Applies {@code allTopicMessagesHaveBeenConsumed} to the names of all currently subscribed
     * topics.
     *
     * @return the result of that check for the key set of {@code subscribedTopics}
     */
    public boolean allSnapshotTopicMessagesHaveBeenConsumed() {
        final Set<String> subscribedTopicNames = this.subscribedTopics.keySet();
        return allTopicMessagesHaveBeenConsumed(subscribedTopicNames);
    }

    /**
     * Checks that none of the given topic names is still present (with a non-null config) in
     * {@code subscribedTopics}.
     *
     * <p>The volatile map reference is read once so the whole stream sees one consistent map
     * instance even if the field is swapped concurrently.
     *
     * @param collection topic names to look up
     * @return {@code true} when every lookup yields {@code null}, including when the collection
     *     is empty
     */
    private boolean allTopicMessagesHaveBeenConsumed(Collection<String> collection) {
        final Map<String, TopicConfig> topicsSnapshot = this.subscribedTopics;
        // A bound method reference null-checks its receiver, matching the former explicit
        // Objects.requireNonNull on the map.
        return collection.stream().map(topicsSnapshot::get).noneMatch(Objects::nonNull);
    }

    /**
     * @return the consumer id; it may have been supplied by the builder or adopted from a
     *     provider after a handshake
     */
    public String getConsumerId() {
        return consumerId;
    }

    /**
     * @return the consumer group id; it may have been supplied by the builder or adopted from a
     *     provider after a handshake
     */
    public String getConsumerGroupId() {
        return consumerGroupId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSubscriptionConsumer(AbstractSubscriptionConsumerBuilder abstractSubscriptionConsumerBuilder) {
        this.isClosed = new AtomicBoolean(true);
        this.isReleased = new AtomicBoolean(false);
        this.inFlightFilesCommitContextSet = new HashSet();
        this.subscribedTopics = new HashMap();
        this.responseTransformer = Collections.unmodifiableMap(new HashMap<SubscriptionPollResponseType, BiFunction<SubscriptionPollResponse, PollTimer, Optional<SubscriptionMessage>>>() { // from class: org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionConsumer.1
            {
                put(SubscriptionPollResponseType.TABLETS, (subscriptionPollResponse, pollTimer) -> {
                    return AbstractSubscriptionConsumer.this.pollTablets(subscriptionPollResponse, pollTimer);
                });
                put(SubscriptionPollResponseType.FILE_INIT, (subscriptionPollResponse2, pollTimer2) -> {
                    return AbstractSubscriptionConsumer.this.pollFile(subscriptionPollResponse2, pollTimer2);
                });
                put(SubscriptionPollResponseType.ERROR, (subscriptionPollResponse3, pollTimer3) -> {
                    ErrorPayload payload = subscriptionPollResponse3.getPayload();
                    String errorMessage = payload.getErrorMessage();
                    if (payload.isCritical()) {
                        throw new SubscriptionRuntimeCriticalException(errorMessage);
                    }
                    throw new SubscriptionRuntimeNonCriticalException(errorMessage);
                });
                put(SubscriptionPollResponseType.TERMINATION, (subscriptionPollResponse4, pollTimer4) -> {
                    String topicName = subscriptionPollResponse4.getCommitContext().getTopicName();
                    AbstractSubscriptionConsumer.LOGGER.info("Termination occurred when SubscriptionConsumer {} polling topics, unsubscribe topic {} automatically", AbstractSubscriptionConsumer.this.coreReportMessage(), topicName);
                    AbstractSubscriptionConsumer.this.unsubscribe(Collections.singleton(topicName), false);
                    return Optional.empty();
                });
            }
        });
        HashSet hashSet = new HashSet();
        if (Objects.nonNull(abstractSubscriptionConsumerBuilder.host) || Objects.nonNull(abstractSubscriptionConsumerBuilder.port)) {
            if (Objects.isNull(abstractSubscriptionConsumerBuilder.host)) {
                abstractSubscriptionConsumerBuilder.host = "localhost";
            }
            if (Objects.isNull(abstractSubscriptionConsumerBuilder.port)) {
                abstractSubscriptionConsumerBuilder.port = 6667;
            }
            hashSet.add(new TEndPoint(abstractSubscriptionConsumerBuilder.host, abstractSubscriptionConsumerBuilder.port.intValue()));
        } else if (Objects.isNull(abstractSubscriptionConsumerBuilder.nodeUrls)) {
            abstractSubscriptionConsumerBuilder.host = "localhost";
            abstractSubscriptionConsumerBuilder.port = 6667;
            hashSet.add(new TEndPoint(abstractSubscriptionConsumerBuilder.host, abstractSubscriptionConsumerBuilder.port.intValue()));
        } else {
            hashSet.addAll(SessionUtils.parseSeedNodeUrls(abstractSubscriptionConsumerBuilder.nodeUrls));
        }
        this.providers = new AbstractSubscriptionProviders(hashSet);
        this.username = abstractSubscriptionConsumerBuilder.username;
        this.password = abstractSubscriptionConsumerBuilder.password;
        this.consumerId = abstractSubscriptionConsumerBuilder.consumerId;
        this.consumerGroupId = abstractSubscriptionConsumerBuilder.consumerGroupId;
        this.heartbeatIntervalMs = abstractSubscriptionConsumerBuilder.heartbeatIntervalMs;
        this.endpointsSyncIntervalMs = abstractSubscriptionConsumerBuilder.endpointsSyncIntervalMs;
        this.fileSaveDir = abstractSubscriptionConsumerBuilder.fileSaveDir;
        this.fileSaveFsync = abstractSubscriptionConsumerBuilder.fileSaveFsync;
        this.thriftMaxFrameSize = abstractSubscriptionConsumerBuilder.thriftMaxFrameSize;
        this.maxPollParallelism = abstractSubscriptionConsumerBuilder.maxPollParallelism;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a consumer from a builder whose settings are overwritten from {@code properties}.
     *
     * <p>Each value is read with a cast, so the properties must hold typed objects (for example
     * an {@code Integer} under "port"), not strings. Keys without a default ("node-urls",
     * "consumer-id", "group-id") yield {@code null} when absent.
     *
     * @param abstractSubscriptionConsumerBuilder builder that receives the property values
     * @param properties typed configuration values keyed by the names used below
     */
    public AbstractSubscriptionConsumer(AbstractSubscriptionConsumerBuilder abstractSubscriptionConsumerBuilder, Properties properties) {
        this(
            abstractSubscriptionConsumerBuilder
                .host((String) properties.getOrDefault("host", "localhost"))
                .port((Integer) properties.getOrDefault("port", 6667))
                .nodeUrls((List) properties.get("node-urls"))
                .username((String) properties.getOrDefault("username", "root"))
                .password((String) properties.getOrDefault("password", "root"))
                .consumerId((String) properties.get("consumer-id"))
                .consumerGroupId((String) properties.get("group-id"))
                .heartbeatIntervalMs(((Long) properties.getOrDefault("heartbeat-interval-ms", 30000L)).longValue())
                .endpointsSyncIntervalMs(((Long) properties.getOrDefault("endpoints-sync-interval-ms", 120000L)).longValue())
                .fileSaveDir((String) properties.getOrDefault("file-save-dir", ConsumerConstant.FILE_SAVE_DIR_DEFAULT_VALUE))
                .fileSaveFsync(((Boolean) properties.getOrDefault("file-save-fsync", false)).booleanValue())
                .thriftMaxFrameSize(((Integer) properties.getOrDefault("thrift-max-frame-size", 67108864)).intValue())
                .maxPollParallelism(((Integer) properties.getOrDefault("max-poll-parallelism", 1)).intValue()));
    }

    /**
     * Rejects the current operation when this consumer has already been released by
     * {@code close()}.
     *
     * @throws SubscriptionException if {@code isReleased} is set; the message is also logged at
     *     error level
     */
    private void checkIfHasBeenClosed() throws SubscriptionException {
        if (!this.isReleased.get()) {
            return;
        }
        final String errorMessage = String.format("%s has ever been closed, unsupported operation after closing.", this);
        LOGGER.error(errorMessage);
        throw new SubscriptionException(errorMessage);
    }

    /**
     * Rejects the current operation when this consumer is not open.
     *
     * @throws SubscriptionException if {@code isClosed} is set; the message is also logged at
     *     error level
     */
    private void checkIfOpened() throws SubscriptionException {
        if (!this.isClosed.get()) {
            return;
        }
        final String errorMessage = String.format("%s is not yet open, please open the subscription consumer first.", this);
        LOGGER.error(errorMessage);
        throw new SubscriptionException(errorMessage);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Opens the consumer: opens the providers, marks the consumer as open and submits the
     * heartbeat worker and the endpoints syncer, all while holding the providers' write lock.
     * Calling it on an already open consumer is a no-op.
     *
     * @throws SubscriptionException if the consumer was closed before, or if opening fails
     */
    public synchronized void open() throws SubscriptionException {
        checkIfHasBeenClosed();
        final boolean alreadyOpen = !this.isClosed.get();
        if (alreadyOpen) {
            return;
        }
        this.providers.acquireWriteLock();
        try {
            this.providers.openProviders(this);
            // The flag is flipped before the background workers are submitted.
            this.isClosed.set(false);
            submitHeartbeatWorker();
            submitEndpointsSyncer();
        } finally {
            this.providers.releaseWriteLock();
        }
    }

    /**
     * Closes the providers under their write lock and marks this consumer as closed and
     * released. Does nothing when the consumer is not open; in that case the released flag is
     * left untouched.
     */
    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        final boolean currentlyOpen = !this.isClosed.get();
        if (currentlyOpen) {
            this.providers.acquireWriteLock();
            try {
                this.providers.closeProviders();
                this.isClosed.set(true);
                this.isReleased.set(true);
            } finally {
                this.providers.releaseWriteLock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return {@code true} while the consumer is not open, i.e. before {@code open()} or after
     *     {@code close()}
     */
    public boolean isClosed() {
        final boolean closed = this.isClosed.get();
        return closed;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Subscribes to a single topic.
     *
     * @param str topic name
     * @throws SubscriptionException if the consumer is not open or the subscription fails
     */
    public void subscribe(String str) throws SubscriptionException {
        final Set<String> singleTopic = Collections.singleton(str);
        subscribe(singleTopic);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Subscribes to several topics; duplicate names are collapsed.
     *
     * @param strArr topic names
     * @throws SubscriptionException if the consumer is not open or the subscription fails
     */
    public void subscribe(String... strArr) throws SubscriptionException {
        final Set<String> distinctTopics = new HashSet<>(Arrays.asList(strArr));
        subscribe(distinctTopics);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Subscribes to the given topics, passing each name through identifier parsing first.
     *
     * @param set topic names
     * @throws SubscriptionException if the consumer is not open or the subscription fails
     */
    public void subscribe(Set<String> set) throws SubscriptionException {
        final boolean parseIdentifiers = true;
        subscribe(set, parseIdentifiers);
    }

    /**
     * Subscribes to the given topics while holding the providers' read lock.
     *
     * @param set topic names
     * @param z when {@code true}, every name is first passed through
     *     {@code IdentifierUtils.checkAndParseIdentifier}
     * @throws SubscriptionException if the consumer is not open or the subscription fails
     */
    private void subscribe(Set<String> set, boolean z) throws SubscriptionException {
        checkIfOpened();
        final Set<String> topicNames;
        if (z) {
            topicNames = set.stream().map(IdentifierUtils::checkAndParseIdentifier).collect(Collectors.toSet());
        } else {
            topicNames = set;
        }
        this.providers.acquireReadLock();
        try {
            subscribeWithRedirection(topicNames);
        } finally {
            this.providers.releaseReadLock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Unsubscribes from a single topic.
     *
     * @param str topic name
     * @throws SubscriptionException if the consumer is not open or the operation fails
     */
    public void unsubscribe(String str) throws SubscriptionException {
        final Set<String> singleTopic = Collections.singleton(str);
        unsubscribe(singleTopic);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Unsubscribes from several topics; duplicate names are collapsed.
     *
     * @param strArr topic names
     * @throws SubscriptionException if the consumer is not open or the operation fails
     */
    public void unsubscribe(String... strArr) throws SubscriptionException {
        final Set<String> distinctTopics = new HashSet<>(Arrays.asList(strArr));
        unsubscribe(distinctTopics);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Unsubscribes from the given topics, passing each name through identifier parsing first.
     *
     * @param set topic names
     * @throws SubscriptionException if the consumer is not open or the operation fails
     */
    public void unsubscribe(Set<String> set) throws SubscriptionException {
        final boolean parseIdentifiers = true;
        unsubscribe(set, parseIdentifiers);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Unsubscribes from the given topics while holding the providers' read lock.
     *
     * @param set topic names
     * @param z when {@code true}, every name is first passed through
     *     {@code IdentifierUtils.checkAndParseIdentifier}
     * @throws SubscriptionException if the consumer is not open or the operation fails
     */
    public void unsubscribe(Set<String> set, boolean z) throws SubscriptionException {
        checkIfOpened();
        final Set<String> topicNames;
        if (z) {
            topicNames = set.stream().map(IdentifierUtils::checkAndParseIdentifier).collect(Collectors.toSet());
        } else {
            topicNames = set;
        }
        this.providers.acquireReadLock();
        try {
            unsubscribeWithRedirection(topicNames);
        } finally {
            this.providers.releaseReadLock();
        }
    }

    /**
     * Factory hook for the concrete provider type. It is called by
     * {@code constructProviderAndHandshake}, which performs the handshake afterwards.
     *
     * @param tEndPoint endpoint the provider should connect to
     * @param str user name (the caller passes this consumer's {@code username})
     * @param str2 password (the caller passes this consumer's {@code password})
     * @param str3 consumer id (the caller passes {@code consumerId}, which may be null)
     * @param str4 consumer group id (the caller passes {@code consumerGroupId}, which may be null)
     * @param i thrift max frame size (the caller passes {@code thriftMaxFrameSize})
     * @return a new provider on which no handshake has been performed yet
     */
    protected abstract AbstractSubscriptionProvider constructSubscriptionProvider(TEndPoint tEndPoint, String str, String str2, String str3, String str4, int i);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a provider for the endpoint and performs its handshake.
     *
     * <p>After a successful handshake, a missing consumer id or group id on this consumer is
     * adopted from the provider. On any failure the provider is closed on a best-effort basis; a
     * failure of that close is attached to the original error as a suppressed exception instead
     * of being dropped.
     *
     * @param tEndPoint endpoint to connect to
     * @return the provider, with the handshake completed
     * @throws SubscriptionConnectionException wrapping whatever made the handshake fail
     */
    public AbstractSubscriptionProvider constructProviderAndHandshake(TEndPoint tEndPoint) throws SubscriptionException {
        final AbstractSubscriptionProvider provider = constructSubscriptionProvider(tEndPoint, this.username, this.password, this.consumerId, this.consumerGroupId, this.thriftMaxFrameSize);
        try {
            provider.handshake();
            if (Objects.isNull(this.consumerId)) {
                this.consumerId = provider.getConsumerId();
            }
            if (Objects.isNull(this.consumerGroupId)) {
                this.consumerGroupId = provider.getConsumerGroupId();
            }
            return provider;
        } catch (Exception handshakeFailure) {
            try {
                provider.close();
            } catch (Exception closeFailure) {
                // Keep the cleanup error visible without masking the handshake failure.
                // addSuppressed rejects self-suppression, hence the identity check.
                if (closeFailure != handshakeFailure) {
                    handshakeFailure.addSuppressed(closeFailure);
                }
            }
            throw new SubscriptionConnectionException(String.format("Failed to handshake with subscription provider %s because of %s", provider, handshakeFailure), handshakeFailure);
        }
    }

    /**
     * Resolves, and creates if necessary, the directory
     * {@code <fileSaveDir>/<consumerGroupId>/<consumerId>/<str>}.
     *
     * @param str last path component (callers pass a topic name)
     * @return the directory path
     * @throws IOException if the directories cannot be created
     */
    private Path getFileDir(String str) throws IOException {
        final Path consumerDir = Paths.get(this.fileSaveDir).resolve(this.consumerGroupId).resolve(this.consumerId);
        final Path topicDir = consumerDir.resolve(str);
        Files.createDirectories(topicDir);
        return topicDir;
    }

    /**
     * Creates, or locates for resumption, the local file that receives a polled file.
     *
     * <p>Behaviour on conflicts:
     * <ul>
     *   <li>File already exists and {@code z} is false: fail.</li>
     *   <li>File already exists, {@code z} is true and the commit context is in flight: reuse the
     *       existing file (resume).</li>
     *   <li>File already exists, {@code z} is true and the context is not in flight: retry once
     *       with a random 16-character suffix appended to the file name.</li>
     *   <li>Invalid path and {@code z2} is true: retry once with the topic name URL-encoded.</li>
     * </ul>
     *
     * <p>The topic name is encoded explicitly as UTF-8. The deprecated one-argument
     * {@code URLEncoder.encode} uses the platform default charset, so the same topic could map
     * to different directory names on different machines.
     *
     * @param subscriptionCommitContext commit context of the file being received
     * @param str topic name, used as the directory name
     * @param str2 file name
     * @param z whether an already existing file is tolerated
     * @param z2 whether an invalid path may be retried with an encoded topic name
     * @return path of the file to write to
     * @throws SubscriptionRuntimeNonCriticalException on an untolerated conflict or any I/O error
     */
    private Path getFilePath(SubscriptionCommitContext subscriptionCommitContext, String str, String str2, boolean z, boolean z2) throws SubscriptionException {
        try {
            try {
                final Path filePath = getFileDir(str).resolve(str2);
                try {
                    Files.createFile(filePath);
                    return filePath;
                } catch (FileAlreadyExistsException alreadyExists) {
                    if (!z) {
                        throw new SubscriptionRuntimeNonCriticalException(alreadyExists.getMessage(), alreadyExists);
                    }
                    if (this.inFlightFilesCommitContextSet.contains(subscriptionCommitContext)) {
                        LOGGER.info("Detect already existed file {} when polling topic {}, resume consumption", str2, str);
                        return filePath;
                    }
                    final String suffix = RandomStringGenerator.generate(16);
                    LOGGER.warn("Detect already existed file {} when polling topic {}, add random suffix {} to filename", str2, str, suffix);
                    return getFilePath(subscriptionCommitContext, str, str2 + "." + suffix, false, true);
                }
            } catch (InvalidPathException invalidPath) {
                if (z2) {
                    // UnsupportedEncodingException is an IOException and is handled by the outer catch.
                    return getFilePath(subscriptionCommitContext, URLEncoder.encode(str, "UTF-8"), str2, true, false);
                }
                throw new SubscriptionRuntimeNonCriticalException(invalidPath.getMessage(), invalidPath);
            }
        } catch (IOException ioFailure) {
            throw new SubscriptionRuntimeNonCriticalException(ioFailure.getMessage(), ioFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Polls the given topics, spreading them over several {@code PollTask}s when poll threads
     * are available.
     *
     * <p>The topics are partitioned into at most
     * {@code min(maxPollParallelism, available poll threads)} groups. If no poll thread is
     * available the call falls back to {@code singlePoll} on the calling thread. Cancelled tasks
     * are skipped and failed tasks are logged. A critical failure is rethrown only when no task
     * produced any message.
     *
     * @param set topic names to poll
     * @param j timeout in milliseconds
     * @return the messages collected from all tasks, possibly empty
     * @throws SubscriptionRuntimeCriticalException if a task failed critically and nothing was
     *     polled
     */
    public List<SubscriptionMessage> multiplePoll(Set<String> set, long j) {
        if (set.isEmpty()) {
            return Collections.emptyList();
        }
        final int availableThreadCount = SubscriptionExecutorServiceManager.getAvailableThreadCountForPollTasks();
        if (availableThreadCount == 0) {
            return singlePoll(set, j);
        }

        final List<PollTask> tasks = new ArrayList<>();
        for (final Set<String> partition : SetPartitioner.partition(set, Math.min(this.maxPollParallelism, availableThreadCount))) {
            tasks.add(new PollTask(partition, j));
        }

        final List<SubscriptionMessage> messages = new ArrayList<>();
        SubscriptionRuntimeCriticalException lastCriticalException = null;
        try {
            for (final Future<List<SubscriptionMessage>> future : SubscriptionExecutorServiceManager.submitMultiplePollTasks(tasks, j)) {
                try {
                    // The result must be fetched inside this try so that the handlers below
                    // actually apply to future.get().
                    if (!future.isCancelled()) {
                        messages.addAll(future.get());
                    }
                } catch (CancellationException ignored) {
                    // The task was cancelled between the isCancelled() check and get(); skip it.
                } catch (ExecutionException e) {
                    final Throwable cause = e.getCause();
                    if (cause instanceof SubscriptionRuntimeCriticalException) {
                        final SubscriptionRuntimeCriticalException critical = (SubscriptionRuntimeCriticalException) cause;
                        LOGGER.warn("SubscriptionRuntimeCriticalException occurred when SubscriptionConsumer {} polling topics {}", this, set, critical);
                        lastCriticalException = critical;
                    } else {
                        LOGGER.warn("ExecutionException occurred when SubscriptionConsumer {} polling topics {}", this, set, e);
                    }
                }
            }
        } catch (InterruptedException e) {
            LOGGER.warn("InterruptedException occurred when SubscriptionConsumer {} polling topics {}", this, set, e);
            Thread.currentThread().interrupt();
        }

        if (messages.isEmpty() && Objects.nonNull(lastCriticalException)) {
            throw lastCriticalException;
        }
        return messages;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Polls the given topics on the calling thread until messages arrive, all topics are
     * consumed, the timeout elapses or the thread is interrupted.
     *
     * <p>Each round fetches raw responses and converts them through {@code responseTransformer}.
     * Non-critical conversion failures are logged and skipped. A critical failure nacks what was
     * fetched so far and is rethrown. Between empty rounds the thread sleeps for a short random
     * interval. If the thread is interrupted, everything fetched is nacked and an empty list is
     * returned with the interrupt flag preserved.
     *
     * @param set topic names to poll
     * @param j timeout in milliseconds; also used as the timeout handed to each response handler
     * @return the polled messages, possibly empty
     * @throws SubscriptionRuntimeCriticalException if polling fails critically
     */
    public List<SubscriptionMessage> singlePoll(Set<String> set, long j) throws SubscriptionException {
        if (set.isEmpty()) {
            return Collections.emptyList();
        }
        final List<SubscriptionMessage> messages = new ArrayList<>();
        List<SubscriptionPollResponse> currentResponses = new ArrayList<>();
        final PollTimer timer = new PollTimer(System.currentTimeMillis(), j);
        try {
            do {
                final List<SubscriptionMessage> currentMessages = new ArrayList<>();
                try {
                    currentResponses.clear();
                    currentResponses = pollInternal(set, timer.remainingMs());
                    for (final SubscriptionPollResponse response : currentResponses) {
                        final short responseType = response.getResponseType();
                        if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
                            LOGGER.warn("unexpected response type: {}", Short.valueOf(responseType));
                            continue;
                        }
                        try {
                            this.responseTransformer
                                .getOrDefault(SubscriptionPollResponseType.valueOf(responseType), (unhandledResponse, unusedTimer) -> {
                                    LOGGER.warn("unexpected response type: {}", Short.valueOf(responseType));
                                    return Optional.empty();
                                })
                                // Every response gets a fresh timer with the full timeout.
                                .apply(response, new PollTimer(System.currentTimeMillis(), j))
                                .ifPresent(currentMessages::add);
                        } catch (SubscriptionRuntimeNonCriticalException e) {
                            LOGGER.warn("SubscriptionRuntimeNonCriticalException occurred when SubscriptionConsumer {} polling topics {}", this, set, e);
                        }
                    }
                    messages.addAll(currentMessages);
                    // Stop as soon as there is something to return or nothing left to wait for.
                    // The timer is only refreshed on the empty path below, so without this exit
                    // the loop condition would never change once messages were obtained.
                    if (!messages.isEmpty() || allTopicMessagesHaveBeenConsumed(set)) {
                        break;
                    }
                    timer.update();
                    Thread.sleep(((long) (Math.random() * SLEEP_MS)) + SLEEP_DELTA_MS);
                } catch (SubscriptionRuntimeCriticalException e) {
                    LOGGER.warn("SubscriptionRuntimeCriticalException occurred when SubscriptionConsumer {} polling topics {}", this, set, e);
                    nackAndClearQuietly(currentResponses, messages);
                    throw e;
                }
            } while (timer.notExpired(TIMER_DELTA_MS));
        } catch (InterruptedException e) {
            // Restore the flag and leave the loop; the check below nacks what was fetched.
            Thread.currentThread().interrupt();
        }
        if (!Thread.currentThread().isInterrupted()) {
            return messages;
        }
        nackAndClearQuietly(currentResponses, messages);
        return Collections.emptyList();
    }

    /** Best-effort nack of raw responses and converted messages; failures are deliberately ignored. */
    private void nackAndClearQuietly(List<SubscriptionPollResponse> responses, List<SubscriptionMessage> messages) {
        try {
            nack(responses);
            responses.clear();
        } catch (Exception ignored) {
            // Best effort: the caller is already on a failure or interruption path.
        }
        try {
            nack((Iterable<SubscriptionMessage>) messages);
            messages.clear();
        } catch (Exception ignored) {
            // Best effort: the caller is already on a failure or interruption path.
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Receives the file announced by a response into a local file and wraps it in a message.
     *
     * <p>The target file is opened with try-with-resources so the handle is released even when
     * the transfer throws. On any failure the commit context is dropped from the in-flight set,
     * except after a poll timeout so that a later poll can resume. A temporary message for the
     * file is then nacked and the failure is rethrown as non-critical.
     *
     * @param subscriptionPollResponse response that announces the file
     * @param pollTimer timer bounding the transfer
     * @return the message describing the received file
     * @throws SubscriptionRuntimeNonCriticalException if the file cannot be created or received
     */
    public Optional<SubscriptionMessage> pollFile(SubscriptionPollResponse subscriptionPollResponse, PollTimer pollTimer) throws SubscriptionException {
        final SubscriptionCommitContext commitContext = subscriptionPollResponse.getCommitContext();
        final String fileName = subscriptionPollResponse.getPayload().getFileName();
        final File file = getFilePath(commitContext, commitContext.getTopicName(), fileName, true, true).toFile();
        try (RandomAccessFile fileWriter = new RandomAccessFile(file, "rw")) {
            return Optional.of(pollFileInternal(commitContext, fileName, file, fileWriter, pollTimer));
        } catch (Exception e) {
            if (!(e instanceof SubscriptionPollTimeoutException)) {
                this.inFlightFilesCommitContextSet.remove(commitContext);
            }
            nack((Iterable<SubscriptionMessage>) Collections.singletonList(new SubscriptionMessage(commitContext, file.getAbsolutePath())));
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:30:0x014c, code lost:
    
        r0 = java.lang.String.format("inconsistent commit context, current is %s, incoming is %s, consumer: %s", r9, r0, r8);
        org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionConsumer.LOGGER.warn(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x0178, code lost:
    
        throw new org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeNonCriticalException(r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder for the routine that {@code pollFile} calls to receive a file.
     *
     * <p>The real body was not reconstructed, so this method unconditionally throws
     * {@link UnsupportedOperationException}.
     * NOTE(review): restore the implementation from the original sources before relying on
     * file polling.
     *
     * @param r9 commit context of the file being received
     * @param r10 file name
     * @param r11 local target file
     * @param r12 open handle on the local target file
     * @param r13 timer passed in by {@code pollFile}
     */
    private org.apache.iotdb.session.subscription.payload.SubscriptionMessage pollFileInternal(org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext r9, java.lang.String r10, java.io.File r11, java.io.RandomAccessFile r12, org.apache.iotdb.session.subscription.util.PollTimer r13) throws java.io.IOException, org.apache.iotdb.rpc.subscription.exception.SubscriptionException {
        /*
            Method dump skipped, instructions count: 1078
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionConsumer.pollFileInternal(org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext, java.lang.String, java.io.File, java.io.RandomAccessFile, org.apache.iotdb.session.subscription.util.PollTimer):org.apache.iotdb.session.subscription.payload.SubscriptionMessage");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts a tablets response into a message via {@code pollTabletsInternal}.
     *
     * <p>On any failure a temporary message with the response's commit context and no tablets is
     * nacked, and the failure is rethrown as non-critical.
     *
     * @param subscriptionPollResponse response carrying tablets
     * @param pollTimer timer forwarded to {@code pollTabletsInternal}
     * @return the converted message, if any
     * @throws SubscriptionRuntimeNonCriticalException wrapping any failure of the conversion
     */
    public Optional<SubscriptionMessage> pollTablets(SubscriptionPollResponse subscriptionPollResponse, PollTimer pollTimer) throws SubscriptionException {
        try {
            return pollTabletsInternal(subscriptionPollResponse, pollTimer);
        } catch (Exception failure) {
            final List<Tablet> noTablets = Collections.emptyList();
            final SubscriptionMessage placeholder = new SubscriptionMessage(subscriptionPollResponse.getCommitContext(), noTablets);
            nack((Iterable<SubscriptionMessage>) Collections.singletonList(placeholder));
            throw new SubscriptionRuntimeNonCriticalException(failure.getMessage(), failure);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:18:0x017d, code lost:
    
        r0 = java.lang.String.format("inconsistent commit context, current is %s, incoming is %s, consumer: %s", r0, r0, r7);
        org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionConsumer.LOGGER.warn(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:19:0x01aa, code lost:
    
        throw new org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeNonCriticalException(r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder for the routine that {@code pollTablets} calls to convert a tablets
     * response.
     *
     * <p>The real body was not reconstructed, so this method unconditionally throws
     * {@link UnsupportedOperationException}.
     * NOTE(review): restore the implementation from the original sources before relying on
     * tablet polling.
     *
     * @param r8 response carrying tablets
     * @param r9 timer passed in by {@code pollTablets}
     */
    private java.util.Optional<org.apache.iotdb.session.subscription.payload.SubscriptionMessage> pollTabletsInternal(org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse r8, org.apache.iotdb.session.subscription.util.PollTimer r9) {
        /*
            Method dump skipped, instructions count: 583
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionConsumer.pollTabletsInternal(org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse, org.apache.iotdb.session.subscription.util.PollTimer):java.util.Optional");
    }

    /**
     * Fetches raw poll responses from the next available provider while holding the providers'
     * read lock.
     *
     * <p>A connection failure during the poll yields an empty list. If no usable provider exists,
     * an empty list is returned when the consumer is closed, otherwise an exception is raised.
     *
     * @param set topic names to poll
     * @param j timeout in milliseconds forwarded to the provider
     * @return the provider's responses, or an empty list
     * @throws SubscriptionConnectionException if no provider is available while the consumer is
     *     open
     */
    private List<SubscriptionPollResponse> pollInternal(Set<String> set, long j) throws SubscriptionException {
        this.providers.acquireReadLock();
        try {
            final AbstractSubscriptionProvider provider = this.providers.getNextAvailableProvider();
            final boolean usable = Objects.nonNull(provider) && provider.isAvailable();
            if (usable) {
                try {
                    return provider.poll(set, j);
                } catch (SubscriptionConnectionException ignored) {
                    // A failed connection is reported as "nothing polled".
                    return Collections.emptyList();
                }
            }
            if (isClosed()) {
                return Collections.emptyList();
            }
            throw new SubscriptionConnectionException(String.format("Cluster has no available subscription providers when %s poll topic %s", this, set));
        } finally {
            this.providers.releaseReadLock();
        }
    }

    /**
     * Polls file data from the provider identified by the commit context's data node id while
     * holding the providers' read lock.
     *
     * @param subscriptionCommitContext the commit context whose data node id selects the provider
     * @param j forwarded as the second argument of the provider's {@code pollFile}
     * @param j2 forwarded as the third argument of the provider's {@code pollFile}
     * @return the provider's result, or an empty list when the consumer is closed and the provider
     *     is unusable, or when the call fails with a {@code SubscriptionConnectionException}
     * @throws SubscriptionException a {@code SubscriptionConnectionException} when the provider is
     *     missing or unavailable and the consumer is not closed, or any other provider failure
     */
    private List<SubscriptionPollResponse> pollFileInternal(SubscriptionCommitContext subscriptionCommitContext, long j, long j2) throws SubscriptionException {
        int dataNodeId = subscriptionCommitContext.getDataNodeId();
        this.providers.acquireReadLock();
        try {
            AbstractSubscriptionProvider target = this.providers.getProvider(dataNodeId);
            boolean usable = Objects.nonNull(target) && target.isAvailable();
            if (!usable) {
                if (isClosed()) {
                    return Collections.emptyList();
                }
                throw new SubscriptionConnectionException(String.format("something unexpected happened when %s poll file from subscription provider with data node id %s, the subscription provider may be unavailable or not existed", this, dataNodeId));
            }
            try {
                return target.pollFile(subscriptionCommitContext, j, j2);
            } catch (SubscriptionConnectionException ignored) {
                // A connection failure during the poll is reported to the caller as "nothing polled".
                return Collections.emptyList();
            }
        } finally {
            // Single release point covering every exit path above.
            this.providers.releaseReadLock();
        }
    }

    /**
     * Polls tablets from the provider identified by the commit context's data node id while holding
     * the providers' read lock.
     *
     * @param subscriptionCommitContext the commit context whose data node id selects the provider
     * @param i forwarded as the second argument of the provider's {@code pollTablets}
     * @param j forwarded as the third argument of the provider's {@code pollTablets}
     * @return the provider's result, or an empty list when the consumer is closed and the provider
     *     is unusable, or when the call fails with a {@code SubscriptionConnectionException}
     * @throws SubscriptionException a {@code SubscriptionConnectionException} when the provider is
     *     missing or unavailable and the consumer is not closed, or any other provider failure
     */
    private List<SubscriptionPollResponse> pollTabletsInternal(SubscriptionCommitContext subscriptionCommitContext, int i, long j) throws SubscriptionException {
        int dataNodeId = subscriptionCommitContext.getDataNodeId();
        this.providers.acquireReadLock();
        try {
            AbstractSubscriptionProvider target = this.providers.getProvider(dataNodeId);
            boolean usable = Objects.nonNull(target) && target.isAvailable();
            if (!usable) {
                if (isClosed()) {
                    return Collections.emptyList();
                }
                throw new SubscriptionConnectionException(String.format("something unexpected happened when %s poll tablets from subscription provider with data node id %s, the subscription provider may be unavailable or not existed", this, dataNodeId));
            }
            try {
                return target.pollTablets(subscriptionCommitContext, i, j);
            } catch (SubscriptionConnectionException ignored) {
                // A connection failure during the poll is reported to the caller as "nothing polled".
                return Collections.emptyList();
            }
        } finally {
            // Single release point covering every exit path above.
            this.providers.releaseReadLock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acknowledges the given messages: their commit contexts are grouped by data node id and each
     * group is committed with the nack flag set to {@code false}.
     *
     * @param iterable the messages to acknowledge
     * @throws SubscriptionException if committing any group fails
     */
    public void ack(Iterable<SubscriptionMessage> iterable) throws SubscriptionException {
        Map<Integer, List<SubscriptionCommitContext>> contextsByDataNode = new HashMap<>();
        for (SubscriptionMessage message : iterable) {
            SubscriptionCommitContext context = message.getCommitContext();
            contextsByDataNode.computeIfAbsent(context.getDataNodeId(), id -> new ArrayList<>()).add(context);
        }
        for (Map.Entry<Integer, List<SubscriptionCommitContext>> group : contextsByDataNode.entrySet()) {
            commitInternal(group.getKey(), group.getValue(), false);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Negatively acknowledges the given messages: their commit contexts are grouped by data node id
     * and each group is committed with the nack flag set to {@code true}.
     *
     * <p>For a message of type {@code TS_FILE_HANDLER} whose commit context is not in
     * {@code inFlightFilesCommitContextSet}, the file behind its handler is deleted first. Deletion
     * is best-effort: a failure is logged and does not prevent the nack.
     *
     * @param iterable the messages to negatively acknowledge
     * @throws SubscriptionException if committing any group fails
     */
    public void nack(Iterable<SubscriptionMessage> iterable) throws SubscriptionException {
        Map<Integer, List<SubscriptionCommitContext>> contextsByDataNode = new HashMap<>();
        for (SubscriptionMessage message : iterable) {
            SubscriptionCommitContext context = message.getCommitContext();
            boolean isTsFileMessage = Objects.equals(SubscriptionMessageType.TS_FILE_HANDLER.getType(), message.getMessageType());
            if (isTsFileMessage && !this.inFlightFilesCommitContextSet.contains(context)) {
                try {
                    message.getTsFileHandler().deleteFile();
                } catch (Exception e) {
                    // Deliberately non-fatal, but no longer silent: leave a trace for diagnosis.
                    LOGGER.warn("{} failed to delete the file of nacked message {}, ignoring", this, context, e);
                }
            }
            contextsByDataNode.computeIfAbsent(context.getDataNodeId(), id -> new ArrayList<>()).add(context);
        }
        for (Map.Entry<Integer, List<SubscriptionCommitContext>> group : contextsByDataNode.entrySet()) {
            commitInternal(group.getKey(), group.getValue(), true);
        }
    }

    /**
     * Negatively acknowledges raw poll responses: their commit contexts are grouped by data node id
     * and each group is committed with the nack flag set to {@code true}.
     *
     * @param list the poll responses to negatively acknowledge
     * @throws SubscriptionException if committing any group fails
     */
    private void nack(List<SubscriptionPollResponse> list) throws SubscriptionException {
        Map<Integer, List<SubscriptionCommitContext>> contextsByDataNode = new HashMap<>();
        for (SubscriptionPollResponse response : list) {
            SubscriptionCommitContext context = response.getCommitContext();
            contextsByDataNode.computeIfAbsent(context.getDataNodeId(), id -> new ArrayList<>()).add(context);
        }
        for (Map.Entry<Integer, List<SubscriptionCommitContext>> group : contextsByDataNode.entrySet()) {
            commitInternal(group.getKey(), group.getValue(), true);
        }
    }

    /**
     * Commits the given commit contexts to the provider with the given data node id while holding
     * the providers' read lock.
     *
     * <p>The read lock is released exactly once, in the {@code finally} block. The previous form
     * also released it right after a successful commit, so the successful path released it twice.
     *
     * @param i the data node id selecting the provider
     * @param list the commit contexts to commit
     * @param z the nack flag forwarded to the provider's {@code commit}
     * @throws SubscriptionException a {@code SubscriptionConnectionException} when the provider is
     *     missing or unavailable and the consumer is not closed, or any failure raised by the
     *     provider's {@code commit}
     */
    private void commitInternal(int i, List<SubscriptionCommitContext> list, boolean z) throws SubscriptionException {
        this.providers.acquireReadLock();
        try {
            AbstractSubscriptionProvider provider = this.providers.getProvider(i);
            if (Objects.nonNull(provider) && provider.isAvailable()) {
                provider.commit(list, z);
                return;
            }
            // No usable provider: this is only tolerated silently once the consumer is closed.
            if (!isClosed()) {
                throw new SubscriptionConnectionException(String.format("something unexpected happened when %s commit (nack: %s) messages to subscription provider with data node id %s, the subscription provider may be unavailable or not existed", this, z, i));
            }
        } finally {
            this.providers.releaseReadLock();
        }
    }

    /**
     * Schedules the periodic heartbeat task for this consumer.
     *
     * <p>While the consumer is open, each run calls {@code providers.heartbeat(this)}. Once the
     * consumer is closed, the task cancels its own schedule (without interrupting) and logs that it
     * did so.
     */
    private void submitHeartbeatWorker() {
        // One-element holder so the task can reach its own handle. It is allocated before the lambda
        // is created because a local cannot be read inside its own initializer.
        final ScheduledFuture<?>[] workerHandle = new ScheduledFuture<?>[1];
        workerHandle[0] = SubscriptionExecutorServiceManager.submitHeartbeatWorker(() -> {
            if (!isClosed()) {
                this.providers.heartbeat(this);
            } else if (Objects.nonNull(workerHandle[0])) {
                // The handle may not be stored yet if the task runs before the assignment completes.
                workerHandle[0].cancel(false);
                LOGGER.info("SubscriptionConsumer {} cancel heartbeat worker", this);
            }
        }, this.heartbeatIntervalMs);
        LOGGER.info("SubscriptionConsumer {} submit heartbeat worker", this);
    }

    /**
     * Schedules the periodic endpoints-sync task for this consumer.
     *
     * <p>While the consumer is open, each run calls {@code providers.sync(this)}. Once the consumer
     * is closed, the task cancels its own schedule (without interrupting) and logs that it did so.
     */
    private void submitEndpointsSyncer() {
        // One-element holder so the task can reach its own handle. It is allocated before the lambda
        // is created because a local cannot be read inside its own initializer.
        final ScheduledFuture<?>[] syncerHandle = new ScheduledFuture<?>[1];
        syncerHandle[0] = SubscriptionExecutorServiceManager.submitEndpointsSyncer(() -> {
            if (!isClosed()) {
                this.providers.sync(this);
            } else if (Objects.nonNull(syncerHandle[0])) {
                // The handle may not be stored yet if the task runs before the assignment completes.
                syncerHandle[0].cancel(false);
                LOGGER.info("SubscriptionConsumer {} cancel endpoints syncer", this);
            }
        }, this.endpointsSyncIntervalMs);
        LOGGER.info("SubscriptionConsumer {} submit endpoints syncer", this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Submits an {@code AsyncCommitWorker} built from the given messages and callback to the async
     * commit executor.
     *
     * @param iterable the messages handed to the worker
     * @param asyncCommitCallback the callback handed to the worker
     */
    public void commitAsync(Iterable<SubscriptionMessage> iterable, AsyncCommitCallback asyncCommitCallback) {
        AsyncCommitWorker worker = new AsyncCommitWorker(iterable, asyncCommitCallback);
        SubscriptionExecutorServiceManager.submitAsyncCommitWorker(worker);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acknowledges the given messages asynchronously.
     *
     * <p>The returned future always reaches a terminal state once the submitted task runs:
     * <ul>
     *   <li>completed with {@code null} when {@code ack} succeeds;</li>
     *   <li>completed exceptionally with the failure thrown by {@code ack};</li>
     *   <li>cancelled when the consumer is already closed at the time the task runs, in which case
     *       no ack is attempted. Previously the future was left incomplete in this case, so callers
     *       waiting on it would block forever.</li>
     * </ul>
     *
     * @param iterable the messages to acknowledge
     * @return a future tracking the outcome of the asynchronous ack
     */
    public CompletableFuture<Void> commitAsync(Iterable<SubscriptionMessage> iterable) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        SubscriptionExecutorServiceManager.submitAsyncCommitWorker(() -> {
            if (isClosed()) {
                // Nothing will be committed; release anyone waiting on the future.
                completableFuture.cancel(false);
                return;
            }
            try {
                ack(iterable);
                completableFuture.complete(null);
            } catch (Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    /**
     * Subscribes to the given topics, trying each available provider in turn until one succeeds.
     *
     * <p>On success {@code subscribedTopics} is replaced with the provider's result. A
     * {@code SubscriptionPipeTimeoutException} is only logged and ends the attempt without touching
     * {@code subscribedTopics}; any other failure is logged and the next provider is tried.
     *
     * @param set the topic names to subscribe to
     * @throws SubscriptionException a {@code SubscriptionConnectionException} when there is no
     *     available provider, or a {@code SubscriptionRuntimeCriticalException} when every provider
     *     failed
     */
    private void subscribeWithRedirection(Set<String> set) throws SubscriptionException {
        List<AbstractSubscriptionProvider> candidates = this.providers.getAllAvailableProviders();
        if (candidates.isEmpty()) {
            throw new SubscriptionConnectionException(String.format("Cluster has no available subscription providers when %s subscribe topic %s", this, set));
        }
        for (AbstractSubscriptionProvider candidate : candidates) {
            try {
                this.subscribedTopics = candidate.subscribe(set);
                return;
            } catch (SubscriptionPipeTimeoutException timeout) {
                // A pipe timeout is downgraded to a log entry; no further provider is tried.
                LOGGER.warn(timeout.getMessage());
                return;
            } catch (Exception failure) {
                LOGGER.warn("{} failed to subscribe topics {} from subscription provider {}, try next subscription provider...", this, set, candidate, failure);
            }
        }
        String errorMessage = String.format("%s failed to subscribe topics %s from all available subscription providers %s", this, set, candidates);
        LOGGER.warn(errorMessage);
        throw new SubscriptionRuntimeCriticalException(errorMessage);
    }

    /**
     * Unsubscribes from the given topics, trying each available provider in turn until one succeeds.
     *
     * <p>On success {@code subscribedTopics} is replaced with the provider's result. A
     * {@code SubscriptionPipeTimeoutException} is only logged and ends the attempt without touching
     * {@code subscribedTopics}; any other failure is logged and the next provider is tried.
     *
     * @param set the topic names to unsubscribe from
     * @throws SubscriptionException a {@code SubscriptionConnectionException} when there is no
     *     available provider, or a {@code SubscriptionRuntimeCriticalException} when every provider
     *     failed
     */
    private void unsubscribeWithRedirection(Set<String> set) throws SubscriptionException {
        List<AbstractSubscriptionProvider> candidates = this.providers.getAllAvailableProviders();
        if (candidates.isEmpty()) {
            throw new SubscriptionConnectionException(String.format("Cluster has no available subscription providers when %s unsubscribe topic %s", this, set));
        }
        for (AbstractSubscriptionProvider candidate : candidates) {
            try {
                this.subscribedTopics = candidate.unsubscribe(set);
                return;
            } catch (SubscriptionPipeTimeoutException timeout) {
                // A pipe timeout is downgraded to a log entry; no further provider is tried.
                LOGGER.warn(timeout.getMessage());
                return;
            } catch (Exception failure) {
                LOGGER.warn("{} failed to unsubscribe topics {} from subscription provider {}, try next subscription provider...", this, set, candidate, failure);
            }
        }
        String errorMessage = String.format("%s failed to unsubscribe topics %s from all available subscription providers %s", this, set, candidates);
        LOGGER.warn(errorMessage);
        throw new SubscriptionRuntimeCriticalException(errorMessage);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Fetches all endpoints, asking each available provider in turn and returning the first
     * successful answer.
     *
     * @return the endpoints map returned by the first provider whose session connection answers
     * @throws SubscriptionException a {@code SubscriptionConnectionException} when there is no
     *     available provider, or a {@code SubscriptionRuntimeCriticalException} when every provider
     *     failed
     */
    public Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection() throws SubscriptionException {
        List<AbstractSubscriptionProvider> candidates = this.providers.getAllAvailableProviders();
        if (candidates.isEmpty()) {
            throw new SubscriptionConnectionException(String.format("Cluster has no available subscription providers when %s fetch all endpoints", this));
        }
        for (AbstractSubscriptionProvider candidate : candidates) {
            try {
                return candidate.getSessionConnection().fetchAllEndPoints();
            } catch (Exception failure) {
                // Log and fall through to the next provider.
                LOGGER.warn("{} failed to fetch all endpoints from subscription provider {}, try next subscription provider...", this, candidate, failure);
            }
        }
        String errorMessage = String.format("%s failed to fetch all endpoints from all available subscription providers %s", this, candidates);
        LOGGER.warn(errorMessage);
        throw new SubscriptionRuntimeCriticalException(errorMessage);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the short report of this consumer's state.
     *
     * <p>The in-flight commit contexts and the subscribed topic names are rendered through
     * {@code CollectionUtils.getLimitedString} with a limit argument of 32.
     *
     * @return a new mutable map from report key to its string value
     */
    public Map<String, String> coreReportMessage() {
        Map<String, String> report = new HashMap<>();
        report.put("consumerId", this.consumerId);
        report.put("consumerGroupId", this.consumerGroupId);
        report.put("isClosed", this.isClosed.toString());
        report.put("fileSaveDir", this.fileSaveDir);
        String inFlightSummary = CollectionUtils.getLimitedString(this.inFlightFilesCommitContextSet, 32);
        report.put("inFlightFilesCommitContextSet", inFlightSummary);
        String topicNamesSummary = CollectionUtils.getLimitedString(this.subscribedTopics.keySet(), 32);
        report.put("subscribedTopicNames", topicNamesSummary);
        return report;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the full report of this consumer's state, rendering every reported field as a string.
     *
     * @return a new mutable map from report key to its string value
     */
    public Map<String, String> allReportMessage() {
        Map<String, String> report = new HashMap<>();
        // Identity.
        report.put("consumerId", this.consumerId);
        report.put("consumerGroupId", this.consumerGroupId);
        // Background worker intervals and providers.
        report.put("heartbeatIntervalMs", String.valueOf(this.heartbeatIntervalMs));
        report.put("endpointsSyncIntervalMs", String.valueOf(this.endpointsSyncIntervalMs));
        report.put("providers", this.providers.toString());
        // Lifecycle flags.
        report.put("isClosed", this.isClosed.toString());
        report.put("isReleased", this.isReleased.toString());
        // File handling.
        report.put("fileSaveDir", this.fileSaveDir);
        report.put("fileSaveFsync", String.valueOf(this.fileSaveFsync));
        report.put("inFlightFilesCommitContextSet", this.inFlightFilesCommitContextSet.toString());
        // Transport and polling settings, then current subscriptions.
        report.put("thriftMaxFrameSize", String.valueOf(this.thriftMaxFrameSize));
        report.put("maxPollParallelism", String.valueOf(this.maxPollParallelism));
        report.put("subscribedTopics", this.subscribedTopics.toString());
        return report;
    }
}
