package org.enodeframework.eventing.impl;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import kotlin.Metadata;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlinx.coroutines.CoroutineDispatcher;
import org.enodeframework.common.extensions.SystemClock;
import org.enodeframework.common.io.IOHelper;
import org.enodeframework.common.io.Task;
import org.enodeframework.common.scheduling.ScheduleService;
import org.enodeframework.common.serializing.SerializeService;
import org.enodeframework.eventing.DomainEventStream;
import org.enodeframework.eventing.EnqueueMessageResult;
import org.enodeframework.eventing.ProcessingEvent;
import org.enodeframework.eventing.ProcessingEventMailBox;
import org.enodeframework.eventing.ProcessingEventProcessor;
import org.enodeframework.eventing.PublishedVersionStore;
import org.enodeframework.messaging.MessageDispatcher;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: DefaultProcessingEventProcessor.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��f\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0010\u000b\n\u0002\b\u0005\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u000e\u0018��2\u00020\u0001B/\b\u0016\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b¢\u0006\u0002\u0010\fB7\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b\u0012\b\b\u0002\u0010\r\u001a\u00020\u000e¢\u0006\u0002\u0010\u000fJ\u0010\u0010\"\u001a\u00020#2\u0006\u0010$\u001a\u00020\u0015H\u0002J\u0010\u0010%\u001a\u00020\u00152\u0006\u0010&\u001a\u00020'H\u0002J\b\u0010(\u001a\u00020#H\u0002J\u0018\u0010)\u001a\u00020#2\u0006\u0010*\u001a\u00020'2\u0006\u0010+\u001a\u00020\u001aH\u0002J\u0018\u0010,\u001a\u00020#2\u0006\u0010-\u001a\u00020\u00152\u0006\u0010+\u001a\u00020\u001aH\u0002J\u0010\u0010.\u001a\u00020\u001d2\u0006\u0010$\u001a\u00020\u0015H\u0002J\u0010\u0010/\u001a\u00020#2\u0006\u0010*\u001a\u00020'H\u0016J\b\u00100\u001a\u00020#H\u0002J\b\u00101\u001a\u00020#H\u0016J\b\u00102\u001a\u00020#H\u0016J\u0010\u00103\u001a\u00020#2\u0006\u0010-\u001a\u00020\u0015H\u0002J\u0018\u00104\u001a\u00020#2\u0006\u0010*\u001a\u00020'2\u0006\u0010+\u001a\u00020\u001aH\u0002R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\r\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\u0010\u001a\n 
\u0012*\u0004\u0018\u00010\u00110\u0011X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0013\u001a\u000e\u0012\u0004\u0012\u00020\u000e\u0012\u0004\u0012\u00020\u00150\u0014X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0016\u001a\u00020\u000eX\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0017\u0010\u0018R\u000e\u0010\u0019\u001a\u00020\u001aX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u001b\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u001c\u001a\u000e\u0012\u0004\u0012\u00020\u000e\u0012\u0004\u0012\u00020\u001d0\u0014X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u001e\u001a\u00020\u001aX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u001f\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010 \u001a\u00020\u001aX\u0082\u000e¢\u0006\u0002\n��R\u001a\u0010!\u001a\u000e\u0012\u0004\u0012\u00020\u000e\u0012\u0004\u0012\u00020\u00150\u0014X\u0082\u0004¢\u0006\u0002\n��¨\u00065"}, d2 = {"Lorg/enodeframework/eventing/impl/DefaultProcessingEventProcessor;", "Lorg/enodeframework/eventing/ProcessingEventProcessor;", "scheduleService", "Lorg/enodeframework/common/scheduling/ScheduleService;", "serializeService", "Lorg/enodeframework/common/serializing/SerializeService;", "messageDispatcher", "Lorg/enodeframework/messaging/MessageDispatcher;", "publishedVersionStore", "Lorg/enodeframework/eventing/PublishedVersionStore;", "coroutineDispatcher", "Lkotlinx/coroutines/CoroutineDispatcher;", "(Lorg/enodeframework/common/scheduling/ScheduleService;Lorg/enodeframework/common/serializing/SerializeService;Lorg/enodeframework/messaging/MessageDispatcher;Lorg/enodeframework/eventing/PublishedVersionStore;Lkotlinx/coroutines/CoroutineDispatcher;)V", "domainEventStreamMessageHandlerName", "", 
"(Lorg/enodeframework/common/scheduling/ScheduleService;Lorg/enodeframework/common/serializing/SerializeService;Lorg/enodeframework/messaging/MessageDispatcher;Lorg/enodeframework/eventing/PublishedVersionStore;Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/String;)V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "mailboxDict", "Ljava/util/concurrent/ConcurrentHashMap;", "Lorg/enodeframework/eventing/ProcessingEventMailBox;", "name", "getName", "()Ljava/lang/String;", "processTryToRefreshAggregateIntervalMilliseconds", "", "processTryToRefreshAggregateTaskName", "refreshingAggregateRootDict", "", "scanExpiredAggregateIntervalMilliseconds", "scanInactiveMailBoxTaskName", "timeoutSeconds", "toRefreshAggregateRootMailBoxDict", "addToRefreshAggregateMailBoxToDict", "", "mailbox", "buildProcessingEventMailBox", "processingMessage", "Lorg/enodeframework/eventing/ProcessingEvent;", "cleanInactiveMailbox", "dispatchProcessingMessageAsync", "processingEvent", "retryTimes", "getAggregateRootLatestPublishedEventVersion", "processingEventMailBox", "isMailBoxAllowRemove", "process", "processToRefreshAggregateRootMailBoxs", "start", "stop", "tryToRefreshAggregateMailBoxNextExpectingEventVersion", "updatePublishedVersionAsync", "enode"})
@SourceDebugExtension({"SMAP\nDefaultProcessingEventProcessor.kt\nKotlin\n*S Kotlin\n*F\n+ 1 DefaultProcessingEventProcessor.kt\norg/enodeframework/eventing/impl/DefaultProcessingEventProcessor\n+ 2 fake.kt\nkotlin/jvm/internal/FakeKt\n+ 3 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,249:1\n1#2:250\n1855#3,2:251\n766#3:253\n857#3,2:254\n1855#3,2:256\n*S KotlinDebug\n*F\n+ 1 DefaultProcessingEventProcessor.kt\norg/enodeframework/eventing/impl/DefaultProcessingEventProcessor\n*L\n205#1:251,2\n228#1:253\n228#1:254,2\n229#1:256,2\n*E\n"})
/* loaded from: input_file:org/enodeframework/eventing/impl/DefaultProcessingEventProcessor.class */
/*
 * Routes ProcessingEvents into one ProcessingEventMailBox per aggregate root, dispatches the
 * events handed back by a mailbox through the MessageDispatcher, and records the published
 * version in the PublishedVersionStore once dispatching succeeded.
 *
 * Two periodic tasks are registered with the ScheduleService by start():
 *   - one removes mailboxes that isMailBoxAllowRemove() accepts;
 *   - one re-queries the published version for mailboxes that still hold waiting messages.
 */
