package reactor.core.composable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Observable;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.dispatch.SynchronousDispatcher;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.event.support.EventConsumer;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.Functions;
import reactor.function.Predicate;
import reactor.function.Supplier;
import reactor.function.support.Tap;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;
import reactor.util.Assert;

/* loaded from: input_file:WEB-INF/lib/reactor-core-1.0.0.RELEASE.jar:reactor/core/composable/Stream.class */
/**
 * A {@link Composable} that forwards values to its consumers and can optionally treat them as
 * batches of a fixed size.
 *
 * <p>A stream is "batching" when its batch size is greater than zero (see {@link #isBatch()}).
 * Batching streams additionally publish the value that opens and the value that closes each
 * batch, which is what {@link #first()} and {@link #last()} listen for.
 *
 * @param <T> the type of the values flowing through this stream
 */
public class Stream<T> extends Composable<T> {
    /** Selector/key pair under which the value that opens a batch is published. */
    private final Tuple2<Selector, Object> first = Selectors.$();
    /** Selector/key pair under which the value that closes a batch is published. */
    private final Tuple2<Selector, Object> last = Selectors.$();
    /** Number of values per batch; any value {@code <= 0} means the stream is not batching. */
    private final int batchSize;
    /** Optional preset values that are pushed through the stream on every flush; may be {@code null}. */
    private final Iterable<T> values;

    /**
     * Creates a stream and registers a flush handler that replays the preset {@code values}.
     *
     * @param dispatcher the dispatcher handed to the {@link Composable} superclass
     * @param batchSize  the batch size; values {@code <= 0} create a non-batching stream
     * @param values     values to publish each time this stream is flushed, or {@code null} for none
     * @param parent     the composable handed to the {@link Composable} superclass, or {@code null}
     */
    public Stream(@Nonnull Dispatcher dispatcher, int batchSize, @Nullable Iterable<T> values, @Nullable Composable<?> parent) {
        super(dispatcher, parent);
        this.batchSize = batchSize;
        this.values = values;
        getObservable().on(getFlush().getT1(), new Consumer<Event<Void>>() {
            @Override
            public void accept(Event<Void> event) {
                if (null == Stream.this.values) {
                    return;
                }
                // Elements are of type T; they are passed on as-is (no cast to Stream).
                for (T value : Stream.this.values) {
                    Stream.this.notifyValue(value);
                }
            }
        });
    }

    /**
     * Delegates to {@link Composable#consume(Consumer)}, narrowing the return type to {@code Stream}.
     *
     * @param consumer the consumer to attach
     * @return the result of the superclass call, cast to {@code Stream<T>}
     */
    @Override
    public Stream<T> consume(@Nonnull Consumer<T> consumer) {
        return (Stream<T>) super.consume(consumer);
    }

    /**
     * Delegates to {@link Composable#consume(Composable)}, narrowing the return type to {@code Stream}.
     *
     * @param composable the composable to attach
     * @return the result of the superclass call, cast to {@code Stream<T>}
     */
    @Override
    public Stream<T> consume(@Nonnull Composable<T> composable) {
        return (Stream<T>) super.consume(composable);
    }

    /**
     * Delegates to {@link Composable#consume(Object, Observable)}, narrowing the return type to
     * {@code Stream}.
     *
     * @param obj        first argument forwarded unchanged to the superclass
     * @param observable the observable forwarded unchanged to the superclass
     * @return the result of the superclass call, cast to {@code Stream<T>}
     */
    @Override
    public Stream<T> consume(@Nonnull Object obj, @Nonnull Observable observable) {
        return (Stream<T>) super.consume(obj, observable);
    }

    /**
     * Delegates to {@link Composable#flush()}, narrowing the return type to {@code Stream}.
     *
     * @return the result of the superclass call, cast to {@code Stream<T>}
     */
    @Override
    public Stream<T> flush() {
        return (Stream<T>) super.flush();
    }

    /**
     * Delegates to {@link Composable#when(Class, Consumer)}, narrowing the return type to
     * {@code Stream}.
     *
     * @param cls      the exception type forwarded to the superclass
     * @param consumer the consumer forwarded to the superclass
     * @param <E>      the exception type
     * @return the result of the superclass call, cast to {@code Stream<T>}
     */
    @Override
    public <E extends Throwable> Stream<T> when(@Nonnull Class<E> cls, @Nonnull Consumer<E> consumer) {
        return (Stream<T>) super.when(cls, consumer);
    }

    /**
     * Delegates to {@link Composable#map(Function)}, narrowing the return type to {@code Stream}.
     *
     * @param function the mapping function forwarded to the superclass
     * @param <V>      the type produced by {@code function}
     * @return the result of the superclass call, cast to {@code Stream<V>}
     */
    @Override
    public <V> Stream<V> map(@Nonnull Function<T, V> function) {
        return (Stream<V>) super.map(function);
    }

