package org.dataconservancy.pass.indexer.reindex;

import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;
import org.dataconservancy.pass.model.PassEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sqlite.JDBC;

/* loaded from: input_file:org/dataconservancy/pass/indexer/reindex/SqliteRunner.class */
public class SqliteRunner implements AutoCloseable {
    /** Progress value written to a queue row whose processing failed. */
    public static final int PROGRESS_FAILED = -1;
    /** Progress value of a queue row that has not been picked up yet (the column default). */
    public static final int PROGRESS_ENQUEUED = 0;
    /** Progress value written to a queue row once it has been picked up. */
    public static final int PROGRESS_RUNNING = 1;
    /** Progress/status value recorded for completed work. */
    public static final int PROGRESS_DONE = 100;

    private static final Logger LOG = LoggerFactory.getLogger(SqliteRunner.class);

    // Schema: one table per queue plus a table that records per-item outcomes.
    static final String STMNT_CREATE_TABLE_TYPE_QUEUE = String.format("CREATE TABLE types_queue (id INTEGER PRIMARY KEY, type TEXT NOT NULL, progress INTEGER NOT NULL DEFAULT %d)", PROGRESS_ENQUEUED);
    static final String STMNT_CREATE_TABLE_ITEM_QUEUE = String.format("CREATE TABLE item_queue (id INTEGER PRIMARY KEY, type TEXT NOT NULL, url TEXT NOT NULL, progress INTEGER NOT NULL DEFAULT %d)", PROGRESS_ENQUEUED);
    static final String STMNT_CREATE_TABLE_RESULTS = "CREATE TABLE results (id INTEGER PRIMARY KEY, url TEXT NOT NULL, type TEXT NOT NULL, status INTEGER NOT NULL, result TEXT)";

    // Item lifecycle statements; each "?" is bound by the caller.
    static final String STMNT_ITEM_POLL = String.format("SELECT id, type, url FROM item_queue WHERE progress = %d LIMIT ?", PROGRESS_ENQUEUED);
    static final String STMNT_ITEM_START = String.format("UPDATE item_queue SET progress = %s WHERE id = ?", PROGRESS_RUNNING);
    static final String STMNT_ITEM_FAIL = String.format("UPDATE item_queue SET progress = %d WHERE id = ?", PROGRESS_FAILED);
    static final String STMNT_ITEM_REMOVE = String.format("DELETE FROM item_queue WHERE id = ?");
    static final String STMNT_ITEM_SAVE_RESULT = "INSERT INTO results (type, url, status, result) VALUES (?, ?, ?, ?)";

    // Failure counters for both queues.
    static final String STMNT_FAIL_COUNT_TYPES = String.format("SELECT count(*) FROM  types_queue WHERE progress = %d", PROGRESS_FAILED);
    static final String STMNT_FAIL_COUNT_ITEMS = String.format("SELECT count(*) FROM item_queue WHERE PROGRESS = %d", PROGRESS_FAILED);

    /** Single shared connection; also used as the monitor that serializes database access. */
    private final Connection conn;
    /** Path of the SQLite database file backing this runner. */
    private final String filepath;
    /** Executor that runs the enqueueing task and the per-item work. */
    BlockingExecutor exe;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/dataconservancy/pass/indexer/reindex/SqliteRunner$Item.class */
    public class Item {
        public int id;
        public String uri;
        public String type;

        public Item(int i, String str, String str2) {
            this.id = i;
            this.type = str;
            this.uri = str2;
        }

        public Item start(Connection connection) {
            try {
                SqliteRunner.this.begin();
                SqliteRunner.this.execUpdate(SqliteRunner.STMNT_ITEM_START, preparedStatement -> {
                    preparedStatement.setInt(1, this.id);
                });
                SqliteRunner.this.commit();
                return this;
            } catch (Exception e) {
                SqliteRunner.this.rollback();
                throw new RuntimeException("could not take item out of queue", e);
            }
        }

        private void saveFailure(Exception exc) {
            synchronized (SqliteRunner.this.conn) {
                try {
                    SqliteRunner.this.begin();
                    SqliteRunner.this.execUpdate(SqliteRunner.STMNT_ITEM_FAIL, preparedStatement -> {
                        preparedStatement.setInt(1, this.id);
                    });
                    SqliteRunner.this.execUpdate(SqliteRunner.STMNT_ITEM_SAVE_RESULT, preparedStatement2 -> {
                        preparedStatement2.setString(1, this.type);
                        preparedStatement2.setString(2, this.uri);
                        preparedStatement2.setInt(3, -1);
                        preparedStatement2.setString(4, SqliteRunner.getStackTrace(exc));
                    });
                    SqliteRunner.this.commit();
                } catch (Exception e) {
                    SqliteRunner.this.rollback();
                    SqliteRunner.LOG.warn("Could not save the following error: \n" + SqliteRunner.getStackTrace(exc), (Throwable) e);
                }
            }
        }