public final class DefaultProcessingEventProcessor implements ProcessingEventProcessor {

    @NotNull
    private final ScheduleService scheduleService;

    @NotNull
    private final SerializeService serializeService;

    @NotNull
    private final MessageDispatcher messageDispatcher;

    @NotNull
    private final PublishedVersionStore publishedVersionStore;

    @NotNull
    private final CoroutineDispatcher coroutineDispatcher;

    @NotNull
    private final String domainEventStreamMessageHandlerName;
    private final Logger logger;

    /** Name under which the mailbox clean-up task is started and stopped. */
    @NotNull
    private final String scanInactiveMailBoxTaskName;

    /** Name under which the published-version refresh task is started and stopped. */
    @NotNull
    private final String processTryToRefreshAggregateTaskName;

    /** Processor name; returned by {@link #getName()} and passed to the published version store. */
    @NotNull
    private final String name;

    /** Mailboxes whose enqueue returned AddToWaitingList, keyed by aggregate root id. */
    @NotNull
    private final ConcurrentHashMap<String, ProcessingEventMailBox> toRefreshAggregateRootMailBoxDict;

    /** All live mailboxes, keyed by aggregate root id. */
    @NotNull
    private final ConcurrentHashMap<String, ProcessingEventMailBox> mailboxDict;

    /** Aggregate root ids for which a published-version lookup has been started and has not yet succeeded. */
    @NotNull
    private final ConcurrentHashMap<String, Boolean> refreshingAggregateRootDict;

