package org.nustaq.kontraktor.reactivestreams;

import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.reactivestreams.impl.KxPublisherActor;
import org.nustaq.kontraktor.reactivestreams.impl.KxSubscriber;
import org.nustaq.kontraktor.remoting.base.ActorPublisher;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/nustaq/kontraktor/reactivestreams/KxPublisher.class */
public interface KxPublisher<T> extends Publisher<T> {
    @CallerSideMethod
    default void subscribe(Callback<T> callback) {
        subscribe(getKxStreamsInstance().subscriber(callback));
    }

    @CallerSideMethod
    KxReactiveStreams getKxStreamsInstance();

    @CallerSideMethod
    default void subscribe(int i, Callback<T> callback) {
        subscribe(getKxStreamsInstance().subscriber(i, callback));
    }

    @CallerSideMethod
    default void stream(Consumer<Stream<T>> consumer) {
        stream(getKxStreamsInstance().getBatchSize(), consumer);
    }

    /* JADX WARN: Type inference failed for: r0v11, types: [org.nustaq.kontraktor.reactivestreams.KxPublisher$1] */
    @CallerSideMethod
    default void stream(final int i, final Consumer<Stream<T>> consumer) {
        if (!Actor.inside()) {
            new Thread("Stream Consumer") { // from class: org.nustaq.kontraktor.reactivestreams.KxPublisher.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        consumer.accept(KxPublisher.this.getKxStreamsInstance().stream(KxPublisher.this, i));
                    } catch (Throwable th) {
                        Subscription subscription = KxSubscriber.subsToCancel.get();
                        if (subscription != null) {
                            subscription.cancel();
                        }
                        throw th;
                    }
                }
            }.start();
            return;
        }
        try {
            consumer.accept(getKxStreamsInstance().stream(this, i));
        } catch (Throwable th) {
            Subscription subscription = KxSubscriber.subsToCancel.get();
            if (subscription != null) {
                subscription.cancel();
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [org.nustaq.kontraktor.reactivestreams.KxPublisher$2] */
    @CallerSideMethod
    default void iterator(final int i, final Consumer<Iterator<T>> consumer) {
        if (!Actor.inside()) {
            new Thread("Iterator Consumer") { // from class: org.nustaq.kontraktor.reactivestreams.KxPublisher.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        consumer.accept(KxPublisher.this.getKxStreamsInstance().iterator(KxPublisher.this, i));
                    } catch (Throwable th) {
                        KxSubscriber.subsToCancel.get().cancel();
                        throw th;
                    }
                }
            }.start();
            return;
        }
        try {
            consumer.accept(getKxStreamsInstance().iterator(this, i));
        } catch (Throwable th) {
            KxSubscriber.subsToCancel.get().cancel();
            throw th;
        }
    }

    @CallerSideMethod
    default void iterator(Consumer<Iterator<T>> consumer) {
        iterator(getKxStreamsInstance().getBatchSize(), consumer);
    }

    @CallerSideMethod
    default <OUT> KxPublisher<OUT> map(Function<T, OUT> function) {
        KxPublisher<OUT> newAsyncProcessor = getKxStreamsInstance().newAsyncProcessor(function);
        subscribe((Subscriber) newAsyncProcessor);
        return newAsyncProcessor;
    }

    @CallerSideMethod
    default <OUT> KxPublisher<OUT> async() {
        return map(obj -> {
            return obj;
        });
    }

    @CallerSideMethod
    default <OUT> KxPublisher<OUT> lossy() {
        return lossyMap(obj -> {
            return obj;
        });
    }

    @CallerSideMethod
    default <OUT> KxPublisher<OUT> lossyMap(Function<T, OUT> function) {
        return lossyMap(function, getKxStreamsInstance().getBatchSize());
    }

    @CallerSideMethod
    default <OUT> KxPublisher<OUT> lossyMap(Function<T, OUT> function, int i) {
        KxPublisher<OUT> newLossyProcessor = getKxStreamsInstance().newLossyProcessor(function, i);
        subscribe((Subscriber) newLossyProcessor);
        return newLossyProcessor;
    }

    @CallerSideMethod
    default <OUT> KxPublisher<OUT> map(Function<T, OUT> function, int i) {
        KxPublisher<OUT> newAsyncProcessor = getKxStreamsInstance().newAsyncProcessor(function, i);
        subscribe((Subscriber) newAsyncProcessor);
        return newAsyncProcessor;
    }

    @CallerSideMethod
    default ActorServer serve(ActorPublisher actorPublisher, Consumer<Actor> consumer) {
        return (ActorServer) getKxStreamsInstance().serve(this, actorPublisher, true, consumer).await();
    }

    @CallerSideMethod
    default ActorServer serve(ActorPublisher actorPublisher, boolean z, Consumer<Actor> consumer) {
        return (ActorServer) getKxStreamsInstance().serve(this, actorPublisher, z, consumer).await();
    }

    @CallerSideMethod
    default ActorServer serve(ActorPublisher actorPublisher) {
        return serve(actorPublisher, true, null);
    }

    @CallerSideMethod
    default <OUT> KxPublisher<OUT> syncMap(Function<T, OUT> function) {
        if ((this instanceof KxPublisherActor) && ((KxPublisherActor) this).isRemote()) {
            return map(function);
        }
        KxPublisher<OUT> newSyncProcessor = getKxStreamsInstance().newSyncProcessor(function);
        subscribe((Subscriber) newSyncProcessor);
        return newSyncProcessor;
    }

    @CallerSideMethod
    default Actor asActor() {
        return this instanceof Actor ? (Actor) this : async();
    }
}
