package solutions.a2.cdc.oracle;

import java.lang.reflect.InvocationTargetException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import oracle.jdbc.OraclePreparedStatement;
import oracle.jdbc.OracleResultSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import solutions.a2.cdc.oracle.data.OraCdcLobTransformationsIntf;
import solutions.a2.cdc.oracle.jmx.OraCdcLogMinerMgmt;
import solutions.a2.cdc.oracle.jmx.OraCdcLogMinerMgmtIntf;
import solutions.a2.utils.ExceptionUtils;

/* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcLogMinerWorkerThread.class */
/**
 * Thread that owns the LogMiner connection, the dictionary connection and the
 * prepared statements built on them.
 *
 * <p>This listing is decompiler output: the bodies of {@code run()} and
 * {@code catchTheLob(...)} were not recovered, so the main mining loop is not
 * visible here. What is visible is construction, {@code rewind(...)},
 * connection restore/close helpers and the redo-reading helpers.
 */
public class OraCdcLogMinerWorkerThread extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) OraCdcLogMinerWorkerThread.class);
    // Not referenced by any method whose body is visible in this listing.
    // NOTE(review): presumably a retry cap used by run() - confirm against the original source.
    private static final int MAX_RETRIES = 63;
    // Owning task; in the visible code it is only asked for isLegacyResiliencyModel().
    private final OraCdcLogMinerTask task;
    // Value of ParamConstants.POLL_INTERVAL_MS_PARAM.
    private final int pollInterval;
    private final OraRdbmsInfo rdbmsInfo;
    private final OraCdcLogMinerMgmt metrics;
    // Created with a count of 1; shutdown() counts it down to zero and the
    // reconnect loop in restoreOraConnection(SQLException) runs only while the count is > 0.
    private final CountDownLatch runLatch;
    // Result of logMiner.next() in the constructor; rewind() does nothing while this is false.
    private boolean logMinerReady;
    private final Map<String, String> partition;
    private final Map<Long, OraTable4LogMiner> tablesInProcessing;
    private final Map<Long, Long> partitionsInProcessing;
    private final Set<Long> tablesOutOfScope;
    private final OraDumpDecoder odd;
    // Instantiated reflectively from the class named by ParamConstants.ARCHIVED_LOG_CAT_PARAM.
    private final OraLogMiner logMiner;
    // Connection obtained from oraConnections.getLogMinerConnection(traceSession).
    private Connection connLogMiner;
    // Statement for mineDataSql, prepared on connLogMiner.
    private OraclePreparedStatement psLogMiner;
    // Statement for checkTableSql, prepared on connDictionary.
    private PreparedStatement psCheckTable;
    // The next three statements are only prepared when processLobs is true.
    private PreparedStatement psCheckLob;
    private PreparedStatement psIsDataObjLob;
    private OraclePreparedStatement psReadLob;
    // Current result of psLogMiner.executeQuery(); (re)opened by rewind().
    private OracleResultSet rsLogMiner;
    private final String mineDataSql;
    private final String checkTableSql;
    // Connection obtained from oraConnections.getConnection().
    private Connection connDictionary;
    private final Path queuesRoot;
    private final Map<String, OraCdcTransaction> activeTransactions;
    private final boolean legacyResiliencyModel;
    // Both of the following are null when legacyResiliencyModel is true.
    private final TreeMap<String, Triple<Long, String, Long>> sortedByFirstScn;
    private final ActiveTransComparator activeTransComparator;
    private final BlockingQueue<OraCdcTransaction> committedTransactions;
    private final OraCdcSourceConnectorConfig config;
    // True when ParamConstants.ORA_TRANSACTION_IMPL_PARAM equals (ignoring case)
    // ParamConstants.ORA_TRANSACTION_IMPL_CHRONICLE.
    private final boolean useChronicleQueue;
    // Position of the last row read by rewind(): values of the SCN, RS_ID and SSN columns.
    // NOTE(review): plain (non-volatile) fields exposed through public getters;
    // confirm whether they are read from a thread other than this one.
    private long lastScn;
    private String lastRsId;
    private long lastSsn;
    // Initialised to false; the fragment recovered from run() sets it back to false on exit.
    private final AtomicBoolean running;
    // rdbmsInfo.isCdb() && !rdbmsInfo.isPdbConnectionAllowed(); selects the *_CDB SQL texts.
    private boolean isCdb;
    // Value of ParamConstants.PROCESS_LOBS_PARAM.
    private final boolean processLobs;
    private final OraCdcLobTransformationsIntf transformLobs;
    private OraCdcLargeObjectWorker lobWorker;
    // Value of ParamConstants.CONNECTION_BACKOFF_PARAM; added to the exponential reconnect delay (ms).
    private final int connectionRetryBackoff;
    // Value of ParamConstants.FETCH_SIZE_PARAM; applied via setRowPrefetch(...).
    private final int fetchSize;
    // Value of ParamConstants.TRACE_LOGMINER_PARAM; passed to getLogMinerConnection(...).
    private final boolean traceSession;
    private final OraConnectionObjects oraConnections;
    private final int topicPartition;
    // Cursor bookkeeping used by readXmlWriteRedoData(): when false the next read
    // reuses the current row instead of calling rsLogMiner.next().
    private boolean fetchRsLogMinerNext;
    // Last value returned by rsLogMiner.next() inside readXmlWriteRedoData().
    private boolean isRsLogMinerRowAvailable;
    // Both sets are allocated only when processLobs is true, otherwise null.
    private final Set<Long> lobObjects;
    private final Set<Long> nonLobObjects;
    private String lastRealRowId;
    // Value of ParamConstants.LM_RECONNECT_INTERVAL_MS_PARAM.
    private final long logMinerReconnectIntervalMs;
    private final OraCdcPseudoColumnsProcessor pseudoColumns;

    /* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcLogMinerWorkerThread$ActiveTransComparator.class */
    /**
     * Orders transaction identifiers by the {@code getFirstChange()} value of the
     * transaction each identifier maps to in the supplied map.
     *
     * <p>NOTE(review): this ordering is not antisymmetric. For two distinct
     * identifiers whose transactions report the same first change, and whenever
     * either identifier is missing from the map, the result does not flip sign
     * when the arguments are swapped. {@link Comparator} requires that, and
     * {@link TreeMap} relies on it for lookups and removals. Behaviour is kept
     * as-is here because the code consuming the sorted map is not visible.
     */
    private static class ActiveTransComparator implements Comparator<String> {
        private final Map<String, OraCdcTransaction> activeTransactions;

        /**
         * @param map live map of transaction id to transaction; it is consulted on
         *            every comparison, not copied
         */
        ActiveTransComparator(Map<String, OraCdcTransaction> map) {
            this.activeTransactions = map;
        }

        /**
         * @return 0 for equal identifiers; -1 when either transaction is absent
         *         from the map or the first one started at a lower first change;
         *         1 otherwise (including equal first changes)
         */
        @Override // java.util.Comparator
        public int compare(String str, String str2) {
            if (StringUtils.equals(str, str2)) {
                // An identifier always equals itself, whether or not it is still mapped.
                return 0;
            }
            final OraCdcTransaction left = activeTransactions.get(str);
            final OraCdcTransaction right = activeTransactions.get(str2);
            if (left == null || right == null) {
                return -1;
            }
            if (left.getFirstChange() < right.getFirstChange()) {
                return -1;
            }
            return 1;
        }
    }

    /**
     * Stores the collaborators, reads connector settings, opens the LogMiner and
     * dictionary connections, reflectively instantiates the configured
     * {@code OraLogMiner} implementation and prepares the mining statements.
     *
     * @param oraCdcLogMinerTask owning task; asked for {@code isLegacyResiliencyModel()}
     * @param map stored as the source partition map
     * @param j passed as the {@code long} argument of the {@code OraLogMiner}
     *          constructor (presumably the first SCN to mine - confirm)
     * @param str SQL text of the mining query
     * @param str2 SQL text of the table-check query
     * @param map2 tables currently being processed
     * @param set tables known to be out of scope
     * @param i topic partition
     * @param oraDumpDecoder decoder stored for later use
     * @param path root directory for queues
     * @param map3 active transactions keyed by transaction id
     * @param blockingQueue queue of committed transactions
     * @param oraCdcLogMinerMgmt metrics bean, also handed to the {@code OraLogMiner}
     * @param oraCdcSourceConnectorConfig connector configuration
     * @param oraCdcLobTransformationsIntf LOB transformation hook
     * @param oraRdbmsInfo information about the source database
     * @param oraConnectionObjects source of the LogMiner and dictionary connections
     * @param oraCdcPseudoColumnsProcessor pseudo-column processor
     * @throws SQLException when a connection or statement cannot be set up
     * @throws ConnectException when the configured {@code OraLogMiner} class cannot
     *         be loaded or its constructor cannot be found or invoked
     */
    public OraCdcLogMinerWorkerThread(OraCdcLogMinerTask oraCdcLogMinerTask, Map<String, String> map, long j, String str, String str2, Map<Long, OraTable4LogMiner> map2, Set<Long> set, int i, OraDumpDecoder oraDumpDecoder, Path path, Map<String, OraCdcTransaction> map3, BlockingQueue<OraCdcTransaction> blockingQueue, OraCdcLogMinerMgmt oraCdcLogMinerMgmt, OraCdcSourceConnectorConfig oraCdcSourceConnectorConfig, OraCdcLobTransformationsIntf oraCdcLobTransformationsIntf, OraRdbmsInfo oraRdbmsInfo, OraConnectionObjects oraConnectionObjects, OraCdcPseudoColumnsProcessor oraCdcPseudoColumnsProcessor) throws SQLException {
        this.logMinerReady = false;
        LOGGER.info("Initializing oracdc logminer archivelog worker thread");
        setName("OraCdcLogMinerWorkerThread-" + System.nanoTime());
        this.task = oraCdcLogMinerTask;
        this.config = oraCdcSourceConnectorConfig;
        this.partition = map;
        this.mineDataSql = str;
        this.checkTableSql = str2;
        this.tablesInProcessing = map2;
        this.partitionsInProcessing = new HashMap<>();
        this.tablesOutOfScope = set;
        this.queuesRoot = path;
        this.odd = oraDumpDecoder;
        this.topicPartition = i;
        this.activeTransactions = map3;
        this.committedTransactions = blockingQueue;
        this.metrics = oraCdcLogMinerMgmt;
        this.processLobs = oraCdcSourceConnectorConfig.getBoolean(ParamConstants.PROCESS_LOBS_PARAM).booleanValue();
        this.transformLobs = oraCdcLobTransformationsIntf;
        this.pollInterval = oraCdcSourceConnectorConfig.getInt(ParamConstants.POLL_INTERVAL_MS_PARAM).intValue();
        this.connectionRetryBackoff = oraCdcSourceConnectorConfig.getInt(ParamConstants.CONNECTION_BACKOFF_PARAM).intValue();
        this.fetchSize = oraCdcSourceConnectorConfig.getInt(ParamConstants.FETCH_SIZE_PARAM).intValue();
        this.traceSession = oraCdcSourceConnectorConfig.getBoolean(ParamConstants.TRACE_LOGMINER_PARAM).booleanValue();
        this.rdbmsInfo = oraRdbmsInfo;
        this.oraConnections = oraConnectionObjects;
        this.pseudoColumns = oraCdcPseudoColumnsProcessor;
        this.isCdb = oraRdbmsInfo.isCdb() && !oraRdbmsInfo.isPdbConnectionAllowed();
        this.legacyResiliencyModel = oraCdcLogMinerTask.isLegacyResiliencyModel();
        if (this.legacyResiliencyModel) {
            this.activeTransComparator = null;
            this.sortedByFirstScn = null;
        } else {
            this.activeTransComparator = new ActiveTransComparator(map3);
            this.sortedByFirstScn = new TreeMap<>(this.activeTransComparator);
        }
        this.runLatch = new CountDownLatch(1);
        this.running = new AtomicBoolean(false);
        if (this.processLobs) {
            this.lobObjects = new HashSet<>();
            this.nonLobObjects = new HashSet<>();
        } else {
            this.lobObjects = null;
            this.nonLobObjects = null;
        }
        this.useChronicleQueue = StringUtils.equalsIgnoreCase(oraCdcSourceConnectorConfig.getString(ParamConstants.ORA_TRANSACTION_IMPL_PARAM), ParamConstants.ORA_TRANSACTION_IMPL_CHRONICLE);
        this.logMinerReconnectIntervalMs = oraCdcSourceConnectorConfig.getLong(ParamConstants.LM_RECONNECT_INTERVAL_MS_PARAM).longValue();

        // Signature of the constructor looked up below; kept next to the lookup so that
        // the error messages cannot drift away from the real parameter list.
        final String ctorSignature = "(java.sql.Connection, solutions.a2.cdc.oracle.jmx.OraCdcLogMinerMgmtIntf, long, "
                + "solutions.a2.cdc.oracle.OraCdcSourceConnectorConfig, java.util.concurrent.CountDownLatch, "
                + "solutions.a2.cdc.oracle.OraRdbmsInfo, solutions.a2.cdc.oracle.OraConnectionObjects)";
        try {
            this.connLogMiner = oraConnectionObjects.getLogMinerConnection(this.traceSession);
            this.connDictionary = oraConnectionObjects.getConnection();
            final int negotiatedSDU = oraRdbmsInfo.getNegotiatedSDU(this.connLogMiner);
            if (negotiatedSDU > 0 && negotiatedSDU <= 8192) {
                LOGGER.warn("\n=====================\nThe negotiated SDU between connector and mining instance is set to {}.\n\tWe recommend increasing it to achieve better performance.\n\tInstructions on how to do this can be found at\n\t\t\thttps://github.com/averemee-si/oracdc#performance-tips\n=====================", Integer.valueOf(negotiatedSDU));
            }
            final String logMinerClassName = oraCdcSourceConnectorConfig.getString(ParamConstants.ARCHIVED_LOG_CAT_PARAM);
            // The try deliberately spans the statement setup as well as the reflective call:
            // that is the scope the original nested handlers covered.
            try {
                this.logMiner = (OraLogMiner) Class.forName(logMinerClassName).getConstructor(Connection.class, OraCdcLogMinerMgmtIntf.class, Long.TYPE, OraCdcSourceConnectorConfig.class, CountDownLatch.class, OraRdbmsInfo.class, OraConnectionObjects.class).newInstance(this.connLogMiner, oraCdcLogMinerMgmt, Long.valueOf(j), oraCdcSourceConnectorConfig, this.runLatch, oraRdbmsInfo, oraConnectionObjects);
                if (this.logMiner.isOracleConnectionRequired()) {
                    if (this.logMiner.getDbId() == oraRdbmsInfo.getDbId()) {
                        LOGGER.debug("Database Id for dictionary and mining connections: {}", Long.valueOf(this.logMiner.getDbId()));
                        if (this.logMiner.isDictionaryAvailable()) {
                            LOGGER.info("Mining database {} is in OPEN mode", this.logMiner.getDbUniqueName());
                            if (this.logMiner.getDbUniqueName().equals(oraRdbmsInfo.getDbUniqueName())) {
                                LOGGER.info("Same database will be used for dictionary query and mining");
                            } else {
                                LOGGER.info("Active DataGuard database {} will be used for mining", this.logMiner.getDbUniqueName());
                            }
                        } else {
                            LOGGER.info("Mining database {} is in MOUNT mode", this.logMiner.getDbUniqueName());
                            LOGGER.info("DataGuard database {} will be used for mining", this.logMiner.getDbUniqueName());
                        }
                    } else {
                        LOGGER.info("Mining database {} has {} DBID.", this.logMiner.getDbUniqueName(), Long.valueOf(this.logMiner.getDbId()));
                        LOGGER.info("Source database {} has {} DBID.", oraRdbmsInfo.getDbUniqueName(), Long.valueOf(oraRdbmsInfo.getDbId()));
                    }
                }
                this.psLogMiner = (OraclePreparedStatement) this.connLogMiner.prepareStatement(str, java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
                this.psLogMiner.setRowPrefetch(this.fetchSize);
                LOGGER.info("RowPrefetch size for accessing V$LOGMNR_CONTENTS set to {}.", Integer.valueOf(this.fetchSize));
                initDictionaryStatements();
                this.logMinerReady = this.logMiner.next();
                if (this.processLobs) {
                    this.psReadLob = (OraclePreparedStatement) this.connLogMiner.prepareStatement(this.isCdb ? OraDictSqlTexts.MINE_LOB_CDB : OraDictSqlTexts.MINE_LOB_NON_CDB, java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
                    this.psReadLob.setRowPrefetch(this.fetchSize);
                }
            } catch (InvocationTargetException e) {
                LOGGER.error("InvocationTargetException while calling '{}{}' constructor", logMinerClassName, ctorSignature);
                throw new ConnectException("InvocationTargetException while calling required constructor for " + logMinerClassName, e);
            } catch (IllegalArgumentException e) {
                LOGGER.error("IllegalArgumentException while calling '{}{}' constructor", logMinerClassName, ctorSignature);
                throw new ConnectException("IllegalArgumentException while calling required constructor for " + logMinerClassName, e);
            } catch (IllegalAccessException e) {
                LOGGER.error("IllegalAccessException while calling '{}{}' constructor", logMinerClassName, ctorSignature);
                throw new ConnectException("IllegalAccessException while calling required constructor for " + logMinerClassName, e);
            } catch (SecurityException e) {
                LOGGER.error("SecurityException while obtaining '{}{}' constructor", logMinerClassName, ctorSignature);
                throw new ConnectException("SecurityException while obtaining required constructor for " + logMinerClassName, e);
            } catch (ClassNotFoundException e) {
                LOGGER.error("ClassNotFoundException while instantiating {}", logMinerClassName);
                throw new ConnectException("ClassNotFoundException while instantiating " + logMinerClassName, e);
            } catch (InstantiationException e) {
                LOGGER.error("InstantiationException while calling '{}{}' constructor", logMinerClassName, ctorSignature);
                throw new ConnectException("InstantiationException while calling required constructor for " + logMinerClassName, e);
            } catch (NoSuchMethodException e) {
                LOGGER.error("NoSuchMethodException while obtaining '{}{}' constructor", logMinerClassName, ctorSignature);
                throw new ConnectException("NoSuchMethodException while obtaining required constructor for " + logMinerClassName, e);
            }
        } catch (SQLException sqle) {
            LOGGER.error("\n\nUnable to start OraCdcLogMinerWorkerThread !\nSQL Error ={}, SQL State = {}, SQL Message = '{}'\n\n", Integer.valueOf(sqle.getErrorCode()), sqle.getSQLState(), sqle.getMessage());
            throw sqle;
        }
    }

    /**
     * Opens the LogMiner result set and advances it to the row identified by the
     * given offset, so that the next fetch continues after that row.
     *
     * <p>Does nothing (apart from logging) while {@code logMinerReady} is false.
     * If the very first row already has an SCN greater than {@code j}, the cursor
     * is reopened and left at its start.
     *
     * @param j SCN from the stored offset
     * @param str RS_ID from the stored offset; {@code null} matches any RS_ID
     * @param j2 SSN from the stored offset; {@code -1} matches any SSN
     * @throws SQLException on a database error, or when the result set ends
     *         before the requested position is found
     */
    public void rewind(long j, String str, long j2) throws SQLException {
        if (!logMinerReady) {
            LOGGER.info("Values from offset (SCN = {}, RS_ID = '{}', SSN = {}) ignored, waiting for new archived log.", Long.valueOf(j), str, Long.valueOf(j2));
            return;
        }
        LOGGER.info("Rewinding LogMiner ResultSet to first position after SCN = {}, RS_ID = '{}', SSN = {}.", Long.valueOf(j), str, Long.valueOf(j2));
        rsLogMiner = (OracleResultSet) psLogMiner.executeQuery();
        int skipped = 0;
        final long startedAt = System.currentTimeMillis();
        lastScn = j;
        lastRsId = str;
        lastSsn = j2;

        boolean positioned = false;
        while (!positioned) {
            if (!rsLogMiner.next()) {
                LOGGER.error("Incorrect rewind to SCN = {}, RS_ID = '{}', SSN = {}", Long.valueOf(j), str, Long.valueOf(j2));
                throw new SQLException("Incorrect rewind operation!!!");
            }
            lastScn = rsLogMiner.getLong("SCN");
            lastRsId = rsLogMiner.getString("RS_ID");
            lastSsn = rsLogMiner.getLong("SSN");

            if (skipped == 0 && lastScn > j) {
                // The first row is already past the requested SCN: nothing to skip,
                // so reopen the cursor to hand that row back to the reader.
                rsLogMiner.close();
                rsLogMiner = (OracleResultSet) psLogMiner.executeQuery();
                positioned = true;
            } else {
                skipped++;
                // Stop on the row matching the offset, but only once its last
                // continuation chunk (CSF not set) has been consumed.
                positioned = j == lastScn
                        && (str == null || StringUtils.equals(str, lastRsId))
                        && (j2 == -1 || j2 == lastSsn)
                        && !rsLogMiner.getBoolean("CSF");
            }
        }
        LOGGER.info("Total records scipped while rewinding: {}, elapsed time ms: {}", Integer.valueOf(skipped), Long.valueOf(System.currentTimeMillis() - startedAt));
    }

    /* JADX WARN: Code restructure failed: missing block: B:319:0x1002, code lost:
    
        solutions.a2.cdc.oracle.OraCdcLogMinerWorkerThread.LOGGER.debug("End of LogMiner loop...");
        r18.running.set(false);
        solutions.a2.cdc.oracle.OraCdcLogMinerWorkerThread.LOGGER.info("END: OraCdcLogMinerWorkerThread.run()");
     */
    /* JADX WARN: Code restructure failed: missing block: B:320:0x1020, code lost:
    
        return;
     */
    /**
     * Thread entry point.
     *
     * <p>The decompiler could not reconstruct this method (4129 bytecode
     * instructions were skipped), so the body below is a placeholder that always
     * throws {@link UnsupportedOperationException}; it is NOT the real logic.
     * The only recovered fragment, quoted in the JADX warning above, shows that
     * the original method ends by logging the end of the LogMiner loop and
     * setting {@code running} to {@code false}.
     *
     * @throws UnsupportedOperationException always, in this decompiled listing
     */
    @Override // java.lang.Thread, java.lang.Runnable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            Method dump skipped, instructions count: 4129
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: solutions.a2.cdc.oracle.OraCdcLogMinerWorkerThread.run():void");
    }

    /**
     * @return the SCN most recently recorded by this worker; {@code rewind(...)}
     *         seeds it from the offset and overwrites it with the {@code SCN}
     *         column of every row it reads
     */
    public long getLastScn() {
        return lastScn;
    }

    /**
     * @return the RS_ID most recently recorded by this worker; {@code rewind(...)}
     *         seeds it from the offset and overwrites it with the {@code RS_ID}
     *         column of every row it reads (may be {@code null})
     */
    public String getLastRsId() {
        return lastRsId;
    }

    /**
     * @return the SSN most recently recorded by this worker; {@code rewind(...)}
     *         seeds it from the offset and overwrites it with the {@code SSN}
     *         column of every row it reads
     */
    public long getLastSsn() {
        return lastSsn;
    }

    /**
     * @return the current value of the {@code running} flag, which starts out
     *         {@code false}
     */
    public boolean isRunning() {
        return running.get();
    }

    /**
     * Requests the worker to stop by driving {@code runLatch} down to zero.
     * Loops on the latch count, so it is safe to call more than once.
     */
    public void shutdown() {
        LOGGER.info("Stopping oracdc logminer archivelog worker thread...");
        for (long remaining = runLatch.getCount(); remaining > 0; remaining = runLatch.getCount()) {
            runLatch.countDown();
        }
        LOGGER.debug("call to shutdown() completed");
    }

    /**
     * Reacts to a connection-level {@link SQLException} by retrying
     * {@link #restoreOraConnection()} with an exponentially growing delay until it
     * succeeds or {@code runLatch} reaches zero.
     *
     * <p>Only error codes 17410 and 17002 are handled (Oracle JDBC driver codes,
     * presumably "no more data to read from socket" and "I/O error" - confirm
     * against the driver documentation). Any other code is logged and the method
     * returns without reconnecting.
     *
     * @param sQLException the error that triggered the restore attempt
     */
    private void restoreOraConnection(SQLException sQLException) {
        final int errorCode = sQLException.getErrorCode();
        LOGGER.error("Error '{}' when waiting for next archived log.", sQLException.getMessage());
        LOGGER.error("SQL errorCode = {}, SQL state = '{}'", Integer.valueOf(errorCode), sQLException.getSQLState());
        if (errorCode != 17410 && errorCode != 17002) {
            LOGGER.error("\n=====================\nUnhandled '{}', SQL errorCode = {}, SQL state = '{}'\nin restoreOraConnection(sqle) !\nTo fix - please send this message to oracle@a2-solutions.eu\n=====================\n", sQLException.getMessage(), Integer.valueOf(errorCode), sQLException.getSQLState());
            return;
        }
        boolean restored = false;
        int attempt = 0;
        while (runLatch.getCount() > 0 && !restored) {
            // Delay = 2^attempt ms plus the configured back-off.
            final long waitMs = ((long) Math.pow(2.0d, attempt)) + connectionRetryBackoff;
            attempt++;
            LOGGER.warn("Waiting {} ms for LogMiner connection to restore after ORA-{} error...", Long.valueOf(waitMs), Integer.valueOf(errorCode));
            try {
                // Object.wait requires the caller to own the monitor; without this block it
                // throws IllegalMonitorStateException unless the caller already holds the lock.
                // The lock is reentrant, so this is harmless if the caller does hold it.
                synchronized (this) {
                    wait(waitMs);
                }
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage());
                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
            }
            try {
                restoreOraConnection();
                restored = true;
            } catch (SQLException e) {
                LOGGER.error("Error '{}' when restoring connection, SQL errorCode = {}, SQL state = '{}'", e.getMessage(), Integer.valueOf(e.getErrorCode()), e.getSQLState());
            }
        }
    }

    /**
     * Obtains a fresh LogMiner connection and re-creates every statement this
     * worker keeps: the mining statement, the table-check statement, the LOB
     * reader (when LOB processing is on) and the statements owned by
     * {@code logMiner}.
     *
     * @throws SQLException when the connection or any statement cannot be created
     */
    private void restoreOraConnection() throws SQLException {
        connLogMiner = oraConnections.getLogMinerConnection(traceSession);
        psLogMiner = (OraclePreparedStatement) connLogMiner.prepareStatement(mineDataSql, java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
        psLogMiner.setRowPrefetch(fetchSize);
        // The dictionary connection is not replaced here, so a previous table-check
        // statement would otherwise stay open on it after every reconnect.
        if (psCheckTable != null) {
            try {
                psCheckTable.close();
            } catch (SQLException e) {
                // Best effort: the old statement is being discarded anyway.
                LOGGER.debug("Unable to close previous table check statement: {}", e.getMessage());
            }
            psCheckTable = null;
        }
        psCheckTable = connDictionary.prepareStatement(checkTableSql, java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
        if (processLobs) {
            final String mineLobSql = isCdb ? OraDictSqlTexts.MINE_LOB_CDB : OraDictSqlTexts.MINE_LOB_NON_CDB;
            psReadLob = (OraclePreparedStatement) connLogMiner.prepareStatement(mineLobSql, java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
            psReadLob.setRowPrefetch(fetchSize);
        }
        logMiner.createStatements(connLogMiner);
    }

    /**
     * Closes the LOB reader (when LOB processing is on), the table-check
     * statement, the mining statement and the LogMiner connection, and clears the
     * corresponding fields.
     *
     * <p>Handles that are already {@code null} are skipped, and a failure while
     * closing one resource does not prevent the others from being closed.
     *
     * @throws SQLException the first failure encountered; later failures are
     *         attached to it as suppressed exceptions
     */
    private void closeOraConnection() throws SQLException {
        SQLException failure = null;
        if (processLobs) {
            failure = closeStatement(psReadLob, failure);
            psReadLob = null;
        }
        failure = closeStatement(psCheckTable, failure);
        psCheckTable = null;
        failure = closeStatement(psLogMiner, failure);
        psLogMiner = null;
        if (connLogMiner != null) {
            try {
                connLogMiner.close();
            } catch (SQLException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
            connLogMiner = null;
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Closes {@code statement} if present and folds any failure into {@code pending}. */
    private static SQLException closeStatement(PreparedStatement statement, SQLException pending) {
        if (statement == null) {
            return pending;
        }
        try {
            statement.close();
            return pending;
        } catch (SQLException e) {
            if (pending == null) {
                return e;
            }
            pending.addSuppressed(e);
            return pending;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:54:0x0068, code lost:
    
        continue;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * LOB collection routine.
     *
     * <p>The decompiler could not reconstruct this method (1332 bytecode
     * instructions were skipped), so the body below is a placeholder that always
     * throws {@link UnsupportedOperationException}; it is NOT the real logic and
     * the meaning of the parameters cannot be determined from this listing.
     *
     * @throws UnsupportedOperationException always, in this decompiled listing
     */
    private java.util.List<solutions.a2.cdc.oracle.OraCdcLargeObjectHolder> catchTheLob(short r17, java.lang.String r18, long r19, solutions.a2.cdc.oracle.OraTable4LogMiner r21, java.lang.String r22) throws java.sql.SQLException {
        /*
            Method dump skipped, instructions count: 1332
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: solutions.a2.cdc.oracle.OraCdcLogMinerWorkerThread.catchTheLob(short, java.lang.String, long, solutions.a2.cdc.oracle.OraTable4LogMiner, java.lang.String):java.util.List");
    }

    /**
     * Extracts the text between the first pair of double quotes in {@code str}
     * and returns it wrapped in double quotes again.
     *
     * <p>When {@code str} holds no quoted section the extraction yields
     * {@code null}, and the result is the literal text {@code "null"} in quotes.
     *
     * @param str text expected to contain a double-quoted identifier
     * @return the quoted identifier
     */
    private String getLobColumnId(String str) {
        final String quote = "\"";
        final String identifier = StringUtils.substringBetween(str, quote);
        return new StringBuilder()
                .append(quote)
                .append(identifier)
                .append(quote)
                .toString();
    }

    /**
     * Reads consecutive rows with {@code OPERATION_CODE} 70 from the LogMiner
     * result set, concatenates their {@code SQL_REDO} chunks while {@code CSF} is
     * set, and decodes the hex payload found between {@code HEXTORAW('} and
     * {@code )'} into a string.
     *
     * <p>The first iteration reuses the current row when
     * {@code fetchRsLogMinerNext} is false; every later iteration advances the
     * cursor. On return {@code fetchRsLogMinerNext} is true.
     *
     * <p>NOTE(review): the decoded bytes are turned into a {@code String} with the
     * platform default charset; confirm whether a fixed charset is required.
     *
     * @param str label used only in the trace message
     * @return the decoded content
     * @throws SQLException on a database error or when a row with a different
     *         operation code is encountered
     */
    private String readXmlWriteRedoData(String str) throws SQLException {
        final StringBuilder redoChunks = new StringBuilder(65000);
        boolean continued;
        do {
            if (fetchRsLogMinerNext) {
                isRsLogMinerRowAvailable = rsLogMiner.next();
            } else {
                // The current row has not been consumed yet: use it, then advance from now on.
                fetchRsLogMinerNext = true;
            }
            final short operationCode = rsLogMiner.getShort("OPERATION_CODE");
            if (operationCode != 70) {
                LOGGER.error("Unexpected operation with code {} at SCN {} RBA '{}'", Short.valueOf(operationCode), Long.valueOf(rsLogMiner.getLong("SCN")), rsLogMiner.getString("RS_ID"));
                throw new SQLException("Unexpected operation!!!");
            }
            continued = rsLogMiner.getBoolean("CSF");
            redoChunks.append(rsLogMiner.getString("SQL_REDO"));
        } while (continued);

        final String hexPayload = StringUtils.substringBetween(redoChunks.toString(), "HEXTORAW('", ")'");
        final String content = new String(OraDumpDecoder.toByteArray(hexPayload));
        if (LOGGER.isTraceEnabled()) {
            // Two placeholders for two arguments: the previous format had three, which
            // pushed the content into the wrong slot and printed a literal "{}".
            LOGGER.trace("{} content:\n{}", str, content);
        }
        fetchRsLogMinerNext = true;
        return content;
    }

    /**
     * Returns the {@code SQL_REDO} text of the current LogMiner row, joining
     * continuation rows while the {@code CSF} flag is set. The cursor is left on
     * the last row of the statement.
     *
     * @return the complete redo statement
     * @throws SQLException on a database error
     */
    private String readSqlRedo() throws SQLException {
        if (!rsLogMiner.getBoolean("CSF")) {
            // Single-row statement: nothing to stitch together.
            return rsLogMiner.getString("SQL_REDO");
        }
        final StringBuilder statement = new StringBuilder(16000);
        boolean moreChunks;
        do {
            statement.append(rsLogMiner.getString("SQL_REDO"));
            moreChunks = rsLogMiner.getBoolean("CSF");
            if (moreChunks) {
                rsLogMiner.next();
            }
        } while (moreChunks);
        return statement.toString();
    }

    /**
     * Prepares the statements that run on the dictionary connection: the table
     * check always, and the two LOB lookups only when LOB processing is on. The
     * CDB or non-CDB SQL text is chosen by {@code isCdb}. All statements are
     * forward-only and read-only.
     *
     * @throws SQLException when a statement cannot be prepared
     */
    private void initDictionaryStatements() throws SQLException {
        final int forwardOnly = java.sql.ResultSet.TYPE_FORWARD_ONLY;
        final int readOnly = java.sql.ResultSet.CONCUR_READ_ONLY;
        psCheckTable = connDictionary.prepareStatement(checkTableSql, forwardOnly, readOnly);
        if (!processLobs) {
            return;
        }
        final String mapDataObjSql = isCdb ? OraDictSqlTexts.MAP_DATAOBJ_TO_COLUMN_CDB : OraDictSqlTexts.MAP_DATAOBJ_TO_COLUMN_NON_CDB;
        psCheckLob = connDictionary.prepareStatement(mapDataObjSql, forwardOnly, readOnly);
        final String lobCheckSql = isCdb ? OraDictSqlTexts.LOB_CHECK_CDB : OraDictSqlTexts.LOB_CHECK_NON_CDB;
        psIsDataObjLob = connDictionary.prepareStatement(lobCheckSql, forwardOnly, readOnly);
    }
}
