package io.rsocket.fragmentation;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameLengthFlyweight;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC7.jar:io/rsocket/fragmentation/ReassemblyDuplexConnection.class */
/**
 * A {@link DuplexConnection} decorator that routes every inbound buffer through a
 * {@link FrameReassembler} before it reaches the subscriber of {@link #receive()}.
 *
 * <p>All outbound traffic ({@link #send}, {@link #sendOne}) and all lifecycle calls
 * ({@link #alloc()}, {@link #onClose()}, {@link #dispose()}, {@link #isDisposed()}) are
 * forwarded unchanged to the wrapped connection.
 */
public class ReassemblyDuplexConnection implements DuplexConnection {
    /** The wrapped connection that performs the actual I/O. */
    private final DuplexConnection delegate;

    /** Receives every inbound buffer together with the downstream sink. */
    private final FrameReassembler frameReassembler;

    /**
     * When {@code true}, inbound buffers are passed through
     * {@link FrameLengthFlyweight#frame(ByteBuf)} before reassembly.
     */
    private final boolean decodeLength;

    /**
     * Wraps the given connection.
     *
     * <p>The reassembler is created with the delegate's allocator and is disposed when the
     * delegate's {@code onClose()} terminates, whatever the terminating signal is.
     *
     * @param duplexConnection the connection to decorate; must not be {@code null}
     * @param z whether inbound buffers are run through {@link FrameLengthFlyweight#frame(ByteBuf)}
     *     before reassembly (stored as {@code decodeLength})
     * @throws NullPointerException if {@code duplexConnection} is {@code null}
     */
    public ReassemblyDuplexConnection(DuplexConnection duplexConnection, boolean z) {
        Objects.requireNonNull(duplexConnection, "delegate must not be null");
        this.decodeLength = z;
        this.delegate = duplexConnection;
        this.frameReassembler = new FrameReassembler(duplexConnection.alloc());
        // Tie the reassembler's lifetime to the underlying connection.
        duplexConnection.onClose().doFinally(signalType -> {
            this.frameReassembler.dispose();
        }).subscribe();
    }

    /**
     * Forwards the publisher of outbound buffers to the delegate unchanged.
     *
     * @param publisher the outbound buffers
     * @return the delegate's completion signal
     */
    @Override
    public Mono<Void> send(Publisher<ByteBuf> publisher) {
        return this.delegate.send(publisher);
    }

    /**
     * Forwards a single outbound buffer to the delegate unchanged.
     *
     * @param byteBuf the outbound buffer
     * @return the delegate's completion signal
     */
    @Override
    public Mono<Void> sendOne(ByteBuf byteBuf) {
        return this.delegate.sendOne(byteBuf);
    }

    /**
     * Prepares an inbound buffer for reassembly.
     *
     * @param byteBuf the buffer as received from the delegate
     * @return the retained result of {@link FrameLengthFlyweight#frame(ByteBuf)} when
     *     {@code decodeLength} is set, otherwise {@code byteBuf} itself
     */
    private ByteBuf decode(ByteBuf byteBuf) {
        if (!this.decodeLength) {
            return byteBuf;
        }
        // NOTE(review): presumably frame(...) yields a view without the length prefix, and
        // the retain() keeps it alive past the transport's own release — confirm against
        // FrameLengthFlyweight and FrameReassembler.
        return FrameLengthFlyweight.frame(byteBuf).retain();
    }

    /**
     * Returns the delegate's inbound stream with each buffer handed to the
     * {@link FrameReassembler} along with the synchronous sink; what is emitted downstream
     * is whatever the reassembler pushes into that sink.
     *
     * @return the processed inbound stream
     */
    @Override
    public Flux<ByteBuf> receive() {
        return this.delegate.receive().handle((byteBuf, synchronousSink) -> {
            this.frameReassembler.reassembleFrame(decode(byteBuf), synchronousSink);
        });
    }

    /**
     * @return the delegate's buffer allocator
     */
    @Override
    public ByteBufAllocator alloc() {
        return this.delegate.alloc();
    }

    /**
     * @return the delegate's close signal
     */
    @Override
    public Mono<Void> onClose() {
        return this.delegate.onClose();
    }

    /** Disposes the wrapped connection. */
    @Override
    public void dispose() {
        this.delegate.dispose();
    }

    /**
     * Reports the wrapped connection's disposal state.
     *
     * <p>Without this override the default from {@code reactor.core.Disposable} applies,
     * which always returns {@code false}, so this decorator would claim to be live even
     * after {@link #dispose()} had disposed the delegate.
     *
     * @return {@code true} if the delegate reports itself disposed
     */
    @Override
    public boolean isDisposed() {
        return this.delegate.isDisposed();
    }
}