    /**
     * Delegates to {@link Composable#filter(Predicate)}, narrowing the return type to {@code Stream}.
     *
     * @param predicate the predicate forwarded to the superclass
     * @return the result of the superclass call, cast to {@code Stream<T>}
     */
    @Override
    public Stream<T> filter(@Nonnull Predicate<T> predicate) {
        return (Stream<T>) super.filter(predicate);
    }

    /**
     * Delegates to {@link Composable#filter(Predicate, Composable)}, narrowing the return type to
     * {@code Stream}.
     *
     * @param predicate  the predicate forwarded to the superclass
     * @param composable the composable forwarded to the superclass
     * @return the result of the superclass call, cast to {@code Stream<T>}
     */
    @Override
    public Stream<T> filter(@Nonnull Predicate<T> predicate, Composable<T> composable) {
        return (Stream<T>) super.filter(predicate, composable);
    }

    /**
     * Creates a non-batching child stream that receives the values published under the
     * "first of batch" key (see {@link #valueAccepted(Object)}).
     *
     * @return the new child stream
     */
    public Stream<T> first() {
        return boundaryStream(this.first);
    }

    /**
     * Creates a non-batching child stream that receives the values published under the
     * "last of batch" key (see {@link #valueAccepted(Object)}).
     *
     * @return the new child stream
     */
    public Stream<T> last() {
        return boundaryStream(this.last);
    }

    /** Creates a child stream fed by events published for the selector of {@code boundary}. */
    private Stream<T> boundaryStream(Tuple2<Selector, Object> boundary) {
        Deferred<T, Stream<T>> deferred = createDeferredChildStream();
        getObservable().on(boundary.getT1(), new EventConsumer<T>(deferred));
        return deferred.compose();
    }

    /**
     * Creates a child stream with the given batch size that consumes every value of this stream.
     *
     * @param batchSize the batch size of the child stream
     * @return the new child stream
     */
    public Stream<T> batch(int batchSize) {
        Deferred<T, Stream<T>> deferred = createDeferred(batchSize);
        consume((Consumer<T>) deferred);
        return deferred.compose();
    }

    /**
     * Tells whether this stream groups its values into batches.
     *
     * @return {@code true} if the batch size is greater than zero
     */
    public boolean isBatch() {
        return this.batchSize > 0;
    }

    /**
     * Attaches a new {@link Tap} as a consumer of this stream.
     *
     * @return the attached tap
     */
    public Tap<T> tap() {
        Tap<T> tap = new Tap<>();
        consume(tap);
        return tap;
    }

    /**
     * Collects the values of this stream into lists of {@code batchSize} elements.
     *
     * <p>Values are buffered until the buffer holds a full batch, at which point a copy of the
     * buffer is emitted and the buffer is cleared. A flush emits whatever is currently buffered
     * (if anything), so a partial batch is not left dangling.
     *
     * <p>The {@code Assert.state} check fails when this stream is not batching.
     *
     * @return a child stream, created with this stream's batch size, that emits the lists
     */
    public Stream<List<T>> collect() {
        Assert.state(this.batchSize > 0, "Cannot collect() an unbounded Stream. Try extracting a batch first.");
        final Deferred<List<T>, Stream<List<T>>> deferred = createDeferred(this.batchSize);
        final List<T> buffer = new ArrayList<>();
        consumeEvent(new Consumer<Event<T>>() {
            @Override
            public void accept(Event<T> event) {
                synchronized (buffer) {
                    buffer.add(event.getData());
                    if (buffer.size() % Stream.this.batchSize != 0) {
                        return;
                    }
                    // Emit a snapshot so clearing the buffer cannot affect downstream consumers.
                    List<T> batch = new ArrayList<>(buffer);
                    deferred.acceptEvent(event.copy(batch));
                    buffer.clear();
                }
            }
        });
        getObservable().on(getFlush().getT1(), new Consumer<Event<Void>>() {
            @Override
            public void accept(Event<Void> event) {
                synchronized (buffer) {
                    if (buffer.isEmpty()) {
                        return;
                    }
                    List<T> remainder = new ArrayList<>(buffer);
                    deferred.accept(remainder);
                    buffer.clear();
                }
            }
        });
        return deferred.compose();
    }

    /**
     * Reduces the values of this stream, starting from a fixed initial accumulator value.
     *
     * <p>Equivalent to {@link #reduce(Function, Supplier)} with a supplier built by
     * {@code Functions.supplier(initial)}.
     *
     * @param function combines the incoming value and the current accumulator into the next accumulator
     * @param initial  the value wrapped into the accumulator supplier
     * @param <A>      the accumulator type
     * @return a child stream emitting accumulated values
     */
    public <A> Stream<A> reduce(@Nonnull Function<Tuple2<T, A>, A> function, A initial) {
        return reduce(function, Functions.supplier(initial));
    }

