package com.mongodb.kafka.connect.source;

import com.mongodb.ConnectionString;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.kafka.connect.util.ConfigHelper;
import com.mongodb.kafka.connect.util.ConnectConfigException;
import com.mongodb.kafka.connect.util.Validators;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.bson.Document;

/**
 * Configuration for the MongoDB Kafka source connector.
 *
 * <p>Declares every setting of the {@code ChangeStream} group in {@link #CONFIG} and exposes
 * parsed accessors for the settings whose raw value is a string (connection string, pipeline,
 * collation, full-document mode).
 *
 * <p>Besides the per-key validators registered in {@link #CONFIG}, cross-field checks are kept in
 * {@code INITIALIZERS}; they run when an instance is created through the public constructor and
 * when {@link ConfigDef#validateAll(Map)} is invoked on {@link #CONFIG}.
 */
public class MongoSourceConfig extends AbstractConfig {
    public static final String CONNECTION_URI_CONFIG = "connection.uri";
    private static final String CONNECTION_URI_DEFAULT = "mongodb://localhost:27017,localhost:27018,localhost:27019";
    private static final String CONNECTION_URI_DISPLAY = "MongoDB Connection URI";
    private static final String CONNECTION_URI_DOC = "The connection URI as supported by the official drivers. eg: ``mongodb://user:pass@localhost/``.";

    public static final String TOPIC_PREFIX_CONFIG = "topic.prefix";
    private static final String TOPIC_PREFIX_DOC = "Prefix to prepend to database & collection names to generate the name of the Kafka topic to publish data to.";
    private static final String TOPIC_PREFIX_DISPLAY = "Topic Prefix";
    private static final String TOPIC_PREFIX_DEFAULT = "";

    public static final String PIPELINE_CONFIG = "pipeline";
    private static final String PIPELINE_DISPLAY = "The pipeline to apply to the change stream";
    private static final String PIPELINE_DOC = "An inline JSON array with objects describing the pipeline operations to run.\nExample: `[{\"$match\": {\"operationType\": \"insert\"}}, {\"$addFields\": {\"Kafka\": \"Rules!\"}}]`";
    private static final String PIPELINE_DEFAULT = "[]";

    public static final String BATCH_SIZE_CONFIG = "batch.size";
    private static final String BATCH_SIZE_DISPLAY = "The cursor batch size";
    private static final String BATCH_SIZE_DOC = "The cursor batch size.";
    private static final int BATCH_SIZE_DEFAULT = 0;

    public static final String PUBLISH_FULL_DOCUMENT_ONLY_CONFIG = "publish.full.document.only";
    private static final String PUBLISH_FULL_DOCUMENT_ONLY_DISPLAY = "Publish only the `fullDocument` field";
    private static final String PUBLISH_FULL_DOCUMENT_ONLY_DOC = "Only publish the actual changed document rather than the full change stream document. Automatically, sets `change.stream.full.document=updateLookup` so updated documents will be included.";
    private static final boolean PUBLISH_FULL_DOCUMENT_ONLY_DEFAULT = false;

    public static final String FULL_DOCUMENT_CONFIG = "change.stream.full.document";
    private static final String FULL_DOCUMENT_DISPLAY = "Set what to return for update operations";
    private static final String FULL_DOCUMENT_DOC = "Determines what to return for update operations when using a Change Stream.\nWhen set to 'updateLookup', the change stream for partial updates will include both a delta describing the changes to the document as well as a copy of the entire document that was changed from *some time* after the change occurred.";
    private static final String FULL_DOCUMENT_DEFAULT = "";

    public static final String COLLATION_CONFIG = "collation";
    private static final String COLLATION_DISPLAY = "The collation options";
    private static final String COLLATION_DOC = "The json representation of the Collation options to use for the change stream.\nUse the `Collation.asDocument().toJson()` to create the specific json representation.";
    private static final String COLLATION_DEFAULT = "";

    public static final String POLL_MAX_BATCH_SIZE_CONFIG = "poll.max.batch.size";
    private static final String POLL_MAX_BATCH_SIZE_DISPLAY = "The maximum batch size";
    private static final String POLL_MAX_BATCH_SIZE_DOC = "Maximum number of change stream documents to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.";
    private static final int POLL_MAX_BATCH_SIZE_DEFAULT = 1000;

