package io.github.hylexus.xtream.codec.server.reactive.spec.impl.tcp;

import io.github.hylexus.xtream.codec.server.reactive.spec.TcpXtreamNettyHandlerAdapter;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamExchange;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamExchangeCreator;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamHandler;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSession;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.net.InetSocketAddress;
import java.time.Instant;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;

/* loaded from: input_file:io/github/hylexus/xtream/codec/server/reactive/spec/impl/tcp/DefaultTcpXtreamNettyHandlerAdapter.class */
/**
 * Default {@link TcpXtreamNettyHandlerAdapter} implementation.
 *
 * <p>For every non-empty {@link ByteBuf} received on a connection, an {@link XtreamExchange} is
 * created through the configured {@link XtreamExchangeCreator} and passed to the configured
 * {@link XtreamHandler}. Failures of a single request are logged and skipped so that the
 * inbound pipeline keeps processing subsequent buffers.
 */
public class DefaultTcpXtreamNettyHandlerAdapter implements TcpXtreamNettyHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(DefaultTcpXtreamNettyHandlerAdapter.class);

    /** Handler every created exchange is delegated to. */
    protected final XtreamHandler xtreamHandler;
    /** Allocator passed to the exchange creator for each request. */
    protected final ByteBufAllocator allocator;
    /** Factory that builds an {@link XtreamExchange} from the raw Netty inbound/outbound pair. */
    protected final XtreamExchangeCreator xtreamExchangeCreator;
    /** Session manager taken from {@link #xtreamExchangeCreator} at construction time. */
    protected final XtreamSessionManager<? extends XtreamSession> sessionManager;

    /**
     * Creates the adapter.
     *
     * @param byteBufAllocator      allocator handed to the exchange creator on every request
     * @param xtreamExchangeCreator factory for exchanges; also supplies the session manager
     * @param xtreamHandler         handler that processes each exchange
     */
    public DefaultTcpXtreamNettyHandlerAdapter(ByteBufAllocator byteBufAllocator, XtreamExchangeCreator xtreamExchangeCreator, XtreamHandler xtreamHandler) {
        this.xtreamHandler = xtreamHandler;
        this.allocator = byteBufAllocator;
        this.xtreamExchangeCreator = xtreamExchangeCreator;
        this.sessionManager = xtreamExchangeCreator.sessionManager();
        log.info("DefaultTcpXtreamNettyHandlerAdapter initialized");
    }

    /**
     * Subscribes to the inbound byte stream and handles each received buffer as one request.
     *
     * @param nettyInbound  inbound side of the connection
     * @param nettyOutbound outbound side of the connection
     * @return a publisher that completes when the inbound stream completes; errors that reach
     *         the outer pipeline are logged and converted into a normal completion
     */
    @Override
    public Publisher<Void> apply(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
        return nettyInbound.receive()
                .flatMap(byteBuf -> handleInboundBuffer(nettyInbound, nettyOutbound, byteBuf))
                .onErrorResume(th -> {
                    log.error("Unexpected Error", th);
                    return Mono.empty();
                });
    }

    /** Handles one inbound buffer: skips empty buffers, otherwise runs the request with per-request error isolation. */
    private Mono<Void> handleInboundBuffer(NettyInbound nettyInbound, NettyOutbound nettyOutbound, ByteBuf byteBuf) {
        if (byteBuf.readableBytes() <= 0) {
            return Mono.empty();
        }
        // Mono.defer moves the (possibly throwing) assembly of the request pipeline into the
        // subscription phase, so an exception thrown synchronously while resolving the remote
        // address or creating the exchange is routed to the per-request onErrorResume below
        // instead of escaping the flatMap mapper and terminating the whole receive pipeline.
        return Mono.defer(() -> handleSingleRequest(nettyInbound, nettyOutbound, byteBuf, initTcpRemoteAddress(nettyInbound)))
                .onErrorResume(Throwable.class, th -> {
                    log.error("Unexpected Exception", th);
                    return Mono.empty();
                });
    }

    /**
     * Builds a TCP exchange for one received buffer and processes it.
     *
     * @param nettyInbound      inbound side of the connection
     * @param nettyOutbound     outbound side of the connection
     * @param byteBuf           the received payload
     * @param inetSocketAddress remote address of the peer, as resolved by {@link #initTcpRemoteAddress}
     * @return completion signal of the request processing
     */
    protected Mono<Void> handleSingleRequest(NettyInbound nettyInbound, NettyOutbound nettyOutbound, ByteBuf byteBuf, InetSocketAddress inetSocketAddress) {
        return doTcpExchange(this.xtreamExchangeCreator.createTcpExchange(this.allocator, nettyInbound, nettyOutbound, byteBuf, inetSocketAddress));
    }

    /**
     * Resolves the exchange's session, stamps its last-communicate time with the current instant
     * and then delegates the exchange to {@link #xtreamHandler}.
     *
     * @param xtreamExchange the exchange to process
     * @return completion signal of the handler
     */
    protected Mono<Void> doTcpExchange(XtreamExchange xtreamExchange) {
        return xtreamExchange.session().flatMap(xtreamSession -> {
            xtreamSession.lastCommunicateTime(Instant.now());
            return this.xtreamHandler.handle(xtreamExchange);
        });
    }

    /**
     * Reads the remote address of the channel behind {@code nettyInbound}.
     *
     * <p>The channel's remote address is cast to {@link InetSocketAddress} without a type check.
     *
     * @param nettyInbound inbound side of the connection
     * @return the remote address captured by the callback, or {@code null} if the callback was
     *         not invoked before {@code withConnection} returned or the channel reported none
     */
    protected InetSocketAddress initTcpRemoteAddress(NettyInbound nettyInbound) {
        // Single-element array acts as a mutable holder the lambda can write into.
        InetSocketAddress[] remoteAddressHolder = new InetSocketAddress[1];
        nettyInbound.withConnection(connection -> {
            remoteAddressHolder[0] = (InetSocketAddress) connection.channel().remoteAddress();
        });
        return remoteAddressHolder[0];
    }
}
