package org.elasticsearch.compute.operator.exchange;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.IsBlockedResult;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tasks.TaskCancelledException;

/* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.class */
public final class ExchangeSourceHandler {
    // Buffer that fetched pages are added to (by the sink fetchers) and polled from (by exchange sources).
    private final ExchangeBuffer buffer;
    // Executor on which the fetch tasks submitted by addRemoteSink are run.
    private final Executor fetchExecutor;
    // Counts exchange sources created by createExchangeSource that have not been finished yet.
    // Its completion callback is wired in the constructor and calls finishEarly(true, ...).
    private final PendingInstances outstandingSources;
    // Set to true when a fail-fast remote sink fails; once set, checkFailure() throws TaskCancelledException.
    private volatile boolean aborted = false;
    // Source of the integer keys used in remoteSinks.
    private final AtomicInteger nextSinkId = new AtomicInteger();
    // Remote sinks registered through addRemoteSink, keyed by sink id. An entry is removed just before
    // that sink's listener is notified; finishEarly closes every sink still present.
    private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();
    // Counts sink fetchers and empty sinks that have not finished yet; when the count drops back to
    // zero, the buffer is finished with finish(false).
    private final PendingInstances outstandingSinks = new PendingInstances(() -> {
        this.buffer.finish(false);
    });

    /**
     * {@link ExchangeSource} backed by the enclosing handler's shared buffer.
     * Constructing an instance registers it with {@code outstandingSources}; the first call to
     * {@link #finish()} unregisters it again.
     */
    private class ExchangeSourceImpl implements ExchangeSource {
        /** Becomes {@code true} on the first call to {@link #finish()}. */
        private boolean finished;

        ExchangeSourceImpl() {
            final ExchangeSourceHandler handler = ExchangeSourceHandler.this;
            handler.outstandingSources.trackNewInstance();
        }

        /** Polls the next page from the shared buffer; throws first if the handler was aborted. */
        @Override
        public Page pollPage() {
            final ExchangeSourceHandler handler = ExchangeSourceHandler.this;
            handler.checkFailure();
            return handler.buffer.pollPage();
        }

        /**
         * Returns {@code true} once this source was finished or the shared buffer reports it is finished;
         * throws first if the handler was aborted.
         */
        @Override
        public boolean isFinished() {
            final ExchangeSourceHandler handler = ExchangeSourceHandler.this;
            handler.checkFailure();
            if (finished) {
                return true;
            }
            return handler.buffer.isFinished();
        }

        /** Delegates to the shared buffer's {@code waitForReading()}. */
        @Override
        public IsBlockedResult waitForReading() {
            final ExchangeBuffer shared = ExchangeSourceHandler.this.buffer;
            return shared.waitForReading();
        }

        /** Marks this source as finished and unregisters it; repeated calls do nothing. */
        @Override
        public void finish() {
            if (!finished) {
                finished = true;
                ExchangeSourceHandler.this.outstandingSources.finishInstance();
            }
        }

        /** Returns the current size reported by the shared buffer. */
        @Override
        public int bufferSize() {
            final ExchangeBuffer shared = ExchangeSourceHandler.this.buffer;
            return shared.size();
        }
    }

    /**
     * State holder for one run of a fetch loop. It remembers the thread that created it so that
     * a callback can tell whether it is running inline on that same thread (and may simply let the
     * loop iterate again) or on some other thread (and must start a new loop instead).
     */
    public static class LoopControl {
        /** Phases of a loop: actively iterating, about to leave, and left for good. */
        public enum Status {
            RUNNING,
            EXITING,
            EXITED
        }

        private final Thread startedThread;
        private Status status;

        LoopControl() {
            startedThread = Thread.currentThread();
            status = Status.RUNNING;
        }

        /** Returns {@code true} while the loop should perform another iteration. */
        boolean isRunning() {
            return Status.RUNNING == status;
        }

        /**
         * Switches the loop back to {@code RUNNING} if the caller is on the thread that created this
         * object and the loop has not been marked as exited.
         *
         * @return {@code true} if the loop was resumed, {@code false} if the caller must continue by other means
         */
        boolean tryResume() {
            final boolean sameThread = Thread.currentThread() == startedThread;
            if (sameThread && status != Status.EXITED) {
                status = Status.RUNNING;
                return true;
            }
            return false;
        }

        /** Marks the loop as about to exit; a later successful {@link #tryResume()} reverts this. */
        void exiting() {
            status = Status.EXITING;
        }

        /** Marks the loop as exited; after this, {@link #tryResume()} always returns {@code false}. */
        void exited() {
            status = Status.EXITED;
        }
    }

    /**
     * Counts outstanding instances and completes an internal listener when the count drops back to zero.
     * The callback given to the constructor is registered as the first listener on that completion.
     */
    public static class PendingInstances {
        private final AtomicInteger instances = new AtomicInteger();
        private final SubscribableListener<Void> completion = new SubscribableListener<>();

        /**
         * @param runnable callback registered on the completion listener
         */
        PendingInstances(Runnable runnable) {
            this.completion.addListener(ActionListener.running(runnable));
        }

