package org.apache.pulsar.functions.source;

import java.security.Security;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarSourceConsumerConfig;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.io.core.Source;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.2.4.jar:org/apache/pulsar/functions/source/PulsarSource.class */
public abstract class PulsarSource<T> implements Source<T> {
    protected final PulsarClient pulsarClient;
    protected final PulsarSourceConfig pulsarSourceConfig;
    protected final Map<String, String> properties;
    protected final ClassLoader functionClassLoader;
    protected final TopicSchema topicSchema;

    /* JADX INFO: Access modifiers changed from: protected */
    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarSourceConfig, Map<String, String> map, ClassLoader classLoader) {
        this.pulsarClient = pulsarClient;
        this.pulsarSourceConfig = pulsarSourceConfig;
        this.topicSchema = new TopicSchema(pulsarClient, classLoader);
        this.properties = map;
        this.functionClassLoader = classLoader;
    }

    public abstract List<Consumer<T>> getInputConsumers();

    /* JADX INFO: Access modifiers changed from: protected */
    public ConsumerBuilder<T> createConsumeBuilder(String str, PulsarSourceConsumerConfig pulsarSourceConsumerConfig) {
        ConsumerBuilder<T> subscriptionType = this.pulsarClient.newConsumer(pulsarSourceConsumerConfig.getSchema()).subscriptionName(this.pulsarSourceConfig.getSubscriptionName()).subscriptionInitialPosition(this.pulsarSourceConfig.getSubscriptionPosition()).subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
        if (pulsarSourceConsumerConfig.getConsumerProperties() != null && !pulsarSourceConsumerConfig.getConsumerProperties().isEmpty()) {
            subscriptionType.loadConf(new HashMap(pulsarSourceConsumerConfig.getConsumerProperties()));
        }
        ConsumerBuilder<T> consumerBuilder = pulsarSourceConsumerConfig.isRegexPattern() ? subscriptionType.topicsPattern(str) : subscriptionType.topics(Collections.singletonList(str));
        if (pulsarSourceConsumerConfig.getReceiverQueueSize() != null) {
            consumerBuilder = consumerBuilder.receiverQueueSize(pulsarSourceConsumerConfig.getReceiverQueueSize().intValue());
        }
        if (pulsarSourceConsumerConfig.getCryptoKeyReader() != null) {
            consumerBuilder = consumerBuilder.cryptoKeyReader(pulsarSourceConsumerConfig.getCryptoKeyReader());
        }
        if (pulsarSourceConsumerConfig.getConsumerCryptoFailureAction() != null) {
            consumerBuilder = consumerBuilder.cryptoFailureAction(pulsarSourceConsumerConfig.getConsumerCryptoFailureAction());
        }
        ConsumerBuilder<T> properties = consumerBuilder.properties(this.properties);
        if (this.pulsarSourceConfig.getNegativeAckRedeliveryDelayMs() != null && this.pulsarSourceConfig.getNegativeAckRedeliveryDelayMs().longValue() > 0) {
            properties.negativeAckRedeliveryDelay(this.pulsarSourceConfig.getNegativeAckRedeliveryDelayMs().longValue(), TimeUnit.MILLISECONDS);
        }
        if (this.pulsarSourceConfig.getTimeoutMs() != null) {
            properties = properties.ackTimeout(this.pulsarSourceConfig.getTimeoutMs().longValue(), TimeUnit.MILLISECONDS);
        }
        if (this.pulsarSourceConfig.getMaxMessageRetries() != null && this.pulsarSourceConfig.getMaxMessageRetries().intValue() >= 0) {
            DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
            builder.maxRedeliverCount(this.pulsarSourceConfig.getMaxMessageRetries().intValue());
            if (this.pulsarSourceConfig.getDeadLetterTopic() != null && !this.pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
                builder.deadLetterTopic(this.pulsarSourceConfig.getDeadLetterTopic());
            }
            properties = properties.deadLetterPolicy(builder.build());
        }
        if (pulsarSourceConsumerConfig.isPoolMessages()) {
            properties.poolMessages(true);
        }
        return properties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public Record<T> buildRecord(Consumer<T> consumer, Message<T> message) {
        Schema schema = null;
        if (message instanceof MessageImpl) {
            schema = ((MessageImpl) message).getSchemaInternal();
        } else if (message instanceof TopicMessageImpl) {
            schema = ((TopicMessageImpl) message).getSchemaInternal();
        }
        if (schema instanceof AutoConsumeSchema) {
            schema = ((AutoConsumeSchema) schema).unwrapInternalSchema(message.getSchemaVersion());
        }
        return PulsarRecord.builder().message(message).schema(schema).topicName(message.getTopicName()).customAckFunction(bool -> {
            if (bool.booleanValue()) {
                consumer.acknowledgeCumulativeAsync((Message<?>) message).whenComplete((r3, th) -> {
                    message.release();
                });
            } else {
                consumer.acknowledgeAsync((Message<?>) message).whenComplete((r32, th2) -> {
                    message.release();
                });
            }
        }).ackFunction(() -> {
            if (this.pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                consumer.acknowledgeCumulativeAsync((Message<?>) message).whenComplete((r3, th) -> {
                    message.release();
                });
            } else {
                consumer.acknowledgeAsync((Message<?>) message).whenComplete((r32, th2) -> {
                    message.release();
                });
            }
        }).failFunction(() -> {
            try {
                if (this.pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                    throw new RuntimeException("Failed to process message: " + message.getMessageId());
                }
                consumer.negativeAcknowledge((Message<?>) message);
            } finally {
                message.release();
            }
        }).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public PulsarSourceConsumerConfig<T> buildPulsarSourceConsumerConfig(String str, ConsumerConfig consumerConfig, Class<?> cls) {
        PulsarSourceConsumerConfig.PulsarSourceConsumerConfigBuilder<T> consumerProperties = PulsarSourceConsumerConfig.builder().isRegexPattern(consumerConfig.isRegexPattern()).receiverQueueSize(consumerConfig.getReceiverQueueSize()).consumerProperties(consumerConfig.getConsumerProperties());
        consumerProperties.schema((consumerConfig.getSerdeClassName() == null || consumerConfig.getSerdeClassName().isEmpty()) ? this.topicSchema.getSchema(str, cls, consumerConfig, true) : this.topicSchema.getSchema(str, cls, consumerConfig.getSerdeClassName(), true));
        if (consumerConfig.getCryptoConfig() != null) {
            if (Security.getProvider("BC") == null) {
                Security.addProvider(new BouncyCastleProvider());
            }
            consumerProperties.consumerCryptoFailureAction(consumerConfig.getCryptoConfig().getConsumerCryptoFailureAction());
            consumerProperties.cryptoKeyReader(CryptoUtils.getCryptoKeyReaderInstance(consumerConfig.getCryptoConfig().getCryptoKeyReaderClassName(), consumerConfig.getCryptoConfig().getCryptoKeyReaderConfig(), this.functionClassLoader));
        }
        consumerProperties.poolMessages(consumerConfig.isPoolMessages());
        return consumerProperties.build();
    }
}
