package org.apache.pulsar.io.kafka.connect;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-3.3.1.6.jar:org/apache/pulsar/io/kafka/connect/KafkaConnectSink.class */
public class KafkaConnectSink implements Sink<GenericObject> {
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Logger log;
    // Copied from the sink config in open(); when true, toSinkRecord() splits KEY_VALUE records into key and value.
    private boolean unwrapKeyValueIfAvailable;
    // Connector-level context created in open() and handed to SinkConnector.initialize().
    private PulsarKafkaSinkContext sinkContext;

    // Task-level context created in open(); used here to read, update and flush offsets.
    @VisibleForTesting
    PulsarKafkaSinkTaskContext taskContext;
    // Kafka Connect connector instantiated reflectively in open().
    private SinkConnector connector;
    // Kafka Connect task instantiated from connector.taskClass() in open().
    private SinkTask task;
    // Threshold compared against currentBatchSize in flushIfNeeded(); read from the config's batch size.
    private long maxBatchSize;
    // Delay of the periodic flush scheduled in open(); close() waits up to 10x this value for the executor.
    private long lingerMs;
    // Parsed sink configuration, loaded in open().
    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
    // Configured topic, used as the fallback when a record carries no topic name.
    protected String topicName;
    // Copied from the sink config in open(); passed to the Pulsar-to-Kafka schema conversion in toSinkRecord().
    protected boolean useOptionalPrimitives;
    // Decompiler rendering of the compiler's assertion switch; set in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Running total of Message.size() over the records currently held in pendingFlushQueue.
    private final AtomicLong currentBatchSize = new AtomicLong(0);
    // Single-threaded executor that runs both the periodic flush trigger and the flushes themselves.
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d").build());

    // Records already handed to the task by write() and not yet removed by ackUntil().
    @VisibleForTesting
    protected final ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = new ConcurrentLinkedDeque<>();
    // Guard flag: flush() only proceeds when it can flip this from false to true.
    private final AtomicBoolean isFlushRunning = new AtomicBoolean(false);
    // Set to true at the end of open() and to false at the start of close(); write() fails records while false.
    private volatile boolean isRunning = false;
    // Connector configuration properties, filled in open() and passed to connector.start().
    private final Properties props = new Properties();
    // Copied from the sink config in open(); enables topic-name rewriting in sanitizeNameIfNeeded().
    private boolean sanitizeTopicName = false;
    // Copied from the sink config in open(); when true, toSinkRecord() maps a partition topic to its parent topic + partition index.
    private boolean collapsePartitionedTopics = false;
    // Pulsar topic name -> sanitized name; bounded to 1000 entries, expiring 30 minutes after last access.
    private final Cache<String, String> sanitizedTopicCache = CacheBuilder.newBuilder().maximumSize(1000).expireAfterAccess(30, TimeUnit.MINUTES).build();
    // Sanitized name -> original Pulsar topic name; built without size or expiry limits.
    private final Cache<String, String> desanitizedTopicCache = CacheBuilder.newBuilder().build();
    // Number of low-order offset bits reserved for the batch index in getMessageOffset(); open() rejects values above 20.
    private int maxBatchBitsForOffset = 12;
    // When true, getMessageOffset() prefers the message index (if the message has one) as the offset.
    private boolean useIndexAsOffset = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-3.3.1.6.jar:org/apache/pulsar/io/kafka/connect/KafkaConnectSink$BatchMessageSequenceRef.class */
    /**
     * Plain holder for the three coordinates of a batched message id:
     * ledger id, entry id and the index of the message inside its batch.
     */
    public static class BatchMessageSequenceRef {
        long ledgerId;
        long entryId;
        int batchIdx;

        /**
         * @param ledger     ledger id of the message
         * @param entry      entry id of the message within the ledger
         * @param batchIndex index of the message within its batch
         */
        public BatchMessageSequenceRef(long ledger, long entry, int batchIndex) {
            ledgerId = ledger;
            entryId = entry;
            batchIdx = batchIndex;
        }

        /** @return the ledger id given at construction (or assigned to the field since) */
        public long getLedgerId() {
            return ledgerId;
        }

