package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.utils.QueueException;
import java.util.Collection;
import java.util.Date;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:com/github/ddth/queue/impl/ActiveMqQueue.class */
public abstract class ActiveMqQueue<ID, DATA> extends AbstractQueue<ID, DATA> {
    public static final String DEFAULT_URI = "tcp://localhost:61616";
    public static final String DEFAULT_QUEUE_NAME = "ddth-queue";
    private ActiveMQConnectionFactory connectionFactory;
    private String username;
    private String password;
    private Connection connection;
    private Session producerSession;
    private MessageProducer messageProducer;
    private Session consumerSession;
    private MessageConsumer messageConsumer;
    private boolean myOwnConnectionFactory = true;
    private String uri = DEFAULT_URI;
    private String queueName = "ddth-queue";

    public String getUri() {
        return this.uri;
    }

    public ActiveMqQueue<ID, DATA> setUri(String str) {
        this.uri = str;
        return this;
    }

    public String getUsername() {
        return this.username;
    }

    public ActiveMqQueue<ID, DATA> setUsername(String str) {
        this.username = str;
        return this;
    }

    public String getPassword() {
        return this.password;
    }

    public ActiveMqQueue<ID, DATA> setPassword(String str) {
        this.password = str;
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public String getQueueName() {
        return this.queueName;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public ActiveMqQueue<ID, DATA> setQueueName(String str) {
        this.queueName = str;
        return this;
    }

    protected ActiveMQConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public ActiveMqQueue<ID, DATA> setConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        this.connectionFactory = activeMQConnectionFactory;
        this.myOwnConnectionFactory = false;
        return this;
    }

    protected Connection getConnection() throws JMSException {
        if (this.connection == null) {
            synchronized (this) {
                if (this.connection == null) {
                    this.connection = StringUtils.isEmpty(this.username) ? this.connectionFactory.createConnection() : this.connectionFactory.createConnection(getUsername(), getPassword());
                    this.connection.start();
                }
            }
        }
        return this.connection;
    }

    protected Session createSession(int i) throws JMSException {
        return getConnection().createSession(false, i);
    }

    protected Session getProducerSession() throws JMSException {
        if (this.producerSession == null) {
            synchronized (this) {
                if (this.producerSession == null) {
                    this.producerSession = createSession(1);
                }
            }
        }
        return this.producerSession;
    }

    protected MessageProducer getMessageProducer() throws JMSException {
        if (this.messageProducer == null) {
            synchronized (this) {
                if (this.messageProducer == null) {
                    Session producerSession = getProducerSession();
                    this.messageProducer = producerSession.createProducer(producerSession.createQueue(this.queueName));
                }
            }
        }
        return this.messageProducer;
    }

    protected Session getConsumerSession() throws JMSException {
        if (this.consumerSession == null) {
            synchronized (this) {
                if (this.consumerSession == null) {
                    this.consumerSession = createSession(1);
                }
            }
        }
        return this.consumerSession;
    }

    protected MessageConsumer getMessageConsumer() throws JMSException {
        if (this.messageConsumer == null) {
            synchronized (this) {
                if (this.messageConsumer == null) {
                    Session consumerSession = getConsumerSession();
                    this.messageConsumer = consumerSession.createConsumer(consumerSession.createQueue(this.queueName));
                }
            }
        }
        return this.messageConsumer;
    }

    protected ActiveMQConnectionFactory buildConnectionFactory() {
        String uri = getUri();
        if (StringUtils.isBlank(uri)) {
            throw new IllegalStateException("ActiveMQ Broker URI is not defined.");
        }
        return new ActiveMQConnectionFactory(uri);
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public ActiveMqQueue<ID, DATA> init() throws Exception {
        if (this.connectionFactory == null) {
            this.connectionFactory = buildConnectionFactory();
            this.myOwnConnectionFactory = this.connectionFactory != null;
        }
        super.init();
        if (this.connectionFactory == null) {
            throw new IllegalStateException("ActiveMQ Connection factory is null.");
        }
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public void destroy() {
        try {
            super.destroy();
        } finally {
            closeQuietly(this.connection);
            closeQuietly(this.messageProducer);
            closeQuietly(this.producerSession);
            closeQuietly(this.messageConsumer);
            closeQuietly(this.consumerSession);
            if (this.connectionFactory != null && this.myOwnConnectionFactory) {
                this.connectionFactory = null;
            }
        }
    }

    protected abstract byte[] serialize(IQueueMessage<ID, DATA> iQueueMessage);

    protected abstract IQueueMessage<ID, DATA> deserialize(byte[] bArr);

    protected void closeQuietly(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
            }
        }
    }

    protected void closeQuietly(Session session) {
        if (session != null) {
            try {
                session.close();
            } catch (Exception e) {
            }
        }
    }

    protected void closeQuietly(MessageConsumer messageConsumer) {
        if (messageConsumer != null) {
            try {
                messageConsumer.close();
            } catch (Exception e) {
            }
        }
    }

    protected void closeQuietly(MessageProducer messageProducer) {
        if (messageProducer != null) {
            try {
                messageProducer.close();
            } catch (Exception e) {
            }
        }
    }

    protected boolean putToQueue(IQueueMessage<ID, DATA> iQueueMessage) {
        try {
            BytesMessage createBytesMessage = getProducerSession().createBytesMessage();
            createBytesMessage.writeBytes(serialize(iQueueMessage));
            getMessageProducer().send(createBytesMessage);
            return true;
        } catch (Exception e) {
            if (e instanceof QueueException) {
                throw ((QueueException) e);
            }
            throw new QueueException(e);
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean queue(IQueueMessage<ID, DATA> iQueueMessage) {
        IQueueMessage<ID, DATA> mo1clone = iQueueMessage.mo1clone();
        Date date = new Date();
        mo1clone.qNumRequeues2(0).qOriginalTimestamp2(date).qTimestamp2(date);
        return putToQueue(mo1clone);
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeue(IQueueMessage<ID, DATA> iQueueMessage) {
        IQueueMessage<ID, DATA> mo1clone = iQueueMessage.mo1clone();
        mo1clone.qIncNumRequeues2().qTimestamp2(new Date());
        return putToQueue(mo1clone);
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeueSilent(IQueueMessage<ID, DATA> iQueueMessage) {
        return putToQueue(iQueueMessage.mo1clone());
    }

    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage<ID, DATA> iQueueMessage) {
    }

    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage<ID, DATA> take() throws QueueException.EphemeralIsFull {
        try {
            MessageConsumer messageConsumer = getMessageConsumer();
            synchronized (messageConsumer) {
                BytesMessage receive = messageConsumer.receive(1000L);
                if (!(receive instanceof BytesMessage)) {
                    return null;
                }
                BytesMessage bytesMessage = receive;
                byte[] bArr = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(bArr);
                return deserialize(bArr);
            }
        } catch (Exception e) {
            if (e instanceof QueueException) {
                throw ((QueueException) e);
            }
            throw new QueueException(e);
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public Collection<IQueueMessage<ID, DATA>> getOrphanMessages(long j) {
        throw new QueueException.OperationNotSupported("This queue does not support retrieving orphan messages.");
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage<ID, DATA> iQueueMessage) {
        throw new QueueException.OperationNotSupported("This queue does not support ephemeral storage.");
    }

    @Override // com.github.ddth.queue.IQueue
    public int queueSize() {
        return -1;
    }

    @Override // com.github.ddth.queue.IQueue
    public int ephemeralSize() {
        return -1;
    }
}
