package io.github.hylexus.xtream.codec.server.reactive.spec.impl.udp;

import io.github.hylexus.xtream.codec.server.reactive.spec.UdpXtreamNettyHandlerAdapter;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamExchange;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamExchangeCreator;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamHandler;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSession;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.DatagramPacket;
import java.net.InetSocketAddress;
import java.time.Instant;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;

/* loaded from: input_file:io/github/hylexus/xtream/codec/server/reactive/spec/impl/udp/DefaultUdpXtreamNettyHandlerAdapter.class */
public class DefaultUdpXtreamNettyHandlerAdapter implements UdpXtreamNettyHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(DefaultUdpXtreamNettyHandlerAdapter.class);
    protected final XtreamHandler xtreamHandler;
    protected final ByteBufAllocator allocator;
    protected final XtreamExchangeCreator exchangeCreator;
    protected final XtreamSessionManager<? extends XtreamSession> sessionManager;

    public DefaultUdpXtreamNettyHandlerAdapter(ByteBufAllocator byteBufAllocator, XtreamExchangeCreator xtreamExchangeCreator, XtreamHandler xtreamHandler) {
        this.xtreamHandler = xtreamHandler;
        this.allocator = byteBufAllocator;
        this.exchangeCreator = xtreamExchangeCreator;
        this.sessionManager = xtreamExchangeCreator.sessionManager();
        log.info("DefaultUdpXtreamNettyHandlerAdapter initialized");
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamNettyHandlerAdapter, java.util.function.BiFunction
    public Publisher<Void> apply(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
        return nettyInbound.receiveObject().flatMap(obj -> {
            return obj instanceof DatagramPacket ? handleRequest(nettyInbound, nettyOutbound, (DatagramPacket) obj).onErrorResume(Throwable.class, th -> {
                log.error("Unexpected Exception", th);
                return Mono.empty();
            }) : Mono.error(new IllegalStateException("Cannot handle message. type = [" + String.valueOf(obj.getClass()) + "]"));
        }).onErrorResume(th -> {
            log.error("Unexpected Error", th);
            return Mono.empty();
        });
    }

    protected Mono<Void> handleRequest(NettyInbound nettyInbound, NettyOutbound nettyOutbound, DatagramPacket datagramPacket) {
        return doUdpExchange(this.exchangeCreator.createUdpExchange(this.allocator, nettyInbound, nettyOutbound, (ByteBuf) datagramPacket.content(), (InetSocketAddress) datagramPacket.sender()));
    }

    protected Mono<Void> doUdpExchange(XtreamExchange xtreamExchange) {
        return xtreamExchange.session().flatMap(xtreamSession -> {
            xtreamSession.lastCommunicateTime(Instant.now());
            return this.xtreamHandler.handle(xtreamExchange);
        });
    }
}