    public static final String POLL_AWAIT_TIME_MS_CONFIG = "poll.await.time.ms";
    private static final String POLL_AWAIT_TIME_MS_DOC = "The amount of time to wait before checking for new results on the change stream";
    // Declared as long so the default matches the ConfigDef.Type.LONG the key is registered with.
    private static final long POLL_AWAIT_TIME_MS_DEFAULT = 5000L;
    private static final String POLL_AWAIT_TIME_MS_DISPLAY = "Poll await time (ms)";

    public static final String DATABASE_CONFIG = "database";
    private static final String DATABASE_DISPLAY = "The database to watch.";
    private static final String DATABASE_DOC = "The database to watch. If not set then all databases will be watched.";
    private static final String DATABASE_DEFAULT = "";

    public static final String COLLECTION_CONFIG = "collection";
    private static final String COLLECTION_DISPLAY = "The collection to watch.";
    private static final String COLLECTION_DOC = "The collection in the database to watch. If not set then all collections will be watched.";
    private static final String COLLECTION_DEFAULT = "";

    public static final String COPY_EXISTING_CONFIG = "copy.existing";
    private static final String COPY_EXISTING_DISPLAY = "Copy existing data";
    private static final String COPY_EXISTING_DOC = "Copy existing data from all the collections being used as the source then add any changes after. It should be noted that the reading of all the data during the copy and then the subsequent change stream events may produce duplicated events. During the copy, clients can make changes to the data in MongoDB, which may be represented both by the copying process and the change stream. However, as the change stream events are idempotent the changes can be applied so that the data is eventually consistent. Renaming a collection during the copying process is not supported.";
    private static final boolean COPY_EXISTING_DEFAULT = false;

    public static final String COPY_EXISTING_MAX_THREADS_CONFIG = "copy.existing.max.threads";
    private static final String COPY_EXISTING_MAX_THREADS_DISPLAY = "Copy existing max number of threads";
    private static final String COPY_EXISTING_MAX_THREADS_DOC = "The number of threads to use when performing the data copy. Defaults to the number of processors";
    // Not a compile-time constant: it must stay declared BEFORE CONFIG, because createConfigDef()
    // reads it during static initialisation.
    private static final int COPY_EXISTING_MAX_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();

    public static final String COPY_EXISTING_QUEUE_SIZE_CONFIG = "copy.existing.queue.size";
    private static final String COPY_EXISTING_QUEUE_SIZE_DISPLAY = "Copy existing queue size";
    private static final String COPY_EXISTING_QUEUE_SIZE_DOC = "The max size of the queue to use when copying data.";
    private static final int COPY_EXISTING_QUEUE_SIZE_DEFAULT = 16000;

    /** The definition of every setting understood by this configuration. */
    public static final ConfigDef CONFIG = createConfigDef();

    /**
     * Cross-field checks. Each one throws {@link ConnectConfigException} when the configuration
     * is inconsistent. Only read at call time (constructor / validateAll), never while CONFIG is
     * being built, so declaring it after CONFIG is safe.
     */
    private static final List<Consumer<MongoSourceConfig>> INITIALIZERS =
            Collections.singletonList(MongoSourceConfig::validateCollection);

    private final ConnectionString connectionString;

    /**
     * Creates the configuration from raw properties and runs all cross-field checks.
     *
     * @param map the raw configuration properties
     * @throws ConnectConfigException if a cross-field check fails (for example a collection is
     *     configured without a database)
     */
    public MongoSourceConfig(Map<?, ?> map) {
        this(map, true);
    }

    /**
     * @param map the raw configuration properties
     * @param runInitializers whether the cross-field checks in INITIALIZERS are executed; the
     *     validateAll override passes {@code false} so it can collect failures itself
     */
    private MongoSourceConfig(Map<?, ?> map, boolean runInitializers) {
        super(CONFIG, map, false);
        this.connectionString = new ConnectionString(getString(CONNECTION_URI_CONFIG));
        if (runInitializers) {
            INITIALIZERS.forEach(initializer -> initializer.accept(this));
        }
    }

    /** @return the connection string built from the {@code connection.uri} setting */
    public ConnectionString getConnectionString() {
        return this.connectionString;
    }

