package com.amazon.sqs.javamessaging;

import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch;
import com.amazon.sqs.javamessaging.acknowledge.AcknowledgeMode;
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
import com.amazon.sqs.javamessaging.message.SQSBytesMessage;
import com.amazon.sqs.javamessaging.message.SQSObjectMessage;
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import com.amazon.sqs.javamessaging.util.SQSMessagingClientThreadFactory;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/amazon-sqs-java-messaging-lib-1.0.8.jar:com/amazon/sqs/javamessaging/SQSSession.class */
public class SQSSession implements Session, QueueSession {
    /** Logger for this class; assigned in the static initializer. */
    private static final Log LOG;
    /**
     * NOTE(review): doClose() passes a literal 10L with TimeUnit.SECONDS instead of reading
     * this constant; presumably both denote the same executor shutdown grace period — confirm.
     */
    private static final int SESSION_EXECUTOR_GRACEFUL_SHUTDOWN_TIME = 10;
    /** Name handed to the thread factory that backs the session callback executor. */
    static final String SESSION_EXECUTOR_NAME = "SessionCallBackScheduler";
    /** Thread factory used for the single-thread callback executor of every session. */
    static final SQSMessagingClientThreadFactory SESSION_THREAD_FACTORY;
    /** Name handed to the thread factory that is passed to newly created consumers. */
    static final String CONSUMER_PREFETCH_EXECUTER_NAME = "ConsumerPrefetch";
    /** Thread factory passed to every SQSMessageConsumer created by a session. */
    static final SQSMessagingClientThreadFactory CONSUMER_PREFETCH_THREAD_FACTORY;
    /**
     * NOTE(review): presumably an SQS-specific acknowledge-mode value offered next to the
     * javax.jms.Session constants; it is not referenced inside this class — confirm.
     */
    public static final int UNORDERED_ACKNOWLEDGE = 100;
    /** Becomes true at the very end of doClose(), whether or not the shutdown sequence succeeded. */
    private volatile boolean closed;
    /** Set to true by start(); set to false by stop() and at the end of doClose(). */
    volatile boolean running;
    /** Set to true by the first doClose() call; checked by checkClosing(). */
    private volatile boolean closing;
    /** SQS client wrapper obtained from the parent connection. */
    private final AmazonSQSMessagingClientWrapper amazonSQSClient;
    /** Connection that created this session; the session removes itself from it in doClose(). */
    private final SQSConnection parentSQSConnection;
    /** Acknowledge mode this session was created with. */
    private final AcknowledgeMode acknowledgeMode;
    /** Acknowledger created from the acknowledge mode; source of un-acknowledged messages in recover(). */
    private final Acknowledger acknowledger;
    /** Used by recover() to bulk negative-acknowledge messages. */
    private final NegativeAcknowledger negativeAcknowledger;
    /** Producers created by this session that have not been removed yet. */
    private final Set<SQSMessageProducer> messageProducers;
    /** Consumers created by this session that have not been removed yet. */
    private final Set<SQSMessageConsumer> messageConsumers;
    /** Callback scheduler submitted to the executor in the constructor. */
    private final SQSSessionCallbackScheduler sqsSessionRunnable;
    /** Single-thread executor running the callback scheduler. */
    private final ExecutorService executor;
    /** Monitor used for all wait/notify coordination of session state. */
    private final Object stateLock;
    /** Thread recorded by startingCallback() and cleared by finishedCallback(). */
    private Thread activeCallbackSessionThread;
    /** Consumer recorded by startingCallback() and cleared by finishedCallback(); null when no callback is active. */
    private SQSMessageConsumer activeConsumerInCallback;
    /** Decompiler-exposed assertion flag; the static initializer sets it to the negation of the class's desired assertion status. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/amazon-sqs-java-messaging-lib-1.0.8.jar:com/amazon/sqs/javamessaging/SQSSession$CallbackEntry.class */
    /**
     * Immutable pair of a message listener and the message manager it is
     * associated with.
     */
    public static class CallbackEntry {
        private final MessageListener messageListener;
        private final SQSMessageConsumerPrefetch.MessageManager messageManager;

