package io.reactiverse.awssdk.reactivestreams;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/reactiverse/awssdk/reactivestreams/ReadStreamPublisher.class */
public class ReadStreamPublisher<T extends Buffer> implements Publisher<ByteBuffer> {
    private ReadStream<T> stream;
    private CompletableFuture<Void> future;

    public ReadStreamPublisher(ReadStream<T> readStream) {
        this(readStream, null);
    }

    public ReadStreamPublisher(ReadStream<T> readStream, CompletableFuture<Void> completableFuture) {
        this.stream = readStream;
        this.future = completableFuture;
    }

    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        subscriber.onSubscribe(new Subscription() { // from class: io.reactiverse.awssdk.reactivestreams.ReadStreamPublisher.1
            public void request(long j) {
                ReadStreamPublisher.this.stream.fetch(j);
            }

            public void cancel() {
            }
        });
        this.stream.endHandler(r5 -> {
            subscriber.onComplete();
            if (this.future != null) {
                this.future.complete(null);
            }
        });
        this.stream.handler(buffer -> {
            subscriber.onNext(ByteBuffer.wrap(buffer.getBytes()));
        });
        ReadStream<T> readStream = this.stream;
        Objects.requireNonNull(subscriber);
        readStream.exceptionHandler(subscriber::onError);
    }
}
