package net.sf.sprtool.recordevent.postgres.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.sf.sprtool.recordevent.RecordEventException;
import net.sf.sprtool.recordevent.postgres.RecordEventProperties;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/RecordEventScheduler.class */
public class RecordEventScheduler implements DisposableBean, SmartInitializingSingleton {
    public static Logger logger = LoggerFactory.getLogger(RecordEventScheduler.class);
    private EventProcessorRepository processorRepository;
    private JdbcConnection jdbc;
    private InstanceCoordinator instanceCoordinator;
    private String deleteSql;
    private String loadPartitionsSql;
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new InnserThreadFactory("sprtool-record-event-schedule"));
    private long heatBeatInterval = 20000;
    private volatile boolean destroyed = false;
    private AtomicLong lastNotifyEventId = new AtomicLong();
    private ObjectMapper objectMapper = new ObjectMapper();
    private Map<Integer, Set<Integer>> createdPartitions = new HashMap();

    /* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/RecordEventScheduler$InnserThreadFactory.class */
    static class InnserThreadFactory implements ThreadFactory {
        int seq;
        String prefix;

        private InnserThreadFactory(String str) {
            this.seq = 1;
            this.prefix = str;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            StringBuilder append = new StringBuilder().append(this.prefix).append("-");
            int i = this.seq;
            this.seq = i + 1;
            thread.setName(append.append(i).toString());
            return thread;
        }
    }

    public RecordEventScheduler(RecordEventProperties recordEventProperties, JdbcConnection jdbcConnection, EventProcessorRepository eventProcessorRepository, InstanceCoordinator instanceCoordinator) {
        if (recordEventProperties.isProcessorEnabled()) {
            this.processorRepository = eventProcessorRepository;
            this.instanceCoordinator = instanceCoordinator;
            this.jdbc = jdbcConnection;
            long maxRetentions = recordEventProperties.getMaxRetentions();
            if (maxRetentions > 0) {
                this.deleteSql = "DELETE FROM " + recordEventProperties.getRecordTable() + " WHERE creat < (current_timestamp - interval '" + maxRetentions + " minutes') ";
            }
            this.loadPartitionsSql = "SELECT * FROM " + recordEventProperties.getPartitionTable();
        }
    }

    public void afterSingletonsInstantiated() {
        if (this.processorRepository == null) {
            return;
        }
        this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
            scheduleProcessor();
        }, 1L, 60L, TimeUnit.SECONDS);
        this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
            scheduleListenEvent();
        }, 2L, 60L, TimeUnit.SECONDS);
        this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
            this.instanceCoordinator.scheduleLock();
        }, 4L, 60L, TimeUnit.SECONDS);
    }

    public void destroy() throws Exception {
        this.destroyed = true;
        this.scheduledExecutorService.shutdown();
    }

    private void scheduleProcessor() {
        Map<Integer, EventProcessorWrapper> processors = this.processorRepository.getProcessors();
        long j = 0;
        long j2 = 0;
        long j3 = 0;
        while (!this.destroyed && !Thread.currentThread().isInterrupted()) {
            try {
                if (System.currentTimeMillis() - j > this.heatBeatInterval) {
                    sendInstanceHeatBeat(processors);
                    j = System.currentTimeMillis();
                }
                if (System.currentTimeMillis() - j2 > 60000) {
                    loadPartitions(processors);
                    j2 = System.currentTimeMillis();
                }
                if (System.currentTimeMillis() - j3 > 2220000) {
                    runDelete();
                    j3 = System.currentTimeMillis();
                }
                Thread.sleep(500L);
            } catch (Exception e) {
                logger.error("Schedule processor", e);
                return;
            }
        }
    }

    private void scheduleListenEvent() {
        final Map<Integer, EventProcessorWrapper> processors = this.processorRepository.getProcessors();
        try {
            this.jdbc.callConnection(new ConnectionCallable() { // from class: net.sf.sprtool.recordevent.postgres.impl.RecordEventScheduler.1
                @Override // net.sf.sprtool.recordevent.postgres.impl.ConnectionCallable
                public Object call(Connection connection) throws SQLException {
                    PGConnection pGConnection = (PGConnection) connection.unwrap(PGConnection.class);
                    Statement createStatement = connection.createStatement();
                    createStatement.execute("LISTEN sprtool_record_event");
                    createStatement.close();
                    while (!RecordEventScheduler.this.destroyed && !Thread.currentThread().isInterrupted() && !connection.isClosed()) {
                        PGNotification[] notifications = pGConnection.getNotifications(500);
                        if (notifications != null) {
                            for (PGNotification pGNotification : notifications) {
                                try {
                                    JsonNode readTree = RecordEventScheduler.this.objectMapper.readTree(pGNotification.getParameter());
                                    String asText = readTree.get("@type").asText();
                                    if ("event".equals(asText)) {
                                        RecordEventScheduler.this.onNotifyEvent(processors, RecordEventScheduler.this.nodeToMap(readTree));
                                    } else if ("partition".equals(asText)) {
                                        synchronized (processors) {
                                            RecordEventScheduler.this.addPartition(processors, readTree.get("id").asInt(), readTree.get("proc").asInt(), readTree.get("part").asInt());
                                        }
                                    } else if ("instance".equals(asText)) {
                                        String asText2 = readTree.get("instance").asText();
                                        ArrayNode arrayNode = readTree.get("processors");
                                        int[] iArr = new int[arrayNode.size()];
                                        for (int i = 0; i < arrayNode.size(); i++) {
                                            iArr[i] = arrayNode.get(i).asInt();
                                        }
                                        RecordEventScheduler.this.instanceCoordinator.heatBeat(asText2, iArr);
                                    }
                                } catch (JsonProcessingException e) {
                                    throw new RecordEventException(e.getLocalizedMessage(), e);
                                }
                            }
                        }
                    }
                    return null;
                }
            });
        } catch (Exception e) {
            logger.error("Schedule listen record event", e);
        }
    }

    private void sendInstanceHeatBeat(Map<Integer, EventProcessorWrapper> map) throws SQLException {
        String instanceId = this.instanceCoordinator.getInstanceId();
        if (StringUtils.hasText(instanceId)) {
            ObjectNode createObjectNode = this.objectMapper.createObjectNode();
            createObjectNode.put("@type", "instance");
            createObjectNode.put("instance", instanceId);
            ArrayNode putArray = createObjectNode.putArray("processors");
            map.keySet().forEach(num -> {
                putArray.add(num);
            });
            this.jdbc.callStatement(statement -> {
                statement.execute("NOTIFY sprtool_record_event,'" + createObjectNode.toString() + "';");
                return null;
            });
        }
    }

    private void loadPartitions(Map<Integer, EventProcessorWrapper> map) throws SQLException {
        ArrayList arrayList = new ArrayList();
        this.jdbc.callStatement(this.loadPartitionsSql, preparedStatement -> {
            ResultSet executeQuery = preparedStatement.executeQuery();
            while (executeQuery.next()) {
                arrayList.add(new int[]{executeQuery.getInt("id"), executeQuery.getInt("proc"), executeQuery.getInt("part")});
            }
            return null;
        });
        arrayList.forEach(iArr -> {
            addPartition(map, iArr[0], iArr[1], iArr[2]);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addPartition(Map<Integer, EventProcessorWrapper> map, int i, int i2, int i3) {
        EventProcessorWrapper eventProcessorWrapper = map.get(Integer.valueOf(i2));
        if (eventProcessorWrapper == null) {
            return;
        }
        eventProcessorWrapper.addPartition(i3, i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map nodeToMap(JsonNode jsonNode) {
        HashMap hashMap = new HashMap();
        ObjectNode objectNode = (ObjectNode) jsonNode;
        hashMap.put("id", Long.valueOf(jsonNode.get("id").asLong()));
        hashMap.put("proc", Integer.valueOf(jsonNode.get("proc").asInt()));
        hashMap.put("part", Integer.valueOf(jsonNode.get("part").asInt()));
        strNode(objectNode, hashMap, "typ");
        strNode(objectNode, hashMap, "payload");
        tsNode(objectNode, hashMap, "creat");
        tsNode(objectNode, hashMap, "schedule");
        hashMap.put("retries", Integer.valueOf(jsonNode.get("retries").asInt()));
        strNode(objectNode, hashMap, "state");
        strNode(objectNode, hashMap, "cause");
        return hashMap;
    }

    private void strNode(ObjectNode objectNode, Map map, String str) {
        JsonNode jsonNode = objectNode.get(str);
        if (jsonNode == null || jsonNode.isNull()) {
            return;
        }
        map.put(str, jsonNode.asText());
    }

    private void tsNode(ObjectNode objectNode, Map map, String str) {
        JsonNode jsonNode = objectNode.get(str);
        if (jsonNode == null || jsonNode.isNull()) {
            return;
        }
        map.put(str, OffsetDateTime.parse(jsonNode.asText(), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
    }

    void onNotifyEvent(Map<Integer, EventProcessorWrapper> map, Map map2) {
        long longValue = ((Long) map2.get("id")).longValue();
        Integer num = (Integer) map2.get("proc");
        Integer num2 = (Integer) map2.get("part");
        Set<Integer> set = this.createdPartitions.get(num);
        if (set == null || !set.contains(num2)) {
            this.processorRepository.cteatePartition(num, num2);
            if (set == null) {
                set = new HashSet();
                this.createdPartitions.put(num, set);
            }
            set.add(num2);
        }
        long j = this.lastNotifyEventId.get();
        long j2 = longValue - (j > 0 ? j + 1 : longValue);
        this.lastNotifyEventId.set(longValue);
        EventProcessorWrapper eventProcessorWrapper = map.get(num);
        if (eventProcessorWrapper == null) {
            return;
        }
        eventProcessorWrapper.notifyEvent(map2, j2);
    }

    private void runDelete() {
        if (this.deleteSql == null || !this.instanceCoordinator.isLeader()) {
            return;
        }
        this.jdbc.callStatement(this.deleteSql, preparedStatement -> {
            preparedStatement.execute();
            return null;
        });
    }
}
