package net.sf.sprtool.recordevent.postgres.impl;

import java.sql.Connection;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import net.sf.sprtool.recordevent.AsyncContext;
import net.sf.sprtool.recordevent.RecordEvent;
import net.sf.sprtool.recordevent.RecordEvents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/EventProcessorWorker.class */
public abstract class EventProcessorWorker implements DisposableBean {
    private static Logger logger = LoggerFactory.getLogger(EventProcessorWorker.class);
    private static int DELETE_DELAY = 780000;
    private static int[] DEFAULT_DELAY = {1, 1, 1, 2, 3, 5, 7, 9, 12, 15, 20, 30, 60, 90, 120, 240, 480, 720};
    private int id;
    protected EventProcessorWrapper eventProcessorWrapper;
    protected volatile boolean destroyed = false;
    private AtomicBoolean processing = new AtomicBoolean();
    protected volatile long scheduleTime = Long.MAX_VALUE;
    private volatile long lastDeleteTime = 0;
    private Set<Integer> grantedPartitions = new HashSet();
    private Set<Integer> allocatedPartitions = new HashSet();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/EventProcessorWorker$InnerRecordEvent.class */
    public class InnerRecordEvent implements RecordEvent {
        long recordId;
        Map record;
        BiConsumer<InnerRecordEvent, String> onCompleted;
        String exception;
        OffsetDateTime scheduleTime;

        private InnerRecordEvent(Map map, BiConsumer<InnerRecordEvent, String> biConsumer) {
            this.record = map;
            this.onCompleted = biConsumer;
            this.recordId = ((Long) map.get("id")).longValue();
        }

        public String getId() {
            return String.valueOf(this.recordId);
        }

        public long getRecordId() {
            return this.recordId;
        }

        public String getPayload() {
            return (String) this.record.get("payload");
        }

        public void setException(String str) {
            this.exception = str;
        }

        public void setCompleted(String str) {
            this.onCompleted.accept(this, str);
        }

        public void setCompleted() {
            setCompleted(null);
        }

        public void setScheduleTime(OffsetDateTime offsetDateTime) {
            if (!EventProcessorWorker.this.eventProcessorWrapper.getProcessor().isStateful()) {
                throw new IllegalStateException("Stateless processor can not set schedule time.");
            }
            this.scheduleTime = offsetDateTime;
        }

        public OffsetDateTime getCreateTime() {
            return (OffsetDateTime) this.record.get("creat");
        }

        public int getRetries() {
            Integer num = (Integer) this.record.get("retries");
            if (num == null) {
                return 0;
            }
            return num.intValue();
        }

        public RecordEvent.TYPE getType() {
            return RecordEvent.TYPE.fromString((String) this.record.get("typ"));
        }

