package org.enodeframework.eventing;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.enodeframework.common.exception.DuplicateEventStreamException;
import org.enodeframework.common.function.Action1;
import org.enodeframework.common.io.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/eventing/EventCommittingContextMailBox.class */
/**
 * A numbered mailbox that queues {@link EventCommittingContext} messages and hands them, in
 * batches of at most {@code batchSize}, to a handler action running on the supplied
 * {@link Executor}.
 *
 * <p>Each queued event stream is also tracked in a per-aggregate index
 * ({@code aggregateRootId -> eventStreamId}). The index is used to reject a second enqueue of
 * the same event stream and to drop queued messages whose aggregate was removed through
 * {@link #removeAggregateAllEventCommittingContexts(String)}.
 *
 * <p>Thread-safety: enqueueing and starting a run are serialized on {@code lockObj}; batch
 * processing is serialized on {@code processMessageLockObj}. The {@code running} flag and
 * {@code lastActiveTime} are written outside those locks (for example in
 * {@link #completeRun()}), so they are declared {@code volatile} to make the writes visible to
 * other threads.
 */
public class EventCommittingContextMailBox {
    public static final Logger logger = LoggerFactory.getLogger(EventCommittingContextMailBox.class);
    /** Placeholder value for the inner maps, which are used as concurrent sets. */
    private static final Byte ONE_BYTE = (byte) 1;
    private final Executor executor;
    private final int number;
    private final Action1<List<EventCommittingContext>> handleMessageAction;
    private final int batchSize;
    /** Volatile because it is cleared in {@link #completeRun()} without holding {@code lockObj}. */
    private volatile boolean running;
    private final Object lockObj = new Object();
    private final Object processMessageLockObj = new Object();
    /** aggregateRootId -> (eventStreamId -> marker) for every event stream currently queued. */
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> aggregateDictDict = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<EventCommittingContext> messageQueue = new ConcurrentLinkedQueue<>();
    /** Volatile because it is replaced by several threads and read by {@link #isInactive(int)}. */
    private volatile Date lastActiveTime = new Date();

    /**
     * Creates a mailbox.
     *
     * @param number              identifier of this mailbox, used in log output and returned by
     *                            {@link #getNumber()}
     * @param batchSize           maximum number of messages passed to the handler per run
     * @param handleMessageAction action invoked with each collected batch
     * @param executor            executor on which batch processing is scheduled
     */
    public EventCommittingContextMailBox(int number, int batchSize, Action1<List<EventCommittingContext>> handleMessageAction, Executor executor) {
        this.executor = executor;
        this.handleMessageAction = handleMessageAction;
        this.number = number;
        this.batchSize = batchSize;
    }

    /**
     * @return the time of the most recent enqueue, run start or run completion
     */
    public Date getLastActiveTime() {
        return this.lastActiveTime;
    }

    /**
     * @return {@code true} while a run has been started and not yet completed
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * @return the number of messages currently waiting in the queue
     */
    public long getTotalUnHandledMessageCount() {
        return this.messageQueue.size();
    }

    /**
     * Registers the message's event stream in the per-aggregate index, appends the message to
     * the queue and starts a run if none is in progress.
     *
     * @param eventCommittingContext the message to enqueue
     * @throws DuplicateEventStreamException if an event stream with the same id is already
     *                                       registered for the same aggregate root
     */
    public void enqueueMessage(EventCommittingContext eventCommittingContext) {
        synchronized (this.lockObj) {
            String aggregateRootId = eventCommittingContext.getEventStream().getAggregateRootId();
            ConcurrentHashMap<String, Byte> eventStreamIds =
                    this.aggregateDictDict.computeIfAbsent(aggregateRootId, key -> new ConcurrentHashMap<>());
            // putIfAbsent returns the previous marker when this stream id was already queued.
            if (eventStreamIds.putIfAbsent(eventCommittingContext.getEventStream().getId(), ONE_BYTE) != null) {
                throw new DuplicateEventStreamException(eventCommittingContext.getEventStream());
            }
            eventCommittingContext.setMailBox(this);
            this.messageQueue.add(eventCommittingContext);
            if (logger.isDebugEnabled()) {
                logger.debug("{} enqueued new message, mailboxNumber: {}, aggregateRootId: {}, commandId: {}, eventVersion: {}, eventStreamId: {}, eventIds: {}",
                        getClass().getName(),
                        this.number,
                        aggregateRootId,
                        eventCommittingContext.getProcessingCommand().getMessage().getId(),
                        eventCommittingContext.getEventStream().getVersion(),
                        eventCommittingContext.getEventStream().getId(),
                        eventCommittingContext.getEventStream().getEvents().stream()
                                .map(event -> event.getId())
                                .collect(Collectors.joining("|")));
            }
            this.lastActiveTime = new Date();
            tryRun();
        }
    }

