package com.github.ddth.queue.impl;

import com.github.ddth.commons.rocksdb.RocksDbUtils;
import com.github.ddth.commons.rocksdb.RocksDbWrapper;
import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.impl.AbstractQueue;
import com.github.ddth.queue.internal.utils.QueueUtils;
import com.github.ddth.queue.utils.QueueException;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksObject;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ddth/queue/impl/RocksDbQueue.class */
/**
 * Queue implementation that stores its messages in a RocksDB database.
 *
 * <p>
 * Three column families are used (names are configurable, see the
 * {@code DEFAULT_CFNAME_*} constants for the defaults):
 * </p>
 * <ul>
 * <li><b>queue</b>: messages waiting to be taken, keyed by a generated id.</li>
 * <li><b>metadata</b>: bookkeeping data; currently only the key of the last
 * fetched queue entry.</li>
 * <li><b>ephemeral</b>: messages that have been taken but not yet finished,
 * keyed by message id.</li>
 * </ul>
 *
 * <p>
 * Writes to the queue are serialized by one lock, takes by another, and scans of
 * the ephemeral storage synchronize on the ephemeral iterator.
 * </p>
 *
 * @param <ID>
 *            type of the message id
 * @param <DATA>
 *            type of the message payload
 */
public abstract class RocksDbQueue<ID, DATA> extends AbstractEphemeralSupportQueue<ID, DATA> {
    public static final String DEFAULT_STORAGE_DIR = "/tmp/ddth-rocksdb-queue";
    public static final String DEFAULT_CFNAME_QUEUE = "queue";
    public static final String DEFAULT_CFNAME_METADATA = "metadata";
    public static final String DEFAULT_CFNAME_EPHEMERAL = "ephemeral";

    private static final Logger LOGGER = LoggerFactory.getLogger(RocksDbQueue.class);

    /** Metadata key under which the key of the last fetched queue entry is stored. */
    private static final String KEY_LAST_FETCHED_ID = "last-fetched-id";
    private static final byte[] KEY_LAST_FETCHED_ID_BYTES = KEY_LAST_FETCHED_ID
            .getBytes(StandardCharsets.UTF_8);

    static {
        // The native library must be loaded before any RocksDB object is created.
        RocksDB.loadLibrary();
    }

    private String storageDir;
    private String cfNameQueue = DEFAULT_CFNAME_QUEUE;
    private String cfNameMetadata = DEFAULT_CFNAME_METADATA;
    private String cfNameEphemeral = DEFAULT_CFNAME_EPHEMERAL;

    private DBOptions dbOptions;
    private ReadOptions readOptions;
    private WriteOptions writeOptions;
    private RocksDbWrapper rocksDbWrapper;

    /** Reusable batch for {@link #doPutToQueue}; guarded by {@link #lockPut}. */
    private WriteBatch batchPutToQueue;
    /** Reusable batch for {@link #take()}; guarded by {@link #lockTake}. */
    private WriteBatch batchTake;

    private ColumnFamilyHandle cfQueue;
    private ColumnFamilyHandle cfMetadata;
    private ColumnFamilyHandle cfEphemeral;
    private RocksIterator itQueue;
    private RocksIterator itEphemeral;

    /** Key of the queue entry most recently returned by {@link #take()}, or {@code null}. */
    private byte[] lastFetchedId = null;

    private final Lock lockPut = new ReentrantLock();
    private final Lock lockTake = new ReentrantLock();

    /**
     * @return directory where the RocksDB data files are stored
     */
    public String getStorageDir() {
        return storageDir;
    }

    /**
     * Sets the directory where the RocksDB data files are stored. When left blank,
     * {@link #init()} falls back to a sub-directory of {@link #DEFAULT_STORAGE_DIR}.
     *
     * @param str
     *            storage directory path
     * @return this queue, for call chaining
     */
    public RocksDbQueue<ID, DATA> setStorageDir(String str) {
        this.storageDir = str;
        return this;
    }

    /**
     * @return name of the column family holding queued messages
     */
    public String getCfNameQueue() {
        return cfNameQueue;
    }

    /**
     * @param str
     *            name of the column family holding queued messages
     * @return this queue, for call chaining
     */
    public RocksDbQueue<ID, DATA> setCfNameQueue(String str) {
        this.cfNameQueue = str;
        return this;
    }

    /**
     * @return name of the column family holding queue metadata
     */
    public String getCfNameMetadata() {
        return cfNameMetadata;
    }

    /**
     * @param str
     *            name of the column family holding queue metadata
     * @return this queue, for call chaining
     */
    public RocksDbQueue<ID, DATA> setCfNameMetadata(String str) {
        this.cfNameMetadata = str;
        return this;
    }

    /**
     * @return name of the column family holding taken-but-unfinished messages
     */
    public String getCfNameEphemeral() {
        return cfNameEphemeral;
    }

    /**
     * @param str
     *            name of the column family holding taken-but-unfinished messages
     * @return this queue, for call chaining
     */
    public RocksDbQueue<ID, DATA> setCfNameEphemeral(String str) {
        this.cfNameEphemeral = str;
        return this;
    }

