package net.sf.sprtool.recordevent.postgres.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import net.sf.sprtool.recordevent.postgres.RecordEventProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/InstanceCoordinator.class */
public class InstanceCoordinator implements DisposableBean, ApplicationEventPublisherAware {
    private static final Logger logger = LoggerFactory.getLogger(InstanceCoordinator.class);

    /**
     * Marker for "no owning instance" in {@code partitionInstances}; also the token offered to
     * {@code sleepQueue} to cut the coordination loop's sleep short.
     */
    private static final String EMPTY_STRING = "";

    /**
     * Heartbeat interval in milliseconds. The coordination loop's sleep grows in 300 ms steps
     * while it is below this value; staleness checks use one or three times this interval.
     */
    private static final long HEAT_BEAT_INTERVAL = 10000;

    private final RecordEventProperties recordEventProperties;

    /** Identifier of this instance; assigned in {@code doCoordinate} from a database query, {@code null} before that. */
    private volatile String instanceId;

    private final JdbcConnection jdbcConn;
    private final EventProcessorRepository processorRepository;

    /** Injected through {@code setApplicationEventPublisher}; used to publish {@code LeaderEvent}s. */
    private ApplicationEventPublisher applicationEventPublisher;

    /** Whether this instance currently considers itself the leader; changed only by {@code changeLeader}. */
    private volatile boolean leader = false;

    /** Cleared whenever instances, processors or partitions change; set again after a rebalance. */
    private volatile boolean balanced = true;

    /** Set by {@code destroy}; ends the coordination loop and is reported as {@code online=false}. */
    private volatile boolean destroyed = false;

    /** Time (epoch millis) of the last processed leader heartbeat, or 0 when there is none. */
    private volatile long lastLeaderBeat = 0;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Wake-up channel for the coordination loop; the polled values themselves are never inspected. */
    private final BlockingQueue<String> sleepQueue = new LinkedBlockingDeque<>();

    /** Known instances by instance id. Also used as the monitor guarding its own contents, hence final. */
    private final Map<String, InstanceInfo> instanceInfos = new HashMap<>();

    /**
     * Processor id to (partition to owning instance id, or {@code EMPTY_STRING} when unowned).
     * Also used as the monitor guarding its own contents, hence final.
     */
    private final Map<Integer, Map<Integer, String>> partitionInstances = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/sf/sprtool/recordevent/postgres/impl/InstanceCoordinator$InstanceInfo.class */
    /**
     * Mutable bookkeeping record for one known instance.
     * All fields are read and written directly by the enclosing coordinator.
     */
    public static class InstanceInfo {
        /** Revision hash of the partition set assigned to the instance by the coordinator. */
        String granted = InstanceCoordinator.EMPTY_STRING;

        /** Revision hash most recently reported by the instance in a heartbeat. */
        String revision = InstanceCoordinator.EMPTY_STRING;

        /** Processor ids most recently reported by the instance. */
        int[] processors = new int[0];

        /** Time (epoch millis) at which the last online heartbeat of the instance was processed. */
        long lastHeatBeat;

        private InstanceInfo() {
            // All defaults are supplied by the field initializers above.
        }
    }

    /**
     * Creates a coordinator; no database access happens until {@link #coordinate()} is called.
     *
     * @param recordEventProperties configuration, read for the advisory-lock space
     * @param jdbcConnection helper through which all statements are executed
     * @param eventProcessorRepository source of the locally registered event processors
     */
    public InstanceCoordinator(RecordEventProperties recordEventProperties, JdbcConnection jdbcConnection, EventProcessorRepository eventProcessorRepository) {
        this.jdbcConn = jdbcConnection;
        this.processorRepository = eventProcessorRepository;
        this.recordEventProperties = recordEventProperties;
    }

    /**
     * Stores the publisher that {@code changeLeader} uses to emit {@code LeaderEvent}s.
     *
     * @param applicationEventPublisher publisher supplied by the application context
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Marks the coordinator as destroyed and wakes the coordination loop so that it can
     * observe the flag without waiting for its current sleep to elapse.
     */
    @Override
    public void destroy() throws Exception {
        // The flag must be visible before the loop is woken up.
        destroyed = true;
        sleepQueue.offer(EMPTY_STRING);
    }

    /**
     * @return the identifier assigned to this instance by the coordination loop,
     *         or {@code null} if the loop has not assigned one yet
     */
    public String getInstanceId() {
        return instanceId;
    }

