package org.apache.iotdb.db.pipe.connector.protocol.websocket;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualTreeBidiMap;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.exception.NotImplementedException;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.class */
public class WebSocketConnectorServer extends WebSocketServer {
    private final AtomicLong eventIdGenerator;
    private final ConcurrentHashMap<String, PriorityBlockingQueue<EventWaitingForTransfer>> eventsWaitingForTransfer;
    private final ConcurrentHashMap<String, ConcurrentHashMap<Long, EventWaitingForAck>> eventsWaitingForAck;
    private final BidiMap<String, WebSocket> router;
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnectorServer.class);
    private static final AtomicReference<WebSocketConnectorServer> instance = new AtomicReference<>();
    private static final AtomicBoolean isStarted = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer$EventWaitingForAck.class */
    public static class EventWaitingForAck {
        private final WebSocketConnector connector;
        private final Event event;

        public EventWaitingForAck(WebSocketConnector webSocketConnector, Event event) {
            this.connector = webSocketConnector;
            this.event = event;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer$EventWaitingForTransfer.class */
    public static class EventWaitingForTransfer {
        private final Long eventId;
        private final WebSocketConnector connector;
        private final Event event;

        public EventWaitingForTransfer(Long l, WebSocketConnector webSocketConnector, Event event) {
            this.eventId = l;
            this.connector = webSocketConnector;
            this.event = event;
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer$TransferThread.class */
    private class TransferThread extends Thread {
        private final WebSocketConnectorServer server;

        public TransferThread(WebSocketConnectorServer webSocketConnectorServer) {
            this.server = webSocketConnectorServer;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                if (!sleepIfNecessary()) {
                    Iterator it = WebSocketConnectorServer.this.eventsWaitingForTransfer.keySet().iterator();
                    while (it.hasNext()) {
                        String str = (String) it.next();
                        PriorityBlockingQueue priorityBlockingQueue = (PriorityBlockingQueue) WebSocketConnectorServer.this.eventsWaitingForTransfer.getOrDefault(str, null);
                        if (priorityBlockingQueue != null && !priorityBlockingQueue.isEmpty() && WebSocketConnectorServer.this.router.containsKey(str)) {
                            try {
                                EventWaitingForTransfer eventWaitingForTransfer = (EventWaitingForTransfer) priorityBlockingQueue.take();
                                synchronized (priorityBlockingQueue) {
                                    priorityBlockingQueue.notifyAll();
                                }
                                transfer(str, eventWaitingForTransfer);
                            } catch (InterruptedException e) {
                                WebSocketConnectorServer.LOGGER.warn("The transfer thread is interrupted.", e);
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            }
        }

        private void transfer(String str, EventWaitingForTransfer eventWaitingForTransfer) {
            Long l = eventWaitingForTransfer.eventId;
            PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = eventWaitingForTransfer.event;
            WebSocketConnector webSocketConnector = eventWaitingForTransfer.connector;
            try {
                if (!(pipeRawTabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
                    throw new NotImplementedException("IoTDBCDCConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
                }
                ByteBuffer serialize = pipeRawTabletInsertionEvent.convertToTablet().serialize();
                if (serialize == null) {
                    webSocketConnector.commit(pipeRawTabletInsertionEvent);
                    return;
                }
                ByteBuffer allocate = ByteBuffer.allocate(8 + serialize.limit());
                allocate.putLong(l.longValue());
                allocate.put(serialize);
                allocate.flip();
                this.server.broadcast(allocate, Collections.singletonList((WebSocket) WebSocketConnectorServer.this.router.get(str)));
                ConcurrentHashMap concurrentHashMap = (ConcurrentHashMap) WebSocketConnectorServer.this.eventsWaitingForAck.get(str);
                if (concurrentHashMap == null) {
                    WebSocketConnectorServer.LOGGER.warn("The pipe {} was dropped so the event ack {} will be ignored.", str, l);
                } else {
                    concurrentHashMap.put(l, new EventWaitingForAck(webSocketConnector, pipeRawTabletInsertionEvent));
                }
            } catch (Exception e) {
                synchronized (this.server) {
                    PriorityBlockingQueue priorityBlockingQueue = (PriorityBlockingQueue) WebSocketConnectorServer.this.eventsWaitingForTransfer.get(str);
                    if (priorityBlockingQueue != null) {
                        WebSocketConnectorServer.LOGGER.warn("The event {} can't be transferred to client, it will be retried later.", l, e);
                        priorityBlockingQueue.put(new EventWaitingForTransfer(l, webSocketConnector, pipeRawTabletInsertionEvent));
                    } else {
                        WebSocketConnectorServer.LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", str, l);
                        if (pipeRawTabletInsertionEvent instanceof EnrichedEvent) {
                            pipeRawTabletInsertionEvent.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
                        }
                    }
                }
            }
        }

        private boolean sleepIfNecessary() {
            if (!WebSocketConnectorServer.this.eventsWaitingForTransfer.isEmpty()) {
                return false;
            }
            try {
                Thread.sleep(10000L);
                return true;
            } catch (InterruptedException e) {
                WebSocketConnectorServer.LOGGER.warn("The transfer thread is interrupted.", e);
                Thread.currentThread().interrupt();
                return true;
            }
        }
    }

    private WebSocketConnectorServer(int i) {
        super(new InetSocketAddress(i));
        this.eventIdGenerator = new AtomicLong(0L);
        this.eventsWaitingForTransfer = new ConcurrentHashMap<>();
        this.eventsWaitingForAck = new ConcurrentHashMap<>();
        this.router = new DualTreeBidiMap<String, WebSocket>(null, Comparator.comparing((v0) -> {
            return v0.hashCode();
        })) { // from class: org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnectorServer.1
        };
        new TransferThread(this).start();
    }

    public static synchronized WebSocketConnectorServer getOrCreateInstance(int i) {
        if (null == instance.get()) {
            instance.set(new WebSocketConnectorServer(i));
        }
        return instance.get();
    }

    public synchronized void register(WebSocketConnector webSocketConnector) {
        this.eventsWaitingForTransfer.putIfAbsent(webSocketConnector.getPipeName(), new PriorityBlockingQueue<>(11, Comparator.comparing(eventWaitingForTransfer -> {
            return eventWaitingForTransfer.eventId;
        })));
        this.eventsWaitingForAck.putIfAbsent(webSocketConnector.getPipeName(), new ConcurrentHashMap<>());
    }

    public synchronized void unregister(WebSocketConnector webSocketConnector) {
        String pipeName = webSocketConnector.getPipeName();
        if (pipeName == null) {
            return;
        }
        if (this.eventsWaitingForTransfer.containsKey(pipeName)) {
            PriorityBlockingQueue<EventWaitingForTransfer> remove = this.eventsWaitingForTransfer.remove(pipeName);
            while (!remove.isEmpty()) {
                remove.forEach(eventWaitingForTransfer -> {
                    if (eventWaitingForTransfer.event instanceof EnrichedEvent) {
                        eventWaitingForTransfer.event.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
                    }
                });
                remove.clear();
                synchronized (remove) {
                    remove.notifyAll();
                }
            }
        }
        if (this.eventsWaitingForAck.containsKey(pipeName)) {
            this.eventsWaitingForAck.remove(pipeName).forEach((l, eventWaitingForAck) -> {
                if (eventWaitingForAck.event instanceof EnrichedEvent) {
                    eventWaitingForAck.event.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
                }
            });
        }
    }

    public void start() {
        super.start();
        isStarted.set(true);
    }

    public void onStart() {
        LOGGER.info("The websocket server {}:{} has been started!", getAddress().getHostName(), Integer.valueOf(getPort()));
    }

    public boolean isStarted() {
        return isStarted.get();
    }

    public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
        LOGGER.info("The websocket connection from client {}:{} has been opened!", webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort()));
    }

    public void onClose(WebSocket webSocket, int i, String str, boolean z) {
        if (webSocket.getRemoteSocketAddress() != null) {
            LOGGER.info("The websocket connection from client {}:{} has been closed! The code is {}. The reason is {}. Is it closed by remote? {}", new Object[]{webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort()), Integer.valueOf(i), str, Boolean.valueOf(z)});
        } else {
            LOGGER.warn("The websocket connection from client has been closed!The code is {}. The reason is {}. Is it closed by remote? {}", new Object[]{Integer.valueOf(i), str, Boolean.valueOf(z)});
        }
        this.router.remove(this.router.getKey(webSocket));
    }

    public void onMessage(WebSocket webSocket, String str) {
        if (str.startsWith("BIND")) {
            LOGGER.info("Received a bind message from {}:{}", webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort()));
            handleBind(webSocket, str.replace("BIND:", ""));
        } else if (str.startsWith("ACK")) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Received a ack message from {}:{}", webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort()));
            }
            handleAck(webSocket, Long.parseLong(str.replace("ACK:", "")));
        } else if (!str.startsWith("ERROR")) {
            LOGGER.warn("Received an unknown message {} from {}:{}", new Object[]{str, webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort())});
        } else {
            LOGGER.warn("Received an error message {} from {}:{}", new Object[]{str, webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort())});
            handleError(webSocket, Long.parseLong(str.replace("ERROR:", "")));
        }
    }

    private void handleBind(WebSocket webSocket, String str) {
        if (this.router.containsKey(str)) {
            broadcast("ERROR", Collections.singletonList(webSocket));
            webSocket.close(4000, "Too many connections.");
        } else {
            broadcast("READY", Collections.singletonList(webSocket));
            this.router.put(str, webSocket);
        }
    }

    private void handleAck(WebSocket webSocket, long j) {
        String str = (String) this.router.getKey(webSocket);
        if (str == null) {
            LOGGER.warn("The websocket connection from {}:{} has been closed, but the ack message of commitId: {} is received.", new Object[]{webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort()), Long.valueOf(j)});
            return;
        }
        ConcurrentHashMap<Long, EventWaitingForAck> concurrentHashMap = this.eventsWaitingForAck.get(str);
        if (concurrentHashMap == null) {
            LOGGER.warn("The pipe {} was dropped so the event ack {} will be ignored.", str, Long.valueOf(j));
            return;
        }
        EventWaitingForAck remove = concurrentHashMap.remove(Long.valueOf(j));
        if (remove == null) {
            LOGGER.warn("The event ack {} is not found.", Long.valueOf(j));
        } else {
            remove.connector.commit(remove.event instanceof EnrichedEvent ? (EnrichedEvent) remove.event : null);
        }
    }

    private synchronized void handleError(WebSocket webSocket, long j) {
        String str = (String) this.router.getKey(webSocket);
        if (str == null) {
            LOGGER.warn("The websocket connection from {}:{} has been closed, but the error message of commitId: {} is received.", new Object[]{webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort()), Long.valueOf(j)});
            return;
        }
        ConcurrentHashMap<Long, EventWaitingForAck> concurrentHashMap = this.eventsWaitingForAck.get(str);
        PriorityBlockingQueue<EventWaitingForTransfer> priorityBlockingQueue = this.eventsWaitingForTransfer.get(str);
        if (concurrentHashMap == null || priorityBlockingQueue == null) {
            LOGGER.warn("The pipe {} was dropped so the event in error {} will be ignored.", str, Long.valueOf(j));
            return;
        }
        EventWaitingForAck remove = concurrentHashMap.remove(Long.valueOf(j));
        if (remove == null) {
            LOGGER.warn("The event in error {} is not found.", Long.valueOf(j));
        } else {
            LOGGER.warn("The tablet of commitId: {} can't be parsed by client, it will be retried later.", Long.valueOf(j));
            priorityBlockingQueue.put(new EventWaitingForTransfer(Long.valueOf(j), remove.connector, remove.event));
        }
    }

    public void onError(WebSocket webSocket, Exception exc) {
        if (webSocket.getRemoteSocketAddress() != null) {
            LOGGER.warn("Got an error \"{}\" from {}:{}.", new Object[]{exc.getMessage(), webSocket.getLocalSocketAddress().getHostName(), Integer.valueOf(webSocket.getLocalSocketAddress().getPort()), exc});
        } else {
            LOGGER.warn("Got an error \"{}\" from an unknown client.", exc.getMessage(), exc);
            this.router.remove(this.router.getKey(webSocket));
        }
    }

    public void addEvent(Event event, WebSocketConnector webSocketConnector) {
        PriorityBlockingQueue<EventWaitingForTransfer> priorityBlockingQueue = this.eventsWaitingForTransfer.get(webSocketConnector.getPipeName());
        if (priorityBlockingQueue == null) {
            LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", webSocketConnector, event);
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent) event).decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
                return;
            }
            return;
        }
        if (priorityBlockingQueue.size() < 5) {
            priorityBlockingQueue.put(new EventWaitingForTransfer(Long.valueOf(this.eventIdGenerator.incrementAndGet()), webSocketConnector, event));
            return;
        }
        synchronized (priorityBlockingQueue) {
            while (priorityBlockingQueue.size() >= 5) {
                try {
                    priorityBlockingQueue.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new PipeException(e.getMessage());
                }
            }
            priorityBlockingQueue.put(new EventWaitingForTransfer(Long.valueOf(this.eventIdGenerator.incrementAndGet()), webSocketConnector, event));
        }
    }
}
