package org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.actions;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.CombinedScanTask;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.DataFile;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.FileScanTask;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.PartitionSpec;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.RewriteFiles;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.Table;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.TableProperties;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.encryption.EncryptionManager;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.expressions.Expression;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.expressions.Expressions;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.io.CloseableIterable;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.io.CloseableIterator;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.io.FileIO;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.util.PropertyUtil;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.util.StructLikeWrapper;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.util.TableScanUtil;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/shaded/net/snowflake/ingest/internal/apache/iceberg/actions/BaseRewriteDataFilesAction.class */
/**
 * Base implementation of an action that rewrites the data files of a table.
 *
 * <p>The action plans the files of the table's current snapshot (restricted by an optional
 * filter), groups the resulting scan tasks by partition, bin-packs each group into combined scan
 * tasks, delegates the actual rewrite to {@link #rewriteDataForTasks(List)} and finally commits a
 * rewrite operation that swaps the old files for the new ones.
 *
 * <p>Subclasses supply the {@link FileIO} used for clean-up and the engine-specific rewrite.
 *
 * @param <ThisT> the concrete action type, as required by the parent action class
 */
public abstract class BaseRewriteDataFilesAction<ThisT> extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
    private final Table table;
    private final FileIO fileIO;
    private final EncryptionManager encryptionManager;
    private PartitionSpec spec;
    private long targetSizeInBytes;
    private int splitLookback;
    private long splitOpenFileCost;
    private Expression filter = Expressions.alwaysTrue();
    private boolean caseSensitive = false;
    private boolean useStartingSequenceNumber = false;

    /**
     * Creates the action for the given table, reading the defaults for the target size, split
     * lookback and open-file cost from the table properties.
     *
     * <p>The target size defaults to the smaller of the table's split size and its write target
     * file size.
     *
     * @param table the table whose data files are rewritten
     */
    protected BaseRewriteDataFilesAction(Table table) {
        this.table = table;
        this.spec = table.spec();
        Map<String, String> properties = table.properties();
        long splitSize = PropertyUtil.propertyAsLong(properties, TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
        long targetFileSize = PropertyUtil.propertyAsLong(properties, TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
        this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
        this.splitLookback = PropertyUtil.propertyAsInt(properties, TableProperties.SPLIT_LOOKBACK, 10);
        this.splitOpenFileCost = PropertyUtil.propertyAsLong(properties, TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
        this.encryptionManager = table.encryption();
        // Resolved last, and deliberately not in a field initializer: field initializers run
        // before this constructor body, so an implementation of fileIO() that reads table()
        // would otherwise observe a null table.
        this.fileIO = fileIO();
    }

    /** Returns the table this action operates on. */
    @Override
    protected Table table() {
        return this.table;
    }

    /** Returns the partition spec currently selected for this action. */
    protected PartitionSpec spec() {
        return this.spec;
    }

    /** Returns the encryption manager obtained from the table at construction time. */
    protected EncryptionManager encryptionManager() {
        return this.encryptionManager;
    }

    /** Returns whether the file-planning scan is configured as case sensitive. */
    protected boolean caseSensitive() {
        return this.caseSensitive;
    }

    /**
     * Sets whether the file-planning scan is case sensitive.
     *
     * @param caseSensitive the value passed to the table scan
     * @return this action, for chaining
     */
    public BaseRewriteDataFilesAction<ThisT> caseSensitive(boolean caseSensitive) {
        this.caseSensitive = caseSensitive;
        return this;
    }

    /**
     * Selects the partition spec, by id, that this action uses.
     *
     * @param specId id of a spec known to the table
     * @return this action, for chaining
     * @throws IllegalArgumentException if the table has no spec with the given id
     */
    public BaseRewriteDataFilesAction<ThisT> outputSpecId(int specId) {
        Preconditions.checkArgument(this.table.specs().containsKey(specId), "Invalid spec id %s", specId);
        this.spec = this.table.specs().get(specId);
        return this;
    }

    /**
     * Sets the target size used when splitting and combining scan tasks.
     *
     * @param targetSize target size in bytes, must be positive
     * @return this action, for chaining
     * @throws IllegalArgumentException if {@code targetSize} is not positive
     */
    public BaseRewriteDataFilesAction<ThisT> targetSizeInBytes(long targetSize) {
        Preconditions.checkArgument(targetSize > 0, "Invalid target rewrite data file size in bytes %s", targetSize);
        this.targetSizeInBytes = targetSize;
        return this;
    }

    /**
     * Sets the lookback passed to task planning.
     *
     * @param lookback lookback value, must be positive
     * @return this action, for chaining
     * @throws IllegalArgumentException if {@code lookback} is not positive
     */
    public BaseRewriteDataFilesAction<ThisT> splitLookback(int lookback) {
        Preconditions.checkArgument(lookback > 0, "Invalid split lookback %s", lookback);
        this.splitLookback = lookback;
        return this;
    }

    /**
     * Sets the open-file cost passed to task planning.
     *
     * @param openFileCost open-file cost, must be positive
     * @return this action, for chaining
     * @throws IllegalArgumentException if {@code openFileCost} is not positive
     */
    public BaseRewriteDataFilesAction<ThisT> splitOpenFileCost(long openFileCost) {
        Preconditions.checkArgument(openFileCost > 0, "Invalid split openFileCost %s", openFileCost);
        this.splitOpenFileCost = openFileCost;
        return this;
    }

    /**
     * Narrows the set of files considered for rewriting. Repeated calls are combined with a
     * logical AND.
     *
     * @param expression additional filter expression
     * @return this action, for chaining
     */
    public BaseRewriteDataFilesAction<ThisT> filter(Expression expression) {
        this.filter = Expressions.and(this.filter, expression);
        return this;
    }

    /**
     * Controls whether the rewrite commit sets its data sequence number to the sequence number of
     * the snapshot the rewrite started from.
     *
     * @param useStarting {@code true} to use the starting snapshot's sequence number
     * @return this action, for chaining
     */
    public BaseRewriteDataFilesAction<ThisT> useStartingSequenceNumber(boolean useStarting) {
        this.useStartingSequenceNumber = useStarting;
        return this;
    }

    /**
     * Plans, rewrites and commits the data-file rewrite.
     *
     * @return the replaced and the newly added data files, or an empty result when the table has
     *     no current snapshot or nothing qualifies for rewriting
     */
    @Override
    public RewriteDataFilesActionResult execute() {
        if (this.table.currentSnapshot() == null) {
            return RewriteDataFilesActionResult.empty();
        }
        long startingSnapshotId = this.table.currentSnapshot().snapshotId();

        Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = planAndGroupTasks(startingSnapshotId);

        // A partition holding a single task has nothing to be combined with.
        Map<StructLikeWrapper, Collection<FileScanTask>> rewritableGroups = groupedTasks.entrySet().stream()
                .filter(group -> group.getValue().size() > 1)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (rewritableGroups.isEmpty()) {
            return RewriteDataFilesActionResult.empty();
        }

        // Split and re-combine the tasks of each partition, keeping only the combined tasks that
        // either merge several files or read just a part of a single file.
        List<CombinedScanTask> combinedScanTasks = rewritableGroups.values().stream()
                .map(scanTasks -> {
                    CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(CloseableIterable.withNoopClose(scanTasks), this.targetSizeInBytes);
                    return TableScanUtil.planTasks(splitTasks, this.targetSizeInBytes, this.splitLookback, this.splitOpenFileCost);
                })
                .flatMap(Streams::stream)
                .filter(task -> task.files().size() > 1 || isPartialFileScan(task))
                .collect(Collectors.toList());
        if (combinedScanTasks.isEmpty()) {
            return RewriteDataFilesActionResult.empty();
        }

        List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
        List<DataFile> currentDataFiles = combinedScanTasks.stream()
                .flatMap(task -> task.files().stream().map(FileScanTask::file))
                .collect(Collectors.toList());
        replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
        return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
    }

    /**
     * Plans the files of the given snapshot and groups the resulting tasks by partition. The
     * planned iterable is closed only after it has been fully consumed; a failure to close it is
     * logged and otherwise ignored.
     */
    private Map<StructLikeWrapper, Collection<FileScanTask>> planAndGroupTasks(long snapshotId) {
        CloseableIterable<FileScanTask> fileScanTasks = this.table.newScan()
                .useSnapshot(snapshotId)
                .caseSensitive(this.caseSensitive)
                .ignoreResiduals()
                .filter(this.filter)
                .planFiles();
        try {
            return groupTasksByPartition(fileScanTasks.iterator());
        } finally {
            try {
                fileScanTasks.close();
            } catch (IOException e) {
                LOG.warn("Failed to close task iterable", e);
            }
        }
    }

    /**
     * Drains the iterator into a map keyed by the partition of each task's file and closes the
     * iterator afterwards, also when draining fails. A failure to close is logged and otherwise
     * ignored.
     */
    private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(CloseableIterator<FileScanTask> closeableIterator) {
        ListMultimap<StructLikeWrapper, FileScanTask> tasksByPartition = Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
        StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(this.spec.partitionType());
        try (CloseableIterator<FileScanTask> tasks = closeableIterator) {
            tasks.forEachRemaining(task -> tasksByPartition.put(partitionWrapper.copyFor(task.file().partition()), task));
        } catch (IOException e) {
            LOG.warn("Failed to close task iterator", e);
        }
        return tasksByPartition.asMap();
    }

    /**
     * Commits the replacement and, when the commit definitely failed, deletes the files that were
     * written for it. Nothing is cleaned up when the commit state is unknown, because the new
     * files may already be part of the table.
     */
    private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles, long startingSnapshotId) {
        try {
            doReplace(deletedDataFiles, addedDataFiles, startingSnapshotId);
        } catch (CommitStateUnknownException e) {
            LOG.warn("Commit state unknown, cannot clean up files that may have been committed", e);
            throw e;
        } catch (Exception e) {
            LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
            Tasks.foreach(Iterables.transform(addedDataFiles, file -> file.path().toString()))
                    .noRetry()
                    .suppressFailureWhenFinished()
                    .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
                    .run(this.fileIO::deleteFile);
            throw e;
        }
    }

    /**
     * Builds and commits the rewrite operation, validated from the starting snapshot.
     *
     * @param deletedDataFiles files to remove from the table
     * @param addedDataFiles files to add to the table
     * @param startingSnapshotId id of the snapshot the rewrite was planned against
     */
    @VisibleForTesting
    void doReplace(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles, long startingSnapshotId) {
        RewriteFiles rewriteFiles = this.table.newRewrite().validateFromSnapshot(startingSnapshotId);
        for (DataFile deleted : deletedDataFiles) {
            rewriteFiles.deleteFile(deleted);
        }
        for (DataFile added : addedDataFiles) {
            rewriteFiles.addFile(added);
        }
        if (this.useStartingSequenceNumber) {
            rewriteFiles.dataSequenceNumber(this.table.snapshot(startingSnapshotId).sequenceNumber());
        }
        commit(rewriteFiles);
    }

    /**
     * Returns {@code true} when the combined task consists of exactly one file task whose length
     * differs from the size of the underlying file.
     */
    private boolean isPartialFileScan(CombinedScanTask combinedScanTask) {
        if (combinedScanTask.files().size() != 1) {
            return false;
        }
        FileScanTask onlyTask = combinedScanTask.files().iterator().next();
        return onlyTask.file().fileSizeInBytes() != onlyTask.length();
    }

    /**
     * Returns the {@link FileIO} used to delete rewritten files after a failed commit. Invoked
     * once from the constructor, after the table has been assigned.
     */
    protected abstract FileIO fileIO();

    /**
     * Rewrites the data covered by the given combined scan tasks.
     *
     * @param list the combined scan tasks selected for rewriting
     * @return the data files produced by the rewrite
     */
    protected abstract List<DataFile> rewriteDataForTasks(List<CombinedScanTask> list);

    // NOTE(review): this looks like a compiler-generated bridge method surfaced by the
    // decompiler; it only forwards to the superclass. Confirm against the superclass
    // declaration before removing or retyping it.
    @Override // org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.actions.BaseSnapshotUpdateAction, org.apache.flink.shaded.net.snowflake.ingest.internal.apache.iceberg.actions.SnapshotUpdateAction
    public /* bridge */ /* synthetic */ Object set(String str, String str2) {
        return super.set(str, str2);
    }
}
