package com.github.sseserver.remote;

import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.github.sseserver.local.LocalController;
import com.github.sseserver.springboot.SseServerProperties;
import com.github.sseserver.util.LambdaUtil;
import com.github.sseserver.util.SpringUtil;
import com.github.sseserver.util.TypeUtil;
import java.io.Serializable;
import java.net.URL;
import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
import org.springframework.http.HttpEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.client.AsyncRestTemplate;

/* loaded from: input_file:com/github/sseserver/remote/RemoteConnectionServiceImpl.class */
/**
 * HTTP client view of a single remote node.
 *
 * <p>Every operation is translated into an asynchronous HTTP call issued through an
 * {@link AsyncRestTemplate} against one of three endpoint groups derived from the remote URL:
 * {@code <url>/ConnectionQueryService} (GET queries), {@code <url>/SendService} (POST sends) and
 * {@code <url>/RemoteConnectionService} (POST disconnects). The blocking query methods are thin
 * wrappers that call {@code block()} on the matching asynchronous call.
 *
 * <p>After {@link #close()} every new request fails with a {@link ClosedChannelException}.
 */
public class RemoteConnectionServiceImpl implements RemoteConnectionService {
    /** Value of system property {@code RemoteConnectionServiceImpl.connectTimeout}; defaults to 2000. */
    public static int connectTimeout = Integer.getInteger("RemoteConnectionServiceImpl.connectTimeout", 2000);
    /** Value of system property {@code RemoteConnectionServiceImpl.readTimeout}; defaults to 10000. */
    public static int readTimeout = Integer.getInteger("RemoteConnectionServiceImpl.readTimeout", 10000);
    /** Value of system property {@code RemoteConnectionServiceImpl.threadsIfAsyncRequest}; defaults to 1. */
    public static int threadsIfAsyncRequest = Integer.getInteger("RemoteConnectionServiceImpl.threadsIfAsyncRequest", 1);
    /**
     * Value of system property {@code RemoteConnectionServiceImpl.threadsIfBlockRequest};
     * defaults to {@code max(16, 2 * availableProcessors)}.
     */
    public static int threadsIfBlockRequest = Integer.getInteger("RemoteConnectionServiceImpl.threadsIfBlockRequest", Math.max(16, Runtime.getRuntime().availableProcessors() * 2));

    private final AsyncRestTemplate restTemplate;
    private final URL url;
    private final String urlConnectionQueryService;
    private final String urlSendService;
    private final String urlRemoteConnectionService;
    private final SseServerProperties.AutoType autoTypeEnum;
    private final String id;
    /** Set to {@code true} on the calling thread while a {@link #scopeOnWriteable(Callable)} task runs. */
    private final ThreadLocal<Boolean> scopeOnWriteableThreadLocal = new ThreadLocal<>();
    /** Handed to {@code Response.autoCastClassName} on every extraction; shared across callback threads. */
    private final Set<String> classNotFoundSet = new ConcurrentHashSet<>();
    /** Written by {@link #close()} and read on request threads, hence volatile. */
    private volatile boolean closeFlag = false;

    /**
     * Creates a client for the remote node reachable at {@code url}.
     *
     * @param url      base URL of the remote node; the three endpoint group URLs are derived from it
     * @param str      identifier returned by {@link #getId()}; also forwarded to
     *                 {@code SpringUtil.newAsyncRestTemplate} (NOTE(review): looks like an account name
     *                 and thread-name prefix - confirm against SpringUtil)
     * @param str2     forwarded to {@code SpringUtil.newAsyncRestTemplate}
     *                 (NOTE(review): presumably the credential paired with {@code str} - confirm)
     * @param autoType policy handed to {@code Response.autoCastClassName} when extracting responses
     */
    public RemoteConnectionServiceImpl(URL url, String str, String str2, SseServerProperties.AutoType autoType) {
        this.url = url;
        this.id = str;
        this.urlConnectionQueryService = url + "/ConnectionQueryService";
        this.urlSendService = url + "/SendService";
        this.urlRemoteConnectionService = url + "/RemoteConnectionService";
        this.autoTypeEnum = autoType;
        this.restTemplate = SpringUtil.newAsyncRestTemplate(connectTimeout, readTimeout, threadsIfAsyncRequest, threadsIfBlockRequest, str + "RemoteConnectionService", str, str2);
    }

    /** Returns the identifier supplied to the constructor. */
    @Override
    public String getId() {
        return this.id;
    }

