package io.debezium.connector.sqlserver;

import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-sqlserver-1.9.7.Final.jar:io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.class */
public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource<SqlServerPartition, SqlServerOffsetContext> {
    private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name '(.*)\\.cdc.fn_cdc_get_all_changes_(.*)'\\.");
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SqlServerStreamingChangeEventSource.class);
    private static final Duration DEFAULT_INTERVAL_BETWEEN_COMMITS = Duration.ofMinutes(1);
    private static final int INTERVAL_BETWEEN_COMMITS_BASED_ON_POLL_FACTOR = 3;
    private final SqlServerConnection dataConnection;
    private final SqlServerConnection metadataConnection;
    private final EventDispatcher<SqlServerPartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final SqlServerDatabaseSchema schema;
    private final Duration pollInterval;
    private final SqlServerConnectorConfig connectorConfig;
    private final ElapsedTimeStrategy pauseBetweenCommits;
    private final Map<SqlServerPartition, SqlServerStreamingExecutionContext> streamingExecutionContexts;

    public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig sqlServerConnectorConfig, SqlServerConnection sqlServerConnection, SqlServerConnection sqlServerConnection2, EventDispatcher<SqlServerPartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, SqlServerDatabaseSchema sqlServerDatabaseSchema) {
        this.connectorConfig = sqlServerConnectorConfig;
        this.dataConnection = sqlServerConnection;
        this.metadataConnection = sqlServerConnection2;
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = sqlServerDatabaseSchema;
        this.pollInterval = sqlServerConnectorConfig.getPollInterval();
        Duration multipliedBy = this.pollInterval.multipliedBy(3L);
        this.pauseBetweenCommits = ElapsedTimeStrategy.constant(clock, DEFAULT_INTERVAL_BETWEEN_COMMITS.compareTo(multipliedBy) > 0 ? DEFAULT_INTERVAL_BETWEEN_COMMITS.toMillis() : multipliedBy.toMillis());
        this.pauseBetweenCommits.hasElapsed();
        this.streamingExecutionContexts = new HashMap();
    }

    @Override // io.debezium.pipeline.source.spi.StreamingChangeEventSource
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SqlServerPartition sqlServerPartition, SqlServerOffsetContext sqlServerOffsetContext) throws InterruptedException {
        throw new UnsupportedOperationException("Currently unsupported by the SQL Server connector");
    }

    @Override // io.debezium.pipeline.source.spi.StreamingChangeEventSource
    public boolean executeIteration(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SqlServerPartition sqlServerPartition, SqlServerOffsetContext sqlServerOffsetContext) throws InterruptedException {
        if (this.connectorConfig.getSnapshotMode().equals(SqlServerConnectorConfig.SnapshotMode.INITIAL_ONLY)) {
            LOGGER.info("Streaming is not enabled in current configuration");
            return false;
        }
        String databaseName = sqlServerPartition.getDatabaseName();
        try {
            SqlServerStreamingExecutionContext orDefault = this.streamingExecutionContexts.getOrDefault(sqlServerPartition, new SqlServerStreamingExecutionContext(new PriorityQueue((sqlServerChangeTable, sqlServerChangeTable2) -> {
                return sqlServerChangeTable.getStopLsn().compareTo(sqlServerChangeTable2.getStopLsn());
            }), new AtomicReference(), sqlServerOffsetContext.getChangePosition(), new AtomicBoolean(false), sqlServerOffsetContext.isSnapshotCompleted()));
            if (!this.streamingExecutionContexts.containsKey(sqlServerPartition)) {
                this.streamingExecutionContexts.put(sqlServerPartition, orDefault);
                LOGGER.info("Last position recorded in offsets is {}[{}]", sqlServerOffsetContext.getChangePosition(), Long.valueOf(sqlServerOffsetContext.getEventSerialNo()));
            }
            Queue<SqlServerChangeTable> schemaChangeCheckpoints = orDefault.getSchemaChangeCheckpoints();
            AtomicReference<SqlServerChangeTable[]> tablesSlot = orDefault.getTablesSlot();
            TxLogPosition changePosition = sqlServerOffsetContext.getChangePosition();
            long eventSerialNo = sqlServerOffsetContext.getEventSerialNo();
            AtomicBoolean changesStoppedBeingMonotonic = orDefault.getChangesStoppedBeingMonotonic();
            int maxTransactionsPerIteration = this.connectorConfig.getMaxTransactionsPerIteration();
            TxLogPosition lastProcessedPosition = orDefault.getLastProcessedPosition();
            if (changeEventSourceContext.isRunning()) {
                commitTransaction();
                Lsn toLsn = getToLsn(this.dataConnection, databaseName, lastProcessedPosition, maxTransactionsPerIteration);
                if (!toLsn.isAvailable()) {
                    LOGGER.warn("No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running");
                    return false;
                }
                if (toLsn.compareTo(lastProcessedPosition.getCommitLsn()) <= 0 && orDefault.getShouldIncreaseFromLsn()) {
                    LOGGER.debug("No change in the database");
                    return false;
                }
                Lsn incrementLsn = (lastProcessedPosition.getCommitLsn().isAvailable() && orDefault.getShouldIncreaseFromLsn()) ? this.dataConnection.incrementLsn(databaseName, lastProcessedPosition.getCommitLsn()) : lastProcessedPosition.getCommitLsn();
                orDefault.setShouldIncreaseFromLsn(true);
                while (!schemaChangeCheckpoints.isEmpty()) {
                    migrateTable(sqlServerPartition, schemaChangeCheckpoints, sqlServerOffsetContext);
                }
                if (!this.dataConnection.getNewChangeTables(databaseName, incrementLsn, toLsn).isEmpty()) {
                    SqlServerChangeTable[] changeTablesToQuery = getChangeTablesToQuery(sqlServerPartition, sqlServerOffsetContext, toLsn);
                    tablesSlot.set(changeTablesToQuery);
                    for (SqlServerChangeTable sqlServerChangeTable3 : changeTablesToQuery) {
                        if (sqlServerChangeTable3.getStartLsn().isBetween(incrementLsn, toLsn)) {
                            LOGGER.info("Schema will be changed for {}", sqlServerChangeTable3);
                            schemaChangeCheckpoints.add(sqlServerChangeTable3);
                        }
                    }
                }
                if (tablesSlot.get() == null) {
                    tablesSlot.set(getChangeTablesToQuery(sqlServerPartition, sqlServerOffsetContext, toLsn));
                }
                try {
                    this.dataConnection.getChangesForTables(databaseName, tablesSlot.get(), incrementLsn, toLsn, resultSetArr -> {
                        SqlServerChangeTablePointer sqlServerChangeTablePointer;
                        TableId sourceTableId;
                        long j = 1;
                        int length = resultSetArr.length;
                        SqlServerChangeTablePointer[] sqlServerChangeTablePointerArr = new SqlServerChangeTablePointer[length];
                        SqlServerChangeTable[] sqlServerChangeTableArr = (SqlServerChangeTable[]) tablesSlot.get();
                        for (int i = 0; i < length; i++) {
                            sqlServerChangeTablePointerArr[i] = new SqlServerChangeTablePointer(sqlServerChangeTableArr[i], resultSetArr[i], this.connectorConfig.getSourceTimestampMode());
                            sqlServerChangeTablePointerArr[i].next();
                        }
                        while (true) {
                            sqlServerChangeTablePointer = null;
                            for (SqlServerChangeTablePointer sqlServerChangeTablePointer2 : sqlServerChangeTablePointerArr) {
                                if (!sqlServerChangeTablePointer2.isCompleted() && (sqlServerChangeTablePointer == null || sqlServerChangeTablePointer2.compareTo(sqlServerChangeTablePointer) < 0)) {
                                    sqlServerChangeTablePointer = sqlServerChangeTablePointer2;
                                }
                            }
                            if (sqlServerChangeTablePointer == null) {
                                return;
                            }
                            if (sqlServerChangeTablePointer.getChangePosition().isAvailable() && sqlServerChangeTablePointer.getChangePosition().getInTxLsn().isAvailable()) {
                                if (sqlServerChangeTablePointer.isNewTransaction() && changesStoppedBeingMonotonic.get()) {
                                    LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes");
                                    changesStoppedBeingMonotonic.set(false);
                                }
                                if (sqlServerChangeTablePointer.isCurrentPositionSmallerThanPreviousPosition()) {
                                    LOGGER.info("Disabling skipping changes due to not monotonic order of changes");
                                    changesStoppedBeingMonotonic.set(true);
                                }
                                if (!changesStoppedBeingMonotonic.get() && sqlServerChangeTablePointer.getChangePosition().compareTo(changePosition) < 0) {
                                    LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", sqlServerChangeTablePointer, changePosition);
                                    sqlServerChangeTablePointer.next();
                                } else if (!changesStoppedBeingMonotonic.get() && sqlServerChangeTablePointer.getChangePosition().compareTo(changePosition) == 0 && j <= eventSerialNo) {
                                    LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", sqlServerChangeTablePointer, Long.valueOf(j), changePosition, Long.valueOf(eventSerialNo));
                                    j++;
                                    sqlServerChangeTablePointer.next();
                                } else if (!sqlServerChangeTablePointer.getChangeTable().getStopLsn().isAvailable() || sqlServerChangeTablePointer.getChangeTable().getStopLsn().compareTo(sqlServerChangeTablePointer.getChangePosition().getCommitLsn()) > 0) {
                                    LOGGER.trace("Processing change {}", sqlServerChangeTablePointer);
                                    LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints);
                                    if (!schemaChangeCheckpoints.isEmpty() && sqlServerChangeTablePointer.getChangePosition().getCommitLsn().compareTo(((SqlServerChangeTable) schemaChangeCheckpoints.peek()).getStartLsn()) >= 0) {
                                        migrateTable(sqlServerPartition, schemaChangeCheckpoints, sqlServerOffsetContext);
                                    }
                                    sourceTableId = sqlServerChangeTablePointer.getChangeTable().getSourceTableId();
                                    int operation = sqlServerChangeTablePointer.getOperation();
                                    Object[] data = sqlServerChangeTablePointer.getData();
                                    int i2 = 1;
                                    if (operation == 3) {
                                        if (!sqlServerChangeTablePointer.next() || sqlServerChangeTablePointer.getOperation() != 4) {
                                            break;
                                        } else {
                                            i2 = 2;
                                        }
                                    }
                                    Object[] data2 = operation == 3 ? sqlServerChangeTablePointer.getData() : null;
                                    sqlServerOffsetContext.setChangePosition(sqlServerChangeTablePointer.getChangePosition(), i2);
                                    sqlServerOffsetContext.event(sqlServerChangeTablePointer.getChangeTable().getSourceTableId(), this.connectorConfig.getSourceTimestampMode().getTimestamp(this.clock, sqlServerChangeTablePointer.getResultSet()));
                                    this.dispatcher.dispatchDataChangeEvent(sqlServerPartition, sourceTableId, new SqlServerChangeRecordEmitter(sqlServerPartition, sqlServerOffsetContext, operation, data, data2, this.clock));
                                    sqlServerChangeTablePointer.next();
                                } else {
                                    LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", sqlServerChangeTablePointer, sqlServerChangeTablePointer.getChangePosition());
                                    sqlServerChangeTablePointer.next();
                                }
                            } else {
                                LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", sqlServerChangeTablePointer);
                                sqlServerChangeTablePointer.next();
                            }
                        }
                        throw new IllegalStateException("The update before event at " + sqlServerChangeTablePointer.getChangePosition() + " for table " + sourceTableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                    });
                    orDefault.setLastProcessedPosition(TxLogPosition.valueOf(toLsn));
                    this.dataConnection.rollback();
                } catch (SQLException e) {
                    tablesSlot.set(processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
                }
            }
            return true;
        } catch (Exception e2) {
            this.errorHandler.setProducerThrowable(e2);
            return true;
        }
    }

    private void commitTransaction() throws SQLException {
        if (this.connectorConfig.isReadOnlyDatabaseConnection() || this.pauseBetweenCommits.hasElapsed()) {
            this.dataConnection.commit();
            this.metadataConnection.commit();
        }
    }

    private void migrateTable(SqlServerPartition sqlServerPartition, Queue<SqlServerChangeTable> queue, SqlServerOffsetContext sqlServerOffsetContext) throws InterruptedException, SQLException {
        SqlServerChangeTable poll = queue.poll();
        LOGGER.info("Migrating schema to {}", poll);
        Table tableFor = this.schema.tableFor(poll.getSourceTableId());
        Table tableSchemaFromTable = this.metadataConnection.getTableSchemaFromTable(sqlServerPartition.getDatabaseName(), poll);
        if (tableFor.equals(tableSchemaFromTable)) {
            LOGGER.info("Migration skipped, no table schema changes detected.");
        } else {
            this.dispatcher.dispatchSchemaChangeEvent(sqlServerPartition, poll.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(sqlServerPartition, sqlServerOffsetContext, poll, tableSchemaFromTable, SchemaChangeEvent.SchemaChangeEventType.ALTER));
            poll.setSourceTable(tableSchemaFromTable);
        }
    }

    private SqlServerChangeTable[] processErrorFromChangeTableQuery(String str, SQLException sQLException, SqlServerChangeTable[] sqlServerChangeTableArr) throws Exception {
        Matcher matcher = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(sQLException.getMessage());
        if (!matcher.matches() || !matcher.group(1).equals(str)) {
            throw sQLException;
        }
        String group = matcher.group(2);
        LOGGER.info("Table is no longer captured with capture instance {}", group);
        return (SqlServerChangeTable[]) Arrays.stream(sqlServerChangeTableArr).filter(sqlServerChangeTable -> {
            return !sqlServerChangeTable.getCaptureInstance().equals(group);
        }).toArray(i -> {
            return new SqlServerChangeTable[i];
        });
    }

    private SqlServerChangeTable[] getChangeTablesToQuery(SqlServerPartition sqlServerPartition, SqlServerOffsetContext sqlServerOffsetContext, Lsn lsn) throws SQLException, InterruptedException {
        SqlServerChangeTable sqlServerChangeTable;
        String databaseName = sqlServerPartition.getDatabaseName();
        List<SqlServerChangeTable> changeTables = this.dataConnection.getChangeTables(databaseName, lsn);
        if (changeTables.isEmpty()) {
            LOGGER.warn("No table has enabled CDC or security constraints prevents getting the list of change tables");
        }
        Map map = (Map) changeTables.stream().filter(sqlServerChangeTable2 -> {
            if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(sqlServerChangeTable2.getSourceTableId())) {
                return true;
            }
            LOGGER.info("CDC is enabled for table {} but the table is not whitelisted by connector", sqlServerChangeTable2);
            return false;
        }).collect(Collectors.groupingBy((v0) -> {
            return v0.getSourceTableId();
        }));
        if (map.isEmpty()) {
            LOGGER.warn("No whitelisted table has enabled CDC, whitelisted table list does not contain any table with CDC enabled or no table match the white/blacklist filter(s)");
        }
        ArrayList arrayList = new ArrayList();
        for (List list : map.values()) {
            SqlServerChangeTable sqlServerChangeTable3 = (SqlServerChangeTable) list.get(0);
            if (list.size() > 1) {
                if (((SqlServerChangeTable) list.get(0)).getStartLsn().compareTo(((SqlServerChangeTable) list.get(1)).getStartLsn()) < 0) {
                    sqlServerChangeTable = (SqlServerChangeTable) list.get(1);
                } else {
                    sqlServerChangeTable3 = (SqlServerChangeTable) list.get(1);
                    sqlServerChangeTable = (SqlServerChangeTable) list.get(0);
                }
                sqlServerChangeTable3.setStopLsn(sqlServerChangeTable.getStartLsn());
                sqlServerChangeTable.setSourceTable(this.dataConnection.getTableSchemaFromTable(databaseName, sqlServerChangeTable));
                arrayList.add(sqlServerChangeTable);
                LOGGER.info("Multiple capture instances present for the same table: {} and {}", sqlServerChangeTable3, sqlServerChangeTable);
            }
            if (this.schema.tableFor(sqlServerChangeTable3.getSourceTableId()) == null) {
                LOGGER.info("Table {} is new to be monitored by capture instance {}", sqlServerChangeTable3.getSourceTableId(), sqlServerChangeTable3.getCaptureInstance());
                sqlServerOffsetContext.event(sqlServerChangeTable3.getSourceTableId(), Instant.now());
                this.dispatcher.dispatchSchemaChangeEvent(sqlServerPartition, sqlServerChangeTable3.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(sqlServerPartition, sqlServerOffsetContext, sqlServerChangeTable3, this.dataConnection.getTableSchemaFromTable(databaseName, sqlServerChangeTable3), SchemaChangeEvent.SchemaChangeEventType.CREATE));
            }
            sqlServerChangeTable3.setSourceTable(this.schema.tableFor(sqlServerChangeTable3.getSourceTableId()));
            arrayList.add(sqlServerChangeTable3);
        }
        return (SqlServerChangeTable[]) arrayList.toArray(new SqlServerChangeTable[arrayList.size()]);
    }

    private Lsn getToLsn(SqlServerConnection sqlServerConnection, String str, TxLogPosition txLogPosition, int i) throws SQLException {
        if (i == 0) {
            return sqlServerConnection.getMaxTransactionLsn(str);
        }
        Lsn commitLsn = txLogPosition.getCommitLsn();
        return !commitLsn.isAvailable() ? sqlServerConnection.getNthTransactionLsnFromBeginning(str, i) : sqlServerConnection.getNthTransactionLsnFromLast(str, commitLsn, i);
    }
}
