package net.sf.sprtool.recordevent.postgres.impl;

import java.sql.Connection;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import net.sf.sprtool.recordevent.AsyncContext;
import net.sf.sprtool.recordevent.RecordEvent;
import net.sf.sprtool.recordevent.RecordEventProcessor;
import net.sf.sprtool.recordevent.RecordEvents;
import net.sf.sprtool.recordevent.postgres.RecordEventProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/EventProcessorWrapper.class */
public abstract class EventProcessorWrapper implements DisposableBean {
    protected static final String NEW = "N";
    protected static final String EXECUTED = "E";
    protected static final String COMPLETED = "C";
    public static Logger logger = LoggerFactory.getLogger(EventProcessorWrapper.class);
    private static int DELETE_DELAY = 780000;
    private static int MAX_ACTIVE_BATCHES = 5;
    private int id;
    private RecordEventProcessor processor;
    private RecordEventProperties recordEventProperties;
    private ExecutorService executorService;
    private JdbcConnection jdbc;
    private String updateCompleteSql;
    private String updateCauseSql;
    private String updateRetriesSql;
    private String deleteCompleteSql;
    private String deleteExpireSql;
    private AtomicBoolean processing = new AtomicBoolean();
    private AtomicInteger activeBatches = new AtomicInteger();
    private volatile long lastDeleteTime = 0;
    private Set<Integer> grantedPartitions = new HashSet();
    private Map<Integer, PartitionState> allPartitions = new HashMap();
    private Map<String, Long> instances = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/EventProcessorWrapper$InnerRecordEvent.class */
    public static class InnerRecordEvent implements RecordEvent {
        long recordId;
        Map record;
        RecordEventOp op;

        private InnerRecordEvent(Map map, RecordEventOp recordEventOp) {
            this.record = map;
            this.op = recordEventOp;
            this.recordId = ((Long) map.get("id")).longValue();
        }

        public String getId() {
            return String.valueOf(this.recordId);
        }

        public long getRecordId() {
            return this.recordId;
        }

        public String getPayload() {
            return (String) this.record.get("payload");
        }

        public void setException(String str) {
            this.op.setException(this.recordId, str);
        }

        public void setCompleted(String str) {
            this.op.setCompleted(this.recordId, str);
        }

        public void setCompleted() {
            setCompleted(null);
        }

        public void setScheduleTime(OffsetDateTime offsetDateTime) {
            this.op.setScheduleTime(this.recordId, offsetDateTime);
        }

        public OffsetDateTime getCreateTime() {
            return (OffsetDateTime) this.record.get("creat");
        }

        public int getRetries() {
            Integer num = (Integer) this.record.get("retries");
            if (num == null) {
                return 0;
            }
            return num.intValue();
        }

        public RecordEvent.TYPE getType() {
            return RecordEvent.TYPE.fromString((String) this.record.get("typ"));
        }

