package com.github.sseserver.qos;

import com.github.sseserver.SendService;
import com.github.sseserver.local.LocalConnectionService;
import com.github.sseserver.local.SseEmitter;
import com.github.sseserver.remote.ClusterCompletableFuture;
import com.github.sseserver.remote.ClusterConnectionService;
import com.github.sseserver.remote.ClusterMessageRepository;
import com.github.sseserver.remote.RemoteResponseMessage;
import com.github.sseserver.util.LambdaUtil;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

/* loaded from: input_file:com/github/sseserver/qos/AtLeastOnceSendService.class */
public class AtLeastOnceSendService<ACCESS_USER> implements SendService<QosCompletableFuture<Integer>> {
    protected final LocalConnectionService localConnectionService;
    protected final MessageRepository messageRepository;
    protected final Map<String, QosCompletableFuture<Integer>> futureMap = new ConcurrentHashMap(32);
    protected final Set<String> sendingSet = Collections.newSetFromMap(new ConcurrentHashMap());

    public AtLeastOnceSendService(LocalConnectionService localConnectionService, MessageRepository messageRepository) {
        this.localConnectionService = localConnectionService;
        this.messageRepository = messageRepository;
        this.messageRepository.addDeleteListener(message -> {
            QosCompletableFuture<Integer> remove = this.futureMap.remove(message.getId());
            if (remove != null) {
                complete(remove, 1);
            }
        });
        localConnectionService.addConnectListener(this::resend);
        localConnectionService.addListeningChangeWatch(sseChangeEvent -> {
            if (SseEmitter.EVENT_ADD_LISTENER.equals(sseChangeEvent.getEventName())) {
                resend(sseChangeEvent.getInstance());
            }
        });
    }

    public QosCompletableFuture<Integer> qosSend(Function<SendService, ?> function, Supplier<AtLeastOnceMessage> supplier) {
        QosCompletableFuture<Integer> qosCompletableFuture = new QosCompletableFuture<>("qos" + Message.newId());
        if (this.localConnectionService.isEnableCluster()) {
            ClusterConnectionService cluster = this.localConnectionService.getCluster();
            ((ClusterCompletableFuture) cluster.scopeOnWriteable(() -> {
                return (ClusterCompletableFuture) function.apply(cluster);
            })).whenComplete((num, th) -> {
                if (num == null || num.intValue() <= 0) {
                    enqueue((AtLeastOnceMessage) supplier.get(), qosCompletableFuture);
                } else {
                    complete(qosCompletableFuture, num);
                }
            });
        } else {
            Integer num2 = (Integer) this.localConnectionService.scopeOnWriteable(() -> {
                return (Integer) function.apply(this.localConnectionService);
            });
            if (num2 == null || num2.intValue() <= 0) {
                enqueue(supplier.get(), qosCompletableFuture);
            } else {
                complete(qosCompletableFuture, num2);
            }
        }
        return qosCompletableFuture;
    }