    /**
     * Reduces the values of this stream with an optionally supplied accumulator.
     *
     * <p>For every incoming value the accumulator is first seeded from {@code supplier} if it is
     * currently {@code null} (it stays {@code null} when no supplier was given), then replaced by
     * the result of {@code function}. On a batching stream the accumulator is emitted on every
     * {@code batchSize}-th value; otherwise it is emitted after every value. The accumulator is
     * not reset after an emission.
     *
     * @param function combines the incoming value and the current accumulator into the next accumulator
     * @param supplier source of the accumulator's starting value, or {@code null}
     * @param <A>      the accumulator type
     * @return a child stream emitting accumulated values
     */
    public <A> Stream<A> reduce(@Nonnull final Function<Tuple2<T, A>, A> function, @Nullable final Supplier<A> supplier) {
        final Deferred<A, Stream<A>> deferred = createDeferred();
        consumeEvent(new Consumer<Event<T>>() {
            private final AtomicLong count = new AtomicLong(0);
            private A acc;

            @Override
            public void accept(Event<T> event) {
                if (null == this.acc) {
                    this.acc = (null != supplier) ? supplier.get() : null;
                }
                this.acc = function.apply(Tuple.of(event.getData(), this.acc));
                // The counter is only advanced on batching streams (short-circuit on !isBatch()).
                if (!Stream.this.isBatch() || this.count.incrementAndGet() % Stream.this.batchSize == 0) {
                    deferred.acceptEvent(event.copy(this.acc));
                }
            }
        });
        return deferred.compose();
    }

    /**
     * Reduces the values of this stream without an accumulator supplier, so the accumulator
     * passed to {@code function} starts out as {@code null}.
     *
     * @param function combines the incoming value and the current accumulator into the next accumulator
     * @param <A>      the accumulator type
     * @return a child stream emitting accumulated values
     */
    public <A> Stream<A> reduce(@Nonnull Function<Tuple2<T, A>, A> function) {
        return reduce(function, (Supplier<A>) null);
    }

    /**
     * Creates a deferred child stream that inherits this stream's batch size.
     *
     * @param <V> the value type expected by the caller
     * @param <C> the composable type expected by the caller
     * @return the new deferred
     */
    @Override
    protected <V, C extends Composable<V>> Deferred<V, C> createDeferred() {
        return createDeferred(this.batchSize);
    }

    /**
     * Creates a deferred child stream with the given batch size.
     *
     * <p>The deferred is always built around a {@code Stream}; the cast to the caller's type
     * parameters is unchecked.
     *
     * @param batchSize the batch size of the child stream
     * @param <V>       the value type expected by the caller
     * @param <C>       the composable type expected by the caller
     * @return the new deferred
     */
    @SuppressWarnings("unchecked")
    protected <V, C extends Composable<V>> Deferred<V, C> createDeferred(int batchSize) {
        return (Deferred<V, C>) createDeferredChildStream(batchSize);
    }

    /** Creates a deferred around a non-batching child stream (batch size {@code -1}). */
    private Deferred<T, Stream<T>> createDeferredChildStream() {
        return createDeferredChildStream(-1);
    }

    /** Creates a deferred around a child stream that uses a new {@link SynchronousDispatcher}. */
    private Deferred<T, Stream<T>> createDeferredChildStream(int batchSize) {
        Stream<T> child = new Stream<T>(new SynchronousDispatcher(), batchSize, null, this);
        return new Deferred<T, Stream<T>>(child);
    }

    /**
     * Does nothing.
     *
     * @param th the accepted error (ignored)
     */
    @Override
    protected void errorAccepted(Throwable th) {
    }

    /**
     * On a batching stream, publishes {@code t} under the "first" key when it opens a batch and
     * under the "last" key when it closes one; does nothing on a non-batching stream.
     *
     * @param t the accepted value
     */
    @Override
    protected void valueAccepted(T t) {
        if (!isBatch()) {
            return;
        }
        // NOTE(review): with batchSize == 1 the remainder is always 0, so only the "last" key is
        // ever notified and first() never sees a value — confirm whether that is intended.
        long position = (getAcceptCount() + 1) % this.batchSize;
        if (position == 1) {
            getObservable().notify(this.first.getT2(), Event.wrap(t));
        } else if (position == 0) {
            getObservable().notify(this.last.getT2(), Event.wrap(t));
        }
    }

    /**
     * Describes this stream by its accept count, error count, batch size and preset values.
     *
     * @return the textual description
     */
    @Override
    public String toString() {
        return "Stream{acceptCount=" + getAcceptCount() + ", errorCount=" + getErrorCount() + ", batchSize=" + this.batchSize + ", values=" + this.values + '}';
    }
}
