package org.apache.camel.component.azure.storage.queue;

import com.azure.storage.queue.QueueServiceClient;
import com.azure.storage.queue.models.QueueMessageItem;
import com.azure.storage.queue.models.QueueStorageException;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.azure.storage.queue.client.QueueClientWrapper;
import org.apache.camel.component.azure.storage.queue.operations.QueueOperations;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/azure/storage/queue/QueueConsumer.class */
public class QueueConsumer extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(QueueConsumer.class);
    private final QueueClientWrapper clientWrapper;
    private final QueueOperations queueOperations;

    public QueueConsumer(QueueEndpoint queueEndpoint, Processor processor) {
        super(queueEndpoint, processor);
        this.clientWrapper = new QueueClientWrapper(getServiceClient().getQueueClient(getConfiguration().getQueueName()));
        this.queueOperations = new QueueOperations(getConfiguration(), this.clientWrapper);
    }

    protected int poll() throws Exception {
        this.shutdownRunningTask = null;
        this.pendingExchanges = 0;
        try {
            List<QueueMessageItem> receiveMessages = this.clientWrapper.receiveMessages(getConfiguration().getMaxMessages(), getConfiguration().getVisibilityTimeout(), getConfiguration().getTimeout());
            LOG.trace("Receiving messages [{}]...", receiveMessages);
            return processBatch(CastUtils.cast(createExchanges(receiveMessages)));
        } catch (QueueStorageException e) {
            if (404 == e.getStatusCode()) {
                return 0;
            }
            throw e;
        }
    }

    private Queue<Exchange> createExchanges(List<QueueMessageItem> list) {
        return (Queue) list.stream().map(this::createExchange).collect(Collectors.toCollection(LinkedList::new));
    }

    private QueueServiceClient getServiceClient() {
        return m2getEndpoint().getQueueServiceClient();
    }

    private QueueConfiguration getConfiguration() {
        return m2getEndpoint().getConfiguration();
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public QueueEndpoint m2getEndpoint() {
        return super.getEndpoint();
    }

    public int processBatch(Queue<Object> queue) {
        int size = queue.size();
        int i = 0;
        while (i < size && isBatchAllowed()) {
            Exchange exchange = (Exchange) ObjectHelper.cast(Exchange.class, queue.poll());
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, Integer.valueOf(i));
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, Integer.valueOf(size));
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, Boolean.valueOf(i == size - 1));
            this.pendingExchanges = (size - i) - 1;
            final String str = (String) exchange.getIn().getHeader(QueueConstants.MESSAGE_ID, String.class);
            final String str2 = (String) exchange.getIn().getHeader(QueueConstants.POP_RECEIPT, String.class);
            final Duration duration = (Duration) exchange.getIn().getHeader(QueueConstants.TIMEOUT, Duration.class);
            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { // from class: org.apache.camel.component.azure.storage.queue.QueueConsumer.1
                public void onComplete(Exchange exchange2) {
                    exchange2.getIn().setHeader(QueueConstants.MESSAGE_ID, str);
                    exchange2.getIn().setHeader(QueueConstants.POP_RECEIPT, str2);
                    exchange2.getIn().setHeader(QueueConstants.TIMEOUT, duration);
                    QueueConsumer.this.processCommit(exchange2);
                }

                public void onFailure(Exchange exchange2) {
                    QueueConsumer.this.processRollback(exchange2);
                }
            });
            LOG.trace("Processing exchange [{}]...", exchange);
            getAsyncProcessor().process(exchange, defaultConsumerCallback(exchange, true));
            i++;
        }
        return size;
    }

    private Exchange createExchange(QueueMessageItem queueMessageItem) {
        Exchange createExchange = createExchange(true);
        Message in = createExchange.getIn();
        in.setBody(queueMessageItem.getMessageText());
        in.setHeaders(QueueExchangeHeaders.createQueueExchangeHeadersFromQueueMessageItem(queueMessageItem).toMap());
        return createExchange;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processCommit(Exchange exchange) {
        try {
            LOG.trace("Deleting message with pop receipt handle {}...", QueueExchangeHeaders.getPopReceiptFromHeaders(exchange));
            this.queueOperations.deleteMessage(exchange);
        } catch (QueueStorageException e) {
            getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processRollback(Exchange exchange) {
        Exception exception = exchange.getException();
        if (exception != null) {
            getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, exception);
        }
    }
}