    /** @return the {@code pipeline} setting as converted by {@code ConfigHelper.jsonArrayFromString} */
    public Optional<List<Document>> getPipeline() {
        return ConfigHelper.jsonArrayFromString(getString(PIPELINE_CONFIG));
    }

    /** @return the {@code collation} setting as converted by {@code ConfigHelper.collationFromJson} */
    public Optional<Collation> getCollation() {
        return ConfigHelper.collationFromJson(getString(COLLATION_CONFIG));
    }

    /**
     * Resolves the full-document mode for the change stream.
     *
     * @return {@link FullDocument#UPDATE_LOOKUP} whenever {@code publish.full.document.only} is
     *     enabled; otherwise the {@code change.stream.full.document} setting as converted by
     *     {@code ConfigHelper.fullDocumentFromString}
     */
    public Optional<FullDocument> getFullDocument() {
        if (getBoolean(PUBLISH_FULL_DOCUMENT_ONLY_CONFIG)) {
            return Optional.of(FullDocument.UPDATE_LOOKUP);
        }
        return ConfigHelper.fullDocumentFromString(getString(FULL_DOCUMENT_CONFIG));
    }

    /**
     * Rejects a configuration that names a collection but no database.
     *
     * @throws ConnectConfigException reported against the {@code collection} setting
     */
    private void validateCollection() {
        String database = getString(DATABASE_CONFIG);
        String collection = getString(COLLECTION_CONFIG);
        if (!collection.isEmpty() && database.isEmpty()) {
            throw new ConnectConfigException(
                    COLLECTION_CONFIG,
                    collection,
                    String.format("Missing database configuration `%s`", DATABASE_CONFIG));
        }
    }

