package org.apache.flink.connector.base.source.reader.fetcher;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.class */
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SplitFetcher.class);
    private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");
    private final int id;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final SplitReader<E, SplitT> splitReader;
    private final Consumer<Throwable> errorHandler;
    private final Runnable shutdownHook;
    private final FetchTask<E, SplitT> fetchTask;
    private volatile SplitFetcherTask runningTask = null;
    private final Object lock = new Object();
    private final BlockingDeque<SplitFetcherTask> taskQueue = new LinkedBlockingDeque();
    private final Map<String, SplitT> assignedSplits = new HashMap();

    @GuardedBy("lock")
    private volatile boolean isIdle = true;
    private final AtomicBoolean wakeUp = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher$DummySplitFetcherTask.class */
    private static class DummySplitFetcherTask implements SplitFetcherTask {
        private final String name;

        private DummySplitFetcherTask(String str) {
            this.name = str;
        }

        @Override // org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask
        public boolean run() {
            return false;
        }

        @Override // org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask
        public void wakeUp() {
        }

        public String toString() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SplitFetcher(int i, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, SplitReader<E, SplitT> splitReader, Consumer<Throwable> consumer, Runnable runnable) {
        this.id = i;
        this.elementsQueue = futureCompletingBlockingQueue;
        this.splitReader = splitReader;
        this.errorHandler = consumer;
        this.shutdownHook = runnable;
        this.fetchTask = new FetchTask<>(splitReader, futureCompletingBlockingQueue, collection -> {
            Map<String, SplitT> map = this.assignedSplits;
            map.getClass();
            collection.forEach((v1) -> {
                r1.remove(v1);
            });
            LOG.info("Finished reading from splits {}", collection);
        }, i);
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting split fetcher {}", Integer.valueOf(this.id));
        while (!this.closed.get()) {
            try {
                try {
                    runOnce();
                } catch (Throwable th) {
                    this.errorHandler.accept(th);
                    try {
                        this.splitReader.close();
                    } catch (Exception e) {
                        this.errorHandler.accept(e);
                    }
                    LOG.info("Split fetcher {} exited.", Integer.valueOf(this.id));
                    this.shutdownHook.run();
                    return;
                }
            } finally {
                try {
                    this.splitReader.close();
                } catch (Exception e2) {
                    this.errorHandler.accept(e2);
                }
                LOG.info("Split fetcher {} exited.", Integer.valueOf(this.id));
                this.shutdownHook.run();
            }
        }
    }

    void runOnce() {
        try {
            if (shouldRunFetchTask()) {
                this.runningTask = this.fetchTask;
            } else {
                this.runningTask = this.taskQueue.take();
            }
            LOG.debug("Prepare to run {}", this.runningTask);
            if (!this.wakeUp.get() && this.runningTask.run()) {
                LOG.debug("Finished running task {}", this.runningTask);
                this.runningTask = null;
                checkAndSetIdle();
            }
            maybeEnqueueTask(this.runningTask);
            synchronized (this.wakeUp) {
                this.runningTask = null;
                this.wakeUp.set(false);
                LOG.debug("Cleaned wakeup flag.");
            }
        } catch (Exception e) {
            throw new RuntimeException(String.format("SplitFetcher thread %d received unexpected exception while polling the records", Integer.valueOf(this.id)), e);
        }
    }

    public void addSplits(List<SplitT> list) {
        enqueueTask(new AddSplitsTask(this.splitReader, list, this.assignedSplits));
        wakeUp(true);
    }

    public void enqueueTask(SplitFetcherTask splitFetcherTask) {
        synchronized (this.lock) {
            this.taskQueue.offer(splitFetcherTask);
            this.isIdle = false;
        }
    }

    public SplitReader<E, SplitT> getSplitReader() {
        return this.splitReader;
    }

    public void shutdown() {
        if (this.closed.compareAndSet(false, true)) {
            LOG.info("Shutting down split fetcher {}", Integer.valueOf(this.id));
            wakeUp(false);
        }
    }

    Map<String, SplitT> assignedSplits() {
        return this.assignedSplits;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isIdle() {
        return this.isIdle;
    }

    boolean shouldRunFetchTask() {
        return this.taskQueue.isEmpty() && !this.assignedSplits.isEmpty();
    }

    void wakeUp(boolean z) {
        synchronized (this.wakeUp) {
            this.wakeUp.set(true);
            SplitFetcherTask splitFetcherTask = this.runningTask;
            if (isRunningTask(splitFetcherTask)) {
                LOG.debug("Waking up running task {}", splitFetcherTask);
                splitFetcherTask.wakeUp();
            } else if (!z) {
                LOG.debug("Waking up fetcher thread.");
                this.taskQueue.add(WAKEUP_TASK);
            }
        }
    }

    private void maybeEnqueueTask(SplitFetcherTask splitFetcherTask) {
        if (!this.closed.get() && isRunningTask(splitFetcherTask) && splitFetcherTask != this.fetchTask && !this.taskQueue.offerFirst(splitFetcherTask)) {
            throw new RuntimeException("The task queue is full. This is only theoretically possible when really bad thing happens.");
        }
        if (splitFetcherTask != null) {
            LOG.debug("Enqueued task {}", splitFetcherTask);
        }
    }

    private boolean isRunningTask(SplitFetcherTask splitFetcherTask) {
        return (splitFetcherTask == null || splitFetcherTask == WAKEUP_TASK) ? false : true;
    }

    private void checkAndSetIdle() {
        if (shouldIdle()) {
            synchronized (this.lock) {
                if (shouldIdle()) {
                    this.isIdle = true;
                }
            }
            this.elementsQueue.notifyAvailable();
        }
    }

    private boolean shouldIdle() {
        return this.assignedSplits.isEmpty() && this.taskQueue.isEmpty();
    }
}
