package com.taxonic.carml.util;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:BOOT-INF/lib/carml-commons-0.4.0-beta-2.jar:com/taxonic/carml/util/ReactiveInputStreams.class */
public final class ReactiveInputStreams {

    @Generated
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ReactiveInputStreams.class);
    private static final int DATA_BUFFER_SIZE = 4096;

    private ReactiveInputStreams() {
    }

    public static Flux<DataBuffer> fluxInputStream(@NonNull InputStream inputStream) {
        if (inputStream == null) {
            throw new NullPointerException("inputStream is marked non-null but is null");
        }
        return DataBufferUtils.readInputStream(() -> {
            return inputStream;
        }, new DefaultDataBufferFactory(), 4096).onErrorMap(th -> {
            return new ReactiveInputStreamsException("Exception occurred while creating Flux form input stream.", th);
        });
    }

    public static InputStream inputStreamFrom(Flux<DataBuffer> flux) throws IOException {
        PipedOutputStream pipedOutputStream = new PipedOutputStream();
        PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);
        DataBufferUtils.write(flux, pipedOutputStream).subscribeOn(Schedulers.boundedElastic()).doFinally(signalType -> {
            try {
                pipedOutputStream.close();
            } catch (IOException e) {
                LOG.warn("An exception occurred while closing a PipedOutputStream:{}{}", System.lineSeparator(), e);
            }
        }).onErrorMap(th -> {
            return new ReactiveInputStreamsException("Exception occurred while creating input stream form Flux.", th);
        }).subscribe(DataBufferUtils.releaseConsumer());
        return pipedInputStream;
    }
}