    /** Inactivity threshold handed to {@code ProcessingEventMailBox.isInactive}; 259200 s = 3 days. */
    private int timeoutSeconds;

    /** Initial delay and period, in milliseconds, of the mailbox clean-up task. */
    private int scanExpiredAggregateIntervalMilliseconds;

    /** Initial delay and period, in milliseconds, of the published-version refresh task. */
    private int processTryToRefreshAggregateIntervalMilliseconds;

    /**
     * Creates a processor with an explicit name.
     *
     * @param scheduleService runs the two periodic maintenance tasks
     * @param serializeService used to serialize messages for debug logging
     * @param messageDispatcher receives the events of each processed event stream
     * @param publishedVersionStore read and updated with the per-aggregate published version
     * @param coroutineDispatcher handed to every mailbox this processor creates
     * @param domainEventStreamMessageHandlerName processor name exposed through {@link #getName()}
     * @throws NullPointerException if any argument is {@code null}
     */
    public DefaultProcessingEventProcessor(@NotNull ScheduleService scheduleService, @NotNull SerializeService serializeService, @NotNull MessageDispatcher messageDispatcher, @NotNull PublishedVersionStore publishedVersionStore, @NotNull CoroutineDispatcher coroutineDispatcher, @NotNull String domainEventStreamMessageHandlerName) {
        Intrinsics.checkNotNullParameter(scheduleService, "scheduleService");
        Intrinsics.checkNotNullParameter(serializeService, "serializeService");
        Intrinsics.checkNotNullParameter(messageDispatcher, "messageDispatcher");
        Intrinsics.checkNotNullParameter(publishedVersionStore, "publishedVersionStore");
        Intrinsics.checkNotNullParameter(coroutineDispatcher, "coroutineDispatcher");
        Intrinsics.checkNotNullParameter(domainEventStreamMessageHandlerName, "domainEventStreamMessageHandlerName");
        this.scheduleService = scheduleService;
        this.serializeService = serializeService;
        this.messageDispatcher = messageDispatcher;
        this.publishedVersionStore = publishedVersionStore;
        this.coroutineDispatcher = coroutineDispatcher;
        this.domainEventStreamMessageHandlerName = domainEventStreamMessageHandlerName;
        this.logger = LoggerFactory.getLogger(DefaultProcessingEventProcessor.class);
        // Task names get a timestamp plus a random suffix. The decompiled code threw the random
        // number away and appended `this` (i.e. Object.toString() of a half-built instance) instead.
        final Random random = new Random();
        this.scanInactiveMailBoxTaskName = "CleanInactiveProcessingEventMailBoxes_" + SystemClock.now() + random.nextInt(10000);
        this.processTryToRefreshAggregateTaskName = "ProcessTryToRefreshAggregate_" + SystemClock.now() + random.nextInt(10000);
        this.name = this.domainEventStreamMessageHandlerName;
        this.toRefreshAggregateRootMailBoxDict = new ConcurrentHashMap<>();
        this.mailboxDict = new ConcurrentHashMap<>();
        this.refreshingAggregateRootDict = new ConcurrentHashMap<>();
        this.timeoutSeconds = 259200;
        this.scanExpiredAggregateIntervalMilliseconds = 5000;
        this.processTryToRefreshAggregateIntervalMilliseconds = 1000;
    }

    /*
     * Compiler-generated bridge for the Kotlin default argument: bit 32 of the mask means the
     * sixth parameter (the handler name) was omitted and "DefaultEventProcessor" is used.
     */
    public /* synthetic */ DefaultProcessingEventProcessor(ScheduleService scheduleService, SerializeService serializeService, MessageDispatcher messageDispatcher, PublishedVersionStore publishedVersionStore, CoroutineDispatcher coroutineDispatcher, String str, int i, DefaultConstructorMarker defaultConstructorMarker) {
        this(scheduleService, serializeService, messageDispatcher, publishedVersionStore, coroutineDispatcher, (i & 32) != 0 ? "DefaultEventProcessor" : str);
    }