    /**
     * Builds the {@link ConfigDef} holding every setting of the {@code ChangeStream} group.
     *
     * <p>The returned definition overrides {@code validateAll} so that, once the per-key
     * validation reports no errors, the cross-field checks in INITIALIZERS are run as well and
     * their failures are reported as error messages on the offending key.
     */
    private static ConfigDef createConfigDef() {
        ConfigDef configDef = new ConfigDef() {
            @Override
            public Map<String, ConfigValue> validateAll(Map<String, String> props) {
                Map<String, ConfigValue> results = super.validateAll(props);
                boolean hasErrors = results.values().stream()
                        .anyMatch(value -> !value.errorMessages().isEmpty());
                if (hasErrors) {
                    // Per-key validation already failed; report those errors unchanged.
                    return results;
                }

                // Build without running the initializers so each failure can be captured
                // below instead of aborting construction.
                MongoSourceConfig candidate = new MongoSourceConfig(props, false);
                for (Consumer<MongoSourceConfig> initializer : INITIALIZERS) {
                    try {
                        initializer.accept(candidate);
                    } catch (ConnectConfigException e) {
                        results.put(
                                e.getName(),
                                new ConfigValue(
                                        e.getName(),
                                        e.getValue(),
                                        Collections.emptyList(),
                                        Collections.singletonList(e.getMessage())));
                    }
                }
                return results;
            }
        };

        final String group = "ChangeStream";
        int orderInGroup = 0;

        configDef.define(
                CONNECTION_URI_CONFIG,
                ConfigDef.Type.STRING,
                CONNECTION_URI_DEFAULT,
                Validators.errorCheckingValueValidator("A valid connection string", ConnectionString::new),
                ConfigDef.Importance.HIGH,
                CONNECTION_URI_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                CONNECTION_URI_DISPLAY);

        configDef.define(
                COPY_EXISTING_CONFIG,
                ConfigDef.Type.BOOLEAN,
                COPY_EXISTING_DEFAULT,
                ConfigDef.Importance.MEDIUM,
                COPY_EXISTING_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                COPY_EXISTING_DISPLAY);

        configDef.define(
                COPY_EXISTING_MAX_THREADS_CONFIG,
                ConfigDef.Type.INT,
                COPY_EXISTING_MAX_THREADS_DEFAULT,
                ConfigDef.Range.atLeast(1),
                ConfigDef.Importance.MEDIUM,
                COPY_EXISTING_MAX_THREADS_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                COPY_EXISTING_MAX_THREADS_DISPLAY);

        configDef.define(
                COPY_EXISTING_QUEUE_SIZE_CONFIG,
                ConfigDef.Type.INT,
                COPY_EXISTING_QUEUE_SIZE_DEFAULT,
                ConfigDef.Range.atLeast(1),
                ConfigDef.Importance.MEDIUM,
                COPY_EXISTING_QUEUE_SIZE_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                COPY_EXISTING_QUEUE_SIZE_DISPLAY);

        configDef.define(
                DATABASE_CONFIG,
                ConfigDef.Type.STRING,
                DATABASE_DEFAULT,
                ConfigDef.Importance.MEDIUM,
                DATABASE_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                DATABASE_DISPLAY);

        configDef.define(
                COLLECTION_CONFIG,
                ConfigDef.Type.STRING,
                COLLECTION_DEFAULT,
                ConfigDef.Importance.MEDIUM,
                COLLECTION_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                COLLECTION_DISPLAY);

        configDef.define(
                PIPELINE_CONFIG,
                ConfigDef.Type.STRING,
                PIPELINE_DEFAULT,
                Validators.errorCheckingValueValidator("A valid JSON array", ConfigHelper::jsonArrayFromString),
                ConfigDef.Importance.MEDIUM,
                PIPELINE_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                PIPELINE_DISPLAY);

        configDef.define(
                BATCH_SIZE_CONFIG,
                ConfigDef.Type.INT,
                BATCH_SIZE_DEFAULT,
                ConfigDef.Range.atLeast(0),
                ConfigDef.Importance.MEDIUM,
                BATCH_SIZE_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                BATCH_SIZE_DISPLAY);

        configDef.define(
                PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
                ConfigDef.Type.BOOLEAN,
                PUBLISH_FULL_DOCUMENT_ONLY_DEFAULT,
                ConfigDef.Importance.HIGH,
                PUBLISH_FULL_DOCUMENT_ONLY_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                PUBLISH_FULL_DOCUMENT_ONLY_DISPLAY);

        // Accepts either the empty string (unset) or one of the FullDocument values; the same
        // set of values is offered as recommendations.
        configDef.define(
                FULL_DOCUMENT_CONFIG,
                ConfigDef.Type.STRING,
                FULL_DOCUMENT_DEFAULT,
                Validators.emptyString().or(
                        Validators.EnumValidatorAndRecommender.in(FullDocument.values(), FullDocument::getValue)),
                ConfigDef.Importance.HIGH,
                FULL_DOCUMENT_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                FULL_DOCUMENT_DISPLAY,
                Validators.EnumValidatorAndRecommender.in(FullDocument.values(), FullDocument::getValue));

        configDef.define(
                COLLATION_CONFIG,
                ConfigDef.Type.STRING,
                COLLATION_DEFAULT,
                Validators.errorCheckingValueValidator(
                        "A valid JSON document representing a collation", ConfigHelper::collationFromJson),
                ConfigDef.Importance.HIGH,
                COLLATION_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                COLLATION_DISPLAY);

        configDef.define(
                TOPIC_PREFIX_CONFIG,
                ConfigDef.Type.STRING,
                TOPIC_PREFIX_DEFAULT,
                (ConfigDef.Validator) null,
                ConfigDef.Importance.LOW,
                TOPIC_PREFIX_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                TOPIC_PREFIX_DISPLAY);

        configDef.define(
                POLL_MAX_BATCH_SIZE_CONFIG,
                ConfigDef.Type.INT,
                POLL_MAX_BATCH_SIZE_DEFAULT,
                ConfigDef.Range.atLeast(1),
                ConfigDef.Importance.LOW,
                POLL_MAX_BATCH_SIZE_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                POLL_MAX_BATCH_SIZE_DISPLAY);

        configDef.define(
                POLL_AWAIT_TIME_MS_CONFIG,
                ConfigDef.Type.LONG,
                POLL_AWAIT_TIME_MS_DEFAULT,
                ConfigDef.Range.atLeast(1),
                ConfigDef.Importance.LOW,
                POLL_AWAIT_TIME_MS_DOC,
                group,
                ++orderInGroup,
                ConfigDef.Width.MEDIUM,
                POLL_AWAIT_TIME_MS_DISPLAY);

        return configDef;
    }
}
