package reactor.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.RecoverableChannel;
import com.rabbitmq.client.RecoverableConnection;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

/* loaded from: input_file:reactor/rabbitmq/Utils.class */
public abstract class Utils {

    /**
     * A one-argument function whose single method is allowed to throw a checked
     * {@link Exception}.
     *
     * @param <T> the type of the argument
     * @param <R> the type of the result
     */
    @FunctionalInterface
    /* loaded from: input_file:reactor/rabbitmq/Utils$ExceptionFunction.class */
    public interface ExceptionFunction<T, R> {
        /**
         * Applies this function to the given argument.
         *
         * @param t the function argument
         * @return the function result
         * @throws Exception if the function fails
         */
        R apply(T t) throws Exception;
    }

    /* loaded from: input_file:reactor/rabbitmq/Utils$SingleConnectionSupplier.class */
    /**
     * An {@link ExceptionFunction} that runs a connection creation action once
     * successfully and hands the same {@link Connection} to every caller.
     *
     * <p>The first caller to claim the creation runs the action on its own thread;
     * callers arriving while the creation is in flight (or after it completed)
     * wait on a latch, bounded by the configured timeout, and then receive the
     * stored connection. The {@link ConnectionFactory} passed to
     * {@link #apply(ConnectionFactory)} is not used: the creation action supplied
     * at construction time is the only source of the connection.
     *
     * <p>If the creation action throws, the claim is released so that a later call
     * can attempt the creation again, instead of leaving every subsequent caller
     * blocked until the timeout with no possibility of recovery.
     */
    public static class SingleConnectionSupplier implements ExceptionFunction<ConnectionFactory, Connection> {
        /** Action that actually opens the connection. */
        private final Callable<? extends Connection> creationAction;
        /** Maximum time a non-creating caller waits for the connection to become available. */
        private final Duration waitTimeout;
        /** Released once a connection has been stored successfully. */
        private final CountDownLatch latch;
        /** {@code true} while a creation is in flight or after it has succeeded. */
        private final AtomicBoolean created;
        /** The shared connection, {@code null} until creation succeeds. */
        private final AtomicReference<Connection> connection;

        /**
         * Creates a supplier with a wait timeout of 5 minutes.
         *
         * @param callable the action that creates the connection
         * @throws NullPointerException if {@code callable} is {@code null}
         */
        public SingleConnectionSupplier(Callable<? extends Connection> callable) {
            this(callable, Duration.ofMinutes(5L));
        }

        /**
         * Creates a supplier with an explicit wait timeout.
         *
         * @param callable the action that creates the connection
         * @param duration how long callers that are not creating the connection wait for it
         * @throws NullPointerException if {@code callable} or {@code duration} is {@code null}
         */
        public SingleConnectionSupplier(Callable<? extends Connection> callable, Duration duration) {
            // Fail fast: a null here would otherwise only surface inside apply().
            this.creationAction = Objects.requireNonNull(callable, "callable");
            this.waitTimeout = Objects.requireNonNull(duration, "duration");
            this.latch = new CountDownLatch(1);
            this.created = new AtomicBoolean(false);
            this.connection = new AtomicReference<>();
        }

        /**
         * Returns the shared connection, creating it if this caller is the first to ask.
         *
         * @param connectionFactory ignored; the creation action given to the constructor is used
         * @return the shared connection, wrapped in an {@code IdempotentClosedConnection}
         * @throws RabbitFluxException if the wait timeout elapses and no connection is available
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws Exception any exception thrown by the creation action
         */
        @Override
        public Connection apply(ConnectionFactory connectionFactory) throws Exception {
            if (this.created.compareAndSet(false, true)) {
                boolean succeeded = false;
                try {
                    this.connection.set(new IdempotentClosedConnection(this.creationAction.call()));
                    succeeded = true;
                } finally {
                    if (succeeded) {
                        // Wake up every caller waiting for the connection.
                        this.latch.countDown();
                    } else {
                        // Creation failed: release the claim so a later call can retry,
                        // rather than condemning all future callers to a timeout.
                        this.created.set(false);
                    }
                }
            } else if (!this.latch.await(this.waitTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                // Timed out; the connection may still have been stored in the meantime.
                Connection existing = this.connection.get();
                if (existing != null) {
                    return existing;
                }
                throw new RabbitFluxException("Reached timeout when waiting for connection to be created: " + this.waitTimeout);
            }
            return this.connection.get();
        }
    }

    /**
     * Builds a cached connection {@link Mono} that opens the connection with
     * {@link ConnectionFactory#newConnection()}.
     *
     * @param connectionFactory the factory used to open the connection
     * @return the {@link Mono} produced by
     *     {@link #singleConnectionMono(ConnectionFactory, ExceptionFunction)}
     */
    public static Mono<? extends Connection> singleConnectionMono(ConnectionFactory connectionFactory) {
        ExceptionFunction<ConnectionFactory, Connection> defaultCreator = factory -> factory.newConnection();
        return singleConnectionMono(connectionFactory, defaultCreator);
    }

