package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.7.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerSourceTask.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerSourceTask.class */
public class WorkerSourceTask extends WorkerTask {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WorkerSourceTask.class);
    // How long execute() waits on the stop latch after sendRecords() reports a failed send.
    private static final long SEND_FAILED_BACKOFF_MS = 100;
    // Worker configuration; commitOffsets() reads the offset commit timeout from it.
    private final WorkerConfig workerConfig;
    // The connector-provided task that is initialized, started, polled and stopped by this class.
    private final SourceTask task;
    // Passed to the WorkerSourceTaskContext created in execute().
    private final ClusterConfigState configState;
    // Converters used to turn a SourceRecord's key, value and headers into producer bytes.
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    // Applied to every polled record before conversion; closed in close().
    private final TransformationChain<SourceRecord> transformationChain;
    // Producer that receives the converted records; closed in close().
    private KafkaProducer<byte[], byte[]> producer;
    // Passed to the WorkerSourceTaskContext created in execute().
    private final OffsetStorageReader offsetReader;
    // Receives the source partition/offset of each record handed to the producer and is flushed by commitOffsets().
    private final OffsetStorageWriter offsetWriter;
    // Clock used for poll and commit duration measurements.
    private final Time time;
    // Metrics for polled/written records of this task.
    private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
    // Batch returned by poll() that still has to be sent; null when there is nothing pending.
    private List<SourceRecord> toSend;
    // Set when producer.send() threw a retriable exception; cleared after the next successful send() call.
    private boolean lastSendFailed;
    // Identity-keyed set of records handed to the producer whose callback has not been seen yet.
    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
    // Records handed to the producer while a flush is in progress (flushing == true).
    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
    // True from the start of commitOffsets() until finishSuccessfulFlush()/finishFailedFlush() runs.
    private boolean flushing;
    // Counted down by stop(); execute() awaits it as an interruptible back-off timer.
    private CountDownLatch stopRequestedLatch;
    // String view of the task configuration captured in initialize() and passed to task.start().
    private Map<String, String> taskConfig;
    // Set by execute() once task.start() has returned and no stop was requested in the meantime.
    private boolean finishedStart;
    // Set by stop() when it is called before finishedStart became true.
    private boolean startedShutdownBeforeStartCompleted;
    // Set by tryStop() after task.stop() returned normally, so the task is stopped at most once.
    private boolean stopped;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.7.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerSourceTask$SourceRecordWriteCounter.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerSourceTask$SourceRecordWriteCounter.class */
    /**
     * Counts down the records of one batch as each of them is either skipped or acknowledged and
     * reports the number of records accounted for to the metrics group exactly once per batch.
     *
     * <p>The counter starts at the batch size. When it reaches zero, or when
     * {@link #retryRemaining()} is called, {@code batchSize - counter} is passed to
     * {@link SourceTaskMetricsGroup#recordWrite(int)}; later calls have no further effect on the
     * metrics.
     *
     * <p>This class performs no synchronization of its own.
     * NOTE(review): sendRecords() calls skipRecord() inline and completeRecord() from the producer
     * callback - confirm whether those calls can run concurrently.
     */
    public static class SourceRecordWriteCounter {
        private final SourceTaskMetricsGroup metricsGroup;
        private final int batchSize;
        private boolean completed = false;
        private int counter;

        /**
         * @param i                      number of records in the batch; asserted to be positive
         * @param sourceTaskMetricsGroup metrics group that receives the write count; asserted non-null
         */
        public SourceRecordWriteCounter(int i, SourceTaskMetricsGroup sourceTaskMetricsGroup) {
            assert i > 0;
            assert sourceTaskMetricsGroup != null;
            this.batchSize = i;
            this.counter = i;
            this.metricsGroup = sourceTaskMetricsGroup;
        }

        /** Accounts for one record of the batch that was dropped before being handed to the producer. */
        public void skipRecord() {
            recordOutcome();
        }

        /** Accounts for one record of the batch whose producer callback has fired. */
        public void completeRecord() {
            recordOutcome();
        }

        /** Reports what has been accounted for so far; the rest of the batch will be retried. */
        public void retryRemaining() {
            finishedAllWrites();
        }

        /** Decrements the pending count (never below zero) and reports once it reaches zero. */
        private void recordOutcome() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        /** Publishes the number of records accounted for, at most once per counter instance. */
        private void finishedAllWrites() {
            if (completed) {
                return;
            }
            metricsGroup.recordWrite(batchSize - counter);
            completed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.7.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerSourceTask$SourceTaskMetricsGroup.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerSourceTask$SourceTaskMetricsGroup.class */
    /**
     * Sensors describing the record flow of a single source task: records polled, records
     * written, poll batch time and the number of records currently "active" (polled but not yet
     * reported as written).
     */
    public static class SourceTaskMetricsGroup {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor sourceRecordPoll;
        private final Sensor sourceRecordWrite;
        private final Sensor sourceRecordActiveCount;
        private final Sensor pollTime;
        private int activeRecordCount;

        /**
         * Obtains the metric group tagged with the connector name and task number, closes it
         * once before use, and then registers all sensors of this group on it.
         *
         * @param connectorTaskId identifies the connector and task used as metric tags
         * @param connectMetrics  metrics facade that provides the registry and the group
         */
        public SourceTaskMetricsGroup(ConnectorTaskId connectorTaskId, ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry names = connectMetrics.registry();
            ConnectMetrics.MetricGroup group = connectMetrics.group(
                    names.sourceTaskGroupName(),
                    names.connectorTagName(), connectorTaskId.connector(),
                    names.taskTagName(), Integer.toString(connectorTaskId.task()));
            metricGroup = group;
            group.close();

            Sensor polls = group.sensor("source-record-poll");
            sourceRecordPoll = polls;
            polls.add(group.metricName(names.sourceRecordPollRate), new Rate());
            polls.add(group.metricName(names.sourceRecordPollTotal), new Total());

            Sensor writes = group.sensor("source-record-write");
            sourceRecordWrite = writes;
            writes.add(group.metricName(names.sourceRecordWriteRate), new Rate());
            writes.add(group.metricName(names.sourceRecordWriteTotal), new Total());

            Sensor batchTime = group.sensor("poll-batch-time");
            pollTime = batchTime;
            batchTime.add(group.metricName(names.sourceRecordPollBatchTimeMax), new Max());
            batchTime.add(group.metricName(names.sourceRecordPollBatchTimeAvg), new Avg());

            Sensor active = group.sensor("source-record-active-count");
            sourceRecordActiveCount = active;
            active.add(group.metricName(names.sourceRecordActiveCount), new Value());
            active.add(group.metricName(names.sourceRecordActiveCountMax), new Max());
            active.add(group.metricName(names.sourceRecordActiveCountAvg), new Avg());
        }

        /** Closes the underlying metric group. */
        void close() {
            metricGroup.close();
        }

        /**
         * Records a completed poll.
         *
         * @param i number of records returned by the poll; added to the active record count
         * @param j value recorded on the poll batch time sensor
         */
        void recordPoll(int i, long j) {
            sourceRecordPoll.record(i);
            pollTime.record(j);
            activeRecordCount = activeRecordCount + i;
            sourceRecordActiveCount.record(activeRecordCount);
        }

        /**
         * Records that {@code i} records were written; the active record count is reduced by
         * that amount but never drops below zero.
         *
         * @param i number of records written
         */
        void recordWrite(int i) {
            sourceRecordWrite.record(i);
            activeRecordCount = Math.max(0, activeRecordCount - i);
            sourceRecordActiveCount.record(activeRecordCount);
        }

        /** @return the metric group the sensors are registered on */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return metricGroup;
        }
    }

    /**
     * Creates a worker-side wrapper around a source task. Only stores the collaborators and
     * initializes bookkeeping state; nothing is started here.
     *
     * @param connectorTaskId            id of the task, also used to tag its metrics
     * @param sourceTask                 the task to run
     * @param listener                   forwarded to the superclass
     * @param targetState                forwarded to the superclass
     * @param converter                  converter used for record keys
     * @param converter2                 converter used for record values
     * @param headerConverter            converter used for record headers
     * @param transformationChain        transformations applied to each polled record
     * @param kafkaProducer              producer the converted records are sent with
     * @param offsetStorageReader        offset reader handed to the task context
     * @param offsetStorageWriter        writer that collects and flushes source offsets
     * @param workerConfig               worker configuration
     * @param clusterConfigState         config state handed to the task context
     * @param connectMetrics             metrics facade
     * @param classLoader                forwarded to the superclass
     * @param time                       clock used for duration measurements
     * @param retryWithToleranceOperator forwarded to the superclass
     */
    public WorkerSourceTask(ConnectorTaskId connectorTaskId, SourceTask sourceTask, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, TransformationChain<SourceRecord> transformationChain, KafkaProducer<byte[], byte[]> kafkaProducer, OffsetStorageReader offsetStorageReader, OffsetStorageWriter offsetStorageWriter, WorkerConfig workerConfig, ClusterConfigState clusterConfigState, ConnectMetrics connectMetrics, ClassLoader classLoader, Time time, RetryWithToleranceOperator retryWithToleranceOperator) {
        super(connectorTaskId, listener, targetState, classLoader, connectMetrics, retryWithToleranceOperator);

        // Lifecycle flags.
        finishedStart = false;
        startedShutdownBeforeStartCompleted = false;
        stopped = false;

        // Collaborators.
        this.workerConfig = workerConfig;
        task = sourceTask;
        configState = clusterConfigState;
        keyConverter = converter;
        valueConverter = converter2;
        this.headerConverter = headerConverter;
        this.transformationChain = transformationChain;
        producer = kafkaProducer;
        offsetReader = offsetStorageReader;
        offsetWriter = offsetStorageWriter;
        this.time = time;

        // Send / flush bookkeeping.
        toSend = null;
        lastSendFailed = false;
        outstandingMessages = new IdentityHashMap<>();
        outstandingMessagesBacklog = new IdentityHashMap<>();
        flushing = false;
        stopRequestedLatch = new CountDownLatch(1);
        sourceTaskMetricsGroup = new SourceTaskMetricsGroup(connectorTaskId, connectMetrics);
    }

    /**
     * Captures the task configuration as a string map for the later {@code task.start(...)} call.
     * Any failure is logged and reported through {@code onFailure} instead of being thrown.
     *
     * @param taskConfig configuration of the task
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void initialize(TaskConfig taskConfig) {
        try {
            Map<String, String> originals = taskConfig.originalsStrings();
            this.taskConfig = originals;
        } catch (Throwable failure) {
            log.error("{} Task failed initialization and will not be started.", this, failure);
            onFailure(failure);
        }
    }

    /**
     * Stops the task (unless {@code shouldPause()} is true), then closes the producer with a
     * 30 second timeout and finally the transformation chain. Failures while closing are
     * logged as warnings and do not prevent the remaining steps.
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    protected void close() {
        boolean paused = shouldPause();
        if (!paused) {
            tryStop();
        }

        KafkaProducer<byte[], byte[]> toClose = producer;
        if (toClose != null) {
            try {
                toClose.close(Duration.ofSeconds(30L));
            } catch (Throwable producerFailure) {
                log.warn("Could not close producer", producerFailure);
            }
        }

        try {
            transformationChain.close();
        } catch (Throwable chainFailure) {
            log.warn("Could not close transformation chain", chainFailure);
        }
    }

    /** Releases the metrics held by this task by closing its source task metrics group. */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    protected void releaseResources() {
        sourceTaskMetricsGroup.close();
    }

    /**
     * Requests shutdown: delegates to the superclass, releases the stop latch so a back-off in
     * execute() ends early, and then either stops the task right away (if start has finished)
     * or leaves a flag so that execute() stops it once start returns.
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void stop() {
        super.stop();
        stopRequestedLatch.countDown();
        synchronized (this) {
            if (!finishedStart) {
                // task.start() has not returned yet; execute() will call tryStop() itself.
                startedShutdownBeforeStartCompleted = true;
                return;
            }
            tryStop();
        }
    }

    /**
     * Calls {@code task.stop()} unless an earlier call already completed normally. A failure is
     * logged as a warning and leaves {@code stopped} unset, so a later call tries again.
     */
    private synchronized void tryStop() {
        if (!stopped) {
            try {
                task.stop();
                stopped = true;
            } catch (Throwable failure) {
                log.warn("Could not stop task", failure);
            }
        }
    }

    /**
     * Main loop of the task: initializes and starts the source task, then alternates between
     * polling the task for records and sending them to the producer until {@code isStopping()}
     * returns true. Offsets are committed once more on the way out, whatever the reason.
     *
     * <p>The monitor of this object is held only for the short start/stop handshake. The
     * poll/send loop must run without it: {@link #stop()}, the producer callback
     * ({@code recordSent}) and {@link #commitOffsets()} all synchronize on this object from
     * other threads and would otherwise be blocked for as long as the loop is running.
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void execute() {
        try {
            task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
            task.start(taskConfig);
            log.info("{} Source task finished initialization and start", this);

            synchronized (this) {
                if (startedShutdownBeforeStartCompleted) {
                    // stop() arrived while the task was still starting; honour it now.
                    tryStop();
                    return;
                }
                finishedStart = true;
            }

            while (!isStopping()) {
                if (shouldPause()) {
                    onPause();
                    if (awaitUnpause()) {
                        onResume();
                    }
                    continue;
                }

                if (toSend == null) {
                    log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                    long pollStartMs = time.milliseconds();
                    toSend = poll();
                    if (toSend != null) {
                        recordPollReturned(toSend.size(), time.milliseconds() - pollStartMs);
                    }
                }

                if (toSend == null) {
                    continue;
                }

                log.debug("{} About to send {} records to Kafka", this, toSend.size());
                if (!sendRecords()) {
                    // Back off before retrying, but wake up immediately if stop() is called.
                    stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
                }
            }
        } catch (InterruptedException e) {
            // Deliberately ignored so the task can exit. The interrupt flag is not restored here
            // because the commitOffsets() call below waits on this object's monitor and would
            // fail immediately if the thread were still marked as interrupted.
        } finally {
            commitOffsets();
        }
    }

    /**
     * Polls the source task once.
     *
     * @return the records returned by the task, or {@code null} if the task threw a retriable
     *         exception (which is logged as a warning)
     * @throws InterruptedException if the task's poll is interrupted
     */
    protected List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> polled;
        try {
            polled = task.poll();
        } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException retriable) {
            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, retriable);
            polled = null;
        }
        return polled;
    }

    /**
     * Converts an already transformed source record into a producer record. Header, key and
     * value conversion each run through the retry-with-tolerance operator under their own stage.
     *
     * @param sourceRecord the transformed record, may be {@code null}
     * @return the producer record, or {@code null} if the input was {@code null} or the operator
     *         reports a failure after the conversions
     */
    private ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord sourceRecord) {
        if (sourceRecord == null) {
            return null;
        }

        RecordHeaders headers = (RecordHeaders) retryWithToleranceOperator.execute(
                () -> convertHeaderFor(sourceRecord),
                Stage.HEADER_CONVERTER, headerConverter.getClass());

        byte[] keyBytes = (byte[]) retryWithToleranceOperator.execute(
                () -> keyConverter.fromConnectData(sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key()),
                Stage.KEY_CONVERTER, keyConverter.getClass());

        byte[] valueBytes = (byte[]) retryWithToleranceOperator.execute(
                () -> valueConverter.fromConnectData(sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value()),
                Stage.VALUE_CONVERTER, valueConverter.getClass());

        if (!retryWithToleranceOperator.failed()) {
            return new ProducerRecord<>(sourceRecord.topic(), sourceRecord.kafkaPartition(),
                    ConnectUtils.checkAndConvertTimestamp(sourceRecord.timestamp()), keyBytes, valueBytes, headers);
        }
        return null;
    }

    /**
     * Transforms, converts and hands every record of {@code toSend} to the producer.
     *
     * <p>Records that are dropped by the transformation/conversion stage are skipped: they are
     * counted as handled and {@code commitTaskRecord} is called for them right away. All other
     * records are registered as outstanding (unless the previous send attempt failed), their
     * source offset is passed to the offset writer, and they are sent asynchronously.
     *
     * <p>If {@code producer.send} throws a retriable exception, {@code toSend} is trimmed to the
     * records that have not been handled yet, starting with the one that failed, and
     * {@code false} is returned so the caller can back off and retry.
     *
     * @return {@code true} if the whole batch was handed to the producer (and {@code toSend} was
     *         cleared), {@code false} if a retriable send failure left records to retry
     * @throws ConnectException if the producer throws a non-retriable {@link KafkaException}
     */
    private boolean sendRecords() {
        // Position in toSend of the record currently being handled. It must advance for skipped
        // records as well, because it is used as the start index of the retry sub-list.
        int processed = 0;
        recordBatch(toSend.size());
        final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
        for (final SourceRecord preTransformRecord : toSend) {
            retryWithToleranceOperator.sourceRecord(preTransformRecord);
            final SourceRecord record = transformationChain.apply(preTransformRecord);
            final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
            if (producerRecord == null || retryWithToleranceOperator.failed()) {
                counter.skipRecord();
                commitTaskRecord(preTransformRecord);
                processed++;
                continue;
            }

            log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value());
            // Register the record before sending, since the callback may run before send() returns
            // control to the code below. After a failed send the first record of the retry batch
            // is not registered (and its offset not recorded) a second time.
            // NOTE(review): the retry creates a new ProducerRecord instance while the maps are
            // identity-keyed; confirm the instance registered by the failed attempt is cleaned up.
            synchronized (this) {
                if (!lastSendFailed) {
                    if (flushing) {
                        outstandingMessagesBacklog.put(producerRecord, producerRecord);
                    } else {
                        outstandingMessages.put(producerRecord, producerRecord);
                    }
                    offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
                }
            }

            try {
                final String topic = producerRecord.topic();
                producer.send(producerRecord, (recordMetadata, exception) -> {
                    if (exception != null) {
                        log.error("{} failed to send record to {}: {}", WorkerSourceTask.this, topic, exception);
                        log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                    } else {
                        log.trace("{} Wrote record successfully: topic {} partition {} offset {}", WorkerSourceTask.this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                        commitTaskRecord(preTransformRecord);
                    }
                    recordSent(producerRecord);
                    counter.completeRecord();
                });
                lastSendFailed = false;
            } catch (RetriableException e) {
                log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
                // Keep the failed record and everything after it for the next attempt.
                toSend = toSend.subList(processed, toSend.size());
                lastSendFailed = true;
                counter.retryRemaining();
                return false;
            } catch (KafkaException e) {
                throw new ConnectException("Unrecoverable exception trying to send", e);
            }
            processed++;
        }
        toSend = null;
        return true;
    }

    /**
     * Converts the Connect headers of a record into producer headers using the header converter.
     *
     * @param sourceRecord record whose headers are converted
     * @return the converted headers; empty if the record has no headers object
     */
    private RecordHeaders convertHeaderFor(SourceRecord sourceRecord) {
        Headers connectHeaders = sourceRecord.headers();
        RecordHeaders converted = new RecordHeaders();
        if (connectHeaders == null) {
            return converted;
        }

        String topic = sourceRecord.topic();
        for (Header connectHeader : connectHeaders) {
            String headerKey = connectHeader.key();
            byte[] rawValue = headerConverter.fromConnectHeader(topic, headerKey, connectHeader.schema(), connectHeader.value());
            converted.add(headerKey, rawValue);
        }
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Notifies the task about a handled record through {@code task.commitRecord}. Anything the
     * task throws is logged as an error and not propagated.
     *
     * @param sourceRecord the record as originally returned by the task
     */
    public void commitTaskRecord(SourceRecord sourceRecord) {
        try {
            task.commitRecord(sourceRecord);
        } catch (Throwable failure) {
            log.error("{} Exception thrown while calling task.commitRecord()", this, failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks a producer record as no longer outstanding. The record is looked up in the
     * outstanding set first and, while a flush is in progress, in the backlog as well. If a
     * flush is in progress and the outstanding set became empty, waiting threads are notified.
     *
     * @param producerRecord the record whose producer callback fired
     */
    public synchronized void recordSent(ProducerRecord<byte[], byte[]> producerRecord) {
        ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(producerRecord);
        if (removed == null && flushing) {
            removed = outstandingMessagesBacklog.remove(producerRecord);
        }

        if (removed == null) {
            log.error("{} CRITICAL Saw callback for record that was not present in the outstanding message set: {}", this, producerRecord);
            return;
        }

        if (flushing && outstandingMessages.isEmpty()) {
            // commitOffsets() may be waiting for the outstanding set to drain.
            notifyAll();
        }
    }

    /**
     * Flushes outstanding records and commits the source offsets collected so far.
     *
     * <p>The method first marks a flush as in progress and waits, within the configured offset
     * commit timeout, until every record that was outstanding at that point has been
     * acknowledged. It then flushes the offset writer and waits for that flush within the
     * remaining time. On success {@code task.commit()} is invoked.
     *
     * <p>The monitor of this object is held only while snapshotting and waiting for outstanding
     * records. The offset flush itself blocks on a future, so it runs without the monitor to
     * avoid stalling {@code recordSent} callbacks and {@code sendRecords} for the duration of
     * the flush; the {@code finish*Flush} helpers acquire the monitor themselves.
     *
     * @return {@code true} if the commit succeeded, {@code false} if it timed out, was
     *         interrupted, or the offset flush failed
     */
    public boolean commitOffsets() {
        long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
        log.info("{} Committing offsets", this);
        long started = time.milliseconds();
        long deadline = started + commitTimeoutMs;

        synchronized (this) {
            // Records sent from here on go to the backlog until the flush finishes.
            flushing = true;
            boolean flushStarted = offsetWriter.beginFlush();
            log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
            while (!outstandingMessages.isEmpty()) {
                try {
                    long remainingMs = deadline - time.milliseconds();
                    if (remainingMs <= 0) {
                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
                        finishFailedFlush();
                        recordCommitFailure(time.milliseconds() - started, null);
                        return false;
                    }
                    // Woken up by recordSent() once the outstanding set is empty.
                    wait(remainingMs);
                } catch (InterruptedException e) {
                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
                    finishFailedFlush();
                    recordCommitFailure(time.milliseconds() - started, null);
                    return false;
                }
            }

            if (!flushStarted) {
                // beginFlush() reported that no offset flush was started; the wait for the
                // outstanding records above still counts as a successful commit.
                finishSuccessfulFlush();
                long durationMs = time.milliseconds() - started;
                recordCommitSuccess(durationMs);
                log.debug("{} Finished offset commitOffsets successfully in {} ms", this, durationMs);
                commitSourceTask();
                return true;
            }
        }

        Future<Void> flushFuture = offsetWriter.doFlush(new org.apache.kafka.connect.util.Callback<Void>() {
            @Override // org.apache.kafka.connect.util.Callback
            public void onCompletion(Throwable error, Void result) {
                if (error != null) {
                    log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
                } else {
                    log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
                }
            }
        });
        if (flushFuture == null) {
            // doFlush() handed back no future to wait on; treat the commit as failed.
            finishFailedFlush();
            recordCommitFailure(time.milliseconds() - started, null);
            return false;
        }

        try {
            flushFuture.get(Math.max(deadline - time.milliseconds(), 0L), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("{} Flush of offsets interrupted, cancelling", this);
            finishFailedFlush();
            recordCommitFailure(time.milliseconds() - started, e);
            return false;
        } catch (ExecutionException e) {
            log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
            finishFailedFlush();
            recordCommitFailure(time.milliseconds() - started, e);
            return false;
        } catch (TimeoutException e) {
            log.error("{} Timed out waiting to flush offsets to storage", this);
            finishFailedFlush();
            recordCommitFailure(time.milliseconds() - started, null);
            return false;
        }

        finishSuccessfulFlush();
        long durationMs = time.milliseconds() - started;
        recordCommitSuccess(durationMs);
        log.info("{} Finished commitOffsets successfully in {} ms", this, durationMs);
        commitSourceTask();
        return true;
    }

    /**
     * Invokes {@code task.commit()}. Anything the task throws is logged as an error and not
     * propagated.
     */
    private void commitSourceTask() {
        try {
            task.commit();
        } catch (Throwable failure) {
            log.error("{} Exception thrown while calling task.commit()", this, failure);
        }
    }

    /**
     * Ends a flush that did not succeed: cancels the offset writer's flush, moves every
     * backlog record back into the outstanding set and clears the flushing flag.
     */
    private synchronized void finishFailedFlush() {
        offsetWriter.cancelFlush();
        // Backlog records are still unacknowledged, so they become outstanding again.
        outstandingMessages.putAll(outstandingMessagesBacklog);
        outstandingMessagesBacklog.clear();
        flushing = false;
    }

    /**
     * Ends a successful flush: the backlog becomes the new outstanding set, the previous
     * outstanding map is reused as the backlog, and the flushing flag is cleared.
     */
    private synchronized void finishSuccessfulFlush() {
        IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> previousOutstanding = outstandingMessages;
        outstandingMessages = outstandingMessagesBacklog;
        outstandingMessagesBacklog = previousOutstanding;
        flushing = false;
    }

    /** @return a short description of this task containing its id */
    public String toString() {
        return "WorkerSourceTask{id=" + id + "}";
    }

    /**
     * Forwards the result of a poll to the source task metrics.
     *
     * @param i number of records the poll returned
     * @param j value recorded as the poll batch time
     */
    protected void recordPollReturned(int i, long j) {
        sourceTaskMetricsGroup.recordPoll(i, j);
    }

    /** @return the metrics group that tracks polled and written records of this task */
    SourceTaskMetricsGroup sourceTaskMetricsGroup() {
        return sourceTaskMetricsGroup;
    }
}