        public int hashCode() {
            return (31 * 1) + ((int) (this.recordId ^ (this.recordId >>> 32)));
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.recordId == ((InnerRecordEvent) obj).recordId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/EventProcessorWrapper$RecordEventOp.class */
    public interface RecordEventOp {
        void setException(long j, String str);

        void setCompleted(long j, String str);

        void setScheduleTime(long j, OffsetDateTime offsetDateTime);
    }

    public void init() {
        this.updateCompleteSql = "UPDATE " + this.recordEventProperties.getRecordTable() + " SET state=?,cost=?,cause=?,execut=current_timestamp,purge=(current_timestamp + interval '" + this.processor.getPurgeTime() + " minutes'),complete=current_timestamp,retries=retries+1 WHERE id=ANY(?)  AND state<>'" + COMPLETED + "'";
        this.updateCauseSql = "UPDATE " + this.recordEventProperties.getRecordTable() + " SET state=?,cost=?,cause=?,execut=current_timestamp,retries=retries+1 WHERE id=ANY(?)";
        this.updateRetriesSql = "UPDATE " + this.recordEventProperties.getRecordTable() + " SET state=CASE WHEN state='" + NEW + "' THEN '" + EXECUTED + "' ELSE state END,execut=current_timestamp,cost=?,retries=retries+1 WHERE id=ANY(?)";
        this.deleteCompleteSql = "DELETE FROM " + this.recordEventProperties.getRecordTable() + " WHERE proc=? AND part=ANY(?) AND state='" + COMPLETED + "' AND purge< current_timestamp ";
        this.deleteExpireSql = "DELETE FROM " + this.recordEventProperties.getRecordTable() + " WHERE proc=? AND part=ANY(?) AND ((state='" + COMPLETED + "' AND purge< current_timestamp) OR ( creat < (current_timestamp - interval '" + this.processor.getRetentionTime() + " minutes'))) ";
        this.lastDeleteTime = System.currentTimeMillis() - ThreadLocalRandom.current().nextInt(DELETE_DELAY);
    }

    public int getId() {
        return this.id;
    }

    public void setId(int i) {
        this.id = i;
    }

    public RecordEventProcessor getProcessor() {
        return this.processor;
    }

    public void setProcessor(RecordEventProcessor recordEventProcessor) {
        this.processor = recordEventProcessor;
    }

    public JdbcConnection getJdbc() {
        return this.jdbc;
    }

    public void setJdbc(JdbcConnection jdbcConnection) {
        this.jdbc = jdbcConnection;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public Map<Integer, PartitionState> getAllPartitions() {
        return this.allPartitions;
    }

    public Map<String, Long> getInstances() {
        return this.instances;
    }

    public Set<Integer> getGrantedPartitions() {
        return this.grantedPartitions;
    }

    public void revokePartition(int i) {
        synchronized (this.grantedPartitions) {
            this.grantedPartitions.remove(Integer.valueOf(i));
            grantedPartitionChanged(false);
        }
    }

    public void revokePartition() {
        synchronized (this.grantedPartitions) {
            this.grantedPartitions.clear();
            grantedPartitionChanged(false);
        }
    }

    public void grantPartition(int i) {
        synchronized (this.grantedPartitions) {
            this.grantedPartitions.add(Integer.valueOf(i));
            grantedPartitionChanged(true);
        }
    }

    public RecordEventProperties getRecordEventProperties() {
        return this.recordEventProperties;
    }

    public void setRecordEventProperties(RecordEventProperties recordEventProperties) {
        this.recordEventProperties = recordEventProperties;
    }

    protected abstract void grantedPartitionChanged(boolean z);

    public abstract boolean notifyEvent(Map map, long j);

    protected abstract void processEvents() throws Exception;

    protected abstract void eventProcessed(List<Long> list);

    public void processEvent() {
        if (this.activeBatches.get() <= MAX_ACTIVE_BATCHES && this.processing.compareAndSet(false, true)) {
            this.executorService.execute(() -> {
                singleProcessEvents();
            });
        }
    }

    private void singleProcessEvents() {
        try {
            processEvents();
            purgeEvent();
        } catch (Exception e) {
            logger.error("Process event", e);
        } finally {
            this.processing.set(false);
        }
    }

    private void purgeEvent() {
        Integer[] numArr = (Integer[]) this.grantedPartitions.toArray(new Integer[0]);
        if (numArr.length == 0 || System.currentTimeMillis() < this.lastDeleteTime + DELETE_DELAY) {
            return;
        }
        this.jdbc.callStatement(this.processor.getRetentionTime() <= 0 ? this.deleteCompleteSql : this.deleteExpireSql, preparedStatement -> {
            preparedStatement.setInt(1, this.id);
            preparedStatement.setArray(2, preparedStatement.getConnection().createArrayOf("int4", numArr));
            preparedStatement.execute();
            return null;
        });
        this.lastDeleteTime = System.currentTimeMillis();
    }

    public void processEvents(List<Map> list) {
        doProcessEvents(list, false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doProcessEvents(final List<Map> list, final boolean z) {
        if (list.size() == 0) {
            return;
        }
        if (z) {
            this.activeBatches.incrementAndGet();
        }
        final ArrayList arrayList = new ArrayList(list.size());
        final HashMap hashMap = new HashMap();
        final HashMap hashMap2 = new HashMap();
        final HashMap hashMap3 = new HashMap();
        RecordEventOp recordEventOp = new RecordEventOp() { // from class: net.sf.sprtool.recordevent.postgres.impl.EventProcessorWrapper.1
            @Override // net.sf.sprtool.recordevent.postgres.impl.EventProcessorWrapper.RecordEventOp
            public void setException(long j, String str) {
                ((List) hashMap2.computeIfAbsent(EventProcessorWrapper.this.msgTest(str), str2 -> {
                    return new ArrayList();
                })).add(Long.valueOf(j));
            }

            @Override // net.sf.sprtool.recordevent.postgres.impl.EventProcessorWrapper.RecordEventOp
            public void setCompleted(long j, String str) {
                ((List) hashMap.computeIfAbsent(EventProcessorWrapper.this.msgTest(str), str2 -> {
                    return new ArrayList();
                })).add(Long.valueOf(j));
            }

            @Override // net.sf.sprtool.recordevent.postgres.impl.EventProcessorWrapper.RecordEventOp
            public void setScheduleTime(long j, OffsetDateTime offsetDateTime) {
                if (!EventProcessorWrapper.this.processor.isStateful()) {
                    throw new IllegalStateException("Collaborative processor can not set schedule time.");
                }
                hashMap3.put(Long.valueOf(j), offsetDateTime);
            }
        };
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            InnerRecordEvent innerRecordEvent = new InnerRecordEvent(list.get(i), recordEventOp);
            arrayList.add(Long.valueOf(innerRecordEvent.getRecordId()));
            arrayList2.add(innerRecordEvent);
        }
        final long currentTimeMillis = System.currentTimeMillis();
        final AsyncContext asyncContext = new AsyncContext() { // from class: net.sf.sprtool.recordevent.postgres.impl.EventProcessorWrapper.2
            public void complete() {
                long currentTimeMillis2 = (System.currentTimeMillis() - currentTimeMillis) / list.size();
                try {
                    JdbcConnection jdbcConnection = EventProcessorWrapper.this.jdbc;
                    Map map = hashMap;
                    Map map2 = hashMap2;
                    Map map3 = hashMap3;
                    List list2 = arrayList;
                    jdbcConnection.callConnection(connection -> {
                        EventProcessorWrapper.this.updateEventState(connection, map, map2, map3, new ArrayList(list2), currentTimeMillis2);
                        return null;
                    });
                } finally {
                    if (z) {
                        EventProcessorWrapper.this.eventProcessed(arrayList);
                        EventProcessorWrapper.this.activeBatches.decrementAndGet();
                    }
                }
            }
        };
        final boolean[] zArr = {false};
        final List unmodifiableList = Collections.unmodifiableList(arrayList2);
        try {
            this.processor.processEvent(new RecordEvents() { // from class: net.sf.sprtool.recordevent.postgres.impl.EventProcessorWrapper.3
                public AsyncContext startAsync() {
                    zArr[0] = true;
                    return asyncContext;
                }

                public List<RecordEvent> getEvents() {
                    return unmodifiableList;
                }
            });
        } catch (Exception e) {
            ArrayList arrayList3 = new ArrayList(arrayList);
            hashMap2.values().forEach(list2 -> {
                arrayList3.removeAll(list2);
            });
            hashMap2.put(e.getMessage(), arrayList3);
        }
        if (zArr[0]) {
            return;
        }
        asyncContext.complete();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String msgTest(String str) {
        return str == null ? "" : str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateEventState(Connection connection, Map<String, List<Long>> map, Map<String, List<Long>> map2, Map<Long, OffsetDateTime> map3, List<Long> list, long j) {
        updateEventState(connection, map, COMPLETED, j, list);
        updateEventState(connection, map2, EXECUTED, j, list);
        if (map3.size() > 0) {
            String str = "UPDATE " + this.recordEventProperties.getRecordTable() + " SET state='" + EXECUTED + "',retries=retries+1,execut=current_timestamp,cost=?,schedule=CASE ";
            Iterator<Long> it = map3.keySet().iterator();
            while (true) {
                List<Object[]> someSchedules = getSomeSchedules(map3, it, 100);
                if (someSchedules.size() == 0) {
                    break;
                }
                int size = someSchedules.size();
                StringBuilder sb = new StringBuilder();
                StringBuilder sb2 = new StringBuilder(str);
                for (int i = 0; i < size; i++) {
                    Long l = (Long) someSchedules.get(i)[0];
                    sb2.append("WHEN id=? THEN ?");
                    if (sb.length() > 0) {
                        sb.append(",");
                    }
                    sb.append(l);
                }
                sb2.append(" END ");
                sb2.append(" WHERE id in (").append((CharSequence) sb).append(")");
                this.jdbc.callStatement(connection, sb2.toString(), preparedStatement -> {
                    int i2 = 1 + 1;
                    preparedStatement.setLong(1, j);
                    for (int i3 = 0; i3 < size; i3++) {
                        Object[] objArr = (Object[]) someSchedules.get(i3);
                        Long l2 = (Long) objArr[0];
                        list.remove(l2);
                        OffsetDateTime offsetDateTime = (OffsetDateTime) objArr[1];
                        int i4 = i2;
                        int i5 = i2 + 1;
                        preparedStatement.setLong(i4, l2.longValue());
                        i2 = i5 + 1;
                        preparedStatement.setObject(i5, offsetDateTime);
                    }
                    preparedStatement.execute();
                    return null;
                });
            }
        }
        if (list.size() == 0) {
            return;
        }
        this.jdbc.callStatement(connection, this.updateRetriesSql, preparedStatement2 -> {
            preparedStatement2.setLong(1, j);
            preparedStatement2.setArray(2, preparedStatement2.getConnection().createArrayOf("int8", list.toArray(new Long[0])));
            preparedStatement2.execute();
            return null;
        });
    }

    private List<Object[]> getSomeSchedules(Map<Long, OffsetDateTime> map, Iterator<Long> it, int i) {
        ArrayList arrayList = new ArrayList(Math.min(map.size(), i));
        while (it.hasNext()) {
            Long next = it.next();
            arrayList.add(new Object[]{next, map.get(next)});
            if (arrayList.size() >= i) {
                break;
            }
        }
        return arrayList;
    }

    private void updateEventState(Connection connection, Map<String, List<Long>> map, String str, long j, List<Long> list) {
        map.forEach((str2, list2) -> {
            list2.forEach(l -> {
                list.remove(l);
            });
            this.jdbc.callStatement(connection, (COMPLETED.equals(str) ? this.updateCompleteSql : this.updateCauseSql).toString(), preparedStatement -> {
                preparedStatement.setString(1, str);
                preparedStatement.setLong(2, j);
                if (StringUtils.hasText(str2)) {
                    preparedStatement.setString(3, str2);
                } else {
                    preparedStatement.setNull(3, 12);
                }
                preparedStatement.setArray(4, preparedStatement.getConnection().createArrayOf("int8", list2.toArray(new Long[0])));
                preparedStatement.execute();
                return null;
            });
        });
    }
}