        /**
         * Stores both references as given; no validation is performed.
         *
         * @param messageListener listener returned later by {@link #getMessageListener()}
         * @param messageManager  manager returned later by {@link #getMessageManager()}
         */
        public CallbackEntry(MessageListener messageListener, SQSMessageConsumerPrefetch.MessageManager messageManager) {
            this.messageManager = messageManager;
            this.messageListener = messageListener;
        }

        /** @return the listener supplied at construction time */
        public MessageListener getMessageListener() {
            return messageListener;
        }

        /** @return the message manager supplied at construction time */
        public SQSMessageConsumerPrefetch.MessageManager getMessageManager() {
            return messageManager;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a session whose consumers and producers are tracked in fresh,
     * empty sets backed by {@link ConcurrentHashMap}.
     *
     * @param sQSConnection   connection that owns this session
     * @param acknowledgeMode acknowledge mode used to create the acknowledger
     * @throws JMSException if the delegated constructor throws it
     */
    public SQSSession(SQSConnection sQSConnection, AcknowledgeMode acknowledgeMode) throws JMSException {
        // Explicit type arguments avoid the raw-type/unchecked conversions of the untyped maps.
        this(sQSConnection, acknowledgeMode,
                Collections.newSetFromMap(new ConcurrentHashMap<SQSMessageConsumer, Boolean>()),
                Collections.newSetFromMap(new ConcurrentHashMap<SQSMessageProducer, Boolean>()));
    }

    /**
     * Creates a session that tracks consumers and producers in the supplied sets
     * and immediately submits its callback scheduler to a new single-thread executor.
     *
     * @param sQSConnection   connection that owns this session; also supplies the SQS client wrapper
     * @param acknowledgeMode acknowledge mode used to create the acknowledger
     * @param set             set that will hold this session's consumers
     * @param set2            set that will hold this session's producers
     * @throws JMSException declared by the signature; NOTE(review): no visible statement obviously throws it — confirm against the collaborators
     */
    SQSSession(SQSConnection sQSConnection, AcknowledgeMode acknowledgeMode, Set<SQSMessageConsumer> set, Set<SQSMessageProducer> set2) throws JMSException {
        this.closed = false;
        this.running = false;
        this.closing = false;
        this.stateLock = new Object();
        this.activeConsumerInCallback = null;
        this.parentSQSConnection = sQSConnection;
        this.amazonSQSClient = sQSConnection.getWrappedAmazonSQSClient();
        this.acknowledgeMode = acknowledgeMode;
        // The acknowledger and the scheduler both receive a reference to this
        // session while it is still being constructed.
        this.acknowledger = this.acknowledgeMode.createAcknowledger(this.amazonSQSClient, this);
        this.negativeAcknowledger = new NegativeAcknowledger(this.amazonSQSClient);
        this.sqsSessionRunnable = new SQSSessionCallbackScheduler(this, acknowledgeMode, this.acknowledger, this.negativeAcknowledger);
        this.executor = Executors.newSingleThreadExecutor(SESSION_THREAD_FACTORY);
        this.messageConsumers = set;
        this.messageProducers = set2;
        // Submitted last, after every field above has been assigned.
        this.executor.execute(this.sqsSessionRunnable);
    }

    /** @return the connection this session was created with */
    SQSConnection getParentConnection() {
        return parentSQSConnection;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the calling thread is the one currently recorded as running
     * a message-listener callback for this session.
     *
     * @return {@code true} if the caller is the recorded callback thread
     */
    public boolean isActiveCallbackSessionThread() {
        final Thread caller = Thread.currentThread();
        synchronized (stateLock) {
            return caller == activeCallbackSessionThread;
        }
    }

    /**
     * Creates a receiver for the given queue by delegating to
     * {@link #createConsumer(Destination)} and casting the result.
     *
     * @param queue queue to receive from
     * @return the created consumer as a {@link QueueReceiver}
     * @throws JMSException if {@code createConsumer} rejects the request
     */
    @Override // javax.jms.QueueSession
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        final MessageConsumer consumer = createConsumer(queue);
        return (QueueReceiver) consumer;
    }

    /**
     * Creates a receiver for the given queue.
     *
     * <p>Delegates to {@link #createConsumer(Destination, String)} so that a
     * non-null message selector is rejected with the same {@link JMSException}
     * as on the consumer path, instead of being silently dropped and
     * delivering unfiltered messages.
     *
     * @param queue queue to receive from
     * @param str   message selector; must be {@code null}
     * @return the created consumer as a {@link QueueReceiver}
     * @throws JMSException if {@code str} is non-null or consumer creation fails
     */
    @Override // javax.jms.QueueSession
    public QueueReceiver createReceiver(Queue queue, String str) throws JMSException {
        return (QueueReceiver) createConsumer(queue, str);
    }

    /**
     * Creates a sender for the given queue by delegating to
     * {@link #createProducer(Destination)} and casting the result.
     *
     * @param queue queue to send to
     * @return the created producer as a {@link QueueSender}
     * @throws JMSException if {@code createProducer} rejects the request
     */
    @Override // javax.jms.QueueSession
    public QueueSender createSender(Queue queue) throws JMSException {
        final MessageProducer producer = createProducer(queue);
        return (QueueSender) producer;
    }

    /**
     * Creates a new, empty {@link SQSBytesMessage}.
     *
     * @return the new message
     * @throws JMSException if the session is already closed
     */
    @Override // javax.jms.Session
    public BytesMessage createBytesMessage() throws JMSException {
        checkClosed();
        return new SQSBytesMessage();
    }

    /**
     * Not supported by this session.
     *
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public Message createMessage() throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Creates a new {@link SQSObjectMessage} without a payload.
     *
     * @return the new message
     * @throws JMSException if the session is already closed
     */
    @Override // javax.jms.Session
    public ObjectMessage createObjectMessage() throws JMSException {
        checkClosed();
        return new SQSObjectMessage();
    }

    /**
     * Creates a new {@link SQSObjectMessage} constructed from the given object.
     *
     * @param serializable object passed to the message constructor
     * @return the new message
     * @throws JMSException if the session is already closed, or if the message constructor throws it
     */
    @Override // javax.jms.Session
    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
        checkClosed();
        return new SQSObjectMessage(serializable);
    }

