package org.apache.pulsar.io.debezium;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A database history implementation that keeps the Debezium database schema history in a
 * Pulsar topic.
 *
 * <p>History records are written to the configured topic as strings through a {@link Producer}
 * and read back from the earliest message through a {@link Reader}.
 *
 * <p>NOTE(review): this source was recovered by a decompiler. The body of
 * {@link #recoverRecords(java.util.function.Consumer)} could not be recovered and is a throwing
 * placeholder; restore it from the original sources before using this class.
 *
 * <p>NOTE(review): the class is annotated {@code @ThreadSafe}, but the lazy creation of the
 * client and producer is a plain check-then-act on volatile fields without locking — confirm
 * that callers never invoke the lifecycle methods concurrently.
 */
@ThreadSafe
public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
    private static final Logger log = LoggerFactory.getLogger(PulsarDatabaseHistory.class);

    /** Required: name of the Pulsar topic holding the database schema history. */
    public static final Field TOPIC = Field.create("database.history.pulsar.topic")
            .withDisplayName("Database history topic name")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH)
            .withDescription("The name of the topic for the database schema history")
            .withValidation(Field::isRequired);

    /** Optional: Pulsar service URL, used when no serialized client builder is supplied. */
    public static final Field SERVICE_URL = Field.create("database.history.pulsar.service.url")
            .withDisplayName("Pulsar service url")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH)
            .withDescription("Pulsar service url")
            .withValidation(Field::isOptional);

    /** Optional: serialized {@link ClientBuilder}; takes precedence over {@link #SERVICE_URL}. */
    public static final Field CLIENT_BUILDER = Field.create("database.history.pulsar.client.builder")
            .withDisplayName("Pulsar client builder")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH)
            .withDescription("Pulsar client builder")
            .withValidation(Field::isOptional);

    /** Optional: JSON string of extra key-value settings applied to the history reader. */
    public static final Field READER_CONFIG = Field.create("database.history.pulsar.reader.config")
            .withDisplayName("Extra configs of the reader")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH)
            .withDescription("The configs of the reader for the database schema history topic, in the form of a JSON string with key-value pairs")
            .withDefault((String) null)
            .withValidation(Field::isOptional);

    /** All configuration fields validated by {@link #configure}. */
    public static final Field.Set ALL_FIELDS = Field.setOf(TOPIC, SERVICE_URL, CLIENT_BUILDER, DatabaseHistory.NAME, READER_CONFIG);

    private String topicName;
    private String dbHistoryName;
    private ClientBuilder clientBuilder;
    private volatile PulsarClient pulsarClient;
    private volatile Producer<String> producer;
    // Not referenced by the visible code; presumably used by the undecompiled recoverRecords body.
    private final DocumentReader reader = DocumentReader.defaultReader();
    private Map<String, Object> readerConfigMap = new HashMap<>();

    /**
     * Validates the configuration and captures the topic name, reader settings, client builder
     * and history name. No Pulsar connection is opened here.
     *
     * @param configuration the connector configuration to read the fields from
     * @param historyRecordComparator forwarded unchanged to the superclass
     * @param databaseHistoryListener forwarded unchanged to the superclass
     * @param z forwarded unchanged to the superclass (original parameter name lost in decompilation)
     * @throws IllegalArgumentException if validation fails, or if neither a service URL nor a
     *         client builder is provided
     */
    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator, DatabaseHistoryListener databaseHistoryListener, boolean z) {
        super.configure(configuration, historyRecordComparator, databaseHistoryListener, z);
        if (!configuration.validateAndRecord(ALL_FIELDS, this.logger::error)) {
            throw new IllegalArgumentException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        this.topicName = configuration.getString(TOPIC);
        try {
            this.readerConfigMap = IOConfigUtils.loadConfigFromJsonString(configuration.getString(READER_CONFIG));
        } catch (JsonProcessingException e) {
            log.warn("The provided reader configs are invalid, will not passing any extra config to the reader builder.", e);
            // Honour the message above: drop any settings left over from an earlier configure() call.
            this.readerConfigMap = new HashMap<>();
        }

        final String serializedBuilder = configuration.getString(CLIENT_BUILDER);
        final boolean hasSerializedBuilder = !StringUtils.isBlank(serializedBuilder);
        if (!hasSerializedBuilder && StringUtils.isBlank(configuration.getString(SERVICE_URL))) {
            throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
        }

        this.clientBuilder = PulsarClient.builder();
        if (hasSerializedBuilder) {
            // NOTE(review): this turns a configuration string back into an object via SerDeUtils
            // (defined elsewhere). If that is Java-native deserialization, make sure the value can
            // only come from a trusted operator — confirm.
            this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(serializedBuilder, this.clientBuilder.getClass().getClassLoader());
        } else {
            this.clientBuilder.serviceUrl(configuration.getString(SERVICE_URL));
        }

        // Falls back to a random UUID when no explicit history name is configured.
        this.dbHistoryName = configuration.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
        log.info("Configure to store the debezium database history {} to pulsar topic {}", this.dbHistoryName, this.topicName);
    }

    /**
     * Publishes a single empty string to the history topic through a short-lived producer
     * (presumably so that the topic gets created — confirm against broker settings).
     *
     * @throws RuntimeException if the producer cannot be created, the send fails, or the
     *         producer cannot be closed
     */
    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public void initializeStorage() {
        super.initializeStorage();
        // Do not depend on start() having been called first; exists() does the same.
        setupClientIfNeeded();
        // try-with-resources guarantees the temporary producer is closed even when send() fails.
        try (Producer<String> bootstrapProducer = this.pulsarClient.newProducer(Schema.STRING).topic(this.topicName).create()) {
            bootstrapProducer.send("");
        } catch (PulsarClientException e) {
            log.error("Failed to initialize storage", e);
            throw new RuntimeException("Failed to initialize storage", e);
        }
    }

    /**
     * Builds the Pulsar client from the configured builder if none exists yet.
     *
     * @throws RuntimeException if the client cannot be built
     */
    void setupClientIfNeeded() {
        if (this.pulsarClient != null) {
            return;
        }
        try {
            this.pulsarClient = this.clientBuilder.build();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Failed to create pulsar client to pulsar cluster", e);
        }
    }

    /**
     * Ensures the client exists, then creates the long-lived history producer if none exists yet.
     * The producer is named after the history name and blocks when its queue is full.
     *
     * @throws RuntimeException if the client or the producer cannot be created
     */
    void setupProducerIfNeeded() {
        setupClientIfNeeded();
        if (this.producer != null) {
            return;
        }
        try {
            this.producer = this.pulsarClient.newProducer(Schema.STRING)
                    .topic(this.topicName)
                    .producerName(this.dbHistoryName)
                    .blockIfQueueFull(true)
                    .create();
        } catch (PulsarClientException e) {
            log.error("Failed to create pulsar producer to topic '{}'", this.topicName, e);
            throw new RuntimeException("Failed to create pulsar producer to topic '" + this.topicName + "'", e);
        }
    }

    /** Starts the history and makes sure the client and producer are available. */
    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public void start() {
        super.start();
        setupProducerIfNeeded();
    }

    /**
     * Sends the string form of the given record to the history topic.
     *
     * @param historyRecord the record to persist
     * @throws IllegalStateException if no producer exists (i.e. {@link #start()} was not called
     *         or {@link #stop()} already ran)
     * @throws DatabaseHistoryException if the send fails
     */
    @Override // io.debezium.relational.history.AbstractDatabaseHistory
    protected void storeRecord(HistoryRecord historyRecord) throws DatabaseHistoryException {
        // Read the volatile field once so a concurrent stop() cannot null it between check and use.
        final Producer<String> activeProducer = this.producer;
        if (activeProducer == null) {
            throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database history records.");
        }
        if (log.isTraceEnabled()) {
            log.trace("Storing record into database history: {}", historyRecord);
        }
        try {
            activeProducer.send(historyRecord.toString());
        } catch (PulsarClientException e) {
            throw new DatabaseHistoryException(e);
        }
    }

    /**
     * Flushes and closes the producer, then closes the client. Pulsar failures are logged, not
     * thrown, and a failure while closing the producer never prevents the client from being closed.
     */
    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public void stop() {
        closeProducer();
        closeClient();
    }

    /** Flushes (best effort) and closes the history producer, always clearing the field. */
    private void closeProducer() {
        final Producer<String> activeProducer = this.producer;
        if (activeProducer == null) {
            return;
        }
        this.producer = null;
        try {
            try {
                activeProducer.flush();
            } catch (PulsarClientException e) {
                // A failed flush must not stop us from releasing the producer.
                log.warn("Failed to flush pulsar producer", e);
            } finally {
                activeProducer.close();
            }
        } catch (PulsarClientException e) {
            log.warn("Failed to closing pulsar producer", e);
        }
    }

    /** Closes the Pulsar client, always clearing the field. */
    private void closeClient() {
        final PulsarClient activeClient = this.pulsarClient;
        if (activeClient == null) {
            return;
        }
        this.pulsarClient = null;
        try {
            activeClient.close();
        } catch (PulsarClientException e) {
            log.warn("Failed to closing pulsar client", e);
        }
    }

    /**
     * PLACEHOLDER: the decompiler could not reconstruct this method (it reported 303 bytecode
     * instructions skipped), so the real logic is not present in this source and the method
     * always throws.
     *
     * <p>The only fragment the decompiler surfaced was a comparison of a message id against
     * {@code getMessageId()} of another object ({@code r8.compareTo(r0.getMessageId()) < 0}).
     * NOTE(review): restore the body from the original sources; do not rely on this stub.
     *
     * @param r6 consumer that should receive the recovered history records
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    @Override // io.debezium.relational.history.AbstractDatabaseHistory
    protected void recoverRecords(java.util.function.Consumer<io.debezium.relational.history.HistoryRecord> r6) {
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pulsar.io.debezium.PulsarDatabaseHistory.recoverRecords(java.util.function.Consumer):void");
    }

    /**
     * Reports whether the history topic has at least one message available to a reader that
     * starts from the earliest position.
     *
     * @return the result of {@code hasMessageAvailable()} on a freshly created history reader
     * @throws RuntimeException if the reader cannot be created, queried or closed
     */
    @Override // io.debezium.relational.history.DatabaseHistory
    public boolean exists() {
        setupClientIfNeeded();
        // try-with-resources closes the reader on every path, including a failed availability check.
        try (Reader<String> historyReader = createHistoryReader()) {
            return historyReader.hasMessageAvailable();
        } catch (IOException e) {
            log.error("Encountered issues on checking existence of database history", e);
            throw new RuntimeException("Encountered issues on checking existence of database history", e);
        }
    }

    /**
     * Always reports that the storage exists.
     *
     * @return {@code true}
     */
    @Override // io.debezium.relational.history.DatabaseHistory
    public boolean storageExists() {
        return true;
    }

    /**
     * Describes this history, including the topic name once it has been configured.
     *
     * @return {@code "Pulsar topic (<name>)"}, or {@code "Pulsar topic"} before configuration
     */
    @Override
    public String toString() {
        if (this.topicName == null) {
            return "Pulsar topic";
        }
        return "Pulsar topic (" + this.topicName + ")";
    }

    /**
     * Creates a string reader on the history topic that starts at the earliest message and has
     * the extra reader settings applied. Callers must have created the client beforehand.
     *
     * @return a new reader; the caller is responsible for closing it
     * @throws PulsarClientException if the reader cannot be created
     */
    @VisibleForTesting
    Reader<String> createHistoryReader() throws PulsarClientException {
        return this.pulsarClient.newReader(Schema.STRING)
                .topic(this.topicName)
                .startMessageId(MessageId.earliest)
                .loadConf(this.readerConfigMap)
                .create();
    }
}
