package io.debezium.connector.mongodb;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.util.LoggingContext;
import io.debezium.util.Threads;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-mongodb-0.10.0.Final.jar:io/debezium/connector/mongodb/MongoDbConnectorTask.class */
public final class MongoDbConnectorTask extends BaseSourceTask {
    private static final String CONTEXT_NAME = "mongodb-connector-task";
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Deque<Replicator> replicators = new ConcurrentLinkedDeque();
    private final RecordBatchSummarizer recordSummarizer = new RecordBatchSummarizer();
    private volatile ChangeEventQueue<SourceRecord> queue;
    private volatile String taskName;
    private volatile MongoDbTaskContext taskContext;
    private volatile Throwable replicatorError;

    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-mongodb-0.10.0.Final.jar:io/debezium/connector/mongodb/MongoDbConnectorTask$RecordBatchSummarizer.class */
    protected final class RecordBatchSummarizer implements Consumer<List<SourceRecord>> {
        private final Map<String, ReplicaSetSummary> summaryByReplicaSet = new HashMap();

        protected RecordBatchSummarizer() {
        }

        @Override // java.util.function.Consumer
        public void accept(List<SourceRecord> list) {
            if (!list.isEmpty() && MongoDbConnectorTask.this.logger.isInfoEnabled()) {
                this.summaryByReplicaSet.clear();
                list.forEach(sourceRecord -> {
                    String replicaSetNameForPartition = SourceInfo.replicaSetNameForPartition(sourceRecord.sourcePartition());
                    if (replicaSetNameForPartition != null) {
                        this.summaryByReplicaSet.computeIfAbsent(replicaSetNameForPartition, str -> {
                            return new ReplicaSetSummary();
                        }).add(sourceRecord);
                    }
                });
                if (this.summaryByReplicaSet.isEmpty()) {
                    return;
                }
                LoggingContext.PreviousContext configureLoggingContext = MongoDbConnectorTask.this.taskContext.configureLoggingContext(ConnectMetricsRegistry.TASK_TAG_NAME);
                try {
                    this.summaryByReplicaSet.forEach((str, replicaSetSummary) -> {
                        MongoDbConnectorTask.this.logger.info("{} records sent for replica set '{}', last offset: {}", Integer.valueOf(replicaSetSummary.recordCount()), str, replicaSetSummary.lastOffset());
                    });
                } finally {
                    configureLoggingContext.restore();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-mongodb-0.10.0.Final.jar:io/debezium/connector/mongodb/MongoDbConnectorTask$ReplicaSetSummary.class */
    public static final class ReplicaSetSummary {
        private int numRecords = 0;
        private Map<String, ?> lastOffset;

        protected ReplicaSetSummary() {
        }

        public void add(SourceRecord sourceRecord) {
            this.numRecords++;
            this.lastOffset = sourceRecord.sourceOffset();
        }

        public int recordCount() {
            return this.numRecords;
        }

        public Map<String, ?> lastOffset() {
            return this.lastOffset;
        }
    }

    @Override // org.apache.kafka.connect.connector.Task
    public String version() {
        return Module.version();
    }

    @Override // io.debezium.connector.common.BaseSourceTask
    public void start(Configuration configuration) {
        if (this.running.compareAndSet(false, true)) {
            this.taskName = ConnectMetricsRegistry.TASK_TAG_NAME + configuration.getInteger(MongoDbConnectorConfig.TASK_ID);
            MongoDbTaskContext mongoDbTaskContext = new MongoDbTaskContext(configuration);
            this.taskContext = mongoDbTaskContext;
            LoggingContext.PreviousContext configureLoggingContext = mongoDbTaskContext.configureLoggingContext(this.taskName);
            try {
                String string = configuration.getString(MongoDbConnectorConfig.HOSTS);
                ReplicaSets parse = ReplicaSets.parse(string);
                if (parse.validReplicaSetCount() == 0) {
                    throw new ConnectException("Unable to start MongoDB connector task since no replica sets were found at " + string);
                }
                MongoDbConnectorConfig mongoDbConnectorConfig = new MongoDbConnectorConfig(configuration);
                this.queue = new ChangeEventQueue.Builder().pollInterval(mongoDbConnectorConfig.getPollInterval()).maxBatchSize(mongoDbConnectorConfig.getMaxBatchSize()).maxQueueSize(mongoDbConnectorConfig.getMaxQueueSize()).loggingContextSupplier(this::getLoggingContext).build();
                SourceInfo source = mongoDbTaskContext.source();
                ArrayList arrayList = new ArrayList();
                parse.onEachReplicaSet(replicaSet -> {
                    String replicaSetName = replicaSet.replicaSetName();
                    if (replicaSetName != null) {
                        arrayList.add(source.partition(replicaSetName));
                    }
                });
                Map offsets = this.context.offsetStorageReader().offsets(arrayList);
                source.getClass();
                offsets.forEach(source::setOffsetFor);
                int replicaSetCount = parse.replicaSetCount();
                ExecutorService newFixedThreadPool = Threads.newFixedThreadPool(MongoDbConnector.class, mongoDbTaskContext.serverName(), "replicator", replicaSetCount);
                AtomicInteger atomicInteger = new AtomicInteger(replicaSetCount);
                this.logger.info("Ignoring unnamed replica sets: {}", parse.unnamedReplicaSets());
                this.logger.info("Starting {} thread(s) to replicate replica sets: {}", Integer.valueOf(replicaSetCount), parse);
                parse.validReplicaSets().forEach(replicaSet2 -> {
                    ChangeEventQueue<SourceRecord> changeEventQueue = this.queue;
                    changeEventQueue.getClass();
                    Replicator replicator = new Replicator(mongoDbTaskContext, replicaSet2, (v1) -> {
                        r4.enqueue(v1);
                    }, this::failedReplicator);
                    this.replicators.add(replicator);
                    newFixedThreadPool.submit(() -> {
                        try {
                            mongoDbTaskContext.configureLoggingContext(replicaSet2.replicaSetName());
                            replicator.run();
                            try {
                                this.replicators.remove(replicator);
                                if (atomicInteger.decrementAndGet() == 0) {
                                    try {
                                        newFixedThreadPool.shutdown();
                                        mongoDbTaskContext.getConnectionContext().shutdown();
                                    } finally {
                                    }
                                }
                            } catch (Throwable th) {
                                if (atomicInteger.decrementAndGet() == 0) {
                                    try {
                                        newFixedThreadPool.shutdown();
                                        mongoDbTaskContext.getConnectionContext().shutdown();
                                    } finally {
                                    }
                                }
                                throw th;
                            }
                        } catch (Throwable th2) {
                            try {
                                this.replicators.remove(replicator);
                                if (atomicInteger.decrementAndGet() == 0) {
                                    try {
                                        newFixedThreadPool.shutdown();
                                        mongoDbTaskContext.getConnectionContext().shutdown();
                                    } finally {
                                        mongoDbTaskContext.getConnectionContext().shutdown();
                                    }
                                }
                                throw th2;
                            } catch (Throwable th3) {
                                if (atomicInteger.decrementAndGet() == 0) {
                                    try {
                                        newFixedThreadPool.shutdown();
                                        mongoDbTaskContext.getConnectionContext().shutdown();
                                    } finally {
                                        mongoDbTaskContext.getConnectionContext().shutdown();
                                    }
                                }
                                throw th3;
                            }
                        }
                    });
                });
                this.logger.info("Successfully started MongoDB connector task with {} thread(s) for replica sets {}", Integer.valueOf(replicaSetCount), parse);
                configureLoggingContext.restore();
            } catch (Throwable th) {
                configureLoggingContext.restore();
                throw th;
            }
        }
    }

    @Override // org.apache.kafka.connect.source.SourceTask
    public List<SourceRecord> poll() throws InterruptedException {
        if (this.replicatorError != null) {
            throw new ConnectException("Failing connector task, at least one of the replicators has failed");
        }
        List<SourceRecord> poll = this.queue.poll();
        this.recordSummarizer.accept(poll);
        return poll;
    }

    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public void stop() {
        LoggingContext.PreviousContext configureLoggingContext = this.taskContext.configureLoggingContext(this.taskName);
        try {
            try {
                if (this.running.compareAndSet(true, false)) {
                    this.logger.info("Stopping MongoDB task");
                    int i = 0;
                    while (true) {
                        Replicator poll = this.replicators.poll();
                        if (poll == null) {
                            break;
                        }
                        poll.stop();
                        i++;
                    }
                    this.logger.info("Stopped MongoDB replication task by stopping {} replicator threads", Integer.valueOf(i));
                }
                configureLoggingContext.restore();
            } catch (Throwable th) {
                this.logger.error("Unexpected error shutting down the MongoDB replication task", th);
                configureLoggingContext.restore();
            }
        } catch (Throwable th2) {
            configureLoggingContext.restore();
            throw th2;
        }
    }

    @Override // io.debezium.connector.common.BaseSourceTask
    protected Iterable<Field> getAllConfigurationFields() {
        return MongoDbConnectorConfig.ALL_FIELDS;
    }

    private LoggingContext.PreviousContext getLoggingContext() {
        return this.taskContext.configureLoggingContext(CONTEXT_NAME);
    }

    private void failedReplicator(Throwable th) {
        this.replicatorError = th;
        stop();
    }
}
