package org.apache.iotdb.session.subscription.consumer.base;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
import org.apache.iotdb.session.subscription.consumer.ConsumeListener;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.class */
/**
 * Base class for push-style subscription consumers.
 *
 * <p>After {@link #open()} succeeds, a recurring task is registered through
 * {@code SubscriptionExecutorServiceManager.submitAutoPollWorker} using {@code autoPollIntervalMs}.
 * Each run polls the currently subscribed topics, passes every polled message to the configured
 * {@link ConsumeListener}, and acknowledges messages according to the configured
 * {@link AckStrategy}.
 */
public abstract class AbstractSubscriptionPushConsumer extends AbstractSubscriptionConsumer {
    // NOTE(review): the logger is registered under SubscriptionTreePushConsumer rather than this
    // class. Kept unchanged so existing logging configuration keeps matching - confirm intent.
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionTreePushConsumer.class);

    private final AckStrategy ackStrategy;
    private final ConsumeListener consumeListener;
    private final long autoPollIntervalMs;
    private final long autoPollTimeoutMs;

    /** Starts as {@code true}; flipped by {@link #open()} and {@link #close()}. */
    private final AtomicBoolean isClosed;

    /**
     * One polling round: poll the subscribed topics, dispatch each message to the listener, then
     * ack/nack depending on the ack strategy. Any exception raised during the round is logged and
     * not propagated.
     */
    class AutoPollWorker implements Runnable {
        AutoPollWorker() {
        }

        @Override
        public void run() {
            // Nothing to do when the consumer is closed or has no subscriptions.
            if (isClosed() || subscribedTopics.isEmpty()) {
                return;
            }
            try {
                final List<SubscriptionMessage> messages =
                        multiplePoll(subscribedTopics.keySet(), autoPollTimeoutMs);
                if (messages.isEmpty()) {
                    // Log the enclosing consumer, not this worker instance: the message text
                    // describes the consumer.
                    LOGGER.info(
                            "SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)",
                            AbstractSubscriptionPushConsumer.this,
                            CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32),
                            autoPollTimeoutMs);
                    return;
                }

                // BEFORE_CONSUME: acknowledge everything up front, regardless of listener outcome.
                if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
                    ack(messages);
                }

                final List<SubscriptionMessage> succeeded = new ArrayList<>();
                final List<SubscriptionMessage> failed = new ArrayList<>();
                for (final SubscriptionMessage message : messages) {
                    try {
                        final ConsumeResult result = consumeListener.onReceive(message);
                        if (Objects.equals(ConsumeResult.SUCCESS, result)) {
                            succeeded.add(message);
                        } else {
                            LOGGER.warn("Consumer listener result failure when consuming message: {}", message);
                            failed.add(message);
                        }
                    } catch (Exception e) {
                        // A throwing listener counts as a failed message; keep processing the rest.
                        LOGGER.warn("Consumer listener raised an exception while consuming message: {}", message, e);
                        failed.add(message);
                    }
                }

                // AFTER_CONSUME: ack what the listener accepted, nack what it rejected or threw on.
                if (ackStrategy.equals(AckStrategy.AFTER_CONSUME)) {
                    ack(succeeded);
                    // Explicit Iterable cast retained from the original call site.
                    nack((Iterable<SubscriptionMessage>) failed);
                }
            } catch (Exception e) {
                LOGGER.warn("something unexpected happened when auto poll messages...", e);
            }
        }
    }

    /**
     * Creates a consumer from a builder. The ack strategy, listener, poll interval and poll timeout
     * are taken from the builder fields as-is.
     *
     * @param builder source of the consumer configuration
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSubscriptionPushConsumer(AbstractSubscriptionPushConsumerBuilder builder) {
        super(builder);
        this.isClosed = new AtomicBoolean(true);
        this.ackStrategy = builder.ackStrategy;
        this.consumeListener = builder.consumeListener;
        this.autoPollIntervalMs = builder.autoPollIntervalMs;
        this.autoPollTimeoutMs = builder.autoPollTimeoutMs;
    }

    /**
     * Creates a consumer from properties. Missing entries fall back to
     * {@link AckStrategy#defaultValue()}, a listener that always returns
     * {@link ConsumeResult#SUCCESS}, an interval of 100 ms and
     * {@code ConsumerConstant.AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE}.
     *
     * <p>The two numeric entries may be any {@link Number} (for example an {@code Integer}), not
     * only a {@code Long}.
     *
     * @param properties consumer configuration
     * @throws ClassCastException if an entry is present but has an incompatible type
     */
    public AbstractSubscriptionPushConsumer(Properties properties) {
        this(
                properties,
                (AckStrategy) properties.getOrDefault(ConsumerConstant.ACK_STRATEGY_KEY, AckStrategy.defaultValue()),
                // The lambda needs an explicit target type because getOrDefault accepts Object.
                (ConsumeListener) properties.getOrDefault(
                        ConsumerConstant.CONSUME_LISTENER_KEY,
                        (ConsumeListener) message -> ConsumeResult.SUCCESS),
                ((Number) properties.getOrDefault(ConsumerConstant.AUTO_POLL_INTERVAL_MS_KEY, 100L)).longValue(),
                ((Number) properties.getOrDefault(
                        ConsumerConstant.AUTO_POLL_TIMEOUT_MS_KEY,
                        ConsumerConstant.AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE)).longValue());
    }

    /**
     * Creates a consumer from properties plus explicit push settings.
     *
     * @param properties consumer configuration forwarded to the superclass
     * @param ackStrategy when polled messages are acknowledged
     * @param consumeListener callback invoked for every polled message
     * @param autoPollIntervalMs interval between polling rounds; stored as at least 1
     * @param autoPollTimeoutMs timeout of a single poll; stored as at least 1000
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSubscriptionPushConsumer(
            Properties properties,
            AckStrategy ackStrategy,
            ConsumeListener consumeListener,
            long autoPollIntervalMs,
            long autoPollTimeoutMs) {
        super(
                new AbstractSubscriptionPushConsumerBuilder()
                        .ackStrategy(ackStrategy)
                        .consumeListener(consumeListener)
                        .autoPollIntervalMs(autoPollIntervalMs)
                        .autoPollTimeoutMs(autoPollTimeoutMs),
                properties);
        this.isClosed = new AtomicBoolean(true);
        this.ackStrategy = ackStrategy;
        this.consumeListener = consumeListener;
        this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1L);
        this.autoPollTimeoutMs = Math.max(autoPollTimeoutMs, 1000L);
    }

    /**
     * Opens the consumer if it is currently closed, then registers the auto poll task. Calling it
     * on an already open consumer does nothing.
     *
     * @throws SubscriptionException if the superclass fails to open
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override
    public synchronized void open() throws SubscriptionException {
        if (!this.isClosed.get()) {
            return;
        }
        super.open();
        // Mark as open before registering the task: the task checks isClosed() on every run.
        this.isClosed.set(false);
        submitAutoPollWorker();
    }

    /**
     * Closes the consumer if it is currently open. Calling it on an already closed consumer does
     * nothing. The closed flag is set only after the superclass close returns.
     */
    @Override
    public synchronized void close() {
        if (this.isClosed.get()) {
            return;
        }
        super.close();
        this.isClosed.set(true);
    }

    /**
     * @return {@code true} while this consumer has not been opened or has been closed
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @Override
    public boolean isClosed() {
        return this.isClosed.get();
    }

    /**
     * Registers the recurring poll task. While the consumer is open each run executes one
     * {@link AutoPollWorker} round; once the consumer is closed the task cancels its own future.
     */
    private void submitAutoPollWorker() {
        // One-slot holder: the task has to reference its own future, which only exists after the
        // task has been submitted. The holder is declared first so the lambda can capture it.
        final ScheduledFuture<?>[] futureHolder = new ScheduledFuture<?>[1];
        futureHolder[0] = SubscriptionExecutorServiceManager.submitAutoPollWorker(() -> {
            if (!isClosed()) {
                new AutoPollWorker().run();
                return;
            }
            // The slot may still be empty if a run observes it before the assignment above.
            if (Objects.nonNull(futureHolder[0])) {
                futureHolder[0].cancel(false);
                LOGGER.info("SubscriptionPushConsumer {} cancel auto poll worker", this);
            }
        }, this.autoPollIntervalMs);
        LOGGER.info("SubscriptionPushConsumer {} submit auto poll worker", this);
    }

    /**
     * @return {@code "SubscriptionPushConsumer"} followed by the core report map
     */
    @Override
    public String toString() {
        return "SubscriptionPushConsumer" + coreReportMessage();
    }

    /**
     * @return the superclass core report with an added {@code ackStrategy} entry
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override
    public Map<String, String> coreReportMessage() {
        final Map<String, String> report = super.coreReportMessage();
        report.put("ackStrategy", this.ackStrategy.toString());
        return report;
    }

    /**
     * @return the superclass full report with added {@code ackStrategy},
     *     {@code autoPollIntervalMs} and {@code autoPollTimeoutMs} entries
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override
    public Map<String, String> allReportMessage() {
        final Map<String, String> report = super.allReportMessage();
        report.put("ackStrategy", this.ackStrategy.toString());
        report.put("autoPollIntervalMs", String.valueOf(this.autoPollIntervalMs));
        report.put("autoPollTimeoutMs", String.valueOf(this.autoPollTimeoutMs));
        return report;
    }

    /** Delegates to the superclass implementation. */
    @Override
    public String getConsumerGroupId() {
        return super.getConsumerGroupId();
    }

    /** Delegates to the superclass implementation. */
    @Override
    public String getConsumerId() {
        return super.getConsumerId();
    }

    /** Delegates to the superclass implementation. */
    @Override
    public boolean allTopicMessagesHaveBeenConsumed() {
        return super.allTopicMessagesHaveBeenConsumed();
    }

    /** Delegates to the superclass implementation. */
    @Override
    @Deprecated
    public boolean allSnapshotTopicMessagesHaveBeenConsumed() {
        return super.allSnapshotTopicMessagesHaveBeenConsumed();
    }
}
