package com.github.sseserver.remote;

import com.github.sseserver.qos.Message;
import com.github.sseserver.qos.MessageRepository;
import com.github.sseserver.util.CompletableFuture;
import com.github.sseserver.util.LambdaUtil;
import com.github.sseserver.util.ReferenceCounted;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/sseserver/remote/ClusterMessageRepository.class */
public class ClusterMessageRepository implements MessageRepository {
    private static final Logger log = LoggerFactory.getLogger(ClusterConnectionServiceImpl.class);
    private final Supplier<MessageRepository> localRepositorySupplier;
    private final Supplier<ReferenceCounted<List<RemoteMessageRepository>>> remoteRepositorySupplier;

    public ClusterMessageRepository(Supplier<MessageRepository> supplier, Supplier<ReferenceCounted<List<RemoteMessageRepository>>> supplier2) {
        this.localRepositorySupplier = supplier;
        this.remoteRepositorySupplier = supplier2;
    }

    public MessageRepository getLocalRepository() {
        return this.localRepositorySupplier.get();
    }

    public ReferenceCounted<List<RemoteMessageRepository>> getRemoteRepositoryRef() {
        return this.remoteRepositorySupplier == null ? new ReferenceCounted<>(Collections.emptyList()) : this.remoteRepositorySupplier.get();
    }

    @Override // com.github.sseserver.qos.MessageRepository
    public String insert(Message message) {
        return getLocalRepository().insert(message);
    }

    @Override // com.github.sseserver.qos.MessageRepository
    public List<Message> select(MessageRepository.Query query) {
        return selectAsync(query).block();
    }

    @Override // com.github.sseserver.qos.MessageRepository
    public Message delete(String str) {
        return deleteAsync(str, null).block();
    }

    @Override // com.github.sseserver.qos.MessageRepository
    public void addDeleteListener(Consumer<Message> consumer) {
        getLocalRepository().addDeleteListener(consumer);
    }

    public ClusterCompletableFuture<List<Message>, ClusterMessageRepository> selectAsync(MessageRepository.Query query) {
        return mapReduce(remoteMessageRepository -> {
            return remoteMessageRepository.selectAsync(query);
        }, messageRepository -> {
            return messageRepository.select(query);
        }, LambdaUtil.reduceList(), LambdaUtil.noop(), ArrayList::new);
    }

    public ClusterCompletableFuture<Message, ClusterMessageRepository> deleteAsync(String str, String str2) {
        return mapReduce(remoteMessageRepository -> {
            if (str2 == null || Objects.equals(remoteMessageRepository.getId(), str2)) {
                return remoteMessageRepository.deleteAsync(str);
            }
            RemoteCompletableFuture remoteCompletableFuture = new RemoteCompletableFuture();
            remoteCompletableFuture.setClient(remoteMessageRepository);
            remoteCompletableFuture.complete(null);
            return remoteCompletableFuture;
        }, messageRepository -> {
            if (str2 != null) {
                return messageRepository.delete(str);
            }
            return null;
        }, LambdaUtil.filterNull(), LambdaUtil.defaultNull());
    }

    protected <T> ClusterCompletableFuture<T, ClusterMessageRepository> mapReduce(Function<RemoteMessageRepository, RemoteCompletableFuture<T, RemoteMessageRepository>> function, Function<MessageRepository, T> function2, BiFunction<T, T, T> biFunction, Supplier<T> supplier) {
        return mapReduce(function, function2, biFunction, obj -> {
            return obj;
        }, supplier);
    }

    protected <T, R> ClusterCompletableFuture<R, ClusterMessageRepository> mapReduce(Function<RemoteMessageRepository, RemoteCompletableFuture<T, RemoteMessageRepository>> function, Function<MessageRepository, T> function2, BiFunction<T, T, T> biFunction, Function<T, R> function3, Supplier<T> supplier) {
        ReferenceCounted<List<RemoteMessageRepository>> remoteRepositoryRef = getRemoteRepositoryRef();
        Throwable th = null;
        try {
            try {
                List<RemoteMessageRepository> list = remoteRepositoryRef.get();
                ArrayList arrayList = new ArrayList(list.size());
                ArrayList arrayList2 = new ArrayList(list.size());
                for (RemoteMessageRepository remoteMessageRepository : list) {
                    arrayList.add(remoteMessageRepository.getRemoteUrl());
                    arrayList2.add(function.apply(remoteMessageRepository));
                }
                T apply = function2.apply(getLocalRepository());
                ClusterCompletableFuture<R, ClusterMessageRepository> clusterCompletableFuture = new ClusterCompletableFuture<>(arrayList, this);
                CompletableFuture.join(arrayList2, clusterCompletableFuture, () -> {
                    Object obj;
                    Object obj2 = supplier.get();
                    InterruptedException interruptedException = null;
                    Iterator it = arrayList2.iterator();
                    while (it.hasNext()) {
                        RemoteCompletableFuture<?, RemoteMessageRepository> remoteCompletableFuture = (RemoteCompletableFuture) it.next();
                        if (interruptedException != null) {
                            try {
                            } catch (InterruptedException e) {
                                interruptedException = e;
                            } catch (ExecutionException e2) {
                                handleRemoteException(remoteCompletableFuture, e2, clusterCompletableFuture);
                            }
                            if (remoteCompletableFuture.isDone()) {
                                obj = remoteCompletableFuture.get();
                            }
                        } else {
                            obj = remoteCompletableFuture.get();
                        }
                        obj2 = biFunction.apply(obj2, obj);
                    }
                    return function3.apply(biFunction.apply(obj2, apply));
                });
                if (remoteRepositoryRef != null) {
                    if (0 != 0) {
                        try {
                            remoteRepositoryRef.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        remoteRepositoryRef.close();
                    }
                }
                return clusterCompletableFuture;
            } finally {
            }
        } catch (Throwable th3) {
            if (remoteRepositoryRef != null) {
                if (th != null) {
                    try {
                        remoteRepositoryRef.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    remoteRepositoryRef.close();
                }
            }
            throw th3;
        }
    }

    protected <R> void handleRemoteException(RemoteCompletableFuture<?, RemoteMessageRepository> remoteCompletableFuture, ExecutionException executionException, ClusterCompletableFuture<R, ClusterMessageRepository> clusterCompletableFuture) {
        Throwable cause = executionException.getCause();
        if (cause == null) {
            cause = executionException;
        }
        boolean completeExceptionally = clusterCompletableFuture.completeExceptionally(cause);
        if (completeExceptionally) {
            clusterCompletableFuture.setExceptionallyPrefix("ClusterMessageRepository at remoteFuture " + remoteCompletableFuture.getClient().getId());
        }
        if (log.isDebugEnabled()) {
            log.debug("RemoteException: RemoteMessageRepository {} , RemoteException {}, completeExceptionally {}", new Object[]{remoteCompletableFuture.getClient(), executionException, Boolean.valueOf(completeExceptionally), executionException});
        }
    }
}