    /**
     * Starts an asynchronous processing run on the executor unless one is already in progress.
     */
    public void tryRun() {
        synchronized (this.lockObj) {
            if (isRunning()) {
                return;
            }
            setAsRunning();
            if (logger.isDebugEnabled()) {
                logger.debug("{} start run, mailboxNumber: {}", getClass().getName(), this.number);
            }
            CompletableFuture.runAsync(this::processMessages, this.executor);
        }
    }

    /**
     * Marks the current run as finished and immediately starts another one if messages are
     * still queued.
     *
     * <p>Within this class it is only invoked when a run finds nothing to process or the handler
     * throws; after a successful hand-off the handler side is presumably expected to call it
     * once the batch is done (NOTE(review): confirm against the handler implementation).
     */
    public void completeRun() {
        this.lastActiveTime = new Date();
        if (logger.isDebugEnabled()) {
            logger.debug("{} complete run, mailboxNumber: {}", getClass().getName(), this.number);
        }
        setAsNotRunning();
        if (getTotalUnHandledMessageCount() > 0) {
            tryRun();
        }
    }

    /**
     * Removes the index entry of the given aggregate. Messages of that aggregate that are still
     * in the queue are not removed here; they are skipped when a later run polls them because
     * their index entry is gone.
     *
     * @param aggregateRootId id of the aggregate root whose entry is removed
     */
    public void removeAggregateAllEventCommittingContexts(String aggregateRootId) {
        this.aggregateDictDict.remove(aggregateRootId);
    }

    /**
     * @param timeoutMillis inactivity threshold in milliseconds
     * @return {@code true} if at least {@code timeoutMillis} milliseconds have passed since the
     *         last recorded activity
     */
    public boolean isInactive(int timeoutMillis) {
        return System.currentTimeMillis() - this.lastActiveTime.getTime() >= ((long) timeoutMillis);
    }

    /**
     * Polls up to {@code batchSize} still-registered messages from the queue and passes them to
     * the handler. Completes the run itself when the batch is empty or the handler throws.
     */
    private void processMessages() {
        synchronized (this.processMessageLockObj) {
            this.lastActiveTime = new Date();
            List<EventCommittingContext> batch = new ArrayList<>();
            EventCommittingContext message;
            while (batch.size() < this.batchSize && (message = this.messageQueue.poll()) != null) {
                ConcurrentHashMap<String, Byte> eventStreamIds =
                        this.aggregateDictDict.get(message.getEventStream().getAggregateRootId());
                // Only keep messages whose stream is still registered; the rest are dropped
                // (their aggregate entry was removed after they were queued).
                if (eventStreamIds != null && eventStreamIds.remove(message.getEventStream().getId()) != null) {
                    batch.add(message);
                }
            }
            if (batch.isEmpty()) {
                completeRun();
                return;
            }
            try {
                this.handleMessageAction.apply(batch);
            } catch (Exception e) {
                // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
                logger.error("{} run has unknown exception, mailboxNumber: {}", getClass().getName(), this.number, e);
                Task.sleep(1L);
                completeRun();
            }
        }
    }

    private void setAsRunning() {
        this.running = true;
    }

    private void setAsNotRunning() {
        this.running = false;
    }

    /**
     * @return the identifier of this mailbox
     */
    public int getNumber() {
        return this.number;
    }
}