    /**
     * Creates a processor named {@code "DefaultEventProcessor"}.
     *
     * @throws NullPointerException if any argument is {@code null} (checked by the delegate constructor)
     */
    public DefaultProcessingEventProcessor(@NotNull ScheduleService scheduleService, @NotNull SerializeService serializeService, @NotNull MessageDispatcher messageDispatcher, @NotNull PublishedVersionStore publishedVersionStore, @NotNull CoroutineDispatcher coroutineDispatcher) {
        this(scheduleService, serializeService, messageDispatcher, publishedVersionStore, coroutineDispatcher, "DefaultEventProcessor");
    }

    /** Returns the processor name given at construction time. */
    @Override // org.enodeframework.eventing.ProcessingEventProcessor
    @NotNull
    public String getName() {
        return this.name;
    }

    /**
     * Enqueues the event into the mailbox of its aggregate root, creating the mailbox on first use.
     *
     * <p>Spins (1 ms sleep per attempt, warning every 10000 attempts) until
     * {@code tryUsing()} succeeds on the mailbox. If the enqueue result is {@code Ignored} the
     * event's process context is notified immediately; if it is {@code AddToWaitingList} the
     * mailbox is registered for a published-version refresh.
     *
     * @param processingEvent the event to process
     * @throws NullPointerException if {@code processingEvent} is {@code null}
     * @throws IllegalArgumentException if the event stream has a null or empty aggregate root id
     */
    @Override // org.enodeframework.eventing.ProcessingEventProcessor
    public void process(@NotNull final ProcessingEvent processingEvent) {
        Intrinsics.checkNotNullParameter(processingEvent, "processingEvent");
        final String aggregateRootId = processingEvent.getMessage().getAggregateRootId();
        if (Strings.isNullOrEmpty(aggregateRootId)) {
            throw new IllegalArgumentException("aggregateRootId of domain event stream cannot be null or empty, domainEventStreamId:" + processingEvent.getMessage().getId());
        }
        ProcessingEventMailBox mailbox = this.mailboxDict.computeIfAbsent(aggregateRootId, key -> buildProcessingEventMailBox(processingEvent));
        long tryUsingCount = 0L;
        while (!mailbox.tryUsing()) {
            Task.sleep(1L);
            tryUsingCount++;
            if (tryUsingCount % 10000 == 0) {
                this.logger.warn("Event mailbox try using count: {}, aggregateRootId: {}, aggregateRootTypeName: {}", tryUsingCount, mailbox.getAggregateRootId(), mailbox.getAggregateRootTypeName());
            }
        }
        if (mailbox.isRemoved()) {
            // The clean-up task dropped this mailbox in the meantime; fall back to the current one.
            // NOTE(review): the replacement is used (and later released) without its own
            // tryUsing() call, and the removed mailbox is never released - confirm this is intended.
            mailbox = this.mailboxDict.computeIfAbsent(aggregateRootId, key -> buildProcessingEventMailBox(processingEvent));
        }
        try {
            final EnqueueMessageResult enqueueResult = mailbox.enqueueMessage(processingEvent);
            if (enqueueResult == EnqueueMessageResult.Ignored) {
                processingEvent.getProcessContext().notifyEventProcessed();
            } else if (enqueueResult == EnqueueMessageResult.AddToWaitingList) {
                addToRefreshAggregateMailBoxToDict(mailbox);
            }
        } finally {
            // Always give back what tryUsing() took, even when enqueueing or notifying throws.
            mailbox.exitUsing();
        }
    }

    /**
     * Registers the mailbox for published-version refreshing; on first registration it logs the
     * fact and triggers a refresh right away.
     */
    private void addToRefreshAggregateMailBoxToDict(ProcessingEventMailBox processingEventMailBox) {
        if (this.toRefreshAggregateRootMailBoxDict.putIfAbsent(processingEventMailBox.getAggregateRootId(), processingEventMailBox) == null) {
            this.logger.info("Added toRefreshPublishedVersion aggregate mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", processingEventMailBox.getAggregateRootTypeName(), processingEventMailBox.getAggregateRootId());
            tryToRefreshAggregateMailBoxNextExpectingEventVersion(processingEventMailBox);
        }
    }