        /** Registers one more outstanding instance. */
        void trackNewInstance() {
            int refs = this.instances.incrementAndGet();
            assert refs > 0;
        }

        /** Unregisters one instance; when the count reaches zero the completion listener is completed. */
        void finishInstance() {
            int refs = this.instances.decrementAndGet();
            assert refs >= 0;
            if (refs == 0) {
                // The listener is typed on Void, so the only value that can be passed is an untyped null.
                this.completion.onResponse(null);
            }
        }
    }

    /**
     * Fetches pages from one {@link RemoteSink} into the enclosing handler's buffer until the sink
     * reports that it is finished or a fetch fails. Each fetcher registers itself with
     * {@code outstandingSinks} on construction and unregisters exactly once, on completion or failure.
     */
    private final class RemoteSinkFetcher {
        /** Guards against unregistering / notifying the completion listener more than once. */
        private volatile boolean finished = false;
        private final RemoteSink remoteSink;
        /** When true, a failure of this fetcher marks the whole handler as aborted. */
        private final boolean failFast;
        /** Run each time a non-null page is received, before the page is added to the buffer. */
        private final Runnable onPageFetched;
        private final ActionListener<Void> completionListener;

        RemoteSinkFetcher(RemoteSink remoteSink, boolean failFast, Runnable onPageFetched, ActionListener<Void> completionListener) {
            ExchangeSourceHandler.this.outstandingSinks.trackNewInstance();
            this.remoteSink = remoteSink;
            this.onPageFetched = onPageFetched;
            this.failFast = failFast;
            this.completionListener = completionListener;
        }

        /**
         * Runs the fetch loop. Each iteration issues one asynchronous fetch. If the response callback runs
         * inline on the thread that started this loop, the loop is resumed via {@link LoopControl#tryResume()}
         * and iterates again; otherwise the callback starts a fresh loop by calling this method again.
         */
        void fetchPage() {
            final LoopControl loopControl = new LoopControl();
            while (loopControl.isRunning()) {
                // Assume this is the last iteration unless a callback on this thread resumes the loop.
                loopControl.exiting();
                // True once the buffer accepts no more input or the handler was aborted.
                // NOTE(review): presumably tells the sink to finish - confirm against RemoteSink.fetchPageAsync.
                final boolean toFinishSinks = ExchangeSourceHandler.this.buffer.noMoreInputs() || ExchangeSourceHandler.this.aborted;
                this.remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(response -> {
                    final Page page = response.takePage();
                    if (page != null) {
                        this.onPageFetched.run();
                        ExchangeSourceHandler.this.buffer.addPage(page);
                    }
                    if (response.finished()) {
                        onSinkComplete();
                        return;
                    }
                    final IsBlockedResult blocked = ExchangeSourceHandler.this.buffer.waitForWriting();
                    if (!blocked.listener().isDone()) {
                        // The buffer cannot take more pages right now: continue once it signals it can be written to.
                        blocked.listener().addListener(ActionListener.wrap(unused -> {
                            if (loopControl.tryResume()) {
                                return;
                            }
                            fetchPage();
                        }, this::onSinkFailed));
                    } else {
                        if (loopControl.tryResume()) {
                            return;
                        }
                        fetchPage();
                    }
                }, this::onSinkFailed));
            }
            loopControl.exited();
        }

        /**
         * Handles a failed fetch: optionally aborts the handler, completes the buffer's read-wait listener,
         * then (once only) closes the remote sink, unregisters this fetcher and fails the completion listener.
         */
        void onSinkFailed(Exception exc) {
            if (this.failFast) {
                ExchangeSourceHandler.this.aborted = true;
            }
            // Complete the listener handed out by waitForReading() so whoever waits on it is released.
            ExchangeSourceHandler.this.buffer.waitForReading().listener().onResponse(null);
            if (this.finished) {
                return;
            }
            this.finished = true;
            this.remoteSink.close(ActionListener.running(() -> {
                ExchangeSourceHandler.this.outstandingSinks.finishInstance();
                this.completionListener.onFailure(exc);
            }));
        }