    /**
     * Creates the storage directory if needed, opens the RocksDB database with the
     * three column families and restores the last-fetched position.
     *
     * <p>
     * If anything fails after the storage directory has been created,
     * {@link #destroy()} is called before the failure is propagated.
     * </p>
     *
     * @return this queue, for call chaining
     * @throws RuntimeException
     *             if the storage directory cannot be created or the database cannot
     *             be opened; checked causes are wrapped
     */
    @Override
    public RocksDbQueue<ID, DATA> init() throws Exception {
        if (StringUtils.isBlank(storageDir)) {
            // File.separator (the name separator), not File.pathSeparator (the
            // PATH-list separator): the default is a sub-directory per queue.
            storageDir = DEFAULT_STORAGE_DIR + File.separator + getQueueName();
        }
        File storageDirFile = new File(storageDir);
        LOGGER.info("Storage Directory: {}", storageDirFile.getAbsolutePath());

        try {
            FileUtils.forceMkdir(storageDirFile);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        try {
            batchPutToQueue = new WriteBatch();
            batchTake = new WriteBatch();
            dbOptions = RocksDbUtils.defaultDbOptions();
            rocksDbWrapper = RocksDbWrapper.openReadWrite(storageDirFile, dbOptions,
                    (ReadOptions) null, (WriteOptions) null,
                    new String[] { cfNameEphemeral, cfNameMetadata, cfNameQueue });
            readOptions = rocksDbWrapper.getReadOptions();
            writeOptions = rocksDbWrapper.getWriteOptions();

            cfEphemeral = rocksDbWrapper.getColumnFamilyHandle(cfNameEphemeral);
            cfMetadata = rocksDbWrapper.getColumnFamilyHandle(cfNameMetadata);
            cfQueue = rocksDbWrapper.getColumnFamilyHandle(cfNameQueue);

            itQueue = rocksDbWrapper.getIterator(cfNameQueue);
            itEphemeral = rocksDbWrapper.getIterator(cfNameEphemeral);

            lastFetchedId = loadLastFetchedId();

            super.init();
            return this;
        } catch (Exception e) {
            // Release whatever was allocated so far before reporting the failure.
            destroy();
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Persists the last-fetched position and releases all RocksDB resources.
     *
     * <p>
     * Cleanup runs even if {@code super.destroy()} throws. Failures while saving
     * the position or closing the database are logged, not propagated. Safe to
     * call when {@link #init()} failed before the database was opened.
     * </p>
     */
    @Override
    public void destroy() {
        try {
            super.destroy();
        } finally {
            if (rocksDbWrapper != null) {
                try {
                    saveLastFetchedId(lastFetchedId);
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }

            // NOTE(review): itQueue/itEphemeral are not closed here; confirm whether
            // RocksDbWrapper owns iterators obtained through getIterator(...).
            RocksDbUtils.closeRocksObjects(
                    new RocksObject[] { batchPutToQueue, batchTake, dbOptions });

            if (rocksDbWrapper != null) {
                try {
                    rocksDbWrapper.close();
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Reads the persisted last-fetched key from the metadata column family.
     *
     * @return the value stored under the last-fetched key, as returned by the
     *         wrapper
     */
    private byte[] loadLastFetchedId() {
        return rocksDbWrapper.get(cfNameMetadata, readOptions, KEY_LAST_FETCHED_ID);
    }

    /**
     * Writes the last-fetched key to the metadata column family.
     *
     * @param lastId
     *            key to persist; nothing is written when {@code null}
     */
    private void saveLastFetchedId(byte[] lastId) {
        if (lastId != null) {
            rocksDbWrapper.put(cfNameMetadata, writeOptions, KEY_LAST_FETCHED_ID, lastId);
        }
    }

    /**
     * Stores a message in the queue column family under a freshly generated key.
     *
     * <p>
     * When the put is not for a new message (i.e. {@code putToQueueCase} is neither
     * {@code null} nor {@code NEW}) and ephemeral storage is enabled, the message's
     * ephemeral entry is removed in the same write batch.
     * </p>
     *
     * @param iQueueMessage
     *            message to store
     * @param putToQueueCase
     *            why the message is being put; may be {@code null}
     * @return always {@code true}
     * @throws QueueException
     *             if RocksDB reports an error
     */
    @Override
    protected boolean doPutToQueue(IQueueMessage<ID, DATA> iQueueMessage,
            AbstractQueue.PutToQueueCase putToQueueCase) {
        byte[] value = serialize(iQueueMessage);
        lockPut.lock();
        try {
            byte[] key = QueueUtils.IDGEN.generateId128Hex().toLowerCase()
                    .getBytes(StandardCharsets.UTF_8);
            batchPutToQueue.put(cfQueue, key, value);

            boolean isRequeue = putToQueueCase != null
                    && putToQueueCase != AbstractQueue.PutToQueueCase.NEW;
            if (isRequeue && !isEphemeralDisabled()) {
                byte[] ephemeralKey = iQueueMessage.getId().toString()
                        .getBytes(StandardCharsets.UTF_8);
                batchPutToQueue.delete(cfEphemeral, ephemeralKey);
            }

            rocksDbWrapper.write(writeOptions, batchPutToQueue);
            return true;
        } catch (RocksDBException e) {
            throw new QueueException((Throwable) e);
        } finally {
            // The batch is reused across calls, so it is always reset, and the lock
            // is released even if resetting fails.
            try {
                batchPutToQueue.clear();
            } finally {
                lockPut.unlock();
            }
        }
    }

    /**
     * Marks a message as done by removing it from the ephemeral storage. Does
     * nothing when ephemeral storage is disabled.
     *
     * @param iQueueMessage
     *            message that has been processed
     */
    @Override
    public void finish(IQueueMessage<ID, DATA> iQueueMessage) {
        if (isEphemeralDisabled()) {
            return;
        }
        rocksDbWrapper.delete(cfNameEphemeral, writeOptions, iQueueMessage.getId().toString());
    }

    /**
     * Removes and returns the next message from the queue.
     *
     * <p>
     * In one write batch the entry is deleted from the queue column family, the
     * last-fetched key is recorded in the metadata column family and, when
     * ephemeral storage is enabled and the message could be deserialized, the
     * message is copied to the ephemeral column family.
     * </p>
     *
     * @return the next message, or {@code null} if the queue iterator has no valid
     *         entry
     * @throws QueueException.EphemeralIsFull
     *             if ephemeral storage is enabled, has a positive maximum size and
     *             that size has been reached
     * @throws QueueException
     *             if RocksDB reports an error
     */
    @Override
    public IQueueMessage<ID, DATA> take() throws QueueException.EphemeralIsFull {
        if (!isEphemeralDisabled()) {
            int ephemeralMaxSize = getEphemeralMaxSize();
            if (ephemeralMaxSize > 0 && ephemeralSize() >= ephemeralMaxSize) {
                throw new QueueException.EphemeralIsFull(ephemeralMaxSize);
            }
        }

        lockTake.lock();
        try {
            if (lastFetchedId == null) {
                itQueue.seekToFirst();
            } else {
                itQueue.seek(lastFetchedId);
            }
            if (!itQueue.isValid()) {
                return null;
            }

            lastFetchedId = itQueue.key();
            byte[] value = itQueue.value();
            IQueueMessage<ID, DATA> message = deserialize(value);

            try {
                batchTake.delete(cfQueue, lastFetchedId);
                batchTake.put(cfMetadata, KEY_LAST_FETCHED_ID_BYTES, lastFetchedId);
                if (!isEphemeralDisabled() && message != null) {
                    byte[] ephemeralKey = message.getId().toString()
                            .getBytes(StandardCharsets.UTF_8);
                    batchTake.put(cfEphemeral, ephemeralKey, value);
                }
                rocksDbWrapper.write(writeOptions, batchTake);
            } catch (RocksDBException e) {
                throw new QueueException((Throwable) e);
            } finally {
                // The batch is reused across calls, so it is always reset.
                batchTake.clear();
            }

            itQueue.next();
            return message;
        } finally {
            // Single release point: unlocking a ReentrantLock that is not held
            // throws IllegalMonitorStateException.
            lockTake.unlock();
        }
    }

    /**
     * Scans the ephemeral storage for messages that were taken too long ago.
     *
     * <p>
     * A message is reported when its queue timestamp plus {@code j} is earlier than
     * the current time. Entries that cannot be deserialized are skipped.
     * </p>
     *
     * @param j
     *            age threshold, in milliseconds
     * @return the orphan messages; empty when there are none or when ephemeral
     *         storage is disabled
     */
    @Override
    public Collection<IQueueMessage<ID, DATA>> getOrphanMessages(long j) {
        Collection<IQueueMessage<ID, DATA>> orphans = new HashSet<>();
        if (isEphemeralDisabled()) {
            return orphans;
        }
        // The iterator is shared state, so only one scan may run at a time.
        synchronized (itEphemeral) {
            long now = System.currentTimeMillis();
            for (itEphemeral.seekToFirst(); itEphemeral.isValid(); itEphemeral.next()) {
                IQueueMessage<ID, DATA> message = deserialize(itEphemeral.value());
                if (message != null && message.getQueueTimestamp().getTime() + j < now) {
                    orphans.add(message);
                }
            }
        }
        return orphans;
    }

    /**
     * @return number of entries in the queue column family, as estimated by the
     *         RocksDB wrapper; capped at {@link Integer#MAX_VALUE}
     */
    @Override
    public int queueSize() {
        return clampToInt(rocksDbWrapper.getEstimateNumKeys(cfNameQueue));
    }

    /**
     * @return number of entries in the ephemeral column family, as estimated by the
     *         RocksDB wrapper; capped at {@link Integer#MAX_VALUE}
     */
    @Override
    public int ephemeralSize() {
        return clampToInt(rocksDbWrapper.getEstimateNumKeys(cfNameEphemeral));
    }

    /**
     * Narrows a count to {@code int} without wrapping around to a negative number.
     */
    private static int clampToInt(long count) {
        return (int) Math.min(count, (long) Integer.MAX_VALUE);
    }
}
