package com.codecubic.dao;

import com.codecubic.common.DocData;
import com.codecubic.common.ESConfig;
import com.codecubic.exception.BulkProcessorInitExcp;
import com.codecubic.util.TimeUtil;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.net.ConnectException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.parsers.GeoWKTParser;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/codecubic/dao/RetryBulkProcessor.class */
public class RetryBulkProcessor implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RetryBulkProcessor.class);
    private RestHighLevelClient client;
    private ESConfig esConfig;
    private final List<DocWriteRequest> lazyQueue = new LinkedList();
    private final AtomicBoolean processorHealth = new AtomicBoolean(true);
    private volatile BulkProcessor bulkProcessor = buildProcessor();
    private volatile long cnt;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/codecubic/dao/RetryBulkProcessor$RetryFailureListener.class */
    public static class RetryFailureListener implements BulkProcessor.Listener {
        private final List<DocWriteRequest> lazyQueue;
        private final AtomicBoolean processorHealth;

        public RetryFailureListener(List<DocWriteRequest> list, AtomicBoolean atomicBoolean) {
            this.lazyQueue = list;
            this.processorHealth = atomicBoolean;
        }

        @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void beforeBulk(long j, BulkRequest bulkRequest) {
            RetryBulkProcessor.log.debug("ids:{}", String.join(GeoWKTParser.COMMA, (List) bulkRequest.requests().stream().map(docWriteRequest -> {
                return docWriteRequest.id();
            }).collect(Collectors.toList())));
        }

        @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            Iterator<BulkItemResponse> it = bulkResponse.iterator();
            while (it.hasNext()) {
                BulkItemResponse next = it.next();
                int status = next.status().getStatus();
                if (next.isFailed()) {
                    DocWriteRequest<?> docWriteRequest = bulkRequest.requests().stream().filter(docWriteRequest2 -> {
                        return docWriteRequest2.id().equals(next.getFailure().getId());
                    }).findFirst().get();
                    if (429 == status) {
                        this.lazyQueue.add(docWriteRequest);
                    }
                    RetryBulkProcessor.log.error("status:{},doc id:{}", Integer.valueOf(status), docWriteRequest.id());
                }
            }
        }

        @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
            RetryBulkProcessor.log.error("", th);
            if (th instanceof ConnectException) {
                this.processorHealth.set(false);
                this.lazyQueue.addAll(bulkRequest.requests());
            }
        }
    }

    public RetryBulkProcessor(RestHighLevelClient restHighLevelClient, ESConfig eSConfig) throws BulkProcessorInitExcp {
        this.client = restHighLevelClient;
        this.esConfig = eSConfig;
    }

    public void flush() {
        do {
            if (this.processorHealth.get()) {
                this.bulkProcessor.flush();
                retryWriteAll();
            }
            TimeUtil.sleepSec(Integer.valueOf(this.esConfig.getAwaitCloseSec().intValue()));
            log.debug("flush sleep:{}(ms)", Long.valueOf(this.esConfig.getAwaitCloseSec().longValue() * 1000));
        } while (!this.lazyQueue.isEmpty());
    }

    private void addReq(DocWriteRequest docWriteRequest, boolean z) {
        while (!this.processorHealth.get()) {
            synchronized (this.bulkProcessor) {
                if (!this.processorHealth.get()) {
                    try {
                        this.bulkProcessor = buildProcessor();
                        this.processorHealth.set(true);
                    } catch (BulkProcessorInitExcp e) {
                        log.error("", (Throwable) e);
                        TimeUtil.sleepSec(3);
                        log.error("buildProcessor failure sleep:{}(ms)", (Object) 3000);
                    }
                }
            }
        }
        this.bulkProcessor.add(docWriteRequest);
        if (z) {
            this.cnt++;
            if (this.cnt % 2000 == 0) {
                log.info("cnt:{}", Long.valueOf(this.cnt));
            }
            if (this.cnt == Long.MAX_VALUE) {
                log.info("cnt:{},reset", Long.valueOf(this.cnt));
                this.cnt = 0L;
            }
        }
        if (this.lazyQueue.isEmpty() || this.lazyQueue.size() <= 10000 || this.lazyQueue.size() % 10000 != 0) {
            return;
        }
        log.warn("lazyExe start,sleep:{}(ms)", this.esConfig.getReqWriteWaitMill());
        TimeUtil.sleepMill(this.esConfig.getReqWriteWaitMill());
    }

    private synchronized BulkProcessor buildProcessor() throws BulkProcessorInitExcp {
        try {
            try {
                BulkProcessor.Builder builder = BulkProcessor.builder((BiConsumer<BulkRequest, ActionListener<BulkResponse>>) (bulkRequest, actionListener) -> {
                    this.client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, (ActionListener<BulkResponse>) actionListener);
                }, new RetryFailureListener(this.lazyQueue, this.processorHealth));
                builder.setBulkActions(this.esConfig.getBatch().intValue());
                builder.setBulkSize(new ByteSizeValue(this.esConfig.getBufferWriteSize().longValue(), ByteSizeUnit.MB));
                builder.setConcurrentRequests(this.esConfig.getParallel().intValue());
                builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(this.esConfig.getBackOffSec().longValue()), this.esConfig.getBackOffRetries().intValue()));
                BulkProcessor build = builder.build();
                log.info("exec buildProcessor end");
                return build;
            } catch (Exception e) {
                throw new BulkProcessorInitExcp(e);
            }
        } catch (Throwable th) {
            log.info("exec buildProcessor end");
            throw th;
        }
    }

    public boolean asyncBulkUpsert(String str, String str2, List<DocData> list) {
        Preconditions.checkNotNull(str, "indexName can not be null");
        Preconditions.checkNotNull(str2, "docType can not be null");
        Preconditions.checkNotNull(list, "docs can not be null");
        try {
            for (DocData docData : list) {
                Map<String, Object> map = docData.toMap();
                UpdateRequest doc = new UpdateRequest(str, str2, docData.getId()).upsert(map).doc(map);
                doc.retryOnConflict(10);
                doc.waitForActiveShards(1);
                doc.timeout(TimeValue.timeValueMillis(this.esConfig.getReqWriteWaitMill().intValue()));
                addReq(doc, true);
            }
            retryWrite();
            return true;
        } catch (Exception e) {
            log.error("", (Throwable) e);
            return false;
        }
    }

    public boolean asyBulkDelDoc(String str, String str2, Collection<String> collection) {
        Preconditions.checkNotNull(str, "indexName can not be null");
        Preconditions.checkNotNull(str2, "docType can not be null");
        Preconditions.checkNotNull(collection, "docIds can not be null");
        try {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                DeleteRequest deleteRequest = new DeleteRequest(str, str2, it.next());
                deleteRequest.waitForActiveShards(1);
                addReq(deleteRequest, true);
            }
            return true;
        } catch (Exception e) {
            log.error("", (Throwable) e);
            return false;
        }
    }

    private void retryWrite() {
        log.info("lazyQueue.size:{}", Integer.valueOf(this.lazyQueue.size()));
        Iterator<DocWriteRequest> it = this.lazyQueue.iterator();
        for (int i = 0; it.hasNext() && i < 200; i++) {
            addReq(this.lazyQueue.get(0), false);
            this.lazyQueue.remove(0);
        }
        log.info("lazyQueue.size:{}", Integer.valueOf(this.lazyQueue.size()));
    }

    private void retryWriteAll() {
        log.info("lazyQueue.size:{}", Integer.valueOf(this.lazyQueue.size()));
        Iterator<DocWriteRequest> it = this.lazyQueue.iterator();
        while (it.hasNext()) {
            addReq(this.lazyQueue.get(0), false);
            this.lazyQueue.remove(0);
        }
        log.info("lazyQueue.size:{}", Integer.valueOf(this.lazyQueue.size()));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        flush();
        if (this.bulkProcessor != null) {
            try {
                this.bulkProcessor.awaitClose(this.esConfig.getAwaitCloseSec().longValue(), TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.error("", (Throwable) e);
            }
        }
    }
}