    /**
     * Creates a new {@link SQSTextMessage} without text.
     *
     * @return the new message
     * @throws JMSException if the session is already closed
     */
    @Override // javax.jms.Session
    public TextMessage createTextMessage() throws JMSException {
        checkClosed();
        return new SQSTextMessage();
    }

    /**
     * Creates a new {@link SQSTextMessage} constructed from the given text.
     *
     * @param str text passed to the message constructor
     * @return the new message
     * @throws JMSException if the session is already closed, or if the message constructor throws it
     */
    @Override // javax.jms.Session
    public TextMessage createTextMessage(String str) throws JMSException {
        checkClosed();
        return new SQSTextMessage(str);
    }

    /**
     * @return the integer reported by the session's {@link AcknowledgeMode}
     *         through {@code getOriginalAcknowledgeMode()}
     * @throws JMSException declared by the JMS interface
     */
    @Override // javax.jms.Session
    public int getAcknowledgeMode() throws JMSException {
        return acknowledgeMode.getOriginalAcknowledgeMode();
    }

    /**
     * Closes this session. Does nothing when the session is already closed.
     *
     * @throws IllegalStateException if called from the thread currently running
     *                               a message-listener callback of this session
     * @throws JMSException          if {@link #doClose()} fails
     */
    @Override // javax.jms.Session
    public void close() throws JMSException {
        if (!closed) {
            if (isActiveCallbackSessionThread()) {
                throw new IllegalStateException("MessageListener must not attempt to close its own Session to prevent potential deadlock issues");
            }
            doClose();
        }
    }