    /**
     * Builds a mailbox for the aggregate root of the given event. The mailbox's handler
     * callback dispatches each event it is given with a retry count of 0.
     */
    private ProcessingEventMailBox buildProcessingEventMailBox(ProcessingEvent processingEvent) {
        return new ProcessingEventMailBox(processingEvent.getMessage().getAggregateRootTypeName(), processingEvent.getMessage().getAggregateRootId(), this.coroutineDispatcher, (y) -> {
            Intrinsics.checkNotNullParameter(y, "y");
            dispatchProcessingMessageAsync(y, 0);
        });
    }

    /**
     * Starts a published-version lookup for the mailbox unless one is already registered in
     * {@code refreshingAggregateRootDict} for the same aggregate root id.
     */
    private void tryToRefreshAggregateMailBoxNextExpectingEventVersion(ProcessingEventMailBox processingEventMailBox) {
        if (this.refreshingAggregateRootDict.putIfAbsent(processingEventMailBox.getAggregateRootId(), true) == null) {
            getAggregateRootLatestPublishedEventVersion(processingEventMailBox, 0);
        }
    }

    /**
     * Asynchronously reads the published version of the mailbox's aggregate root. On success the
     * mailbox's next expected version becomes {@code publishedVersion + 1} and the aggregate
     * root id is removed from {@code refreshingAggregateRootDict}.
     *
     * @param retryTimes retry counter forwarded to {@code IOHelper}
     */
    private void getAggregateRootLatestPublishedEventVersion(ProcessingEventMailBox processingEventMailBox, int retryTimes) {
        IOHelper.tryAsyncActionRecursively("GetAggregateRootLatestPublishedEventVersion", () -> {
            return this.publishedVersionStore.getPublishedVersionAsync(getName(), processingEventMailBox.getAggregateRootTypeName(), processingEventMailBox.getAggregateRootId());
        }, (publishedVersion) -> {
            processingEventMailBox.setNextExpectingEventVersion(publishedVersion + 1);
            this.refreshingAggregateRootDict.remove(processingEventMailBox.getAggregateRootId());
        }, () -> {
            return "publishedVersionStore.GetPublishedVersionAsync has unknown exception, aggregateRootTypeName: " + processingEventMailBox.getAggregateRootTypeName() + ", aggregateRootId: " + processingEventMailBox.getAggregateRootId();
        }, null, retryTimes, true);
    }

    /** Starts the mailbox clean-up task and the published-version refresh task. */
    @Override // org.enodeframework.eventing.ProcessingEventProcessor
    public void start() {
        this.scheduleService.startTask(this.scanInactiveMailBoxTaskName, this::cleanInactiveMailbox, this.scanExpiredAggregateIntervalMilliseconds, this.scanExpiredAggregateIntervalMilliseconds);
        this.scheduleService.startTask(this.processTryToRefreshAggregateTaskName, this::processToRefreshAggregateRootMailBoxs, this.processTryToRefreshAggregateIntervalMilliseconds, this.processTryToRefreshAggregateIntervalMilliseconds);
    }

    /** Stops both tasks started by {@link #start()}. */
    @Override // org.enodeframework.eventing.ProcessingEventProcessor
    public void stop() {
        this.scheduleService.stopTask(this.scanInactiveMailBoxTaskName);
        this.scheduleService.stopTask(this.processTryToRefreshAggregateTaskName);
    }

