package org.apache.kafka.connect.runtime.errors;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.4.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.class */
public class DeadLetterQueueReporter implements ErrorReporter {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DeadLetterQueueReporter.class);
    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
    public static final String HEADER_PREFIX = "__connect.errors.";
    public static final String ERROR_HEADER_ORIG_TOPIC = "__connect.errors.topic";
    public static final String ERROR_HEADER_ORIG_PARTITION = "__connect.errors.partition";
    public static final String ERROR_HEADER_ORIG_OFFSET = "__connect.errors.offset";
    public static final String ERROR_HEADER_CONNECTOR_NAME = "__connect.errors.connector.name";
    public static final String ERROR_HEADER_TASK_ID = "__connect.errors.task.id";
    public static final String ERROR_HEADER_STAGE = "__connect.errors.stage";
    public static final String ERROR_HEADER_EXECUTING_CLASS = "__connect.errors.class.name";
    public static final String ERROR_HEADER_EXCEPTION = "__connect.errors.exception.class.name";
    public static final String ERROR_HEADER_EXCEPTION_MESSAGE = "__connect.errors.exception.message";
    public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = "__connect.errors.exception.stacktrace";
    private final SinkConnectorConfig connConfig;
    private final ConnectorTaskId connectorTaskId;
    private final ErrorHandlingMetrics errorHandlingMetrics;
    private KafkaProducer<byte[], byte[]> kafkaProducer;

    public static DeadLetterQueueReporter createAndSetup(Map<String, Object> map, ConnectorTaskId connectorTaskId, SinkConnectorConfig sinkConnectorConfig, Map<String, Object> map2, ErrorHandlingMetrics errorHandlingMetrics) {
        String dlqTopicName = sinkConnectorConfig.dlqTopicName();
        try {
            AdminClient create = AdminClient.create(map);
            Throwable th = null;
            try {
                try {
                    if (!create.listTopics().names().get().contains(dlqTopicName)) {
                        log.error("Topic {} doesn't exist. Will attempt to create topic.", dlqTopicName);
                        create.createTopics(Collections.singleton(new NewTopic(dlqTopicName, 1, sinkConnectorConfig.dlqTopicReplicationFactor()))).all().get();
                    }
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (InterruptedException e) {
            throw new ConnectException("Could not initialize dead letter queue with topic=" + dlqTopicName, e);
        } catch (ExecutionException e2) {
            if (!(e2.getCause() instanceof TopicExistsException)) {
                throw new ConnectException("Could not initialize dead letter queue with topic=" + dlqTopicName, e2);
            }
        }
        return new DeadLetterQueueReporter(new KafkaProducer(map2), sinkConnectorConfig, connectorTaskId, errorHandlingMetrics);
    }

    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig sinkConnectorConfig, ConnectorTaskId connectorTaskId, ErrorHandlingMetrics errorHandlingMetrics) {
        Objects.requireNonNull(kafkaProducer);
        Objects.requireNonNull(sinkConnectorConfig);
        Objects.requireNonNull(connectorTaskId);
        Objects.requireNonNull(errorHandlingMetrics);
        this.kafkaProducer = kafkaProducer;
        this.connConfig = sinkConnectorConfig;
        this.connectorTaskId = connectorTaskId;
        this.errorHandlingMetrics = errorHandlingMetrics;
    }

    @Override // org.apache.kafka.connect.runtime.errors.ErrorReporter
    public void report(ProcessingContext processingContext) {
        String dlqTopicName = this.connConfig.dlqTopicName();
        if (dlqTopicName.isEmpty()) {
            return;
        }
        this.errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
        ConsumerRecord<byte[], byte[]> consumerRecord = processingContext.consumerRecord();
        if (consumerRecord == null) {
            this.errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
            return;
        }
        ProducerRecord<byte[], byte[]> producerRecord = consumerRecord.timestamp() == -1 ? new ProducerRecord<>(dlqTopicName, (Integer) null, consumerRecord.key(), consumerRecord.value(), consumerRecord.headers()) : new ProducerRecord<>(dlqTopicName, null, Long.valueOf(consumerRecord.timestamp()), consumerRecord.key(), consumerRecord.value(), consumerRecord.headers());
        if (this.connConfig.isDlqContextHeadersEnabled()) {
            populateContextHeaders(producerRecord, processingContext);
        }
        this.kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
            if (exc != null) {
                log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, (Throwable) exc);
                this.errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
            }
        });
    }

    void populateContextHeaders(ProducerRecord<byte[], byte[]> producerRecord, ProcessingContext processingContext) {
        Headers headers = producerRecord.headers();
        if (processingContext.consumerRecord() != null) {
            headers.add(ERROR_HEADER_ORIG_TOPIC, toBytes(processingContext.consumerRecord().topic()));
            headers.add(ERROR_HEADER_ORIG_PARTITION, toBytes(processingContext.consumerRecord().partition()));
            headers.add(ERROR_HEADER_ORIG_OFFSET, toBytes(processingContext.consumerRecord().offset()));
        }
        headers.add(ERROR_HEADER_CONNECTOR_NAME, toBytes(this.connectorTaskId.connector()));
        headers.add(ERROR_HEADER_TASK_ID, toBytes(String.valueOf(this.connectorTaskId.task())));
        headers.add(ERROR_HEADER_STAGE, toBytes(processingContext.stage().name()));
        headers.add(ERROR_HEADER_EXECUTING_CLASS, toBytes(processingContext.executingClass().getName()));
        if (processingContext.error() != null) {
            headers.add(ERROR_HEADER_EXCEPTION, toBytes(processingContext.error().getClass().getName()));
            headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(processingContext.error().getMessage()));
            byte[] stacktrace = stacktrace(processingContext.error());
            if (stacktrace != null) {
                headers.add(ERROR_HEADER_EXCEPTION_STACK_TRACE, stacktrace);
            }
        }
    }

    private byte[] stacktrace(Throwable th) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            th.printStackTrace(new PrintStream((OutputStream) byteArrayOutputStream, true, "UTF-8"));
            byteArrayOutputStream.close();
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            log.error("Could not serialize stacktrace.", (Throwable) e);
            return null;
        }
    }

    private byte[] toBytes(int i) {
        return toBytes(String.valueOf(i));
    }

    private byte[] toBytes(long j) {
        return toBytes(String.valueOf(j));
    }

    private byte[] toBytes(String str) {
        if (str != null) {
            return str.getBytes(StandardCharsets.UTF_8);
        }
        return null;
    }
}
