package com.github.sseserver.remote;

import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.github.sseserver.local.LocalController;
import com.github.sseserver.qos.Message;
import com.github.sseserver.qos.MessageRepository;
import com.github.sseserver.springboot.SseServerProperties;
import com.github.sseserver.util.LambdaUtil;
import com.github.sseserver.util.SpringUtil;
import java.net.URL;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.http.HttpEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.client.AsyncRestTemplate;

/* loaded from: input_file:com/github/sseserver/remote/RemoteMessageRepository.class */
/**
 * {@link MessageRepository} implementation that forwards every operation as an HTTP POST to
 * {@code <url>/MessageRepository/<operation>} through an {@link AsyncRestTemplate}.
 *
 * <p>The blocking {@link #insert}, {@link #select} and {@link #delete} methods delegate to their
 * {@code *Async} counterparts and wait on the returned future. After {@link #close()} has been
 * called, every new request is rejected with a {@link ClosedChannelException}.
 */
public class RemoteMessageRepository implements MessageRepository {
    /** Connect timeout handed to the REST template; overridable via system property. */
    public static int connectTimeout = Integer.getInteger("RemoteMessageRepository.connectTimeout", 2000);
    /** Read timeout handed to the REST template; overridable via system property. */
    public static int readTimeout = Integer.getInteger("RemoteMessageRepository.readTimeout", 10000);
    /** Thread count handed to the REST template for async requests; overridable via system property. */
    public static int threadsIfAsyncRequest = Integer.getInteger("RemoteMessageRepository.threadsIfAsyncRequest", 1);
    /** Thread count handed to the REST template for blocking requests; overridable via system property. */
    public static int threadsIfBlockRequest = Integer.getInteger("RemoteMessageRepository.threadsIfBlockRequest", Math.max(16, Runtime.getRuntime().availableProcessors() * 2));

    private final AsyncRestTemplate restTemplate;
    private final URL url;
    private final String urlMessageRepository;
    private final String id;
    private final SseServerProperties.AutoType autoTypeEnum;
    private final Set<String> classNotFoundSet = new ConcurrentHashSet<>();
    /**
     * Set by {@link #close()} and read by request threads in {@link #checkClose()}; volatile so the
     * write is visible to threads other than the one that closed the repository.
     */
    private volatile boolean closeFlag = false;

    /**
     * Creates a client for the repository exposed under {@code url + "/MessageRepository"}.
     *
     * @param url      base URL of the remote node
     * @param str      identifier of this remote repository; returned by {@link #getId()} and
     *                 stamped on every message built from a response
     * @param str2     forwarded unchanged as the last argument of
     *                 {@code SpringUtil.newAsyncRestTemplate} (meaning not visible in this class)
     * @param autoType auto-type mode passed to {@code Response.autoCastClassName} in {@link #extract}
     */
    public RemoteMessageRepository(URL url, String str, String str2, SseServerProperties.AutoType autoType) {
        this.url = url;
        this.urlMessageRepository = url + "/MessageRepository";
        this.id = str;
        this.autoTypeEnum = autoType;
        this.restTemplate = SpringUtil.newAsyncRestTemplate(connectTimeout, readTimeout, threadsIfAsyncRequest, threadsIfBlockRequest, str + "RemoteMessageRepository", str, str2);
    }

    /** Blocking variant of {@link #insertAsync(Message)}. */
    @Override // com.github.sseserver.qos.MessageRepository
    public String insert(Message message) {
        return insertAsync(message).block();
    }

    /** Blocking variant of {@link #selectAsync(MessageRepository.Query)}. */
    @Override // com.github.sseserver.qos.MessageRepository
    public List<Message> select(MessageRepository.Query query) {
        return selectAsync(query).block();
    }

    /** Blocking variant of {@link #deleteAsync(String)}. */
    @Override // com.github.sseserver.qos.MessageRepository
    public Message delete(String str) {
        return deleteAsync(str).block();
    }