    /**
     * Asynchronously dispatches the events of the event stream; once dispatching succeeds the
     * published version is updated with a retry count of 0.
     *
     * @param retryTimes retry counter forwarded to {@code IOHelper}
     */
    private void dispatchProcessingMessageAsync(ProcessingEvent processingEvent, int retryTimes) {
        IOHelper.tryAsyncActionRecursivelyWithoutResult("DispatchProcessingMessageAsync", () -> {
            return this.messageDispatcher.dispatchMessagesAsync(processingEvent.getMessage().getEvents());
        }, (dispatchResult) -> {
            // Guarded so the message is only serialized when debug logging is on.
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("dispatch messages success, msg: {}", this.serializeService.serialize(processingEvent.getMessage()));
            }
            updatePublishedVersionAsync(processingEvent, 0);
        }, () -> {
            return "sequence message [messageId:" + processingEvent.getMessage().getId() + ", messageType:" + processingEvent.getMessage().getClass().getName() + ", aggregateRootId:" + processingEvent.getMessage().getAggregateRootId() + ", aggregateRootVersion:" + processingEvent.getMessage().getVersion() + "]";
        }, null, retryTimes, true);
    }

    /**
     * Asynchronously stores the event stream's version as the published version of its
     * aggregate root, then completes the processing event.
     *
     * @param retryTimes retry counter forwarded to {@code IOHelper}
     */
    private void updatePublishedVersionAsync(ProcessingEvent processingEvent, int retryTimes) {
        final DomainEventStream message = processingEvent.getMessage();
        IOHelper.tryAsyncActionRecursivelyWithoutResult("UpdatePublishedVersionAsync", () -> {
            return this.publishedVersionStore.updatePublishedVersionAsync(getName(), message.getAggregateRootTypeName(), message.getAggregateRootId(), message.getVersion());
        }, (updateResult) -> {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("update published version success, message ack: {}", this.serializeService.serialize(message));
            }
            processingEvent.complete();
        }, () -> {
            return "DomainEventStreamMessage [messageId:" + message.getId() + ", messageType:" + message.getClass().getName() + ", aggregateRootId:" + message.getAggregateRootId() + ", aggregateRootVersion:" + message.getVersion() + "]";
        }, null, retryTimes, true);
    }

    /**
     * Periodic task body: mailboxes registered for refreshing that still have waiting messages
     * get another refresh attempt; those without waiting messages are unregistered.
     */
    private void processToRefreshAggregateRootMailBoxs() {
        final Collection<ProcessingEventMailBox> stillWaiting = new ArrayList<>();
        final Collection<ProcessingEventMailBox> recovered = new ArrayList<>();
        // Classify first, act afterwards, so the map is not modified while it is being walked.
        for (ProcessingEventMailBox mailbox : this.toRefreshAggregateRootMailBoxDict.values()) {
            if (mailbox.getWaitingMessageCount() > 0) {
                stillWaiting.add(mailbox);
            } else {
                recovered.add(mailbox);
            }
        }
        for (ProcessingEventMailBox mailbox : stillWaiting) {
            tryToRefreshAggregateMailBoxNextExpectingEventVersion(mailbox);
        }
        for (ProcessingEventMailBox mailbox : recovered) {
            final ProcessingEventMailBox removed = this.toRefreshAggregateRootMailBoxDict.remove(mailbox.getAggregateRootId());
            if (removed != null) {
                this.logger.info("Removed healthy aggregate mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", removed.getAggregateRootTypeName(), removed.getAggregateRootId());
            }
        }
    }

    /**
     * Periodic task body: removes every mailbox that {@link #isMailBoxAllowRemove} accepts both
     * before and after {@code tryUsing()} succeeded on it, and marks it as removed.
     */
    private void cleanInactiveMailbox() {
        final Collection<Map.Entry<String, ProcessingEventMailBox>> candidates = new ArrayList<>();
        for (Map.Entry<String, ProcessingEventMailBox> entry : this.mailboxDict.entrySet()) {
            if (isMailBoxAllowRemove(entry.getValue())) {
                candidates.add(entry);
            }
        }
        for (Map.Entry<String, ProcessingEventMailBox> candidate : candidates) {
            final ProcessingEventMailBox mailbox = candidate.getValue();
            // Re-check after tryUsing(): the mailbox may have received work since the first scan.
            // NOTE(review): a successful tryUsing() is never followed by exitUsing() in this
            // method, on any path - confirm against ProcessingEventMailBox that this is intended.
            if (!mailbox.tryUsing() || !isMailBoxAllowRemove(mailbox)) {
                continue;
            }
            final ProcessingEventMailBox removed = this.mailboxDict.remove(candidate.getKey());
            if (removed != null) {
                removed.markAsRemoved();
                this.logger.info("Removed inactive domain event stream mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", removed.getAggregateRootTypeName(), removed.getAggregateRootId());
            }
        }
    }

    /**
     * A mailbox may be removed when it has been inactive for {@code timeoutSeconds}, is not
     * running, and has neither unhandled nor waiting messages.
     */
    private boolean isMailBoxAllowRemove(ProcessingEventMailBox processingEventMailBox) {
        return processingEventMailBox.isInactive(this.timeoutSeconds)
            && !processingEventMailBox.isRunning()
            && processingEventMailBox.getTotalUnHandledMessageCount() == 0
            && processingEventMailBox.getWaitingMessageCount() == 0;
    }
}
