package org.apache.pulsar.functions.sink;

import com.google.common.annotations.VisibleForTesting;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.AbstractSinkRecord;
import org.apache.pulsar.functions.instance.ProducerBuilderFactory;
import org.apache.pulsar.functions.instance.ProducerCache;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.3.2.jar:org/apache/pulsar/functions/sink/PulsarSink.class */
public class PulsarSink<T> implements Sink<T> {
    // Class-wide SLF4J logger; also used by the inner processor classes.
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarSink.class);
    // Pulsar client handed to the TopicSchema and, in open(), to the ProducerBuilderFactory.
    private final PulsarClient client;
    // Sink configuration: default topic, processing guarantees, type/schema/serde settings, producer config.
    private final PulsarSinkConfig pulsarSinkConfig;
    // Properties applied to every producer builder in createProducer().
    private final Map<String, String> properties;
    // Class loader used to load the configured type class and passed to schema/producer-builder helpers.
    private final ClassLoader functionClassLoader;
    // Receives an exception entry for every publish failure (see getPublishErrorHandler).
    private ComponentStatsManager stats;
    // Cache consulted by the processors so that producers are obtained via getOrCreateProducer.
    private final ProducerCache producerCache;

    // Strategy selected in open() from the configured processing guarantees; stays null when
    // initializeSchema() returns null or no switch case matches.
    @VisibleForTesting
    PulsarSinkProcessor<T> pulsarSinkProcessor;
    // Resolves the schema for the output topic in initializeSchema().
    private final TopicSchema topicSchema;
    // Sink-level schema resolved in open(); used when a record does not request its own schema.
    private Schema<T> schema;
    // Created in open(); builds the producer builders used by createProducer().
    private ProducerBuilderFactory producerBuilderFactory;

    /**
     * Processor that acknowledges the sink record once the asynchronous send completes normally.
     * A failed send is routed to the shared publish error handler, invoked with {@code true} so the
     * source record is failed as well.
     */
    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.3.2.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkAtLeastOnceProcessor.class */
    class PulsarSinkAtLeastOnceProcessor extends PulsarSink<T>.PulsarSinkAtMostOnceProcessor {
        PulsarSinkAtLeastOnceProcessor() {
            super();
        }

        /**
         * Sends the message asynchronously, acking the record on success.
         *
         * @param msg    fully populated message builder
         * @param record sink record the message was built from
         */
        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkAtMostOnceProcessor, org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
            msg.sendAsync()
                    .thenAccept(ignoredMessageId -> record.ack())
                    .exceptionally(getPublishErrorHandler(record, true));
        }
    }

    /**
     * Processor that publishes without acknowledging the record afterwards. Publish failures are
     * reported through the shared error handler, invoked with {@code false} so the source record is
     * not failed.
     */
    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.3.2.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkAtMostOnceProcessor.class */
    class PulsarSinkAtMostOnceProcessor extends PulsarSink<T>.PulsarSinkProcessorBase {
        /**
         * Requests the producer for the configured topic up front, unless the sink schema is an
         * {@link AutoConsumeSchema}, in which case producer creation is left to the first record.
         */
        public PulsarSinkAtMostOnceProcessor() {
            super();
            if (PulsarSink.this.schema instanceof AutoConsumeSchema) {
                if (PulsarSink.log.isDebugEnabled()) {
                    PulsarSink.log.debug("The Pulsar producer is not initialized until the first record is published for `AUTO_CONSUME` schema.");
                }
                return;
            }
            getProducer(PulsarSink.this.pulsarSinkConfig.getTopic(), PulsarSink.this.schema);
        }

        /**
         * Creates a message builder on the producer for the record's destination topic, falling back
         * to the configured topic when the record names none.
         *
         * @param record sink record to publish
         * @return a builder bound to the effective schema, or a schema-less builder when that schema is null
         */
        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
            final Schema<T> recordSchema = record.getSchema();
            // The record's own schema is only honoured when the record asks for it.
            final Schema<T> effectiveSchema = record.shouldSetSchema() ? recordSchema : PulsarSink.this.schema;
            final String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
            final Producer<T> producer = getProducer(topic, effectiveSchema);
            if (effectiveSchema == null) {
                return producer.newMessage();
            }
            return producer.newMessage(effectiveSchema);
        }

        /**
         * Sends the message asynchronously; nothing is done on success.
         *
         * @param msg    fully populated message builder
         * @param record sink record the message was built from
         */
        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
            msg.sendAsync()
                    .thenAccept(ignoredMessageId -> {
                    })
                    .exceptionally(getPublishErrorHandler(record, false));
        }
    }

    /**
     * Processor that requires every record to carry a partition id and a record sequence. The
     * partition id is used both as producer name and as producer cache key, and the record sequence
     * is set as the message sequence id before sending.
     */
    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.3.2.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkEffectivelyOnceProcessor.class */
    class PulsarSinkEffectivelyOnceProcessor extends PulsarSink<T>.PulsarSinkProcessorBase {
        PulsarSinkEffectivelyOnceProcessor() {
            super();
        }

        /**
         * Creates a message builder on the producer dedicated to the record's partition.
         *
         * @param record sink record to publish
         * @return a builder bound to the effective schema, or a schema-less builder when that schema is null
         * @throws RuntimeException if the record has no partition id
         */
        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
            final String partitionId = record.getPartitionId().orElseThrow(
                    () -> new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode"));
            final Schema<T> recordSchema = record.getSchema();
            final Schema<T> effectiveSchema = record.shouldSetSchema() ? recordSchema : PulsarSink.this.schema;
            final String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
            // The partition id serves as producer name and as cache key.
            final Producer<T> producer = getProducer(topic, effectiveSchema, partitionId, partitionId);
            if (effectiveSchema == null) {
                return producer.newMessage();
            }
            return producer.newMessage(effectiveSchema);
        }

        /**
         * Stamps the record sequence on the message, sends it asynchronously and acks the record on
         * success; failures go to the publish error handler with source-record failing enabled.
         *
         * @param msg    fully populated message builder
         * @param record sink record the message was built from
         * @throws RuntimeException if the record has no record sequence
         */
        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
            final long sequenceId = record.getRecordSequence().orElseThrow(
                    () -> new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode"));
            msg.sequenceId(sequenceId);
            msg.sendAsync()
                    .thenAccept(ignoredMessageId -> record.ack())
                    .exceptionally(getPublishErrorHandler(record, true));
        }
    }

    /**
     * Processor selected for the MANUAL processing guarantee. It sends exactly like
     * {@link PulsarSinkAtMostOnceProcessor}: the override only delegates to the parent, so the sink
     * itself never acks the record.
     */
    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.3.2.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkManualProcessor.class */
    class PulsarSinkManualProcessor extends PulsarSink<T>.PulsarSinkAtMostOnceProcessor {
        PulsarSinkManualProcessor() {
            // Parent constructor runs implicitly.
        }

        /**
         * Delegates unchanged to the at-most-once send.
         *
         * @param msg    fully populated message builder
         * @param record sink record the message was built from
         */
        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkAtMostOnceProcessor, org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
            super.sendOutputMessage(msg, record);
        }
    }

    /**
     * Strategy used by {@link PulsarSink#write} to build and dispatch one outgoing message per record.
     *
     * @param <T> value type of the published messages
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.3.2.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkProcessor.class */
    private interface PulsarSinkProcessor<T> {
        /**
         * Creates the message builder for the given record.
         *
         * @param record sink record to publish
         * @return builder that the sink fills with key, value and properties
         */
        TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record);

        /**
         * Dispatches a populated message builder.
         *
         * @param msg    message builder previously returned by {@link #newMessage}
         * @param record sink record the message was built from
         */
        void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record);

        /**
         * Called when the owning sink is closed.
         *
         * @throws Exception if the implementation fails to close
         */
        void close() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-3.3.2.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkProcessorBase.class */
    public abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
        PulsarSinkProcessorBase() {
        }

        protected Producer<T> getProducer(String str, Schema schema) {
            return getProducer(str, schema, null, null);
        }

        protected Producer<T> getProducer(String str, Schema schema, String str2, String str3) {
            return PulsarSink.this.producerCache.getOrCreateProducer(ProducerCache.CacheArea.SINK_RECORD_CACHE, str, str3, () -> {
                Producer<T> createProducer = PulsarSink.this.createProducer(str, schema, str2);
                PulsarSink.log.info("Initialized producer with name '{}' on topic '{}' with schema {} partitionId {} -> {}", str2, str, schema, str3, createProducer);
                return createProducer;
            });
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void close() throws Exception {
        }

        public Function<Throwable, Void> getPublishErrorHandler(AbstractSinkRecord<T> abstractSinkRecord, boolean z) {
            return th -> {
                String format;
                Record<?> sourceRecord = abstractSinkRecord.getSourceRecord();
                if (z) {
                    sourceRecord.fail();
                }
                String orElse = abstractSinkRecord.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
                if (sourceRecord instanceof PulsarRecord) {
                    format = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", orElse, th.getMessage(), ((PulsarRecord) sourceRecord).getMessageId());
                } else {
                    format = String.format("Failed to publish to topic [%s] with error [%s]", orElse, th.getMessage());
                    if (abstractSinkRecord.getRecordSequence().isPresent()) {
                        format = String.format(format + " with src sequence id [%s]", abstractSinkRecord.getRecordSequence().get());
                    }
                }
                PulsarSink.log.error(format);
                PulsarSink.this.stats.incrSinkExceptions(new Exception(format));
                return null;
            };
        }
    }

    /**
     * Stores the collaborators and creates the {@link TopicSchema} helper. No producer is created
     * here; the processor is selected later in {@link #open}.
     *
     * @param pulsarClient          client used for schema lookup and producer creation
     * @param pulsarSinkConfig      sink configuration
     * @param map                   properties applied to every created producer
     * @param componentStatsManager stats manager that receives sink exceptions
     * @param classLoader           class loader used to load the configured type class
     * @param producerCache         cache through which producers are obtained
     */
    public PulsarSink(PulsarClient pulsarClient, PulsarSinkConfig pulsarSinkConfig, Map<String, String> map, ComponentStatsManager componentStatsManager, ClassLoader classLoader, ProducerCache producerCache) {
        this.client = pulsarClient;
        this.functionClassLoader = classLoader;
        this.pulsarSinkConfig = pulsarSinkConfig;
        this.properties = map;
        this.producerCache = producerCache;
        this.stats = componentStatsManager;
        this.topicSchema = new TopicSchema(this.client, this.functionClassLoader);
    }

    /**
     * Resolves the sink schema and picks the processor matching the configured processing
     * guarantees. When no schema is resolved, the method returns without creating a processor.
     *
     * @param map         not read by this implementation
     * @param sinkContext not read by this implementation
     * @throws Exception if schema initialization fails
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        log.info("Opening pulsar sink with config: {}", this.pulsarSinkConfig);

        final Schema<T> resolvedSchema = initializeSchema();
        this.schema = resolvedSchema;
        if (resolvedSchema == null) {
            log.info("Since output type is null, not creating any real sink");
            return;
        }

        // The factory must exist before a processor is built: processor constructors may create a producer.
        this.producerBuilderFactory = new ProducerBuilderFactory(this.client, this.pulsarSinkConfig.getProducerConfig(), this.functionClassLoader, null);

        switch (this.pulsarSinkConfig.getProcessingGuarantees()) {
            case ATMOST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor();
                break;
            case ATLEAST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor();
                break;
            case EFFECTIVELY_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor();
                break;
            case MANUAL:
                this.pulsarSinkProcessor = new PulsarSinkManualProcessor();
                break;
            default:
                // Any other value leaves the processor unset.
                break;
        }
    }

    /**
     * Publishes one record: builds the message through the active processor, copies key, value and
     * properties onto it, adds source information and hands it to the processor for sending.
     *
     * @param record record to publish; it is cast to {@link AbstractSinkRecord}
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<T> record) {
        AbstractSinkRecord<T> sinkRecord = (AbstractSinkRecord<T>) record;
        TypedMessageBuilder<T> msg = this.pulsarSinkProcessor.newMessage(sinkRecord);

        // The record key becomes the message key unless the record schema is a KeyValueSchema
        // using SEPARATED encoding.
        if (record.getKey().isPresent()
                && !(record.getSchema() instanceof KeyValueSchema
                && ((KeyValueSchema<?, ?>) record.getSchema()).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED)) {
            msg.key(record.getKey().get());
        }

        msg.value(record.getValue());

        // Record properties are forwarded only when the record insists or the sink is configured to forward them.
        if (!record.getProperties().isEmpty()
                && (sinkRecord.shouldAlwaysSetMessageProperties() || this.pulsarSinkConfig.isForwardSourceMessageProperty())) {
            msg.properties(record.getProperties());
        }

        if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
            // Records that originate from a Pulsar topic carry the input topic and the
            // Base64-encoded input message id as message properties.
            PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) sinkRecord.getSourceRecord();
            msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
                    .property("__pfn_input_msg_id__", Base64.getEncoder().encodeToString(pulsarRecord.getMessageId().toByteArray()));
        } else {
            // Any other source: carry over the event time when the source record has one.
            Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
            eventTime.ifPresent(msg::eventTime);
        }

        this.pulsarSinkProcessor.sendOutputMessage(msg, sinkRecord);
    }

    /**
     * Closes the active processor, if one was created by {@link #open}.
     *
     * @throws Exception if the processor fails to close
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        final PulsarSinkProcessor<T> processor = this.pulsarSinkProcessor;
        if (processor == null) {
            return;
        }
        processor.close();
    }

    /**
     * Creates a producer through the producer builder factory, applying the sink properties.
     *
     * @param str    topic the producer publishes to
     * @param schema schema for the producer; the sink-level schema is used when this is null
     * @param str2   producer name; may be null
     * @return the newly created producer
     * @throws RuntimeException wrapping the {@link PulsarClientException} when creation fails
     */
    Producer<T> createProducer(String str, Schema<T> schema, String str2) {
        final Schema<T> effectiveSchema;
        if (schema == null) {
            effectiveSchema = this.schema;
        } else {
            effectiveSchema = schema;
        }
        log.info("Initializing producer {} on topic {} with schema {}", str2, str, effectiveSchema);
        try {
            return this.producerBuilderFactory
                    .createProducerBuilder(str, effectiveSchema, str2)
                    .properties(this.properties)
                    .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Failed to create Producer for topic " + str + " producerName " + str2 + " schema " + effectiveSchema, e);
        }
    }

    /**
     * Resolves the schema used for the output topic from the sink configuration.
     * <ul>
     *   <li>No type class name configured: {@code Schema.BYTES}.</li>
     *   <li>Type class is {@code Void}: {@code null}.</li>
     *   <li>No schema type configured: the serde class name is used as the schema type and the
     *       lookup is made with the function class loader.</li>
     *   <li>Schema type configured: it is used as-is, except that a {@link GenericRecord} type
     *       always uses {@code AUTO_CONSUME}.</li>
     * </ul>
     *
     * @return the resolved schema, or null when the configured type is {@code Void}
     * @throws ClassNotFoundException if the configured type class cannot be loaded
     */
    @VisibleForTesting
    Schema<T> initializeSchema() throws ClassNotFoundException {
        final String typeClassName = this.pulsarSinkConfig.getTypeClassName();
        if (StringUtils.isEmpty(typeClassName)) {
            return (Schema<T>) Schema.BYTES;
        }

        final Class<?> typeArg = Reflections.loadClass(typeClassName, this.functionClassLoader);
        if (Void.class.equals(typeArg)) {
            return null;
        }

        final ConsumerConfig consumerConfig = new ConsumerConfig();
        consumerConfig.setSchemaProperties(this.pulsarSinkConfig.getSchemaProperties());

        final String configuredSchemaType = this.pulsarSinkConfig.getSchemaType();
        if (!StringUtils.isEmpty(configuredSchemaType)) {
            if (!GenericRecord.class.isAssignableFrom(typeArg)) {
                consumerConfig.setSchemaType(configuredSchemaType);
            } else {
                consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
                // Parsed unconditionally, so a configured value that is not a SchemaType constant is rejected here.
                final SchemaType requested = SchemaType.valueOf(configuredSchemaType);
                if (requested != SchemaType.AUTO_CONSUME) {
                    log.info("The configured schema type {} is not able to write GenericRecords. So overwrite the schema type to be {}", requested, SchemaType.AUTO_CONSUME);
                }
            }
            return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), typeArg, consumerConfig, false);
        }

        // No explicit schema type: use the serde class name instead.
        consumerConfig.setSchemaType(this.pulsarSinkConfig.getSerdeClassName());
        return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), typeArg, consumerConfig, false, this.functionClassLoader);
    }
}