        /** @return the entry id given at construction (or assigned to the field since) */
        public long getEntryId() {
            return entryId;
        }

        /** @return the batch index given at construction (or assigned to the field since) */
        public int getBatchIdx() {
            return batchIdx;
        }
    }

    /**
     * Converts one Pulsar record to a Kafka Connect {@link SinkRecord}, hands it to the task and
     * queues the record so that a later flush can ack it.
     *
     * <p>If the sink is not running, or if conversion / {@code task.put} throws, the record is
     * failed instead of queued.
     *
     * @param record the incoming record; its message must be present
     * @throws IllegalArgumentException if the sink is running and the record carries no message
     */
    public void write(Record<GenericObject> record) {
        if (log.isDebugEnabled()) {
            log.debug("Record sending to kafka, record={}.", record);
        }
        if (this.isRunning) {
            // The rest of the class calls getMessage().get() unconditionally, so insist on it here.
            Preconditions.checkArgument(record.getMessage().isPresent());
            try {
                final SinkRecord converted = toSinkRecord(record);
                this.task.put(Lists.newArrayList(converted));
                this.pendingFlushQueue.add(record);
                final int payloadSize = record.getMessage().get().size();
                this.currentBatchSize.addAndGet(payloadSize);
                flushIfNeeded(false);
            } catch (Exception e) {
                log.error("Error sending the record {}", record, e);
                record.fail();
            }
            return;
        }
        log.warn("Sink is stopped. Cannot send the record {}", record);
        record.fail();
    }

    /**
     * Stops the sink: marks it as not running, requests one last flush, shuts the flush executor
     * down and then stops the task, the connector and the task context.
     *
     * <p>Each shutdown step runs in a {@code finally} chain so that a failure (or an interrupt
     * while waiting for the executor) in one step does not leave the remaining resources open.
     * Components that were never created because {@code open()} failed early are skipped.
     *
     * @throws Exception whatever the executor wait or any of the stop/close calls throws
     */
    public void close() throws Exception {
        this.isRunning = false;
        flushIfNeeded(true);
        this.scheduledExecutor.shutdown();
        final long timeoutMs = 10 * this.lingerMs;
        try {
            if (!this.scheduledExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                log.error("scheduledExecutor did not terminate in {} ms", Long.valueOf(timeoutMs));
            }
        } finally {
            try {
                if (this.task != null) {
                    this.task.stop();
                }
            } finally {
                try {
                    if (this.connector != null) {
                        this.connector.stop();
                    }
                } finally {
                    if (this.taskContext != null) {
                        this.taskContext.close();
                    }
                }
            }
        }
        log.info("Kafka sink stopped.");
    }

    /**
     * Loads the sink configuration, instantiates and starts the Kafka Connect connector and its
     * single task, and schedules the periodic flush.
     *
     * @param map         raw sink configuration, parsed into a {@code PulsarKafkaConnectSinkConfig}
     * @param sinkContext Pulsar sink context; must use a Failover or Exclusive subscription
     * @throws NullPointerException     if the configured Kafka topic is missing
     * @throws IllegalArgumentException if the subscription type is not Failover/Exclusive, if
     *                                  {@code maxBatchBitsForOffset} exceeds 20, or if the connector
     *                                  does not return exactly one task config
     * @throws Exception                on reflection failures or errors raised by the connector/task
     */
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(map);
        Objects.requireNonNull(this.kafkaSinkConfig.getTopic(), "Kafka topic is not set");
        Preconditions.checkArgument(sinkContext.getSubscriptionType() == SubscriptionType.Failover || sinkContext.getSubscriptionType() == SubscriptionType.Exclusive, "Source must run with Exclusive or Failover subscription type");

        this.topicName = this.kafkaSinkConfig.getTopic();
        this.unwrapKeyValueIfAvailable = this.kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
        this.sanitizeTopicName = this.kafkaSinkConfig.isSanitizeTopicName();
        this.collapsePartitionedTopics = this.kafkaSinkConfig.isCollapsePartitionedTopics();
        this.useOptionalPrimitives = this.kafkaSinkConfig.isUseOptionalPrimitives();
        this.useIndexAsOffset = this.kafkaSinkConfig.isUseIndexAsOffset();
        this.maxBatchBitsForOffset = this.kafkaSinkConfig.getMaxBatchBitsForOffset();
        Preconditions.checkArgument(this.maxBatchBitsForOffset <= 20, "Cannot use more than 20 bits for maxBatchBitsForOffset");

