package io.dingodb.net.netty.service;

import com.google.auto.service.AutoService;
import io.dingodb.common.CommonId;
import io.dingodb.common.Location;
import io.dingodb.common.codec.ProtostuffCodec;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.concurrent.LinkedRunner;
import io.dingodb.common.util.DebugLog;
import io.dingodb.common.util.NoBreakFunctions;
import io.dingodb.common.util.Optional;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import io.dingodb.net.NetService;
import io.dingodb.net.netty.Constant;
import io.dingodb.net.service.ListenService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService({io.dingodb.net.service.ListenService.class})
/* loaded from: input_file:io/dingodb/net/netty/service/ListenService.class */
public final class ListenService implements io.dingodb.net.service.ListenService {
    private static final Logger log = LoggerFactory.getLogger(ListenService.class);

    /** Channels currently listening to each registered tag; entries are created by {@code register}. */
    private static final Map<Tag, Set<Channel>> listenerChannels = new ConcurrentHashMap<>();

    /** Supplier of the reply message that is offered to a channel when it starts listening to a tag. */
    private static final Map<Tag, Supplier<Message>> replies = new ConcurrentHashMap<>();

    // All listener bookkeeping and fan-out is submitted through this runner.
    // NOTE(review): presumably LinkedRunner executes submitted tasks one after another - confirm.
    private static final LinkedRunner runner = new LinkedRunner("listener-service");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/dingodb/net/netty/service/ListenService$Tag.class */
    /**
     * Identifies a listenable resource by the pair of a {@link CommonId} and a string tag.
     *
     * <p>Instances are used as keys of the listener and reply maps, and are written to / read from
     * message content with {@code ProtostuffCodec}. Two tags are equal when both their id and their
     * tag string are equal (either may be {@code null}).
     */
    public static class Tag {
        private final CommonId id;
        private final String tag;

        public Tag(CommonId commonId, String str) {
            this.id = commonId;
            this.tag = str;
        }

