package akka.persistence.r2dbc.client;

import io.r2dbc.spi.ConnectionFactory;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:akka/persistence/r2dbc/client/R2dbc.class */
public final class R2dbc {
    private static final String FACTORY_REQUIRED = "factory must not be null";
    private static final String FN_REQUIRED = "fn must not be null";
    private final ConnectionFactory factory;

    public R2dbc(ConnectionFactory connectionFactory) {
        this.factory = (ConnectionFactory) Objects.requireNonNull(connectionFactory, FACTORY_REQUIRED);
    }

    public <T> Flux<T> inTransaction(Function<Handle, ? extends Publisher<T>> function) {
        Objects.requireNonNull(function, FN_REQUIRED);
        return withHandle(handle -> {
            return handle.inTransaction(function);
        });
    }

    public <T> Flux<T> withHandle(Function<Handle, ? extends Publisher<T>> function) {
        Objects.requireNonNull(function, FN_REQUIRED);
        return Mono.from(this.factory.create()).map(Handle::new).flatMapMany(handle -> {
            Flux from = Flux.from((Publisher) function.apply(handle));
            handle.getClass();
            Flux concatWith = from.concatWith(ReactiveUtils.passThrough(handle::close));
            handle.getClass();
            return concatWith.onErrorResume(ReactiveUtils.appendError(handle::close));
        });
    }
}