    /**
     * Performs the shutdown of this session.
     *
     * <p>The first caller to flip {@code closing} does the work: it removes the
     * session from the parent connection, closes every consumer, calls
     * {@link #recover()}, shuts the callback executor down, waits for an
     * in-flight callback, closes the scheduler and every producer, and waits up
     * to 10 seconds for the executor to terminate. Whatever happens in that
     * sequence, the session ends up marked closed and not running, and waiters
     * on the state lock are notified. Any other caller arriving while the
     * shutdown is in progress blocks until {@code closed} becomes true.
     *
     * <p>An interruption of the calling thread is logged and the thread's
     * interrupt status is restored before returning, so callers further up the
     * stack can still observe it. If the shutdown sequence itself is
     * interrupted, the executor is additionally stopped with
     * {@code shutdownNow()} instead of being left running.
     *
     * @throws JMSException if any step of the shutdown sequence throws it
     */
    void doClose() throws JMSException {
        boolean shouldClose = false;
        synchronized (stateLock) {
            if (!closing) {
                shouldClose = true;
                closing = true;
            }
            // Wake threads blocked in startingCallback() so they can observe 'closing'.
            stateLock.notifyAll();
        }
        if (closed) {
            return;
        }
        if (!shouldClose) {
            // Another thread owns the shutdown; wait for it to finish.
            boolean interrupted = false;
            synchronized (stateLock) {
                while (!closed) {
                    try {
                        stateLock.wait();
                    } catch (InterruptedException e) {
                        LOG.error("Interrupted while waiting the session to close.", e);
                        // Keep waiting, but remember the interrupt so it is not lost.
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        try {
            parentSQSConnection.removeSession(this);
            for (SQSMessageConsumer consumer : messageConsumers) {
                consumer.close();
            }
            recover();
            try {
                if (executor != null) {
                    LOG.info("Shutting down SessionCallBackScheduler executor");
                    executor.shutdown();
                    waitForCallbackComplete();
                    sqsSessionRunnable.close();
                    for (SQSMessageProducer producer : messageProducers) {
                        producer.close();
                    }
                    if (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                        LOG.warn("Can't terminate executor service SessionCallBackScheduler after 10 seconds, some running threads will be shutdown immediately");
                        executor.shutdownNow();
                    }
                }
            } catch (InterruptedException e) {
                LOG.error("Interrupted while closing the session.", e);
                // Do not leave the scheduler thread behind, and preserve the interrupt for the caller.
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        } finally {
            synchronized (stateLock) {
                closed = true;
                running = false;
                stateLock.notifyAll();
            }
        }
    }

    /**
     * Negative-acknowledges everything this session has handed out or buffered
     * for the affected message groups.
     *
     * <p>Takes the acknowledger's un-acknowledged messages and makes it forget
     * them, then adds the messages purged from each consumer's prefetch buffer
     * and from the scheduled callbacks for the queue/group combinations those
     * messages belong to. The combined list, if not empty, is passed to the
     * negative acknowledger in one bulk action.
     *
     * @throws JMSException if the session is closed or a collaborator fails
     */
    @Override // javax.jms.Session
    public void recover() throws JMSException {
        checkClosed();

        List<SQSMessageIdentifier> toNack = acknowledger.getUnAckMessages();
        acknowledger.forgetUnAckMessages();

        Map<String, Set<String>> groupsByQueueUrl = getAffectedGroupsPerQueueUrl(toNack);

        Iterator<SQSMessageConsumer> consumers = messageConsumers.iterator();
        while (consumers.hasNext()) {
            SQSMessageConsumer consumer = consumers.next();
            SQSQueueDestination queue = (SQSQueueDestination) consumer.getQueue();
            Set<String> groups = groupsByQueueUrl.get(queue.getQueueUrl());
            if (groups != null) {
                toNack.addAll(consumer.purgePrefetchedMessagesWithGroups(groups));
            }
        }

        toNack.addAll(sqsSessionRunnable.purgeScheduledCallbacksForQueuesAndGroups(groupsByQueueUrl));

        if (!toNack.isEmpty()) {
            negativeAcknowledger.bulkAction(toNack, toNack.size());
        }
    }

    /**
     * Groups the group ids of the given messages by queue URL.
     *
     * <p>Messages whose group id is {@code null} are skipped, so a queue URL
     * only appears in the result if at least one of its messages has a group id.
     *
     * @param list message identifiers to inspect
     * @return mutable map from queue URL to the set of group ids seen for it; empty if none
     */
    private Map<String, Set<String>> getAffectedGroupsPerQueueUrl(List<SQSMessageIdentifier> list) {
        Map<String, Set<String>> groupsByQueueUrl = new HashMap<String, Set<String>>();
        for (SQSMessageIdentifier identifier : list) {
            String groupId = identifier.getGroupId();
            if (groupId == null) {
                continue;
            }
            String queueUrl = identifier.getQueueUrl();
            // Single lookup; the set is created lazily on the first group id for this queue.
            Set<String> groups = groupsByQueueUrl.get(queueUrl);
            if (groups == null) {
                groups = new HashSet<String>();
                groupsByQueueUrl.put(queueUrl, groups);
            }
            groups.add(groupId);
        }
        return groupsByQueueUrl;
    }

    /** Intentionally empty: this implementation does nothing when {@code run()} is invoked. */
    @Override // javax.jms.Session, java.lang.Runnable
    public void run() {
    }

    /**
     * Creates a producer for the given destination and registers it with this session.
     *
     * @param destination {@code null} or an {@link SQSQueueDestination}
     * @return the new producer
     * @throws JMSException if the session is closed or closing, or if
     *                      {@code destination} is non-null and not an {@code SQSQueueDestination}
     */
    @Override // javax.jms.Session
    public MessageProducer createProducer(Destination destination) throws JMSException {
        checkClosed();
        final boolean acceptable = destination == null || destination instanceof SQSQueueDestination;
        if (!acceptable) {
            throw new JMSException("Actual type of Destination/Queue has to be SQSQueueDestination");
        }
        synchronized (stateLock) {
            checkClosing();
            final SQSMessageProducer producer =
                    new SQSMessageProducer(amazonSQSClient, this, (SQSQueueDestination) destination);
            messageProducers.add(producer);
            return producer;
        }
    }

    /**
     * Creates a consumer for the given destination, registers it with this
     * session and starts its prefetching if the session is already running.
     *
     * @param destination must be an {@link SQSQueueDestination}
     * @return the new consumer
     * @throws JMSException if the session is closed or closing, or if
     *                      {@code destination} is not an {@code SQSQueueDestination}
     */
    @Override // javax.jms.Session
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        checkClosed();
        if (destination instanceof SQSQueueDestination) {
            final SQSQueueDestination queue = (SQSQueueDestination) destination;
            synchronized (stateLock) {
                checkClosing();
                final SQSMessageConsumer consumer = createSQSMessageConsumer(queue);
                messageConsumers.add(consumer);
                if (running) {
                    consumer.startPrefetch();
                }
                return consumer;
            }
        }
        throw new JMSException("Actual type of Destination/Queue has to be SQSQueueDestination");
    }

    /**
     * Builds a consumer wired to this session's connection, callback scheduler,
     * acknowledgers and the shared prefetch thread factory. The consumer is not
     * registered with the session here.
     *
     * @param sQSQueueDestination queue the consumer reads from
     * @return the new consumer
     */
    SQSMessageConsumer createSQSMessageConsumer(SQSQueueDestination sQSQueueDestination) {
        return new SQSMessageConsumer(
                parentSQSConnection,
                this,
                sqsSessionRunnable,
                sQSQueueDestination,
                acknowledger,
                negativeAcknowledger,
                CONSUMER_PREFETCH_THREAD_FACTORY);
    }

    /**
     * Creates a consumer; message selectors are not supported.
     *
     * @param destination destination to consume from
     * @param str         message selector; must be {@code null}
     * @return the new consumer
     * @throws JMSException if {@code str} is non-null or consumer creation fails
     */
    @Override // javax.jms.Session
    public MessageConsumer createConsumer(Destination destination, String str) throws JMSException {
        if (str == null) {
            return createConsumer(destination);
        }
        throw new JMSException("SQSSession does not support MessageSelector. This should be null.");
    }

    /**
     * Creates a consumer; message selectors are not supported and the boolean
     * argument is ignored.
     *
     * @param destination destination to consume from
     * @param str         message selector; must be {@code null}
     * @param z           not used by this implementation
     * @return the new consumer
     * @throws JMSException if {@code str} is non-null or consumer creation fails
     */
    @Override // javax.jms.Session
    public MessageConsumer createConsumer(Destination destination, String str, boolean z) throws JMSException {
        if (str == null) {
            return createConsumer(destination);
        }
        throw new JMSException("SQSSession does not support MessageSelector. This should be null.");
    }

    /**
     * Builds a queue destination for the given name, resolving its URL through
     * the SQS client wrapper.
     *
     * @param str queue name
     * @return a new {@link SQSQueueDestination} holding the name and the resolved URL
     * @throws JMSException if the session is closed or the URL lookup fails
     */
    @Override // javax.jms.Session, javax.jms.QueueSession
    public Queue createQueue(String str) throws JMSException {
        checkClosed();
        return new SQSQueueDestination(
                str,
                amazonSQSClient.getQueueUrl(str).getQueueUrl());
    }

    /**
     * Builds a queue destination for the given name, resolving its URL through
     * the two-argument lookup of the SQS client wrapper.
     *
     * @param str  queue name
     * @param str2 second lookup argument; NOTE(review): presumably the queue owner's AWS account id — confirm against the wrapper
     * @return a new {@link SQSQueueDestination} holding the name and the resolved URL
     * @throws JMSException if the session is closed or the URL lookup fails
     */
    public Queue createQueue(String str, String str2) throws JMSException {
        checkClosed();
        return new SQSQueueDestination(
                str,
                amazonSQSClient.getQueueUrl(str, str2).getQueueUrl());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops tracking the given consumer in this session.
     *
     * @param sQSMessageConsumer consumer to remove from the session's consumer set
     */
    public void removeConsumer(SQSMessageConsumer sQSMessageConsumer) {
        messageConsumers.remove(sQSMessageConsumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops tracking the given producer in this session.
     *
     * @param sQSMessageProducer producer to remove from the session's producer set
     */
    public void removeProducer(SQSMessageProducer sQSMessageProducer) {
        messageProducers.remove(sQSMessageProducer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the beginning of a message-listener callback for the given consumer.
     *
     * <p>Returns without doing anything if the session is already closed.
     * Otherwise blocks until the session is either running or closing, then
     * records the consumer and the calling thread as the active callback.
     *
     * @param sQSMessageConsumer consumer whose listener is about to be invoked
     * @throws InterruptedException declared by the signature; interrupts raised while waiting are caught and logged inside the loop
     * @throws JMSException         (as {@code IllegalStateException}) if a callback is already in progress,
     *                              or if the session is closing once the wait ends
     */
    public void startingCallback(SQSMessageConsumer sQSMessageConsumer) throws InterruptedException, JMSException {
        if (this.closed) {
            return;
        }
        synchronized (this.stateLock) {
            if (this.activeConsumerInCallback != null) {
                throw new IllegalStateException("Callback already in progress");
            }
            // Decompiled form of an assertion: with assertions enabled, the
            // callback thread must not be set when no consumer is active.
            if (!$assertionsDisabled && this.activeCallbackSessionThread != null) {
                throw new AssertionError();
            }
            // Woken by notifyAll() in start(), stop(), finishedCallback() and doClose().
            while (!this.running && !this.closing) {
                try {
                    this.stateLock.wait();
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted while waiting on session start. Continue to wait...", e);
                }
            }
            // Leaving the loop because of 'closing' ends in an exception here.
            checkClosing();
            this.activeConsumerInCallback = sQSMessageConsumer;
            this.activeCallbackSessionThread = Thread.currentThread();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the end of the active message-listener callback: clears the
     * recorded consumer and thread and wakes every thread waiting on the state lock.
     *
     * @throws JMSException (as {@code IllegalStateException}) if no callback is in progress
     */
    public void finishedCallback() throws JMSException {
        synchronized (stateLock) {
            final boolean callbackInProgress = activeConsumerInCallback != null;
            if (!callbackInProgress) {
                throw new IllegalStateException("Callback not in progress");
            }
            activeConsumerInCallback = null;
            activeCallbackSessionThread = null;
            stateLock.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Blocks while the given consumer is the one recorded as being inside a
     * listener callback. Returns immediately if it is not.
     *
     * @param sQSMessageConsumer consumer to wait for (compared by identity)
     * @throws InterruptedException declared by the signature; interrupts raised while waiting are caught and logged inside the loop
     */
    public void waitForConsumerCallbackToComplete(SQSMessageConsumer sQSMessageConsumer) throws InterruptedException {
        synchronized (this.stateLock) {
            while (this.activeConsumerInCallback == sQSMessageConsumer) {
                try {
                    this.stateLock.wait();
                } catch (InterruptedException e) {
                    // The interrupt is logged and waiting resumes; the interrupt status is not restored.
                    LOG.warn("Interrupted while waiting the active consumer in callback to complete. Continue to wait...", e);
                }
            }
        }
    }

    /**
     * Blocks until no consumer is recorded as being inside a listener callback.
     * Returns immediately if none is active.
     */
    void waitForCallbackComplete() {
        synchronized (this.stateLock) {
            while (this.activeConsumerInCallback != null) {
                try {
                    this.stateLock.wait();
                } catch (InterruptedException e) {
                    // The interrupt is logged and waiting resumes; the interrupt status is not restored.
                    LOG.warn("Interrupted while waiting on session callback completion. Continue to wait...", e);
                }
            }
        }
    }

    /**
     * @return always {@code false}
     * @throws JMSException declared by the JMS interface; never thrown here
     */
    @Override // javax.jms.Session
    public boolean getTransacted() throws JMSException {
        return false;
    }

    /**
     * Not supported by this session.
     *
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public void commit() throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public void rollback() throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @param str ignored
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public void unsubscribe(String str) throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @param str ignored
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public Topic createTopic(String str) throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @param topic ignored
     * @param str   ignored
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public TopicSubscriber createDurableSubscriber(Topic topic, String str) throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @param topic ignored
     * @param str   ignored
     * @param str2  ignored
     * @param z     ignored
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public TopicSubscriber createDurableSubscriber(Topic topic, String str, String str2, boolean z) throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @param queue ignored
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session, javax.jms.QueueSession
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @param queue ignored
     * @param str   ignored
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session, javax.jms.QueueSession
    public QueueBrowser createBrowser(Queue queue, String str) throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session, javax.jms.QueueSession
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public MessageListener getMessageListener() throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @param messageListener ignored
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public StreamMessage createStreamMessage() throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Not supported by this session.
     *
     * @throws JMSException always, built from {@code SQSMessagingClientConstants.UNSUPPORTED_METHOD}
     */
    @Override // javax.jms.Session
    public MapMessage createMapMessage() throws JMSException {
        throw new JMSException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
    }

    /**
     * Guard used by operations that are invalid on a closed session.
     *
     * @throws IllegalStateException if the session has been closed
     */
    public void checkClosed() throws IllegalStateException {
        final boolean alreadyClosed = closed;
        if (alreadyClosed) {
            throw new IllegalStateException("Session is closed");
        }
    }

    /**
     * Guard used by operations that are invalid once shutdown has begun.
     *
     * @throws IllegalStateException if the {@code closing} flag is set
     */
    public void checkClosing() throws IllegalStateException {
        final boolean shutdownStarted = closing;
        if (shutdownStarted) {
            throw new IllegalStateException("Session is closed or closing");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the session as running, starts prefetching on every registered
     * consumer and wakes threads waiting on the state lock.
     *
     * @throws IllegalStateException if the session is closed or closing
     */
    public void start() throws IllegalStateException {
        checkClosed();
        synchronized (stateLock) {
            checkClosing();
            running = true;
            for (SQSMessageConsumer consumer : messageConsumers) {
                consumer.startPrefetch();
            }
            stateLock.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the session as not running, stops prefetching on every registered
     * consumer, waits for an in-flight listener callback to finish and then
     * wakes threads waiting on the state lock.
     *
     * @throws IllegalStateException if the session is closed or closing
     */
    public void stop() throws IllegalStateException {
        checkClosed();
        synchronized (stateLock) {
            checkClosing();
            running = false;
            for (SQSMessageConsumer consumer : messageConsumers) {
                consumer.stopPrefetch();
            }
            waitForCallbackComplete();
            stateLock.notifyAll();
        }
    }

    /** @return {@code true} if a consumer is currently recorded as being inside a callback (read without taking the state lock) */
    boolean isCallbackActive() {
        return activeConsumerInCallback != null;
    }

    /**
     * Overwrites the consumer recorded as being inside a callback, without
     * taking the state lock or notifying waiters.
     *
     * @param sQSMessageConsumer new value; {@code null} means no active callback
     */
    void setActiveConsumerInCallback(SQSMessageConsumer sQSMessageConsumer) {
        activeConsumerInCallback = sQSMessageConsumer;
    }

    /** @return the monitor object this session uses for its wait/notify coordination */
    Object getStateLock() {
        return stateLock;
    }

    /** @return the current value of the {@code closed} flag */
    boolean isClosed() {
        return closed;
    }

    /** @return the current value of the {@code closing} flag */
    boolean isClosing() {
        return closing;
    }

    /**
     * Overwrites the {@code closed} flag directly, without running any shutdown logic.
     *
     * @param z new flag value
     */
    void setClosed(boolean z) {
        closed = z;
    }

    /**
     * Overwrites the {@code closing} flag directly, without running any shutdown logic.
     *
     * @param z new flag value
     */
    void setClosing(boolean z) {
        closing = z;
    }

    /**
     * Overwrites the {@code running} flag directly, without starting or stopping consumers.
     *
     * @param z new flag value
     */
    void setRunning(boolean z) {
        running = z;
    }

    /** @return the current value of the {@code running} flag */
    boolean isRunning() {
        return running;
    }

    /** @return the callback scheduler created for this session in the constructor */
    SQSSessionCallbackScheduler getSqsSessionRunnable() {
        return sqsSessionRunnable;
    }

    // Initializes the class-level constants that are declared without an initializer.
    static {
        // Decompiled form of the compiler-generated assertion flag used by startingCallback().
        $assertionsDisabled = !SQSSession.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(SQSSession.class);
        // NOTE(review): the meaning of the boolean arguments is defined by
        // SQSMessagingClientThreadFactory and is not visible from this file.
        SESSION_THREAD_FACTORY = new SQSMessagingClientThreadFactory(SESSION_EXECUTOR_NAME, false, true);
        CONSUMER_PREFETCH_THREAD_FACTORY = new SQSMessagingClientThreadFactory(CONSUMER_PREFETCH_EXECUTER_NAME, true);
    }
}
