package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.db.TaskStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
import com.github.sonus21.rqueue.utils.BaseLogger;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.MessageUtils;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import com.github.sonus21.rqueue.web.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.time.Duration;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.context.ApplicationEventPublisher;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/sonus21/rqueue/listener/PostProcessingHandler.class */
/**
 * Applies the outcome of a listener invocation to a message: depending on the reported
 * {@link TaskStatus} the message is removed from the processing queue, parked for a later retry,
 * moved to a dead letter queue or discarded. Message processors are notified and, when listener
 * statistics collection is enabled, an {@link RqueueExecutionEvent} is published.
 */
public class PostProcessingHandler extends BaseLogger {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PostProcessingHandler.class);

    /** How long metadata of a message that is waiting for a retry is kept (604800 seconds). */
    private static final Duration METADATA_RETENTION = Duration.ofDays(7);

    /**
     * Value of {@code TaskExecutionBackOff.nextBackOff} that this class treats as "do not retry".
     * NOTE(review): presumably mirrors a STOP constant on TaskExecutionBackOff; confirm and reuse it.
     */
    private static final long BACKOFF_STOP = -1L;

    private final ApplicationEventPublisher applicationEventPublisher;
    private final RqueueWebConfig rqueueWebConfig;
    private final RqueueMessageMetadataService rqueueMessageMetadataService;
    private final RqueueMessageTemplate rqueueMessageTemplate;
    private final TaskExecutionBackOff taskExecutionBackoff;
    private final MessageProcessorHandler messageProcessorHandler;
    private final RqueueSystemConfigDao rqueueSystemConfigDao;
    private final RqueueConfig rqueueConfig;

    /**
     * Creates a handler wired with all collaborators it needs; the arguments are stored as-is and
     * no validation is performed here.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public PostProcessingHandler(RqueueConfig rqueueConfig, RqueueWebConfig rqueueWebConfig, ApplicationEventPublisher applicationEventPublisher, RqueueMessageMetadataService rqueueMessageMetadataService, RqueueMessageTemplate rqueueMessageTemplate, TaskExecutionBackOff taskExecutionBackOff, MessageProcessorHandler messageProcessorHandler, RqueueSystemConfigDao rqueueSystemConfigDao) {
        super(log, null);
        this.applicationEventPublisher = applicationEventPublisher;
        this.rqueueWebConfig = rqueueWebConfig;
        this.rqueueMessageMetadataService = rqueueMessageMetadataService;
        this.rqueueMessageTemplate = rqueueMessageTemplate;
        this.taskExecutionBackoff = taskExecutionBackOff;
        this.messageProcessorHandler = messageProcessorHandler;
        this.rqueueSystemConfigDao = rqueueSystemConfigDao;
        this.rqueueConfig = rqueueConfig;
    }

    /**
     * Dispatches to the handler matching {@code taskStatus}.
     *
     * <p>{@link TaskStatus#QUEUE_INACTIVE} is a no-op. {@code SUCCESSFUL}, {@code DELETED},
     * {@code IGNORED} and {@code FAILED} are handled; any other status raises
     * {@link UnknownSwitchCase}. Every {@link Exception} raised while handling (including that
     * one) is caught and logged at ERROR level instead of being propagated.
     *
     * @param queueDetail queue the message was consumed from
     * @param rqueueMessage the message as it is stored in the processing queue
     * @param obj the payload handed to message processors, the back-off and log statements
     * @param messageMetadata metadata of the message, may be {@code null} (it is then looked up)
     * @param taskStatus outcome of the execution
     * @param i failure count to record on the message
     * @param j execution time that is added to the message metadata
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void handlePostProcessing(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object obj, MessageMetadata messageMetadata, TaskStatus taskStatus, int i, long j) {
        if (taskStatus == TaskStatus.QUEUE_INACTIVE) {
            return;
        }
        try {
            // Switch on the enum itself rather than on ordinal-derived integers, so the case
            // labels cannot drift from the enum or collide with unrelated int constants.
            switch (taskStatus) {
                case SUCCESSFUL:
                    handleSuccessFullExecution(queueDetail, rqueueMessage, obj, messageMetadata, i, j);
                    break;
                case DELETED:
                    handleManualDeletion(queueDetail, rqueueMessage, obj, messageMetadata, i, j);
                    break;
                case IGNORED:
                    handleIgnoredMessage(queueDetail, rqueueMessage, obj, messageMetadata, i, j);
                    break;
                case FAILED:
                    handleFailure(queueDetail, rqueueMessage, obj, messageMetadata, i, j);
                    break;
                default:
                    throw new UnknownSwitchCase(String.valueOf(taskStatus));
            }
        } catch (Exception e) {
            log(Level.ERROR, "Error occurred in post processing", e);
        }
    }

    /**
     * Publishes an {@link RqueueExecutionEvent} when listener statistics collection is enabled.
     * Building the event goes through {@link #addOrDeleteMetadata} in delete mode, so stored
     * metadata of the message is removed as a side effect; nothing happens when collection is off.
     */
    private void publishEvent(QueueDetail queueDetail, RqueueMessage rqueueMessage, MessageMetadata messageMetadata, TaskStatus taskStatus, long jobExecutionTime) {
        if (!this.rqueueWebConfig.isCollectListenerStats()) {
            return;
        }
        MessageMetadata eventMetadata = addOrDeleteMetadata(rqueueMessage, messageMetadata, jobExecutionTime, false);
        this.applicationEventPublisher.publishEvent(new RqueueExecutionEvent(queueDetail, rqueueMessage, taskStatus, eventMetadata));
    }

    /**
     * Resolves the metadata of a message, adds the execution time to it and then either stores it
     * or deletes the stored copy.
     *
     * @param messageMetadata metadata already at hand, or {@code null} to look it up by id
     * @param save {@code true} to save the metadata with {@link #METADATA_RETENTION};
     *     {@code false} to delete it from the store
     * @return the metadata instance with the execution time added
     */
    private MessageMetadata addOrDeleteMetadata(RqueueMessage rqueueMessage, MessageMetadata messageMetadata, long jobExecutionTime, boolean save) {
        String metadataId = MessageUtils.getMessageMetaId(rqueueMessage.getId());
        MessageMetadata metadata = messageMetadata;
        if (metadata == null) {
            metadata = this.rqueueMessageMetadataService.get(metadataId);
        }
        // When nothing was stored a fresh instance is used; in delete mode there is then nothing
        // to delete, so the delete call is skipped.
        boolean nothingStored = metadata == null;
        if (nothingStored) {
            metadata = new MessageMetadata(metadataId, rqueueMessage.getId());
        }
        metadata.addExecutionTime(jobExecutionTime);
        if (save) {
            this.rqueueMessageMetadataService.save(metadata, METADATA_RETENTION);
        } else if (!nothingStored) {
            this.rqueueMessageMetadataService.delete(metadataId);
        }
        return metadata;
    }

    /**
     * Removes the message from the processing queue, records the failure count on it, notifies the
     * message processors with {@code taskStatus} and publishes the execution event.
     */
    private void deleteMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, TaskStatus taskStatus, int failureCount, long jobExecutionTime) {
        this.rqueueMessageTemplate.removeElementFromZset(queueDetail.getProcessingQueueName(), rqueueMessage);
        rqueueMessage.setFailureCount(failureCount);
        this.messageProcessorHandler.handleMessage(rqueueMessage, userMessage, taskStatus);
        publishEvent(queueDetail, rqueueMessage, messageMetadata, taskStatus, jobExecutionTime);
    }

    /**
     * In one pipeline, pushes {@code newMessage} to the tail of {@code targetQueueName} and removes
     * {@code oldMessage} from the processing queue of {@code queueDetail}.
     */
    private void moveMessageToQueue(QueueDetail queueDetail, String targetQueueName, RqueueMessage oldMessage, RqueueMessage newMessage) {
        RedisUtils.executePipeLine(this.rqueueMessageTemplate.getTemplate(), (connection, keySerializer, valueSerializer) -> {
            byte[] newMessageBytes = valueSerializer.serialize(newMessage);
            byte[] oldMessageBytes = valueSerializer.serialize(oldMessage);
            byte[] processingQueueKey = keySerializer.serialize(queueDetail.getProcessingQueueName());
            // rPush/zRem take the values as varargs; pass the serialized message directly.
            connection.rPush(keySerializer.serialize(targetQueueName), newMessageBytes);
            connection.zRem(processingQueueKey, oldMessageBytes);
        });
    }

    /**
     * Notifies message processors that the message moved to the dead letter queue and then moves
     * it. When a consumer is enabled on the dead letter queue, the destination is the queue name
     * from that queue's stored configuration; if the configuration is missing an error is logged
     * and the dead letter queue name from {@code queueDetail} is used instead.
     */
    private void moveMessageForReprocessingOrDlq(QueueDetail queueDetail, RqueueMessage oldMessage, RqueueMessage newMessage, Object userMessage) {
        this.messageProcessorHandler.handleMessage(newMessage, userMessage, TaskStatus.MOVED_TO_DLQ);
        String targetQueueName = queueDetail.getDeadLetterQueueName();
        if (queueDetail.isDeadLetterConsumerEnabled()) {
            String configKey = this.rqueueConfig.getQueueConfigKey(queueDetail.getDeadLetterQueueName());
            QueueConfig dlqConfig = this.rqueueSystemConfigDao.getQConfig(configKey, true);
            if (dlqConfig != null) {
                targetQueueName = dlqConfig.getQueueName();
            } else {
                log(Level.ERROR, "Queue Config not found for queue {}", null, queueDetail.getDeadLetterQueue());
            }
        }
        moveMessageToQueue(queueDetail, targetQueueName, oldMessage, newMessage);
    }

    /**
     * Returns a copy of {@code source} carrying {@code failureCount} and a refreshed re-enqueue
     * timestamp; {@code source} itself is not modified here.
     */
    private RqueueMessage cloneForReEnqueue(RqueueMessage source, int failureCount) throws CloneNotSupportedException {
        // NOTE(review): m6clone looks like a decompiler-assigned name for clone(); confirm against RqueueMessage.
        RqueueMessage copy = source.m6clone();
        copy.setFailureCount(failureCount);
        copy.updateReEnqueuedAt();
        return copy;
    }

    /** Moves a copy of the message to the dead letter queue and publishes the matching event. */
    private void moveMessageToDlq(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionTime) throws CloneNotSupportedException {
        if (isWarningEnabled()) {
            log(Level.WARN, "Message {} Moved to dead letter queue: {}", null, userMessage, queueDetail.getDeadLetterQueueName());
        }
        RqueueMessage newMessage = cloneForReEnqueue(rqueueMessage, failureCount);
        moveMessageForReprocessingOrDlq(queueDetail, rqueueMessage, newMessage, userMessage);
        publishEvent(queueDetail, rqueueMessage, messageMetadata, TaskStatus.MOVED_TO_DLQ, jobExecutionTime);
    }

    /**
     * Moves a copy of the message from the processing queue to the delayed queue with the given
     * delay and saves its metadata with the execution time added.
     *
     * @param delayMs value obtained from the back-off, logged as milliseconds
     */
    private void parkMessageForRetry(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionTime, long delayMs) throws CloneNotSupportedException {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} will be retried in {}Ms", null, userMessage, delayMs);
        }
        RqueueMessage newMessage = cloneForReEnqueue(rqueueMessage, failureCount);
        this.rqueueMessageTemplate.moveMessage(queueDetail.getProcessingQueueName(), queueDetail.getDelayedQueueName(), rqueueMessage, newMessage, delayMs);
        addOrDeleteMetadata(rqueueMessage, messageMetadata, jobExecutionTime, true);
    }

    /** Drops a message with status {@link TaskStatus#DISCARDED}. */
    private void discardMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionTime) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} discarded due to retry limit exhaust", null, userMessage);
        }
        deleteMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, TaskStatus.DISCARDED, failureCount, jobExecutionTime);
    }

    /** Drops a message with status {@link TaskStatus#DELETED}. */
    private void handleManualDeletion(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionTime) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message Deleted {} successfully", null, rqueueMessage);
        }
        deleteMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, TaskStatus.DELETED, failureCount, jobExecutionTime);
    }

    /** Drops a message with status {@link TaskStatus#SUCCESSFUL}. */
    private void handleSuccessFullExecution(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionTime) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message consumed {} successfully", null, rqueueMessage);
        }
        deleteMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, TaskStatus.SUCCESSFUL, failureCount, jobExecutionTime);
    }

    /**
     * Handles a message that will not be retried any more: it goes to the dead letter queue when
     * the queue has one configured and is discarded otherwise.
     */
    private void handleRetryExceededMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionTime) throws CloneNotSupportedException {
        if (queueDetail.isDlqSet()) {
            moveMessageToDlq(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionTime);
        } else {
            discardMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionTime);
        }
    }

    /** Returns the retry count set on the message, or the queue's retry count when none is set. */
    private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
        Integer messageRetryCount = rqueueMessage.getRetryCount();
        if (messageRetryCount != null) {
            return messageRetryCount.intValue();
        }
        return queueDetail.getNumRetry();
    }

    /**
     * Handles a failed execution. The message is retried after the delay reported by the back-off
     * unless the failure count has reached the maximum retry count or the back-off reports
     * {@link #BACKOFF_STOP}; in both of those cases it is treated as retry-exceeded. The back-off
     * is only consulted when the retry limit has not been reached.
     */
    private void handleFailure(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionTime) throws CloneNotSupportedException {
        if (failureCount >= getMaxRetryCount(rqueueMessage, queueDetail)) {
            handleRetryExceededMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionTime);
            return;
        }
        long delayMs = this.taskExecutionBackoff.nextBackOff(userMessage, rqueueMessage, failureCount);
        if (delayMs == BACKOFF_STOP) {
            handleRetryExceededMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionTime);
        } else {
            parkMessageForRetry(queueDetail, rqueueMessage, userMessage, messageMetadata, failureCount, jobExecutionTime, delayMs);
        }
    }

    /** Drops a message with status {@link TaskStatus#IGNORED}. */
    private void handleIgnoredMessage(QueueDetail queueDetail, RqueueMessage rqueueMessage, Object userMessage, MessageMetadata messageMetadata, int failureCount, long jobExecutionTime) {
        if (isDebugEnabled()) {
            log(Level.DEBUG, "Message {} ignored, Queue: {}", null, rqueueMessage, queueDetail.getName());
        }
        deleteMessage(queueDetail, rqueueMessage, userMessage, messageMetadata, TaskStatus.IGNORED, failureCount, jobExecutionTime);
    }
}
