package io.github.hylexus.xtream.codec.server.reactive.spec.impl;

import io.github.hylexus.xtream.codec.core.EntityCodec;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamCommandSender;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSession;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/github/hylexus/xtream/codec/server/reactive/spec/impl/DefaultXtreamCommandSender.class */
public class DefaultXtreamCommandSender implements XtreamCommandSender<XtreamSession> {
    protected final ByteBufAllocator bufferFactory;
    protected final XtreamSessionManager<XtreamSession> sessionManager;
    protected final EntityCodec entityCodec;

    public DefaultXtreamCommandSender(ByteBufAllocator byteBufAllocator, XtreamSessionManager<XtreamSession> xtreamSessionManager, EntityCodec entityCodec) {
        this.bufferFactory = byteBufAllocator;
        this.sessionManager = xtreamSessionManager;
        this.entityCodec = entityCodec;
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.InternalXtreamCommandSender
    public ByteBufAllocator bufferFactory() {
        return this.bufferFactory;
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.InternalXtreamCommandSender
    public XtreamSessionManager<XtreamSession> sessionManager() {
        return this.sessionManager;
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.InternalXtreamCommandSender
    public Mono<Void> sendByteBuf(String str, Publisher<? extends ByteBuf> publisher) {
        return getSession(str).flatMap(xtreamSession -> {
            return xtreamSession.writeWith(publisher).then();
        });
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamCommandSender
    public Mono<Void> sendObject(String str, Publisher<Object> publisher) {
        return getSession(str).flatMap(xtreamSession -> {
            return Flux.from(publisher).map(this::doEncode).flatMap(byteBuf -> {
                return xtreamSession.writeWith(Mono.just(byteBuf));
            }).then();
        }).then();
    }

    protected ByteBuf doEncode(Object obj) {
        if (obj instanceof ByteBuf) {
            return (ByteBuf) obj;
        }
        ByteBuf buffer = this.bufferFactory.buffer();
        try {
            this.entityCodec.encode(obj, buffer);
            return buffer;
        } catch (Throwable th) {
            buffer.release();
            throw new RuntimeException(th);
        }
    }

    protected Mono<XtreamSession> getSession(String str) {
        return this.sessionManager.getSessionById(str).switchIfEmpty(Mono.defer(() -> {
            return Mono.error(new IllegalArgumentException("No session found for sessionId: " + str));
        }));
    }
}
