package solutions.a2.cdc.oracle;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import solutions.a2.cdc.oracle.jmx.OraCdcMgmtBase;
import solutions.a2.cdc.oracle.schema.FileUtils;
import solutions.a2.cdc.oracle.utils.Version;
import solutions.a2.kafka.ConnectorParams;
import solutions.a2.oracle.internals.RedoByteAddress;
import solutions.a2.utils.ExceptionUtils;

/* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcTaskBase.class */
public abstract class OraCdcTaskBase extends SourceTask {
    static final int WAIT_FOR_WORKER_MILLIS = 50;
    int batchSize;
    int pollInterval;
    int schemaType;
    OraRdbmsInfo rdbmsInfo;
    String connectorName;
    OraCdcSourceConnectorConfig config;
    CountDownLatch runLatch;
    AtomicBoolean isPollRunning;
    OraConnectionObjects oraConnections;
    Map<String, Object> offset;
    Map<Long, OraTable4LogMiner> tablesInProcessing;
    Set<Long> tablesOutOfScope;
    BlockingQueue<OraCdcTransaction> committedTransactions;
    OraCdcTransaction transaction;
    OraCdcWorkerThreadBase worker;
    OraCdcDictionaryChecker checker;
    private static final Logger LOGGER = LoggerFactory.getLogger(OraCdcTaskBase.class);
    static final AtomicBoolean state = new AtomicBoolean(true);
    static final AtomicInteger taskId = new AtomicInteger(0);
    boolean useChronicleQueue = true;
    boolean processLobs = false;
    boolean lastStatementInTransaction = true;
    final List<SourceRecord> result = new ArrayList();
    final List<OraCdcLargeObjectHolder> lobs = new ArrayList();
    long lastProcessedCommitScn = 0;
    long lastInProgressCommitScn = 0;
    long lastInProgressScn = 0;
    RedoByteAddress lastInProgressRba = null;
    long lastInProgressSubScn = 0;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        this.connectorName = map.get("name");
        try {
            this.config = new OraCdcSourceConnectorConfig(map);
            this.config.setConnectorName(this.connectorName);
            this.batchSize = this.config.getInt(ConnectorParams.BATCH_SIZE_PARAM).intValue();
            this.pollInterval = this.config.pollIntervalMs();
            this.schemaType = this.config.schemaType();
            this.useChronicleQueue = StringUtils.equalsIgnoreCase(this.config.getString(ParamConstants.ORA_TRANSACTION_IMPL_PARAM), ParamConstants.ORA_TRANSACTION_IMPL_CHRONICLE);
            this.processLobs = this.config.processLobs();
            if (this.processLobs && !this.useChronicleQueue) {
                LOGGER.error("\n=====================\nLOB processing is only possible if a2.transaction.implementation is set to ChronicleQueue!\nPlease set a2.process.lobs to false if a2.transaction.implementation is set to ConcurrentLinkedQueue\nand restart connector!!!\n=====================");
                throw new ConnectException("LOB processing is only possible if a2.transaction.implementation is set to ChronicleQueue!");
            }
            this.runLatch = new CountDownLatch(1);
            this.isPollRunning = new AtomicBoolean(false);
            boolean useRac = this.config.useRac();
            boolean activateStandby = this.config.activateStandby();
            boolean z = activateStandby && this.config.dg4RacThreads() != null && this.config.dg4RacThreads().size() > 1;
            int i = 1;
            if (z) {
                state.set(true);
                taskId.set(0);
                List<String> dg4RacThreads = this.config.dg4RacThreads();
                while (!state.compareAndSet(true, false)) {
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                }
                int andAdd = taskId.getAndAdd(1);
                if (andAdd > dg4RacThreads.size() - 1) {
                    LOGGER.error("Errors while processing following array of Oracle Signgle Instance DataGuard for RAC threads:");
                    dg4RacThreads.forEach(str -> {
                        LOGGER.error("\t{}", str);
                    });
                    LOGGER.error("Size equals {}, but current index equals {} !", Integer.valueOf(dg4RacThreads.size()), Integer.valueOf(andAdd));
                    throw new ConnectException("Unable to properly assign Kafka tasks to Oracle Single Instance DataGuard for RAC!");
                }
                if (andAdd == dg4RacThreads.size() - 1) {
                    taskId.set(0);
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Processing redo thread array element {} with value {}.", Integer.valueOf(andAdd), dg4RacThreads.get(andAdd));
                }
                i = Integer.parseInt(dg4RacThreads.get(andAdd));
                state.set(true);
            }
            try {
                if (StringUtils.isNotBlank(this.config.walletLocation())) {
                    if (useRac) {
                        this.oraConnections = OraConnectionObjects.get4OraWallet(this.connectorName, this.config.racUrls(), this.config.walletLocation());
                    } else {
                        this.oraConnections = OraConnectionObjects.get4OraWallet(this.connectorName, this.config.getString(ConnectorParams.CONNECTION_URL_PARAM), this.config.walletLocation());
                    }
                } else {
                    if (!StringUtils.isNotBlank(this.config.getString(ConnectorParams.CONNECTION_USER_PARAM)) || !StringUtils.isNotBlank(this.config.getPassword(ConnectorParams.CONNECTION_PASSWORD_PARAM).value())) {
                        throw new SQLException("Wrong connection parameters!");
                    }
                    if (useRac) {
                        this.oraConnections = OraConnectionObjects.get4UserPassword(this.connectorName, this.config.racUrls(), this.config.getString(ConnectorParams.CONNECTION_USER_PARAM), this.config.getPassword(ConnectorParams.CONNECTION_PASSWORD_PARAM).value());
                    } else {
                        this.oraConnections = OraConnectionObjects.get4UserPassword(this.connectorName, this.config.getString(ConnectorParams.CONNECTION_URL_PARAM), this.config.getString(ConnectorParams.CONNECTION_USER_PARAM), this.config.getPassword(ConnectorParams.CONNECTION_PASSWORD_PARAM).value());
                    }
                }
                if (this.config.useOracdcSchemas()) {
                    LOGGER.info("oracdc will use own schemas for Oracle NUMBER and TIMESTAMP WITH [LOCAL] TIMEZONE datatypes");
                }
                if (this.offset == null) {
                    this.offset = new ConcurrentHashMap();
                }
                if (this.tablesInProcessing == null) {
                    this.tablesInProcessing = new HashMap();
                }
                if (this.tablesOutOfScope == null) {
                    this.tablesOutOfScope = new HashSet();
                }
                if (this.committedTransactions == null) {
                    this.committedTransactions = new LinkedBlockingQueue();
                }
                try {
                    Connection connection = this.oraConnections.getConnection();
                    try {
                        this.rdbmsInfo = new OraRdbmsInfo(connection);
                        if (z) {
                            this.rdbmsInfo.setRedoThread(i);
                        }
                        this.config.topicPartition(this.rdbmsInfo.getRedoThread() - 1);
                        LOGGER.info("\n=====================\nConnector {} connected to {}, {}\n\t$ORACLE_SID={}, running on {}, OS {}.\n=====================", new Object[]{this.connectorName, this.rdbmsInfo.getRdbmsEdition(), this.rdbmsInfo.getVersionString(), this.rdbmsInfo.getInstanceName(), this.rdbmsInfo.getHostName(), this.rdbmsInfo.getPlatformName()});
                        if (this.rdbmsInfo.isCdb() && !this.rdbmsInfo.isCdbRoot() && !this.rdbmsInfo.isPdbConnectionAllowed()) {
                            LOGGER.error("Connector {} must be connected to CDB$ROOT while using oracdc for mining data using LogMiner!", this.connectorName);
                            throw new ConnectException("Unable to run oracdc without connection to CDB$ROOT!");
                        }
                        LOGGER.debug("Oracle connection information:\n{}", this.rdbmsInfo.toString());
                        if (this.rdbmsInfo.isCdb() && this.rdbmsInfo.isPdbConnectionAllowed()) {
                            LOGGER.info("\n=====================\nConnected to PDB {} (RDBMS 19.10+ Feature)\n=====================", this.rdbmsInfo.getPdbName());
                        }
                        if (activateStandby) {
                            this.oraConnections.addStandbyConnection(this.config.getString(ParamConstants.STANDBY_URL_PARAM), this.config.getString(ParamConstants.STANDBY_WALLET_PARAM));
                            LOGGER.info("\n=====================\nConnector {} will use connection to PHYSICAL STANDBY for LogMiner calls\n=====================", this.connectorName);
                        }
                        if (this.config.getBoolean(ParamConstants.MAKE_DISTRIBUTED_ACTIVE_PARAM).booleanValue()) {
                            this.oraConnections.addDistributedConnection(this.config.getString(ParamConstants.DISTRIBUTED_URL_PARAM), this.config.getString(ParamConstants.DISTRIBUTED_WALLET_PARAM));
                            LOGGER.info("\n=====================\nConnector {} will use remote database in distributed configuration for LogMiner calls\n=====================", this.connectorName);
                        }
                        if (StringUtils.equalsIgnoreCase(this.rdbmsInfo.getSupplementalLogDataAll(), "YES")) {
                            LOGGER.info("\n=====================\nV$DATABASE.SUPPLEMENTAL_LOG_DATA_ALL is set to 'YES'.\n\tNo additional checks for supplemental logging will performed at the table level.\n=====================");
                        } else {
                            if (StringUtils.equalsIgnoreCase(this.rdbmsInfo.getSupplementalLogDataMin(), "NO")) {
                                LOGGER.error("\n=====================\nBoth V$DATABASE.SUPPLEMENTAL_LOG_DATA_ALL and V$DATABASE.SUPPLEMENTAL_LOG_DATA_MIN are set to 'NO'!\nFor the connector to work properly, you need to set connecting Oracle RDBMS as SYSDBA:\nalter database add supplemental log data (ALL) columns;\nOR recommended but more time consuming settings\nalter database add supplemental log data;\nand then enable supplemental only for required tables:\nalter table <OWNER>.<TABLE_NAME> add supplemental log data (ALL) columns;\n=====================");
                                throw new ConnectException("Must set SUPPLEMENTAL LOGGING settings!");
                            }
                            LOGGER.info("\n=====================\nV$DATABASE.SUPPLEMENTAL_LOG_DATA_ALL is set to 'NO'.\nV$DATABASE.SUPPLEMENTAL_LOG_DATA_MIN is set to '{}'.\n\tAdditional checks for supplemental logging will performed at the table level.\n=====================", this.rdbmsInfo.getSupplementalLogDataMin());
                        }
                        if (connection != null) {
                            connection.close();
                        }
                    } finally {
                    }
                } catch (SQLException e2) {
                    LOGGER.error("Unable to prepare to start oracdc task!");
                    LOGGER.error(ExceptionUtils.getExceptionStackTrace(e2));
                    throw new ConnectException(e2);
                }
            } catch (SQLException e3) {
                LOGGER.error("Unable to connect to RDBMS for connector '{}'!", this.connectorName);
                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e3));
                LOGGER.error("Stopping connector '{}'", this.connectorName);
                throw new ConnectException("Unable to connect to RDBMS");
            }
        } catch (ConfigException e4) {
            throw new ConnectException("Couldn't start oracdc due to coniguration error", e4);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop(boolean z) {
        LOGGER.info("Stopping oracdc logminer source task.");
        if (this.runLatch != null) {
            this.runLatch.countDown();
            if (!z || this.worker == null) {
                while (this.isPollRunning.get()) {
                    try {
                        LOGGER.debug("Waiting {} ms for connector task to stop...", Integer.valueOf(WAIT_FOR_WORKER_MILLIS));
                        Thread.sleep(50L);
                    } catch (InterruptedException e) {
                    }
                }
            } else {
                this.worker.shutdown();
                while (this.worker.isRunning()) {
                    try {
                        LOGGER.debug("Waiting {} ms for worker thread to stop...", Integer.valueOf(WAIT_FOR_WORKER_MILLIS));
                        Thread.sleep(50L);
                    } catch (InterruptedException e2) {
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopEpilogue() {
        if (this.useChronicleQueue && this.committedTransactions != null && !this.committedTransactions.isEmpty()) {
            this.committedTransactions.forEach(oraCdcTransaction -> {
                if (this.isPollRunning.get()) {
                    LOGGER.error("Unable to remove directory {}, please remove it manually", ((OraCdcTransactionChronicleQueue) oraCdcTransaction).getPath().toString());
                } else {
                    oraCdcTransaction.close();
                }
            });
        }
        if (this.oraConnections != null) {
            try {
                this.oraConnections.destroy();
            } catch (SQLException e) {
                LOGGER.error("Unable to close all RDBMS connections!");
                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processStoredSchemas(OraCdcMgmtBase oraCdcMgmtBase) {
        if (this.config.useOracdcSchemas()) {
            String string = this.config.getString(ParamConstants.DICTIONARY_FILE_PARAM);
            if (StringUtils.isNotBlank(string)) {
                try {
                    LOGGER.info("Loading stored schema definitions from file {}.", string);
                    this.tablesInProcessing = FileUtils.readDictionaryFile(string, Integer.valueOf(this.schemaType), this.config.transformLobsImpl(), this.rdbmsInfo);
                    LOGGER.info("{} table schema definitions loaded from file {}.", Integer.valueOf(this.tablesInProcessing.size()), string);
                    this.tablesInProcessing.forEach((l, oraTable4LogMiner) -> {
                        oraTable4LogMiner.setTopicDecoderPartition(this.config, this.rdbmsInfo.odd(), this.rdbmsInfo.partition());
                        oraCdcMgmtBase.addTableInProcessing(oraTable4LogMiner.fqn());
                    });
                } catch (IOException e) {
                    LOGGER.warn("Unable to read stored definition from {}.", string);
                    LOGGER.warn(ExceptionUtils.getExceptionStackTrace(e));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean startPosition(MutableTriple<Long, RedoByteAddress, Long> mutableTriple) throws SQLException {
        long j;
        boolean z = false;
        long firstScnFromArchivedLogs = this.rdbmsInfo.firstScnFromArchivedLogs(this.oraConnections.getLogMinerConnection(), (this.config.activateStandby() || this.rdbmsInfo.isStandby()) ? false : true);
        RedoByteAddress redoByteAddress = null;
        long j2 = -1;
        long startScn = this.config.startScn();
        boolean z2 = Long.compareUnsigned(startScn, 0L) > 0;
        Map offset = (this.context == null || this.context.offsetStorageReader() == null) ? null : this.context.offsetStorageReader().offset(this.rdbmsInfo.partition());
        if (offset == null || !offset.containsKey("C:COMMIT_SCN")) {
            LOGGER.info("No data present in connector's offset storage for {}:{}", this.rdbmsInfo.sourcePartitionName(), Long.valueOf(this.rdbmsInfo.getDbId()));
            if (z2) {
                j = this.config.startScn();
                LOGGER.info("{}={} is set in connector properties, previous offset data is not available.", this.config.startScnParam(), Long.valueOf(j));
                if (j < firstScnFromArchivedLogs) {
                    LOGGER.warn("Ignoring {}={} in connector properties, and setting {} to first available SCN in V$ARCHIVED_LOG {}.", new Object[]{this.config.startScnParam(), Long.valueOf(j), this.config.startScnParam(), Long.valueOf(firstScnFromArchivedLogs)});
                    j = firstScnFromArchivedLogs;
                } else {
                    z = true;
                }
            } else {
                LOGGER.info("oracdc will start from minimum available SCN in V$ARCHIVED_LOG = {}.", Long.valueOf(firstScnFromArchivedLogs));
                j = firstScnFromArchivedLogs;
            }
        } else if (z2) {
            j = startScn;
            LOGGER.info("{}={} is set in connector properties, ignoring SCN related restart data from connector offset storage.", this.config.startScnParam(), Long.valueOf(j));
            if (j < firstScnFromArchivedLogs) {
                LOGGER.warn("Ignoring {}={} in connector properties, and setting {} to first available SCN in V$ARCHIVED_LOG {}.", new Object[]{this.config.startScnParam(), Long.valueOf(j), this.config.startScnParam(), Long.valueOf(firstScnFromArchivedLogs)});
                j = firstScnFromArchivedLogs;
            } else {
                z = true;
            }
        } else {
            j = ((Long) offset.get("S:SCN")).longValue();
            redoByteAddress = RedoByteAddress.fromLogmnrContentsRs_Id((String) offset.get("S:RS_ID"));
            j2 = ((Long) offset.get("S:SSN")).longValue();
            LOGGER.info("Point in time from offset data to start reading reading from SCN={}, RS_ID (RBA)='{}', SSN={}", new Object[]{Long.valueOf(j), redoByteAddress, Long.valueOf(j2)});
            this.lastProcessedCommitScn = ((Long) offset.get("C:COMMIT_SCN")).longValue();
            this.lastInProgressCommitScn = ((Long) offset.get("COMMIT_SCN")).longValue();
            if (this.lastProcessedCommitScn == this.lastInProgressCommitScn) {
                this.lastInProgressCommitScn = 0L;
            } else {
                this.lastInProgressScn = ((Long) offset.get("SCN")).longValue();
                this.lastInProgressRba = RedoByteAddress.fromLogmnrContentsRs_Id((String) offset.get("RS_ID"));
                this.lastInProgressSubScn = ((Long) offset.get("SSN")).longValue();
                LOGGER.info("Last sent SCN={}, RBA={}, SSN={} for  transaction with incomplete send", new Object[]{Long.valueOf(this.lastInProgressScn), this.lastInProgressRba, Long.valueOf(this.lastInProgressSubScn)});
            }
            if (j < firstScnFromArchivedLogs) {
                LOGGER.warn("\n=====================\nIgnoring Point in time {}:{}:{} from offset, and setting it to first available SCN in V$ARCHIVED_LOG {}.\n=====================", new Object[]{Long.valueOf(j), redoByteAddress, Long.valueOf(j2), Long.valueOf(firstScnFromArchivedLogs)});
                j = firstScnFromArchivedLogs;
            } else {
                z = true;
            }
        }
        mutableTriple.setLeft(Long.valueOf(j));
        mutableTriple.setMiddle(redoByteAddress);
        mutableTriple.setRight(Long.valueOf(j2));
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OraConnectionObjects oraConnections() {
        return this.oraConnections;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CountDownLatch runLatch() {
        return this.runLatch;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OraCdcSourceConnectorConfig config() {
        return this.config;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OraRdbmsInfo rdbmsInfo() {
        return this.rdbmsInfo;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void putReadRestartScn(Triple<Long, RedoByteAddress, Long> triple) {
        this.offset.put("S:SCN", triple.getLeft());
        this.offset.put("S:RS_ID", ((RedoByteAddress) triple.getMiddle()).toString());
        this.offset.put("S:SSN", triple.getRight());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void putTableAndVersion(long j, int i) {
        this.offset.put(Long.toString(j), Integer.toString(i));
    }
}
