package net.snowflake.ingest.internal.apache.iceberg.actions;

import java.io.Closeable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.snowflake.ingest.internal.apache.iceberg.Table;
import net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.base.Preconditions;
import net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Lists;
import net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Queues;
import net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Sets;
import net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/snowflake/ingest/internal/apache/iceberg/actions/BaseCommitService.class */
abstract class BaseCommitService<T> implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(BaseCommitService.class);
    public static final long TIMEOUT_IN_MS_DEFAULT = TimeUnit.MINUTES.toMillis(120);
    private final Table table;
    private final ExecutorService committerService;
    private final ConcurrentLinkedQueue<T> completedRewrites;
    private final ConcurrentLinkedQueue<String> inProgressCommits;
    private final ConcurrentLinkedQueue<T> committedRewrites;
    private final int rewritesPerCommit;
    private final AtomicBoolean running;
    private final long timeoutInMS;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BaseCommitService(Table table, int i) {
        this(table, i, TIMEOUT_IN_MS_DEFAULT);
    }

    BaseCommitService(Table table, int i, long j) {
        this.running = new AtomicBoolean(false);
        this.table = table;
        LOG.info("Creating commit service for table {} with {} groups per commit", table, Integer.valueOf(i));
        this.rewritesPerCommit = i;
        this.timeoutInMS = j;
        this.committerService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());
        this.completedRewrites = Queues.newConcurrentLinkedQueue();
        this.committedRewrites = Queues.newConcurrentLinkedQueue();
        this.inProgressCommits = Queues.newConcurrentLinkedQueue();
    }

    protected abstract void commitOrClean(Set<T> set);

    protected abstract void abortFileGroup(T t);

    public void start() {
        Preconditions.checkState(this.running.compareAndSet(false, true), "Commit service already started");
        LOG.info("Starting commit service for {}", this.table);
        this.committerService.execute(() -> {
            while (true) {
                if (!this.running.get() && this.completedRewrites.isEmpty() && this.inProgressCommits.isEmpty()) {
                    return;
                }
                try {
                    if (this.completedRewrites.isEmpty() && this.inProgressCommits.isEmpty()) {
                        Thread.sleep(100L);
                    }
                    if (!this.running.get() && !this.completedRewrites.isEmpty()) {
                        commitReadyCommitGroups();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while processing commits", e);
                }
            }
        });
    }

    public void offer(T t) {
        LOG.debug("Offered to commit service: {}", t);
        Preconditions.checkState(this.running.get(), "Cannot add rewrites to a service which has already been closed");
        this.completedRewrites.add(t);
        commitReadyCommitGroups();
    }

    public List<T> results() {
        Preconditions.checkState(this.committerService.isShutdown(), "Cannot get results from a service which has not been closed");
        return Lists.newArrayList(this.committedRewrites.iterator());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Preconditions.checkState(this.running.compareAndSet(true, false), "Cannot close already closed commit service");
        LOG.info("Closing commit service for {} waiting for all commits to finish", this.table);
        this.committerService.shutdown();
        boolean z = false;
        try {
            if (!this.committerService.awaitTermination(this.timeoutInMS, TimeUnit.MILLISECONDS)) {
                LOG.warn("Commit operation did not complete within {} minutes ({} ms) of the all files being rewritten. This may mean that some changes were not successfully committed to the table.", Long.valueOf(TimeUnit.MILLISECONDS.toMinutes(this.timeoutInMS)), Long.valueOf(this.timeoutInMS));
                z = true;
            }
            if (!this.completedRewrites.isEmpty() && z) {
                LOG.error("Attempting to cleanup uncommitted file groups");
                synchronized (this.completedRewrites) {
                    while (!this.completedRewrites.isEmpty()) {
                        abortFileGroup(this.completedRewrites.poll());
                    }
                }
            }
            Preconditions.checkArgument(!z && this.completedRewrites.isEmpty(), "Timeout occurred when waiting for commits to complete. {} file groups committed. {} file groups remain uncommitted. Retry this operation to attempt rewriting the failed groups.", this.committedRewrites.size(), this.completedRewrites.size());
            Preconditions.checkState(this.completedRewrites.isEmpty(), "File groups offered after service was closed, they were not successfully committed.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Cannot complete commit for rewrite, commit service interrupted", e);
        }
    }

    private void commitReadyCommitGroups() {
        HashSet hashSet = null;
        if (canCreateCommitGroup()) {
            synchronized (this.completedRewrites) {
                if (canCreateCommitGroup()) {
                    hashSet = Sets.newHashSetWithExpectedSize(this.rewritesPerCommit);
                    for (int i = 0; i < this.rewritesPerCommit && !this.completedRewrites.isEmpty(); i++) {
                        hashSet.add(this.completedRewrites.poll());
                    }
                }
            }
        }
        if (hashSet != null) {
            String uuid = UUID.randomUUID().toString();
            this.inProgressCommits.add(uuid);
            try {
                commitOrClean(hashSet);
                this.committedRewrites.addAll(hashSet);
            } catch (Exception e) {
                LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e);
            }
            this.inProgressCommits.remove(uuid);
        }
    }

    @VisibleForTesting
    boolean canCreateCommitGroup() {
        return (this.completedRewrites.size() >= this.rewritesPerCommit) || (!this.running.get() && !this.completedRewrites.isEmpty());
    }

    @VisibleForTesting
    boolean completedRewritesAllCommitted() {
        return this.completedRewrites.isEmpty() && this.inProgressCommits.isEmpty();
    }
}