    // ------------------------------------------------------------------
    // Asynchronous queries (GET <url>/ConnectionQueryService/...)
    // ------------------------------------------------------------------

    /**
     * Queries {@code /isOnline}.
     *
     * @param serializable value bound to the {@code userId} URI variable
     */
    @Override
    public RemoteCompletableFuture<Boolean, RemoteConnectionService> isOnlineAsync(Serializable serializable) {
        return asyncGetConnectionQueryService("/isOnline?userId={userId}", this::extract, serializable);
    }

    /**
     * Queries {@code /getUser}.
     *
     * @param serializable value bound to the {@code userId} URI variable
     */
    @Override
    public <ACCESS_USER> RemoteCompletableFuture<ACCESS_USER, RemoteConnectionService> getUserAsync(Serializable serializable) {
        return asyncGetConnectionQueryService("/getUser?userId={userId}", this::extract, serializable);
    }

    /** Queries {@code /getUsers}. */
    @Override
    public <ACCESS_USER> RemoteCompletableFuture<List<ACCESS_USER>, RemoteConnectionService> getUsersAsync() {
        return asyncGetConnectionQueryService("/getUsers", this::extract);
    }

    /**
     * Queries {@code /getUsersByListening}.
     *
     * @param str value bound to the {@code sseListenerName} URI variable
     */
    @Override
    public <ACCESS_USER> RemoteCompletableFuture<List<ACCESS_USER>, RemoteConnectionService> getUsersByListeningAsync(String str) {
        return asyncGetConnectionQueryService("/getUsersByListening?sseListenerName={sseListenerName}", this::extract, str);
    }

    /**
     * Queries {@code /getUsersByTenantIdListening}.
     *
     * @param serializable value bound to the {@code tenantId} URI variable
     * @param str          value bound to the {@code sseListenerName} URI variable
     */
    @Override
    public <ACCESS_USER> RemoteCompletableFuture<List<ACCESS_USER>, RemoteConnectionService> getUsersByTenantIdListeningAsync(Serializable serializable, String str) {
        return asyncGetConnectionQueryService("/getUsersByTenantIdListening?tenantId={tenantId}&sseListenerName={sseListenerName}", this::extract, serializable, str);
    }

    /**
     * Queries {@code /getUserIds} and converts the returned elements with
     * {@link #castBasic(Collection, Class)}.
     *
     * @param cls element type requested by the caller
     */
    @Override
    public <T> RemoteCompletableFuture<Collection<T>, RemoteConnectionService> getUserIdsAsync(Class<T> cls) {
        return asyncGetConnectionQueryService("/getUserIds",
                responseEntity -> castBasic(this.<Collection<?>>extract(responseEntity), cls));
    }

    /**
     * Queries {@code /getUserIdsByListening} and converts the returned elements with
     * {@link #castBasic(Collection, Class)}.
     *
     * @param str value bound to the {@code sseListenerName} URI variable
     * @param cls element type requested by the caller
     */
    @Override
    public <T> RemoteCompletableFuture<List<T>, RemoteConnectionService> getUserIdsByListeningAsync(String str, Class<T> cls) {
        return asyncGetConnectionQueryService("/getUserIdsByListening?sseListenerName={sseListenerName}",
                responseEntity -> castBasic(this.<Collection<?>>extract(responseEntity), cls),
                str);
    }

    /**
     * Queries {@code /getUserIdsByTenantIdListening} and converts the returned elements with
     * {@link #castBasic(Collection, Class)}.
     *
     * @param serializable value bound to the {@code tenantId} URI variable
     * @param str          value bound to the {@code sseListenerName} URI variable
     * @param cls          element type requested by the caller
     */
    @Override
    public <T> RemoteCompletableFuture<List<T>, RemoteConnectionService> getUserIdsByTenantIdListeningAsync(Serializable serializable, String str, Class<T> cls) {
        return asyncGetConnectionQueryService("/getUserIdsByTenantIdListening?tenantId={tenantId}&sseListenerName={sseListenerName}",
                responseEntity -> castBasic(this.<Collection<?>>extract(responseEntity), cls),
                serializable, str);
    }

    /** Queries {@code /getAccessTokens}. */
    @Override
    public RemoteCompletableFuture<Collection<String>, RemoteConnectionService> getAccessTokensAsync() {
        return asyncGetConnectionQueryService("/getAccessTokens", this::extract);
    }