    /**
     * @return {@code true} while this instance considers itself the leader
     */
    public boolean isLeader() {
        return leader;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Entry point of the coordination loop; delegates to {@code doCoordinate} and returns
     * only when that loop ends.
     */
    public void coordinate() {
        this.doCoordinate();
    }

    /**
     * Runs the coordination loop on one JDBC connection.
     *
     * <p>The loop first derives this instance's id from the database session, registers the
     * instance locally and announces it with a heartbeat. It then repeats until the coordinator
     * is destroyed, the thread is interrupted or the connection is closed:
     * <ol>
     *   <li>execute a trivial probe statement on the connection;</li>
     *   <li>as leader, run {@code checkInstance}; otherwise try to take the leader advisory lock
     *       once the leader heartbeat is late, or else run {@code checkLeaderTime};</li>
     *   <li>sleep on {@code sleepQueue}, starting at 300 ms and growing by 300 ms per round
     *       while below {@code HEAT_BEAT_INTERVAL}.</li>
     * </ol>
     *
     * <p>On every exit path the {@code finally} block steps down from leadership, sends an
     * offline heartbeat, revokes all partitions, releases the advisory locks and rolls back.
     */
    private void doCoordinate() {
        this.balanced = true;
        final Map<Integer, EventProcessorWrapper> processors = this.processorRepository.getProcessors();
        this.jdbcConn.callConnection(new ConnectionCallable() {
            @Override
            public Object call(Connection connection) throws SQLException {
                connection.setAutoCommit(false);
                try {
                    InstanceCoordinator.this.instanceId = (String) InstanceCoordinator.this.jdbcConn.callStatement(connection, statement -> {
                        statement.execute("SET lock_timeout=180000");
                        ResultSet idResult = statement.executeQuery("SELECT pg_backend_pid()::text || '@' || host(inet_client_addr()) addr;");
                        idResult.next();
                        return idResult.getString(1);
                    });
                    // Register ourselves as an online, non-leader instance holding no partitions.
                    int[] processorIds = processors.keySet().stream().mapToInt(Integer::intValue).toArray();
                    InstanceCoordinator.this.processHeatBeat(InstanceCoordinator.this.instanceId, false, true, InstanceCoordinator.this.computeVevision(new ArrayList<>()), processorIds);
                    InstanceCoordinator.this.sendHeatBeat();

                    String probeSql = "SELECT 'sprtool-record-event-instance-candidate';";
                    long sleepMillis = 300;
                    while (!InstanceCoordinator.this.destroyed && !Thread.currentThread().isInterrupted() && !connection.isClosed()) {
                        // probeSql is reassigned on promotion, so the lambda captures a per-iteration copy.
                        final String currentProbe = probeSql;
                        InstanceCoordinator.this.jdbcConn.callStatement(connection, statement -> {
                            return Boolean.valueOf(statement.execute(currentProbe));
                        });
                        try {
                            if (InstanceCoordinator.this.leader) {
                                if (!InstanceCoordinator.this.checkInstance()) {
                                    // Authorizations were just sent: poll quickly again.
                                    sleepMillis = 300;
                                }
                            } else if (InstanceCoordinator.this.isLeaderLate() && InstanceCoordinator.this.tryAdvisoryLock(connection, 0)) {
                                probeSql = "SELECT 'sprtool-record-event-instance-leader';";
                                InstanceCoordinator.this.changeLeader(true);
                                InstanceCoordinator.this.balanced = false;
                            } else {
                                InstanceCoordinator.this.checkLeaderTime();
                            }
                            InstanceCoordinator.this.sleepQueue.poll(sleepMillis, TimeUnit.MILLISECONDS);
                            if (sleepMillis < InstanceCoordinator.HEAT_BEAT_INTERVAL) {
                                sleepMillis += 300;
                            }
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the caller. Cleanup is left to the finally
                            // block so it runs exactly once; that block already runs with the flag set
                            // when the loop ends through the isInterrupted() check.
                            Thread.currentThread().interrupt();
                            return null;
                        }
                    }
                    return null;
                } finally {
                    if (InstanceCoordinator.this.leader) {
                        InstanceCoordinator.this.changeLeader(false);
                    }
                    InstanceCoordinator.this.sendHeatBeat(false);
                    InstanceCoordinator.this.revokeAllPartition(connection);
                    connection.rollback();
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Switches the local leader flag and publishes a {@code LeaderEvent} with the new state.
     * Before the switch every partition in {@code partitionInstances} is reset to "unowned",
     * so no assignment survives a leadership change.
     *
     * @param z the new leader state of this instance
     */
    public void changeLeader(boolean z) {
        logger.debug("Change leader status: {}", Boolean.valueOf(z));
        synchronized (this.partitionInstances) {
            for (Map<Integer, String> owners : this.partitionInstances.values()) {
                // replaceAll rewrites values in place without structurally modifying the map.
                owners.replaceAll((partition, owner) -> EMPTY_STRING);
            }
        }
        this.leader = z;
        this.applicationEventPublisher.publishEvent(new LeaderEvent(z));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a partition under a key of {@code partitionInstances} if it is not known yet.
     * A newly added partition starts without an owner, marks the coordinator as unbalanced
     * and wakes the coordination loop.
     *
     * @param i key of the outer map (matched against processor ids during rebalancing)
     * @param i2 partition number within that key
     */
    public void addPartition(int i, int i2) {
        final Integer outerKey = Integer.valueOf(i);
        final Integer partition = Integer.valueOf(i2);
        synchronized (this.partitionInstances) {
            Map<Integer, String> owners = this.partitionInstances.computeIfAbsent(outerKey, key -> new HashMap<>());
            if (owners.containsKey(partition)) {
                // Already registered: nothing changes and nobody is woken up.
                return;
            }
            owners.put(partition, EMPTY_STRING);
            this.balanced = false;
            this.sleepQueue.offer(EMPTY_STRING);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the heartbeat interval in milliseconds
     */
    public long getHeatBeatInterval() {
        return InstanceCoordinator.HEAT_BEAT_INTERVAL;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Applies one heartbeat to the local view of the cluster.
     *
     * <p>If {@code z} is set, the leader timestamp is refreshed (or reset to 0 when {@code z2}
     * is false). An online heartbeat creates or updates the instance record; a change in the
     * reported processors marks the coordinator as unbalanced and wakes the loop. An offline
     * heartbeat removes the instance, clears its partitions, marks the coordinator as
     * unbalanced and wakes the loop.
     *
     * @param str id of the instance the heartbeat refers to
     * @param z whether the heartbeat updates the leader timestamp
     * @param z2 whether the instance is online
     * @param str2 revision reported by the instance; {@code null} is stored as the empty string
     * @param iArr processor ids reported by the instance
     */
    public void processHeatBeat(String str, boolean z, boolean z2, String str2, int[] iArr) {
        final long now = System.currentTimeMillis();
        if (z) {
            if (z2) {
                this.lastLeaderBeat = now;
            } else {
                this.lastLeaderBeat = 0L;
            }
        }
        synchronized (this.instanceInfos) {
            if (!z2) {
                this.instanceInfos.remove(str);
                clearPartitionInstances(Arrays.asList(str));
                this.balanced = false;
                this.sleepQueue.offer(EMPTY_STRING);
                return;
            }
            InstanceInfo info = this.instanceInfos.computeIfAbsent(str, id -> {
                // A new instance starts with the revision of an empty partition set granted.
                InstanceInfo created = new InstanceInfo();
                created.granted = computeVevision(new ArrayList());
                return created;
            });
            final int[] previousProcessors = info.processors;
            if (str2 == null) {
                info.revision = EMPTY_STRING;
            } else {
                info.revision = str2;
            }
            info.lastHeatBeat = now;
            info.processors = iArr;
            if (!Arrays.equals(previousProcessors, iArr)) {
                this.balanced = false;
                this.sleepQueue.offer(EMPTY_STRING);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Broadcasts a heartbeat that carries this instance's current leader flag.
     */
    public void sendHeatBeat() {
        final boolean leading = isLeader();
        sendHeatBeat(leading);
    }

    /**
     * Broadcasts a heartbeat through {@code NOTIFY sprtool_record_event}.
     *
     * <p>The JSON payload carries {@code @type=instance}, the instance id, the given leader flag,
     * {@code online} (false once the coordinator is destroyed), the ids of the local processors
     * and the revision hash of the currently granted partitions. Nothing is sent while the
     * instance id is still blank.
     *
     * @param z leader flag to announce
     */
    void sendHeatBeat(boolean z) {
        if (!StringUtils.hasText(this.instanceId)) {
            return;
        }
        ObjectNode message = this.objectMapper.createObjectNode();
        message.put("@type", "instance");
        message.put("instance", this.instanceId);
        message.put("leader", z);
        message.put("online", !this.destroyed);
        ArrayNode processorIds = message.putArray("processors");
        final List<String> grantedKeys = new ArrayList<>();
        this.processorRepository.getProcessors().forEach((processorId, processor) -> {
            processorIds.add(processorId);
            processor.getGrantedPartitions().forEach(partition -> {
                grantedKeys.add(processor.getId() + "-" + partition);
            });
        });
        message.put("revision", computeVevision(grantedKeys));
        // The payload is embedded in a SQL string literal, so single quotes must be doubled.
        final String payload = message.toString().replace("'", "''");
        this.jdbcConn.callStatement(statement -> {
            statement.execute("NOTIFY sprtool_record_event,'" + payload + "';");
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Revokes the partitions of every local processor and then releases all advisory locks
     * held by the given connection's session.
     *
     * @param connection connection on which {@code pg_advisory_unlock_all()} is executed
     */
    public void revokeAllPartition(Connection connection) {
        revokeAllPartition();
        jdbcConn.callStatement(connection, stmt -> {
            stmt.executeQuery("SELECT pg_advisory_unlock_all();");
            return null;
        });
    }

    /**
     * Calls {@code revokePartitions()} on every processor in the repository, holding the
     * monitor of the processor map while doing so.
     */
    private void revokeAllPartition() {
        final Map<Integer, EventProcessorWrapper> registered = this.processorRepository.getProcessors();
        synchronized (registered) {
            for (EventProcessorWrapper wrapper : registered.values()) {
                wrapper.revokePartitions();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Gives up all local partitions when no leader heartbeat has been processed within
     * three heartbeat intervals, and resets the leader timestamp to 0.
     */
    public void checkLeaderTime() {
        final long lastBeat = this.lastLeaderBeat;
        final long oldestAccepted = System.currentTimeMillis() - (HEAT_BEAT_INTERVAL * 3);
        if (lastBeat <= oldestAccepted) {
            this.lastLeaderBeat = 0L;
            revokeAllPartition();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @return {@code true} when the last leader heartbeat is older than one heartbeat interval
     */
    public boolean isLeaderLate() {
        final long lastBeat = this.lastLeaderBeat;
        final long deadline = System.currentTimeMillis() - HEAT_BEAT_INTERVAL;
        return lastBeat < deadline;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Leader-side check of the known instances.
     *
     * <p>Instances whose last heartbeat is older than three heartbeat intervals are dropped and
     * their partitions cleared. Every remaining instance whose reported revision differs from
     * the granted one is sent its authorization again. Only when no such instance exists and the
     * coordinator is marked unbalanced are the partitions rebalanced.
     *
     * @return {@code false} when authorizations were sent in this round, {@code true} otherwise
     */
    public boolean checkInstance() {
        final long staleBefore = System.currentTimeMillis() - (HEAT_BEAT_INTERVAL * 3);
        final List<String> outOfSync = new ArrayList<>();
        final List<String> expired = new ArrayList<>();
        final Map<Integer, List<String>> instancesByProcessor = new HashMap<>();
        synchronized (this.instanceInfos) {
            Iterator<Map.Entry<String, InstanceInfo>> it = this.instanceInfos.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, InstanceInfo> entry = it.next();
                String instance = entry.getKey();
                InstanceInfo info = entry.getValue();
                if (info.lastHeatBeat < staleBefore) {
                    expired.add(instance);
                    it.remove();
                    this.balanced = false;
                    this.sleepQueue.offer(EMPTY_STRING);
                    continue;
                }
                if (!info.revision.equals(info.granted)) {
                    outOfSync.add(instance);
                }
                for (int processorId : info.processors) {
                    instancesByProcessor.computeIfAbsent(Integer.valueOf(processorId), key -> new ArrayList<>()).add(instance);
                }
            }
        }
        if (!expired.isEmpty()) {
            clearPartitionInstances(expired);
        }
        if (!outOfSync.isEmpty()) {
            // Rebalancing waits until every live instance has acknowledged its current grant.
            outOfSync.forEach(this::sendAuthorization);
            return false;
        }
        if (this.balanced) {
            return true;
        }
        rebalance(instancesByProcessor);
        this.balanced = true;
        return true;
    }

    /**
     * Marks every partition currently owned by one of the given instances as unowned.
     * Partitions owned by other instances are left untouched.
     *
     * @param list ids of the instances whose partitions are released
     */
    private void clearPartitionInstances(List<String> list) {
        synchronized (this.partitionInstances) {
            for (Map<Integer, String> owners : this.partitionInstances.values()) {
                // Compare each owner against the given ids; the previous self-comparison
                // matched every entry and therefore cleared all partitions.
                owners.replaceAll((partition, owner) -> list.contains(owner) ? EMPTY_STRING : owner);
            }
        }
    }

    /**
     * Reassigns partitions to instances and recomputes the granted revision of every instance.
     *
     * <p>For each processor that has candidate instances, the candidates are sorted and
     * partition {@code p} is assigned to candidate {@code p % candidateCount}. Processors with
     * no candidates keep their current owners. The granted revision of each known instance is
     * then the hash of its {@code "<processorId>-<partition>"} keys.
     *
     * @param map processor id to the ids of the instances that reported that processor;
     *            the lists are sorted in place
     */
    private void rebalance(Map<Integer, List<String>> map) {
        final Map<String, List<String>> grantsByInstance = new HashMap<>();
        synchronized (this.partitionInstances) {
            this.partitionInstances.forEach((processorId, owners) -> {
                List<String> candidates = map.get(processorId);
                if (candidates == null || candidates.isEmpty()) {
                    return;
                }
                Collections.sort(candidates);
                final int candidateCount = candidates.size();
                owners.replaceAll((partition, owner) -> candidates.get(partition.intValue() % candidateCount));
            });
            this.partitionInstances.forEach((processorId, owners) -> {
                owners.forEach((partition, owner) -> {
                    grantsByInstance.computeIfAbsent(owner, key -> new ArrayList<>()).add(processorId + "-" + partition);
                });
            });
        }
        synchronized (this.instanceInfos) {
            // Visit every known instance, not only those that received partitions: an instance
            // left with nothing must fall back to the empty-set revision, otherwise its stale
            // grant still matches its reported revision and it is never told to release.
            this.instanceInfos.forEach((instance, info) -> {
                info.granted = computeVevision(grantsByInstance.getOrDefault(instance, new ArrayList<>()));
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes the revision hash of a set of partition keys: the keys are put in natural order,
     * concatenated without a separator and hashed with {@code toMd5}.
     *
     * <p>The given list is not modified, so unmodifiable lists are accepted as well.
     *
     * @param list partition keys in any order
     * @return the hash of the sorted, concatenated keys
     */
    public String computeVevision(List<String> list) {
        return toMd5(list.stream().sorted().collect(Collectors.joining()));
    }

    /**
     * Sends the given instance its current partition grants, one
     * {@code NOTIFY sprtool_record_event} message per processor in {@code partitionInstances}.
     *
     * <p>Each JSON payload carries {@code @type=authorization}, the target instance, this
     * instance as authorizer, the processor id and the partitions of that processor owned by
     * the target (possibly none). The payloads are built under the partition lock and sent
     * after it has been released.
     *
     * @param str id of the instance to authorize
     */
    private void sendAuthorization(String str) {
        final List<String> payloads = new ArrayList<>();
        synchronized (this.partitionInstances) {
            this.partitionInstances.forEach((processorId, owners) -> {
                ObjectNode message = this.objectMapper.createObjectNode();
                message.put("@type", "authorization");
                message.put("instance", str);
                message.put("authorizer", this.instanceId);
                message.put("proc", processorId);
                ArrayNode parts = message.putArray("parts");
                owners.forEach((partition, owner) -> {
                    if (str.equals(owner)) {
                        parts.add(partition);
                    }
                });
                payloads.add(message.toString());
            });
        }
        for (String payload : payloads) {
            // The payload is embedded in a SQL string literal, so single quotes must be doubled.
            final String literal = payload.replace("'", "''");
            this.jdbcConn.callStatement(statement -> {
                statement.execute("NOTIFY sprtool_record_event,'" + literal + "';");
                return null;
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tries, without blocking, to take the session-level advisory lock identified by the
     * configured lock space and the given key.
     *
     * @param connection connection whose session should hold the lock
     * @param i second key of the advisory lock
     * @return {@code true} if {@code pg_try_advisory_lock} reported success
     */
    public boolean tryAdvisoryLock(Connection connection, int i) {
        Boolean acquired = (Boolean) this.jdbcConn.callStatement(connection, stmt -> {
            String sql = "SELECT case when pg_try_advisory_lock(" + this.recordEventProperties.getLockSpace() + "," + i + ") then '1' else '0' end";
            ResultSet outcome = stmt.executeQuery(sql);
            outcome.next();
            return Boolean.valueOf("1".equals(outcome.getString(1)));
        });
        return acquired.booleanValue();
    }

    /**
     * Hashes the UTF-8 bytes of a string with MD5 and renders the digest as lower-case hex.
     *
     * <p>The digest is formatted through {@link BigInteger#toString(int)}, so leading zero
     * digits are dropped and the result can be shorter than 32 characters.
     *
     * @param str text to hash
     * @return hexadecimal digest without leading zeros
     * @throws IllegalArgumentException wrapping any exception raised while hashing,
     *         including the one caused by a {@code null} argument
     */
    static String toMd5(String str) {
        final byte[] digest;
        try {
            MessageDigest md5 = MessageDigest.getInstance("md5");
            digest = md5.digest(str.getBytes("utf-8"));
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
        return new BigInteger(1, digest).toString(16);
    }
}