        public boolean process(Function<URI, String> function) {
            try {
                String apply = function.apply(new URI(this.uri));
                synchronized (SqliteRunner.this.conn) {
                    try {
                        SqliteRunner.this.begin();
                        SqliteRunner.this.execUpdate(SqliteRunner.STMNT_ITEM_REMOVE, preparedStatement -> {
                            preparedStatement.setInt(1, this.id);
                        });
                        SqliteRunner.this.execUpdate(SqliteRunner.STMNT_ITEM_SAVE_RESULT, preparedStatement2 -> {
                            preparedStatement2.setString(1, this.type);
                            preparedStatement2.setString(2, this.uri);
                            preparedStatement2.setInt(3, 100);
                            preparedStatement2.setString(4, apply);
                        });
                        SqliteRunner.this.commit();
                    } catch (Exception e) {
                        SqliteRunner.this.rollback();
                        SqliteRunner.LOG.warn("Could not save success result, will retry later", (Throwable) e);
                        return false;
                    }
                }
                return true;
            } catch (Exception e2) {
                saveFailure(e2);
                return false;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/dataconservancy/pass/indexer/reindex/SqliteRunner$SQLConsumer.class */
    /**
     * Callback that receives one value and is allowed to throw {@link SQLException};
     * used to bind statement parameters and to read result sets.
     *
     * @param <T> type of the value handed to the callback
     */
    @FunctionalInterface
    public interface SQLConsumer<T> {
        void accept(T value) throws SQLException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/dataconservancy/pass/indexer/reindex/SqliteRunner$SQLRunnable.class */
    /** A no-argument action that is allowed to throw {@link SQLException}. */
    @FunctionalInterface
    public interface SQLRunnable {
        void run() throws SQLException;
    }

    /**
     * Creates a fresh database file, builds the schema and enqueues one
     * types_queue row per entity class.
     *
     * <p>If anything after opening the connection fails (schema creation
     * included), the connection is closed and the database file is removed on a
     * best-effort basis before the failure is rethrown.
     *
     * @param collection entity classes to enqueue; each is stored by its fully qualified name
     * @throws RuntimeException if the database cannot be opened or initialized
     */
    public SqliteRunner(Collection<Class<? extends PassEntity>> collection) {
        this.exe = new BlockingExecutor(4);
        this.filepath = newFilePath();
        this.conn = open(this.filepath);
        try {
            // Schema creation is inside the guarded region so a failure here is cleaned up too.
            init(this.conn);
            begin();
            for (Class<? extends PassEntity> cls : collection) {
                execUpdate("INSERT INTO types_queue (type) VALUES (?);", stmt -> stmt.setString(1, cls.getName()));
            }
            commit();
        } catch (Exception e) {
            // A failing close must neither hide the original error nor skip the file cleanup.
            try {
                this.conn.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            File dbFile = new File(this.filepath);
            try {
                Files.delete(dbFile.toPath());
            } catch (Exception deleteFailure) {
                // Only worth a warning when the file is really still there.
                if (Files.exists(dbFile.toPath())) {
                    LOG.warn("Could not delete db file at {}", this.filepath);
                }
            }
            throw new RuntimeException("could not initialize", e);
        }
    }

    /**
     * Opens the database file at the given path without creating the schema or
     * enqueueing anything.
     *
     * @param str path of the SQLite database file
     * @throws RuntimeException if the database cannot be opened
     */
    public SqliteRunner(String str) {
        exe = new BlockingExecutor(4);
        filepath = str;
        conn = open(filepath);
    }

    /** @return the path of the SQLite database file this runner was opened with */
    public String getFilePath() {
        return filepath;
    }

    /**
     * Drains the type queue into the item queue on the executor while the
     * calling thread repeatedly polls the item queue and dispatches items for
     * processing, then waits for outstanding work.
     *
     * <p>Polling stops when no type is left in the enqueued state, or when the
     * enqueueing task has exited for any reason. The second condition prevents
     * an endless polling loop when the task aborts while types are still
     * waiting; such types simply stay enqueued in the database.
     *
     * <p>If the calling thread is interrupted while waiting between polls, the
     * interrupt flag is restored and the method returns immediately without
     * waiting for the executor.
     *
     * @param function  supplies the URIs to enqueue for a given entity class
     * @param function2 work applied to every queued URI; its return value is stored as the result
     */
    public void process(Function<Class<? extends PassEntity>, Stream<URI>> function, Function<URI, String> function2) {
        final AtomicBoolean enqueuerExited = new AtomicBoolean(false);
        this.exe.execute(() -> {
            try {
                for (Class<? extends PassEntity> type = nextType(); type != null; type = nextType()) {
                    LOG.info("Enqueueing {}", type.getSimpleName());
                    enqueue(type, function);
                    LOG.info("Done enqueueing {}", type.getSimpleName());
                }
            } catch (Exception e) {
                LOG.warn("Error populating item queue", (Throwable) e);
            } finally {
                // Signal the polling loop even on abnormal exit so it cannot wait forever.
                enqueuerExited.set(true);
            }
        });
        while (!isDoneQueueing() && !enqueuerExited.get()) {
            processItems(function2);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted");
                return;
            }
        }
        // Pick up whatever was enqueued after the last poll.
        processItems(function2);
        this.exe.awaitDone();
    }

    /**
     * Submits every currently queued item to the executor and logs a progress
     * line each time the per-call counter reaches a multiple of 1000.
     *
     * @param function work applied to each item's URI
     */
    private void processItems(Function<URI, String> function) {
        final AtomicInteger processed = new AtomicInteger();
        queuedItems().forEach(item -> this.exe.execute(() -> {
            try {
                item.process(function);
                boolean atMilestone = processed.incrementAndGet() % 1000 == 0;
                if (atMilestone) {
                    LOG.info("Processed " + processed.get());
                }
            } catch (Exception e) {
                LOG.warn("Uncaut exception processing items", (Throwable) e);
            }
        }));
    }

    /**
     * Returns a lazily populated view of the item queue. Each iterator keeps a
     * small local buffer and refills it from the database, 100 rows at a time,
     * whenever the buffer runs dry; iteration ends when a refill yields nothing.
     *
     * @return an iterable whose iterators fetch (and thereby mark as started) queued items on demand
     */
    private Iterable<Item> queuedItems() {
        return () -> new Iterator<Item>() {
            private final Queue<Item> items = new ConcurrentLinkedQueue<>();

            @Override
            public boolean hasNext() {
                if (items.isEmpty()) {
                    items.addAll(fetchItems(100));
                }
                return !items.isEmpty();
            }

            @Override
            public Item next() {
                // Honor the Iterator contract instead of handing out null when exhausted.
                if (!hasNext()) {
                    throw new java.util.NoSuchElementException("item queue is exhausted");
                }
                return items.poll();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads up to {@code i} not-yet-started rows from item_queue and marks each
     * one as started while holding the connection lock.
     *
     * @param i maximum number of rows to fetch
     * @return the fetched items, possibly empty
     */
    public Collection<Item> fetchItems(int i) {
        final Collection<Item> batch = new ArrayList<>(i);
        synchronized (this.conn) {
            autocommit();
            execQuery(
                    STMNT_ITEM_POLL,
                    stmt -> stmt.setInt(1, i),
                    rows -> {
                        while (rows.next()) {
                            Item polled = new Item(rows.getInt(1), rows.getString(2), rows.getString(3));
                            batch.add(polled.start(this.conn));
                        }
                    });
        }
        return batch;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Starts an explicit transaction by switching the connection out of auto-commit mode. */
    public void begin() {
        quietly(() -> conn.setAutoCommit(false));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Rolls back the connection's current transaction. */
    public void rollback() {
        quietly(() -> conn.rollback());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Commits the connection's current transaction. */
    public void commit() {
        quietly(() -> conn.commit());
    }

    /** Switches the connection back to auto-commit mode. */
    public void autocommit() {
        quietly(() -> conn.setAutoCommit(true));
    }

    /**
     * Reports whether the type queue has been fully picked up.
     *
     * @return true when no types_queue row is still in the enqueued state
     */
    public boolean isDoneQueueing() {
        synchronized (this.conn) {
            final AtomicBoolean noneWaiting = new AtomicBoolean();
            autocommit();
            String countWaiting = String.format("SELECT count(*) from types_queue WHERE progress = %d;", PROGRESS_ENQUEUED);
            execQuery(countWaiting, null, rows -> {
                boolean empty = rows.next() && rows.getInt(1) == 0;
                noneWaiting.set(empty);
            });
            return noneWaiting.get();
        }
    }

    /**
     * Counts the rows flagged as failed across both queues.
     *
     * @return failed rows in types_queue plus failed rows in item_queue
     */
    public int errorCount() {
        final AtomicInteger failures = new AtomicInteger(0);
        // Both queries return a single count column, so one reader serves both.
        final SQLConsumer<ResultSet> addCount = rows -> {
            if (rows.next()) {
                failures.addAndGet(rows.getInt(1));
            }
        };
        synchronized (this.conn) {
            autocommit();
            execQuery(STMNT_FAIL_COUNT_TYPES, null, addCount);
            execQuery(STMNT_FAIL_COUNT_ITEMS, null, addCount);
        }
        return failures.get();
    }

    /** Resets every failed row in both queues back to the enqueued state. */
    public void clearErrors() {
        synchronized (this.conn) {
            autocommit();
            String resetTypes = String.format("UPDATE types_queue SET progress = %d WHERE progress = %d", PROGRESS_ENQUEUED, PROGRESS_FAILED);
            execUpdate(resetTypes, null);
            String resetItems = String.format("UPDATE item_queue SET progress = %d WHERE progress = %d", PROGRESS_ENQUEUED, PROGRESS_FAILED);
            execUpdate(resetItems, null);
        }
    }

    /**
     * Runs a SQL action, converting a checked {@link SQLException} into an
     * unchecked {@link RuntimeException} so callers need no throws clause.
     *
     * @param sQLRunnable the action to run
     * @throws RuntimeException wrapping any SQLException the action raises
     */
    private void quietly(SQLRunnable sQLRunnable) {
        try {
            sQLRunnable.run();
        } catch (SQLException sqlFailure) {
            throw new RuntimeException("Error performing quiet SQL operation", sqlFailure);
        }
    }

    /**
     * Prepares and executes a data-modifying statement on the shared connection.
     * The statement is always closed, including when parameter binding or
     * execution fails.
     *
     * @param str         SQL text, optionally containing {@code ?} placeholders
     * @param sQLConsumer binds the placeholders; may be null when there is nothing to bind
     * @return the update count reported by the driver
     * @throws RuntimeException wrapping any SQLException raised while preparing, binding or executing
     */
    int execUpdate(String str, SQLConsumer<PreparedStatement> sQLConsumer) {
        try (PreparedStatement statement = this.conn.prepareStatement(str)) {
            if (sQLConsumer != null) {
                sQLConsumer.accept(statement);
            }
            return statement.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException("Could not execute SQL", e);
        }
    }

    /**
     * Prepares and executes a query on the shared connection and hands the
     * result set to a reader. Both the statement and the result set are always
     * closed, including when binding, execution or reading fails.
     *
     * @param str          SQL text, optionally containing {@code ?} placeholders
     * @param sQLConsumer  binds the placeholders; may be null when there is nothing to bind
     * @param sQLConsumer2 reads the result set; may be null to execute and discard the rows
     * @throws RuntimeException wrapping any SQLException raised along the way
     */
    private void execQuery(String str, SQLConsumer<PreparedStatement> sQLConsumer, SQLConsumer<ResultSet> sQLConsumer2) {
        try (PreparedStatement statement = this.conn.prepareStatement(str)) {
            if (sQLConsumer != null) {
                sQLConsumer.accept(statement);
            }
            try (ResultSet rows = statement.executeQuery()) {
                if (sQLConsumer2 != null) {
                    sQLConsumer2.accept(rows);
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException("Could not execute SQL", e);
        }
    }

    /**
     * Picks one type that is still waiting in types_queue, loads its class and
     * marks the row as running.
     *
     * <p>NOTE(review): when loading or updating fails, the row is left in the
     * enqueued state and the exception propagates; callers that wait for the
     * type queue to drain need to account for that.
     *
     * @return the class for the next waiting type, or null when no type is waiting
     * @throws RuntimeException if the class cannot be loaded or the row cannot be updated
     */
    private Class<? extends PassEntity> nextType() {
        final AtomicReference<String> waiting = new AtomicReference<>();
        synchronized (this.conn) {
            autocommit();
            execQuery("SELECT type, id FROM types_queue WHERE progress = 0 LIMIT 1", null, rows -> {
                if (rows.next()) {
                    waiting.set(rows.getString(1));
                }
            });
            final String typeName = waiting.get();
            if (typeName == null) {
                return null;
            }
            try {
                begin();
                // The queue stores names of PassEntity subclasses, hence the unchecked narrowing.
                @SuppressWarnings("unchecked")
                Class<? extends PassEntity> cls = (Class<? extends PassEntity>) Class.forName(typeName);
                // Bind values instead of formatting them into the SQL text.
                execUpdate("UPDATE types_queue SET progress = ? WHERE type = ?", stmt -> {
                    stmt.setInt(1, PROGRESS_RUNNING);
                    stmt.setString(2, typeName);
                });
                commit();
                return cls;
            } catch (Exception e) {
                rollback();
                throw new RuntimeException("Could not update type queue", e);
            }
        }
    }

    /**
     * Inserts one item_queue row per URI produced for the given type and then
     * marks the type as done, all inside a single transaction held under the
     * connection lock.
     *
     * <p>On any failure the transaction is rolled back, the type is flagged as
     * failed in auto-commit mode, and the error is logged rather than rethrown.
     *
     * @param cls      entity class being enqueued; items are stored under its simple name
     * @param function supplies the URIs for {@code cls}; the returned stream is closed after use
     */
    private void enqueue(Class<? extends PassEntity> cls, Function<Class<? extends PassEntity>, Stream<URI>> function) {
        final String updateTypeProgress = "UPDATE types_queue SET progress = ? WHERE type = ?";
        synchronized (this.conn) {
            try {
                begin();
                // Close the stream in case it is backed by an open resource.
                try (Stream<URI> uris = function.apply(cls)) {
                    uris.forEach(uri -> execUpdate("INSERT INTO item_queue (type, url) VALUES (?, ?)", stmt -> {
                        stmt.setString(1, cls.getSimpleName());
                        stmt.setString(2, uri.toString());
                    }));
                }
                execUpdate(updateTypeProgress, stmt -> {
                    stmt.setInt(1, PROGRESS_DONE);
                    stmt.setString(2, cls.getName());
                });
                commit();
            } catch (Exception e) {
                rollback();
                autocommit();
                execUpdate(updateTypeProgress, stmt -> {
                    stmt.setInt(1, PROGRESS_FAILED);
                    stmt.setString(2, cls.getName());
                });
                LOG.warn("error loading item queue", (Throwable) e);
            }
        }
    }

    /**
     * Opens a JDBC connection to the SQLite database at the given path.
     *
     * @param str path of the database file, appended to the SQLite JDBC URL prefix
     * @return the open connection
     * @throws RuntimeException if the driver cannot open the database
     */
    private Connection open(String str) {
        final String jdbcUrl = JDBC.PREFIX + str;
        try {
            return DriverManager.getConnection(jdbcUrl);
        } catch (SQLException cause) {
            throw new RuntimeException("Failed opening db at " + str, cause);
        }
    }

    /**
     * Builds a timestamped database file name of the form
     * {@code year_month_day-hourMinuteSecond.millis.db} from the current time.
     *
     * @return the generated file name (relative path)
     */
    private String newFilePath() {
        // In SimpleDateFormat "MM" is month and "mm" is minute; the two were swapped before.
        return new SimpleDateFormat("yyyy_MM_dd-HHmmss.SSS").format(new Date()) + ".db";
    }

    /**
     * Creates the three tables (type queue, item queue, results) in one transaction.
     *
     * @param connection not used; statements run on this runner's own connection
     */
    private void init(Connection connection) {
        final String[] schema = {
            STMNT_CREATE_TABLE_TYPE_QUEUE,
            STMNT_CREATE_TABLE_ITEM_QUEUE,
            STMNT_CREATE_TABLE_RESULTS
        };
        begin();
        for (String ddl : schema) {
            execUpdate(ddl, null);
        }
        commit();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Renders an exception's stack trace as text.
     *
     * @param exc the exception to render
     * @return exactly what {@code exc.printStackTrace} writes
     */
    public static String getStackTrace(Exception exc) {
        final StringWriter buffer = new StringWriter();
        final PrintWriter sink = new PrintWriter(buffer);
        exc.printStackTrace(sink);
        sink.flush();
        return buffer.toString();
    }

    /**
     * Closes the underlying database connection.
     *
     * @throws RuntimeException if the connection reports an error while closing
     */
    @Override
    public void close() {
        quietly(conn::close);
    }
}
