package org.dataconservancy.pass.indexer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/dataconservancy/pass/indexer/JmsClient.class */
public class JmsClient implements AutoCloseable {
    private Connection conn;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) JmsClient.class);
    private ConnectionFactory connectionFactory;
    private MessageProducer producer;
    private Session session;
    private volatile boolean connected = false;
    private final List<Consumer<Session>> sessionListeners = Collections.synchronizedList(new ArrayList());

    /* loaded from: input_file:org/dataconservancy/pass/indexer/JmsClient$JmsRuntimeException.class */
    private class JmsRuntimeException extends RuntimeException {
        public JmsRuntimeException(Throwable th) {
            super(th.getMessage(), th);
        }
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public JmsClient(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
        init();
    }

    public JmsClient() {
    }

    public void init() {
        if (this.producer == null) {
            addSessionListener(session -> {
                try {
                    this.producer = this.session.createProducer(null);
                } catch (JMSException e) {
                    throw new JmsRuntimeException(e);
                }
            });
        }
        connect();
    }

    private void addSessionListener(Consumer<Session> consumer) {
        this.sessionListeners.add(consumer);
        if (this.connected) {
            consumer.accept(this.session);
        }
    }

    public void listen(String str, MessageListener messageListener) {
        addSessionListener(session -> {
            try {
                Queue createQueue = session.createQueue(str);
                session.createConsumer(createQueue).setMessageListener(messageListener);
                LOG.info("Listening on " + createQueue);
            } catch (JMSException e) {
                throw new JmsRuntimeException(e);
            }
        });
    }

    public Supplier<Session> getSessionSupplier() {
        return () -> {
            while (!this.connected) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted", e);
                }
            }
            return this.session;
        };
    }

    public synchronized void write(String str, Message message) {
        LOG.debug("Sending message to queue {}", str);
        while (!this.connected) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted", e);
            }
        }
        try {
            this.producer.send(this.session.createQueue(str), message);
        } catch (JMSException e2) {
            throw new RuntimeException("Error writing to queue " + str, e2);
        }
    }

    private void connect() {
        while (!this.connected) {
            try {
                this.conn = this.connectionFactory.createConnection();
                this.conn.start();
                this.session = this.conn.createSession(false, 1);
                this.sessionListeners.forEach(consumer -> {
                    consumer.accept(this.session);
                });
                this.conn.setExceptionListener(jMSException -> {
                    if (this.connected) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Connection disrupted", (Throwable) jMSException);
                        } else {
                            LOG.info("Connection disrupted", jMSException.getMessage());
                        }
                        close();
                        connect();
                    }
                });
                this.connected = true;
            } catch (JMSException e) {
                try {
                    if (this.conn != null) {
                        this.conn.close();
                    }
                } catch (JMSException e2) {
                    LOG.warn("Error closing connection, j");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("JMS error, re-trying", (Throwable) e);
                } else {
                    LOG.info("JMS error: {}, re-connecting", e.getMessage());
                }
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            LOG.info("Closing ActiveMQ Sessions");
            this.connected = false;
            this.producer.close();
            this.session.close();
            this.conn.close();
        } catch (JMSException e) {
            LOG.debug("Exception while closing connection", (Throwable) e);
        }
    }
}