        public int hashCode() {
            return (31 * 1) + ((int) (this.recordId ^ (this.recordId >>> 32)));
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.recordId == ((InnerRecordEvent) obj).recordId;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/EventProcessorWorker$ProcessResult.class */
    public static class ProcessResult {
        boolean empty;
        long delay;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ProcessResult(boolean z) {
            this.delay = 0L;
            this.empty = z;
            this.delay = 0L;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public ProcessResult(long j) {
            this.delay = 0L;
            this.empty = false;
            this.delay = j;
        }
    }

    public EventProcessorWorker(EventProcessorWrapper eventProcessorWrapper) {
        this.eventProcessorWrapper = eventProcessorWrapper;
    }

    public void init() {
        this.lastDeleteTime = System.currentTimeMillis() - ThreadLocalRandom.current().nextInt(DELETE_DELAY);
        this.eventProcessorWrapper.getScheduledExecutorService().scheduleWithFixedDelay(() -> {
            processEvent();
        }, ThreadLocalRandom.current().nextLong(1000L), 60000L, TimeUnit.MILLISECONDS);
    }

    public void destroy() throws Exception {
        this.destroyed = true;
    }

    public Set<Integer> getGrantedPartitions() {
        return this.grantedPartitions;
    }

    public Set<Integer> getAllocatedPartitions() {
        return this.allocatedPartitions;
    }

    public void revokePartition(int i) {
        synchronized (this.grantedPartitions) {
            this.grantedPartitions.remove(Integer.valueOf(i));
            grantedPartitionChanged(false);
        }
    }

    public void revokePartition() {
        synchronized (this.grantedPartitions) {
            this.grantedPartitions.clear();
            grantedPartitionChanged(false);
        }
    }

    public void grantPartition(int i) {
        synchronized (this.grantedPartitions) {
            this.grantedPartitions.add(Integer.valueOf(i));
            grantedPartitionChanged(true);
        }
        processEvent();
    }

    public void allocatePartitions(int i) {
        synchronized (this.allocatedPartitions) {
            this.allocatedPartitions.add(Integer.valueOf(i));
        }
        processEvent();
    }

    protected abstract void grantedPartitionChanged(boolean z);

    public abstract boolean notifyEvent(Map map, long j);

    protected abstract void processEvents(Consumer<ProcessResult> consumer) throws Exception;

    protected abstract void eventProcessed(List<Long> list);

    public void processEvent() {
        if (this.processing.compareAndSet(false, true)) {
            this.eventProcessorWrapper.getExecutorService().execute(() -> {
                singleProcessEvents();
            });
        }
    }

    private void delayProcessEvent(long j) {
        long currentTimeMillis = System.currentTimeMillis() + j;
        if (currentTimeMillis >= this.scheduleTime) {
            return;
        }
        this.scheduleTime = currentTimeMillis;
        this.eventProcessorWrapper.getScheduledExecutorService().schedule(() -> {
            this.scheduleTime = Long.MAX_VALUE;
            processEvent();
        }, j, TimeUnit.MILLISECONDS);
    }

    private void singleProcessEvents() {
        try {
            processEvents(processResult -> {
                try {
                    purgeEvent();
                    if (processResult.empty || this.destroyed) {
                        return;
                    }
                    if (processResult.delay > 0) {
                        delayProcessEvent(processResult.delay);
                    } else {
                        processEvent();
                    }
                } finally {
                    this.processing.compareAndSet(true, false);
                }
            });
        } catch (Exception e) {
            this.processing.compareAndSet(true, false);
            logger.error("Record processor: " + this.eventProcessorWrapper.getProcessor().getCode(), e);
        }
    }

    private void purgeEvent() {
        Integer[] numArr = (Integer[]) this.grantedPartitions.toArray(new Integer[0]);
        if (numArr.length == 0 || System.currentTimeMillis() < this.lastDeleteTime + DELETE_DELAY) {
            return;
        }
        this.eventProcessorWrapper.getJdbc().callStatement(this.eventProcessorWrapper.getSqls().getPurgeRecordSql(), preparedStatement -> {
            preparedStatement.setInt(1, this.id);
            preparedStatement.setArray(2, preparedStatement.getConnection().createArrayOf("int4", numArr));
            preparedStatement.execute();
            return null;
        });
        this.lastDeleteTime = System.currentTimeMillis();
    }

    public void processEvent(Map map) {
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(map);
        doProcessEvents(arrayList, false, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doProcessEvents(final List<Map> list, final boolean z, final Consumer<ProcessResult> consumer) {
        if (list.size() == 0) {
            consumer.accept(new ProcessResult(true));
            return;
        }
        final ArrayList arrayList = new ArrayList(list.size());
        final HashMap hashMap = new HashMap();
        final HashMap hashMap2 = new HashMap();
        final BiConsumer biConsumer = (innerRecordEvent, str) -> {
            long recordId = innerRecordEvent.getRecordId();
            ((List) hashMap.computeIfAbsent(msgText(str), str -> {
                return new ArrayList();
            })).add(Long.valueOf(recordId));
            hashMap2.remove(Long.valueOf(recordId));
        };
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            InnerRecordEvent innerRecordEvent2 = new InnerRecordEvent(list.get(i), biConsumer);
            arrayList.add(Long.valueOf(innerRecordEvent2.getRecordId()));
            arrayList2.add(innerRecordEvent2);
            hashMap2.put(Long.valueOf(innerRecordEvent2.getRecordId()), innerRecordEvent2);
        }
        final long currentTimeMillis = System.currentTimeMillis();
        final AsyncContext asyncContext = new AsyncContext() { // from class: net.sf.sprtool.recordevent.postgres.impl.EventProcessorWorker.1
            public void complete() {
                long currentTimeMillis2 = (System.currentTimeMillis() - currentTimeMillis) / list.size();
                if (!EventProcessorWorker.this.eventProcessorWrapper.getProcessor().isStateful()) {
                    ArrayList arrayList3 = new ArrayList(hashMap2.values());
                    BiConsumer biConsumer2 = biConsumer;
                    arrayList3.forEach(innerRecordEvent3 -> {
                        biConsumer2.accept(innerRecordEvent3, innerRecordEvent3.exception);
                    });
                }
                try {
                    JdbcConnection jdbc = EventProcessorWorker.this.eventProcessorWrapper.getJdbc();
                    Map map = hashMap;
                    Map map2 = hashMap2;
                    jdbc.callConnection(connection -> {
                        EventProcessorWorker.this.updateEventState(connection, map, map2, currentTimeMillis2);
                        return null;
                    });
                    if (z) {
                        EventProcessorWorker.this.eventProcessed(arrayList);
                        consumer.accept(new ProcessResult(false));
                    }
                } catch (Throwable th) {
                    if (z) {
                        EventProcessorWorker.this.eventProcessed(arrayList);
                        consumer.accept(new ProcessResult(false));
                    }
                    throw th;
                }
            }
        };
        final boolean[] zArr = {false};
        final List unmodifiableList = Collections.unmodifiableList(arrayList2);
        try {
            this.eventProcessorWrapper.getProcessor().processEvent(new RecordEvents() { // from class: net.sf.sprtool.recordevent.postgres.impl.EventProcessorWorker.2
                public AsyncContext startAsync() {
                    zArr[0] = true;
                    return asyncContext;
                }

                public List<RecordEvent> getEvents() {
                    return unmodifiableList;
                }
            });
        } catch (Exception e) {
            String message = e.getMessage();
            hashMap2.forEach((l, innerRecordEvent3) -> {
                if (innerRecordEvent3.exception == null) {
                    return;
                }
                innerRecordEvent3.exception = message;
            });
        }
        if (zArr[0]) {
            return;
        }
        asyncContext.complete();
    }

    private String msgText(String str) {
        return str == null ? "" : str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateEventState(Connection connection, Map<String, List<Long>> map, Map<Long, InnerRecordEvent> map2, long j) {
        map.forEach((str, list) -> {
            this.eventProcessorWrapper.getJdbc().callStatement(connection, this.eventProcessorWrapper.getSqls().getUpdateCompleteSql().toString(), preparedStatement -> {
                preparedStatement.setLong(1, j);
                if (StringUtils.hasText(str)) {
                    preparedStatement.setString(2, str);
                } else {
                    preparedStatement.setNull(2, 12);
                }
                preparedStatement.setArray(3, preparedStatement.getConnection().createArrayOf("int8", list.toArray(new Long[0])));
                preparedStatement.execute();
                return null;
            });
        });
        if (map2.size() == 0) {
            return;
        }
        StringBuilder sb = new StringBuilder(this.eventProcessorWrapper.getSqls().getUpdateCauseSql());
        ArrayList arrayList = new ArrayList(map2.size() * 3);
        OffsetDateTime now = OffsetDateTime.now();
        Iterator<InnerRecordEvent> it = map2.values().iterator();
        InnerRecordEvent next = it.next();
        sb.append("SELECT ? id,? cause,? schedule");
        arrayList.add(Long.valueOf(next.getRecordId()));
        arrayList.add(next.exception);
        arrayList.add(nextTime(next, now));
        while (it.hasNext()) {
            InnerRecordEvent next2 = it.next();
            sb.append(" UNION ALL SELECT ?,?,?");
            arrayList.add(Long.valueOf(next2.getRecordId()));
            arrayList.add(next2.exception);
            arrayList.add(nextTime(next2, now));
        }
        sb.append(") s WHERE r.id=s.id");
        this.eventProcessorWrapper.getJdbc().callStatement(connection, sb.toString(), preparedStatement -> {
            int size = arrayList.size();
            preparedStatement.setLong(1, j);
            for (int i = 0; i < size; i += 3) {
                preparedStatement.setLong(i + 2, ((Long) arrayList.get(i)).longValue());
                Object obj = arrayList.get(i + 1);
                if (obj != null) {
                    preparedStatement.setString(i + 3, (String) obj);
                } else {
                    preparedStatement.setNull(i + 3, 12);
                }
                preparedStatement.setObject(i + 4, arrayList.get(i + 2));
            }
            preparedStatement.execute();
            return null;
        });
    }

    private OffsetDateTime nextTime(InnerRecordEvent innerRecordEvent, OffsetDateTime offsetDateTime) {
        if (innerRecordEvent.scheduleTime != null) {
            return innerRecordEvent.scheduleTime;
        }
        return offsetDateTime.plusMinutes(DEFAULT_DELAY[innerRecordEvent.getRetries() < DEFAULT_DELAY.length ? innerRecordEvent.getRetries() : DEFAULT_DELAY.length - 1]);
    }
}
