package org.enodeframework.eventing.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.enodeframework.commanding.CommandResult;
import org.enodeframework.commanding.CommandStatus;
import org.enodeframework.commanding.ICommand;
import org.enodeframework.commanding.ProcessingCommand;
import org.enodeframework.commanding.ProcessingCommandMailbox;
import org.enodeframework.common.exception.ENodeRuntimeException;
import org.enodeframework.common.io.IOHelper;
import org.enodeframework.common.serializing.JsonTool;
import org.enodeframework.domain.IMemoryCache;
import org.enodeframework.eventing.DomainEventStream;
import org.enodeframework.eventing.DomainEventStreamMessage;
import org.enodeframework.eventing.EventCommittingContext;
import org.enodeframework.eventing.EventCommittingContextMailBox;
import org.enodeframework.eventing.IEventCommittingService;
import org.enodeframework.eventing.IEventStore;
import org.enodeframework.messaging.IMessagePublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/enodeframework/eventing/impl/DefaultEventCommittingService.class */
public class DefaultEventCommittingService implements IEventCommittingService {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEventCommittingService.class);
    private int eventMailBoxCount;
    private List<EventCommittingContextMailBox> eventCommittingContextMailBoxList;

    @Autowired
    private IMemoryCache memoryCache;

    @Autowired
    private IEventStore eventStore;

    @Autowired
    private IMessagePublisher<DomainEventStreamMessage> domainEventPublisher;

    public DefaultEventCommittingService() {
        this(1000, 4);
    }

    public DefaultEventCommittingService(int i, int i2) {
        this.eventCommittingContextMailBoxList = new ArrayList();
        this.eventMailBoxCount = i2;
        for (int i3 = 0; i3 < i2; i3++) {
            this.eventCommittingContextMailBoxList.add(new EventCommittingContextMailBox(i3, i, list -> {
                batchPersistEventAsync(list, 0);
            }));
        }
    }

    public DefaultEventCommittingService setMemoryCache(IMemoryCache iMemoryCache) {
        this.memoryCache = iMemoryCache;
        return this;
    }

    public DefaultEventCommittingService setEventStore(IEventStore iEventStore) {
        this.eventStore = iEventStore;
        return this;
    }

    public DefaultEventCommittingService setDomainEventPublisher(IMessagePublisher<DomainEventStreamMessage> iMessagePublisher) {
        this.domainEventPublisher = iMessagePublisher;
        return this;
    }

    @Override // org.enodeframework.eventing.IEventCommittingService
    public void commitDomainEventAsync(EventCommittingContext eventCommittingContext) {
        this.eventCommittingContextMailBoxList.get(getEventMailBoxIndex(eventCommittingContext.getEventStream().getAggregateRootId())).enqueueMessage(eventCommittingContext);
    }

    @Override // org.enodeframework.eventing.IEventCommittingService
    public void publishDomainEventAsync(ProcessingCommand processingCommand, DomainEventStream domainEventStream) {
        if (domainEventStream.getItems() == null || domainEventStream.getItems().size() == 0) {
            domainEventStream.setItems(processingCommand.getItems());
        }
        publishDomainEventAsync(processingCommand, new DomainEventStreamMessage(processingCommand.getMessage().getId(), domainEventStream.getAggregateRootId(), domainEventStream.getVersion(), domainEventStream.getAggregateRootTypeName(), domainEventStream.events(), domainEventStream.getItems()), 0);
    }

    private int getEventMailBoxIndex(String str) {
        int i = 23;
        for (char c : str.toCharArray()) {
            i = ((i << 5) - i) + c;
        }
        if (i < 0) {
            i = Math.abs(i);
        }
        return i % this.eventMailBoxCount;
    }

    private void batchPersistEventAsync(List<EventCommittingContext> list, int i) {
        if (list == null || list.size() == 0) {
            return;
        }
        IOHelper.tryAsyncActionRecursively("BatchPersistEventAsync", () -> {
            return this.eventStore.batchAppendAsync((List) list.stream().map((v0) -> {
                return v0.getEventStream();
            }).collect(Collectors.toList()));
        }, eventAppendResult -> {
            EventCommittingContextMailBox mailBox = ((EventCommittingContext) list.stream().findFirst().orElseThrow(() -> {
                return new ENodeRuntimeException("eventMailBox can not be null");
            })).getMailBox();
            if (eventAppendResult == null) {
                logger.error("Batch persist events success, but the persist result is null, the current event committing mailbox should be pending, mailboxNumber: {}", Integer.valueOf(mailBox.getNumber()));
                return;
            }
            if (eventAppendResult.getSuccessAggregateRootIdList().size() > 0) {
                for (String str : eventAppendResult.getSuccessAggregateRootIdList()) {
                    List<EventCommittingContext> list2 = (List) list.stream().filter(eventCommittingContext -> {
                        return eventCommittingContext.getEventStream().getAggregateRootId().equals(str);
                    }).collect(Collectors.toList());
                    if (list2.size() > 0) {
                        for (EventCommittingContext eventCommittingContext2 : list2) {
                            publishDomainEventAsync(eventCommittingContext2.getProcessingCommand(), eventCommittingContext2.getEventStream());
                        }
                        if (logger.isDebugEnabled()) {
                            logger.debug("Batch persist events success, mailboxNumber: {}, aggregateRootId: {}", Integer.valueOf(mailBox.getNumber()), str);
                        }
                    }
                }
            }
            if (eventAppendResult.getDuplicateCommandAggregateRootIdList().size() > 0) {
                for (Map.Entry<String, List<String>> entry : eventAppendResult.getDuplicateCommandAggregateRootIdList().entrySet()) {
                    Optional findFirst = list.stream().filter(eventCommittingContext3 -> {
                        return ((String) entry.getKey()).equals(eventCommittingContext3.getAggregateRoot().getUniqueId());
                    }).findFirst();
                    if (findFirst.isPresent()) {
                        logger.warn("Batch persist events has duplicate commandIds, mailboxNumber: {}, aggregateRootId: {}, commandIds: {}", new Object[]{Integer.valueOf(mailBox.getNumber()), entry.getKey(), String.join(",", entry.getValue())});
                        EventCommittingContext eventCommittingContext4 = (EventCommittingContext) findFirst.get();
                        resetCommandMailBoxConsumingSequence(eventCommittingContext4, eventCommittingContext4.getProcessingCommand().getSequence() + 1, entry.getValue()).thenAccept(r6 -> {
                            tryToRepublishEventAsync(eventCommittingContext4, 0);
                        });
                    }
                }
            }
            if (eventAppendResult.getDuplicateEventAggregateRootIdList().size() > 0) {
                for (String str2 : eventAppendResult.getDuplicateEventAggregateRootIdList()) {
                    Optional findFirst2 = list.stream().filter(eventCommittingContext5 -> {
                        return eventCommittingContext5.getEventStream().getAggregateRootId().equals(str2);
                    }).findFirst();
                    if (findFirst2.isPresent()) {
                        logger.warn("Batch persist events, mailboxNumber: {}, duplicateEventAggregateRootCount: {}, detail: {}", new Object[]{Integer.valueOf(mailBox.getNumber()), Integer.valueOf(eventAppendResult.getDuplicateEventAggregateRootIdList().size()), JsonTool.serialize(eventAppendResult.getDuplicateEventAggregateRootIdList())});
                        EventCommittingContext eventCommittingContext6 = (EventCommittingContext) findFirst2.get();
                        if (eventCommittingContext6.getEventStream().getVersion() == 1) {
                            handleFirstEventDuplicationAsync(eventCommittingContext6, 0);
                        } else {
                            resetCommandMailBoxConsumingSequence(eventCommittingContext6, eventCommittingContext6.getProcessingCommand().getSequence(), null);
                        }
                    }
                }
            }
            mailBox.completeRun();
        }, () -> {
            return String.format("[contextListCount:%d]", Integer.valueOf(list.size()));
        }, null, i, true);
    }

    private CompletableFuture<Void> resetCommandMailBoxConsumingSequence(EventCommittingContext eventCommittingContext, long j, List<String> list) {
        ProcessingCommandMailbox mailBox = eventCommittingContext.getProcessingCommand().getMailBox();
        EventCommittingContextMailBox mailBox2 = eventCommittingContext.getMailBox();
        String aggregateRootId = eventCommittingContext.getEventStream().getAggregateRootId();
        mailBox.pause();
        mailBox2.removeAggregateAllEventCommittingContexts(aggregateRootId);
        return this.memoryCache.refreshAggregateFromEventStoreAsync(eventCommittingContext.getEventStream().getAggregateRootTypeName(), aggregateRootId).thenAccept(iAggregateRoot -> {
            if (list != null) {
                try {
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        mailBox.addDuplicateCommandId((String) it.next());
                    }
                } catch (Throwable th) {
                    mailBox.resume();
                    mailBox.tryRun();
                    throw th;
                }
            }
            mailBox.resetConsumingSequence(j);
            mailBox.resume();
            mailBox.tryRun();
        }).exceptionally(th -> {
            logger.error("ResetCommandMailBoxConsumingSequence has unknown exception, aggregateRootId: {}", aggregateRootId, th);
            return null;
        });
    }

    private void tryToRepublishEventAsync(EventCommittingContext eventCommittingContext, int i) {
        ICommand message = eventCommittingContext.getProcessingCommand().getMessage();
        IOHelper.tryAsyncActionRecursively("FindEventByCommandIdAsync", () -> {
            return this.eventStore.findAsync(eventCommittingContext.getEventStream().getAggregateRootId(), message.getId());
        }, domainEventStream -> {
            if (domainEventStream != null) {
                publishDomainEventAsync(eventCommittingContext.getProcessingCommand(), domainEventStream);
            } else {
                logger.error(String.format("Command should be exist in the event store, but we cannot find it from the event store, this should not be happen, and we cannot continue again. commandType:%s, commandId:%s, aggregateRootId:%s", message.getClass().getName(), message.getId(), eventCommittingContext.getEventStream().getAggregateRootId()));
                completeCommand(eventCommittingContext.getProcessingCommand(), new CommandResult(CommandStatus.Failed, message.getId(), message.getAggregateRootId(), "Command should be exist in the event store, but we cannot find it from the event store.", String.class.getName()));
            }
        }, () -> {
            return String.format("[aggregateRootId:%s, commandId:%s]", message.getAggregateRootId(), message.getId());
        }, null, i, true);
    }

    private CompletableFuture<Void> handleFirstEventDuplicationAsync(EventCommittingContext eventCommittingContext, int i) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        IOHelper.tryAsyncActionRecursively("FindFirstEventByVersion", () -> {
            return this.eventStore.findAsync(eventCommittingContext.getEventStream().getAggregateRootId(), 1);
        }, domainEventStream -> {
            if (domainEventStream == null) {
                logger.error(String.format("Duplicate aggregate creation, but we cannot find the existing eventstream from eventstore. commandId:%s, aggregateRootId:%s, aggregateRootTypeName:%s", eventCommittingContext.getEventStream().getCommandId(), eventCommittingContext.getEventStream().getAggregateRootId(), eventCommittingContext.getEventStream().getAggregateRootTypeName()));
                resetCommandMailBoxConsumingSequence(eventCommittingContext, eventCommittingContext.getProcessingCommand().getSequence() + 1, null).thenAccept(r11 -> {
                    completeCommand(eventCommittingContext.getProcessingCommand(), new CommandResult(CommandStatus.Failed, eventCommittingContext.getProcessingCommand().getMessage().getId(), eventCommittingContext.getEventStream().getAggregateRootId(), "Duplicate aggregate creation, but we cannot find the existing eventstream from eventstore.", String.class.getName())).thenAccept(r4 -> {
                        completableFuture.complete(null);
                    });
                });
            } else if (eventCommittingContext.getProcessingCommand().getMessage().getId().equals(domainEventStream.getCommandId())) {
                resetCommandMailBoxConsumingSequence(eventCommittingContext, eventCommittingContext.getProcessingCommand().getSequence() + 1, null).thenAccept(r8 -> {
                    publishDomainEventAsync(eventCommittingContext.getProcessingCommand(), domainEventStream);
                    completableFuture.complete(null);
                });
            } else {
                logger.error(String.format("Duplicate aggregate creation. current commandId:%s, existing commandId:%s, aggregateRootId:%s, aggregateRootTypeName:%s", eventCommittingContext.getProcessingCommand().getMessage().getId(), domainEventStream.getCommandId(), domainEventStream.getAggregateRootId(), domainEventStream.getAggregateRootTypeName()));
                resetCommandMailBoxConsumingSequence(eventCommittingContext, eventCommittingContext.getProcessingCommand().getSequence() + 1, null).thenAccept(r112 -> {
                    completeCommand(eventCommittingContext.getProcessingCommand(), new CommandResult(CommandStatus.Failed, eventCommittingContext.getProcessingCommand().getMessage().getId(), eventCommittingContext.getEventStream().getAggregateRootId(), "Duplicate aggregate creation.", String.class.getName())).thenAccept(r4 -> {
                        completableFuture.complete(null);
                    });
                });
            }
        }, () -> {
            return String.format("[eventStream:%s]", eventCommittingContext.getEventStream());
        }, null, i, true);
        return completableFuture;
    }

    private void publishDomainEventAsync(ProcessingCommand processingCommand, DomainEventStreamMessage domainEventStreamMessage, int i) {
        IOHelper.tryAsyncActionRecursivelyWithoutResult("PublishDomainEventAsync", () -> {
            return this.domainEventPublisher.publishAsync(domainEventStreamMessage);
        }, r11 -> {
            if (logger.isDebugEnabled()) {
                logger.debug("Publish domain events success, {}", domainEventStreamMessage);
            }
            completeCommand(processingCommand, new CommandResult(CommandStatus.Success, processingCommand.getMessage().getId(), domainEventStreamMessage.getAggregateRootId(), processingCommand.getCommandExecuteContext().getResult(), String.class.getName()));
        }, () -> {
            return String.format("[eventStream:%s]", domainEventStreamMessage);
        }, null, i, true);
    }

    private CompletableFuture<Void> completeCommand(ProcessingCommand processingCommand, CommandResult commandResult) {
        return processingCommand.getMailBox().completeMessage(processingCommand, commandResult);
    }
}