    /**
     * Builds a {@link Mono} that creates a connection by applying the given function
     * to the given factory, wraps the result in an {@code IdempotentClosedConnection},
     * and caches the outcome via {@link Mono#cache()}.
     *
     * @param connectionFactory the factory handed to {@code exceptionFunction}
     * @param exceptionFunction the function that creates the connection from the factory
     * @return a cached {@link Mono} emitting the wrapped connection
     */
    public static Mono<? extends Connection> singleConnectionMono(ConnectionFactory connectionFactory, ExceptionFunction<ConnectionFactory, ? extends Connection> exceptionFunction) {
        Callable<IdempotentClosedConnection> wrappedCreation = () -> {
            Connection raw = exceptionFunction.apply(connectionFactory);
            return new IdempotentClosedConnection(raw);
        };
        return Mono.fromCallable(wrappedCreation).cache();
    }

    /**
     * Builds a {@link Mono} that creates a connection by invoking the given callable,
     * wraps the result in an {@code IdempotentClosedConnection}, and caches the
     * outcome via {@link Mono#cache()}.
     *
     * @param callable the action that creates the connection
     * @return a cached {@link Mono} emitting the wrapped connection
     */
    public static Mono<? extends Connection> singleConnectionMono(Callable<? extends Connection> callable) {
        Callable<IdempotentClosedConnection> wrappedCreation = () -> {
            Connection raw = callable.call();
            return new IdempotentClosedConnection(raw);
        };
        return Mono.fromCallable(wrappedCreation).cache();
    }

    /**
     * Creates a {@link SingleConnectionSupplier} whose creation action applies the
     * given function to the given factory.
     *
     * @param connectionFactory the factory handed to {@code exceptionFunction}
     * @param exceptionFunction the function that creates the connection from the factory
     * @return a new {@link SingleConnectionSupplier} using the default wait timeout
     */
    public static ExceptionFunction<ConnectionFactory, ? extends Connection> singleConnectionSupplier(ConnectionFactory connectionFactory, ExceptionFunction<ConnectionFactory, ? extends Connection> exceptionFunction) {
        Callable<Connection> creation = () -> exceptionFunction.apply(connectionFactory);
        return new SingleConnectionSupplier(creation);
    }

    /**
     * Creates a {@link SingleConnectionSupplier} whose creation action calls
     * {@link ConnectionFactory#newConnection()} on the given factory.
     *
     * @param connectionFactory the factory used to open the connection
     * @return a new {@link SingleConnectionSupplier} using the default wait timeout
     */
    public static ExceptionFunction<ConnectionFactory, ? extends Connection> singleConnectionSupplier(ConnectionFactory connectionFactory) {
        Callable<Connection> creation = () -> connectionFactory.newConnection();
        return new SingleConnectionSupplier(creation);
    }

    /**
     * Creates a {@link SingleConnectionSupplier} around the given creation action.
     *
     * @param callable the action that creates the connection
     * @return a new {@link SingleConnectionSupplier} using the default wait timeout
     */
    public static ExceptionFunction<ConnectionFactory, ? extends Connection> singleConnectionSupplier(Callable<? extends Connection> callable) {
        SingleConnectionSupplier supplier = new SingleConnectionSupplier(callable);
        return supplier;
    }

    /**
     * Applies the three-argument {@code Mono.cache} to the given source with fixed
     * time-to-live values: {@code Long.MAX_VALUE} milliseconds for an emitted value,
     * and {@link Duration#ZERO} for both an error and an empty completion.
     *
     * @param mono the source to cache
     * @param <T> the value type
     * @return the caching {@link Mono}
     */
    public static <T> Mono<T> cache(Mono<T> mono) {
        return mono.cache(
                value -> Duration.ofMillis(Long.MAX_VALUE),
                error -> Duration.ZERO,
                () -> Duration.ZERO);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the given connection is a {@link RecoverableConnection}.
     *
     * @param connection the connection to inspect, may be {@code null}
     * @return {@code true} if {@code connection} is an instance of
     *     {@link RecoverableConnection}; {@code false} otherwise, including for {@code null}
     */
    public static boolean isRecoverable(Connection connection) {
        return RecoverableConnection.class.isInstance(connection);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the given channel is a {@link RecoverableChannel}.
     *
     * @param channel the channel to inspect, may be {@code null}
     * @return {@code true} if {@code channel} is an instance of
     *     {@link RecoverableChannel}; {@code false} otherwise, including for {@code null}
     */
    public static boolean isRecoverable(Channel channel) {
        return RecoverableChannel.class.isInstance(channel);
    }
}
