package org.neo4j.cluster;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.neo4j.cluster.com.message.Message;
import org.neo4j.cluster.com.message.MessageProcessor;
import org.neo4j.cluster.com.message.MessageSource;
import org.neo4j.cluster.com.message.MessageType;
import org.neo4j.cluster.statemachine.StateMachine;
import org.neo4j.cluster.statemachine.StateTransitionListener;
import org.neo4j.cluster.timeout.TimeoutStrategy;
import org.neo4j.cluster.timeout.Timeouts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/neo4j/cluster/ConnectedStateMachines.class */
/**
 * Routes incoming messages to the {@link StateMachine} registered for the class of the message's
 * type, then forwards every message the state machines emit in response.
 *
 * <p>Emitted messages are first offered to all processors registered through
 * {@link #addMessageProcessor(MessageProcessor)}. A message carrying the {@link Message#TO} header is
 * then passed to the sender given at construction; a message without it is delivered to the local
 * state machine registered for its type, if there is one.
 */
public class ConnectedStateMachines implements MessageProcessor, MessageSource {
    private final MessageProcessor sender;
    private final DelayedDirectExecutor executor;
    private final Timeouts timeouts;
    private final Logger logger = LoggerFactory.getLogger(ConnectedStateMachines.class);
    // LinkedHashMap: iteration follows the order in which state machines were added.
    private final Map<Class<? extends MessageType>, StateMachine> stateMachines =
            new LinkedHashMap<Class<? extends MessageType>, StateMachine>();
    private final List<MessageProcessor> outgoingProcessors = new ArrayList<MessageProcessor>();
    // Fair lock. Only the write lock is acquired in this class.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final OutgoingMessageProcessor outgoing = new OutgoingMessageProcessor();

    /**
     * FIFO buffer for messages emitted by state machines while a message is being handled.
     * {@link ConnectedStateMachines#process(Message)} polls it until it is empty.
     */
    public class OutgoingMessageProcessor implements MessageProcessor {
        private final Queue<Message<? extends MessageType>> outgoingMessages;

        private OutgoingMessageProcessor() {
            this.outgoingMessages = new LinkedList<Message<? extends MessageType>>();
        }

        /**
         * Appends the message to the end of the queue.
         *
         * @param message the message to enqueue
         */
        @Override
        public synchronized void process(Message<? extends MessageType> message) {
            this.outgoingMessages.offer(message);
        }

        /**
         * Removes and returns the oldest queued message.
         *
         * @return the next message, or {@code null} if the queue is empty
         */
        public synchronized Message<? extends MessageType> nextOutgoingMessage() {
            return this.outgoingMessages.poll();
        }
    }

    /**
     * Creates the router and registers it as a processor on the given source.
     *
     * @param messageSource source of incoming messages; this instance is added to it as a processor
     * @param messageProcessor receives every emitted message that has the {@link Message#TO} header
     * @param timeoutStrategy strategy handed to the {@link Timeouts} created for this instance
     * @param delayedDirectExecutor executor drained at the end of each handled message
     */
    public ConnectedStateMachines(MessageSource messageSource, MessageProcessor messageProcessor, TimeoutStrategy timeoutStrategy, DelayedDirectExecutor delayedDirectExecutor) {
        this.sender = messageProcessor;
        this.executor = delayedDirectExecutor;
        this.timeouts = new Timeouts(this, timeoutStrategy);
        messageSource.addMessageProcessor(this);
    }

    /**
     * @return the {@link Timeouts} instance created in the constructor
     */
    public Timeouts getTimeouts() {
        return this.timeouts;
    }

    /**
     * Registers a state machine under its message type, replacing any machine already registered
     * for that type.
     *
     * @param stateMachine the state machine to register
     */
    public synchronized void addStateMachine(StateMachine stateMachine) {
        this.stateMachines.put(stateMachine.getMessageType(), stateMachine);
    }

    /**
     * Removes whatever state machine is registered under the given machine's message type.
     *
     * @param stateMachine the state machine whose message type should be unregistered
     */
    public synchronized void removeStateMachine(StateMachine stateMachine) {
        this.stateMachines.remove(stateMachine.getMessageType());
    }

    /**
     * @return a live view (not a copy) of the registered state machines, in registration order
     */
    public Iterable<StateMachine> getStateMachines() {
        return this.stateMachines.values();
    }

    /**
     * Adds a processor that is offered every message emitted by the state machines.
     *
     * @param messageProcessor the processor to add
     */
    @Override
    public void addMessageProcessor(MessageProcessor messageProcessor) {
        this.outgoingProcessors.add(messageProcessor);
    }

