package org.elasticsearch.river.couchdb;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;

/* loaded from: input_file:org/elasticsearch/river/couchdb/CouchdbRiver.class */
public class CouchdbRiver extends AbstractRiverComponent implements River {
    private final Client client;
    private final String riverIndexName;
    private final String couchProtocol;
    private final String couchHost;
    private final int couchPort;
    private final String couchDb;
    private final String couchFilter;
    private final String couchFilterParamsUrl;
    private final String basicAuth;
    private final boolean noVerify;
    private final boolean couchIgnoreAttachments;
    private final String indexName;
    private final String typeName;
    private final int bulkSize;
    private final TimeValue bulkTimeout;
    private final int throttleSize;
    private final ExecutableScript script;
    private volatile Thread slurperThread;
    private volatile Thread indexerThread;
    private volatile boolean closed;
    private final BlockingQueue<String> stream;

    /* loaded from: input_file:org/elasticsearch/river/couchdb/CouchdbRiver$Indexer.class */
    private class Indexer implements Runnable {
        private Indexer() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!CouchdbRiver.this.closed) {
                try {
                    String str = (String) CouchdbRiver.this.stream.take();
                    BulkRequestBuilder prepareBulk = CouchdbRiver.this.client.prepareBulk();
                    Object obj = null;
                    Object processLine = CouchdbRiver.this.processLine(str, prepareBulk);
                    if (processLine != null) {
                        obj = processLine;
                    }
                    do {
                        try {
                            String str2 = (String) CouchdbRiver.this.stream.poll(CouchdbRiver.this.bulkTimeout.millis(), TimeUnit.MILLISECONDS);
                            if (str2 == null) {
                                break;
                            }
                            Object processLine2 = CouchdbRiver.this.processLine(str2, prepareBulk);
                            if (processLine2 != null) {
                                obj = processLine2;
                            }
                        } catch (InterruptedException e) {
                            if (CouchdbRiver.this.closed) {
                                return;
                            }
                        }
                    } while (prepareBulk.numberOfActions() < CouchdbRiver.this.bulkSize);
                    if (obj != null) {
                        try {
                            String str3 = null;
                            if (obj instanceof List) {
                                try {
                                    XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
                                    jsonBuilder.startArray();
                                    Iterator it = ((List) obj).iterator();
                                    while (it.hasNext()) {
                                        jsonBuilder.value(it.next());
                                    }
                                    jsonBuilder.endArray();
                                    str3 = jsonBuilder.string();
                                } catch (Exception e2) {
                                    CouchdbRiver.this.logger.error("failed to convert last_seq to a json string", e2, new Object[0]);
                                }
                            } else {
                                str3 = obj.toString();
                            }
                            if (CouchdbRiver.this.logger.isTraceEnabled()) {
                                CouchdbRiver.this.logger.trace("processing [_seq  ]: [{}]/[{}]/[{}], last_seq [{}]", new Object[]{CouchdbRiver.this.riverIndexName, CouchdbRiver.this.riverName.name(), "_seq", str3});
                            }
                            prepareBulk.add(Requests.indexRequest(CouchdbRiver.this.riverIndexName).type(CouchdbRiver.this.riverName.name()).id("_seq").source(XContentFactory.jsonBuilder().startObject().startObject("couchdb").field("last_seq", str3).endObject().endObject()));
                        } catch (IOException e3) {
                            CouchdbRiver.this.logger.warn("failed to add last_seq entry to bulk indexing", new Object[0]);
                        }
                    }
                    try {
                        BulkResponse bulkResponse = (BulkResponse) prepareBulk.execute().actionGet();
                        if (bulkResponse.hasFailures()) {
                            CouchdbRiver.this.logger.warn("failed to execute" + bulkResponse.buildFailureMessage(), new Object[0]);
                        }
                    } catch (Exception e4) {
                        CouchdbRiver.this.logger.warn("failed to execute bulk", e4, new Object[0]);
                    }
                } catch (InterruptedException e5) {
                    if (CouchdbRiver.this.closed) {
                        return;
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/elasticsearch/river/couchdb/CouchdbRiver$Slurper.class */
    /**
     * Runnable that {@link CouchdbRiver#start()} launches on the "couchdb_river_slurper" thread.
     *
     * <p>The decompiler could not reconstruct {@link #run()}; the body below is a placeholder
     * that always throws {@link UnsupportedOperationException}, so as written the slurper thread
     * terminates immediately after being started.
     *
     * <p>NOTE(review): presumably the real implementation reads the CouchDB changes feed and
     * enqueues lines into {@code stream} for the Indexer - confirm against the original sources
     * or the bytecode dump before relying on this class.
     */
    private class Slurper implements Runnable {
        private Slurper() {
        }

        /* JADX WARN: Code restructure failed: missing block: B:73:0x02b6, code lost:
        
            org.elasticsearch.common.io.Closeables.close(r0, true);
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 989
                To view this dump add '--comments-level debug' option
            */
            // Decompiler stub: the original logic is missing, this call always fails.
            throw new UnsupportedOperationException("Method not decompiled: org.elasticsearch.river.couchdb.CouchdbRiver.Slurper.run():void");
        }
    }

    /**
     * Builds the river from its settings.
     *
     * <p>The optional {@code couchdb} settings object supplies {@code protocol} (default
     * "http"), {@code no_verify} (false), {@code host} ("localhost"), {@code port} (5984),
     * {@code db} (the river name), {@code filter}, {@code filter_params},
     * {@code ignore_attachments} (false), {@code user}/{@code password} and
     * {@code script}/{@code scriptType} ("js"). When the object is absent the same defaults
     * apply, except that the database name falls back to "db".
     *
     * <p>The optional {@code index} settings object supplies {@code index} and {@code type}
     * (both default to the database name), {@code bulk_size} (100), {@code bulk_timeout}
     * (10ms) and {@code throttle_size} (five times the bulk size). A throttle size of -1
     * selects an unbounded queue; any other value is used as the capacity of a bounded queue.
     *
     * @param riverName     name of this river
     * @param riverSettings settings the river was registered with
     * @param str           name of the river index, where the {@code _seq} checkpoint is stored
     * @param client        client used for index creation and bulk requests
     * @param scriptService used to compile the optional transformation script
     */
    @Inject
    @SuppressWarnings("unchecked")
    public CouchdbRiver(RiverName riverName, RiverSettings riverSettings, @RiverIndexName String str, Client client, ScriptService scriptService) {
        super(riverName, riverSettings);
        this.riverIndexName = str;
        this.client = client;
        if (riverSettings.settings().containsKey("couchdb")) {
            Map<String, Object> couchSettings = (Map<String, Object>) riverSettings.settings().get("couchdb");
            this.couchProtocol = XContentMapValues.nodeStringValue(couchSettings.get("protocol"), "http");
            this.noVerify = XContentMapValues.nodeBooleanValue(couchSettings.get("no_verify"), false);
            this.couchHost = XContentMapValues.nodeStringValue(couchSettings.get("host"), "localhost");
            this.couchPort = XContentMapValues.nodeIntegerValue(couchSettings.get("port"), 5984);
            this.couchDb = XContentMapValues.nodeStringValue(couchSettings.get("db"), riverName.name());
            this.couchFilter = XContentMapValues.nodeStringValue(couchSettings.get("filter"), (String) null);
            if (couchSettings.containsKey("filter_params")) {
                Map<String, Object> filterParams = (Map<String, Object>) couchSettings.get("filter_params");
                StringBuilder query = new StringBuilder();
                for (Map.Entry<String, Object> param : filterParams.entrySet()) {
                    try {
                        // Encode both parts before appending so a failure cannot leave a
                        // dangling "&key=" fragment in the URL.
                        String encodedKey = URLEncoder.encode(param.getKey(), "UTF-8");
                        String encodedValue = URLEncoder.encode(param.getValue().toString(), "UTF-8");
                        query.append("&").append(encodedKey).append("=").append(encodedValue);
                    } catch (UnsupportedEncodingException e) {
                        // UTF-8 is a charset every Java platform must support, so this is not
                        // expected to happen; report it rather than dropping it silently.
                        this.logger.warn("failed to URL-encode filter parameter [{}], skipping it", e, new Object[]{param.getKey()});
                    }
                }
                this.couchFilterParamsUrl = query.toString();
            } else {
                this.couchFilterParamsUrl = null;
            }
            this.couchIgnoreAttachments = XContentMapValues.nodeBooleanValue(couchSettings.get("ignore_attachments"), false);
            if (couchSettings.containsKey("user") && couchSettings.containsKey("password")) {
                String credentials = couchSettings.get("user").toString() + ":" + couchSettings.get("password").toString();
                // Use an explicit charset: the platform default would make the header depend
                // on the JVM's locale settings for non-ASCII credentials.
                this.basicAuth = "Basic " + Base64.encodeBytes(credentials.getBytes(java.nio.charset.Charset.forName("UTF-8")));
            } else {
                this.basicAuth = null;
            }
            if (couchSettings.containsKey("script")) {
                String scriptType = couchSettings.containsKey("scriptType") ? couchSettings.get("scriptType").toString() : "js";
                this.script = scriptService.executable(scriptType, couchSettings.get("script").toString(), Maps.newHashMap());
            } else {
                this.script = null;
            }
        } else {
            this.couchProtocol = "http";
            this.couchHost = "localhost";
            this.couchPort = 5984;
            this.couchDb = "db";
            this.couchFilter = null;
            this.couchFilterParamsUrl = null;
            this.couchIgnoreAttachments = false;
            this.noVerify = false;
            this.basicAuth = null;
            this.script = null;
        }
        if (riverSettings.settings().containsKey("index")) {
            Map<String, Object> indexSettings = (Map<String, Object>) riverSettings.settings().get("index");
            this.indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), this.couchDb);
            this.typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), this.couchDb);
            this.bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);
            if (indexSettings.containsKey("bulk_timeout")) {
                this.bulkTimeout = TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(indexSettings.get("bulk_timeout"), "10ms"), TimeValue.timeValueMillis(10L));
            } else {
                this.bulkTimeout = TimeValue.timeValueMillis(10L);
            }
            this.throttleSize = XContentMapValues.nodeIntegerValue(indexSettings.get("throttle_size"), this.bulkSize * 5);
        } else {
            this.indexName = this.couchDb;
            this.typeName = this.couchDb;
            this.bulkSize = 100;
            this.bulkTimeout = TimeValue.timeValueMillis(10L);
            this.throttleSize = this.bulkSize * 5;
        }
        // -1 means "do not throttle": use an unbounded queue between slurper and indexer.
        if (this.throttleSize == -1) {
            this.stream = new LinkedTransferQueue<String>();
        } else {
            this.stream = new ArrayBlockingQueue<String>(this.throttleSize);
        }
    }

    /**
     * Creates the target index if needed and launches the slurper and indexer threads.
     *
     * <p>An index that already exists, or a cluster block, does not stop the river from
     * starting. Any other index-creation failure is logged and the river stays disabled
     * (no threads are created).
     */
    public void start() {
        this.logger.info("starting couchdb stream: host [{}], port [{}], filter [{}], db [{}], indexing to [{}]/[{}]", new Object[]{this.couchHost, Integer.valueOf(this.couchPort), this.couchFilter, this.couchDb, this.indexName, this.typeName});
        try {
            this.client.admin().indices().prepareCreate(this.indexName).execute().actionGet();
        } catch (Exception e) {
            Throwable cause = ExceptionsHelper.unwrapCause(e);
            boolean indexAlreadyThere = cause instanceof IndexAlreadyExistsException;
            boolean clusterBlocked = cause instanceof ClusterBlockException;
            if (!indexAlreadyThere && !clusterBlocked) {
                this.logger.warn("failed to create index [{}], disabling river...", e, new Object[]{this.indexName});
                return;
            }
            // Otherwise carry on: the index exists, or the cluster is blocked for now.
        }
        Slurper slurper = new Slurper();
        this.slurperThread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "couchdb_river_slurper").newThread(slurper);
        Indexer indexer = new Indexer();
        this.indexerThread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "couchdb_river_indexer").newThread(indexer);
        // The consumer is started before the producer.
        this.indexerThread.start();
        this.slurperThread.start();
    }

    /**
     * Stops the river: marks it closed and interrupts the worker threads.
     *
     * <p>The {@code closed} flag is set before the interrupts are sent. The workers check the
     * flag in their {@code InterruptedException} handlers; if the interrupt arrived first, a
     * worker could see {@code closed == false}, go back to a blocking call and never be
     * interrupted again.
     *
     * <p>Safe to call when {@link #start()} bailed out before creating the threads, and a
     * no-op when the river is already closed.
     */
    public void close() {
        if (this.closed) {
            return;
        }
        this.logger.info("closing couchdb stream river", new Object[0]);
        this.closed = true;
        Thread slurper = this.slurperThread;
        if (slurper != null) {
            slurper.interrupt();
        }
        Thread indexer = this.indexerThread;
        if (indexer != null) {
            indexer.interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Parses one JSON change line and adds the matching index or delete action to the bulk
     * request.
     *
     * <p>Lines carrying an {@code error} key, design documents ({@code _design/...}), changes
     * the script marks with {@code ignore: true}, and lines that are neither a deletion nor
     * carry a {@code doc} object add no action. Lines without an {@code id} are logged and
     * skipped instead of failing with a {@code NullPointerException}.
     *
     * @param str                the raw change line
     * @param bulkRequestBuilder bulk request that receives the resulting action, if any
     * @return the line's {@code seq} value (may be {@code null} if the line has none), or
     *         {@code null} when the line is an error message or cannot be parsed
     */
    @SuppressWarnings("unchecked")
    public Object processLine(String str, BulkRequestBuilder bulkRequestBuilder) {
        try {
            Map<String, Object> change = XContentFactory.xContent(XContentType.JSON).createParser(str).mapAndClose();
            if (change.containsKey("error")) {
                this.logger.warn("received error {}", new Object[]{str});
                return null;
            }
            Object seq = change.get("seq");
            Object rawId = change.get("id");
            if (rawId == null) {
                // Not a document change (no id): nothing to index or delete.
                this.logger.warn("ignoring unknown change {}", new Object[]{str});
                return seq;
            }
            String id = rawId.toString();
            if (id.startsWith("_design/")) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("ignoring design document {}", new Object[]{id});
                }
                return seq;
            }
            if (this.script != null) {
                // The script sees the change as "ctx" and may modify or replace it.
                this.script.setNextVar("ctx", change);
                try {
                    this.script.run();
                    change = (Map<String, Object>) this.script.unwrap(change);
                } catch (Exception e) {
                    this.logger.warn("failed to script process {}, ignoring", e, new Object[]{change});
                    return seq;
                }
            }
            // Constant-first equals keeps these checks safe when the key maps to null.
            if (Boolean.TRUE.equals(change.get("ignore"))) {
                return seq;
            }
            if (Boolean.TRUE.equals(change.get("deleted"))) {
                String index = extractIndex(change);
                String type = extractType(change);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("processing [delete]: [{}]/[{}]/[{}]", new Object[]{index, type, id});
                }
                bulkRequestBuilder.add(Requests.deleteRequest(index).type(type).id(id).routing(extractRouting(change)).parent(extractParent(change)));
                return seq;
            }
            Object rawDoc = change.get("doc");
            if (rawDoc instanceof Map) {
                String index = extractIndex(change);
                String type = extractType(change);
                Map<String, Object> doc = (Map<String, Object>) rawDoc;
                if (this.couchIgnoreAttachments) {
                    doc.remove("_attachments");
                }
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("processing [index ]: [{}]/[{}]/[{}], source {}", new Object[]{index, type, id, doc});
                }
                bulkRequestBuilder.add(Requests.indexRequest(index).type(type).id(id).source(doc).routing(extractRouting(change)).parent(extractParent(change)));
            } else {
                // No document body, or one that is not a JSON object.
                this.logger.warn("ignoring unknown change {}", new Object[]{str});
            }
            return seq;
        } catch (IOException e) {
            this.logger.warn("failed to parse {}", e, new Object[]{str});
            return null;
        }
    }

    /**
     * Reads the {@code _parent} entry of a change.
     *
     * @return the parent id, or {@code null} when the change has none
     */
    private String extractParent(Map<String, Object> map) {
        final Object parent = map.get("_parent");
        return (String) parent;
    }

    /**
     * Reads the {@code _routing} entry of a change.
     *
     * @return the routing value, or {@code null} when the change has none
     */
    private String extractRouting(Map<String, Object> map) {
        final Object routing = map.get("_routing");
        return (String) routing;
    }

    /**
     * Resolves the target type of a change: its {@code _type} entry when present, otherwise
     * the river's configured type name.
     */
    private String extractType(Map<String, Object> map) {
        final Object explicitType = map.get("_type");
        return explicitType == null ? this.typeName : (String) explicitType;
    }

    /**
     * Resolves the target index of a change: its {@code _index} entry when present, otherwise
     * the river's configured index name.
     */
    private String extractIndex(Map<String, Object> map) {
        final Object explicitIndex = map.get("_index");
        return explicitIndex == null ? this.indexName : (String) explicitIndex;
    }
}
