package org.apache.iotdb.session.subscription.consumer.base;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for subscription consumers that fetch messages by calling {@code poll}.
 *
 * <p>When auto commit is enabled, every non-empty batch returned by {@link #poll(Set, long)} is
 * recorded in a map keyed by a time bucket (current time divided by the auto commit interval). A
 * periodically scheduled worker passes all buckets older than the current one to {@code ack} and
 * drops them; {@link #close()} does the same for every remaining bucket.
 *
 * <p>{@link #open()} and {@link #close()} are {@code synchronized}; the open/closed state is held
 * in an {@link AtomicBoolean}.
 */
public abstract class AbstractSubscriptionPullConsumer extends AbstractSubscriptionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSubscriptionPullConsumer.class);

    /** Value used for auto commit when the property is absent. */
    private static final boolean DEFAULT_AUTO_COMMIT = true;

    /** Value used for the auto commit interval when the property is absent. */
    private static final long DEFAULT_AUTO_COMMIT_INTERVAL_MS = 5000L;

    /** Lower bound applied to the auto commit interval; also guards the bucket division against zero. */
    private static final long MIN_AUTO_COMMIT_INTERVAL_MS = 500L;

    private final boolean autoCommit;
    private final long autoCommitIntervalMs;

    /**
     * Polled-but-not-yet-committed messages, grouped by commit bucket. Initialized eagerly so that
     * report methods and {@code poll} never observe {@code null} before {@link #open()} has run;
     * {@link #open()} still replaces it with a fresh map.
     */
    private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages = new ConcurrentSkipListMap<>();

    /** {@code true} until {@link #open()} succeeds and again after {@link #close()} completes. */
    private final AtomicBoolean isClosed = new AtomicBoolean(true);

    /**
     * Periodic task that commits every bucket strictly older than the current commit bucket.
     * A failure on one bucket is logged and the bucket is kept so a later run can retry it.
     */
    private class AutoCommitWorker implements Runnable {
        private AutoCommitWorker() {
        }

        @Override
        public void run() {
            if (isClosed()) {
                return;
            }
            final long currentBucket = currentCommitBucket();
            // headMap is exclusive of currentBucket, so the bucket still being filled by poll is skipped.
            for (Map.Entry<Long, Set<SubscriptionMessage>> bucket : uncommittedMessages.headMap(currentBucket).entrySet()) {
                try {
                    ack(bucket.getValue());
                    uncommittedMessages.remove(bucket.getKey());
                } catch (Exception e) {
                    LOGGER.warn("something unexpected happened when auto commit messages...", e);
                }
            }
        }
    }

    /**
     * @return {@code true} if this consumer has not been opened yet or has been closed
     */
    @Override
    public boolean isClosed() {
        return this.isClosed.get();
    }

    /**
     * Creates a consumer from a builder.
     *
     * @param builder source of the auto commit flag and interval; the interval is raised to the
     *     same 500 ms minimum that the {@link Properties}-based constructor enforces
     */
    public AbstractSubscriptionPullConsumer(AbstractSubscriptionPullConsumerBuilder builder) {
        super(builder);
        this.autoCommit = builder.autoCommit;
        this.autoCommitIntervalMs = Math.max(builder.autoCommitIntervalMs, MIN_AUTO_COMMIT_INTERVAL_MS);
    }

    /**
     * Creates a consumer from properties. The auto commit flag defaults to {@code true} and the
     * interval to 5000 ms when the corresponding keys are absent.
     *
     * @param properties consumer configuration; the auto commit flag may be a {@link Boolean} or a
     *     {@link String}, the interval any {@link Number} or a {@link String}
     * @throws ClassCastException if either value has an unsupported type
     * @throws NumberFormatException if the interval is a string that is not a valid long
     */
    public AbstractSubscriptionPullConsumer(Properties properties) {
        this(properties, readAutoCommit(properties), readAutoCommitIntervalMs(properties));
    }

    /**
     * Creates a consumer from properties with explicit auto commit settings.
     *
     * @param properties consumer configuration forwarded to the superclass
     * @param autoCommit whether polled messages are committed automatically
     * @param autoCommitIntervalMs auto commit interval in milliseconds; values below 500 are raised to 500
     */
    public AbstractSubscriptionPullConsumer(Properties properties, boolean autoCommit, long autoCommitIntervalMs) {
        super(new AbstractSubscriptionPullConsumerBuilder().autoCommit(autoCommit).autoCommitIntervalMs(autoCommitIntervalMs), properties);
        this.autoCommit = autoCommit;
        this.autoCommitIntervalMs = Math.max(autoCommitIntervalMs, MIN_AUTO_COMMIT_INTERVAL_MS);
    }

    /** Reads the auto commit flag, accepting a {@link Boolean} or its string form. */
    private static boolean readAutoCommit(Properties properties) {
        final Object raw = properties.getOrDefault(ConsumerConstant.AUTO_COMMIT_KEY, DEFAULT_AUTO_COMMIT);
        if (raw instanceof String) {
            return Boolean.parseBoolean(((String) raw).trim());
        }
        // Any other non-Boolean type fails with ClassCastException, exactly as the plain cast did.
        return (Boolean) raw;
    }

    /** Reads the auto commit interval, accepting any {@link Number} or a decimal string. */
    private static long readAutoCommitIntervalMs(Properties properties) {
        final Object raw = properties.getOrDefault(ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_KEY, DEFAULT_AUTO_COMMIT_INTERVAL_MS);
        if (raw instanceof Number) {
            // Covers Long as before, plus Integer etc. (an autoboxed int literal is a common caller mistake).
            return ((Number) raw).longValue();
        }
        if (raw instanceof String) {
            return Long.parseLong(((String) raw).trim());
        }
        // Unsupported type: keep the historical ClassCastException.
        return (Long) raw;
    }

    /**
     * Opens the consumer if it is currently closed; otherwise does nothing. With auto commit
     * enabled, a fresh uncommitted-message map is installed and the auto commit worker is scheduled.
     *
     * @throws SubscriptionException if the superclass fails to open
     */
    @Override
    public synchronized void open() throws SubscriptionException {
        if (!this.isClosed.get()) {
            return;
        }
        super.open();
        this.isClosed.set(false);
        if (this.autoCommit) {
            this.uncommittedMessages = new ConcurrentSkipListMap<>();
            submitAutoCommitWorker();
        }
    }

    /**
     * Closes the consumer if it is currently open; otherwise does nothing. With auto commit
     * enabled, all remaining uncommitted messages are committed before the superclass is closed.
     */
    @Override
    public synchronized void close() {
        if (this.isClosed.get()) {
            return;
        }
        if (this.autoCommit) {
            commitAllUncommittedMessages();
        }
        super.close();
        this.isClosed.set(true);
    }

    /**
     * Polls all subscribed topics.
     *
     * @param timeout maximum poll duration
     * @return the polled messages, possibly empty
     * @throws SubscriptionException if polling fails
     */
    public List<SubscriptionMessage> poll(Duration timeout) throws SubscriptionException {
        return poll(Collections.emptySet(), timeout.toMillis());
    }

    /**
     * Polls all subscribed topics.
     *
     * @param timeoutMs maximum poll duration in milliseconds
     * @return the polled messages, possibly empty
     * @throws SubscriptionException if polling fails
     */
    public List<SubscriptionMessage> poll(long timeoutMs) throws SubscriptionException {
        return poll(Collections.emptySet(), timeoutMs);
    }

    /**
     * Polls the given topics.
     *
     * @param topicNames topics to poll; an empty set means all subscribed topics
     * @param timeout maximum poll duration
     * @return the polled messages, possibly empty
     * @throws SubscriptionException if polling fails
     */
    public List<SubscriptionMessage> poll(Set<String> topicNames, Duration timeout) throws SubscriptionException {
        return poll(topicNames, timeout.toMillis());
    }

    /**
     * Polls the given topics. Topic names are first normalized through
     * {@code IdentifierUtils.checkAndParseIdentifier}. Requested topics that are not subscribed
     * are logged at WARN level but still passed on to the poll. With auto commit enabled, a
     * non-empty result is recorded under the current commit bucket.
     *
     * @param topicNames topics to poll; an empty set means all subscribed topics
     * @param timeoutMs maximum poll duration in milliseconds
     * @return the polled messages; an empty list if there is nothing to poll or nothing arrived
     * @throws SubscriptionException if polling fails
     */
    public List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs) throws SubscriptionException {
        Set<String> parsedTopicNames = topicNames.stream()
                .map(IdentifierUtils::checkAndParseIdentifier)
                .collect(Collectors.toSet());
        if (parsedTopicNames.isEmpty()) {
            parsedTopicNames = this.subscribedTopics.keySet();
        } else {
            parsedTopicNames.stream()
                    .filter(topicName -> !this.subscribedTopics.containsKey(topicName))
                    .forEach(topicName -> LOGGER.warn("SubscriptionPullConsumer {} does not subscribe to topic {}", this, topicName));
        }
        if (parsedTopicNames.isEmpty()) {
            return Collections.emptyList();
        }

        final List<SubscriptionMessage> messages = multiplePoll(parsedTopicNames, timeoutMs);
        if (messages.isEmpty()) {
            LOGGER.info("SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s)", this, CollectionUtils.getLimitedString(parsedTopicNames, 32), timeoutMs);
            return messages;
        }

        if (this.autoCommit) {
            this.uncommittedMessages
                    .computeIfAbsent(currentCommitBucket(), bucket -> new ConcurrentSkipListSet<>())
                    .addAll(messages);
        }
        return messages;
    }

    /**
     * Synchronously commits a single message.
     *
     * @param message the message to commit
     * @throws SubscriptionException if the commit fails
     */
    public void commitSync(SubscriptionMessage message) throws SubscriptionException {
        super.ack(Collections.singletonList(message));
    }

    /**
     * Synchronously commits several messages.
     *
     * @param messages the messages to commit
     * @throws SubscriptionException if the commit fails
     */
    public void commitSync(Iterable<SubscriptionMessage> messages) throws SubscriptionException {
        super.ack(messages);
    }

    /**
     * Asynchronously commits a single message.
     *
     * @param message the message to commit
     * @return the future returned by the superclass for this commit
     */
    public CompletableFuture<Void> commitAsync(SubscriptionMessage message) {
        return super.commitAsync(Collections.singletonList(message));
    }

    /**
     * Asynchronously commits several messages.
     *
     * @param messages the messages to commit
     * @return the future returned by the superclass for this commit
     */
    @Override
    public CompletableFuture<Void> commitAsync(Iterable<SubscriptionMessage> messages) {
        return super.commitAsync(messages);
    }

    /**
     * Asynchronously commits a single message and hands the callback to the superclass.
     *
     * @param message the message to commit
     * @param callback callback forwarded to the superclass commit
     */
    public void commitAsync(SubscriptionMessage message, AsyncCommitCallback callback) {
        super.commitAsync(Collections.singletonList(message), callback);
    }

    /**
     * Asynchronously commits several messages and hands the callback to the superclass.
     *
     * @param messages the messages to commit
     * @param callback callback forwarded to the superclass commit
     */
    @Override
    public void commitAsync(Iterable<SubscriptionMessage> messages, AsyncCommitCallback callback) {
        super.commitAsync(messages, callback);
    }

    /**
     * Returns the index of the commit bucket for the current wall-clock time. A timestamp that
     * falls exactly on an interval boundary is assigned to the preceding bucket.
     */
    private long currentCommitBucket() {
        final long now = System.currentTimeMillis();
        long bucket = now / this.autoCommitIntervalMs;
        if (now % this.autoCommitIntervalMs == 0) {
            bucket--;
        }
        return bucket;
    }

    /**
     * Schedules the auto commit task. Once the consumer is observed as closed, the task cancels
     * its own schedule instead of committing.
     */
    private void submitAutoCommitWorker() {
        // The holder must be declared before the lambda is created: a local variable cannot be
        // referenced from inside its own initializer.
        final ScheduledFuture<?>[] futureHolder = new ScheduledFuture<?>[1];
        futureHolder[0] = SubscriptionExecutorServiceManager.submitAutoCommitWorker(() -> {
            if (!isClosed()) {
                new AutoCommitWorker().run();
            } else if (Objects.nonNull(futureHolder[0])) {
                futureHolder[0].cancel(false);
                LOGGER.info("SubscriptionPullConsumer {} cancel auto commit worker", this);
            }
        }, this.autoCommitIntervalMs);
        LOGGER.info("SubscriptionPullConsumer {} submit auto commit worker", this);
    }

    /**
     * Commits every recorded bucket regardless of age. A failure on one bucket is logged and the
     * remaining buckets are still attempted.
     */
    private void commitAllUncommittedMessages() {
        for (Map.Entry<Long, Set<SubscriptionMessage>> bucket : this.uncommittedMessages.entrySet()) {
            try {
                ack(bucket.getValue());
                this.uncommittedMessages.remove(bucket.getKey());
            } catch (Exception e) {
                LOGGER.warn("something unexpected happened when commit messages during close", e);
            }
        }
    }

    @Override
    public String toString() {
        return "SubscriptionPullConsumer" + coreReportMessage();
    }

    /**
     * @return the superclass core report extended with the {@code autoCommit} flag
     */
    @Override
    public Map<String, String> coreReportMessage() {
        final Map<String, String> report = super.coreReportMessage();
        report.put("autoCommit", String.valueOf(this.autoCommit));
        return report;
    }

    /**
     * @return the superclass full report extended with the auto commit settings and, when auto
     *     commit is enabled, the current uncommitted messages
     */
    @Override
    public Map<String, String> allReportMessage() {
        final Map<String, String> report = super.allReportMessage();
        report.put("autoCommit", String.valueOf(this.autoCommit));
        report.put("autoCommitIntervalMs", String.valueOf(this.autoCommitIntervalMs));
        if (this.autoCommit) {
            report.put("uncommittedMessages", this.uncommittedMessages.toString());
        }
        return report;
    }

    /** Delegates to the superclass implementation. */
    @Override
    public String getConsumerGroupId() {
        return super.getConsumerGroupId();
    }

    /** Delegates to the superclass implementation. */
    @Override
    public String getConsumerId() {
        return super.getConsumerId();
    }

    /** Delegates to the superclass implementation. */
    @Override
    public boolean allSnapshotTopicMessagesHaveBeenConsumed() {
        return super.allSnapshotTopicMessagesHaveBeenConsumed();
    }
}
