package org.elasticsearch.compute.operator.exchange;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.IsBlockedResult;

/* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.class */
public final class ExchangeSinkHandler {
    private final ExchangeBuffer buffer;
    private final Queue<ActionListener<ExchangeResponse>> listeners = new ConcurrentLinkedQueue();
    private final AtomicInteger outstandingSinks = new AtomicInteger();
    private final Semaphore promised = new Semaphore(1);
    private final SubscribableListener<Void> completionFuture;
    private final LongSupplier nowInMillis;
    private final AtomicLong lastUpdatedInMillis;
    private final BlockFactory blockFactory;

    /* loaded from: input_file:org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler$ExchangeSinkImpl.class */
    private class ExchangeSinkImpl implements ExchangeSink {
        boolean finished;
        private final Runnable onPageFetched;
        private final SubscribableListener<Void> onFinished = new SubscribableListener<>();

        ExchangeSinkImpl(Runnable runnable) {
            this.onPageFetched = runnable;
            ExchangeSinkHandler.this.onChanged();
            ExchangeSinkHandler.this.buffer.addCompletionListener(this.onFinished);
            ExchangeSinkHandler.this.outstandingSinks.incrementAndGet();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSink
        public void addPage(Page page) {
            this.onPageFetched.run();
            ExchangeSinkHandler.this.buffer.addPage(page);
            ExchangeSinkHandler.this.notifyListeners();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSink
        public void finish() {
            if (this.finished) {
                return;
            }
            this.finished = true;
            this.onFinished.onResponse((Object) null);
            ExchangeSinkHandler.this.onChanged();
            if (ExchangeSinkHandler.this.outstandingSinks.decrementAndGet() == 0) {
                ExchangeSinkHandler.this.buffer.finish(false);
                ExchangeSinkHandler.this.notifyListeners();
            }
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSink
        public boolean isFinished() {
            return this.onFinished.isDone();
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSink
        public void addCompletionListener(ActionListener<Void> actionListener) {
            this.onFinished.addListener(actionListener);
        }

        @Override // org.elasticsearch.compute.operator.exchange.ExchangeSink
        public IsBlockedResult waitForWriting() {
            return ExchangeSinkHandler.this.buffer.waitForWriting();
        }
    }

    public ExchangeSinkHandler(BlockFactory blockFactory, int i, LongSupplier longSupplier) {
        this.blockFactory = blockFactory;
        this.buffer = new ExchangeBuffer(i);
        ExchangeBuffer exchangeBuffer = this.buffer;
        Objects.requireNonNull(exchangeBuffer);
        this.completionFuture = SubscribableListener.newForked(exchangeBuffer::addCompletionListener);
        this.nowInMillis = longSupplier;
        this.lastUpdatedInMillis = new AtomicLong(longSupplier.getAsLong());
    }

    public void fetchPageAsync(boolean z, ActionListener<ExchangeResponse> actionListener) {
        if (z) {
            this.buffer.finish(true);
        }
        this.listeners.add(actionListener);
        onChanged();
        notifyListeners();
    }

    public void addCompletionListener(ActionListener<Void> actionListener) {
        this.completionFuture.addListener(actionListener);
    }

    public boolean isFinished() {
        return this.completionFuture.isDone();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onFailure(Exception exc) {
        this.completionFuture.onFailure(exc);
        this.buffer.finish(true);
        notifyListeners();
    }

    private void notifyListeners() {
        while (!this.listeners.isEmpty()) {
            if ((this.buffer.size() <= 0 && !this.buffer.noMoreInputs()) || !this.promised.tryAcquire()) {
                return;
            }
            try {
                ActionListener<ExchangeResponse> poll = this.listeners.poll();
                if (poll != null) {
                    ExchangeResponse exchangeResponse = new ExchangeResponse(this.blockFactory, this.buffer.pollPage(), this.buffer.isFinished());
                    onChanged();
                    ActionListener.respondAndRelease(poll, exchangeResponse);
                }
            } finally {
                this.promised.release();
            }
        }
    }

    public ExchangeSink createExchangeSink(Runnable runnable) {
        return new ExchangeSinkImpl(runnable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasData() {
        return this.outstandingSinks.get() > 0 || this.buffer.size() > 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasListeners() {
        return !this.listeners.isEmpty();
    }

    private void onChanged() {
        this.lastUpdatedInMillis.accumulateAndGet(this.nowInMillis.getAsLong(), Math::max);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long lastUpdatedTimeInMillis() {
        return this.lastUpdatedInMillis.get();
    }

    public int bufferSize() {
        return this.buffer.size();
    }
}