        /** Hook that lets a subclass veto equality with instances of other types. */
        protected boolean canEqual(Object obj) {
            return obj instanceof Tag;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Tag)) {
                return false;
            }
            Tag that = (Tag) obj;
            if (!that.canEqual(this)) {
                return false;
            }
            boolean sameId = (this.id == null) ? that.id == null : this.id.equals(that.id);
            if (!sameId) {
                return false;
            }
            return (this.tag == null) ? that.tag == null : this.tag.equals(that.tag);
        }

        @Override
        public int hashCode() {
            // Same scheme as before: seed 1, multiplier 59, and 43 standing in for a null field.
            final int prime = 59;
            int result = 1;
            result = result * prime + (this.id == null ? 43 : this.id.hashCode());
            result = result * prime + (this.tag == null ? 43 : this.tag.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "ListenService.Tag(id=" + this.id + ", tag=" + this.tag + ")";
        }
    }

    /**
     * Handles a listen request that arrived on {@code channel}.
     *
     * <p>The message content is decoded into a {@link Tag}. A task submitted to the runner then looks
     * up the channel set registered for that tag:
     * <ul>
     *   <li>if there is none, an error is logged and the channel is closed;</li>
     *   <li>otherwise the channel is added to the set, a close listener is installed that removes it
     *       from the set again, and the value produced by the tag's reply supplier is handed to
     *       {@code Optional.ifPresent} to be sent on the channel.</li>
     * </ul>
     *
     * @param message listen request whose content is a serialized {@link Tag}
     * @param channel channel that asks to receive the messages published for that tag
     */
    public static void onListen(Message message, Channel channel) {
        Tag tag = (Tag) ProtostuffCodec.read(message.content());
        runner.forceFollow(() -> {
            Set<Channel> channels = listenerChannels.get(tag);
            if (channels == null) {
                log.error("Not found listenable resource {}:{}", tag.tag, tag.id);
                channel.close();
                return;
            }
            channels.add(channel);
            // Forget the channel once it is closed so later publications skip it.
            channel.setCloseListener(channels::remove);
            // NOTE(review): NoBreakFunctions.wrap presumably routes a failure of the send to the
            // second consumer instead of propagating it - confirm against its implementation.
            Optional.ifPresent(replies.get(tag).get(), (Consumer<? super Message>) NoBreakFunctions.wrap(reply -> {
                channel.send(reply);
            }, (Consumer<Throwable>) error -> {
                DebugLog.error(log, "Send listen reply failed.", error);
            }));
        });
    }

    /**
     * Opens a channel to {@code location} and asks the remote side to start publishing the messages
     * of the resource identified by {@code commonId} and {@code str}.
     *
     * <p>The listen request is a {@code Constant.LISTENER} message whose content is the serialized
     * {@link Tag}. If wiring the listeners or sending the request throws, the channel is closed
     * before the exception is rethrown.
     *
     * @param commonId id part of the resource tag
     * @param str      string part of the resource tag
     * @param location remote endpoint the channel is opened to
     * @param consumer receives every message delivered on the channel
     * @param runnable run by the channel's close listener
     * @return a future backed by the channel's {@code close} method
     */
    @Override // io.dingodb.net.service.ListenService
    public ListenService.Future listen(CommonId commonId, String str, Location location, Consumer<Message> consumer, Runnable runnable) {
        Tag tag = new Tag(commonId, str);
        Channel listenChannel = NetService.getDefault().newChannel(location);
        try {
            listenChannel.setMessageListener((message, source) -> consumer.accept(message));
            listenChannel.setCloseListener(closed -> runnable.run());
            listenChannel.send(new Message(Constant.LISTENER, ProtostuffCodec.write(tag)));
            return listenChannel::close;
        } catch (Exception e) {
            // Do not leak the freshly opened channel when the subscription could not be set up.
            listenChannel.close();
            throw e;
        }
    }

    /**
     * Registers the resource identified by {@code commonId} and {@code str} as listenable.
     *
     * <p>The reply supplier and an empty channel set are stored for the tag only if the tag has no
     * entry yet; an existing supplier or channel set is kept as is.
     *
     * @param commonId id part of the resource tag
     * @param str      string part of the resource tag
     * @param supplier produces the reply offered to a channel when it starts listening
     * @return a consumer that publishes a message: it submits a task to the runner which, for every
     *         channel in the tag's set at that time, submits a send of the message through
     *         {@code Executors.execute}
     */
    @Override // io.dingodb.net.service.ListenService
    public Consumer<Message> register(CommonId commonId, String str, Supplier<Message> supplier) {
        Tag tag = new Tag(commonId, str);
        replies.computeIfAbsent(tag, key -> supplier);
        listenerChannels.computeIfAbsent(tag, key -> new CopyOnWriteArraySet<>());
        return message -> {
            // Fan the message out to every listening channel, one executor task per channel.
            Consumer<Set<Channel>> broadcast = channels -> channels.forEach(listener ->
                Executors.execute("listener-proxy-call-" + commonId, () -> {
                    listener.send(message);
                }, true)
            );
            runner.forceFollow(() -> {
                Optional.ifPresent(listenerChannels.get(tag), broadcast);
            });
        };
    }

    /**
     * Registers every id in {@code list} under the same string tag and reply supplier.
     *
     * <p>Each id is registered through {@link #register(CommonId, String, Supplier)}; the publishers
     * returned by those registrations are collected once, at registration time.
     *
     * @param list     ids of the resources to register
     * @param str      string part shared by all resource tags
     * @param supplier produces the reply offered to a channel when it starts listening
     * @return a consumer that submits a task to the runner which passes the message to the publisher
     *         of every registered id, in list order
     */
    @Override // io.dingodb.net.service.ListenService
    public Consumer<Message> register(List<CommonId> list, String str, Supplier<Message> supplier) {
        List<Consumer<Message>> publishers = list.stream()
            .map(id -> register(id, str, supplier))
            .collect(Collectors.toList());
        return message -> {
            runner.forceFollow(() -> {
                publishers.forEach(publisher -> publisher.accept(message));
            });
        };
    }

    /**
     * Removes the registration of the resource identified by {@code commonId} and {@code str}.
     *
     * <p>A task submitted to the runner removes the tag's channel set and hands it to
     * {@code Optional.ifPresent}, which closes every channel in it through
     * {@code NoBreakFunctions.wrap}.
     *
     * <p>NOTE(review): only {@code listenerChannels} is modified here; the supplier stored in
     * {@code replies} for this tag is left in place.
     *
     * @param commonId id part of the resource tag
     * @param str      string part of the resource tag
     */
    @Override // io.dingodb.net.service.ListenService
    public void unregister(CommonId commonId, String str) {
        Tag tag = new Tag(commonId, str);
        Consumer<Set<Channel>> closeAll = channels -> channels.forEach(NoBreakFunctions.wrap(listener -> {
            listener.close();
        }));
        runner.forceFollow(() -> {
            Optional.ifPresent(listenerChannels.remove(tag), closeAll);
        });
    }

    /**
     * Closes the channels currently listening to the resource identified by {@code commonId} and
     * {@code str}, without removing the tag's entry from {@code listenerChannels}.
     *
     * <p>A task submitted to the runner looks up the tag's channel set and hands it to
     * {@code Optional.ifPresent}, which closes every channel in it through
     * {@code NoBreakFunctions.wrap}.
     *
     * @param commonId id part of the resource tag
     * @param str      string part of the resource tag
     */
    @Override // io.dingodb.net.service.ListenService
    public void clear(CommonId commonId, String str) {
        Tag tag = new Tag(commonId, str);
        Consumer<Set<Channel>> closeAll = channels -> channels.forEach(NoBreakFunctions.wrap(listener -> {
            listener.close();
        }));
        runner.forceFollow(() -> {
            Optional.ifPresent(listenerChannels.get(tag), closeAll);
        });
    }
}
