package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.PrefixLogger;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import java.io.Serializable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.context.ApplicationEventPublisher;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/sonus21/rqueue/listener/PostProcessingHandler.class */
/**
 * Performs the bookkeeping that follows a listener execution.
 *
 * <p>Depending on the {@link ExecutionStatus} of a job, the message is removed from the
 * processing queue, parked for a later retry, moved to the dead letter queue, or discarded.
 * Message processors are notified and, when listener statistics collection is enabled in
 * {@link RqueueWebConfig}, an {@link RqueueExecutionEvent} is published.
 */
public class PostProcessingHandler extends PrefixLogger {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PostProcessingHandler.class);

    /** Back-off value that this class treats as "do not retry any more". */
    private static final long BACKOFF_STOP = -1L;

    /**
     * Delay used when a message is handed to a dead letter queue consumer and the back-off
     * policy returned {@link #BACKOFF_STOP}. Same unit as the back-off values (logged as "Ms").
     */
    private static final long DEFAULT_DLQ_CONSUMER_DELAY = 5000L;

    /** Delay value passed to {@code moveMessageToQueue} to request a plain list push. */
    private static final long NO_DELAY = -1L;

    private final ApplicationEventPublisher applicationEventPublisher;
    private final RqueueWebConfig rqueueWebConfig;
    private final RqueueMessageTemplate rqueueMessageTemplate;
    private final TaskExecutionBackOff taskExecutionBackoff;
    private final MessageProcessorHandler messageProcessorHandler;
    private final RqueueSystemConfigDao rqueueSystemConfigDao;
    private final RqueueConfig rqueueConfig;

    /**
     * Creates a handler wired with the collaborators it needs.
     *
     * @param rqueueConfig used to derive the queue config key of a dead letter queue
     * @param rqueueWebConfig consulted to decide whether execution events are published
     * @param applicationEventPublisher publisher for {@link RqueueExecutionEvent}s
     * @param rqueueMessageTemplate template used to move/remove messages in Redis
     * @param taskExecutionBackOff policy that supplies the delay before the next retry
     * @param messageProcessorHandler notified whenever a message reaches a terminal status
     * @param rqueueSystemConfigDao used to look up the config of a dead letter queue
     */
    public PostProcessingHandler(RqueueConfig rqueueConfig, RqueueWebConfig rqueueWebConfig, ApplicationEventPublisher applicationEventPublisher, RqueueMessageTemplate rqueueMessageTemplate, TaskExecutionBackOff taskExecutionBackOff, MessageProcessorHandler messageProcessorHandler, RqueueSystemConfigDao rqueueSystemConfigDao) {
        super(log, null);
        this.applicationEventPublisher = applicationEventPublisher;
        this.rqueueWebConfig = rqueueWebConfig;
        this.rqueueMessageTemplate = rqueueMessageTemplate;
        this.taskExecutionBackoff = taskExecutionBackOff;
        this.messageProcessorHandler = messageProcessorHandler;
        this.rqueueSystemConfigDao = rqueueSystemConfigDao;
        this.rqueueConfig = rqueueConfig;
    }

    /**
     * Dispatches post-processing for a job according to its execution status.
     *
     * <p>Any exception raised while post-processing (including the {@link UnknownSwitchCase}
     * thrown for an unexpected status) is logged at ERROR level and not propagated.
     *
     * @param jobImpl the job whose execution just finished
     * @param executionStatus outcome of the execution
     * @param i failure count to record on the message
     */
    public void handle(JobImpl jobImpl, ExecutionStatus executionStatus, int i) {
        try {
            // Switch on the enum itself; the compiler generates the ordinal mapping.
            switch (executionStatus) {
                case QUEUE_INACTIVE:
                    return;
                case DELETED:
                    handleManualDeletion(jobImpl, i);
                    break;
                case IGNORED:
                    handleIgnoredMessage(jobImpl, i);
                    break;
                case OLD_MESSAGE:
                    handleOldMessage(jobImpl, jobImpl.getRqueueMessage());
                    break;
                case SUCCESSFUL:
                    handleSuccessFullExecution(jobImpl, i);
                    break;
                case FAILED:
                    handleFailure(jobImpl, i);
                    break;
                default:
                    throw new UnknownSwitchCase(String.valueOf(executionStatus));
            }
        } catch (Exception e) {
            log(Level.ERROR, "Error occurred in post processing, RqueueMessage: {}, Status: {}", e, jobImpl.getRqueueMessage(), executionStatus);
        }
    }

    /** Drops an outdated message from the processing queue without notifying processors. */
    private void handleOldMessage(JobImpl jobImpl, RqueueMessage rqueueMessage) {
        QueueDetail queueDetail = jobImpl.getQueueDetail();
        log(Level.TRACE, "Message {} ignored due to old message, Queue: {}", null, rqueueMessage, queueDetail.getName());
        this.rqueueMessageTemplate.removeElementFromZset(jobImpl.getQueueDetail().getProcessingQueueName(), rqueueMessage);
    }

    /** Updates job metadata and publishes an execution event if listener stats are collected. */
    private void publishEvent(JobImpl jobImpl, RqueueMessage rqueueMessage, MessageStatus messageStatus) {
        updateMetadata(jobImpl, rqueueMessage, messageStatus);
        if (!this.rqueueWebConfig.isCollectListenerStats()) {
            return;
        }
        this.applicationEventPublisher.publishEvent(new RqueueExecutionEvent(jobImpl));
    }

    /** Records the final message and status on the job. */
    private void updateMetadata(JobImpl jobImpl, RqueueMessage rqueueMessage, MessageStatus messageStatus) {
        jobImpl.updateExecutionTime(rqueueMessage, messageStatus);
    }

    /**
     * Removes the job's message from the processing queue, notifies the message processors
     * and publishes the resulting event.
     */
    private void deleteMessage(JobImpl jobImpl, MessageStatus messageStatus, int failureCount) {
        RqueueMessage rqueueMessage = jobImpl.getRqueueMessage();
        // Remove first: the failure count is mutated only after the stored element is removed.
        this.rqueueMessageTemplate.removeElementFromZset(jobImpl.getQueueDetail().getProcessingQueueName(), rqueueMessage);
        rqueueMessage.setFailureCount(failureCount);
        this.messageProcessorHandler.handleMessage(rqueueMessage, jobImpl.getMessage(), messageStatus);
        publishEvent(jobImpl, jobImpl.getRqueueMessage(), messageStatus);
    }

    /**
     * Adds {@code newMessage} to {@code targetQueueName} and removes {@code oldMessage} from
     * the processing queue, issuing both commands through one pipeline execution.
     *
     * @param queueDetail source queue, supplies the processing queue name
     * @param targetQueueName queue that receives {@code newMessage}
     * @param oldMessage element currently stored in the processing queue
     * @param newMessage element to store in the target queue
     * @param delay when positive the message is added to a sorted set with this value as
     *     score, otherwise it is pushed to the head of a list
     */
    private void moveMessageToQueue(QueueDetail queueDetail, String targetQueueName, RqueueMessage oldMessage, RqueueMessage newMessage, long delay) {
        RedisUtils.executePipeLine(this.rqueueMessageTemplate.getTemplate(), (connection, keySerializer, valueSerializer) -> {
            byte[] newMessageBytes = valueSerializer.serialize(newMessage);
            byte[] oldMessageBytes = valueSerializer.serialize(oldMessage);
            byte[] processingQueueBytes = keySerializer.serialize(queueDetail.getProcessingQueueName());
            byte[] targetQueueBytes = keySerializer.serialize(targetQueueName);
            assert targetQueueBytes != null;
            assert newMessageBytes != null;
            if (delay > 0) {
                // NOTE(review): the score is the raw delay value; confirm against the
                // delayed-queue consumer whether an absolute timestamp is expected.
                connection.zAdd(targetQueueBytes, delay, newMessageBytes);
            } else {
                connection.lPush(targetQueueBytes, new byte[][] {newMessageBytes});
            }
            assert processingQueueBytes != null;
            connection.zRem(processingQueueBytes, new byte[][] {oldMessageBytes});
        });
    }

    /**
     * Moves the job's message to the dead letter queue.
     *
     * <p>When a dead letter consumer is enabled and the queue config is found, the message is
     * rewritten to target that queue and placed in its delayed queue. In every other case it
     * is pushed directly to the dead letter queue.
     */
    private void moveMessageToDlq(JobImpl jobImpl, int failureCount) {
        log(Level.DEBUG, "Message {} Moved to dead letter queue: {}", null, jobImpl.getRqueueMessage(), jobImpl.getQueueDetail().getDeadLetterQueueName());
        RqueueMessage oldMessage = jobImpl.getRqueueMessage();
        RqueueMessage newMessage = oldMessage.toBuilder().failureCount(failureCount).build();
        newMessage.updateReEnqueuedAt();
        QueueDetail queueDetail = jobImpl.getQueueDetail();
        Object userMessage = jobImpl.getMessage();
        this.messageProcessorHandler.handleMessage(newMessage, userMessage, MessageStatus.MOVED_TO_DLQ);
        if (!queueDetail.isDeadLetterConsumerEnabled()) {
            moveMessageToQueue(queueDetail, queueDetail.getDeadLetterQueueName(), oldMessage, newMessage, NO_DELAY);
        } else {
            String configKey = this.rqueueConfig.getQueueConfigKey(queueDetail.getDeadLetterQueueName());
            QueueConfig dlqConfig = this.rqueueSystemConfigDao.getQConfig(configKey, true);
            if (dlqConfig == null) {
                // Without a config the message cannot be re-targeted; fall back to a plain push.
                log(Level.ERROR, "Queue Config not found for queue {}", null, queueDetail.getDeadLetterQueue());
                moveMessageToQueue(queueDetail, queueDetail.getDeadLetterQueueName(), oldMessage, newMessage, NO_DELAY);
            } else {
                // The message starts fresh in the DLQ; the source queue's history is kept
                // in the dedicated source fields.
                newMessage.setQueueName(dlqConfig.getName());
                newMessage.setFailureCount(0);
                newMessage.setSourceQueueName(oldMessage.getQueueName());
                newMessage.setSourceQueueFailureCount(failureCount);
                long backOff = this.taskExecutionBackoff.nextBackOff(userMessage, newMessage, failureCount);
                long delay = backOff == BACKOFF_STOP ? DEFAULT_DLQ_CONSUMER_DELAY : backOff;
                moveMessageToQueue(queueDetail, dlqConfig.getDelayedQueueName(), oldMessage, newMessage, delay);
            }
        }
        publishEvent(jobImpl, newMessage, MessageStatus.MOVED_TO_DLQ);
    }

    /**
     * Moves the job's message out of the processing queue so that it is retried later.
     *
     * @param jobImpl the job whose message is re-enqueued
     * @param serializable optional reason included in the trace log, may be {@code null}
     * @param i failure count stored on the re-enqueued message
     * @param j delay before the retry; when zero or negative the message goes back to the
     *     main queue, otherwise to the delayed queue
     */
    public void parkMessageForRetry(JobImpl jobImpl, Serializable serializable, int i, long j) {
        if (serializable != null) {
            log(Level.TRACE, "Message {} will be retried in {}Ms, Reason: {}", null, jobImpl.getRqueueMessage(), Long.valueOf(j), serializable);
        } else {
            log(Level.TRACE, "Message {} will be retried in {}Ms", null, jobImpl.getRqueueMessage(), Long.valueOf(j));
        }
        RqueueMessage oldMessage = jobImpl.getRqueueMessage();
        RqueueMessage newMessage = oldMessage.toBuilder().failureCount(i).build().updateReEnqueuedAt();
        if (j > 0) {
            this.rqueueMessageTemplate.moveMessage(jobImpl.getQueueDetail().getProcessingQueueName(), jobImpl.getQueueDetail().getDelayedQueueName(), oldMessage, newMessage, j);
        } else {
            this.rqueueMessageTemplate.moveMessage(jobImpl.getQueueDetail().getProcessingQueueName(), jobImpl.getQueueDetail().getQueueName(), oldMessage, newMessage);
        }
        updateMetadata(jobImpl, newMessage, MessageStatus.FAILED);
    }

    /** Deletes a message whose retries are exhausted and that has no dead letter queue. */
    private void discardMessage(JobImpl jobImpl, int failureCount) {
        log(Level.DEBUG, "Message {} discarded due to retry limit exhaust", null, jobImpl.getRqueueMessage());
        deleteMessage(jobImpl, MessageStatus.DISCARDED, failureCount);
    }

    /**
     * Finalizes a message with status {@link MessageStatus#DELETED}.
     *
     * @param jobImpl the job whose message was deleted
     * @param i failure count to record on the message
     */
    public void handleManualDeletion(JobImpl jobImpl, int i) {
        log(Level.DEBUG, "Message Deleted {} successfully", null, jobImpl.getRqueueMessage());
        deleteMessage(jobImpl, MessageStatus.DELETED, i);
    }

    /** Finalizes a message that was consumed successfully. */
    private void handleSuccessFullExecution(JobImpl jobImpl, int failureCount) {
        log(Level.DEBUG, "Message consumed {} successfully", null, jobImpl.getRqueueMessage());
        deleteMessage(jobImpl, MessageStatus.SUCCESSFUL, failureCount);
    }

    /** Sends the message to the dead letter queue if one is set, otherwise discards it. */
    private void handleRetryExceededMessage(JobImpl jobImpl, int failureCount) {
        if (!jobImpl.getQueueDetail().isDlqSet()) {
            discardMessage(jobImpl, failureCount);
            return;
        }
        moveMessageToDlq(jobImpl, failureCount);
    }

    /** Returns the message's own retry count when present, else the queue's retry count. */
    private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
        if (rqueueMessage.getRetryCount() == null) {
            return queueDetail.getNumRetry();
        }
        return rqueueMessage.getRetryCount().intValue();
    }

    /**
     * Handles a failed execution: retries are stopped once the failure count reaches the
     * maximum or the back-off policy returns {@link #BACKOFF_STOP}; otherwise the message is
     * parked for retry with the computed back-off.
     */
    private void handleFailure(JobImpl jobImpl, int failureCount) {
        int maxRetryCount = getMaxRetryCount(jobImpl.getRqueueMessage(), jobImpl.getQueueDetail());
        if (failureCount >= maxRetryCount) {
            handleRetryExceededMessage(jobImpl, failureCount);
            return;
        }
        long backOff = this.taskExecutionBackoff.nextBackOff(jobImpl.getMessage(), jobImpl.getRqueueMessage(), failureCount);
        if (backOff == BACKOFF_STOP) {
            handleRetryExceededMessage(jobImpl, failureCount);
            return;
        }
        parkMessageForRetry(jobImpl, null, failureCount, backOff);
    }

    /** Finalizes a message that was ignored. */
    private void handleIgnoredMessage(JobImpl jobImpl, int failureCount) {
        log(Level.DEBUG, "Message {} ignored, Queue: {}", null, jobImpl.getRqueueMessage(), jobImpl.getQueueDetail().getName());
        deleteMessage(jobImpl, MessageStatus.IGNORED, failureCount);
    }
}
