package com.github.ddth.queue.impl;

import com.github.ddth.commons.utils.IdGenerator;
import com.github.ddth.queue.IQueue;
import com.github.ddth.queue.IQueueMessage;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.SkipListMemTableConfig;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ddth/queue/impl/RocksDbQueue.class */
public abstract class RocksDbQueue implements IQueue, Closeable, AutoCloseable {
    private RocksDB rocksDb;
    private RocksIterator rocksDbIt;
    private Options rocksOptions;
    private WriteOptions writeOptions;
    private ReadOptions readOptions;
    private Logger LOGGER = LoggerFactory.getLogger(RocksDbQueue.class);
    private String storageDir = "/tmp/ddth-rocksdb-queue";
    private IdGenerator idGen = IdGenerator.getInstance(IdGenerator.getMacAddr());
    private byte[] lastFetchedId = null;
    private Lock lockPut = new ReentrantLock();
    private Lock lockTake = new ReentrantLock();

    public String getStorageDir() {
        return this.storageDir;
    }

    public RocksDbQueue setStorageDir(String str) {
        this.storageDir = str;
        return this;
    }

    public RocksDbQueue init() {
        File file = new File(this.storageDir);
        this.LOGGER.info("Storage Directory: " + file.getAbsolutePath());
        try {
            FileUtils.forceMkdir(file);
            RocksDB.loadLibrary();
            this.rocksOptions = new Options();
            this.rocksOptions.setCreateIfMissing(true).getEnv().setBackgroundThreads(2, 0).setBackgroundThreads(4, 1);
            this.rocksOptions.setMaxBackgroundFlushes(2).setMaxBackgroundCompactions(4);
            this.rocksOptions.setWriteBufferSize(8388608L).setMinWriteBufferNumberToMerge(2).setLevelZeroFileNumCompactionTrigger(512).setTargetFileSizeBase(16777216L);
            this.rocksOptions.setMemTableConfig(new SkipListMemTableConfig());
            this.writeOptions = new WriteOptions().setSync(false).setDisableWAL(false);
            this.readOptions = new ReadOptions().setTailing(true);
            try {
                this.rocksDb = RocksDB.open(this.rocksOptions, file.getAbsolutePath());
                this.rocksDbIt = this.rocksDb.newIterator(this.readOptions);
                return this;
            } catch (RocksDBException e) {
                destroy();
                throw new RuntimeException((Throwable) e);
            }
        } catch (IOException e2) {
            throw new RuntimeException(e2);
        }
    }

    public void destroy() {
        if (this.rocksDbIt != null) {
            try {
                this.rocksDbIt.dispose();
            } catch (Exception e) {
                this.LOGGER.warn(e.getMessage(), e);
            } finally {
                this.rocksDbIt = null;
            }
        }
        try {
        } catch (Exception e2) {
            this.LOGGER.warn(e2.getMessage(), e2);
        } finally {
            this.rocksDb = null;
        }
        if (this.rocksDb != null) {
            this.rocksDb.close();
        }
        if (this.rocksOptions != null) {
            try {
                try {
                    this.rocksOptions.dispose();
                    this.rocksOptions = null;
                } catch (Exception e3) {
                    this.LOGGER.warn(e3.getMessage(), e3);
                    this.rocksOptions = null;
                }
            } catch (Throwable th) {
                this.rocksOptions = null;
                throw th;
            }
        }
        if (this.readOptions != null) {
            try {
                try {
                    this.readOptions.dispose();
                    this.readOptions = null;
                } catch (Exception e4) {
                    this.LOGGER.warn(e4.getMessage(), e4);
                    this.readOptions = null;
                }
            } catch (Throwable th2) {
                this.readOptions = null;
                throw th2;
            }
        }
        if (this.writeOptions != null) {
            try {
                try {
                    this.writeOptions.dispose();
                    this.writeOptions = null;
                } catch (Exception e5) {
                    this.LOGGER.warn(e5.getMessage(), e5);
                    this.writeOptions = null;
                }
            } catch (Throwable th3) {
                this.writeOptions = null;
                throw th3;
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        destroy();
    }

    protected abstract byte[] serialize(IQueueMessage iQueueMessage);

    protected abstract IQueueMessage deserialize(byte[] bArr);

    protected boolean putToQueue(IQueueMessage iQueueMessage) {
        byte[] serialize = serialize(iQueueMessage);
        this.lockPut.lock();
        try {
            try {
                this.rocksDb.put(this.writeOptions, this.idGen.generateId128Hex().toLowerCase().getBytes(), serialize);
                this.lockPut.unlock();
                return true;
            } catch (RocksDBException e) {
                throw new RuntimeException((Throwable) e);
            }
        } catch (Throwable th) {
            this.lockPut.unlock();
            throw th;
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean queue(IQueueMessage iQueueMessage) {
        IQueueMessage mo2clone = iQueueMessage.mo2clone();
        Date date = new Date();
        mo2clone.qNumRequeues(0).qOriginalTimestamp(date).qTimestamp(date);
        return putToQueue(mo2clone);
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeue(IQueueMessage iQueueMessage) {
        IQueueMessage mo2clone = iQueueMessage.mo2clone();
        mo2clone.qIncNumRequeues().qTimestamp(new Date());
        return putToQueue(mo2clone);
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeueSilent(IQueueMessage iQueueMessage) {
        return putToQueue(iQueueMessage);
    }

    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage iQueueMessage) {
    }

    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage take() {
        this.lockTake.lock();
        try {
            if (this.lastFetchedId == null) {
                this.rocksDbIt.seekToFirst();
            } else {
                this.rocksDbIt.seek(this.lastFetchedId);
            }
            if (!this.rocksDbIt.isValid()) {
                return null;
            }
            this.lastFetchedId = this.rocksDbIt.key();
            byte[] value = this.rocksDbIt.value();
            try {
                this.rocksDb.remove(this.lastFetchedId);
            } catch (RocksDBException e) {
            }
            this.rocksDbIt.next();
            return deserialize(value);
        } finally {
            this.lockTake.unlock();
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public Collection<IQueueMessage> getOrphanMessages(long j) {
        return null;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage iQueueMessage) {
        throw new UnsupportedOperationException("Method [moveFromEphemeralToQueueStorage] is not supported!");
    }

    @Override // com.github.ddth.queue.IQueue
    public int queueSize() {
        return -1;
    }

    @Override // com.github.ddth.queue.IQueue
    public int ephemeralSize() {
        return -1;
    }
}
