package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.DataConverter;
import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts Kafka Connect {@link SinkRecord}s and hands them to a {@link BulkProcessor} that
 * indexes them through an {@link ElasticsearchClient}.
 *
 * <p>Typical lifecycle, as exposed by this class: build an instance with {@link Builder}, call
 * {@link #start()}, feed batches through {@link #write(Collection)}, call {@link #flush()} as
 * needed, and finally {@link #stop()}.
 *
 * <p>NOTE(review): the set of already-initialized mappings is a plain {@link HashSet}, so this
 * class presumably expects {@link #write(Collection)} to be called from one thread at a time;
 * confirm against the calling task.
 */
public class ElasticsearchWriter {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);

    private final ElasticsearchClient client;
    private final String type;
    private final boolean ignoreKey;
    private final Set<String> ignoreKeyTopics;
    private final boolean ignoreSchema;
    private final Set<String> ignoreSchemaTopics;

    @Deprecated
    private final Map<String, String> topicToIndexMap;
    private final long flushTimeoutMs;
    private final BulkProcessor<IndexableRecord, ?> bulkProcessor;
    private final boolean dropInvalidMessage;
    private final DataConverter.BehaviorOnNullValues behaviorOnNullValues;
    private final DataConverter converter;
    /** Index names whose mapping has already been looked up or created by this writer. */
    private final Set<String> existingMappings = new HashSet<>();
    private final BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc;

    /**
     * Fluent builder for {@link ElasticsearchWriter}.
     *
     * <p>Numeric settings that are never set keep the Java default of {@code 0}; they are passed
     * on unchanged to the writer's constructor.
     */
    public static class Builder {
        private final ElasticsearchClient client;
        private String type;
        private long flushTimeoutMs;
        private int maxBufferedRecords;
        private int maxInFlightRequests;
        private int batchSize;
        private long lingerMs;
        private int maxRetry;
        private long retryBackoffMs;
        private boolean dropInvalidMessage;
        private BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc;
        private boolean useCompactMapEntries = true;
        private boolean ignoreKey = false;
        private Set<String> ignoreKeyTopics = Collections.emptySet();
        private boolean ignoreSchema = false;
        private Set<String> ignoreSchemaTopics = Collections.emptySet();
        private Map<String, String> topicToIndexMap = new HashMap<>();
        private DataConverter.BehaviorOnNullValues behaviorOnNullValues =
                DataConverter.BehaviorOnNullValues.DEFAULT;

        /**
         * @param client the client used for index/mapping management and bulk indexing
         */
        public Builder(ElasticsearchClient client) {
            this.client = client;
        }

        /**
         * @param type the type name passed to the mapping helpers and the record converter
         * @return this builder
         */
        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        /**
         * @param ignoreKey global ignore-key flag applied to every topic
         * @param ignoreKeyTopics topics for which the ignore-key flag is forced on
         * @return this builder
         */
        public Builder setIgnoreKey(boolean ignoreKey, Set<String> ignoreKeyTopics) {
            this.ignoreKey = ignoreKey;
            this.ignoreKeyTopics = ignoreKeyTopics;
            return this;
        }

        /**
         * @param ignoreSchema global ignore-schema flag applied to every topic
         * @param ignoreSchemaTopics topics for which the ignore-schema flag is forced on
         * @return this builder
         */
        public Builder setIgnoreSchema(boolean ignoreSchema, Set<String> ignoreSchemaTopics) {
            this.ignoreSchema = ignoreSchema;
            this.ignoreSchemaTopics = ignoreSchemaTopics;
            return this;
        }

        /**
         * @param useCompactMapEntries flag handed to the {@link DataConverter} constructor
         * @return this builder
         */
        public Builder setCompactMapEntries(boolean useCompactMapEntries) {
            this.useCompactMapEntries = useCompactMapEntries;
            return this;
        }

        /**
         * @param topicToIndexMap explicit topic-to-index overrides; topics without an entry are
         *     mapped to their lower-cased name
         * @return this builder
         */
        public Builder setTopicToIndexMap(Map<String, String> topicToIndexMap) {
            this.topicToIndexMap = topicToIndexMap;
            return this;
        }

        /**
         * Sets the timeout used when adding to, flushing and stopping the bulk processor.
         *
         * <p>The misspelled method name is kept as-is because it is part of the public API.
         *
         * @param flushTimeoutMs timeout in milliseconds
         * @return this builder
         */
        public Builder setFlushTimoutMs(long flushTimeoutMs) {
            this.flushTimeoutMs = flushTimeoutMs;
            return this;
        }

        /**
         * @param maxBufferedRecords value forwarded to the {@link BulkProcessor}
         * @return this builder
         */
        public Builder setMaxBufferedRecords(int maxBufferedRecords) {
            this.maxBufferedRecords = maxBufferedRecords;
            return this;
        }

        /**
         * @param maxInFlightRequests value forwarded to the {@link BulkProcessor}
         * @return this builder
         */
        public Builder setMaxInFlightRequests(int maxInFlightRequests) {
            this.maxInFlightRequests = maxInFlightRequests;
            return this;
        }

        /**
         * @param batchSize value forwarded to the {@link BulkProcessor}
         * @return this builder
         */
        public Builder setBatchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /**
         * @param lingerMs value forwarded to the {@link BulkProcessor}
         * @return this builder
         */
        public Builder setLingerMs(long lingerMs) {
            this.lingerMs = lingerMs;
            return this;
        }

        /**
         * @param maxRetry value forwarded to the {@link BulkProcessor}
         * @return this builder
         */
        public Builder setMaxRetry(int maxRetry) {
            this.maxRetry = maxRetry;
            return this;
        }

        /**
         * @param retryBackoffMs value forwarded to the {@link BulkProcessor}
         * @return this builder
         */
        public Builder setRetryBackoffMs(long retryBackoffMs) {
            this.retryBackoffMs = retryBackoffMs;
            return this;
        }

        /**
         * @param dropInvalidMessage when {@code true}, a {@link ConnectException} raised while
         *     handling a single record is logged and the record is skipped instead of failing
         * @return this builder
         */
        public Builder setDropInvalidMessage(boolean dropInvalidMessage) {
            this.dropInvalidMessage = dropInvalidMessage;
            return this;
        }

        /**
         * @param behaviorOnNullValues how records with a {@code null} value are treated
         * @return this builder
         * @throws NullPointerException if {@code behaviorOnNullValues} is {@code null}
         */
        public Builder setBehaviorOnNullValues(
                DataConverter.BehaviorOnNullValues behaviorOnNullValues) {
            this.behaviorOnNullValues =
                    Objects.requireNonNull(behaviorOnNullValues, "behaviorOnNullValues cannot be null");
            return this;
        }

        /**
         * @param behaviorOnMalformedDoc value forwarded to the {@link BulkProcessor}
         * @return this builder
         */
        public Builder setBehaviorOnMalformedDoc(
                BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc) {
            this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
            return this;
        }

        /**
         * Creates the writer from the values collected so far.
         *
         * @return a new {@link ElasticsearchWriter}
         */
        public ElasticsearchWriter build() {
            return new ElasticsearchWriter(
                    this.client,
                    this.type,
                    this.useCompactMapEntries,
                    this.ignoreKey,
                    this.ignoreKeyTopics,
                    this.ignoreSchema,
                    this.ignoreSchemaTopics,
                    this.topicToIndexMap,
                    this.flushTimeoutMs,
                    this.maxBufferedRecords,
                    this.maxInFlightRequests,
                    this.batchSize,
                    this.lingerMs,
                    this.maxRetry,
                    this.retryBackoffMs,
                    this.dropInvalidMessage,
                    this.behaviorOnNullValues,
                    this.behaviorOnMalformedDoc);
        }
    }

    /**
     * Package-private constructor; regular callers go through {@link Builder}.
     *
     * <p>A {@code null} topic set or topic-to-index map is treated as empty so that later lookups
     * in {@link #write(Collection)} cannot fail with a {@link NullPointerException}.
     */
    ElasticsearchWriter(
            ElasticsearchClient client,
            String type,
            boolean useCompactMapEntries,
            boolean ignoreKey,
            Set<String> ignoreKeyTopics,
            boolean ignoreSchema,
            Set<String> ignoreSchemaTopics,
            Map<String, String> topicToIndexMap,
            long flushTimeoutMs,
            int maxBufferedRecords,
            int maxInFlightRequests,
            int batchSize,
            long lingerMs,
            int maxRetries,
            long retryBackoffMs,
            boolean dropInvalidMessage,
            DataConverter.BehaviorOnNullValues behaviorOnNullValues,
            BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc) {
        this.client = client;
        this.type = type;
        this.ignoreKey = ignoreKey;
        this.ignoreKeyTopics =
                ignoreKeyTopics != null ? ignoreKeyTopics : Collections.<String>emptySet();
        this.ignoreSchema = ignoreSchema;
        this.ignoreSchemaTopics =
                ignoreSchemaTopics != null ? ignoreSchemaTopics : Collections.<String>emptySet();
        this.topicToIndexMap =
                topicToIndexMap != null ? topicToIndexMap : Collections.<String, String>emptyMap();
        this.flushTimeoutMs = flushTimeoutMs;
        this.dropInvalidMessage = dropInvalidMessage;
        this.behaviorOnNullValues = behaviorOnNullValues;
        this.converter = new DataConverter(useCompactMapEntries, behaviorOnNullValues);
        this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
        this.bulkProcessor = new BulkProcessor<>(
                new SystemTime(),
                new BulkIndexingClient(client),
                maxBufferedRecords,
                maxInFlightRequests,
                batchSize,
                lingerMs,
                maxRetries,
                retryBackoffMs,
                behaviorOnMalformedDoc);
    }

    /**
     * Processes a batch of sink records in iteration order.
     *
     * <p>For each record that is not skipped up front, this requests creation of the target
     * index, initializes the mapping once per index (unless the schema is ignored for the
     * record's topic), and then converts the record and adds it to the bulk processor.
     *
     * @param collection the records to write
     * @throws ConnectException if mapping initialization fails with an {@link IOException}, or if
     *     handling a record fails and invalid messages are not being dropped
     */
    public void write(Collection<SinkRecord> collection) {
        for (SinkRecord record : collection) {
            // Skip null-valued records early when they would be ignored anyway.
            if (ignoreRecord(record)) {
                log.trace("Ignoring sink record with null value for topic/partition/offset {}/{}/{}", record.topic(), record.kafkaPartition(), record.kafkaOffset());
                continue;
            }
            log.trace("Writing record to Elasticsearch: topic/partition/offset {}/{}/{}", record.topic(), record.kafkaPartition(), record.kafkaOffset());

            final String topic = record.topic();
            final String index = convertTopicToIndexName(topic);
            // The global flag wins; otherwise fall back to the per-topic override set.
            final boolean ignoreKeyForTopic = this.ignoreKey || this.ignoreKeyTopics.contains(topic);
            final boolean ignoreSchemaForTopic =
                    this.ignoreSchema || this.ignoreSchemaTopics.contains(topic);

            this.client.createIndices(Collections.singleton(index));

            if (!ignoreSchemaForTopic && !this.existingMappings.contains(index)) {
                try {
                    if (Mapping.getMapping(this.client, index, this.type) == null) {
                        Mapping.createMapping(this.client, index, this.type, record.valueSchema());
                    }
                    // Remember the index only after the lookup/creation succeeded, so a failure
                    // is retried on the next record for the same index.
                    log.debug("Locally caching mapping for index '{}'", index);
                    this.existingMappings.add(index);
                } catch (IOException e) {
                    throw new ConnectException("Failed to initialize mapping for index: " + index, e);
                }
            }

            tryWriteRecord(record, index, ignoreKeyForTopic, ignoreSchemaForTopic);
        }
    }

    /**
     * @return {@code true} when the record's value is {@code null} and the configured null-value
     *     behavior is {@code IGNORE}
     */
    private boolean ignoreRecord(SinkRecord sinkRecord) {
        return sinkRecord.value() == null
                && this.behaviorOnNullValues == DataConverter.BehaviorOnNullValues.IGNORE;
    }

    /**
     * Converts one record and adds the result to the bulk processor.
     *
     * <p>A {@code null} conversion result is skipped silently. A {@link ConnectException} raised
     * while converting or enqueueing is rethrown unless {@code dropInvalidMessage} is set, in
     * which case it is logged at error level and the record is dropped.
     */
    private void tryWriteRecord(
            SinkRecord sinkRecord, String index, boolean ignoreKey, boolean ignoreSchema) {
        try {
            IndexableRecord indexable =
                    this.converter.convertRecord(sinkRecord, index, this.type, ignoreKey, ignoreSchema);
            if (indexable != null) {
                log.trace("Adding record from topic/partition/offset {}/{}/{} to bulk processor", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
                this.bulkProcessor.add(indexable, this.flushTimeoutMs);
            }
        } catch (ConnectException e) {
            if (!this.dropInvalidMessage) {
                throw e;
            }
            log.error("Can't convert record from topic/partition/offset {}/{}/{}. Error message: {}", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset(), e.getMessage());
        }
    }

    /**
     * Resolves the index name for a topic: an explicit entry in the topic-to-index map wins,
     * otherwise the topic name is lower-cased.
     *
     * <p>Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM's default
     * locale (for example, a Turkish default locale would map {@code "I"} to a dotless
     * {@code "ı"}).
     */
    private String convertTopicToIndexName(String topic) {
        final String mappedIndex = this.topicToIndexMap.get(topic);
        final String index = mappedIndex != null ? mappedIndex : topic.toLowerCase(Locale.ROOT);
        log.trace("Topic '{}' was translated as index '{}'", topic, index);
        return index;
    }

    /** Flushes the bulk processor, passing the configured flush timeout. */
    public void flush() {
        this.bulkProcessor.flush(this.flushTimeoutMs);
    }

    /** Starts the underlying bulk processor. */
    public void start() {
        this.bulkProcessor.start();
    }

    /**
     * Attempts a final flush, then stops the bulk processor and waits for it to finish.
     *
     * <p>A failure during the final flush is logged at warn level and otherwise ignored so that
     * the stop sequence still runs.
     */
    public void stop() {
        try {
            log.debug("Flushing records, waiting up to {}ms ('{}')", this.flushTimeoutMs, ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
            this.bulkProcessor.flush(this.flushTimeoutMs);
        } catch (Exception e) {
            log.warn("Failed to flush during stop", e);
        }
        log.debug("Stopping Elastisearch writer");
        this.bulkProcessor.stop();
        log.debug("Waiting for bulk processor to stop, up to {}ms ('{}')", this.flushTimeoutMs, ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
        this.bulkProcessor.awaitStop(this.flushTimeoutMs);
        log.debug("Stopped Elastisearch writer");
    }

    /**
     * Requests creation of the indices that correspond to the given topics.
     *
     * @param set the topic names
     * @throws NullPointerException if {@code set} is {@code null}
     */
    public void createIndicesForTopics(Set<String> set) {
        Objects.requireNonNull(set, "topics cannot be null");
        this.client.createIndices(indicesForTopics(set));
    }

    /** Maps every topic name to its index name and collects the distinct results. */
    private Set<String> indicesForTopics(Set<String> topics) {
        final Set<String> indices = new HashSet<>();
        for (String topic : topics) {
            indices.add(convertTopicToIndexName(topic));
        }
        return indices;
    }
}