        final String kafkaConnectorSinkClass = this.kafkaSinkConfig.getKafkaConnectorSinkClass();
        // Copy the connector settings into the Properties object later passed to connector.start().
        this.kafkaSinkConfig.getKafkaConnectorConfigProperties().forEach(this.props::put);

        this.connector = (SinkConnector) Class.forName(kafkaConnectorSinkClass).getConstructor().newInstance();
        final Class<? extends Task> taskClass = this.connector.taskClass();
        this.sinkContext = new PulsarKafkaSinkContext();
        this.connector.initialize(this.sinkContext);
        this.connector.start(Maps.fromProperties(this.props));

        final List<Map<String, String>> taskConfigs = this.connector.taskConfigs(1);
        Preconditions.checkNotNull(taskConfigs);
        Preconditions.checkArgument(taskConfigs.size() == 1);
        // Work on a mutable copy of the single task config so the offset topic can be added.
        final Map<String, String> taskConfig = new HashMap<>(taskConfigs.get(0));
        taskConfig.put("offset.storage.topic", this.kafkaSinkConfig.getOffsetStorageTopic());

        this.task = (SinkTask) taskClass.getConstructor().newInstance();
        final SinkTask sinkTask = Objects.requireNonNull(this.task);
        this.taskContext = new PulsarKafkaSinkTaskContext(taskConfig, sinkContext, sinkTask::open, str -> {
            // Map a (possibly sanitized) Kafka topic name back to the Pulsar topic name, if known.
            if (!this.sanitizeTopicName) {
                return str;
            }
            String ifPresent = this.desanitizedTopicCache.getIfPresent(str);
            if (log.isDebugEnabled()) {
                log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}", str, ifPresent);
            }
            return ifPresent != null ? ifPresent : str;
        });
        this.task.initialize(this.taskContext);
        this.task.start(taskConfig);

        this.maxBatchSize = this.kafkaSinkConfig.getBatchSize();
        this.lingerMs = this.kafkaSinkConfig.getLingerTimeMs();
        this.isRunning = true;
        // Periodic trigger: forces a flush request every lingerMs regardless of the batch size.
        this.scheduledExecutor.scheduleWithFixedDelay(() -> {
            flushIfNeeded(true);
        }, this.lingerMs, this.lingerMs, TimeUnit.MILLISECONDS);
        log.info("Kafka sink started : {}.", this.props);
    }

    /**
     * Submits a {@link #flush()} to the flush executor when one is warranted.
     *
     * <p>Nothing is submitted while a flush is already marked as running. Otherwise a flush is
     * submitted when {@code z} is true or the accumulated batch size has reached the maximum.
     *
     * @param z true to request a flush regardless of the current batch size
     */
    private void flushIfNeeded(boolean z) {
        final boolean flushInProgress = this.isFlushRunning.get();
        if (flushInProgress) {
            return;
        }
        final boolean wanted = z || this.currentBatchSize.get() >= this.maxBatchSize;
        if (!wanted) {
            return;
        }
        this.scheduledExecutor.submit(this::flush);
    }

    /**
     * Commits the task's offsets and acks the pending records they cover.
     *
     * <p>Does nothing when the pending queue is empty or another flush holds the
     * {@code isFlushRunning} flag. The last queued record is captured up front so that records
     * added while the flush is in progress are not touched. If the task reports no committed
     * offsets the flush is skipped. If anything throws, every pending record up to and including
     * the captured one is failed and removed from the queue. The flag is always released.
     */
    public void flush() {
        if (log.isDebugEnabled()) {
            log.debug("flush requested, pending: {}, batchSize: {}", Long.valueOf(this.currentBatchSize.get()), Long.valueOf(this.maxBatchSize));
        }
        if (this.pendingFlushQueue.isEmpty() || !this.isFlushRunning.compareAndSet(false, true)) {
            return;
        }
        try {
            // peekLast (not getLast) so an unexpectedly empty queue cannot throw while the flag is held.
            final Record<GenericObject> last = this.pendingFlushQueue.peekLast();
            if (last == null) {
                return;
            }
            try {
                final Map<TopicPartition, OffsetAndMetadata> currentOffsets = this.taskContext.currentOffsets();
                final Map<TopicPartition, OffsetAndMetadata> preCommit = this.task.preCommit(currentOffsets);
                if (preCommit == null || preCommit.isEmpty()) {
                    log.info("Task returned empty committedOffsets map; skipping flush; task will retry later");
                    return;
                }
                if (log.isDebugEnabled() && !areMapsEqual(preCommit, currentOffsets)) {
                    log.debug("committedOffsets {} differ from currentOffsets {}", preCommit, currentOffsets);
                }
                this.taskContext.flushOffsets(preCommit);
                ackUntil(last, preCommit, Record::ack);
                log.info("Flush succeeded");
            } catch (Throwable th) {
                log.error("error flushing pending records", th);
                // No reliable offsets are available on this path, so fail by queue position
                // instead of passing a null offsets map to ackUntil().
                failPendingUntil(last);
            }
        } finally {
            this.isFlushRunning.compareAndSet(true, false);
        }
    }

    /**
     * Fails and dequeues every pending record from the head of the queue up to and including
     * {@code last}, reducing the tracked batch size accordingly.
     */
    private void failPendingUntil(Record<GenericObject> last) {
        final Iterator<Record<GenericObject>> pending = this.pendingFlushQueue.iterator();
        while (pending.hasNext()) {
            final Record<GenericObject> next = pending.next();
            next.fail();
            pending.remove();
            this.currentBatchSize.addAndGet(-1L * next.getMessage().map(Message::size).orElse(0));
            if (next == last) {
                return;
            }
        }
    }

    /**
     * Tells whether two offset maps have the same size and, for every entry of the first map,
     * an equal value under the same key in the second.
     *
     * @param map  map whose entries are checked
     * @param map2 map the entries are looked up in
     * @return true when the sizes match and every value of {@code map} equals its counterpart
     */
    private static boolean areMapsEqual(Map<TopicPartition, OffsetAndMetadata> map, Map<TopicPartition, OffsetAndMetadata> map2) {
        if (map.size() != map2.size()) {
            return false;
        }
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            final OffsetAndMetadata counterpart = map2.get(entry.getKey());
            if (!entry.getValue().equals(counterpart)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Walks the pending queue from its head up to and including {@code record} and applies
     * {@code consumer} to every record covered by the given offsets, removing it from the queue
     * and subtracting its message size from the tracked batch size.
     *
     * <p>A record is covered when the map holds an offset for its topic/partition and the
     * record's offset is less than or equal to it. Records that are not covered stay queued.
     *
     * @param record   last record to consider; iteration stops after it
     * @param map      offsets per topic/partition; when {@code null} no offset information is
     *                 available and every record up to {@code record} is treated as covered
     * @param consumer callback applied to each covered record (e.g. ack or fail)
     */
    @VisibleForTesting
    protected void ackUntil(Record<GenericObject> record, Map<TopicPartition, OffsetAndMetadata> map, Consumer<Record<GenericObject>> consumer) {
        final boolean applyToAll = map == null;
        // Index offsets by topic, then partition, so no TopicPartition is built per queued record.
        final Map<String, Map<Integer, Long>> committedByTopic = new HashMap<>();
        if (!applyToAll) {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
                final TopicPartition key = entry.getKey();
                committedByTopic.computeIfAbsent(key.topic(), ignored -> new HashMap<>())
                        .put(key.partition(), entry.getValue().offset());
            }
        }
        final Iterator<Record<GenericObject>> it = this.pendingFlushQueue.iterator();
        while (it.hasNext()) {
            final Record<GenericObject> next = it.next();
            if (applyToAll || isWithinCommittedOffsets(next, committedByTopic)) {
                consumer.accept(next);
                // Remove through the iterator: no second scan of the queue per record.
                it.remove();
                this.currentBatchSize.addAndGet(-1L * next.getMessage().get().size());
            }
            if (next == record) {
                return;
            }
        }
    }

    /**
     * Checks whether a pending record's offset is at or below the committed offset of its
     * topic/partition. The topic/partition is derived the same way toSinkRecord() derives it
     * (including the collapsePartitionedTopics case) so the lookup key matches the key under
     * which the offset was recorded.
     */
    private boolean isWithinCommittedOffsets(Record<GenericObject> pending, Map<String, Map<Integer, Long>> committedByTopic) {
        final String topic;
        final int partition;
        if (this.collapsePartitionedTopics && pending.getTopicName().isPresent() && TopicName.get(pending.getTopicName().get()).isPartitioned()) {
            final TopicName partitioned = TopicName.get(pending.getTopicName().get());
            partition = partitioned.getPartitionIndex();
            topic = sanitizeNameIfNeeded(partitioned.getPartitionedTopicName(), this.sanitizeTopicName);
        } else {
            partition = pending.getPartitionIndex().orElse(0);
            topic = sanitizeNameIfNeeded(pending.getTopicName().orElse(this.topicName), this.sanitizeTopicName);
        }
        final Map<Integer, Long> byPartition = committedByTopic.get(topic);
        if (byPartition == null) {
            return false;
        }
        final Long committed = byPartition.get(partition);
        return committed != null && getMessageOffset(pending) <= committed;
    }

    /**
     * Derives the Kafka-style offset for a record.
     *
     * <p>In order of preference: the message index (when {@code useIndexAsOffset} is set and the
     * message has one); a value packed from ledger id, entry id and batch index (when
     * {@code maxBatchBitsForOffset > 0} and the message id exposes a non-negative batch index);
     * otherwise the record sequence, or -1 when there is none.
     *
     * <p>Packed layout: {@code (ledgerId << 28) | (entryId << maxBatchBitsForOffset) | batchIdx}.
     * An entry id needing more than {@code 28 - maxBatchBitsForOffset} bits, or a batch index
     * needing more than {@code maxBatchBitsForOffset} bits, spills into neighbouring bits; this is
     * logged as an error but the value is still returned.
     *
     * @param record record to compute the offset for
     * @return the computed offset, or -1 when none can be derived
     */
    private long getMessageOffset(Record<GenericObject> record) {
        if (record.getMessage().isPresent()) {
            final Message<GenericObject> message = record.getMessage().get();
            if (this.useIndexAsOffset && message.hasIndex()) {
                return message.getIndex().orElse(-1L);
            }
            final MessageId messageId = message.getMessageId();
            if (this.maxBatchBitsForOffset > 0) {
                final BatchMessageSequenceRef ref = getMessageSequenceRefForBatchMessage(messageId);
                if (ref != null) {
                    final long ledgerId = ref.getLedgerId();
                    final long entryId = ref.getEntryId();
                    // >= : a value equal to 2^k already needs k+1 bits.
                    if (entryId >= (1L << (28 - this.maxBatchBitsForOffset))) {
                        log.error("EntryId of the message {} over max, chance of duplicate offsets", Long.valueOf(entryId));
                    }
                    int batchIdx = ref.getBatchIdx();
                    if (batchIdx < 0) {
                        log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", Integer.valueOf(batchIdx));
                        batchIdx = 0;
                    }
                    if (batchIdx >= (1L << this.maxBatchBitsForOffset)) {
                        log.error("BatchIdx of the message {} over max, chance of duplicate offsets", Integer.valueOf(batchIdx));
                    }
                    return (ledgerId << 28) | (entryId << this.maxBatchBitsForOffset) | batchIdx;
                }
            }
        }
        return record.getRecordSequence().orElse(-1L);
    }

    /**
     * Looks up a no-argument method by name on the message id's class, walking up the
     * superclass chain until a class declaring it is found.
     *
     * @param messageId object whose class hierarchy is searched
     * @param str       name of the no-argument method to find
     * @return the first matching declared method, starting from the most specific class
     * @throws NoSuchMethodException the failure from the most specific class, when no class in
     *                               the hierarchy declares the method
     */
    private static Method getMethodOfMessageId(MessageId messageId, String str) throws NoSuchMethodException {
        NoSuchMethodException firstFailure = null;
        Class<?> type = messageId.getClass();
        while (type != null) {
            try {
                return type.getDeclaredMethod(str);
            } catch (NoSuchMethodException e) {
                // Remember only the first miss; it names the concrete class of the message id.
                firstFailure = firstFailure != null ? firstFailure : e;
            }
            type = type.getSuperclass();
        }
        if (!$assertionsDisabled && firstFailure == null) {
            throw new AssertionError();
        }
        throw firstFailure;
    }

    /**
     * Reflectively extracts ledger id, entry id and batch index from a message id.
     *
     * @param messageId message id to inspect
     * @return the extracted coordinates, or {@code null} when the message id's class hierarchy
     *         declares no {@code getBatchIndex} method or the batch index is negative
     * @throws RuntimeException when a reflective call fails, or when a message id that does
     *                          report a batch index lacks {@code getLedgerId}/{@code getEntryId}
     */
    @VisibleForTesting
    static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId messageId) {
        try {
            final int batchIdx;
            try {
                batchIdx = (Integer) getMethodOfMessageId(messageId, "getBatchIndex").invoke(messageId);
            } catch (NoSuchMethodException ignored) {
                // No batch index accessor: the caller falls back to another offset source.
                return null;
            }
            if (batchIdx < 0) {
                return null;
            }
            final long ledgerId = (Long) getMethodOfMessageId(messageId, "getLedgerId").invoke(messageId);
            final long entryId = (Long) getMethodOfMessageId(messageId, "getEntryId").invoke(messageId);
            return new BatchMessageSequenceRef(ledgerId, entryId, batchIdx);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e2) {
            log.error("Unexpected error while retrieving sequenceId, messageId class: {}, error: {}", messageId.getClass().getName(), e2.getMessage(), e2);
            throw new RuntimeException(e2);
        }
    }

    /**
     * Builds the Kafka Connect {@link SinkRecord} for a Pulsar record and records its offset as
     * the latest one for the target topic/partition.
     *
     * <p>Topic/partition: with {@code collapsePartitionedTopics} and a partitioned topic name,
     * the parent topic name and the partition index parsed from the name are used; otherwise the
     * record's own topic name (or the configured default) and partition index (or 0).
     *
     * <p>Key/value: a KEY_VALUE-typed record is split into key and value when
     * {@code unwrapKeyValueIfAvailable} is set; otherwise the key is the message key (bytes when
     * base64-encoded, string otherwise) and the value is the record's native object.
     *
     * @param record the Pulsar record to convert
     * @return the equivalent sink record
     * @throws IllegalStateException if no non-negative offset can be derived, or a KEY_VALUE
     *                               record carries a non-null native object that is not a KeyValue
     */
    protected SinkRecord toSinkRecord(Record<GenericObject> record) {
        final int partition;
        final String topic;
        final boolean collapse = this.collapsePartitionedTopics
                && record.getTopicName().isPresent()
                && TopicName.get(record.getTopicName().get()).isPartitioned();
        if (collapse) {
            final TopicName partitioned = TopicName.get(record.getTopicName().get());
            partition = partitioned.getPartitionIndex();
            topic = sanitizeNameIfNeeded(partitioned.getPartitionedTopicName(), this.sanitizeTopicName);
        } else {
            partition = record.getPartitionIndex().orElse(0);
            topic = sanitizeNameIfNeeded(record.getTopicName().orElse(this.topicName), this.sanitizeTopicName);
        }

        final Object key;
        final Object value;
        final Schema keySchema;
        final Schema valueSchema;
        final boolean splitKeyValue = this.unwrapKeyValueIfAvailable
                && record.getSchema() != null
                && record.getSchema().getSchemaInfo() != null
                && record.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE;
        if (splitKeyValue) {
            final KeyValueSchema kvSchema = (KeyValueSchema) record.getSchema();
            keySchema = PulsarSchemaToKafkaSchema.getOptionalKafkaConnectSchema(kvSchema.getKeySchema(), this.useOptionalPrimitives);
            valueSchema = PulsarSchemaToKafkaSchema.getOptionalKafkaConnectSchema(kvSchema.getValueSchema(), this.useOptionalPrimitives);
            final Object nativeObject = record.getValue().getNativeObject();
            if (nativeObject instanceof KeyValue) {
                final KeyValue pair = (KeyValue) nativeObject;
                key = KafkaConnectData.getKafkaConnectDataFromSchema(pair.getKey(), keySchema);
                value = KafkaConnectData.getKafkaConnectDataFromSchema(pair.getValue(), valueSchema);
            } else if (nativeObject == null) {
                key = null;
                value = null;
            } else {
                throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass());
            }
        } else {
            if (record.getMessage().get().hasBase64EncodedKey()) {
                key = record.getMessage().get().getKeyBytes();
                keySchema = this.useOptionalPrimitives ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
            } else {
                key = record.getKey().orElse(null);
                keySchema = this.useOptionalPrimitives ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
            }
            valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(record.getSchema(), this.useOptionalPrimitives);
            value = KafkaConnectData.getKafkaConnectData(record.getValue().getNativeObject(), valueSchema);
        }

        final long offset = getMessageOffset(record);
        if (offset < 0) {
            log.error("Message without sequenceId. Key: {} Value: {}", key, value);
            throw new IllegalStateException("Message without sequenceId");
        }
        this.taskContext.updateLastOffset(new TopicPartition(topic, partition), offset);

        // Event time, when present, is reported as CREATE_TIME; the publish time fallback carries no type.
        final Long timestamp;
        final TimestampType timestampType;
        if (record.getEventTime().isPresent()) {
            timestamp = record.getEventTime().get();
            timestampType = TimestampType.CREATE_TIME;
        } else {
            timestamp = Long.valueOf(record.getMessage().get().getPublishTime());
            timestampType = TimestampType.NO_TIMESTAMP_TYPE;
        }
        return new SinkRecord(topic, partition, keySchema, key, valueSchema, value, offset, timestamp, timestampType);
    }

    /**
     * Returns the offset the task context currently holds for a topic/partition.
     *
     * @param str Pulsar topic name; sanitized first when topic-name sanitizing is enabled
     * @param i   partition index
     * @return the task context's current offset for that topic/partition
     */
    @VisibleForTesting
    protected long currentOffset(String str, int i) {
        final String kafkaTopic = sanitizeNameIfNeeded(str, this.sanitizeTopicName);
        return this.taskContext.currentOffset(kafkaTopic, i).longValue();
    }

    /**
     * Optionally rewrites a topic name so that it only contains letters, digits and underscores
     * and does not start with a digit.
     *
     * <p>Every other character becomes {@code _}; a result that does not start with a letter or
     * underscore gets a leading {@code _}. Results are cached, and the reverse mapping
     * (sanitized name to original name) is stored the first time a sanitized name is produced.
     *
     * @param str topic name to sanitize
     * @param z   whether sanitizing is enabled; when false {@code str} is returned unchanged
     * @return the sanitized (or original) topic name
     * @throws IllegalStateException if the cache loader fails
     */
    protected String sanitizeNameIfNeeded(String str, boolean z) {
        if (z) {
            try {
                return this.sanitizedTopicCache.get(str, () -> {
                    final String replaced = str.replaceAll("[^a-zA-Z0-9_]", "_");
                    final String sanitized = replaced.matches("^[^a-zA-Z_].*") ? "_" + replaced : replaced;
                    // Keep the first original name seen for this sanitized name for reverse lookups.
                    this.desanitizedTopicCache.get(sanitized, () -> str);
                    return sanitized;
                });
            } catch (ExecutionException e) {
                log.error("Failed to get sanitized topic name for {}", str, e);
                throw new IllegalStateException("Failed to get sanitized topic name for " + str, e);
            }
        }
        return str;
    }

    // Static initializer: assigns the two static final fields declared without initializers above.
    static {
        // True when assertions are NOT enabled for this class; read by getMethodOfMessageId().
        $assertionsDisabled = !KafkaConnectSink.class.desiredAssertionStatus();
        // Class logger used throughout this class.
        log = LoggerFactory.getLogger((Class<?>) KafkaConnectSink.class);
    }
}
