package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.impl.rocksdb.RocksDbUtils;
import com.github.ddth.queue.impl.rocksdb.RocksDbWrapper;
import com.github.ddth.queue.utils.QueueException;
import com.github.ddth.queue.utils.QueueUtils;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ddth/queue/impl/RocksDbQueue.class */
/**
 * Queue implementation that keeps its data in a RocksDB database accessed through
 * {@link RocksDbWrapper}.
 *
 * <p>Three column families are used (names are configurable before {@link #init()}):
 * <ul>
 * <li><b>queue</b> (default {@code "queue"}): queued messages, keyed by an id generated via
 * {@code QueueUtils.IDGEN.generateId128Hex()} and valued with {@link #serialize(IQueueMessage)}
 * output;</li>
 * <li><b>metadata</b> (default {@code "metadata"}): stores the key of the last fetched entry
 * under {@code "last-fetched-id"};</li>
 * <li><b>ephemeral</b> (default {@code "ephemeral"}): messages that have been taken but not yet
 * finished, keyed by the message's {@code qId()}.</li>
 * </ul>
 *
 * <p>Writes to the queue are serialized by one lock and takes by another, so a put and a take
 * may run concurrently with each other.
 *
 * <p>NOTE(review): {@code mo5clone()} looks like a decompiler-generated alias of
 * {@code IQueueMessage.clone()} - confirm against the {@code IQueueMessage} declaration.
 */
