package com.github.sseserver.remote;

import com.github.sseserver.local.LocalConnectionService;
import com.github.sseserver.util.CompletableFuture;
import com.github.sseserver.util.LambdaUtil;
import com.github.sseserver.util.ReferenceCounted;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/sseserver/remote/ClusterConnectionServiceImpl.class */
/**
 * {@link ClusterConnectionService} that answers every query and send request by combining the
 * local node's {@link LocalConnectionService} with all currently known
 * {@link RemoteConnectionService} instances.
 *
 * <p>Almost every public method is a thin wrapper around
 * {@link #mapReduce(Function, Function, BiFunction, Function, Supplier)}: the remote services are
 * called through their asynchronous API, the local service is called synchronously, and the
 * partial results are folded together.
 *
 * <p>Query methods call {@code block()} on the resulting future; send and disconnect methods
 * return the future to the caller.
 *
 * <p>NOTE(review): several query methods cast the {@code block()} result to a raw type. The
 * generic signatures of the remote/local services and of {@code LambdaUtil} are not visible from
 * this file, so the casts are kept as found; drop them if inference allows.
 */
public class ClusterConnectionServiceImpl implements ClusterConnectionService {
    private static final Logger log = LoggerFactory.getLogger(ClusterConnectionServiceImpl.class);

    /** Supplies the service that handles connections held by this node. */
    private final Supplier<LocalConnectionService> localSupplier;

    /** Supplies a reference-counted view of the remote services; may be {@code null}. */
    private final Supplier<ReferenceCounted<List<RemoteConnectionService>>> remoteSupplier;

    /** Set for the current thread while a {@link #scopeOnWriteable(Callable)} call is running. */
    private final ThreadLocal<Boolean> scopeOnWriteableThreadLocal = new ThreadLocal<>();

    /**
     * @param supplier  supplier of the local connection service
     * @param supplier2 supplier of the remote connection services; {@code null} means this
     *                  instance never has remote services
     */
    public ClusterConnectionServiceImpl(Supplier<LocalConnectionService> supplier, Supplier<ReferenceCounted<List<RemoteConnectionService>>> supplier2) {
        this.localSupplier = supplier;
        this.remoteSupplier = supplier2;
    }

    /** @return the local connection service obtained from the local supplier */
    public LocalConnectionService getLocalService() {
        return this.localSupplier.get();
    }

    /**
     * @return the remote services reference obtained from the remote supplier, or a new reference
     *         wrapping an empty list when no remote supplier was configured;
     *         {@code mapReduce} closes the reference it obtains from this method
     */
    public ReferenceCounted<List<RemoteConnectionService>> getRemoteServiceRef() {
        return this.remoteSupplier == null ? new ReferenceCounted<>(Collections.emptyList()) : this.remoteSupplier.get();
    }

    // ------------------------------------------------------------------
    // com.github.sseserver.ConnectionQueryService
    // ------------------------------------------------------------------

    /**
     * Checks the local service first and only asks the remote services when the local answer is
     * negative. Blocks until the remote answers have been combined with a logical OR.
     */
    @Override
    public boolean isOnline(Serializable serializable) {
        if (getLocalService().isOnline(serializable)) {
            return true;
        }
        // The local service was already consulted above, so it contributes `false` here.
        return (Boolean) mapReduce(
                remote -> remote.isOnlineAsync(serializable),
                local -> false,
                Boolean::logicalOr,
                LambdaUtil.defaultFalse()).block();
    }

    /**
     * Returns the local service's user when it is non-null; otherwise blocks on the remote
     * services' {@code getUserAsync} answers, combined with {@code LambdaUtil.filterNull()}.
     */
    @Override
    public <ACCESS_USER> ACCESS_USER getUser(Serializable serializable) {
        ACCESS_USER localUser = (ACCESS_USER) getLocalService().getUser(serializable);
        if (localUser != null) {
            return localUser;
        }
        // The local lookup already returned null, so the local part of the reduction is null.
        return (ACCESS_USER) mapReduce(
                remote -> remote.getUserAsync(serializable),
                local -> null,
                LambdaUtil.filterNull(),
                LambdaUtil.defaultNull()).block();
    }

