package com.amazonaws.services.kinesis.connectors.elasticsearch;

import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

/* loaded from: input_file:com/amazonaws/services/kinesis/connectors/elasticsearch/ElasticsearchEmitter.class */
/**
 * {@link IEmitter} implementation that writes buffered {@link ElasticsearchObject} records to an
 * Elasticsearch cluster through a {@link TransportClient} using a single bulk index request per
 * call to {@link #emit(UnmodifiableBuffer)}.
 *
 * <p>Bulk execution is retried with a fixed back-off until the cluster answers. Individual items
 * that fail are returned to the caller, except for failures whose message reports an already
 * existing document or a version conflict, which are counted as skipped and not returned.
 */
public class ElasticsearchEmitter implements IEmitter<ElasticsearchObject> {
    private static final Log LOG = LogFactory.getLog(ElasticsearchEmitter.class);

    private static final String ELASTICSEARCH_CLUSTER_NAME_KEY = "cluster.name";
    private static final String ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY = "client.transport.sniff";
    private static final String ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY = "client.transport.ignore_cluster_name";
    private static final String ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY = "client.transport.ping_timeout";
    private static final String ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY = "client.transport.nodes_sampler_interval";

    /** Time, in milliseconds, to wait before re-submitting a bulk request that could not be executed. */
    private static final long BACKOFF_PERIOD = 10000L;

    private final TransportClient elasticsearchClient;
    private final String elasticsearchEndpoint;
    private final int elasticsearchPort;

    /**
     * Builds the transport client from the Elasticsearch settings carried by the connector
     * configuration and registers the configured endpoint/port as its transport address.
     *
     * @param kinesisConnectorConfiguration connector configuration supplying the cluster name,
     *        transport settings, endpoint and port
     */
    public ElasticsearchEmitter(KinesisConnectorConfiguration kinesisConnectorConfiguration) {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put(ELASTICSEARCH_CLUSTER_NAME_KEY, kinesisConnectorConfiguration.ELASTICSEARCH_CLUSTER_NAME)
                .put(ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY, kinesisConnectorConfiguration.ELASTICSEARCH_TRANSPORT_SNIFF)
                .put(ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY, kinesisConnectorConfiguration.ELASTICSEARCH_IGNORE_CLUSTER_NAME)
                .put(ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY, kinesisConnectorConfiguration.ELASTICSEARCH_PING_TIMEOUT)
                .put(ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY, kinesisConnectorConfiguration.ELASTICSEARCH_NODE_SAMPLER_INTERVAL)
                .build();
        this.elasticsearchEndpoint = kinesisConnectorConfiguration.ELASTICSEARCH_ENDPOINT;
        this.elasticsearchPort = kinesisConnectorConfiguration.ELASTICSEARCH_PORT;
        LOG.info("ElasticsearchEmitter using elasticsearch endpoint " + this.elasticsearchEndpoint + ":" + this.elasticsearchPort);
        this.elasticsearchClient = new TransportClient(settings);
        this.elasticsearchClient.addTransportAddress(new InetSocketTransportAddress(this.elasticsearchEndpoint, this.elasticsearchPort));
    }

    /**
     * Indexes every record of the buffer with one bulk request and reports the records that
     * could not be indexed.
     *
     * @param unmodifiableBuffer buffer holding the records to index
     * @return the records whose bulk item failed for a reason other than an already existing
     *         document or a version conflict; empty when the buffer is empty or nothing failed
     * @throws IOException if the calling thread is interrupted while backing off between
     *         attempts to execute the bulk request
     */
    @Override
    public List<ElasticsearchObject> emit(UnmodifiableBuffer<ElasticsearchObject> unmodifiableBuffer) throws IOException {
        List<ElasticsearchObject> records = unmodifiableBuffer.getRecords();
        if (records.isEmpty()) {
            return Collections.emptyList();
        }

        BulkResponse response = executeWithRetry(buildBulkRequest(records));
        List<ElasticsearchObject> failures = collectFailures(records, response.getItems());
        if (!failures.isEmpty()) {
            printClusterStatus();
            LOG.warn("Returning " + failures.size() + " records as failed");
        }
        return failures;
    }

