package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.impl.AbstractQueue;
import com.github.ddth.queue.utils.QueueException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:com/github/ddth/queue/impl/RabbitMqQueue.class */
public abstract class RabbitMqQueue<ID, DATA> extends AbstractQueue<ID, DATA> {
    public static final String DEFAULT_URI = "amqp://localhost:5672";
    public static final String DEFAULT_QUEUE_NAME = "ddth-queue";
    private ConnectionFactory connectionFactory;
    private boolean myOwnConnectionFactory = true;
    private String uri = DEFAULT_URI;
    private String queueName = "ddth-queue";
    private Connection connection;
    private Channel producerChannel;
    private Channel consumerChannel;

    public String getUri() {
        return this.uri;
    }

    public RabbitMqQueue<ID, DATA> setUri(String str) {
        this.uri = str;
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public String getQueueName() {
        return this.queueName;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public RabbitMqQueue<ID, DATA> setQueueName(String str) {
        this.queueName = str;
        return this;
    }

    protected ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    protected RabbitMqQueue<ID, DATA> setConnectionFactory(ConnectionFactory connectionFactory, boolean z) {
        if (!this.myOwnConnectionFactory || this.connectionFactory != null) {
        }
        this.connectionFactory = connectionFactory;
        this.myOwnConnectionFactory = z;
        return this;
    }

    public RabbitMqQueue<ID, DATA> setConnectionFactory(ConnectionFactory connectionFactory) {
        return setConnectionFactory(connectionFactory, false);
    }

    protected Connection getConnection() throws IOException, TimeoutException {
        if (this.connection == null) {
            synchronized (this) {
                if (this.connection == null) {
                    this.connection = this.connectionFactory.newConnection();
                }
            }
        }
        return this.connection;
    }

    protected Channel createChannel() throws IOException, TimeoutException {
        return getConnection().createChannel();
    }

    protected Channel getProducerChannel() throws IOException, TimeoutException {
        if (this.producerChannel == null) {
            synchronized (this) {
                if (this.producerChannel == null) {
                    this.producerChannel = createChannel();
                }
            }
        }
        return this.producerChannel;
    }

    protected Channel getConsumerChannel() throws IOException, TimeoutException {
        if (this.consumerChannel == null) {
            synchronized (this) {
                if (this.consumerChannel == null) {
                    this.consumerChannel = createChannel();
                }
            }
        }
        return this.consumerChannel;
    }

    protected ConnectionFactory buildConnectionFactory() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
        String uri = getUri();
        if (StringUtils.isBlank(uri)) {
            throw new IllegalStateException("RabbitMQ Broker URI is not defined.");
        }
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(uri);
        return connectionFactory;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public RabbitMqQueue<ID, DATA> init() throws Exception {
        if (this.connectionFactory == null) {
            setConnectionFactory(buildConnectionFactory(), true);
        }
        super.init();
        if (this.connectionFactory == null) {
            throw new IllegalStateException("RabbitMQ connection factory is null.");
        }
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public void destroy() {
        try {
            super.destroy();
        } finally {
            closeQuietly(this.connection);
            closeQuietly(this.producerChannel);
            closeQuietly(this.consumerChannel);
            if (this.connectionFactory != null && this.myOwnConnectionFactory) {
                this.connectionFactory = null;
            }
        }
    }

    protected void closeQuietly(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
            }
        }
    }

    protected void closeQuietly(Channel channel) {
        if (channel != null) {
            try {
                channel.close();
            } catch (Exception e) {
            }
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage<ID, DATA> iQueueMessage) {
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    protected boolean doPutToQueue(IQueueMessage<ID, DATA> iQueueMessage, AbstractQueue.PutToQueueCase putToQueueCase) {
        try {
            getProducerChannel().basicPublish("", this.queueName, (AMQP.BasicProperties) null, serialize(iQueueMessage));
            return true;
        } catch (Exception e) {
            if (e instanceof QueueException) {
                throw ((QueueException) e);
            }
            throw new QueueException(e);
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage<ID, DATA> take() throws QueueException.EphemeralIsFull {
        try {
            GetResponse basicGet = getConsumerChannel().basicGet(this.queueName, true);
            if (basicGet != null) {
                return deserialize(basicGet.getBody());
            }
            return null;
        } catch (Exception e) {
            if (e instanceof QueueException) {
                throw ((QueueException) e);
            }
            throw new QueueException(e);
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public Collection<IQueueMessage<ID, DATA>> getOrphanMessages(long j) {
        throw new QueueException.OperationNotSupported("This queue does not support retrieving orphan messages.");
    }

    @Override // com.github.ddth.queue.IQueue
    public int queueSize() {
        try {
            Channel createChannel = createChannel();
            try {
                int messageCount = createChannel.queueDeclarePassive(getQueueName()).getMessageCount();
                if (createChannel != null) {
                    createChannel.close();
                }
                return messageCount;
            } finally {
            }
        } catch (Exception e) {
            if (e instanceof QueueException) {
                throw ((QueueException) e);
            }
            throw new QueueException(e);
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public int ephemeralSize() {
        return -1;
    }
}