    /**
     * Blocks for the users reported by the local and remote services, merged with
     * {@code LambdaUtil.reduceList()} and finished with {@code LambdaUtil.distinct()}.
     */
    @Override
    public <ACCESS_USER> List<ACCESS_USER> getUsers() {
        return (List) mapReduce(
                RemoteConnectionService::getUsersAsync,
                LocalConnectionService::getUsers,
                LambdaUtil.reduceList(),
                LambdaUtil.distinct(),
                ArrayList::new).block();
    }

    /** Cluster-wide variant of {@code getUsersByListening}; merged like {@link #getUsers()}. */
    @Override
    public <ACCESS_USER> List<ACCESS_USER> getUsersByListening(String str) {
        return (List) mapReduce(
                remote -> remote.getUsersByListeningAsync(str),
                local -> local.getUsersByListening(str),
                LambdaUtil.reduceList(),
                LambdaUtil.distinct(),
                ArrayList::new).block();
    }

    /** Cluster-wide variant of {@code getUsersByTenantIdListening}; merged like {@link #getUsers()}. */
    @Override
    public <ACCESS_USER> List<ACCESS_USER> getUsersByTenantIdListening(Serializable serializable, String str) {
        return (List) mapReduce(
                remote -> remote.getUsersByTenantIdListeningAsync(serializable, str),
                local -> local.getUsersByTenantIdListening(serializable, str),
                LambdaUtil.reduceList(),
                LambdaUtil.distinct(),
                ArrayList::new).block();
    }

    /**
     * Blocks for the user ids reported by the local and remote services. The reduction is seeded
     * with a {@link LinkedHashSet} and no finishing step is applied.
     */
    @Override
    public <T> Collection<T> getUserIds(Class<T> cls) {
        return (Collection) mapReduce(
                remote -> remote.getUserIdsAsync(cls),
                local -> local.getUserIds(cls),
                LambdaUtil.reduceList(),
                LambdaUtil.noop(),
                LinkedHashSet::new).block();
    }

    /** Cluster-wide variant of {@code getUserIdsByListening}; merged like {@link #getUsers()}. */
    @Override
    public <T> List<T> getUserIdsByListening(String str, Class<T> cls) {
        return (List) mapReduce(
                remote -> remote.getUserIdsByListeningAsync(str, cls),
                local -> local.getUserIdsByListening(str, cls),
                LambdaUtil.reduceList(),
                LambdaUtil.distinct(),
                ArrayList::new).block();
    }

    /** Cluster-wide variant of {@code getUserIdsByTenantIdListening}; merged like {@link #getUsers()}. */
    @Override
    public <T> List<T> getUserIdsByTenantIdListening(Serializable serializable, String str, Class<T> cls) {
        return (List) mapReduce(
                remote -> remote.getUserIdsByTenantIdListeningAsync(serializable, str, cls),
                local -> local.getUserIdsByTenantIdListening(serializable, str, cls),
                LambdaUtil.reduceList(),
                LambdaUtil.distinct(),
                ArrayList::new).block();
    }

    /**
     * Blocks for the access tokens reported by the local and remote services. The reduction is
     * seeded with a {@link LinkedHashSet} and no finishing step is applied.
     */
    @Override
    public Collection<String> getAccessTokens() {
        return (Collection) mapReduce(
                RemoteConnectionService::getAccessTokensAsync,
                LocalConnectionService::getAccessTokens,
                LambdaUtil.reduceList(),
                LambdaUtil.noop(),
                LinkedHashSet::new).block();
    }

    /** Cluster-wide variant of {@code getTenantIds}; merged like {@link #getUsers()}. */
    @Override
    public <T> List<T> getTenantIds(Class<T> cls) {
        return (List) mapReduce(
                remote -> remote.getTenantIdsAsync(cls),
                local -> local.getTenantIds(cls),
                LambdaUtil.reduceList(),
                LambdaUtil.distinct(),
                ArrayList::new).block();
    }