    /**
     * Posts the message's fields to {@code /insert}.
     *
     * @param message message to store remotely
     * @return future completed with the {@code data} value of the response
     */
    public RemoteCompletableFuture<String, RemoteMessageRepository> insertAsync(Message message) {
        // Nine entries: capacity 16 keeps the map below its resize threshold.
        Map<String, Object> request = new HashMap<>(16);
        request.put("filters", message.getFilters());
        request.put("id", message.getId());
        request.put("body", message.getBody());
        request.put("eventName", message.getEventName());
        request.put("listenerName", message.getListenerName());
        request.put("userIdList", message.getUserIdList());
        request.put("tenantIdList", message.getTenantIdList());
        request.put("accessTokenList", message.getAccessTokenList());
        request.put("channelList", message.getChannelList());
        return asyncPost("/insert", request, this::extract);
    }

    /**
     * Posts the query's fields to {@code /select}.
     *
     * @param query selection criteria
     * @return future completed with the messages built from the response rows (empty list when
     *         the response carries no data)
     */
    public RemoteCompletableFuture<List<Message>, RemoteMessageRepository> selectAsync(MessageRepository.Query query) {
        Map<String, Object> request = new HashMap<>(8);
        request.put("tenantId", query.getTenantId());
        request.put("channel", query.getChannel());
        request.put("accessToken", query.getAccessToken());
        request.put("userId", query.getUserId());
        request.put("listeners", query.getListeners());
        return asyncPost("/select", request, this::extractListMessage);
    }

    /**
     * Posts the message id to {@code /delete}.
     *
     * @param str id of the message to delete
     * @return future completed with the message built from the response, or {@code null} when the
     *         response carries no data
     */
    public RemoteCompletableFuture<Message, RemoteMessageRepository> deleteAsync(String str) {
        Map<String, Object> request = new HashMap<>(2);
        request.put("id", str);
        return asyncPost("/delete", request, this::extractMessage);
    }

    /**
     * Marks the repository closed and releases the underlying REST template.
     *
     * <p>The flag is raised before the template is closed so that a concurrent
     * {@link #asyncPost} is rejected by {@link #checkClose()} instead of reaching a template that
     * is already shut down.
     */
    @Override // com.github.sseserver.qos.MessageRepository, java.lang.AutoCloseable
    public void close() {
        this.closeFlag = true;
        SpringUtil.close(this.restTemplate);
    }

    /**
     * Not supported by the remote repository.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // com.github.sseserver.qos.MessageRepository
    public void addDeleteListener(Consumer<Message> consumer) {
        throw new UnsupportedOperationException("public void addDeleteListener(Consumer<Message> listener)");
    }

    /**
     * Sends {@code obj} as the POST body to {@code urlMessageRepository + str} and adapts the
     * response with {@code function}.
     *
     * @param str      path suffix of the operation, e.g. {@code "/insert"}
     * @param obj      request body
     * @param function converts the HTTP response into the future's value
     * @return future completed with the converted response, or exceptionally on failure
     */
    protected <T> RemoteCompletableFuture<T, RemoteMessageRepository> asyncPost(String str, Object obj, Function<ResponseEntity<LocalController.Response>, T> function) {
        checkClose();
        HttpEntity<Object> request = new HttpEntity<>(obj, SpringUtil.EMPTY_HEADERS);
        ListenableFuture<ResponseEntity<LocalController.Response>> pending =
                this.restTemplate.postForEntity(this.urlMessageRepository + str, request, LocalController.Response.class);
        return completable(pending, function);
    }

    /**
     * Bridges a Spring {@link ListenableFuture} to a {@link RemoteCompletableFuture} bound to this
     * client.
     *
     * @param listenableFuture pending HTTP exchange
     * @param function         converts the HTTP response into the future's value; anything it
     *                         throws completes the returned future exceptionally
     * @return the bridged future
     */
    protected <T> RemoteCompletableFuture<T, RemoteMessageRepository> completable(ListenableFuture<ResponseEntity<LocalController.Response>> listenableFuture, Function<ResponseEntity<LocalController.Response>, T> function) {
        RemoteCompletableFuture<T, RemoteMessageRepository> result = new RemoteCompletableFuture<>();
        result.setClient(this);
        SuccessCallback<ResponseEntity<LocalController.Response>> onSuccess = responseEntity -> {
            try {
                result.complete(function.apply(responseEntity));
            } catch (Throwable th) {
                // A failing converter must fail the future rather than leave it pending forever.
                result.completeExceptionally(th);
            }
        };
        listenableFuture.addCallback(onSuccess, result::completeExceptionally);
        return result;
    }