public abstract class RocksDbQueue extends AbstractEphemeralSupportQueue {
    static {
        // The native library must be loaded before any RocksDB object is created.
        RocksDB.loadLibrary();
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(RocksDbQueue.class);

    /** Key (in the metadata column family) under which the last fetched queue key is stored. */
    private static final byte[] KEY_LAST_FETCHED_ID = "last-fetched-id".getBytes(QueueUtils.UTF8);

    /** Guards {@link #batchPutToQueue}. */
    private final Lock lockPut = new ReentrantLock();
    /** Guards {@link #batchTake}, {@link #itQueue} and {@link #lastFetchedId} during a take. */
    private final Lock lockTake = new ReentrantLock();

    private String storageDir = "/tmp/ddth-rocksdb-queue";
    private String cfNameQueue = "queue";
    private String cfNameMetadata = "metadata";
    private String cfNameEphemeral = "ephemeral";

    private byte[] lastFetchedId = null;
    private DBOptions dbOptions;
    private ReadOptions readOptions;
    private WriteOptions writeOptions;
    private RocksDbWrapper rocksDbWrapper;
    private WriteBatch batchPutToQueue;
    private WriteBatch batchTake;
    private ColumnFamilyHandle cfQueue;
    private ColumnFamilyHandle cfMetadata;
    private ColumnFamilyHandle cfEphemeral;
    private RocksIterator itQueue;

    /**
     * @return the directory in which the RocksDB data files are stored
     */
    public String getStorageDir() {
        return this.storageDir;
    }

    /**
     * Sets the directory in which the RocksDB data files are stored; read by {@link #init()}.
     *
     * @param str the storage directory path
     * @return this instance, for chaining
     */
    public RocksDbQueue setStorageDir(String str) {
        this.storageDir = str;
        return this;
    }

    /**
     * @return the name of the column family holding queued messages
     */
    public String getCfNameQueue() {
        return this.cfNameQueue;
    }

    /**
     * Sets the name of the column family holding queued messages; read by {@link #init()}.
     *
     * @param str the column family name
     * @return this instance, for chaining
     */
    public RocksDbQueue setCfNameQueue(String str) {
        this.cfNameQueue = str;
        return this;
    }

    /**
     * @return the name of the column family holding queue metadata
     */
    public String getCfNameMetadata() {
        return this.cfNameMetadata;
    }

    /**
     * Sets the name of the column family holding queue metadata; read by {@link #init()}.
     *
     * @param str the column family name
     * @return this instance, for chaining
     */
    public RocksDbQueue setCfNameMetadata(String str) {
        this.cfNameMetadata = str;
        return this;
    }

    /**
     * @return the name of the column family holding ephemeral (taken, unfinished) messages
     */
    public String getCfNameEphemeral() {
        return this.cfNameEphemeral;
    }

    /**
     * Sets the name of the column family holding ephemeral messages; read by {@link #init()}.
     *
     * @param str the column family name
     * @return this instance, for chaining
     */
    public RocksDbQueue setCfNameEphemeral(String str) {
        this.cfNameEphemeral = str;
        return this;
    }

    /**
     * Creates the storage directory if needed, opens the database with the three configured
     * column families and restores the last fetched id from the metadata column family.
     *
     * <p>If anything fails after the directory has been created, {@link #destroy()} is called
     * before the exception is propagated.
     *
     * @return this instance
     * @throws RuntimeException if the directory cannot be created or the database cannot be
     *         opened; checked exceptions are wrapped, runtime exceptions are rethrown as-is
     */
    @Override
    public RocksDbQueue init() {
        File dir = new File(this.storageDir);
        LOGGER.info("Storage Directory: {}", dir.getAbsolutePath());
        try {
            FileUtils.forceMkdir(dir);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        try {
            this.batchPutToQueue = new WriteBatch();
            this.batchTake = new WriteBatch();
            this.dbOptions = RocksDbUtils.buildDbOptions();
            this.rocksDbWrapper = RocksDbWrapper.openReadWrite(dir, this.dbOptions, (ReadOptions) null, (WriteOptions) null, new String[]{this.cfNameEphemeral, this.cfNameMetadata, this.cfNameQueue});
            // Null options were passed above, so use whatever options the wrapper ended up with.
            this.readOptions = this.rocksDbWrapper.getReadOptions();
            this.writeOptions = this.rocksDbWrapper.getWriteOptions();
            this.cfEphemeral = this.rocksDbWrapper.getColumnFamilyHandle(this.cfNameEphemeral);
            this.cfMetadata = this.rocksDbWrapper.getColumnFamilyHandle(this.cfNameMetadata);
            this.cfQueue = this.rocksDbWrapper.getColumnFamilyHandle(this.cfNameQueue);
            this.itQueue = this.rocksDbWrapper.getIterator(this.cfNameQueue);
            this.lastFetchedId = loadLastFetchedId();
            return this;
        } catch (RuntimeException e) {
            destroy();
            throw e;
        } catch (Exception e) {
            destroy();
            throw new RuntimeException(e);
        }
    }

    /**
     * Persists the last fetched id, closes the database wrapper and releases the write batches
     * and DB options. Failures while saving or closing are logged, not thrown.
     *
     * <p>Safe to call when {@link #init()} failed before the database was opened: the save and
     * close steps are skipped if there is no wrapper.
     */
    @Override
    public void destroy() {
        if (this.rocksDbWrapper != null) {
            try {
                saveLastFetchedId(this.lastFetchedId);
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
            try {
                this.rocksDbWrapper.close();
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        RocksDbUtils.closeRocksObjects(this.batchPutToQueue, this.batchTake, this.dbOptions);
    }

    /**
     * Equivalent to {@link #destroy()}.
     */
    @Override
    public void close() {
        destroy();
    }

    /**
     * Reads the last fetched queue key from the metadata column family.
     *
     * @return whatever the wrapper returns for the {@code "last-fetched-id"} key
     */
    private byte[] loadLastFetchedId() {
        return this.rocksDbWrapper.get(this.cfMetadata, this.readOptions, KEY_LAST_FETCHED_ID);
    }

    /**
     * Writes the last fetched queue key to the metadata column family; does nothing for
     * {@code null}.
     *
     * @param bArr the key to store, may be {@code null}
     */
    private void saveLastFetchedId(byte[] bArr) {
        if (bArr != null) {
            this.rocksDbWrapper.put(this.cfMetadata, this.writeOptions, KEY_LAST_FETCHED_ID, bArr);
        }
    }

    /**
     * Converts a queue message to the bytes stored in the database.
     *
     * @param iQueueMessage the message to serialize
     * @return the serialized form
     */
    protected abstract byte[] serialize(IQueueMessage iQueueMessage);

    /**
     * Converts stored bytes back to a queue message.
     *
     * @param bArr the serialized form
     * @return the message; {@link #take()} tolerates a {@code null} result
     */
    protected abstract IQueueMessage deserialize(byte[] bArr);

    /**
     * Appends a message to the queue column family under a freshly generated id and, when
     * requested and ephemeral storage is enabled, removes the message's entry from the
     * ephemeral column family in the same write batch.
     *
     * @param iQueueMessage the message to store
     * @param z {@code true} to also remove the message from ephemeral storage
     * @return always {@code true}; failures surface as exceptions
     */
    protected boolean putToQueue(IQueueMessage iQueueMessage, boolean z) {
        // Serialize outside the lock to keep the critical section short.
        byte[] payload = serialize(iQueueMessage);
        this.lockPut.lock();
        try {
            byte[] key = QueueUtils.IDGEN.generateId128Hex().toLowerCase().getBytes(QueueUtils.UTF8);
            this.batchPutToQueue.put(this.cfQueue, key, payload);
            if (z && !isEphemeralDisabled()) {
                this.batchPutToQueue.remove(this.cfEphemeral, iQueueMessage.qId().toString().getBytes(QueueUtils.UTF8));
            }
            this.rocksDbWrapper.write(this.writeOptions, this.batchPutToQueue);
            return true;
        } finally {
            // The batch is shared across calls: always reset it, whether the write succeeded or not.
            try {
                this.batchPutToQueue.clear();
            } finally {
                this.lockPut.unlock();
            }
        }
    }

    /**
     * Queues a copy of the message with its requeue count reset to 0 and both timestamps set
     * to the current time.
     *
     * @param iQueueMessage the message to queue; it is cloned, not modified
     * @return the result of {@link #putToQueue(IQueueMessage, boolean)}
     */
    @Override
    public boolean queue(IQueueMessage iQueueMessage) {
        IQueueMessage copy = iQueueMessage.mo5clone();
        Date now = new Date();
        copy.qNumRequeues(0).qOriginalTimestamp(now).qTimestamp(now);
        return putToQueue(copy, false);
    }

    /**
     * Re-queues a copy of the message with its requeue count incremented and its timestamp set
     * to the current time, removing it from ephemeral storage.
     *
     * @param iQueueMessage the message to re-queue; it is cloned, not modified
     * @return the result of {@link #putToQueue(IQueueMessage, boolean)}
     */
    @Override
    public boolean requeue(IQueueMessage iQueueMessage) {
        IQueueMessage copy = iQueueMessage.mo5clone();
        copy.qIncNumRequeues().qTimestamp(new Date());
        return putToQueue(copy, true);
    }

    /**
     * Re-queues a copy of the message without touching its requeue count or timestamps,
     * removing it from ephemeral storage.
     *
     * @param iQueueMessage the message to re-queue; it is cloned, not modified
     * @return the result of {@link #putToQueue(IQueueMessage, boolean)}
     */
    @Override
    public boolean requeueSilent(IQueueMessage iQueueMessage) {
        return putToQueue(iQueueMessage.mo5clone(), true);
    }

    /**
     * Removes the message from ephemeral storage; a no-op when ephemeral storage is disabled.
     *
     * @param iQueueMessage the message that has been processed
     */
    @Override
    public void finish(IQueueMessage iQueueMessage) {
        if (isEphemeralDisabled()) {
            return;
        }
        this.rocksDbWrapper.delete(this.cfEphemeral, this.writeOptions, iQueueMessage.qId().toString().getBytes(QueueUtils.UTF8));
    }

    /**
     * Takes the next message from the queue.
     *
     * <p>The iterator is positioned at the last fetched key (or at the first entry if there is
     * none). In a single write batch the entry is removed from the queue column family, its key
     * is recorded as the last fetched id, and - if ephemeral storage is enabled and the entry
     * deserialized to a non-null message - the raw value is stored in the ephemeral column
     * family under the message's {@code qId()}.
     *
     * @return the message, or {@code null} if the iterator has no valid entry or the entry
     *         deserialized to {@code null} (the entry is still removed in the latter case)
     * @throws QueueException.EphemeralIsFull if ephemeral storage is enabled, has a positive
     *         maximum size and {@link #ephemeralSize()} has reached it
     */
    @Override
    public IQueueMessage take() throws QueueException.EphemeralIsFull {
        if (!isEphemeralDisabled()) {
            int ephemeralMaxSize = getEphemeralMaxSize();
            if (ephemeralMaxSize > 0 && ephemeralSize() >= ephemeralMaxSize) {
                throw new QueueException.EphemeralIsFull(ephemeralMaxSize);
            }
        }

        this.lockTake.lock();
        try {
            if (this.lastFetchedId == null) {
                this.itQueue.seekToFirst();
            } else {
                this.itQueue.seek(this.lastFetchedId);
            }
            if (!this.itQueue.isValid()) {
                return null;
            }

            this.lastFetchedId = this.itQueue.key();
            byte[] value = this.itQueue.value();
            IQueueMessage message = deserialize(value);
            try {
                this.batchTake.remove(this.cfQueue, this.lastFetchedId);
                this.batchTake.put(this.cfMetadata, KEY_LAST_FETCHED_ID, this.lastFetchedId);
                if (!isEphemeralDisabled() && message != null) {
                    this.batchTake.put(this.cfEphemeral, message.qId().toString().getBytes(QueueUtils.UTF8), value);
                }
                this.rocksDbWrapper.write(this.writeOptions, this.batchTake);
            } finally {
                // The batch is shared across calls: always reset it.
                this.batchTake.clear();
            }
            this.itQueue.next();
            return message;
        } finally {
            // Single release point: unlocking a ReentrantLock twice throws IllegalMonitorStateException.
            this.lockTake.unlock();
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @param j ignored
     * @return never returns normally
     * @throws QueueException.OperationNotSupported always
     */
    @Override
    public Collection<IQueueMessage> getOrphanMessages(long j) throws QueueException.OperationNotSupported {
        throw new QueueException.OperationNotSupported();
    }

    /**
     * Puts the message (as given, without cloning) back into the queue and removes it from
     * ephemeral storage.
     *
     * @param iQueueMessage the message to move
     * @return the result of {@link #putToQueue(IQueueMessage, boolean)}
     */
    @Override
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage iQueueMessage) {
        return putToQueue(iQueueMessage, true);
    }

    /**
     * @return the wrapper's estimated number of keys in the queue column family, capped at
     *         {@link Integer#MAX_VALUE}
     */
    @Override
    public int queueSize() {
        return clampToInt(this.rocksDbWrapper.getEstimateNumKeys(this.cfNameQueue));
    }

    /**
     * @return the wrapper's estimated number of keys in the ephemeral column family, capped at
     *         {@link Integer#MAX_VALUE}
     */
    @Override
    public int ephemeralSize() {
        return clampToInt(this.rocksDbWrapper.getEstimateNumKeys(this.cfNameEphemeral));
    }

    /**
     * Narrows a count to {@code int} without wrapping around on overflow.
     */
    private static int clampToInt(long value) {
        return (int) Math.min(value, (long) Integer.MAX_VALUE);
    }
}
