package solutions.a2.cdc.oracle;

import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.AbstractCircuitBreaker;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import solutions.a2.oracle.internals.RedoByteAddress;
import solutions.a2.utils.ExceptionUtils;

/* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcWorkerThreadBase.class */
public abstract class OraCdcWorkerThreadBase extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) OraCdcWorkerThreadBase.class);
    final CountDownLatch runLatch;
    final AtomicBoolean running = new AtomicBoolean(false);
    final OraRdbmsInfo rdbmsInfo;
    final OraCdcSourceConnectorConfig config;
    final OraConnectionObjects oraConnections;
    final boolean processLobs;
    final Set<Long> lobObjects;
    final Set<Long> nonLobObjects;
    final int backofMs;
    final BlockingQueue<OraCdcTransaction> committedTransactions;
    final boolean isCdb;
    final int pollInterval;
    long lastScn;
    RedoByteAddress lastRba;
    long lastSubScn;
    private final Path queuesRoot;
    private final boolean useChronicleQueue;
    private final int concTransThreshold;
    private final int reduceLoadMs;
    private final Runtime runtime;
    private final int initialCapacity;

    public OraCdcWorkerThreadBase(CountDownLatch countDownLatch, OraRdbmsInfo oraRdbmsInfo, OraCdcSourceConnectorConfig oraCdcSourceConnectorConfig, OraConnectionObjects oraConnectionObjects, BlockingQueue<OraCdcTransaction> blockingQueue) throws SQLException {
        this.runLatch = countDownLatch;
        this.rdbmsInfo = oraRdbmsInfo;
        this.config = oraCdcSourceConnectorConfig;
        this.oraConnections = oraConnectionObjects;
        this.processLobs = oraCdcSourceConnectorConfig.processLobs();
        if (this.processLobs) {
            this.lobObjects = new HashSet();
            this.nonLobObjects = new HashSet();
        } else {
            this.lobObjects = null;
            this.nonLobObjects = null;
        }
        this.useChronicleQueue = StringUtils.equalsIgnoreCase(oraCdcSourceConnectorConfig.getString(ParamConstants.ORA_TRANSACTION_IMPL_PARAM), ParamConstants.ORA_TRANSACTION_IMPL_CHRONICLE);
        this.backofMs = oraCdcSourceConnectorConfig.connectionRetryBackoff();
        this.queuesRoot = oraCdcSourceConnectorConfig.queuesRoot();
        this.committedTransactions = blockingQueue;
        this.isCdb = oraRdbmsInfo.isCdb() && !oraRdbmsInfo.isPdbConnectionAllowed();
        this.pollInterval = oraCdcSourceConnectorConfig.pollIntervalMs();
        this.concTransThreshold = oraCdcSourceConnectorConfig.transactionsThreshold();
        this.reduceLoadMs = oraCdcSourceConnectorConfig.reduceLoadMs();
        this.runtime = Runtime.getRuntime();
        this.initialCapacity = oraCdcSourceConnectorConfig.arrayListCapacity();
        LOGGER.info("The threshold for concurrent transactions processed is set to {}", Integer.valueOf(this.concTransThreshold));
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public void shutdown() {
        LOGGER.info("Stopping oracdc worker thread...");
        while (this.runLatch.getCount() > 0) {
            this.runLatch.countDown();
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("call to shutdown() completed");
        }
    }

    public long lastScn() {
        return this.lastScn;
    }

    public RedoByteAddress lastRba() {
        return this.lastRba;
    }

    public long lastSubScn() {
        return this.lastSubScn;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract void rewind(long j, RedoByteAddress redoByteAddress, long j2) throws SQLException;

    /* JADX INFO: Access modifiers changed from: package-private */
    public OraCdcTransaction createTransaction(String str, int i) {
        int i2 = 0;
        while (i2 <= 127) {
            i2++;
            int size = this.committedTransactions.size();
            boolean z = size + i > this.concTransThreshold && size > 0;
            if (!this.useChronicleQueue && ((float) (this.runtime.freeMemory() / this.runtime.totalMemory())) > 0.24f) {
                z = false;
            }
            if (!z) {
                break;
            }
            try {
                LOGGER.info("Currently {} transactions are ready to send and {} are in the process of reading from RDBMS. Wait {}ms to reduce the load on the system", Integer.valueOf(size), Integer.valueOf(i), Integer.valueOf(this.reduceLoadMs));
                Thread.sleep(this.reduceLoadMs);
            } catch (InterruptedException e) {
            }
        }
        return this.useChronicleQueue ? getChronicleQueue(str) : new OraCdcTransactionArrayList(str, this.initialCapacity);
    }

    private OraCdcTransactionChronicleQueue getChronicleQueue(String str) {
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        Exception exc = null;
        while (i <= 127) {
            i++;
            try {
                return new OraCdcTransactionChronicleQueue(this.processLobs, this.queuesRoot, str);
            } catch (Exception e) {
                exc = e;
                if (e.getCause() == null || !(e.getCause() instanceof IOException) || !StringUtils.containsIgnoreCase(e.getCause().getMessage(), "Too") || !StringUtils.containsIgnoreCase(e.getCause().getMessage(), "many") || !StringUtils.containsIgnoreCase(e.getCause().getMessage(), AbstractCircuitBreaker.PROPERTY_NAME) || !StringUtils.containsIgnoreCase(e.getCause().getMessage(), "files")) {
                    LOGGER.error("\n=====================\n'{}' while initializing Chronicle Queue.\n\tThis might be issue https://github.com/OpenHFT/Chronicle-Queue/issues/1446 or you don't have enough open files limit.\nPlease send errorstack below to oracle@a2.solutions\n{}\n=====================\n", e.getMessage(), ExceptionUtils.getExceptionStackTrace(e));
                    throw new ConnectException(e);
                }
                try {
                    LOGGER.info("Wait {}ms until OS resources become available to create a Chronicle Queue", Integer.valueOf(this.backofMs));
                    Thread.sleep(this.backofMs);
                } catch (InterruptedException e2) {
                }
            }
        }
        Logger logger = LOGGER;
        Object[] objArr = new Object[3];
        objArr[0] = Integer.valueOf(i);
        objArr[1] = Long.valueOf(System.currentTimeMillis() - currentTimeMillis);
        objArr[2] = exc != null ? ExceptionUtils.getExceptionStackTrace(exc) : "";
        logger.error("\n=====================\nFailed to reconnect to create Chronicle Queue after {} attempts in {} ms.\n{}\n=====================\n", objArr);
        if (exc != null) {
            throw new ConnectException(exc);
        }
        throw new ConnectException("Unable to create Chronicle Queue!");
    }
}
