package io.github.hylexus.xtream.codec.server.reactive.spec.impl;

import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamInbound;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamOutbound;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.DatagramPacket;
import java.net.InetSocketAddress;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.NettyOutbound;

/* loaded from: input_file:io/github/hylexus/xtream/codec/server/reactive/spec/impl/AbstractXtreamOutbound.class */
/**
 * Base {@link XtreamOutbound} implementation backed by a Reactor Netty {@link NettyOutbound}.
 *
 * <p>The write strategy is selected from the transport {@link XtreamInbound.Type}: when the type is
 * {@code TCP} the buffers are handed to the delegate unchanged; for any other type every buffer is
 * wrapped in a {@link DatagramPacket} addressed to {@link #remoteAddress()} before being sent.
 */
public abstract class AbstractXtreamOutbound implements XtreamOutbound {
    /** Reactor Netty outbound that performs the actual writes. */
    protected final NettyOutbound delegate;
    /** Allocator exposed through {@link #bufferFactory()}. */
    protected final ByteBufAllocator byteBufAllocator;
    /** Peer address; used as the recipient of every datagram on the non-TCP path. */
    protected final InetSocketAddress remoteAddress;
    /** Transport type that decides which write path is taken. */
    protected final XtreamInbound.Type type;

    /**
     * Creates an outbound bound to the given delegate and peer.
     *
     * @param byteBufAllocator  allocator returned by {@link #bufferFactory()}
     * @param nettyOutbound     Reactor Netty outbound to write to
     * @param type              transport type of the connection
     * @param inetSocketAddress address of the remote peer
     */
    public AbstractXtreamOutbound(ByteBufAllocator byteBufAllocator, NettyOutbound nettyOutbound, XtreamInbound.Type type, InetSocketAddress inetSocketAddress) {
        this.delegate = nettyOutbound;
        this.byteBufAllocator = byteBufAllocator;
        this.remoteAddress = inetSocketAddress;
        this.type = type;
    }

    /** @return the transport type supplied at construction time */
    @Override
    public XtreamInbound.Type type() {
        return this.type;
    }

    /** @return the wrapped Reactor Netty outbound */
    @Override
    public NettyOutbound outbound() {
        return this.delegate;
    }

    /** @return the remote peer address supplied at construction time */
    @Override
    public InetSocketAddress remoteAddress() {
        return this.remoteAddress;
    }

    /** @return the allocator supplied at construction time */
    @Override
    public ByteBufAllocator bufferFactory() {
        return this.byteBufAllocator;
    }

    /**
     * Writes the given buffers to the peer.
     *
     * @param publisher source of the buffers to write
     * @return a {@link Mono} that terminates when the underlying send terminates
     */
    @Override
    public Mono<Void> writeWith(Publisher<? extends ByteBuf> publisher) {
        if (this.type == XtreamInbound.Type.TCP) {
            return this.delegate.send(publisher).then();
        }
        // Every non-TCP type goes through the datagram path.
        return writeWithUdp(publisher);
    }

    /**
     * Writes groups of buffers to the peer.
     *
     * <p>For {@code TCP} this delegates to {@link NettyOutbound#sendGroups(Publisher)}; otherwise to
     * {@link #writeAndFlushWithUdp(Publisher)}.
     *
     * @param publisher source of buffer groups
     * @return a {@link Mono} that terminates when the underlying send terminates
     */
    @Override
    public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends ByteBuf>> publisher) {
        if (this.type == XtreamInbound.Type.TCP) {
            return this.delegate.sendGroups(Flux.from(publisher)).then();
        }
        return writeAndFlushWithUdp(publisher);
    }

    /**
     * Wraps every buffer in a {@link DatagramPacket} addressed to {@link #remoteAddress()} and sends
     * the resulting stream through {@link NettyOutbound#sendObject(Publisher)}.
     *
     * @param publisher source of the datagram payloads
     * @return a {@link Mono} that terminates when the send terminates
     */
    public Mono<Void> writeWithUdp(Publisher<? extends ByteBuf> publisher) {
        return outbound()
                .sendObject(Flux.from(publisher).map(this::toDatagramPacket))
                .then();
    }

    /**
     * Sends every buffer of every group as an individual {@link DatagramPacket} via
     * {@link NettyOutbound#sendObject(Object)}.
     *
     * <p>Groups are flattened sequentially, so packets are handed to the outbound in the order in
     * which they were published (group by group), mirroring the sequential group handling of the
     * TCP path.
     *
     * @param publisher source of payload groups
     * @return a {@link Mono} that terminates once all sends have terminated
     */
    public Mono<Void> writeAndFlushWithUdp(Publisher<? extends Publisher<? extends ByteBuf>> publisher) {
        return Flux.from(publisher)
                // concatMap (not flatMap): flatMap subscribes to the groups concurrently and merges
                // their elements as they arrive, which lets packets of different groups interleave.
                .concatMap(group -> Flux.from(group).map(this::toDatagramPacket))
                // NOTE(review): flatMap is kept here so a send is triggered as soon as its packet
                // arrives, without waiting for the previous write to complete.
                .flatMap(packet -> outbound().sendObject(packet))
                .then();
    }

    /** Wraps a payload in a datagram addressed to the remote peer. */
    private DatagramPacket toDatagramPacket(ByteBuf payload) {
        return new DatagramPacket(payload, this.remoteAddress);
    }
}