    /**
     * Queries {@code /getTenantIds} and converts the returned elements with
     * {@link #castBasic(Collection, Class)}.
     *
     * @param cls element type requested by the caller
     */
    @Override
    public <T> RemoteCompletableFuture<List<T>, RemoteConnectionService> getTenantIdsAsync(Class<T> cls) {
        return asyncGetConnectionQueryService("/getTenantIds",
                responseEntity -> castBasic(this.<Collection<?>>extract(responseEntity), cls));
    }

    /** Queries {@code /getChannels}. */
    @Override
    public RemoteCompletableFuture<List<String>, RemoteConnectionService> getChannelsAsync() {
        return asyncGetConnectionQueryService("/getChannels", this::extract);
    }

    /** Queries {@code /getConnectionCount}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> getConnectionCountAsync() {
        return asyncGetConnectionQueryService("/getConnectionCount", this::extract);
    }

    // ------------------------------------------------------------------
    // Blocking queries: each waits on the matching asynchronous call
    // ------------------------------------------------------------------

    /**
     * Blocking form of {@link #isOnlineAsync(Serializable)}.
     *
     * @throws NullPointerException if the remote call yields no value
     */
    @Override
    public boolean isOnline(Serializable serializable) {
        Boolean online = isOnlineAsync(serializable).block();
        Objects.requireNonNull(online, "RemoteConnectionServiceImpl -> public boolean isOnline(userId) result is Null");
        return online;
    }

    /** Blocking form of {@link #getUserAsync(Serializable)}. */
    @Override
    public <ACCESS_USER> ACCESS_USER getUser(Serializable serializable) {
        RemoteCompletableFuture<ACCESS_USER, RemoteConnectionService> future = getUserAsync(serializable);
        return future.block();
    }

    /** Blocking form of {@link #getUsersAsync()}. */
    @Override
    public <ACCESS_USER> List<ACCESS_USER> getUsers() {
        RemoteCompletableFuture<List<ACCESS_USER>, RemoteConnectionService> future = getUsersAsync();
        return future.block();
    }

    /** Blocking form of {@link #getUsersByListeningAsync(String)}. */
    @Override
    public <ACCESS_USER> List<ACCESS_USER> getUsersByListening(String str) {
        RemoteCompletableFuture<List<ACCESS_USER>, RemoteConnectionService> future = getUsersByListeningAsync(str);
        return future.block();
    }

    /** Blocking form of {@link #getUsersByTenantIdListeningAsync(Serializable, String)}. */
    @Override
    public <ACCESS_USER> List<ACCESS_USER> getUsersByTenantIdListening(Serializable serializable, String str) {
        RemoteCompletableFuture<List<ACCESS_USER>, RemoteConnectionService> future = getUsersByTenantIdListeningAsync(serializable, str);
        return future.block();
    }

    /** Blocking form of {@link #getUserIdsAsync(Class)}. */
    @Override
    public <T> Collection<T> getUserIds(Class<T> cls) {
        return getUserIdsAsync(cls).block();
    }

    /** Blocking form of {@link #getUserIdsByListeningAsync(String, Class)}. */
    @Override
    public <T> List<T> getUserIdsByListening(String str, Class<T> cls) {
        return getUserIdsByListeningAsync(str, cls).block();
    }

    /** Blocking form of {@link #getUserIdsByTenantIdListeningAsync(Serializable, String, Class)}. */
    @Override
    public <T> List<T> getUserIdsByTenantIdListening(Serializable serializable, String str, Class<T> cls) {
        return getUserIdsByTenantIdListeningAsync(serializable, str, cls).block();
    }

    /** Blocking form of {@link #getAccessTokensAsync()}. */
    @Override
    public Collection<String> getAccessTokens() {
        return getAccessTokensAsync().block();
    }

    /** Blocking form of {@link #getTenantIdsAsync(Class)}. */
    @Override
    public <T> List<T> getTenantIds(Class<T> cls) {
        return getTenantIdsAsync(cls).block();
    }

    /** Blocking form of {@link #getChannelsAsync()}. */
    @Override
    public List<String> getChannels() {
        return getChannelsAsync().block();
    }

    /**
     * Queries {@code /getAccessTokenCount} and waits for the answer.
     *
     * @throws NullPointerException if the remote call yields no value
     */
    @Override
    public int getAccessTokenCount() {
        return blockForCount("/getAccessTokenCount",
                "RemoteConnectionServiceImpl -> public int getAccessTokenCount() result is Null");
    }