    /**
     * @return the queue-backed processor that state machines emit their messages into
     */
    public OutgoingMessageProcessor getOutgoing() {
        return this.outgoing;
    }

    /**
     * Handles one incoming message and everything the state machines emit in response to it.
     *
     * <p>If no state machine is registered for the message's type the message is ignored. Otherwise
     * the matching machine handles it, and the outgoing queue is drained: each emitted message gets
     * the conversation id and creator headers of {@code message}, and is then dispatched. Messages
     * delivered locally may emit further messages, which are processed in the same loop. An
     * exception while dispatching one emitted message is logged and the loop continues with the
     * next one.
     *
     * <p>The whole operation runs while holding this instance's monitor, the write lock and the
     * monitor of the {@link Timeouts} instance.
     *
     * @param message the incoming message
     */
    @Override
    public synchronized void process(Message<? extends MessageType> message) {
        this.lock.writeLock().lock();
        try {
            synchronized (this.timeouts) {
                StateMachine stateMachine = this.stateMachines.get(message.getMessageType().getClass());
                if (stateMachine == null) {
                    // No state machine registered for this message type: ignore the message.
                    return;
                }
                stateMachine.handle(message, this.outgoing);

                Message<? extends MessageType> outgoingMessage;
                while ((outgoingMessage = this.outgoing.nextOutgoingMessage()) != null) {
                    try {
                        dispatchOutgoing(message, outgoingMessage);
                    } catch (Exception e) {
                        this.logger.warn("Error processing message " + message, e);
                    }
                }

                // NOTE(review): drain() runs while the lock and the timeouts monitor are held;
                // presumably it executes work queued during handling - confirm whether it is safe
                // to run that work under these locks.
                this.executor.drain();
            }
        } finally {
            // Single release point. Unlocking anywhere else in addition to this would make the
            // second unlock() throw IllegalMonitorStateException.
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Dispatches one emitted message: copies the conversation headers from the message that caused
     * it, offers it to every registered outgoing processor, then either sends it (when it has the
     * {@link Message#TO} header) or hands it to the local state machine registered for its type.
     * Failures of individual processors and of the sender are logged and do not propagate.
     */
    private void dispatchOutgoing(Message<? extends MessageType> cause, Message<? extends MessageType> outgoingMessage) {
        cause.copyHeadersTo(outgoingMessage, Message.CONVERSATION_ID, Message.CREATED_BY);

        for (MessageProcessor outgoingProcessor : this.outgoingProcessors) {
            try {
                outgoingProcessor.process(outgoingMessage);
            } catch (Throwable th) {
                this.logger.warn("Outgoing message processor threw exception", th);
            }
        }

        if (outgoingMessage.hasHeader(Message.TO)) {
            try {
                this.sender.process(outgoingMessage);
            } catch (Throwable th) {
                this.logger.warn("Message sending threw exception", th);
            }
            return;
        }

        // No recipient header: deliver to a local state machine if one handles this type.
        StateMachine localStateMachine = this.stateMachines.get(outgoingMessage.getMessageType().getClass());
        if (localStateMachine != null) {
            localStateMachine.handle(outgoingMessage, this.outgoing);
        }
    }

    /**
     * Adds the listener to every state machine registered at the time of the call. State machines
     * registered afterwards do not receive it.
     *
     * @param stateTransitionListener the listener to add
     */
    public void addStateTransitionListener(StateTransitionListener stateTransitionListener) {
        for (StateMachine stateMachine : this.stateMachines.values()) {
            stateMachine.addStateTransitionListener(stateTransitionListener);
        }
    }

    /**
     * Removes the listener from every state machine registered at the time of the call.
     *
     * @param stateTransitionListener the listener to remove
     */
    public void removeStateTransitionListener(StateTransitionListener stateTransitionListener) {
        for (StateMachine stateMachine : this.stateMachines.values()) {
            stateMachine.removeStateTransitionListener(stateTransitionListener);
        }
    }

    /**
     * @return a list-style string with one {@code SuperclassSimpleName:state} entry per registered
     *         state machine, where the class name is the superclass of the current state's class
     */
    @Override
    public String toString() {
        List<String> descriptions = new ArrayList<String>();
        for (StateMachine stateMachine : this.stateMachines.values()) {
            descriptions.add(stateMachine.getState().getClass().getSuperclass().getSimpleName() + ":" + stateMachine.getState().toString());
        }
        return descriptions.toString();
    }

    /**
     * Looks up the state machine registered for a message type.
     *
     * @param cls the message type class used as registration key
     * @return the registered state machine, or {@code null} if none is registered for {@code cls}
     */
    public StateMachine getStateMachine(Class<? extends MessageType> cls) {
        return this.stateMachines.get(cls);
    }
}
