package solutions.a2.cdc.oracle;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import oracle.jdbc.OracleConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.log4j.varia.ExternallyRolledFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import solutions.a2.cdc.oracle.jmx.OraCdcLogMinerMgmtIntf;
import solutions.a2.cdc.oracle.jmx.OraCdcRedoShipment;
import solutions.a2.cdc.oracle.utils.ExceptionUtils;

/* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcDistributedV$ArchivedLogImpl.class */
public class OraCdcDistributedV$ArchivedLogImpl implements OraLogMiner {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) OraCdcDistributedV$ArchivedLogImpl.class);
    private long sessionFirstChange;
    private int numArchLogs;
    private long sizeOfArchLogs;
    private final boolean useNumOfArchLogs;
    private final boolean dictionaryAvailable;
    private final long dbId;
    private final String dbUniqueName;
    private final OraCdcLogMinerMgmtIntf metrics;
    private CallableStatement csAddArchivedLogs;
    private CallableStatement csStartLogMiner;
    private CallableStatement csStopLogMiner;
    private int archLogsAvailable = 0;
    private long archLogsSize = 0;
    private List<String> fileNames = new ArrayList();
    private long readStartMillis;
    private final BlockingQueue<ArchivedRedoFile> redoFiles;

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: OraCdcDistributedV$ArchivedLogImpl.java */
    /* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcDistributedV$ArchivedLogImpl$ArchivedRedoFile.class */
    public static class ArchivedRedoFile {
        private final ZoneId dbZoneId;
        String NAME;
        int THREAD;
        long SEQUENCE;
        long FIRST_CHANGE;
        long NEXT_CHANGE;
        long BYTES;
        Timestamp FIRST_TIME;

        ArchivedRedoFile(ZoneId zoneId) {
            this.dbZoneId = zoneId;
        }

        int ACTUAL_LAG_SECONDS() {
            return (int) Duration.between(ZonedDateTime.now(this.dbZoneId), this.FIRST_TIME.toLocalDateTime().atZone(this.dbZoneId)).getSeconds();
        }
    }

    /* compiled from: OraCdcDistributedV$ArchivedLogImpl.java */
    /* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcDistributedV$ArchivedLogImpl$RedoTransportThread.class */
    private static class RedoTransportThread extends Thread {
        private final CountDownLatch runLatch;
        private final ZoneId oracleDbZoneId;
        private final BlockingQueue<ArchivedRedoFile> redoFiles;
        private final OraCdcRedoShipment metrics;
        private final InetSocketAddress targetServerAddress;
        private OracleConnection connDictionary;
        private PreparedStatement psGetArchivedLogs;
        private long firstChange;
        private long lastSequence = -1;
        private long nextChange = 0;
        private final OraRdbmsInfo rdbmsInfo;

        RedoTransportThread(long j, OraCdcSourceConnectorConfig oraCdcSourceConnectorConfig, CountDownLatch countDownLatch, BlockingQueue<ArchivedRedoFile> blockingQueue, OraConnectionObjects oraConnectionObjects, OraRdbmsInfo oraRdbmsInfo) throws SQLException {
            setName("OraCdcRedoTransportThread-" + System.nanoTime());
            this.firstChange = j;
            this.runLatch = countDownLatch;
            this.redoFiles = blockingQueue;
            this.rdbmsInfo = oraRdbmsInfo;
            this.connDictionary = (OracleConnection) oraConnectionObjects.getConnection();
            this.oracleDbZoneId = TimeZone.getDefault().toZoneId();
            this.psGetArchivedLogs = this.connDictionary.prepareStatement(OraDictSqlTexts.ARCHIVED_LOGS, 1003, 1007);
            String string = oraCdcSourceConnectorConfig.getString(ParamConstants.DISTRIBUTED_TARGET_HOST);
            if (StringUtils.isBlank(string)) {
                throw new SQLException("Parameter {} must be set", ParamConstants.DISTRIBUTED_TARGET_HOST);
            }
            int intValue = oraCdcSourceConnectorConfig.getInt(ParamConstants.DISTRIBUTED_TARGET_PORT).intValue();
            this.metrics = new OraCdcRedoShipment(string, intValue);
            this.targetServerAddress = new InetSocketAddress(string, intValue);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            OraCdcDistributedV$ArchivedLogImpl.LOGGER.info("BEGIN: RedoTransportThread.run()");
            while (this.runLatch.getCount() > 0) {
                try {
                    this.psGetArchivedLogs.setLong(1, this.firstChange);
                    this.psGetArchivedLogs.setLong(2, this.firstChange);
                    this.psGetArchivedLogs.setLong(3, this.firstChange);
                    this.psGetArchivedLogs.setInt(4, this.rdbmsInfo.getRedoThread());
                    this.psGetArchivedLogs.setInt(5, this.rdbmsInfo.getRedoThread());
                    ResultSet executeQuery = this.psGetArchivedLogs.executeQuery();
                    while (executeQuery.next()) {
                        long j = executeQuery.getLong("SEQUENCE#");
                        this.nextChange = executeQuery.getLong("NEXT_CHANGE#");
                        if (j > this.lastSequence && this.firstChange < this.nextChange) {
                            long nanoTime = System.nanoTime();
                            ArchivedRedoFile archivedRedoFile = new ArchivedRedoFile(this.oracleDbZoneId);
                            archivedRedoFile.NAME = executeQuery.getString("NAME");
                            archivedRedoFile.THREAD = executeQuery.getInt("THREAD#");
                            archivedRedoFile.SEQUENCE = j;
                            archivedRedoFile.FIRST_CHANGE = executeQuery.getLong("FIRST_CHANGE#");
                            archivedRedoFile.NEXT_CHANGE = this.nextChange;
                            archivedRedoFile.BYTES = executeQuery.getLong("BYTES");
                            archivedRedoFile.FIRST_TIME = executeQuery.getTimestamp("FIRST_TIME");
                            if (OraCdcDistributedV$ArchivedLogImpl.LOGGER.isDebugEnabled()) {
                                OraCdcDistributedV$ArchivedLogImpl.LOGGER.debug("File request for reading {} sent to {}:{}", archivedRedoFile.NAME, this.targetServerAddress.getHostString(), Integer.valueOf(this.targetServerAddress.getPort()));
                            }
                            ByteBuffer allocate = ByteBuffer.allocate(1024);
                            allocate.put(archivedRedoFile.NAME.getBytes("UTF-8"));
                            allocate.flip();
                            SocketChannel open = SocketChannel.open();
                            open.connect(this.targetServerAddress);
                            open.configureBlocking(true);
                            open.write(allocate);
                            ByteBuffer allocate2 = ByteBuffer.allocate(1024);
                            open.read(allocate2);
                            allocate2.rewind();
                            String trim = StringUtils.trim(new String(allocate2.array(), "UTF-8"));
                            if (OraCdcDistributedV$ArchivedLogImpl.LOGGER.isDebugEnabled()) {
                                OraCdcDistributedV$ArchivedLogImpl.LOGGER.debug("Response received:\t{}", trim);
                            }
                            String[] split = StringUtils.split(trim, "\n");
                            if (split.length != 3 || !ExternallyRolledFileAppender.OK.equals(split[0])) {
                                throw new IOException("Invalid response!\t" + trim);
                            }
                            this.metrics.addProcessedFileInfo(System.nanoTime() - nanoTime, archivedRedoFile.BYTES, archivedRedoFile.NAME);
                            archivedRedoFile.NAME = split[1];
                            this.redoFiles.add(archivedRedoFile);
                            this.firstChange = archivedRedoFile.NEXT_CHANGE;
                        }
                    }
                    this.psGetArchivedLogs.clearParameters();
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException e) {
                        throw new SQLException(e);
                    }
                } catch (IOException | SQLException e2) {
                    OraCdcDistributedV$ArchivedLogImpl.LOGGER.error(e2.getMessage());
                    OraCdcDistributedV$ArchivedLogImpl.LOGGER.error(ExceptionUtils.getExceptionStackTrace(e2));
                    try {
                        this.connDictionary.close();
                    } catch (SQLException e3) {
                    }
                    throw new ConnectException(e2);
                }
            }
            try {
                if (this.connDictionary != null) {
                    this.connDictionary.close();
                }
            } catch (SQLException e4) {
                OraCdcDistributedV$ArchivedLogImpl.LOGGER.error(e4.getMessage());
                OraCdcDistributedV$ArchivedLogImpl.LOGGER.error(ExceptionUtils.getExceptionStackTrace(e4));
            }
        }
    }

    public OraCdcDistributedV$ArchivedLogImpl(Connection connection, OraCdcLogMinerMgmtIntf oraCdcLogMinerMgmtIntf, long j, OraCdcSourceConnectorConfig oraCdcSourceConnectorConfig, CountDownLatch countDownLatch, OraRdbmsInfo oraRdbmsInfo, OraConnectionObjects oraConnectionObjects) throws SQLException {
        LOGGER.trace("BEGIN: OraLogMiner Constructor");
        this.metrics = oraCdcLogMinerMgmtIntf;
        this.redoFiles = new LinkedBlockingQueue();
        if (oraCdcSourceConnectorConfig.getLong(ParamConstants.REDO_FILES_SIZE_PARAM).longValue() > 0) {
            this.useNumOfArchLogs = false;
            this.sizeOfArchLogs = oraCdcSourceConnectorConfig.getLong(ParamConstants.REDO_FILES_SIZE_PARAM).longValue();
            LOGGER.debug("The redo log read size limit will be set to '{}' bytes.", Long.valueOf(this.sizeOfArchLogs));
        } else {
            this.useNumOfArchLogs = true;
            this.numArchLogs = oraCdcSourceConnectorConfig.getShort(ParamConstants.REDO_FILES_COUNT_PARAM).shortValue();
            LOGGER.debug("The redo log read size limit will be set to '{}' files", Integer.valueOf(this.numArchLogs));
        }
        createStatements(connection);
        PreparedStatement prepareStatement = connection.prepareStatement(OraDictSqlTexts.RDBMS_OPEN_MODE, 1003, 1007);
        ResultSet executeQuery = prepareStatement.executeQuery();
        if (!executeQuery.next()) {
            throw new SQLException("Unable to detect RDBMS open mode");
        }
        String string = executeQuery.getString(1);
        if ("MOUNTED".equals(string)) {
            LOGGER.trace("LogMiner connection database is in MOUNTED state, no dictionary available.");
            this.dictionaryAvailable = false;
        } else {
            LOGGER.trace("LogMiner connection database is in {} state, dictionary is available.", string);
            this.dictionaryAvailable = true;
        }
        LOGGER.info("LogMiner will start from SCN {}", Long.valueOf(j));
        this.dbId = executeQuery.getLong(2);
        this.dbUniqueName = executeQuery.getString(3);
        executeQuery.close();
        prepareStatement.close();
        new RedoTransportThread(j, oraCdcSourceConnectorConfig, countDownLatch, this.redoFiles, oraConnectionObjects, oraRdbmsInfo).start();
        oraCdcLogMinerMgmtIntf.start(j);
        LOGGER.trace("END: OraLogMiner Constructor");
    }

    @Override // solutions.a2.cdc.oracle.OraLogMiner
    public void createStatements(Connection connection) throws SQLException {
        this.csAddArchivedLogs = connection.prepareCall(OraDictSqlTexts.ADD_ARCHIVED_LOG);
        this.csStartLogMiner = connection.prepareCall(OraDictSqlTexts.START_LOGMINER);
        this.csStopLogMiner = connection.prepareCall(OraDictSqlTexts.STOP_LOGMINER);
    }

    @Override // solutions.a2.cdc.oracle.OraLogMiner
    public boolean next() throws SQLException {
        return start(true);
    }

    @Override // solutions.a2.cdc.oracle.OraLogMiner
    public boolean extend() throws SQLException {
        return start(false);
    }

    private boolean start(boolean z) throws SQLException {
        String str = z ? "next()" : "extend()";
        LOGGER.trace("BEGIN: {}", str);
        this.archLogsAvailable = 0;
        this.archLogsSize = 0L;
        long j = 0;
        long j2 = 0;
        int i = 0;
        if (z) {
            this.fileNames = new ArrayList();
        }
        while (true) {
            ArchivedRedoFile poll = this.redoFiles.poll();
            if (poll == null) {
                break;
            }
            this.fileNames.add(poll.NAME);
            if (this.archLogsAvailable == 0) {
                j = poll.FIRST_CHANGE;
                i = poll.ACTUAL_LAG_SECONDS();
            }
            j2 = poll.NEXT_CHANGE;
            this.archLogsAvailable++;
            this.archLogsSize += poll.BYTES;
            LOGGER.info("Adding archived log {} thread# {} sequence# {} first change number {} next log first change {}", poll.NAME, Integer.valueOf(poll.THREAD), Long.valueOf(poll.SEQUENCE), Long.valueOf(poll.FIRST_CHANGE), Long.valueOf(poll.NEXT_CHANGE));
            if (!this.useNumOfArchLogs) {
                if (this.archLogsSize >= this.sizeOfArchLogs) {
                    break;
                }
            } else {
                if (this.archLogsAvailable >= this.numArchLogs) {
                    break;
                }
            }
        }
        if (this.archLogsAvailable == 0) {
            LOGGER.trace("END: {} return false", str);
            return false;
        }
        this.metrics.setNowProcessed(this.fileNames, z ? j : this.sessionFirstChange, j2, i);
        LOGGER.trace("Adding files to LogMiner session and starting it");
        for (int i2 = 0; i2 < this.fileNames.size(); i2++) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Adding {} to LogMiner processing list.", this.fileNames.get(i2));
            }
            this.csAddArchivedLogs.setInt(1, i2);
            this.csAddArchivedLogs.setString(2, this.fileNames.get(i2));
            this.csAddArchivedLogs.addBatch();
        }
        this.csAddArchivedLogs.executeBatch();
        this.csAddArchivedLogs.clearBatch();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Attempting to start LogMiner for SCN range from {} to {}.", Long.valueOf(z ? j : this.sessionFirstChange), Long.valueOf(j2));
        }
        this.csStartLogMiner.setLong(1, z ? j : this.sessionFirstChange);
        this.csStartLogMiner.setLong(2, j2);
        this.csStartLogMiner.execute();
        this.csStartLogMiner.clearParameters();
        if (z) {
            this.sessionFirstChange = j;
        }
        this.readStartMillis = System.currentTimeMillis();
        LOGGER.trace("END: {} returns true", str);
        return true;
    }

    @Override // solutions.a2.cdc.oracle.OraLogMiner
    public void stop() throws SQLException {
        LOGGER.trace("BEGIN: stop()");
        this.csStopLogMiner.execute();
        this.metrics.addAlreadyProcessed(this.fileNames, this.archLogsAvailable, this.archLogsSize, System.currentTimeMillis() - this.readStartMillis);
        LOGGER.trace("END: stop()");
    }

    @Override // solutions.a2.cdc.oracle.OraLogMiner
    public boolean isDictionaryAvailable() {
        return this.dictionaryAvailable;
    }

    @Override // solutions.a2.cdc.oracle.OraLogMiner
    public long getDbId() {
        return this.dbId;
    }

    @Override // solutions.a2.cdc.oracle.OraLogMiner
    public String getDbUniqueName() {
        return this.dbUniqueName;
    }
}
