package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.7.1.6-rc-202105101340.jar:org/apache/pulsar/client/impl/ReaderImpl.class */
public class ReaderImpl<T> implements Reader<T> {
    private static final BatchReceivePolicy DISABLED_BATCH_RECEIVE_POLICY = BatchReceivePolicy.builder().timeout(0, TimeUnit.MILLISECONDS).maxNumMessages(1).build();
    private final ConsumerImpl<T> consumer;

    public ReaderImpl(PulsarClientImpl pulsarClientImpl, ReaderConfigurationData<T> readerConfigurationData, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> completableFuture, Schema<T> schema) {
        String str = "reader-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
        str = StringUtils.isNotBlank(readerConfigurationData.getSubscriptionRolePrefix()) ? readerConfigurationData.getSubscriptionRolePrefix() + "-" + str : str;
        ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData();
        consumerConfigurationData.getTopicNames().add(readerConfigurationData.getTopicName());
        consumerConfigurationData.setSubscriptionName(str);
        consumerConfigurationData.setSubscriptionType(SubscriptionType.Exclusive);
        consumerConfigurationData.setSubscriptionMode(SubscriptionMode.NonDurable);
        consumerConfigurationData.setReceiverQueueSize(readerConfigurationData.getReceiverQueueSize());
        consumerConfigurationData.setReadCompacted(readerConfigurationData.isReadCompacted());
        consumerConfigurationData.setBatchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);
        if (readerConfigurationData.getReaderName() != null) {
            consumerConfigurationData.setConsumerName(readerConfigurationData.getReaderName());
        }
        if (readerConfigurationData.isResetIncludeHead()) {
            consumerConfigurationData.setResetIncludeHead(true);
        }
        if (readerConfigurationData.getReaderListener() != null) {
            final ReaderListener<T> readerListener = readerConfigurationData.getReaderListener();
            consumerConfigurationData.setMessageListener(new MessageListener<T>() { // from class: org.apache.pulsar.client.impl.ReaderImpl.1
                private static final long serialVersionUID = 1;

                @Override // org.apache.pulsar.client.api.MessageListener
                public void received(Consumer<T> consumer, Message<T> message) {
                    readerListener.received(ReaderImpl.this, message);
                    consumer.acknowledgeCumulativeAsync((Message<?>) message);
                }

                @Override // org.apache.pulsar.client.api.MessageListener
                public void reachedEndOfTopic(Consumer<T> consumer) {
                    readerListener.reachedEndOfTopic(ReaderImpl.this);
                }
            });
        }
        consumerConfigurationData.setCryptoFailureAction(readerConfigurationData.getCryptoFailureAction());
        if (readerConfigurationData.getCryptoKeyReader() != null) {
            consumerConfigurationData.setCryptoKeyReader(readerConfigurationData.getCryptoKeyReader());
        }
        if (readerConfigurationData.getKeyHashRanges() != null) {
            consumerConfigurationData.setKeySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(readerConfigurationData.getKeyHashRanges()));
        }
        this.consumer = new ConsumerImpl<>(pulsarClientImpl, readerConfigurationData.getTopicName(), consumerConfigurationData, executorProvider, TopicName.getPartitionIndex(readerConfigurationData.getTopicName()), false, completableFuture, readerConfigurationData.getStartMessageId(), readerConfigurationData.getStartMessageFromRollbackDurationInSec(), schema, null, true);
    }

    @Override // org.apache.pulsar.client.api.Reader
    public String getTopic() {
        return this.consumer.getTopic();
    }

    public ConsumerImpl<T> getConsumer() {
        return this.consumer;
    }

    @Override // org.apache.pulsar.client.api.Reader
    public boolean hasReachedEndOfTopic() {
        return this.consumer.hasReachedEndOfTopic();
    }

    @Override // org.apache.pulsar.client.api.Reader
    public Message<T> readNext() throws PulsarClientException {
        Message<T> receive = this.consumer.receive();
        this.consumer.acknowledgeCumulativeAsync((Message<?>) receive);
        return receive;
    }

    @Override // org.apache.pulsar.client.api.Reader
    public Message<T> readNext(int i, TimeUnit timeUnit) throws PulsarClientException {
        Message<T> receive = this.consumer.receive(i, timeUnit);
        if (receive != null) {
            this.consumer.acknowledgeCumulativeAsync((Message<?>) receive);
        }
        return receive;
    }

    @Override // org.apache.pulsar.client.api.Reader
    public CompletableFuture<Message<T>> readNextAsync() {
        CompletableFuture<Message<T>> receiveAsync = this.consumer.receiveAsync();
        receiveAsync.whenComplete((message, th) -> {
            if (message != null) {
                this.consumer.acknowledgeCumulativeAsync((Message<?>) message);
            }
        });
        return receiveAsync;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.consumer.close();
    }

    @Override // org.apache.pulsar.client.api.Reader
    public CompletableFuture<Void> closeAsync() {
        return this.consumer.closeAsync();
    }

    @Override // org.apache.pulsar.client.api.Reader
    public boolean hasMessageAvailable() throws PulsarClientException {
        return this.consumer.hasMessageAvailable();
    }

    @Override // org.apache.pulsar.client.api.Reader
    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
        return this.consumer.hasMessageAvailableAsync();
    }

    @Override // org.apache.pulsar.client.api.Reader
    public boolean isConnected() {
        return this.consumer.isConnected();
    }

    @Override // org.apache.pulsar.client.api.Reader
    public void seek(MessageId messageId) throws PulsarClientException {
        this.consumer.seek(messageId);
    }

    @Override // org.apache.pulsar.client.api.Reader
    public void seek(long j) throws PulsarClientException {
        this.consumer.seek(j);
    }

    @Override // org.apache.pulsar.client.api.Reader
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        return this.consumer.seekAsync(messageId);
    }

    @Override // org.apache.pulsar.client.api.Reader
    public CompletableFuture<Void> seekAsync(long j) {
        return this.consumer.seekAsync(j);
    }
}