    /**
     * Queries {@code /getUserCount} and waits for the answer.
     *
     * @throws NullPointerException if the remote call yields no value
     */
    @Override
    public int getUserCount() {
        return blockForCount("/getUserCount",
                "RemoteConnectionServiceImpl -> public int getUserCount() result is Null");
    }

    /**
     * Queries {@code /getConnectionCount} and waits for the answer.
     *
     * @throws NullPointerException if the remote call yields no value
     */
    @Override
    public int getConnectionCount() {
        return blockForCount("/getConnectionCount",
                "RemoteConnectionServiceImpl -> public int getConnectionCount() result is Null");
    }

    /** Issues a GET for an integer-valued query and unboxes the result, rejecting a missing value. */
    private int blockForCount(String path, String nullMessage) {
        Integer count = this.<Integer>asyncGetConnectionQueryService(path, this::extract).block();
        Objects.requireNonNull(count, nullMessage);
        return count;
    }

    // ------------------------------------------------------------------
    // Sends (POST <url>/SendService/...)
    // ------------------------------------------------------------------

    /**
     * Runs {@code callable} with the "scopeOnWriteable" marker set for the current thread, so that
     * every send issued from inside it carries {@code "scopeOnWriteable": true} in its request body.
     *
     * <p>The marker's previous state is restored afterwards, so a nested scope does not switch
     * the marker off for the enclosing one.
     *
     * @param callable task to run inside the scope
     * @return whatever {@code callable} returns; an exception from it is passed to
     *         {@code LambdaUtil.sneakyThrows} (presumably rethrown as-is - verify against LambdaUtil)
     */
    @Override
    public <T> T scopeOnWriteable(Callable<T> callable) {
        Boolean previous = this.scopeOnWriteableThreadLocal.get();
        this.scopeOnWriteableThreadLocal.set(true);
        try {
            return callable.call();
        } catch (Exception e) {
            LambdaUtil.sneakyThrows(e);
            return null;
        } finally {
            if (previous == null) {
                this.scopeOnWriteableThreadLocal.remove();
            } else {
                this.scopeOnWriteableThreadLocal.set(previous);
            }
        }
    }

    /** Posts {@code eventName}/{@code body} to {@code /sendAll}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendAll(String str, Object obj) {
        return asyncPostSendService("/sendAll", this::extract, eventBody(str, obj));
    }

    /** Posts {@code eventName}/{@code body} to {@code /sendAllListening}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendAllListening(String str, Object obj) {
        return asyncPostSendService("/sendAllListening", this::extract, eventBody(str, obj));
    }

    /** Posts {@code channels}/{@code eventName}/{@code body} to {@code /sendByChannel}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendByChannel(Collection<String> collection, String str, Object obj) {
        return asyncPostSendService("/sendByChannel", this::extract, targetedEventBody("channels", collection, str, obj));
    }

    /** Posts {@code channels}/{@code eventName}/{@code body} to {@code /sendByChannelListening}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendByChannelListening(Collection<String> collection, String str, Object obj) {
        return asyncPostSendService("/sendByChannelListening", this::extract, targetedEventBody("channels", collection, str, obj));
    }

    /** Posts {@code accessTokens}/{@code eventName}/{@code body} to {@code /sendByAccessToken}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendByAccessToken(Collection<String> collection, String str, Object obj) {
        return asyncPostSendService("/sendByAccessToken", this::extract, targetedEventBody("accessTokens", collection, str, obj));
    }

    /** Posts {@code accessTokens}/{@code eventName}/{@code body} to {@code /sendByAccessTokenListening}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendByAccessTokenListening(Collection<String> collection, String str, Object obj) {
        return asyncPostSendService("/sendByAccessTokenListening", this::extract, targetedEventBody("accessTokens", collection, str, obj));
    }

    /** Posts {@code userIds}/{@code eventName}/{@code body} to {@code /sendByUserId}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendByUserId(Collection<? extends Serializable> collection, String str, Object obj) {
        return asyncPostSendService("/sendByUserId", this::extract, targetedEventBody("userIds", collection, str, obj));
    }

    /** Posts {@code userIds}/{@code eventName}/{@code body} to {@code /sendByUserIdListening}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendByUserIdListening(Collection<? extends Serializable> collection, String str, Object obj) {
        return asyncPostSendService("/sendByUserIdListening", this::extract, targetedEventBody("userIds", collection, str, obj));
    }

    /** Posts {@code tenantIds}/{@code eventName}/{@code body} to {@code /sendByTenantId}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendByTenantId(Collection<? extends Serializable> collection, String str, Object obj) {
        return asyncPostSendService("/sendByTenantId", this::extract, targetedEventBody("tenantIds", collection, str, obj));
    }

    /** Posts {@code tenantIds}/{@code eventName}/{@code body} to {@code /sendByTenantIdListening}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> sendByTenantIdListening(Collection<? extends Serializable> collection, String str, Object obj) {
        return asyncPostSendService("/sendByTenantIdListening", this::extract, targetedEventBody("tenantIds", collection, str, obj));
    }

    /** Builds the request body shared by every send: {@code eventName} and {@code body}. */
    private static Map<String, Object> eventBody(String eventName, Object body) {
        Map<String, Object> request = new HashMap<>(4);
        request.put("eventName", eventName);
        request.put("body", body);
        return request;
    }