    /** Cluster-wide variant of {@code getChannels}; merged like {@link #getUsers()}. */
    @Override
    public List<String> getChannels() {
        return (List) mapReduce(
                RemoteConnectionService::getChannelsAsync,
                LocalConnectionService::getChannels,
                LambdaUtil.reduceList(),
                LambdaUtil.distinct(),
                ArrayList::new).block();
    }

    /** @return the size of {@link #getAccessTokens()} */
    @Override
    public int getAccessTokenCount() {
        return getAccessTokens().size();
    }

    /** @return the size of {@link #getUserIds(Class)} queried with {@code String.class} */
    @Override
    public int getUserCount() {
        return getUserIds(String.class).size();
    }

    /** @return the sum of the local and remote connection counts; blocks for the remote answers */
    @Override
    public int getConnectionCount() {
        return (Integer) sumAcrossCluster(
                RemoteConnectionService::getConnectionCountAsync,
                LocalConnectionService::getConnectionCount).block();
    }

    // ------------------------------------------------------------------
    // com.github.sseserver.SendService
    // ------------------------------------------------------------------

    /**
     * Runs {@code callable} with the writeable-scope flag set for the current thread. While the
     * flag is set, {@code mapReduce} wraps every remote and local call in that service's own
     * {@code scopeOnWriteable}.
     *
     * <p>The flag value seen on entry is restored on exit, so a nested call does not end the
     * scope of an enclosing one.
     *
     * @param callable the work to run inside the scope
     * @return the value returned by {@code callable}
     */
    @Override
    public <T> T scopeOnWriteable(Callable<T> callable) {
        Boolean previous = this.scopeOnWriteableThreadLocal.get();
        this.scopeOnWriteableThreadLocal.set(Boolean.TRUE);
        try {
            return callable.call();
        } catch (Exception e) {
            LambdaUtil.sneakyThrows(e);
            // Presumably unreachable because sneakyThrows rethrows; kept for the compiler.
            return null;
        } finally {
            if (previous == null) {
                this.scopeOnWriteableThreadLocal.remove();
            } else {
                this.scopeOnWriteableThreadLocal.set(previous);
            }
        }
    }

    /** Calls {@code sendAll} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendAll(String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendAll(str, obj),
                local -> local.sendAll(str, obj));
    }

    /** Calls {@code sendAllListening} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendAllListening(String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendAllListening(str, obj),
                local -> local.sendAllListening(str, obj));
    }

    /** Calls {@code sendByChannel} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendByChannel(Collection<String> collection, String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendByChannel(collection, str, obj),
                local -> local.sendByChannel(collection, str, obj));
    }

    /** Calls {@code sendByChannelListening} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendByChannelListening(Collection<String> collection, String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendByChannelListening(collection, str, obj),
                local -> local.sendByChannelListening(collection, str, obj));
    }

    /** Calls {@code sendByAccessToken} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendByAccessToken(Collection<String> collection, String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendByAccessToken(collection, str, obj),
                local -> local.sendByAccessToken(collection, str, obj));
    }

    /** Calls {@code sendByAccessTokenListening} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendByAccessTokenListening(Collection<String> collection, String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendByAccessTokenListening(collection, str, obj),
                local -> local.sendByAccessTokenListening(collection, str, obj));
    }

    /** Calls {@code sendByUserId} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendByUserId(Collection<? extends Serializable> collection, String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendByUserId(collection, str, obj),
                local -> local.sendByUserId(collection, str, obj));
    }

    /** Calls {@code sendByUserIdListening} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendByUserIdListening(Collection<? extends Serializable> collection, String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendByUserIdListening(collection, str, obj),
                local -> local.sendByUserIdListening(collection, str, obj));
    }

    /** Calls {@code sendByTenantId} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendByTenantId(Collection<? extends Serializable> collection, String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendByTenantId(collection, str, obj),
                local -> local.sendByTenantId(collection, str, obj));
    }

    /** Calls {@code sendByTenantIdListening} on every service. @return future of the summed results */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> sendByTenantIdListening(Collection<? extends Serializable> collection, String str, Object obj) {
        return sumAcrossCluster(
                remote -> remote.sendByTenantIdListening(collection, str, obj),
                local -> local.sendByTenantIdListening(collection, str, obj));
    }