    /**
     * Logs each record that the caller has given up on.
     *
     * @param list records that could not be emitted
     */
    @Override
    public void fail(List<ElasticsearchObject> list) {
        for (ElasticsearchObject record : list) {
            LOG.error("Record failed: " + record);
        }
    }

    /** Closes the underlying transport client. */
    @Override
    public void shutdown() {
        this.elasticsearchClient.close();
    }

    /** Creates one bulk request containing an index request per record, applying optional version, TTL and create flags. */
    private BulkRequestBuilder buildBulkRequest(List<ElasticsearchObject> records) {
        BulkRequestBuilder bulkRequest = this.elasticsearchClient.prepareBulk();
        for (ElasticsearchObject record : records) {
            IndexRequestBuilder indexRequest = this.elasticsearchClient.prepareIndex(record.getIndex(), record.getType(), record.getId());
            indexRequest.setSource(record.getSource());
            Long version = record.getVersion();
            if (version != null) {
                indexRequest.setVersion(version.longValue());
            }
            Long ttl = record.getTtl();
            if (ttl != null) {
                indexRequest.setTTL(ttl.longValue());
            }
            Boolean create = record.getCreate();
            if (create != null) {
                indexRequest.setCreate(create.booleanValue());
            }
            bulkRequest.add(indexRequest);
        }
        return bulkRequest;
    }

    /**
     * Executes the bulk request, backing off and retrying until a response is obtained.
     *
     * <p>Only the execution itself is retried: the response is processed by the caller outside
     * this loop so that a problem while inspecting a response can never cause a bulk request
     * that already went through to be submitted a second time.
     */
    private BulkResponse executeWithRetry(BulkRequestBuilder bulkRequest) throws IOException {
        while (true) {
            try {
                return bulkRequest.execute().actionGet();
            } catch (NoNodeAvailableException e) {
                LOG.error("No nodes found at " + this.elasticsearchEndpoint + ":" + this.elasticsearchPort + ". Retrying in " + BACKOFF_PERIOD + " milliseconds", e);
            } catch (Exception e) {
                LOG.error("ElasticsearchEmitter threw an unexpected exception ", e);
            }
            sleep(BACKOFF_PERIOD);
        }
    }

    /**
     * Maps failed bulk items back to their records (items are matched to records by position)
     * and logs how many records were emitted.
     */
    private List<ElasticsearchObject> collectFailures(List<ElasticsearchObject> records, BulkItemResponse[] items) {
        List<ElasticsearchObject> failures = new ArrayList<ElasticsearchObject>();
        int skipped = 0;
        for (int i = 0; i < items.length; i++) {
            if (!items[i].isFailed()) {
                continue;
            }
            String message = items[i].getFailureMessage();
            LOG.error("Record failed with message: " + message);
            if (isIgnorableFailure(message)) {
                skipped++;
            } else {
                failures.add(records.get(i));
            }
        }
        LOG.info("Emitted " + (records.size() - failures.size() - skipped) + " records to Elasticsearch");
        return failures;
    }

    /** Returns true when the failure message names an already-existing document or a version conflict. */
    private static boolean isIgnorableFailure(String message) {
        return message != null
                && (message.contains("DocumentAlreadyExistsException") || message.contains("VersionConflictEngineException"));
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * @throws IOException if the thread is interrupted; the interrupt flag is restored first so
     *         that callers further up the stack can still observe the interruption
     */
    private void sleep(long millis) throws IOException {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while backing off before retrying the Elasticsearch bulk request", e);
        }
    }

    /**
     * Logs the current cluster health at a level matching its severity. This is diagnostic
     * only, so a failure to obtain the health is logged and otherwise ignored.
     */
    private void printClusterStatus() {
        try {
            ClusterHealthResponse health = this.elasticsearchClient.admin().cluster().prepareHealth().execute().actionGet();
            switch (health.getStatus()) {
                case RED:
                    LOG.error("Cluster health is RED. Indexing ability will be limited");
                    break;
                case YELLOW:
                    LOG.warn("Cluster health is YELLOW.");
                    break;
                case GREEN:
                    LOG.info("Cluster health is GREEN.");
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            LOG.warn("Unable to retrieve Elasticsearch cluster health", e);
        }
    }
}
