package org.apache.pulsar.io.elasticsearch;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

@Connector(name = "elastic_search", type = IOType.SINK, help = "A sink connector that sends pulsar messages to elastic search", configClass = ElasticSearchConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/elasticsearch/ElasticSearchSink.class */
public class ElasticSearchSink implements Sink<byte[]> {
    protected static final String DOCUMENT = "doc";
    private URL url;
    private RestHighLevelClient client;
    private CredentialsProvider credentialsProvider;
    private ElasticSearchConfig elasticSearchConfig;

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.elasticSearchConfig = ElasticSearchConfig.load(map);
        this.elasticSearchConfig.validate();
        createIndexIfNeeded();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.client.close();
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<byte[]> record) {
        KeyValue<String, byte[]> extractKeyValue = extractKeyValue(record);
        IndexRequest indexRequest = Requests.indexRequest(this.elasticSearchConfig.getIndexName());
        indexRequest.type("doc");
        indexRequest.source(extractKeyValue.getValue(), XContentType.JSON);
        try {
            if (getClient().index(indexRequest, new Header[0]).getResult().equals(DocWriteResponse.Result.CREATED)) {
                record.ack();
            } else {
                record.fail();
            }
        } catch (IOException e) {
            record.fail();
        }
    }

    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
        return new KeyValue<>(record.getKey().orElseGet(null), record.getValue());
    }

    private void createIndexIfNeeded() throws IOException {
        GetIndexRequest getIndexRequest = new GetIndexRequest();
        getIndexRequest.indices(this.elasticSearchConfig.getIndexName());
        if (getClient().indices().exists(getIndexRequest, new Header[0])) {
            return;
        }
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.elasticSearchConfig.getIndexName());
        createIndexRequest.settings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, this.elasticSearchConfig.getIndexNumberOfShards()).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, this.elasticSearchConfig.getIndexNumberOfReplicas()));
        CreateIndexResponse create = getClient().indices().create(createIndexRequest, new Header[0]);
        if (!create.isAcknowledged() || !create.isShardsAcknowledged()) {
            throw new RuntimeException("Unable to create index.");
        }
    }

    private URL getUrl() throws MalformedURLException {
        if (this.url == null) {
            this.url = new URL(this.elasticSearchConfig.getElasticSearchUrl());
        }
        return this.url;
    }

    private CredentialsProvider getCredentialsProvider() {
        if (StringUtils.isEmpty(this.elasticSearchConfig.getUsername()) || StringUtils.isEmpty(this.elasticSearchConfig.getPassword())) {
            return null;
        }
        this.credentialsProvider = new BasicCredentialsProvider();
        this.credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.elasticSearchConfig.getUsername(), this.elasticSearchConfig.getPassword()));
        return this.credentialsProvider;
    }

    private RestHighLevelClient getClient() throws MalformedURLException {
        if (this.client == null) {
            CredentialsProvider credentialsProvider = getCredentialsProvider();
            RestClientBuilder builder = RestClient.builder(new HttpHost(getUrl().getHost(), getUrl().getPort(), getUrl().getProtocol()));
            if (credentialsProvider != null) {
                builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                    return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                });
            }
            this.client = new RestHighLevelClient(builder);
        }
        return this.client;
    }
}