    // ------------------------------------------------------------------
    // com.github.sseserver.remote.ClusterConnectionService
    // ------------------------------------------------------------------

    /**
     * Calls {@code disconnectByUserId} on every service.
     *
     * @return future of the remote results plus the size of the collection returned by the
     *         local service
     */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> disconnectByUserId(Serializable serializable) {
        return sumAcrossCluster(
                remote -> remote.disconnectByUserId(serializable),
                local -> local.disconnectByUserId(serializable).size());
    }

    /**
     * Calls {@code disconnectByAccessToken} on every service.
     *
     * @return future of the remote results plus the size of the collection returned by the
     *         local service
     */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> disconnectByAccessToken(String str) {
        return sumAcrossCluster(
                remote -> remote.disconnectByAccessToken(str),
                local -> local.disconnectByAccessToken(str).size());
    }

    /**
     * Calls {@code disconnectByConnectionId} on every service.
     *
     * @return future of the remote results plus 1 when the local service returned a non-null
     *         value, plus 0 otherwise
     */
    @Override
    public ClusterCompletableFuture<Integer, ClusterConnectionService> disconnectByConnectionId(Long l) {
        return sumAcrossCluster(
                remote -> remote.disconnectByConnectionId(l),
                local -> local.disconnectByConnectionId(l) != null ? 1 : 0);
    }

    // ------------------------------------------------------------------
    // Scatter/gather machinery
    // ------------------------------------------------------------------

    /** Map-reduce specialisation that adds integer results, seeded by {@code LambdaUtil.defaultZero()}. */
    private ClusterCompletableFuture<Integer, ClusterConnectionService> sumAcrossCluster(
            Function<RemoteConnectionService, RemoteCompletableFuture<Integer, RemoteConnectionService>> remoteCall,
            Function<LocalConnectionService, Integer> localCall) {
        return mapReduce(remoteCall, localCall, Integer::sum, LambdaUtil.defaultZero());
    }

    /**
     * Same as the five-argument overload with {@code LambdaUtil.noop()} as the finishing step.
     *
     * @param function   asynchronous call to make on each remote service
     * @param function2  synchronous call to make on the local service
     * @param biFunction combines two partial results into one
     * @param supplier   provides the initial value of the reduction
     * @return future of the combined result
     */
    protected <T> ClusterCompletableFuture<T, ClusterConnectionService> mapReduce(Function<RemoteConnectionService, RemoteCompletableFuture<T, RemoteConnectionService>> function, Function<LocalConnectionService, T> function2, BiFunction<T, T, T> biFunction, Supplier<T> supplier) {
        return mapReduce(function, function2, biFunction, LambdaUtil.noop(), supplier);
    }