    @Override // com.github.sseserver.SendService
    public <T> T scopeOnWriteable(Callable<T> callable) {
        try {
            return callable.call();
        } catch (Exception e) {
            LambdaUtil.sneakyThrows(e);
            return null;
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendAll(String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendAll(str, obj);
        }, () -> {
            return new AtLeastOnceMessage(str, obj, 0);
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendAllListening(String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendAllListening(str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 16);
            atLeastOnceMessage.setListenerName(str);
            return atLeastOnceMessage;
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendByChannel(Collection<String> collection, String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendByChannel((Collection<String>) collection, str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 32);
            atLeastOnceMessage.setChannelList(collection);
            atLeastOnceMessage.setListenerName(str);
            return atLeastOnceMessage;
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendByChannelListening(Collection<String> collection, String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendByChannelListening((Collection<String>) collection, str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 48);
            atLeastOnceMessage.setChannelList(collection);
            return atLeastOnceMessage;
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendByAccessToken(Collection<String> collection, String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendByAccessToken((Collection<String>) collection, str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 4);
            atLeastOnceMessage.setAccessTokenList(collection);
            return atLeastOnceMessage;
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendByAccessTokenListening(Collection<String> collection, String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendByAccessTokenListening((Collection<String>) collection, str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 20);
            atLeastOnceMessage.setAccessTokenList(collection);
            atLeastOnceMessage.setListenerName(str);
            return atLeastOnceMessage;
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendByUserId(Collection<? extends Serializable> collection, String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendByUserId((Collection<? extends Serializable>) collection, str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 8);
            atLeastOnceMessage.setUserIdList(collection);
            return atLeastOnceMessage;
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendByUserIdListening(Collection<? extends Serializable> collection, String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendByUserIdListening((Collection<? extends Serializable>) collection, str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 24);
            atLeastOnceMessage.setUserIdList(collection);
            atLeastOnceMessage.setListenerName(str);
            return atLeastOnceMessage;
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendByTenantId(Collection<? extends Serializable> collection, String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendByTenantId((Collection<? extends Serializable>) collection, str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 2);
            atLeastOnceMessage.setTenantIdList(collection);
            return atLeastOnceMessage;
        });
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.github.sseserver.SendService
    public QosCompletableFuture<Integer> sendByTenantIdListening(Collection<? extends Serializable> collection, String str, Object obj) {
        return qosSend(sendService -> {
            return sendService.sendByTenantIdListening((Collection<? extends Serializable>) collection, str, obj);
        }, () -> {
            AtLeastOnceMessage atLeastOnceMessage = new AtLeastOnceMessage(str, obj, 18);
            atLeastOnceMessage.setTenantIdList(collection);
            atLeastOnceMessage.setListenerName(str);
            return atLeastOnceMessage;
        });
    }

    protected void complete(QosCompletableFuture<Integer> qosCompletableFuture, Integer num) {
        qosCompletableFuture.complete(num);
    }

    protected void enqueue(Message message, QosCompletableFuture<Integer> qosCompletableFuture) {
        String messageId = qosCompletableFuture.getMessageId();
        message.setId(messageId);
        this.messageRepository.insert(message);
        this.futureMap.put(messageId, qosCompletableFuture);
    }

    protected void markSending(List<Message> list) {
        int i = 0;
        Iterator<Message> it = list.iterator();
        while (it.hasNext()) {
            if (!this.sendingSet.add(it.next().getId())) {
                list.set(i, null);
            }
            i++;
        }
    }

    protected void resend(SseEmitter<ACCESS_USER> sseEmitter) {
        if (this.messageRepository instanceof ClusterMessageRepository) {
            ((ClusterMessageRepository) this.messageRepository).selectAsync(sseEmitter).thenAccept(list -> {
                resend(list, sseEmitter);
            });
        } else {
            resend(this.messageRepository.select(sseEmitter), sseEmitter);
        }
    }

    protected void resend(List<Message> list, SseEmitter<ACCESS_USER> sseEmitter) {
        if (list.isEmpty()) {
            return;
        }
        markSending(list);
        IOException iOException = null;
        for (Message message : list) {
            if (message != null) {
                String id = message.getId();
                try {
                    if (sseEmitter.isActive() && sseEmitter.isWriteable() && iOException == null) {
                        sseEmitter.send(SseEmitter.event().m8id(id).m7name(message.getEventName()).m5comment("resend").m4data(message.getBody()));
                        if (this.messageRepository instanceof ClusterMessageRepository) {
                            ((ClusterMessageRepository) this.messageRepository).deleteAsync(id, message instanceof RemoteResponseMessage ? ((RemoteResponseMessage) message).getRemoteMessageRepositoryId() : null);
                        } else {
                            this.messageRepository.delete(id);
                        }
                        this.sendingSet.remove(id);
                    } else {
                        this.sendingSet.remove(id);
                    }
                } catch (IOException e) {
                    iOException = e;
                    this.sendingSet.remove(id);
                } catch (Throwable th) {
                    this.sendingSet.remove(id);
                    throw th;
                }
            }
        }
    }

    @Override // com.github.sseserver.SendService
    public /* bridge */ /* synthetic */ QosCompletableFuture<Integer> sendByTenantIdListening(Collection collection, String str, Object obj) {
        return sendByTenantIdListening((Collection<? extends Serializable>) collection, str, obj);
    }

    @Override // com.github.sseserver.SendService
    public /* bridge */ /* synthetic */ QosCompletableFuture<Integer> sendByTenantId(Collection collection, String str, Object obj) {
        return sendByTenantId((Collection<? extends Serializable>) collection, str, obj);
    }

    @Override // com.github.sseserver.SendService
    public /* bridge */ /* synthetic */ QosCompletableFuture<Integer> sendByUserIdListening(Collection collection, String str, Object obj) {
        return sendByUserIdListening((Collection<? extends Serializable>) collection, str, obj);
    }

    @Override // com.github.sseserver.SendService
    public /* bridge */ /* synthetic */ QosCompletableFuture<Integer> sendByUserId(Collection collection, String str, Object obj) {
        return sendByUserId((Collection<? extends Serializable>) collection, str, obj);
    }

    @Override // com.github.sseserver.SendService
    public /* bridge */ /* synthetic */ QosCompletableFuture<Integer> sendByAccessTokenListening(Collection collection, String str, Object obj) {
        return sendByAccessTokenListening((Collection<String>) collection, str, obj);
    }

    @Override // com.github.sseserver.SendService
    public /* bridge */ /* synthetic */ QosCompletableFuture<Integer> sendByAccessToken(Collection collection, String str, Object obj) {
        return sendByAccessToken((Collection<String>) collection, str, obj);
    }

    @Override // com.github.sseserver.SendService
    public /* bridge */ /* synthetic */ QosCompletableFuture<Integer> sendByChannelListening(Collection collection, String str, Object obj) {
        return sendByChannelListening((Collection<String>) collection, str, obj);
    }

    @Override // com.github.sseserver.SendService
    public /* bridge */ /* synthetic */ QosCompletableFuture<Integer> sendByChannel(Collection collection, String str, Object obj) {
        return sendByChannel((Collection<String>) collection, str, obj);
    }
}
