package com.github.sonus21.rqueue.web.service;

import com.github.sonus21.rqueue.common.RqueueLockManager;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.aggregator.QueueEvents;
import com.github.sonus21.rqueue.models.aggregator.TasksStat;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.QueueStatistics;
import com.github.sonus21.rqueue.models.db.TaskStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
import com.github.sonus21.rqueue.utils.DateTimeUtils;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import com.github.sonus21.rqueue.web.dao.RqueueQStatsDao;
import java.time.Duration;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

@Service
/* loaded from: input_file:com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorService.class */
public class RqueueTaskAggregatorService implements ApplicationListener<RqueueExecutionEvent>, DisposableBean, SmartLifecycle {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueTaskAggregatorService.class);
    private final RqueueConfig rqueueConfig;
    private final RqueueWebConfig rqueueWebConfig;
    private final RqueueLockManager rqueueLockManager;
    private final RqueueQStatsDao rqueueQStatsDao;
    private final Object lifecycleMgr = new Object();
    private final Object aggregatorLock = new Object();
    private volatile boolean running = false;
    private ThreadPoolTaskScheduler taskExecutor;
    private Map<String, QueueEvents> queueNameToEvents;
    private BlockingQueue<QueueEvents> queue;
    private List<Future<?>> eventAggregatorTasks;

    /* loaded from: input_file:com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorService$EventAggregator.class */
    private class EventAggregator implements Runnable {
        private EventAggregator() {
        }

        private void aggregate(RqueueExecutionEvent rqueueExecutionEvent, TasksStat tasksStat) {
            if (rqueueExecutionEvent.getStatus() == TaskStatus.DISCARDED) {
                tasksStat.discarded++;
            } else if (rqueueExecutionEvent.getStatus() == TaskStatus.SUCCESSFUL) {
                tasksStat.success++;
            } else if (rqueueExecutionEvent.getStatus() == TaskStatus.MOVED_TO_DLQ) {
                tasksStat.movedToDlq++;
            }
            RqueueMessage rqueueMessage = rqueueExecutionEvent.getRqueueMessage();
            MessageMetadata messageMetadata = rqueueExecutionEvent.getMessageMetadata();
            if (rqueueMessage.getFailureCount() != 0) {
                tasksStat.retried++;
            }
            tasksStat.minExecution = Math.min(tasksStat.minExecution, messageMetadata.getTotalExecutionTime());
            tasksStat.maxExecution = Math.max(tasksStat.maxExecution, messageMetadata.getTotalExecutionTime());
            tasksStat.jobCount++;
            tasksStat.totalExecutionTime += messageMetadata.getTotalExecutionTime();
        }

        private void aggregate(QueueEvents queueEvents) {
            List<RqueueExecutionEvent> list = queueEvents.rqueueExecutionEvents;
            RqueueExecutionEvent rqueueExecutionEvent = list.get(0);
            HashMap hashMap = new HashMap();
            for (RqueueExecutionEvent rqueueExecutionEvent2 : list) {
                LocalDate localDateFromMilli = DateTimeUtils.localDateFromMilli(rqueueExecutionEvent.getTimestamp());
                TasksStat tasksStat = (TasksStat) hashMap.getOrDefault(localDateFromMilli, new TasksStat());
                aggregate(rqueueExecutionEvent2, tasksStat);
                hashMap.put(localDateFromMilli, tasksStat);
            }
            String queueStatisticsKey = RqueueTaskAggregatorService.this.rqueueConfig.getQueueStatisticsKey(((QueueDetail) rqueueExecutionEvent.getSource()).getName());
            QueueStatistics findById = RqueueTaskAggregatorService.this.rqueueQStatsDao.findById(queueStatisticsKey);
            if (findById == null) {
                findById = new QueueStatistics(queueStatisticsKey);
            }
            LocalDate localDate = DateTimeUtils.today();
            findById.updateTime();
            for (Map.Entry entry : hashMap.entrySet()) {
                findById.update((TasksStat) entry.getValue(), ((LocalDate) entry.getKey()).toString());
            }
            findById.pruneStats(localDate, RqueueTaskAggregatorService.this.rqueueWebConfig.getHistoryDay());
            RqueueTaskAggregatorService.this.rqueueQStatsDao.save(findById);
        }

        private void processEvents(QueueEvents queueEvents) {
            List<RqueueExecutionEvent> list = queueEvents.rqueueExecutionEvents;
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            String lockKey = RqueueTaskAggregatorService.this.rqueueConfig.getLockKey(RqueueTaskAggregatorService.this.rqueueConfig.getQueueStatisticsKey(((QueueDetail) list.get(0).getSource()).getName()));
            boolean z = false;
            try {
                if (RqueueTaskAggregatorService.this.rqueueLockManager.acquireLock(lockKey, Duration.ofSeconds(5L))) {
                    z = true;
                    aggregate(queueEvents);
                } else {
                    RqueueTaskAggregatorService.log.warn("Unable to acquire lock, will retry later");
                    TimeoutUtils.sleep(1000L);
                    RqueueTaskAggregatorService.this.queue.add(queueEvents);
                }
                z = z;
            } finally {
                if (0 != 0) {
                    RqueueTaskAggregatorService.this.rqueueLockManager.releaseLock(lockKey);
                }
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (RqueueTaskAggregatorService.this.running) {
                try {
                    if (RqueueTaskAggregatorService.log.isTraceEnabled()) {
                        RqueueTaskAggregatorService.log.trace("Aggregating queue stats");
                    }
                    QueueEvents queueEvents = (QueueEvents) RqueueTaskAggregatorService.this.queue.poll(RqueueTaskAggregatorService.this.rqueueWebConfig.getAggregateShutdownWaitTime() / 2, TimeUnit.MILLISECONDS);
                    if (queueEvents != null) {
                        processEvents(queueEvents);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    if (0 != 0) {
                        RqueueTaskAggregatorService.this.queue.add(null);
                    }
                    RqueueTaskAggregatorService.log.error("Error in aggregator job ", e2);
                    TimeoutUtils.sleepLog(1000L, false);
                }
            }
        }
    }

    /* loaded from: input_file:com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorService$SweepJob.class */
    class SweepJob implements Runnable {
        SweepJob() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (RqueueTaskAggregatorService.log.isDebugEnabled()) {
                RqueueTaskAggregatorService.log.debug("Checking pending events.");
            }
            synchronized (RqueueTaskAggregatorService.this.aggregatorLock) {
                ArrayList arrayList = new ArrayList();
                for (Map.Entry entry : RqueueTaskAggregatorService.this.queueNameToEvents.entrySet()) {
                    QueueEvents queueEvents = (QueueEvents) entry.getValue();
                    String str = (String) entry.getKey();
                    if (RqueueTaskAggregatorService.this.processingRequired(queueEvents)) {
                        RqueueTaskAggregatorService.this.queue.add(queueEvents);
                        arrayList.add(str);
                    }
                }
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    RqueueTaskAggregatorService.this.queueNameToEvents.remove((String) it.next());
                }
                RqueueTaskAggregatorService.this.aggregatorLock.notifyAll();
            }
        }
    }

    @Autowired
    public RqueueTaskAggregatorService(RqueueConfig rqueueConfig, RqueueWebConfig rqueueWebConfig, RqueueLockManager rqueueLockManager, RqueueQStatsDao rqueueQStatsDao) {
        this.rqueueConfig = rqueueConfig;
        this.rqueueWebConfig = rqueueWebConfig;
        this.rqueueLockManager = rqueueLockManager;
        this.rqueueQStatsDao = rqueueQStatsDao;
    }

    public void destroy() throws Exception {
        log.info("Destroying task aggregator");
        stop();
        if (this.taskExecutor != null) {
            this.taskExecutor.destroy();
        }
    }

    public void start() {
        log.info("Starting task aggregation");
        synchronized (this.lifecycleMgr) {
            this.running = true;
            if (this.rqueueWebConfig.isCollectListenerStats()) {
                this.eventAggregatorTasks = new ArrayList();
                this.queueNameToEvents = new ConcurrentHashMap();
                this.queue = new LinkedBlockingQueue();
                int statsAggregatorThreadCount = this.rqueueWebConfig.getStatsAggregatorThreadCount();
                this.taskExecutor = ThreadUtils.createTaskScheduler(statsAggregatorThreadCount, "RqueueTaskAggregator-", 30);
                for (int i = 0; i < statsAggregatorThreadCount; i++) {
                    this.eventAggregatorTasks.add(this.taskExecutor.submit(new EventAggregator()));
                }
                this.taskExecutor.scheduleAtFixedRate(new SweepJob(), Duration.ofSeconds(this.rqueueWebConfig.getAggregateEventWaitTime()));
                this.lifecycleMgr.notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean processingRequired(QueueEvents queueEvents) {
        return queueEvents.processingRequired(this.rqueueWebConfig.getAggregateEventWaitTime(), this.rqueueWebConfig.getAggregateEventCount());
    }

    private void waitForRunningTaskToStop() {
        if (CollectionUtils.isEmpty(this.eventAggregatorTasks)) {
            return;
        }
        Iterator<Future<?>> it = this.eventAggregatorTasks.iterator();
        while (it.hasNext()) {
            ThreadUtils.waitForTermination(log, it.next(), this.rqueueWebConfig.getAggregateShutdownWaitTime(), "Aggregator task termination", new Object[0]);
        }
    }

    public void stop() {
        log.info("Stopping task aggregation");
        synchronized (this.lifecycleMgr) {
            synchronized (this.aggregatorLock) {
                if (!CollectionUtils.isEmpty(this.queueNameToEvents)) {
                    Collection<QueueEvents> values = this.queueNameToEvents.values();
                    this.queue.addAll(values);
                    values.clear();
                }
                this.aggregatorLock.notifyAll();
            }
            this.running = false;
            waitForRunningTaskToStop();
            this.lifecycleMgr.notifyAll();
        }
    }

    public boolean isRunning() {
        boolean z;
        synchronized (this.lifecycleMgr) {
            z = this.running;
        }
        return z;
    }

    public void onApplicationEvent(RqueueExecutionEvent rqueueExecutionEvent) {
        synchronized (this.aggregatorLock) {
            if (log.isTraceEnabled()) {
                log.trace("Event {}", rqueueExecutionEvent);
            }
            String name = ((QueueDetail) rqueueExecutionEvent.getSource()).getName();
            QueueEvents queueEvents = this.queueNameToEvents.get(name);
            if (queueEvents == null) {
                queueEvents = new QueueEvents(rqueueExecutionEvent);
            } else {
                queueEvents.addEvent(rqueueExecutionEvent);
            }
            if (processingRequired(queueEvents)) {
                if (log.isTraceEnabled()) {
                    log.trace("Adding events to the queue");
                }
                this.queue.add(queueEvents);
                this.queueNameToEvents.remove(name);
            } else {
                this.queueNameToEvents.put(name, queueEvents);
            }
            this.aggregatorLock.notifyAll();
        }
    }
}