    /** Builds a send request body that additionally names its targets under {@code targetKey}. */
    private static Map<String, Object> targetedEventBody(String targetKey, Collection<?> targets, String eventName, Object body) {
        Map<String, Object> request = eventBody(eventName, body);
        request.put(targetKey, targets);
        return request;
    }

    // ------------------------------------------------------------------
    // Disconnects (POST <url>/RemoteConnectionService/...)
    // ------------------------------------------------------------------

    /** Posts {@code userId} to {@code /disconnectByUserId}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> disconnectByUserId(Serializable serializable) {
        Map<String, Object> request = new HashMap<>(1);
        request.put("userId", serializable);
        return asyncPostRemoteConnectionService("/disconnectByUserId", this::extract, request);
    }

    /** Posts {@code accessToken} to {@code /disconnectByAccessToken}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> disconnectByAccessToken(String str) {
        Map<String, Object> request = new HashMap<>(1);
        request.put("accessToken", str);
        return asyncPostRemoteConnectionService("/disconnectByAccessToken", this::extract, request);
    }

    /** Posts {@code connectionId} to {@code /disconnectByConnectionId}. */
    @Override
    public RemoteCompletableFuture<Integer, RemoteConnectionService> disconnectByConnectionId(Long l) {
        Map<String, Object> request = new HashMap<>(1);
        request.put("connectionId", l);
        return asyncPostRemoteConnectionService("/disconnectByConnectionId", this::extract, request);
    }

    // ------------------------------------------------------------------
    // Transport plumbing
    // ------------------------------------------------------------------

    /**
     * Issues a GET against the ConnectionQueryService endpoint group.
     *
     * @param str      path (and URI template) appended to the group URL
     * @param function converts the HTTP response into the future's value
     * @param objArr   values for the URI template variables, in order
     */
    protected <T> RemoteCompletableFuture<T, RemoteConnectionService> asyncGetConnectionQueryService(String str, Function<ResponseEntity<LocalController.Response>, T> function, Object... objArr) {
        return asyncGet(this.urlConnectionQueryService + str, function, objArr);
    }

    /**
     * Issues a POST against the SendService endpoint group, adding
     * {@code "scopeOnWriteable": true} to the body when the calling thread is inside
     * {@link #scopeOnWriteable(Callable)}.
     *
     * @param str      path appended to the group URL
     * @param function converts the HTTP response into the future's value
     * @param map      request body; never modified by this method
     */
    protected <T> RemoteCompletableFuture<T, RemoteConnectionService> asyncPostSendService(String str, Function<ResponseEntity<LocalController.Response>, T> function, Map<String, Object> map) {
        Map<String, Object> requestBody = map;
        if (Boolean.TRUE.equals(this.scopeOnWriteableThreadLocal.get())) {
            // Copy first: the caller's map may be shared or unmodifiable.
            requestBody = new HashMap<>(map);
            requestBody.put("scopeOnWriteable", true);
        }
        return asyncPost(this.urlSendService + str, function, requestBody);
    }

    /**
     * Issues a POST against the RemoteConnectionService endpoint group.
     *
     * @param str      path appended to the group URL
     * @param function converts the HTTP response into the future's value
     * @param map      request body
     */
    protected <T> RemoteCompletableFuture<T, RemoteConnectionService> asyncPostRemoteConnectionService(String str, Function<ResponseEntity<LocalController.Response>, T> function, Map<String, Object> map) {
        return asyncPost(this.urlRemoteConnectionService + str, function, map);
    }

