package net.snowflake.hivemetastoreconnector.core;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import javax.annotation.ParametersAreNonnullByDefault;
import net.snowflake.hivemetastoreconnector.SnowflakeConf;
import net.snowflake.hivemetastoreconnector.SnowflakeHiveListener;
import net.snowflake.hivemetastoreconnector.commands.AddPartition;
import net.snowflake.hivemetastoreconnector.commands.Command;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/snowflake/hivemetastoreconnector/core/Scheduler.class */
public class Scheduler {
    private static final Logger log = LoggerFactory.getLogger(SnowflakeHiveListener.class);
    private final LoadingCache<TableKey, BlockingDeque<Command>> messageQueues;
    private final ExecutorService threadPool;
    private static final int MAX_STATEMENTS_PER_ROUND = 10;
    private final SnowflakeConf snowflakeConf;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/snowflake/hivemetastoreconnector/core/Scheduler$TableKey.class */
    public static class TableKey {
        private final String databaseName;
        private final String tableName;

        TableKey(String str, String str2) {
            Preconditions.checkNotNull(str);
            Preconditions.checkNotNull(str2);
            this.databaseName = str;
            this.tableName = str2;
        }

        public int hashCode() {
            return new HashCodeBuilder().append(this.databaseName).append(this.tableName).build().intValue();
        }

        public String toString() {
            return String.format("%s.%s", this.databaseName, this.tableName);
        }

        public boolean equals(Object obj) {
            return (obj instanceof TableKey) && this.databaseName.equals(((TableKey) obj).databaseName) && this.tableName.equals(((TableKey) obj).tableName);
        }
    }

    public Scheduler(int i, SnowflakeConf snowflakeConf) {
        Preconditions.checkArgument(i > 0);
        Preconditions.checkNotNull(snowflakeConf);
        this.threadPool = Executors.newFixedThreadPool(i);
        this.snowflakeConf = snowflakeConf;
        this.messageQueues = CacheBuilder.newBuilder().removalListener(removalNotification -> {
            log.info(String.format("Removing queue %s from cache", removalNotification.getKey()));
        }).build(new CacheLoader<TableKey, BlockingDeque<Command>>() { // from class: net.snowflake.hivemetastoreconnector.core.Scheduler.1
            @ParametersAreNonnullByDefault
            public BlockingDeque<Command> load(TableKey tableKey) {
                LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();
                Scheduler.this.threadPool.submit(() -> {
                    Scheduler.this.doWork(tableKey, linkedBlockingDeque);
                });
                return linkedBlockingDeque;
            }
        });
    }

    public void enqueueMessage(Command command) {
        Preconditions.checkNotNull(command);
        try {
            Queue queue = (Queue) this.messageQueues.get(getKeyFromMessage(command));
            Preconditions.checkNotNull(queue);
            log.info("Enqueueing message. Current count (before enqueuing): " + queue.size());
            queue.add(command);
        } catch (ExecutionException e) {
            log.error("Could not initialize queue " + e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doWork(TableKey tableKey, BlockingDeque<Command> blockingDeque) {
        try {
            if (processMessages(blockingDeque, this.snowflakeConf)) {
                this.threadPool.submit(() -> {
                    doWork(tableKey, blockingDeque);
                });
            } else {
                this.messageQueues.invalidate(tableKey);
                ArrayList arrayList = new ArrayList();
                blockingDeque.drainTo(arrayList);
                arrayList.forEach(this::enqueueMessage);
            }
        } catch (InterruptedException e) {
            log.error("Thread interrupted: " + e);
            Thread.currentThread().interrupt();
        } catch (Throwable th) {
            log.error("Encountered error while processing queue: " + th);
            this.threadPool.submit(() -> {
                doWork(tableKey, blockingDeque);
            });
        }
    }

    private static boolean processMessages(BlockingDeque<Command> blockingDeque, SnowflakeConf snowflakeConf) throws InterruptedException {
        blockingDeque.addFirst(blockingDeque.take());
        for (int i = 0; !blockingDeque.isEmpty() && i < 10; i++) {
            if ((blockingDeque.peek() instanceof AddPartition) && !((AddPartition) blockingDeque.peek()).isCompact()) {
                ArrayList arrayList = new ArrayList();
                while (blockingDeque.peek() instanceof AddPartition) {
                    arrayList.add((AddPartition) blockingDeque.poll());
                }
                List reverse = Lists.reverse(AddPartition.compact(arrayList));
                blockingDeque.getClass();
                reverse.forEach((v1) -> {
                    r1.addFirst(v1);
                });
            }
            SnowflakeClient.generateAndExecuteSnowflakeStatements(blockingDeque.poll(), snowflakeConf);
        }
        log.info("Queue processed.");
        return !blockingDeque.isEmpty();
    }

    private TableKey getKeyFromMessage(Command command) {
        return new TableKey(command.getDatabaseName(), command.getTableName());
    }
}