        /** Handles normal completion: (once only) unregisters this fetcher and completes the listener. */
        void onSinkComplete() {
            if (this.finished) {
                return;
            }
            this.finished = true;
            ExchangeSourceHandler.this.outstandingSinks.finishInstance();
            this.completionListener.onResponse(null);
        }
    }

    /**
     * Creates a handler and wires up its completion chain.
     *
     * <p>When the last exchange source finishes, {@link #finishEarly(boolean, ActionListener)} is called
     * with {@code true}. When the buffer completes, {@code actionListener} is notified once the
     * outstanding sinks, the outstanding sources and the closing of sinks triggered by that early finish
     * have all completed. The notification goes through {@code ActionRunnable.run} with
     * {@link #checkFailure()} as its body.
     * NOTE(review): presumably an aborted handler therefore surfaces as a failure on the listener -
     * confirm against ActionRunnable.run.
     *
     * @param i              value passed to the {@link ExchangeBuffer} constructor
     * @param executor       executor used to run the fetch tasks of remote sinks
     * @param actionListener listener notified when the handler has fully completed
     */
    public ExchangeSourceHandler(int i, Executor executor, ActionListener<Void> actionListener) {
        this.buffer = new ExchangeBuffer(i);
        this.fetchExecutor = executor;
        // Tracks the closing of remote sinks started when the last source finishes. It holds one instance
        // from the start, so it only completes after that finishEarly call has completed.
        final PendingInstances closingSinks = new PendingInstances(() -> {
        });
        closingSinks.trackNewInstance();
        this.outstandingSources = new PendingInstances(() -> {
            finishEarly(true, ActionListener.running(closingSinks::finishInstance));
        });
        this.buffer.addCompletionListener(ActionListener.running(() -> {
            // try-with-resources releases the initial reference exactly once, even if the body throws.
            try (
                RefCountingRunnable refs = new RefCountingRunnable(
                    ActionRunnable.run(ActionListener.assertAtLeastOnce(actionListener), this::checkFailure)
                )
            ) {
                closingSinks.completion.addListener(refs.acquireListener());
                for (PendingInstances pending : List.of(this.outstandingSinks, this.outstandingSources)) {
                    // Track and immediately finish a temporary instance so that a counter which never had
                    // any instance registered still completes.
                    pending.trackNewInstance();
                    pending.completion.addListener(refs.acquireListener());
                    pending.finishInstance();
                }
            }
        }));
    }

    /**
     * Throws if this handler has been marked as aborted; otherwise does nothing.
     *
     * @throws TaskCancelledException when {@code aborted} is set
     */
    private void checkFailure() {
        if (!this.aborted) {
            return;
        }
        throw new TaskCancelledException("remote sinks failed");
    }

    /**
     * Creates a new exchange source that reads from this handler's buffer. The new source is
     * registered as outstanding until its {@code finish()} method is called.
     *
     * @return a newly created exchange source
     */
    public ExchangeSource createExchangeSource() {
        final ExchangeSource source = new ExchangeSourceImpl();
        return source;
    }

    /**
     * Registers a remote sink and starts fetching pages from it on the fetch executor.
     *
     * <p>The sink is stored in {@code remoteSinks} under a fresh id and removed again just before
     * {@code actionListener} is notified. The listener is wrapped so that it is notified at most once.
     *
     * @param remoteSink     the sink to fetch pages from
     * @param z              fail-fast flag: when {@code true}, a failure of this sink marks the whole handler as aborted
     * @param runnable       callback run each time a page has been fetched from the sink
     * @param i              number of fetchers started for this sink
     * @param actionListener notified when all fetchers of this sink have completed, or with the failure
     */
    public void addRemoteSink(final RemoteSink remoteSink, final boolean z, final Runnable runnable, final int i, ActionListener<Void> actionListener) {
        final int sinkId = this.nextSinkId.incrementAndGet();
        this.remoteSinks.put(sinkId, remoteSink);
        final ActionListener<Void> sinkListener = ActionListener.assertAtLeastOnce(
            ActionListener.notifyOnce(ActionListener.runBefore(actionListener, () -> {
                this.remoteSinks.remove(sinkId);
            }))
        );
        this.fetchExecutor.execute(new AbstractRunnable() {
            @Override
            public void onFailure(Exception exc) {
                if (z) {
                    ExchangeSourceHandler.this.aborted = true;
                }
                // Complete the listener handed out by waitForReading() so whoever waits on it is released.
                ExchangeSourceHandler.this.buffer.waitForReading().listener().onResponse(null);
                remoteSink.close(ActionListener.running(() -> {
                    sinkListener.onFailure(exc);
                }));
            }

            @Override
            protected void doRun() {
                // The initial reference is released when the block exits, whether normally or by exception.
                try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) {
                    for (int started = 0; started < i; started++) {
                        new RemoteSinkFetcher(remoteSink, z, runnable, refs.acquire()).fetchPage();
                    }
                }
            }
        });
    }

    /**
     * Registers a placeholder sink that fetches nothing. It counts as an outstanding sink until the
     * returned handle is closed.
     *
     * @return a releasable whose close unregisters the placeholder from the outstanding sinks
     */
    public Releasable addEmptySink() {
        final PendingInstances sinks = this.outstandingSinks;
        sinks.trackNewInstance();
        return sinks::finishInstance;
    }

    /**
     * Finishes the buffer and closes every remote sink that is currently registered.
     *
     * @param z              flag passed straight through to {@code ExchangeBuffer.finish}
     *                       (NOTE(review): its meaning is defined by ExchangeBuffer - confirm there)
     * @param actionListener notified once the close of every registered remote sink has completed
     */
    public void finishEarly(boolean z, ActionListener<Void> actionListener) {
        this.buffer.finish(z);
        // The initial reference is released exactly once when the block exits, even if a close call throws.
        try (EsqlRefCountingListener refs = new EsqlRefCountingListener(actionListener)) {
            for (RemoteSink sink : this.remoteSinks.values()) {
                sink.close(refs.acquire());
            }
        }
    }
}
