package io.debezium.pipeline;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.LoggingContext;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates the snapshot and streaming change event sources of a connector task.
 *
 * <p>{@link #start} submits a single task to a dedicated executor. That task registers the
 * metrics, optionally performs catch-up streaming, runs the snapshot source and, if the snapshot
 * completed or was skipped and the coordinator is still running, runs the streaming source.
 * {@link #stop} flips the running flag, shuts the executor down and unregisters the metrics.
 *
 * @param <P> the partition type handled by the change event sources
 * @param <O> the offset context type handled by the change event sources
 */
@ThreadSafe
public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventSourceCoordinator.class);

    /** Maximum time {@link #stop()} waits for the executor in each termination attempt. */
    public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90);

    private final Offsets<P, O> previousOffsets;
    private final ErrorHandler errorHandler;
    private final ChangeEventSourceFactory<P, O> changeEventSourceFactory;
    private final ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory;
    private final ExecutorService executor;
    private final EventDispatcher<?> eventDispatcher;
    private final DatabaseSchema<?> schema;

    /** Set by {@link #start} and cleared by {@link #stop}; exposed to sources through the context. */
    private volatile boolean running;

    /** The active streaming source, or {@code null} while no streaming source is in use. */
    private volatile StreamingChangeEventSource<P, O> streamingSource;

    /** Held while the streaming source is being reset; {@link #commitOffset} backs off while it is locked. */
    private final ReentrantLock commitOffsetLock = new ReentrantLock();

    // Assigned in start(); remain null until then.
    private SnapshotChangeEventSourceMetrics snapshotMetrics;
    private StreamingChangeEventSourceMetrics streamingMetrics;

    /**
     * Outcome of {@link #executeCatchUpStreaming}: tells the coordinator whether a catch-up
     * streaming phase ran before the snapshot.
     */
    public class CatchUpStreamingResult {
        public boolean performedCatchUpStreaming;

        public CatchUpStreamingResult(boolean performedCatchUpStreaming) {
            this.performedCatchUpStreaming = performedCatchUpStreaming;
        }
    }

    /** Context handed to the change event sources; reflects the coordinator's running flag. */
    private class ChangeEventSourceContextImpl implements ChangeEventSource.ChangeEventSourceContext {
        private ChangeEventSourceContextImpl() {
        }

        @Override
        public boolean isRunning() {
            return ChangeEventSourceCoordinator.this.running;
        }
    }

    /**
     * Creates a coordinator and its executor.
     *
     * @param previousOffsets the partition/offset pair the sources start from
     * @param errorHandler receives any throwable raised by the pipeline task
     * @param connectorType connector class, passed to the executor factory
     * @param connectorConfig connector configuration; its logical name is passed to the executor factory
     * @param changeEventSourceFactory creates the snapshot, streaming and incremental snapshot sources
     * @param changeEventSourceMetricsFactory creates the snapshot and streaming metrics
     * @param eventDispatcher dispatcher whose event listener is switched between the metrics
     * @param schema database schema checked after the snapshot
     */
    public ChangeEventSourceCoordinator(Offsets<P, O> previousOffsets, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType, CommonConnectorConfig connectorConfig, ChangeEventSourceFactory<P, O> changeEventSourceFactory, ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory, EventDispatcher<?> eventDispatcher, DatabaseSchema<?> schema) {
        this.previousOffsets = previousOffsets;
        this.errorHandler = errorHandler;
        this.changeEventSourceFactory = changeEventSourceFactory;
        this.changeEventSourceMetricsFactory = changeEventSourceMetricsFactory;
        this.executor = Threads.newSingleThreadExecutor(connectorType, connectorConfig.getLogicalName(), "change-event-source-coordinator");
        this.eventDispatcher = eventDispatcher;
        this.schema = schema;
    }

    /**
     * Creates the metrics, marks the coordinator as running and submits the snapshot/streaming
     * pipeline to the executor. The pipeline itself runs asynchronously on the executor.
     *
     * @param taskContext used to create the metrics and to configure the logging context
     * @param changeEventQueueMetrics queue metrics passed to the metrics factory
     * @param metadataProvider event metadata provider passed to the metrics factory
     */
    public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) {
        final P partition = this.previousOffsets.getTheOnlyPartition();
        final O previousOffset = this.previousOffsets.getTheOnlyOffset();
        final AtomicReference<LoggingContext.PreviousContext> previousLogContext = new AtomicReference<>();
        try {
            this.snapshotMetrics = this.changeEventSourceMetricsFactory.getSnapshotMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
            this.streamingMetrics = this.changeEventSourceMetricsFactory.getStreamingMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
            this.running = true;

            this.executor.submit(() -> {
                try {
                    previousLogContext.set(taskContext.configureLoggingContext("snapshot"));
                    this.snapshotMetrics.register(LOGGER);
                    this.streamingMetrics.register(LOGGER);
                    LOGGER.info("Metrics registered");

                    ChangeEventSourceContextImpl context = new ChangeEventSourceContextImpl();
                    LOGGER.info("Context created");

                    SnapshotChangeEventSource<P, O> snapshotSource = this.changeEventSourceFactory.getSnapshotChangeEventSource(this.snapshotMetrics);
                    CatchUpStreamingResult catchUp = executeCatchUpStreaming(context, snapshotSource, partition, previousOffset);
                    if (catchUp.performedCatchUpStreaming) {
                        streamingConnected(false);
                        // Drop the catch-up streaming source under the lock so commitOffset() backs off meanwhile.
                        this.commitOffsetLock.lock();
                        try {
                            this.streamingSource = null;
                        } finally {
                            this.commitOffsetLock.unlock();
                        }
                    }

                    this.eventDispatcher.setEventListener(this.snapshotMetrics);
                    SnapshotResult<O> snapshotResult = snapshotSource.execute(context, partition, previousOffset);
                    LOGGER.info("Snapshot ended with {}", snapshotResult);

                    if (snapshotResult.getStatus() == SnapshotResult.SnapshotResultStatus.COMPLETED || this.schema.tableInformationComplete()) {
                        this.schema.assureNonEmptySchema();
                    }

                    if (this.running && snapshotResult.isCompletedOrSkipped()) {
                        previousLogContext.set(taskContext.configureLoggingContext("streaming"));
                        streamEvents(context, partition, snapshotResult.getOffset());
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Change event source executor was interrupted", e);
                } catch (Throwable t) {
                    this.errorHandler.setProducerThrowable(t);
                } finally {
                    // Whatever the outcome, the streaming connection is reported as down.
                    streamingConnected(false);
                }
            });
        } finally {
            // NOTE(review): the reference is populated by the executor task, so it may still be
            // empty when this runs on the calling thread - confirm this is the intended behavior.
            LoggingContext.PreviousContext previous = previousLogContext.get();
            if (previous != null) {
                previous.restore();
            }
        }
    }

    /**
     * Hook for subclasses that need to stream before the snapshot starts. This default
     * implementation does nothing and reports that no catch-up streaming was performed.
     *
     * @param changeEventSourceContext context exposing the running state
     * @param snapshotChangeEventSource the snapshot source about to be executed
     * @param p the partition being processed
     * @param o the offset the coordinator was started with
     * @return a result whose {@code performedCatchUpStreaming} flag is {@code false}
     * @throws InterruptedException declared for overriding implementations
     */
    protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SnapshotChangeEventSource<P, O> snapshotChangeEventSource, P p, O o) throws InterruptedException {
        return new CatchUpStreamingResult(false);
    }

    /**
     * Obtains the streaming source (and the optional incremental snapshot source) from the
     * factory, switches the dispatcher to the streaming metrics, then initializes and executes
     * the streaming source. Returns when the streaming source's {@code execute} call returns.
     *
     * @param changeEventSourceContext context exposing the running state
     * @param p the partition to stream
     * @param o the offset to start streaming from
     * @throws InterruptedException if the streaming source is interrupted
     */
    protected void streamEvents(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, P p, O o) throws InterruptedException {
        final StreamingChangeEventSource<P, O> source = this.changeEventSourceFactory.getStreamingChangeEventSource();
        // Publish the source so commitOffset() can reach it from other threads.
        this.streamingSource = source;

        final Optional<IncrementalSnapshotChangeEventSource<? extends DataCollectionId>> incrementalSnapshotSource = this.changeEventSourceFactory.getIncrementalSnapshotChangeEventSource(o, this.snapshotMetrics, this.snapshotMetrics);
        this.eventDispatcher.setIncrementalSnapshotChangeEventSource(incrementalSnapshotSource);
        this.eventDispatcher.setEventListener(this.streamingMetrics);
        streamingConnected(true);
        LOGGER.info("Starting streaming");

        source.init();
        incrementalSnapshotSource.ifPresent(incremental -> incremental.init(o));
        source.execute(changeEventSourceContext, p, o);
        LOGGER.info("Finished streaming");
    }

    /**
     * Forwards a committed offset to the active streaming source. Does nothing when the commit
     * lock is currently held, when there is no streaming source, or when {@code map} is
     * {@code null}.
     *
     * @param map the committed offset; may be {@code null}
     */
    public void commitOffset(Map<String, ?> map) {
        if (map == null || this.commitOffsetLock.isLocked()) {
            return;
        }
        // Read the volatile field once: the executor thread may null it out at any time, so a
        // separate null check followed by a second read could dereference null.
        final StreamingChangeEventSource<P, O> source = this.streamingSource;
        if (source != null) {
            source.commitOffset(map);
        }
    }

    /**
     * Stops the coordinator: clears the running flag, shuts the executor down (escalating to
     * {@code shutdownNow} if it does not terminate within {@link #SHUTDOWN_WAIT_TIMEOUT}) and
     * finally unregisters the metrics.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the executor
     */
    public synchronized void stop() throws InterruptedException {
        this.running = false;
        final long timeoutMillis = SHUTDOWN_WAIT_TIMEOUT.toMillis();
        try {
            // Clear a pending interrupt on the calling thread so the wait below is actually attempted.
            Thread.interrupted();
            this.executor.shutdown();
            if (!this.executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                LOGGER.warn("Coordinator didn't stop in the expected time, shutting down executor now");
                Thread.interrupted();
                this.executor.shutdownNow();
                this.executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } finally {
            // The metrics are only created in start(); guard against stop() without a successful
            // start(), and make sure a failure in the first unregister does not skip the second.
            try {
                if (this.snapshotMetrics != null) {
                    this.snapshotMetrics.unregister(LOGGER);
                }
            } finally {
                if (this.streamingMetrics != null) {
                    this.streamingMetrics.unregister(LOGGER);
                }
            }
        }
    }

    /**
     * Updates the streaming metrics' connected flag, but only when the metrics factory says the
     * coordinator is responsible for that metric.
     *
     * @param z the connection state to report
     */
    private void streamingConnected(boolean z) {
        if (this.changeEventSourceMetricsFactory.connectionMetricHandledByCoordinator()) {
            this.streamingMetrics.connected(z);
        }
    }
}
