package solutions.a2.cdc.oracle;

import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import solutions.a2.cdc.oracle.jmx.OraCdcInitialLoad;
import solutions.a2.utils.ExceptionUtils;

/* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcInitialLoadThread.class */
public class OraCdcInitialLoadThread extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(OraCdcInitialLoadThread.class);
    private final int waitInterval;
    private final long asOfScn;
    private final OraCdcInitialLoad metrics;
    private final CountDownLatch runLatch;
    private final Map<Long, OraTable4LogMiner> tablesInProcessing;
    private final Path queuesRoot;
    private final BlockingQueue<OraTable4InitialLoad> tablesQueue;
    private final AtomicBoolean running;
    private final int selectThreadCount;
    private final OraRdbmsInfo rdbmsInfo;
    private final OraConnectionObjects oraConnections;

    public OraCdcInitialLoadThread(int i, long j, Map<Long, OraTable4LogMiner> map, OraCdcSourceConnectorConfig oraCdcSourceConnectorConfig, OraRdbmsInfo oraRdbmsInfo, OraCdcInitialLoad oraCdcInitialLoad, BlockingQueue<OraTable4InitialLoad> blockingQueue, OraConnectionObjects oraConnectionObjects) throws SQLException {
        LOGGER.info("Initializing oracdc initial load thread");
        setName("OraCdcInitialLoadThread-" + System.nanoTime());
        this.waitInterval = i;
        this.asOfScn = j;
        this.tablesInProcessing = map;
        this.queuesRoot = oraCdcSourceConnectorConfig.queuesRoot();
        this.tablesQueue = blockingQueue;
        this.oraConnections = oraConnectionObjects;
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        this.selectThreadCount = Math.min(availableProcessors, oraRdbmsInfo.getCpuCoreCount());
        LOGGER.info("DB cores available {}, Kafka Cores available {}.", Integer.valueOf(oraRdbmsInfo.getCpuCoreCount()), Integer.valueOf(availableProcessors));
        LOGGER.info("{} parallel loaders for select phase will be used.");
        this.metrics = oraCdcInitialLoad;
        this.runLatch = new CountDownLatch(map.size());
        this.running = new AtomicBoolean(true);
        this.rdbmsInfo = oraRdbmsInfo;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LOGGER.info("BEGIN: OraCdcInitialLoadThread.run()");
        long currentTimeMillis = System.currentTimeMillis();
        if (this.tablesInProcessing == null || this.tablesInProcessing.size() <= 0) {
            LOGGER.warn("No tables for initial load!!!");
        } else {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(this.selectThreadCount, this.selectThreadCount, this.waitInterval, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(this.tablesInProcessing.size()), new ThreadPoolExecutor.AbortPolicy());
            this.tablesInProcessing.forEach((l, oraTable4LogMiner) -> {
                try {
                    OraTable4InitialLoad oraTable4InitialLoad = new OraTable4InitialLoad(this.queuesRoot, oraTable4LogMiner, this.metrics, this.rdbmsInfo);
                    threadPoolExecutor.submit(() -> {
                        oraTable4InitialLoad.readTableData(Long.valueOf(this.asOfScn), this.runLatch, this.running, this.tablesQueue, this.oraConnections);
                    });
                } catch (IOException e) {
                    LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
                    throw new ConnectException(e);
                }
            });
            try {
                LOGGER.debug("Start waiting for initial load jobs completition...");
                this.runLatch.await();
                threadPoolExecutor.shutdown();
            } catch (InterruptedException e) {
                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
                throw new ConnectException(e);
            }
        }
        this.running.set(false);
        LOGGER.info("END: OraCdcInitialLoadThread.run(), elapsed time {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public void shutdown() {
        LOGGER.info("Stopping oracdc initial load thread...");
        this.running.set(false);
        while (this.runLatch.getCount() > 0) {
            this.runLatch.countDown();
        }
        LOGGER.debug("call to shutdown() completed");
    }
}