    /**
     * Sends an asynchronous GET and adapts the result to a {@link RemoteCompletableFuture}.
     * Fails immediately if this client has been closed.
     */
    protected <T> RemoteCompletableFuture<T, RemoteConnectionService> asyncGet(String str, Function<ResponseEntity<LocalController.Response>, T> function, Object... objArr) {
        checkClose();
        ListenableFuture<ResponseEntity<LocalController.Response>> pending =
                this.restTemplate.getForEntity(str, LocalController.Response.class, objArr);
        return completable(pending, function);
    }

    /**
     * Sends an asynchronous POST carrying {@code map} as the body (with
     * {@code SpringUtil.EMPTY_HEADERS}) and adapts the result to a {@link RemoteCompletableFuture}.
     * Fails immediately if this client has been closed.
     */
    protected <T> RemoteCompletableFuture<T, RemoteConnectionService> asyncPost(String str, Function<ResponseEntity<LocalController.Response>, T> function, Map<String, Object> map) {
        checkClose();
        HttpEntity<Map<String, Object>> request = new HttpEntity<>(map, SpringUtil.EMPTY_HEADERS);
        ListenableFuture<ResponseEntity<LocalController.Response>> pending =
                this.restTemplate.postForEntity(str, request, LocalController.Response.class);
        return completable(pending, function);
    }

    /**
     * Bridges a Spring {@link ListenableFuture} to a {@link RemoteCompletableFuture} whose client
     * is this instance. A successful response is passed through {@code function}; a transport
     * failure, or anything thrown by {@code function}, completes the future exceptionally.
     */
    protected <T> RemoteCompletableFuture<T, RemoteConnectionService> completable(ListenableFuture<ResponseEntity<LocalController.Response>> listenableFuture, Function<ResponseEntity<LocalController.Response>, T> function) {
        RemoteCompletableFuture<T, RemoteConnectionService> result = new RemoteCompletableFuture<>();
        result.setClient(this);
        SuccessCallback<ResponseEntity<LocalController.Response>> onSuccess = responseEntity -> {
            try {
                result.complete(function.apply(responseEntity));
            } catch (Throwable conversionFailure) {
                // Conversion errors must surface through the future, not vanish on the I/O thread.
                result.completeExceptionally(conversionFailure);
            }
        };
        listenableFuture.addCallback(onSuccess, result::completeExceptionally);
        return result;
    }

    /**
     * Unwraps the payload of a response: lets the body run {@code autoCastClassName} with this
     * client's auto-type policy, then returns {@code getData()} cast to the caller's type.
     *
     * @throws NullPointerException if the response carries no body
     */
    @SuppressWarnings("unchecked")
    protected <T> T extract(ResponseEntity<LocalController.Response> responseEntity) {
        LocalController.Response response = Objects.requireNonNull(responseEntity.getBody(),
                "RemoteConnectionServiceImpl -> extract(responseEntity) body is Null");
        try {
            response.autoCastClassName(this.autoTypeEnum, this.classNotFoundSet);
        } catch (ClassNotFoundException e) {
            LambdaUtil.sneakyThrows(e);
        }
        // Unchecked by necessity: the payload type is only known to the caller.
        return (T) response.getData();
    }

    /** Converts the elements of {@code source} to {@code cls} by delegating to {@code TypeUtil.castBasic}. */
    protected <SOURCE extends Collection<?>, T> List<T> castBasic(SOURCE source, Class<T> cls) {
        return TypeUtil.castBasic(source, cls);
    }

    /** Returns the base URL supplied to the constructor. */
    @Override
    public URL getRemoteUrl() {
        return this.url;
    }

    /** Returns the remote URL as text, or {@code "null"} when no URL was supplied. */
    @Override
    public String toString() {
        return this.url == null ? "null" : this.url.toString();
    }

    /**
     * Marks this client closed and releases the underlying {@link AsyncRestTemplate}.
     * The flag is raised first so no new request is started on a template that is shutting down.
     */
    @Override
    public void close() {
        this.closeFlag = true;
        SpringUtil.close(this.restTemplate);
    }

    /**
     * Rejects the current request with a {@link ClosedChannelException} (raised through
     * {@code LambdaUtil.sneakyThrows}) when {@link #close()} has already been called.
     */
    protected void checkClose() {
        if (this.closeFlag) {
            LambdaUtil.sneakyThrows(new ClosedChannelException());
        }
    }
}