    /**
     * Returns the {@code data} value of the response after letting the response resolve its
     * declared class name via {@code autoCastClassName}.
     *
     * @param responseEntity HTTP response
     * @return the response data, unchecked-cast to the caller's expected type
     * @throws IllegalStateException if the response has no body
     */
    @SuppressWarnings("unchecked")
    protected <T> T extract(ResponseEntity<LocalController.Response> responseEntity) {
        LocalController.Response response = requireBody(responseEntity);
        try {
            response.autoCastClassName(this.autoTypeEnum, this.classNotFoundSet);
        } catch (ClassNotFoundException e) {
            LambdaUtil.sneakyThrows(e);
        }
        return (T) response.getData();
    }

    /**
     * Converts the response data, a list of field maps, into messages.
     *
     * @param responseEntity HTTP response
     * @return mutable list of messages; empty when the response carries no data
     * @throws IllegalStateException if the response has no body
     */
    @SuppressWarnings("unchecked")
    protected List<Message> extractListMessage(ResponseEntity<LocalController.Response> responseEntity) {
        List<Map<?, ?>> rows = (List<Map<?, ?>>) requireBody(responseEntity).getData();
        if (rows == null) {
            // Mirrors buildMessage's tolerance of missing data instead of failing with an NPE.
            return new ArrayList<>();
        }
        return rows.stream().<Message>map(this::buildMessage).collect(Collectors.toList());
    }

    /**
     * Builds a {@link RemoteResponseMessage} from a map of message fields.
     *
     * @param map field map taken from a response, may be {@code null}
     * @return the populated message, or {@code null} when {@code map} is {@code null}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected RemoteResponseMessage buildMessage(Map<?, ?> map) {
        if (map == null) {
            return null;
        }
        RemoteResponseMessage message = new RemoteResponseMessage();
        message.setRemoteMessageRepositoryId(this.id);
        // Read as Number so any numeric wire type is accepted; an absent value is treated as 0.
        Object filters = map.get("filters");
        message.setFilters(filters == null ? 0 : ((Number) filters).intValue());
        message.setId((String) map.get("id"));
        message.setBody(map.get("body"));
        message.setEventName((String) map.get("eventName"));
        message.setListenerName((String) map.get("listenerName"));
        message.setTenantIdList((Collection) map.get("tenantIdList"));
        message.setUserIdList((Collection) map.get("userIdList"));
        message.setAccessTokenList((Collection) map.get("accessTokenList"));
        message.setChannelList((Collection) map.get("channelList"));
        return message;
    }

    /** @return the base URL this repository was created with */
    public URL getRemoteUrl() {
        return this.url;
    }

    /** @return the identifier this repository was created with */
    public String getId() {
        return this.id;
    }

    /** @return the base URL as text, or {@code "null"} when no URL was supplied */
    @Override
    public String toString() {
        return String.valueOf(this.url);
    }

    /** Rejects the call with a {@link ClosedChannelException} once {@link #close()} has run. */
    protected void checkClose() {
        if (this.closeFlag) {
            LambdaUtil.sneakyThrows(new ClosedChannelException());
        }
    }

    /** Converts the single field map carried by a response into a message. */
    private Message extractMessage(ResponseEntity<LocalController.Response> responseEntity) {
        return buildMessage((Map<?, ?>) requireBody(responseEntity).getData());
    }

    /** Returns the response body, failing with a descriptive error when the remote sent none. */
    private LocalController.Response requireBody(ResponseEntity<LocalController.Response> responseEntity) {
        LocalController.Response body = responseEntity.getBody();
        if (body == null) {
            throw new IllegalStateException("Empty response body from " + this.urlMessageRepository);
        }
        return body;
    }
}
