package org.apache.pulsar.io.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
import org.apache.pulsar.io.elasticsearch.client.RestClient;
import org.apache.pulsar.io.elasticsearch.client.RestClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/elasticsearch/ElasticSearchClient.class */
public class ElasticSearchClient implements AutoCloseable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchClient.class);

    /**
     * Markers looked up (by substring) in the error text of a failed bulk operation;
     * a match means the failure is treated as a malformed document.
     */
    static final String[] MALFORMED_ERRORS = {"mapper_parsing_exception", "action_request_validation_exception", "illegal_argument_exception"};

    /** Connector configuration supplied at construction time. */
    private ElasticSearchConfig config;

    /** Underlying REST client; created in the constructor and set to {@code null} by {@link #close()}. */
    private RestClient client;

    /** Retry helper used for client creation and index management calls. */
    private final RandomExponentialRetry backoffRetry;

    /** Names of the indices that have already been checked (and created when missing). */
    final Set<String> indexCache = new HashSet<>();

    /** Memoized topic-name to index-name conversions. */
    final Map<String, String> topicToIndexCache = new HashMap<>();

    /** Lifecycle state of this client; starts as {@link State#Open}. */
    final AtomicReference<State> state = new AtomicReference<>(State.Open);

    /** Formatter for the configured index name, or {@code null} when no index name is configured. */
    private final IndexNameFormatter indexNameFormatter;

    /** Sink context used to report fatal errors. */
    final SinkContext sinkContext;

    /**
     * Lifecycle states of an {@link ElasticSearchClient}.
     *
     * <p>Decompiler note: the access modifier of this enum was changed from package-private.
     */
    public enum State {
        /** Initial state; operations are only accepted while the client is in this state. */
        Open,
        /** Set by {@code failed(Exception)} when the client transitions away from {@code Open} after a failure. */
        Failed,
        /** Set by {@code close()} when the client was still {@code Open}. */
        Closed
    }

    /**
     * Creates the client, wiring a bulk listener that acknowledges or fails the Pulsar
     * records attached to each bulk operation.
     *
     * @param elasticSearchConfig connector configuration
     * @param sinkContext         sink context used to report fatal errors
     * @throws ElasticSearchConnectionException if the REST client cannot be created
     */
    public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig, SinkContext sinkContext) {
        this.config = elasticSearchConfig;
        this.sinkContext = sinkContext;
        // Without a configured index name the index is derived from the record's topic instead.
        this.indexNameFormatter = this.config.getIndexName() != null
                ? new IndexNameFormatter(this.config.getIndexName())
                : null;

        final BulkProcessor.Listener bulkListener = new BulkProcessor.Listener() {
            /** Per-operation outcome: ack the successful records, fail and inspect the others. */
            @Override
            public void afterBulk(long executionId,
                                  List<BulkProcessor.BulkOperationRequest> requests,
                                  List<BulkProcessor.BulkOperationResult> results) {
                if (log.isTraceEnabled()) {
                    log.trace("Bulk request id={} size={}:", executionId, requests.size());
                }
                // Results are matched to requests by position.
                int position = 0;
                for (BulkProcessor.BulkOperationResult result : results) {
                    final Record<?> pulsarRecord = requests.get(position++).getPulsarRecord();
                    if (!result.isError()) {
                        pulsarRecord.ack();
                        continue;
                    }
                    pulsarRecord.fail();
                    ElasticSearchClient.this.checkForIrrecoverableError(pulsarRecord, result);
                }
            }

            /** Whole-request failure: every record of the bulk request is failed. */
            @Override
            public void afterBulk(long executionId,
                                  List<BulkProcessor.BulkOperationRequest> requests,
                                  Throwable failure) {
                log.warn("Bulk request id={} failed:", executionId, failure);
                for (BulkProcessor.BulkOperationRequest request : requests) {
                    request.getPulsarRecord().fail();
                }
            }
        };

        this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
        // NOTE(review): -1 is passed as the retry limit; presumably "no limit" - confirm in RandomExponentialRetry.
        this.client = retry(() -> RestClientFactory.createClient(this.config, bulkListener), -1, "client creation");
        this.state.set(State.Open);
    }

    /**
     * Marks the client as failed and reports the error to the sink context.
     *
     * <p>Only the transition from {@code Open} to {@code Failed} triggers the report, so
     * calls made when the client is no longer open are ignored.
     *
     * @param exc the error passed to {@code SinkContext.fatal}
     */
    public void failed(Exception exc) {
        final boolean transitioned = this.state.compareAndSet(State.Open, State.Failed);
        if (!transitioned) {
            return;
        }
        this.sinkContext.fatal(exc);
    }

    /**
     * Inspects a failed bulk operation and applies the configured malformed-document action.
     *
     * <p>When the error text contains one of {@link #MALFORMED_ERRORS}, the first matching
     * marker decides the outcome: {@code WARN} logs a warning, {@code FAIL} logs an error and
     * calls {@link #failed(Exception)}, any other action does nothing. Errors that match no
     * marker are logged as a generic bulk failure. Results that are not errors are ignored.
     *
     * @param record              the Pulsar record attached to the bulk operation
     * @param bulkOperationResult the result reported for that operation
     */
    void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationResult bulkOperationResult) {
        if (!bulkOperationResult.isError()) {
            return;
        }
        final String error = bulkOperationResult.getError();
        // A missing error text cannot match any marker; fall through to the generic warning.
        if (error != null) {
            for (String malformedError : MALFORMED_ERRORS) {
                if (!error.contains(malformedError)) {
                    continue;
                }
                switch (this.config.getMalformedDocAction()) {
                    case WARN:
                        log.warn("Ignoring malformed document index={} id={} error={}",
                                bulkOperationResult.getIndex(),
                                bulkOperationResult.getDocumentId(),
                                malformedError);
                        break;
                    case FAIL:
                        log.error("Failure due to the malformed document index={} id={} error={}",
                                bulkOperationResult.getIndex(),
                                bulkOperationResult.getDocumentId(),
                                malformedError);
                        failed(new Exception(malformedError));
                        break;
                    default:
                        // Remaining actions intentionally do nothing.
                        break;
                }
                // Stop at the first marker: an error text may contain several of them and
                // the same result must not be reported more than once.
                return;
            }
        }
        log.warn("Bulk request failed, message id=[{}] index={} error={}",
                record.getMessage().map(message -> message.getMessageId().toString()).orElse(""),
                bulkOperationResult.getIndex(),
                error);
    }

    /**
     * Appends an index request for the record to the client's bulk processor.
     *
     * <p>The record is not acknowledged here; it is only failed when queuing throws.
     *
     * @param record the Pulsar record being indexed
     * @param pair   document id (left) and document source (right)
     * @throws Exception any error raised while checking state, resolving the index or
     *                   queuing the request; the record is failed before rethrowing
     */
    public void bulkIndex(Record record, Pair<String, String> pair) throws Exception {
        try {
            checkState();
            checkIndexExists(record);
            final String targetIndex = indexName(record);
            final String documentId = pair.getLeft();
            this.client.getBulkProcessor().appendIndexRequest(
                    BulkProcessor.BulkIndexRequest.builder()
                            .index(targetIndex)
                            .documentId(documentId)
                            .documentSource(pair.getRight())
                            .record(record)
                            .build());
        } catch (Exception failure) {
            log.debug("index failed id=" + pair.getLeft(), failure);
            record.fail();
            throw failure;
        }
    }

    /**
     * Indexes a single document synchronously and acknowledges or fails the record
     * according to the outcome.
     *
     * @param record the Pulsar record being indexed
     * @param pair   document id (left) and document source (right)
     * @return the value returned by the underlying client's {@code indexDocument} call
     * @throws Exception any error raised along the way; the record is failed before rethrowing
     */
    public boolean indexDocument(Record<GenericObject> record, Pair<String, String> pair) throws Exception {
        try {
            checkState();
            checkIndexExists(record);
            final boolean indexed = this.client.indexDocument(indexName(record), pair.getLeft(), pair.getRight());
            if (!indexed) {
                record.fail();
                return false;
            }
            record.ack();
            return true;
        } catch (Exception failure) {
            log.error("index failed id=" + pair.getLeft(), failure);
            record.fail();
            throw failure;
        }
    }

    /**
     * Appends a delete request for the given document id to the client's bulk processor.
     *
     * <p>The record is not acknowledged here; it is only failed when queuing throws.
     *
     * @param record the Pulsar record that triggered the deletion
     * @param str    id of the document to delete
     * @throws Exception any error raised while checking state, resolving the index or
     *                   queuing the request; the record is failed before rethrowing
     */
    public void bulkDelete(Record<GenericObject> record, String str) throws Exception {
        try {
            checkState();
            checkIndexExists(record);
            final String targetIndex = indexName(record);
            this.client.getBulkProcessor().appendDeleteRequest(
                    BulkProcessor.BulkDeleteRequest.builder()
                            .index(targetIndex)
                            .documentId(str)
                            .record(record)
                            .build());
        } catch (Exception failure) {
            log.debug("delete failed id: {}", str, failure);
            record.fail();
            throw failure;
        }
    }

    /**
     * Deletes a single document synchronously and acknowledges or fails the record
     * according to the outcome.
     *
     * @param record the Pulsar record that triggered the deletion
     * @param str    id of the document to delete
     * @return the value returned by the underlying client's {@code deleteDocument} call
     * @throws Exception any error raised along the way; the record is failed before rethrowing
     */
    public boolean deleteDocument(Record<GenericObject> record, String str) throws Exception {
        try {
            checkState();
            checkIndexExists(record);
            boolean deleted = this.client.deleteDocument(indexName(record), str);
            if (deleted) {
                record.ack();
            } else {
                record.fail();
            }
            return deleted;
        } catch (Exception e) {
            // This is the delete path: the message previously said "index failed".
            log.debug("delete failed id: {}", str, e);
            record.fail();
            throw e;
        }
    }

    /** Flushes the bulk processor of the underlying client. */
    public void flush() {
        final RestClient restClient = this.client;
        restClient.getBulkProcessor().flush();
    }

    /**
     * Closes the underlying client, if any, and moves the state from {@code Open} to
     * {@code Closed}. A state other than {@code Open} is left untouched.
     */
    @Override
    public void close() {
        final RestClient current = this.client;
        if (current != null) {
            current.close();
            this.client = null;
        }
        this.state.compareAndSet(State.Open, State.Closed);
    }

    /**
     * Replaces the underlying REST client.
     *
     * @param restClient the client to use from now on
     */
    @VisibleForTesting
    void setClient(RestClient restClient) {
        client = restClient;
    }

    /**
     * Verifies that the client is still open.
     *
     * @throws IllegalStateException if the current state is not {@code Open}
     */
    private void checkState() {
        // Read the state once so the reported state is the one that failed the check.
        final State current = this.state.get();
        if (current != State.Open) {
            throw new IllegalStateException(String.format("Elasticsearch client is in %s state", current.name()));
        }
    }

    /**
     * Makes sure the index targeted by the record exists, creating it when index creation
     * is enabled in the configuration. Index names that were already handled are remembered
     * in {@code indexCache}.
     *
     * @param record the record whose target index is checked
     * @throws IOException if the index name cannot be resolved
     */
    private void checkIndexExists(Record<GenericObject> record) throws IOException {
        if (!this.config.isCreateIndexIfNeeded()) {
            return;
        }
        final String targetIndex = indexName(record);
        // NOTE(review): this first lookup runs outside the lock on a plain HashSet -
        // confirm callers never invoke this method concurrently.
        if (this.indexCache.contains(targetIndex)) {
            return;
        }
        synchronized (this) {
            // Re-check under the lock before creating the index.
            if (this.indexCache.contains(targetIndex)) {
                return;
            }
            createIndexIfNeeded(targetIndex);
            this.indexCache.add(targetIndex);
        }
    }

    /**
     * Resolves the index a record must be written to.
     *
     * <p>The configured index name formatter takes precedence; otherwise the index name is
     * derived from the record's topic name.
     *
     * @param record the record being written
     * @return the index name
     * @throws IOException if no index name is configured and the record has no topic name
     */
    String indexName(Record<GenericObject> record) throws IOException {
        if (this.indexNameFormatter != null) {
            return this.indexNameFormatter.indexName(record);
        }
        return record.getTopicName()
                .map(this::topicToIndexName)
                .orElseThrow(() -> new IOException("Elasticsearch index name configuration and topic name are empty"));
    }

    /**
     * Converts a topic name into an index name, memoizing the result per topic.
     *
     * @param str the topic name
     * @return the derived index name
     * @throws RuntimeException wrapping an {@link IOException} when the topic name cannot be
     *                          converted to a valid index name
     */
    @VisibleForTesting
    public String topicToIndexName(String str) {
        return this.topicToIndexCache.computeIfAbsent(str, this::deriveIndexName);
    }

    /** Derives the index name for one topic: lower-case, last path segment, at most 255 UTF-8 bytes. */
    private String deriveIndexName(String topic) {
        String candidate = topic.toLowerCase(Locale.ROOT);
        final String[] segments = candidate.split("/");
        if (segments.length > 1) {
            // Keep only the last segment of a slash-separated topic name.
            candidate = segments[segments.length - 1];
        }
        // Drop trailing characters until the UTF-8 encoding fits in 255 bytes.
        while (candidate.getBytes(StandardCharsets.UTF_8).length > 255) {
            candidate = candidate.substring(0, candidate.length() - 1);
        }
        final boolean valid = !candidate.isEmpty()
                && candidate.matches("[a-zA-Z\\.0-9][a-zA-Z_\\.\\-\\+0-9]*");
        if (!valid) {
            throw new RuntimeException(new IOException("Cannot convert the topic name='" + topic + "' to a valid elasticsearch index name"));
        }
        if (log.isDebugEnabled()) {
            log.debug("Translate topic={} to index={}", topic, candidate);
        }
        return candidate;
    }

    /**
     * Creates the index unless it already exists.
     *
     * @param str the index name
     * @return {@code false} when the index already exists, otherwise the result of the
     *         client's {@code createIndex} call
     * @throws ElasticSearchConnectionException if the existence check or the creation fails
     */
    @VisibleForTesting
    public boolean createIndexIfNeeded(String str) {
        if (indexExists(str)) {
            return false;
        }
        final Boolean created = retry(() -> this.client.createIndex(str), "create index");
        return created;
    }

    /**
     * Asks the underlying client, with retries, whether an index exists.
     *
     * @param str the index name
     * @return the result of the client's {@code indexExists} call
     * @throws ElasticSearchConnectionException if the check fails
     */
    public boolean indexExists(String str) {
        final Boolean exists = retry(() -> this.client.indexExists(str), "index exists");
        return exists;
    }

    /**
     * Runs a command with the maximum number of retries taken from the configuration.
     *
     * @param callable the command to run
     * @param str      short description of the command, used in logs and error messages
     * @return the value produced by the command
     */
    private <T> T retry(Callable<T> callable, String str) {
        final int maxRetries = this.config.getMaxRetries();
        return retry(callable, maxRetries, str);
    }

    /**
     * Runs a command through the backoff retry helper, using the retry backoff from the
     * configuration.
     *
     * @param callable the command to run
     * @param i        retry limit handed to the retry helper
     * @param str      short description of the command, used in logs and error messages
     * @return the value produced by the command
     * @throws ElasticSearchConnectionException wrapping the error when the helper gives up
     */
    private <T> T retry(Callable<T> callable, int i, String str) {
        try {
            return (T) this.backoffRetry.retry(callable, i, this.config.getRetryBackoffInMs(), str);
        } catch (Exception e) {
            // Fixed the "wth" typo in the message.
            log.error("error in command {} with retry", str, e);
            throw new ElasticSearchConnectionException(str + " failed", e);
        }
    }

    /**
     * @return the underlying REST client, or {@code null} once {@link #close()} has released it
     */
    RestClient getRestClient() {
        return client;
    }
}
