package io.debezium.relational;

import io.debezium.DebeziumException;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.7.0.Final.jar:io/debezium/relational/RelationalSnapshotChangeEventSource.class */
public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends OffsetContext> extends AbstractSnapshotChangeEventSource<P, O> {
    /** Class-wide SLF4J logger used for snapshot step and progress messages. */
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) RelationalSnapshotChangeEventSource.class);
    /** Interval (10 seconds) handed to the timer that paces per-table progress logging during export. */
    public static final Duration LOG_INTERVAL = Duration.ofMillis(10000);
    /** Regex matching a literal {@code *}. Not referenced in this class; presumably intended for subclasses (confirm). */
    public static final Pattern SELECT_ALL_PATTERN = Pattern.compile("\\*");
    /** Connector configuration: source of table/column filters, include list and select overrides. */
    private final RelationalDatabaseConnectorConfig connectorConfig;
    /** JDBC access used to obtain the snapshot connection, read statements and row values. */
    private final JdbcConnection jdbcConnection;
    /** Relational schema; consulted for table definitions and whether schema history is kept. */
    private final RelationalDatabaseSchema schema;
    /** Dispatcher through which schema change, snapshot data and heartbeat events are emitted. */
    protected final EventDispatcher<TableId> dispatcher;
    /** Clock used for event timestamps and for measuring export duration. */
    protected final Clock clock;
    /** Listener notified about captured tables, rows scanned and per-table completion. */
    private final SnapshotProgressListener snapshotProgressListener;

    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.7.0.Final.jar:io/debezium/relational/RelationalSnapshotChangeEventSource$RelationalSnapshotContext.class */
    /**
     * Mutable state shared between the snapshot steps of a relational connector.
     *
     * @param <P> the partition type
     * @param <O> the offset context type
     */
    public static class RelationalSnapshotContext<P extends Partition, O extends OffsetContext> extends AbstractSnapshotChangeEventSource.SnapshotContext<P, O> {
        /** Catalog name supplied at construction time. */
        public final String catalogName;
        /** Table definitions; starts out empty and is looked up via {@code forTable} by the enclosing class. */
        public final Tables tables;
        /** Tables whose data is captured; assigned by the enclosing class when determining captured tables. */
        public Set<TableId> capturedTables;
        /** Tables whose schema is captured; assigned together with {@link #capturedTables}. */
        public Set<TableId> capturedSchemaTables;
        /** Set by the enclosing class to {@code true} while the final captured table is being exported. */
        public boolean lastTable;
        /** Set by the enclosing class to {@code true} once the current table's result set has no further rows. */
        public boolean lastRecordInTable;

        /**
         * Creates a context with an empty {@link Tables} collection.
         *
         * @param p the partition, passed to the superclass constructor
         * @param str the catalog name stored in {@link #catalogName}
         * @throws SQLException declared for callers/subclasses; nothing in this constructor body is shown to throw it
         */
        public RelationalSnapshotContext(P p, String str) throws SQLException {
            super(p);
            this.catalogName = str;
            this.tables = new Tables();
        }
    }

    /**
     * Stores the collaborators needed to run a relational snapshot.
     *
     * @param relationalDatabaseConnectorConfig connector configuration; also passed to the superclass
     * @param jdbcConnection JDBC access used for the snapshot connection and for reading rows
     * @param relationalDatabaseSchema the relational schema of the connector
     * @param eventDispatcher dispatcher used to emit schema, data and heartbeat events
     * @param clock clock used for timestamps and duration measurement
     * @param snapshotProgressListener progress listener; also passed to the superclass
     */
    public RelationalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig relationalDatabaseConnectorConfig, JdbcConnection jdbcConnection, RelationalDatabaseSchema relationalDatabaseSchema, EventDispatcher<TableId> eventDispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) {
        super(relationalDatabaseConnectorConfig, snapshotProgressListener);
        this.connectorConfig = relationalDatabaseConnectorConfig;
        this.jdbcConnection = jdbcConnection;
        this.schema = relationalDatabaseSchema;
        this.dispatcher = eventDispatcher;
        this.clock = clock;
        this.snapshotProgressListener = snapshotProgressListener;
    }

    /**
     * Runs the snapshot as a fixed sequence of steps: prepare the connection, determine the
     * captured tables, lock them (schema snapshots only), determine the snapshot offset, read
     * the table structure, optionally emit schema change events, and optionally export the data.
     *
     * <p>The snapshot transaction is rolled back exactly once when the method exits, whether it
     * completes normally or fails. A rollback failure surfaces as the {@link RuntimeException}
     * thrown by {@code rollbackTransaction}.
     *
     * @param changeEventSourceContext context used by the steps to detect that the task was stopped
     * @param o the previous offset, or {@code null} if there is none
     * @param snapshotContext the snapshot context; must be a {@link RelationalSnapshotContext}
     * @param snapshottingTask tells whether schema and/or data are to be snapshotted
     * @return a completed snapshot result carrying the context's offset
     * @throws Exception if any snapshot step fails
     */
    @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource
    public SnapshotResult<O> doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, O o, AbstractSnapshotChangeEventSource.SnapshotContext<P, O> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        RelationalSnapshotContext<P, O> relationalSnapshotContext = (RelationalSnapshotContext<P, O>) snapshotContext;
        Connection connection = null;
        try {
            LOGGER.info("Snapshot step 1 - Preparing");
            if (o != null && o.isSnapshotRunning()) {
                LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken.");
            }
            connection = createSnapshotConnection();
            connectionCreated(relationalSnapshotContext);

            LOGGER.info("Snapshot step 2 - Determining captured tables");
            determineCapturedTables(relationalSnapshotContext);
            this.snapshotProgressListener.monitoredDataCollectionsDetermined(relationalSnapshotContext.capturedTables);

            LOGGER.info("Snapshot step 3 - Locking captured tables {}", relationalSnapshotContext.capturedTables);
            if (snapshottingTask.snapshotSchema()) {
                lockTablesForSchemaSnapshot(changeEventSourceContext, relationalSnapshotContext);
            }

            LOGGER.info("Snapshot step 4 - Determining snapshot offset");
            determineSnapshotOffset(relationalSnapshotContext, o);

            LOGGER.info("Snapshot step 5 - Reading structure of captured tables");
            readTableStructure(changeEventSourceContext, relationalSnapshotContext, o);

            if (snapshottingTask.snapshotSchema()) {
                LOGGER.info("Snapshot step 6 - Persisting schema history");
                createSchemaChangeEventsForTables(changeEventSourceContext, relationalSnapshotContext, snapshottingTask);
                // Schema locks are only needed until the schema change events have been created.
                releaseSchemaSnapshotLocks(relationalSnapshotContext);
            } else {
                LOGGER.info("Snapshot step 6 - Skipping persisting of schema history");
            }

            if (snapshottingTask.snapshotData()) {
                LOGGER.info("Snapshot step 7 - Snapshotting data");
                createDataEvents(changeEventSourceContext, relationalSnapshotContext);
            } else {
                LOGGER.info("Snapshot step 7 - Skipping snapshotting of data");
                // Without a data export the completion callbacks still have to be fired here.
                releaseDataSnapshotLocks(relationalSnapshotContext);
                relationalSnapshotContext.offset.preSnapshotCompletion();
                relationalSnapshotContext.offset.postSnapshotCompletion();
            }

            postSnapshot();
            this.dispatcher.alwaysDispatchHeartbeatEvent(relationalSnapshotContext.partition, relationalSnapshotContext.offset);
            return SnapshotResult.completed(relationalSnapshotContext.offset);
        } finally {
            // Single rollback on every exit path; keeping it out of the try body prevents a
            // failed rollback from being caught and retried a second time.
            rollbackTransaction(connection);
        }
    }

    /**
     * Obtains the JDBC connection used for the snapshot and switches it to manual commit mode.
     *
     * @return the connection with auto-commit disabled
     * @throws SQLException if the connection cannot be obtained or configured
     */
    public Connection createSnapshotConnection() throws SQLException {
        final Connection snapshotConnection = jdbcConnection.connection();
        snapshotConnection.setAutoCommit(false);
        return snapshotConnection;
    }

    /**
     * Hook invoked by {@code doExecute} right after the snapshot connection has been created.
     * The default implementation does nothing.
     *
     * @param relationalSnapshotContext the current snapshot context
     * @throws Exception if a subclass implementation fails
     */
    protected void connectionCreated(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception {
    }

    /**
     * Selects the table ids whose mapped string form is matched by the given pattern.
     *
     * @param set the candidate table ids
     * @param pattern the pattern tested (via {@code asPredicate}) against each mapped table id
     * @return the matching table ids in their natural order
     */
    private Stream<TableId> toTableIds(Set<TableId> set, Pattern pattern) {
        return set.stream()
                .filter(candidate -> pattern.asPredicate().test(connectorConfig.getTableIdMapper().toString(candidate)))
                .sorted();
    }

    /**
     * Orders the captured tables for snapshotting.
     *
     * <p>With a configured table include list, tables are ordered by the include-list pattern that
     * matches them (patterns compiled case-insensitively) and sorted naturally within each
     * pattern; tables matching no pattern are dropped. Without an include list the tables are
     * sorted naturally.
     *
     * @param set the captured table ids
     * @return an insertion-ordered set reflecting the snapshot order
     * @throws Exception declared for callers; propagates any failure from the lookups
     */
    private Set<TableId> sort(Set<TableId> set) throws Exception {
        final String includeList = connectorConfig.tableIncludeList();
        if (includeList == null) {
            return set.stream()
                    .sorted()
                    .collect(Collectors.toCollection(LinkedHashSet::new));
        }
        return Strings.listOfRegex(includeList, Pattern.CASE_INSENSITIVE).stream()
                .flatMap(pattern -> toTableIds(set, pattern))
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Populates {@code capturedTables} and {@code capturedSchemaTables} on the snapshot context.
     *
     * <p>Candidates come from {@link #getAllTableIds} narrowed by
     * {@code determineDataCollectionsToBeSnapshotted}. A candidate accepted by the eligible data
     * collection filter becomes a schema table; one accepted by the data collection filter
     * becomes a captured table.
     *
     * @param relationalSnapshotContext the context receiving the two table sets
     * @throws Exception if the table ids cannot be determined
     */
    private void determineCapturedTables(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception {
        final Set<TableId> candidates = determineDataCollectionsToBeSnapshotted(getAllTableIds(relationalSnapshotContext)).collect(Collectors.toSet());
        final Set<TableId> dataTables = new HashSet<>();
        final Set<TableId> schemaTables = new HashSet<>();

        for (TableId candidate : candidates) {
            if (connectorConfig.getTableFilters().eligibleDataCollectionFilter().isIncluded(candidate)) {
                LOGGER.trace("Adding table {} to the list of capture schema tables", candidate);
                schemaTables.add(candidate);
            }
            if (!connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(candidate)) {
                LOGGER.trace("Ignoring table {} as it's not included in the filter configuration", candidate);
            } else {
                LOGGER.trace("Adding table {} to the list of captured tables", candidate);
                dataTables.add(candidate);
            }
        }

        relationalSnapshotContext.capturedTables = sort(dataTables);
        relationalSnapshotContext.capturedSchemaTables = schemaTables.stream()
                .sorted()
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Returns the table ids that are candidates for the snapshot. The result is narrowed by
     * {@code determineDataCollectionsToBeSnapshotted} and the connector's table filters when
     * the captured tables are determined.
     *
     * @param relationalSnapshotContext the current snapshot context
     * @return the candidate table ids
     * @throws Exception if the table ids cannot be obtained
     */
    protected abstract Set<TableId> getAllTableIds(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception;

    /**
     * Invoked by {@code doExecute} in step 3, only when the snapshotting task includes a schema
     * snapshot. How the locks are taken is left to the implementation.
     *
     * @param changeEventSourceContext context of the running change event source
     * @param relationalSnapshotContext the current snapshot context (captured tables are already set)
     * @throws Exception if locking fails
     */
    protected abstract void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception;

    /**
     * Invoked by {@code doExecute} in step 4. Later steps read
     * {@code relationalSnapshotContext.offset}, so implementations are presumably expected to
     * set it here (confirm against concrete connectors).
     *
     * @param relationalSnapshotContext the current snapshot context
     * @param o the previous offset passed to {@code doExecute}; may be {@code null}
     * @throws Exception if the offset cannot be determined
     */
    protected abstract void determineSnapshotOffset(RelationalSnapshotContext<P, O> relationalSnapshotContext, O o) throws Exception;

    /**
     * Invoked by {@code doExecute} in step 5. Later steps look tables up through
     * {@code relationalSnapshotContext.tables}, so implementations are presumably expected to
     * fill that collection here (confirm against concrete connectors).
     *
     * @param changeEventSourceContext context of the running change event source
     * @param relationalSnapshotContext the current snapshot context
     * @param o the previous offset passed to {@code doExecute}; may be {@code null}
     * @throws Exception if the structure cannot be read
     */
    protected abstract void readTableStructure(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext, O o) throws Exception;

    /**
     * Invoked by {@code doExecute} after the schema change events have been created, only when
     * the snapshotting task includes a schema snapshot.
     *
     * @param relationalSnapshotContext the current snapshot context
     * @throws Exception if releasing the locks fails
     */
    protected abstract void releaseSchemaSnapshotLocks(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception;

    /**
     * Hook invoked once all tables have been exported, or directly from {@code doExecute} when
     * the data snapshot is skipped. The default implementation does nothing.
     *
     * @param relationalSnapshotContext the current snapshot context
     * @throws Exception if a subclass implementation fails
     */
    protected void releaseDataSnapshotLocks(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception {
    }

    /**
     * Emits a schema change event for every captured table when the schema is historized.
     *
     * <p>If no data snapshot follows, the offset is marked as the last snapshot record just
     * before the event for the final table is dispatched.
     *
     * @param changeEventSourceContext context used to detect that the task was stopped
     * @param relationalSnapshotContext the current snapshot context
     * @param snapshottingTask tells whether a data snapshot will follow
     * @throws InterruptedException if the source is no longer running while tables remain
     * @throws Exception if dispatching fails; failures while building the create-table event are
     *         wrapped in a {@link DebeziumException}
     */
    protected void createSchemaChangeEventsForTables(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        tryStartingSnapshot(relationalSnapshotContext);
        // An explicit iterator is needed to recognise the final table via hasNext().
        Iterator<TableId> it = relationalSnapshotContext.capturedTables.iterator();
        while (it.hasNext()) {
            final TableId tableId = it.next();
            if (!changeEventSourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while capturing schema of table " + tableId);
            }
            LOGGER.debug("Capturing structure of table {}", tableId);
            final Table table = relationalSnapshotContext.tables.forTable(tableId);
            if (this.schema.isHistorized()) {
                relationalSnapshotContext.offset.event(tableId, getClock().currentTime());
                if (!snapshottingTask.snapshotData() && !it.hasNext()) {
                    lastSnapshotRecord(relationalSnapshotContext);
                }
                // The table id is passed as-is; it must not be cast to the dispatcher type.
                this.dispatcher.dispatchSchemaChangeEvent(table.id(), receiver -> {
                    try {
                        receiver.schemaChangeEvent(getCreateTableEvent(relationalSnapshotContext, table));
                    } catch (Exception e) {
                        throw new DebeziumException(e);
                    }
                });
            }
        }
    }

    /**
     * Builds the schema change event for one captured table. Called while schema change events
     * are dispatched; any exception thrown here is wrapped in a {@link DebeziumException} by
     * the caller.
     *
     * @param relationalSnapshotContext the current snapshot context
     * @param table the table the event is built for
     * @return the schema change event to hand to the receiver
     * @throws Exception if the event cannot be built
     */
    protected abstract SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<P, O> relationalSnapshotContext, Table table) throws Exception;

    /**
     * Exports every captured table in order, then releases the data locks and completes the
     * snapshot on both the offset and the snapshot receiver.
     *
     * @param changeEventSourceContext context used to detect that the task was stopped
     * @param relationalSnapshotContext the current snapshot context
     * @throws InterruptedException if the source is no longer running while tables remain
     * @throws Exception if exporting a table or completing the snapshot fails
     */
    private void createDataEvents(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception {
        final EventDispatcher.SnapshotReceiver receiver = dispatcher.getSnapshotChangeEventReceiver();
        tryStartingSnapshot(relationalSnapshotContext);

        final int tableCount = relationalSnapshotContext.capturedTables.size();
        LOGGER.info("Snapshotting contents of {} tables while still in transaction", tableCount);

        int tableOrder = 1;
        for (Iterator<TableId> tableIds = relationalSnapshotContext.capturedTables.iterator(); tableIds.hasNext();) {
            final TableId tableId = tableIds.next();
            // The export needs to know about the final table to mark the last snapshot record.
            relationalSnapshotContext.lastTable = !tableIds.hasNext();
            if (!changeEventSourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while snapshotting table " + tableId);
            }
            LOGGER.debug("Snapshotting table {}", tableId);
            createDataEventsForTable(changeEventSourceContext, relationalSnapshotContext, receiver, relationalSnapshotContext.tables.forTable(tableId), tableOrder++, tableCount);
        }

        releaseDataSnapshotLocks(relationalSnapshotContext);
        relationalSnapshotContext.offset.preSnapshotCompletion();
        receiver.completeSnapshot();
        relationalSnapshotContext.offset.postSnapshotCompletion();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Signals the start of the snapshot on the offset unless the offset already reports a
     * running snapshot.
     *
     * @param relationalSnapshotContext the context whose offset is updated
     */
    public void tryStartingSnapshot(RelationalSnapshotContext<P, O> relationalSnapshotContext) {
        if (!relationalSnapshotContext.offset.isSnapshotRunning()) {
            relationalSnapshotContext.offset.preSnapshotStart();
        }
    }

    /**
     * Exports all rows of one table as snapshot events.
     *
     * <p>If no select statement is available the table is skipped and reported as completed
     * with zero rows. Otherwise the statement is executed and one snapshot event is dispatched
     * per row. The result set is read one row ahead so the final row of the final table can be
     * marked as the last snapshot record before it is dispatched. Progress is logged and
     * reported to the listener each time the log timer expires.
     *
     * <p>Statement and result set are managed by try-with-resources, so both are closed on
     * every exit path, including query failures and interruption.
     *
     * @param changeEventSourceContext context used to detect that the task was stopped
     * @param relationalSnapshotContext the current snapshot context
     * @param snapshotReceiver receiver the snapshot events are dispatched to
     * @param table the table to export
     * @param i 1-based position of this table among the captured tables (logging only)
     * @param i2 total number of captured tables (logging only)
     * @throws InterruptedException if the source is no longer running while rows remain
     * @throws ConnectException if reading the table fails with an {@link SQLException}
     */
    private void createDataEventsForTable(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext, EventDispatcher.SnapshotReceiver snapshotReceiver, Table table, int i, int i2) throws InterruptedException {
        final long exportStart = this.clock.currentTimeInMillis();
        LOGGER.info("Exporting data from table '{}' ({} of {} tables)", table.id(), i, i2);

        final Optional<String> selectStatement = determineSnapshotSelect(relationalSnapshotContext, table.id());
        if (!selectStatement.isPresent()) {
            LOGGER.warn("For table '{}' the select statement was not provided, skipping table", table.id());
            this.snapshotProgressListener.dataCollectionSnapshotCompleted(table.id(), 0L);
            return;
        }
        LOGGER.info("\t For table '{}' using select statement: '{}'", table.id(), selectStatement.get());

        final OptionalLong rowCount = rowCountForTable(table.id());
        try (Statement statement = readTableStatement(rowCount);
                ResultSet rs = statement.executeQuery(selectStatement.get())) {
            final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
            long rows = 0;
            Threads.Timer logTimer = getTableScanLogTimer();
            relationalSnapshotContext.lastRecordInTable = false;

            if (rs.next()) {
                while (!relationalSnapshotContext.lastRecordInTable) {
                    if (!changeEventSourceContext.isRunning()) {
                        throw new InterruptedException("Interrupted while snapshotting table " + table.id());
                    }
                    rows++;
                    final Object[] row = this.jdbcConnection.rowToArray(table, schema(), rs, columnArray);
                    // Advance before dispatching so we know whether the current row is the last one.
                    relationalSnapshotContext.lastRecordInTable = !rs.next();

                    if (logTimer.expired()) {
                        final long now = this.clock.currentTimeInMillis();
                        if (rowCount.isPresent()) {
                            LOGGER.info("\t Exported {} of {} records for table '{}' after {}", rows, rowCount.getAsLong(), table.id(), Strings.duration(now - exportStart));
                        } else {
                            LOGGER.info("\t Exported {} records for table '{}' after {}", rows, table.id(), Strings.duration(now - exportStart));
                        }
                        this.snapshotProgressListener.rowsScanned(table.id(), rows);
                        logTimer = getTableScanLogTimer();
                    }

                    if (relationalSnapshotContext.lastTable && relationalSnapshotContext.lastRecordInTable) {
                        lastSnapshotRecord(relationalSnapshotContext);
                    }
                    this.dispatcher.dispatchSnapshotEvent(table.id(), getChangeRecordEmitter(relationalSnapshotContext, table.id(), row), snapshotReceiver);
                }
            } else if (relationalSnapshotContext.lastTable) {
                // Empty final table: there is no row to carry the marker, so set it right away.
                lastSnapshotRecord(relationalSnapshotContext);
            }

            LOGGER.info("\t Finished exporting {} records for table '{}'; total duration '{}'", rows, table.id(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
            this.snapshotProgressListener.dataCollectionSnapshotCompleted(table.id(), rows);
        } catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Marks the context's offset as being at the last record of the snapshot.
     *
     * @param relationalSnapshotContext the context whose offset is marked
     */
    public void lastSnapshotRecord(RelationalSnapshotContext<P, O> relationalSnapshotContext) {
        relationalSnapshotContext.offset.markLastSnapshotRecord();
    }

    /**
     * Returns the number of rows of the given table, if known. The default implementation
     * always returns an empty value. When present, the count is used in progress log messages
     * and passed to {@link #readTableStatement}.
     *
     * @param tableId the table to count
     * @return the row count, or empty if unknown
     */
    protected OptionalLong rowCountForTable(TableId tableId) {
        return OptionalLong.empty();
    }

    /**
     * Creates a fresh timer spanning {@link #LOG_INTERVAL}, used to pace progress logging.
     *
     * @return a new timer based on this source's clock
     */
    private Threads.Timer getTableScanLogTimer() {
        final Threads.Timer progressTimer = Threads.timer(clock, LOG_INTERVAL);
        return progressTimer;
    }

    /**
     * Records the event for the given table on the offset (with the current clock time) and
     * creates the emitter for one snapshot row.
     *
     * @param snapshotContext the current snapshot context providing partition and offset
     * @param tableId the table the row belongs to
     * @param objArr the row values
     * @return an emitter for the snapshot record
     */
    protected ChangeRecordEmitter getChangeRecordEmitter(AbstractSnapshotChangeEventSource.SnapshotContext<P, O> snapshotContext, TableId tableId, Object[] objArr) {
        // The offset must be updated before the emitter is created from it.
        snapshotContext.offset.event(tableId, getClock().currentTime());
        final ChangeRecordEmitter emitter = new SnapshotChangeRecordEmitter(snapshotContext.partition, snapshotContext.offset, objArr, getClock());
        return emitter;
    }

    /**
     * Resolves the select statement used to export a table.
     *
     * <p>A configured override wins: it is looked up first by the full table id, then by the id
     * with the catalog set to {@code null}. An override is passed through
     * {@link #enhanceOverriddenSelect}. Without an override, {@link #getSnapshotSelect} is asked
     * with the prepared column names.
     *
     * @param relationalSnapshotContext the current snapshot context
     * @param tableId the table to export
     * @return the select statement, or empty if none is available
     */
    private Optional<String> determineSnapshotSelect(RelationalSnapshotContext<P, O> relationalSnapshotContext, TableId tableId) {
        String override = connectorConfig.getSnapshotSelectOverridesByTable().get(tableId);
        if (override == null) {
            final TableId withoutCatalog = new TableId(null, tableId.schema(), tableId.table());
            override = connectorConfig.getSnapshotSelectOverridesByTable().get(withoutCatalog);
        }
        if (override != null) {
            return Optional.of(enhanceOverriddenSelect(relationalSnapshotContext, override, tableId));
        }
        return getSnapshotSelect(relationalSnapshotContext, tableId, getPreparedColumnNames(schema.tableFor(tableId)));
    }

    /**
     * Returns the quoted names of the columns to select for a table.
     *
     * <p>Columns must pass both {@link #additionalColumnFilter} and the configured column
     * filter. If that leaves no columns, all columns of the table are returned instead.
     *
     * @param table the table whose columns are prepared
     * @return the quoted column names; all columns if filtering removed every one
     */
    protected List<String> getPreparedColumnNames(Table table) {
        List<String> quotedNames = table.retrieveColumnNames().stream()
                .filter(name -> additionalColumnFilter(table.id(), name))
                .filter(name -> connectorConfig.getColumnFilter().matches(table.id().catalog(), table.id().schema(), table.id().table(), name))
                .map(name -> jdbcConnection.quotedColumnIdString(name))
                .collect(Collectors.toList());
        if (!quotedNames.isEmpty()) {
            return quotedNames;
        }
        LOGGER.info("\t All columns in table {} were excluded due to include/exclude lists, defaulting to selecting all columns", table.id());
        quotedNames = table.retrieveColumnNames().stream()
                .map(name -> jdbcConnection.quotedColumnIdString(name))
                .collect(Collectors.toList());
        return quotedNames;
    }

    /**
     * Extra per-column filter applied when preparing the column names for a select statement.
     * The default implementation accepts every column.
     *
     * @param tableId the table the column belongs to
     * @param str the column name
     * @return {@code true} if the column may be selected
     */
    protected boolean additionalColumnFilter(TableId tableId, String str) {
        return true;
    }

    /**
     * Hook that lets subclasses adjust a configured select override before it is used.
     * The default implementation returns the statement unchanged.
     *
     * @param relationalSnapshotContext the current snapshot context
     * @param str the configured select statement
     * @param tableId the table the override applies to
     * @return the select statement to execute
     */
    protected String enhanceOverriddenSelect(RelationalSnapshotContext<P, O> relationalSnapshotContext, String str, TableId tableId) {
        return str;
    }

    /**
     * Builds the select statement for a table that has no configured override. If the result
     * is empty, the table is skipped with a warning during the data export.
     *
     * @param relationalSnapshotContext the current snapshot context
     * @param tableId the table to export
     * @param list the quoted column names produced by {@link #getPreparedColumnNames}
     * @return the select statement, or empty to skip the table
     */
    protected abstract Optional<String> getSnapshotSelect(RelationalSnapshotContext<P, O> relationalSnapshotContext, TableId tableId, List<String> list);

    /**
     * @return the relational schema this source was constructed with
     */
    protected RelationalDatabaseSchema schema() {
        return schema;
    }

    /**
     * Reads one column value from the result set by delegating to the JDBC connection.
     *
     * @param resultSet the result set positioned on the row to read
     * @param i the column index passed through to the JDBC connection
     * @param column the column definition
     * @param table the table the column belongs to
     * @return the column value
     * @throws SQLException if the value cannot be read
     */
    protected Object getColumnValue(ResultSet resultSet, int i, Column column, Table table) throws SQLException {
        final Object value = jdbcConnection.getColumnValue(resultSet, i, column, table, schema());
        return value;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the statement used to read a table by delegating to the JDBC connection.
     *
     * @param optionalLong the table's row count, if known
     * @return the statement used to execute the snapshot select
     * @throws SQLException if the statement cannot be created
     */
    public Statement readTableStatement(OptionalLong optionalLong) throws SQLException {
        final Statement statement = jdbcConnection.readTableStatement(connectorConfig, optionalLong);
        return statement;
    }

    /**
     * Rolls back the current transaction of the given connection, if there is a connection.
     *
     * @param connection the snapshot connection; {@code null} is ignored
     * @throws RuntimeException wrapping the {@link SQLException} if the rollback fails
     */
    private void rollbackTransaction(Connection connection) {
        if (connection == null) {
            // The snapshot failed before a connection was obtained; nothing to undo.
            return;
        }
        try {
            connection.rollback();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the clock this source was constructed with
     */
    public Clock getClock() {
        return clock;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook invoked by {@code doExecute} after the schema and data steps, before the heartbeat
     * event is dispatched. The default implementation does nothing.
     *
     * @throws InterruptedException if a subclass implementation is interrupted
     */
    public void postSnapshot() throws InterruptedException {
    }
}
