package com.predic8.membrane.core.exchangestore;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.predic8.membrane.annot.MCAttribute;
import com.predic8.membrane.annot.MCElement;
import com.predic8.membrane.core.exchange.AbstractExchange;
import com.predic8.membrane.core.exchange.Exchange;
import com.predic8.membrane.core.exchange.snapshots.AbstractExchangeSnapshot;
import com.predic8.membrane.core.exchange.snapshots.DynamicAbstractExchangeSnapshot;
import com.predic8.membrane.core.http.BodyCollectingMessageObserver;
import com.predic8.membrane.core.http.Request;
import com.predic8.membrane.core.interceptor.Interceptor;
import com.predic8.membrane.core.interceptor.acl.Hostname;
import com.predic8.membrane.core.rules.Rule;
import com.predic8.membrane.core.rules.RuleKey;
import com.predic8.membrane.core.rules.StatisticCollector;
import com.predic8.membrane.core.transport.http.HttpClient;
import groovyjarjarcommonscli.HelpFormatter;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;

@MCElement(name = "elasticSearchExchangeStore")
/* loaded from: input_file:lib/service-proxy-core-4.8.2.jar:com/predic8/membrane/core/exchangestore/ElasticSearchExchangeStore.class */
public class ElasticSearchExchangeStore extends AbstractExchangeStore {
    HttpClient client;
    static Logger log = LoggerFactory.getLogger((Class<?>) ElasticSearchExchangeStore.class);
    Thread updateJob;
    ObjectMapper mapper;
    private String documentPrefix;
    private long startTime;
    int updateIntervalMs = 1000;
    Map<Long, AbstractExchangeSnapshot> shortTermMemoryForBatching = new HashMap();
    Cache<Long, AbstractExchangeSnapshot> cacheToWaitForElasticSearchIndex = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.SECONDS).build();
    String index = "membrane";
    String type = "exchanges";
    String location = "http://localhost:9200";
    boolean init = false;
    private int maxBodySize = 100000;
    private BodyCollectingMessageObserver.Strategy bodyExceedingMaxSizeStrategy = BodyCollectingMessageObserver.Strategy.TRUNCATE;

    @Override // com.predic8.membrane.core.exchangestore.AbstractExchangeStore, com.predic8.membrane.core.exchangestore.ExchangeStore
    public void init() {
        super.init();
        if (this.client == null) {
            this.client = new HttpClient();
        }
        if (this.mapper == null) {
            this.mapper = new ObjectMapper();
        }
        if (this.documentPrefix == null) {
            this.documentPrefix = getLocalHostname();
        }
        this.documentPrefix = this.documentPrefix.toLowerCase();
        this.startTime = System.nanoTime();
        this.updateJob = new Thread(() -> {
            List<AbstractExchangeSnapshot> list;
            while (true) {
                try {
                    synchronized (this.shortTermMemoryForBatching) {
                        list = (List) this.shortTermMemoryForBatching.values().stream().collect(Collectors.toList());
                        this.shortTermMemoryForBatching.values().stream().forEach(abstractExchangeSnapshot -> {
                            this.cacheToWaitForElasticSearchIndex.put(Long.valueOf(abstractExchangeSnapshot.getId()), abstractExchangeSnapshot);
                        });
                        this.shortTermMemoryForBatching.clear();
                    }
                    if (list.size() > 0) {
                        sendToElasticSearch(list);
                    } else {
                        Thread.sleep(this.updateIntervalMs);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                } catch (Exception e2) {
                    throw new RuntimeException(e2);
                }
            }
        });
        this.updateJob.start();
        this.init = true;
    }

    private void sendToElasticSearch(List<AbstractExchangeSnapshot> list) throws Exception {
        this.client.call(new Request.Builder().post(this.location + "/_bulk").header("Content-Type", "application/x-ndjson").body(((StringBuilder) list.stream().map(abstractExchangeSnapshot -> {
            return wrapForBulkOperationElasticSearch(this.index, this.type, getLocalMachineNameWithSuffix() + HelpFormatter.DEFAULT_OPT_PREFIX + abstractExchangeSnapshot.getId(), collectExchangeDataFrom(abstractExchangeSnapshot));
        }).collect(StringBuilder::new, (sb, str) -> {
            sb.append(str);
        }, (sb2, sb3) -> {
            sb2.append((CharSequence) sb3);
        })).toString()).buildExchange());
    }

    private static String getLocalHostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            try {
                return IOUtils.toString(Runtime.getRuntime().exec(Hostname.ELEMENT_NAME).getInputStream());
            } catch (IOException e2) {
                e2.printStackTrace();
                return "localhost";
            }
        }
    }

    private String getLocalMachineNameWithSuffix() {
        return this.documentPrefix + HelpFormatter.DEFAULT_OPT_PREFIX + this.startTime;
    }

    public String wrapForBulkOperationElasticSearch(String str, String str2, String str3, String str4) {
        return "{ \"index\" : { \"_index\" : \"" + str + "\", \"_type\" : \"" + str2 + "\", \"_id\" : \"" + str3 + "\" } }\n" + str4 + "\n";
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public void snap(AbstractExchange abstractExchange, Interceptor.Flow flow) {
        try {
            if (flow == Interceptor.Flow.REQUEST) {
                addForElasticSearch(new DynamicAbstractExchangeSnapshot(abstractExchange, flow, this::addForElasticSearch, this.bodyExceedingMaxSizeStrategy, this.maxBodySize));
            } else {
                AbstractExchangeSnapshot exchangeDtoById = getExchangeDtoById((int) abstractExchange.getId());
                DynamicAbstractExchangeSnapshot.addObservers(abstractExchange, exchangeDtoById, this::addForElasticSearch, flow);
                addForElasticSearch(exchangeDtoById.updateFrom(abstractExchange, flow));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void addForElasticSearch(AbstractExchangeSnapshot abstractExchangeSnapshot) {
        synchronized (this.shortTermMemoryForBatching) {
            this.shortTermMemoryForBatching.put(Long.valueOf(abstractExchangeSnapshot.getId()), abstractExchangeSnapshot);
        }
    }

    private String collectExchangeDataFrom(AbstractExchangeSnapshot abstractExchangeSnapshot) {
        try {
            Map map = (Map) this.mapper.readValue(this.mapper.writeValueAsString(abstractExchangeSnapshot), Map.class);
            map.put("issuer", this.documentPrefix);
            return this.mapper.writeValueAsString(map);
        } catch (IOException e) {
            e.printStackTrace();
            return "";
        }
    }

    public AbstractExchangeSnapshot getExchangeDtoById(int i) {
        Long valueOf = Long.valueOf(i);
        return this.shortTermMemoryForBatching.get(valueOf) != null ? this.shortTermMemoryForBatching.get(valueOf) : this.cacheToWaitForElasticSearchIndex.getIfPresent(valueOf) != null ? (AbstractExchangeSnapshot) this.cacheToWaitForElasticSearchIndex.getIfPresent(valueOf) : getFromElasticSearchById(i);
    }

    private AbstractExchangeSnapshot getFromElasticSearchById(long j) {
        try {
            return (AbstractExchangeSnapshot) this.mapper.readValue(this.mapper.writeValueAsString(getSourceElementFromElasticSearchResponse(responseToMap(this.client.call(new Request.Builder().post(getElasticSearchExchangesPath() + "_search").body("{\n  \"query\": {\n    \"bool\": {\n      \"must\": [\n        {\n          \"wildcard\": {\n            \"issuer\": \"" + this.documentPrefix + "\"\n          }\n        },\n        {\n          \"match\": {\n            \"id\": \"" + j + "\"\n          }\n        }\n      ]\n    }\n  }\n}").header("Content-Type", "application/json").buildExchange()))).get(0)), AbstractExchangeSnapshot.class);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String getElasticSearchExchangesPath() {
        return this.location + "/" + this.index + "/" + this.type + "/";
    }

    public List<Map> getSourceElementFromElasticSearchResponse(Map map) {
        return getSourceElementFromHitsElement(getHitsElementFromElasticSearchResponse(map));
    }

    public List getHitsElementFromElasticSearchResponse(Map map) {
        return (List) ((Map) map.get("hits")).get("hits");
    }

    public List<Map> getSourceElementFromHitsElement(List list) {
        return (List) list.stream().map(obj -> {
            return ((Map) obj).get("_source");
        }).collect(Collectors.toList());
    }

    @Override // com.predic8.membrane.core.exchangestore.AbstractExchangeStore, com.predic8.membrane.core.exchangestore.ExchangeStore
    public AbstractExchange getExchangeById(long j) {
        return getFromElasticSearchById(j).toAbstractExchange();
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public void remove(AbstractExchange abstractExchange) {
        try {
            removeFromElasticSearchById(abstractExchange.getId());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void removeFromElasticSearchById(long j) throws Exception {
        this.client.call(new Request.Builder().delete(getElasticSearchExchangesPath() + getLocalMachineNameWithSuffix() + HelpFormatter.DEFAULT_OPT_PREFIX + j).buildExchange());
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public void removeAllExchanges(Rule rule) {
        try {
            this.client.call(new Request.Builder().post(getElasticSearchExchangesPath() + "_delete_by_query").body("{\n  \"query\": {\n    \"bool\": {\n      \"must\": [\n        {\n          \"wildcard\": {\n            \"issuer\": \"" + this.documentPrefix + "\"\n          }\n        },\n        {\n          \"match\": {\n            \"rule.name\": \"" + rule.toString() + "\"\n          }\n        }\n      ]\n    }\n  }\n}").header("Content-Type", "application/json").buildExchange());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public void removeAllExchanges(AbstractExchange[] abstractExchangeArr) {
        StringBuilder sb = (StringBuilder) Stream.of((Object[]) abstractExchangeArr).map(abstractExchange -> {
            return Long.valueOf(abstractExchange.getId());
        }).collect(() -> {
            StringBuilder sb2 = new StringBuilder();
            sb2.append(PropertyAccessor.PROPERTY_KEY_PREFIX);
            return sb2;
        }, (sb2, l) -> {
            sb2.append(l).append(",");
        }, (sb3, sb4) -> {
            sb3.append(",").append((CharSequence) sb4);
        });
        sb.deleteCharAt(sb.length() - 1);
        sb.append("]");
        try {
            this.client.call(new Request.Builder().post(getElasticSearchExchangesPath() + "_delete_by_query").body("{\n  \"query\": {\n    \"bool\": {\n      \"must\": [\n        {\n          \"wildcard\": {\n            \"issuer\": \"" + this.documentPrefix + "\"\n          }\n        },\n        {\n          \"terms\": {\n            \"id\": \"" + sb.toString() + "\"\n          }\n        }\n      ]\n    }\n  }\n}").header("Content-Type", "application/json").buildExchange());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public AbstractExchange[] getExchanges(RuleKey ruleKey) {
        try {
            return (AbstractExchange[]) ((List) Stream.of((Object[]) this.mapper.readValue(this.mapper.writeValueAsString(getSourceElementFromElasticSearchResponse(responseToMap(this.client.call(new Request.Builder().post(getElasticSearchExchangesPath() + "_search").body("{\n  \"query\": {\n    \"bool\": {\n      \"must\": [\n        {\n          \"wildcard\": {\n            \"issuer\": \"" + this.documentPrefix + "\"\n          }\n        },\n        {\n          \"match\": {\n            \"rule.port\": \"" + ruleKey.getPort() + "\"\n          }\n        }\n      ]\n    }\n  }\n}").header("Content-Type", "application/json").buildExchange())))), AbstractExchangeSnapshot[].class)).map(abstractExchangeSnapshot -> {
                return abstractExchangeSnapshot.toAbstractExchange();
            }).collect(Collectors.toList())).toArray(new AbstractExchange[0]);
        } catch (Exception e) {
            e.printStackTrace();
            return new AbstractExchange[0];
        }
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public int getNumberOfExchanges(RuleKey ruleKey) {
        return getExchanges(ruleKey).length;
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public StatisticCollector getStatistics(RuleKey ruleKey) {
        StatisticCollector statisticCollector = new StatisticCollector(false);
        List asList = Arrays.asList(getExchanges(ruleKey));
        if (asList == null || asList.isEmpty()) {
            return statisticCollector;
        }
        for (int i = 0; i < asList.size(); i++) {
            statisticCollector.collectFrom((AbstractExchange) asList.get(i));
        }
        return statisticCollector;
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public Object[] getAllExchanges() {
        return getAllExchangesAsList().toArray();
    }

    @Override // com.predic8.membrane.core.exchangestore.ExchangeStore
    public List<AbstractExchange> getAllExchangesAsList() {
        try {
            Exchange call = this.client.call(new Request.Builder().post(getElasticSearchExchangesPath() + "_search").header("Content-Type", "application/json").body("{\n  \"query\": {\n    \"wildcard\": {\n      \"issuer\": \"" + this.documentPrefix + "\"\n    }\n  }\n}").buildExchange());
            return !call.getResponse().isOk() ? new ArrayList() : (List) getSourceElementFromElasticSearchResponse(responseToMap(call)).stream().map(obj -> {
                try {
                    return ((AbstractExchangeSnapshot) this.mapper.readValue(this.mapper.writeValueAsString(obj), AbstractExchangeSnapshot.class)).toAbstractExchange();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Map responseToMap(Exchange exchange) throws IOException {
        return (Map) this.mapper.readValue(exchange.getResponse().getBodyAsStringDecoded(), Map.class);
    }

    public HttpClient getClient() {
        return this.client;
    }

    @MCAttribute
    public void setClient(HttpClient httpClient) {
        this.client = httpClient;
    }

    public int getUpdateIntervalMs() {
        return this.updateIntervalMs;
    }

    @MCAttribute
    public void setUpdateIntervalMs(int i) {
        this.updateIntervalMs = i;
    }

    public String getLocation() {
        return this.location;
    }

    @MCAttribute
    public void setLocation(String str) {
        this.location = str;
    }

    public String getDocumentPrefix() {
        return this.documentPrefix;
    }

    @MCAttribute
    public void setDocumentPrefix(String str) {
        this.documentPrefix = str;
    }

    public int getMaxBodySize() {
        return this.maxBodySize;
    }

    @MCAttribute
    public void setMaxBodySize(int i) {
        this.maxBodySize = i;
    }

    public BodyCollectingMessageObserver.Strategy getBodyExceedingMaxSizeStrategy() {
        return this.bodyExceedingMaxSizeStrategy;
    }

    @MCAttribute
    public void setBodyExceedingMaxSizeStrategy(BodyCollectingMessageObserver.Strategy strategy) {
        this.bodyExceedingMaxSizeStrategy = strategy;
    }
}