    /**
     * Invokes an operation on every remote service and on the local service, and returns a
     * future that is completed with the combined result.
     *
     * <ol>
     *   <li>{@code function} is applied to each remote service to start the remote call;</li>
     *   <li>{@code function2} is applied to the local service right away, on the calling
     *       thread;</li>
     *   <li>the remote futures, the returned future and a callback are handed to
     *       {@code CompletableFuture.join}; the callback folds the remote results into
     *       {@code supplier.get()} with {@code biFunction}, folds in the local result last, and
     *       applies {@code function3}.</li>
     * </ol>
     *
     * <p>When the calling thread is inside {@link #scopeOnWriteable(Callable)}, each remote and
     * local call is wrapped in that service's own {@code scopeOnWriteable}.
     *
     * <p>A remote future that failed is reported through
     * {@link #handleRemoteException(RemoteCompletableFuture, ExecutionException, ClusterCompletableFuture)}
     * and contributes nothing to the reduction.
     *
     * @param function   asynchronous call to make on each remote service
     * @param function2  synchronous call to make on the local service
     * @param biFunction combines two partial results into one
     * @param function3  converts the fully reduced value into the future's result
     * @param supplier   provides the initial value of the reduction
     * @return future of the combined result
     */
    protected <T, R> ClusterCompletableFuture<R, ClusterConnectionService> mapReduce(Function<RemoteConnectionService, RemoteCompletableFuture<T, RemoteConnectionService>> function, Function<LocalConnectionService, T> function2, BiFunction<T, T, T> biFunction, Function<T, R> function3, Supplier<T> supplier) {
        try (ReferenceCounted<List<RemoteConnectionService>> remoteServiceRef = getRemoteServiceRef()) {
            List<RemoteConnectionService> remoteServices = remoteServiceRef.get();
            boolean writeableScope = Boolean.TRUE.equals(this.scopeOnWriteableThreadLocal.get());

            // Raw list: the element type is whatever getRemoteUrl() returns, which is not
            // visible from this file.
            List remoteUrls = new ArrayList(remoteServices.size());
            List<RemoteCompletableFuture<T, RemoteConnectionService>> remoteFutures = new ArrayList<>(remoteServices.size());
            for (RemoteConnectionService remoteService : remoteServices) {
                remoteUrls.add(remoteService.getRemoteUrl());
                RemoteCompletableFuture<T, RemoteConnectionService> remoteFuture;
                if (writeableScope) {
                    remoteFuture = remoteService.scopeOnWriteable(() -> function.apply(remoteService));
                } else {
                    remoteFuture = function.apply(remoteService);
                }
                remoteFutures.add(remoteFuture);
            }

            LocalConnectionService localService = getLocalService();
            T localPart;
            if (writeableScope) {
                localPart = localService.scopeOnWriteable(() -> function2.apply(localService));
            } else {
                localPart = function2.apply(localService);
            }

            ClusterCompletableFuture<R, ClusterConnectionService> clusterFuture = new ClusterCompletableFuture<>(remoteUrls, this);
            CompletableFuture.join(remoteFutures, clusterFuture, () -> {
                T reduced = supplier.get();
                InterruptedException interrupted = null;
                for (RemoteCompletableFuture<T, RemoteConnectionService> remoteFuture : remoteFutures) {
                    // After an interrupt, only collect results that are already available.
                    if (interrupted != null && !remoteFuture.isDone()) {
                        continue;
                    }
                    try {
                        reduced = biFunction.apply(reduced, remoteFuture.get());
                    } catch (InterruptedException e) {
                        interrupted = e;
                    } catch (ExecutionException e) {
                        handleRemoteException(remoteFuture, e, clusterFuture);
                    }
                }
                if (interrupted != null) {
                    // Do not swallow the interrupt: let the code running this callback see it.
                    Thread.currentThread().interrupt();
                }
                return function3.apply(biFunction.apply(reduced, localPart));
            });
            return clusterFuture;
        }
    }

    /**
     * Reports a failed remote call. Attempts to complete {@code clusterCompletableFuture}
     * exceptionally with the cause of {@code executionException} (or with the exception itself
     * when it has no cause). When that attempt succeeds, the future's exception prefix is set to
     * name the failing remote client.
     *
     * @param remoteCompletableFuture  the remote future whose {@code get()} failed
     * @param executionException       the failure raised by {@code get()}
     * @param clusterCompletableFuture the cluster future to fail
     */
    protected <R> void handleRemoteException(RemoteCompletableFuture<?, RemoteConnectionService> remoteCompletableFuture, ExecutionException executionException, ClusterCompletableFuture<R, ClusterConnectionService> clusterCompletableFuture) {
        Throwable cause = executionException.getCause();
        if (cause == null) {
            cause = executionException;
        }
        boolean completeExceptionally = clusterCompletableFuture.completeExceptionally(cause);
        if (completeExceptionally) {
            clusterCompletableFuture.setExceptionallyPrefix("ClusterConnectionServiceImpl at remoteFuture " + remoteCompletableFuture.getClient().getId());
        }
        if (log.isDebugEnabled()) {
            // The exception is passed twice on purpose: once for its placeholder and once as the
            // trailing throwable argument.
            log.debug("RemoteException: RemoteConnectionService {} , RemoteException {}, completeExceptionally {}",
                    remoteCompletableFuture.getClient(), executionException, completeExceptionally, executionException);
        }
    }
}
