package io.reactivesocket.client.filter;

import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.Scheduler;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import io.reactivesocket.util.ReactiveSocketDecorator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/reactivesocket/client/filter/ReactiveSockets.class */
/**
 * Static factories for {@link ReactiveSocket} filters. Each filter is a
 * {@code Function<ReactiveSocket, ReactiveSocket>} that wraps the given socket through
 * {@link ReactiveSocketDecorator}.
 */
public final class ReactiveSockets {
    /** Not instantiable: this class only hosts static factory methods. */
    private ReactiveSockets() {
    }

    /**
     * Creates a filter that applies {@code Px.timeout(j, timeUnit, scheduler)} to every publisher
     * passed through the decorator's {@code decorateAllResponses} and
     * {@code decorateAllVoidResponses} hooks.
     *
     * @param j         timeout amount, forwarded unchanged to {@code Px.timeout}
     * @param timeUnit  unit of {@code j}
     * @param scheduler scheduler forwarded to {@code Px.timeout}
     * @return a function wrapping a socket with the timeout decoration
     */
    public static Function<ReactiveSocket, ReactiveSocket> timeout(long j, TimeUnit timeUnit, Scheduler scheduler) {
        return reactiveSocket -> ReactiveSocketDecorator.wrap(reactiveSocket)
                .decorateAllResponses(_timeout(j, timeUnit, scheduler))
                .decorateAllVoidResponses(_timeout(j, timeUnit, scheduler))
                .finish();
    }

    /**
     * Creates a filter whose {@code close()} is deferred while decorated responses are still
     * in flight.
     *
     * <p>Each wrapped socket gets its own in-flight counter and close-requested flag. On
     * subscription to the decorated close publisher:
     * <ul>
     *   <li>if this is the first close request and the counter is zero, the wrapped socket's
     *       {@code close()} is returned;</li>
     *   <li>otherwise the wrapped socket's {@code onClose()} is returned, and the actual close is
     *       left to the response decoration installed by
     *       {@link #_safeClose(ReactiveSocket, AtomicBoolean, AtomicInteger)}.</li>
     * </ul>
     *
     * @return a function wrapping a socket with the deferred-close behavior
     */
    public static Function<ReactiveSocket, ReactiveSocket> safeClose() {
        return reactiveSocket -> {
            AtomicInteger inFlight = new AtomicInteger();
            AtomicBoolean closeRequested = new AtomicBoolean();
            // The close-mapper argument is deliberately not named reactiveSocket: a lambda parameter
            // may not redeclare a variable of the enclosing scope. The body acts on the outer
            // wrapped socket, the same instance handed to _safeClose below.
            // NOTE(review): presumably the decorator passes this same socket to the mapper - confirm.
            return ReactiveSocketDecorator.wrap(reactiveSocket)
                    .close(ignored -> Px.defer(() -> {
                        if (closeRequested.compareAndSet(false, true) && inFlight.get() == 0) {
                            return reactiveSocket.close();
                        }
                        return reactiveSocket.onClose();
                    }))
                    .decorateAllResponses(_safeClose(reactiveSocket, closeRequested, inFlight))
                    .decorateAllVoidResponses(_safeClose(reactiveSocket, closeRequested, inFlight))
                    .finish();
        };
    }

    /**
     * Builds the per-publisher decoration used by {@link #timeout(long, TimeUnit, Scheduler)}.
     *
     * @param <T>       element type of the decorated publisher
     * @param j         timeout amount
     * @param timeUnit  unit of {@code j}
     * @param scheduler scheduler forwarded to {@code Px.timeout}
     * @return a function mapping a publisher to {@code Px.from(publisher).timeout(...)}
     */
    private static <T> Function<Publisher<T>, Publisher<T>> _timeout(long j, TimeUnit timeUnit, Scheduler scheduler) {
        return publisher -> Px.from(publisher).timeout(j, timeUnit, scheduler);
    }

    /**
     * Builds the per-publisher decoration used by {@link #safeClose()}.
     *
     * <p>The counter is incremented in the {@code doOnSubscribe} callback and decremented in the
     * {@code doOnTerminate} callback. When the decrement reaches zero and the close flag is set,
     * the socket's {@code close()} publisher is subscribed with an empty subscriber.
     *
     * @param <T>            element type of the decorated publisher
     * @param reactiveSocket socket to close once the last tracked response terminates
     * @param atomicBoolean  flag read to decide whether a close has been requested
     * @param atomicInteger  count of tracked responses between subscribe and terminate
     * @return a function mapping a publisher to its tracked counterpart
     */
    private static <T> Function<Publisher<T>, Publisher<T>> _safeClose(ReactiveSocket reactiveSocket, AtomicBoolean atomicBoolean, AtomicInteger atomicInteger) {
        return publisher -> Px.from(publisher)
                .doOnSubscribe(subscription -> atomicInteger.incrementAndGet())
                .doOnTerminate(() -> {
                    // Decrement first (short-circuit order matters): the flag is only consulted
                    // by the response that brings the counter back to zero.
                    if (atomicInteger.decrementAndGet() == 0 && atomicBoolean.get()) {
                        reactiveSocket.close().subscribe(Subscribers.empty());
                    }
                });
    }
}
