package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.impl.AbstractQueue;
import com.github.ddth.queue.internal.utils.MongoUtils;
import com.github.ddth.queue.internal.utils.QueueUtils;
import com.github.ddth.queue.utils.QueueException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndDeleteOptions;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.Updates;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ddth/queue/impl/MongodbQueue.class */
/**
 * Queue implementation that keeps its messages in a MongoDB collection.
 *
 * <p>Each message is stored as one document with these fields:
 * <ul>
 * <li>{@value #COLLECTION_FIELD_ID}: the message id (unique index);</li>
 * <li>{@value #COLLECTION_FIELD_EPHEMERAL_KEY}: {@code null} while the message is waiting in the
 * queue, a generated key once it has been taken and is awaiting {@link #finish};</li>
 * <li>{@value #COLLECTION_FIELD_TIME}: the message's timestamp;</li>
 * <li>{@value #COLLECTION_FIELD_QUEUE_TIME}: the message's queue timestamp, used for ordering;</li>
 * <li>{@value #COLLECTION_FIELD_QUEUE_DATA}: the serialized message.</li>
 * </ul>
 *
 * <p>Serialization ({@code serialize}/{@code deserialize}) and the ephemeral-storage settings
 * ({@code isEphemeralDisabled}, {@code getEphemeralMaxSize}) are defined outside this class.
 *
 * <p>The database name has no default in this class; it must be supplied via
 * {@link #setDatabaseName(String)} before the database is first accessed.
 *
 * @param <ID>   message id type
 * @param <DATA> message payload type
 */
public abstract class MongodbQueue<ID, DATA> extends AbstractEphemeralSupportQueue<ID, DATA> {
    public static final String DEFAULT_CONN_STR = "mongodb://localhost:27017/local";
    public static final String DEFAULT_COLLECTION_NAME = "ddth_queue";

    public static final String COLLECTION_FIELD_ID = "id";
    public static final String COLLECTION_FIELD_TIME = "time";
    public static final String COLLECTION_FIELD_QUEUE_TIME = "queue_time";
    public static final String COLLECTION_FIELD_QUEUE_DATA = "data";
    public static final String COLLECTION_FIELD_EPHEMERAL_KEY = "ekey";

    private static final Logger LOGGER = LoggerFactory.getLogger(MongodbQueue.class);

    private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
    /** Matches messages that are waiting in the queue (no ephemeral key assigned). */
    private static final Bson FILTER_TAKE = Filters.eq(COLLECTION_FIELD_EPHEMERAL_KEY, null);
    /** Matches messages that have been taken and not yet finished. */
    private static final Bson FILTER_EPHEMERAL = Filters.ne(COLLECTION_FIELD_EPHEMERAL_KEY, null);
    private static final FindOneAndUpdateOptions TAKE_OPTIONS = new FindOneAndUpdateOptions()
            .sort(Sorts.ascending(COLLECTION_FIELD_QUEUE_TIME));
    private static final FindOneAndDeleteOptions TAKE_EPHEMERAL_DISABLED_OPTIONS = new FindOneAndDeleteOptions()
            .sort(Sorts.ascending(COLLECTION_FIELD_QUEUE_TIME));

    private MongoClient mongoClient;
    /** {@code true} when this queue is responsible for closing {@link #mongoClient}. */
    private boolean myOwnMongoClient = true;
    private String connectionString = DEFAULT_CONN_STR;
    private String databaseName;
    private String collectionName = DEFAULT_COLLECTION_NAME;

    // Lazily resolved handles; volatile so the check-lock-check initialisation below is safe.
    private volatile MongoDatabase database;
    private volatile MongoCollection<Document> collection;

    /**
     * @return the connection string used by {@link #buildMongoClient()}
     */
    public String getConnectionString() {
        return this.connectionString;
    }

    /**
     * @param connectionString connection string used by {@link #buildMongoClient()}
     * @return this queue, for chaining
     */
    public MongodbQueue<ID, DATA> setConnectionString(String connectionString) {
        this.connectionString = connectionString;
        return this;
    }

    /**
     * @return name of the database holding the queue collection ({@code null} unless set)
     */
    public String getDatabaseName() {
        return this.databaseName;
    }

    /**
     * @param databaseName name of the database holding the queue collection
     * @return this queue, for chaining
     */
    public MongodbQueue<ID, DATA> setDatabaseName(String databaseName) {
        this.databaseName = databaseName;
        return this;
    }

    /**
     * @return name of the collection that stores queue messages
     */
    public String getCollectionName() {
        return this.collectionName;
    }

    /**
     * @param collectionName name of the collection that stores queue messages
     * @return this queue, for chaining
     */
    public MongodbQueue<ID, DATA> setCollectionName(String collectionName) {
        this.collectionName = collectionName;
        return this;
    }

    /**
     * @return the client currently in use, or {@code null} if none has been set or built yet
     */
    protected MongoClient getMongoClient() {
        return this.mongoClient;
    }

    /**
     * Uses an externally managed client; this queue will not close it.
     *
     * @param mongoClient client to use
     * @return this queue, for chaining
     */
    public MongodbQueue<ID, DATA> setMongoClient(MongoClient mongoClient) {
        return setMongoClient(mongoClient, false);
    }

    /**
     * Replaces the client used by this queue.
     *
     * <p>If the previous client was owned by this queue and is a different instance, it is
     * closed. Cached database/collection handles are dropped whenever the client instance
     * changes, because they were obtained from the previous client.
     *
     * @param mongoClient         client to use
     * @param setMyOwnMongoClient {@code true} if this queue should close the client itself
     * @return this queue, for chaining
     */
    protected MongodbQueue<ID, DATA> setMongoClient(MongoClient mongoClient, boolean setMyOwnMongoClient) {
        MongoClient previous = this.mongoClient;
        if (previous != mongoClient) {
            if (this.myOwnMongoClient && previous != null) {
                previous.close();
            }
            this.database = null;
            this.collection = null;
        }
        this.mongoClient = mongoClient;
        this.myOwnMongoClient = setMyOwnMongoClient;
        return this;
    }

    /**
     * Returns the database handle, resolving it from the current client on first use.
     *
     * @return handle for the database named by {@link #getDatabaseName()}
     */
    protected MongoDatabase getDatabase() {
        MongoDatabase db = this.database;
        if (db == null) {
            synchronized (this) {
                db = this.database;
                if (db == null) {
                    db = this.mongoClient.getDatabase(getDatabaseName());
                    this.database = db;
                }
            }
        }
        return db;
    }

    /**
     * Returns the collection handle, resolving it from {@link #getDatabase()} on first use.
     *
     * @return handle for the collection named by {@link #getCollectionName()}
     */
    protected MongoCollection<Document> getCollection() {
        MongoCollection<Document> coll = this.collection;
        if (coll == null) {
            synchronized (this) {
                coll = this.collection;
                if (coll == null) {
                    coll = getDatabase().getCollection(getCollectionName());
                    this.collection = coll;
                }
            }
        }
        return coll;
    }

    /**
     * Creates the queue collection and its indexes if the collection does not exist yet.
     *
     * <p>Indexes are only created together with the collection; an already existing collection
     * is left untouched.
     */
    protected void initCollection() {
        String name = getCollectionName();
        if (MongoUtils.collectionExists(getDatabase(), name)) {
            return;
        }
        LOGGER.info("Creating collection [{}]...", name);
        MongoUtils.createCollection(getDatabase(), name, null);

        MongoCollection<Document> target = getCollection();
        createIndex(target, COLLECTION_FIELD_ID, new IndexOptions().unique(true));
        createIndex(target, COLLECTION_FIELD_EPHEMERAL_KEY, new IndexOptions());
        createIndex(target, COLLECTION_FIELD_QUEUE_TIME, new IndexOptions());
        createIndex(target, COLLECTION_FIELD_TIME, new IndexOptions());
    }

    /** Creates a single-field ascending index and logs the action. */
    private void createIndex(MongoCollection<Document> target, String fieldName, IndexOptions options) {
        LOGGER.info("Creating index for field [{}.{}]...", getCollectionName(), fieldName);
        target.createIndex(new Document(fieldName, 1), options);
    }

    /**
     * Builds a new client from {@link #getConnectionString()}.
     *
     * @return a newly created client
     * @throws IllegalStateException if the connection string is blank
     */
    protected MongoClient buildMongoClient() {
        String connStr = getConnectionString();
        if (StringUtils.isBlank(connStr)) {
            throw new IllegalStateException("MongoDB ConnectionString is not defined.");
        }
        return MongoClients.create(connStr);
    }

    /**
     * Initializes the queue: builds (and takes ownership of) a client if none was supplied,
     * runs the superclass initialization, then makes sure the collection exists.
     *
     * @return this queue
     * @throws IllegalStateException if no client is available after initialization
     * @throws Exception             declared by the overridden method
     */
    @Override
    public MongodbQueue<ID, DATA> init() throws Exception {
        if (getMongoClient() == null) {
            setMongoClient(buildMongoClient(), true);
        }
        super.init();
        if (getMongoClient() == null) {
            throw new IllegalStateException("MongoDB Client is null.");
        }
        initCollection();
        return this;
    }

    /**
     * Runs the superclass cleanup and then, if this queue owns its client, closes it.
     *
     * <p>The client is closed even when the superclass cleanup throws. A failure while closing
     * is logged rather than propagated, and the client reference and the handles derived from
     * it are cleared either way. A client supplied through {@link #setMongoClient(MongoClient)}
     * is left open and stays assigned.
     */
    @Override
    public void destroy() {
        try {
            super.destroy();
        } finally {
            if (this.mongoClient != null && this.myOwnMongoClient) {
                try {
                    this.mongoClient.close();
                } catch (Exception e) {
                    LOGGER.warn(e.getMessage(), e);
                } finally {
                    this.mongoClient = null;
                    this.database = null;
                    this.collection = null;
                }
            }
        }
    }

    /**
     * Converts a message to its stored form. The ephemeral key is always written as
     * {@code null}, i.e. the document represents a message waiting in the queue.
     *
     * @param msg message to convert
     * @return document representing the message
     */
    protected Document toDocument(IQueueMessage<ID, DATA> msg) {
        return new Document(COLLECTION_FIELD_ID, msg.getId())
                .append(COLLECTION_FIELD_EPHEMERAL_KEY, null)
                .append(COLLECTION_FIELD_TIME, msg.getTimestamp())
                .append(COLLECTION_FIELD_QUEUE_TIME, msg.getQueueTimestamp())
                .append(COLLECTION_FIELD_QUEUE_DATA, serialize(msg));
    }

    /**
     * Rebuilds a message from the binary {@value #COLLECTION_FIELD_QUEUE_DATA} field of a
     * stored document.
     *
     * @param doc stored document, may be {@code null}
     * @return the deserialized message, or {@code null} if {@code doc} is {@code null}
     */
    protected IQueueMessage<ID, DATA> fromDocument(Document doc) {
        if (doc == null) {
            return null;
        }
        Binary payload = doc.get(COLLECTION_FIELD_QUEUE_DATA, Binary.class);
        return deserialize(payload.getData());
    }

    /**
     * Stores the message, replacing any existing document with the same id or inserting a new
     * one if none exists.
     *
     * @param msg message to store
     * @return always {@code true}
     */
    protected boolean upsertToCollection(IQueueMessage<ID, DATA> msg) {
        getCollection().replaceOne(Filters.eq(COLLECTION_FIELD_ID, msg.getId()), toDocument(msg), REPLACE_OPTIONS);
        return true;
    }

    /**
     * Stores the message as a new document.
     *
     * @param msg message to store
     * @return always {@code true}
     */
    protected boolean insertToCollection(IQueueMessage<ID, DATA> msg) {
        getCollection().insertOne(toDocument(msg));
        return true;
    }

    /**
     * Puts a message to the queue. New messages (or any message when ephemeral storage is
     * disabled) are inserted; otherwise the existing document with the same id is replaced.
     *
     * @param msg      message to store
     * @param queueCase why the message is being put; {@code null} is treated like {@code NEW}
     * @return always {@code true}
     */
    @Override
    protected boolean doPutToQueue(IQueueMessage<ID, DATA> msg, AbstractQueue.PutToQueueCase queueCase) {
        boolean plainInsert = queueCase == null || queueCase == AbstractQueue.PutToQueueCase.NEW
                || isEphemeralDisabled();
        if (plainInsert) {
            return insertToCollection(msg);
        }
        return upsertToCollection(msg);
    }

    /**
     * Removes a previously taken message from ephemeral storage. Does nothing when ephemeral
     * storage is disabled, since {@link #take()} already deleted the document in that mode.
     *
     * @param msg the message that has been processed
     */
    @Override
    public void finish(IQueueMessage<ID, DATA> msg) {
        if (isEphemeralDisabled()) {
            return;
        }
        getCollection().deleteOne(Filters.and(FILTER_EPHEMERAL, Filters.eq(COLLECTION_FIELD_ID, msg.getId())));
    }

    /**
     * Takes the waiting message with the smallest queue timestamp.
     *
     * <p>With ephemeral storage disabled the document is deleted. Otherwise it is kept and
     * tagged with a newly generated ephemeral key until {@link #finish} is called.
     *
     * @return the taken message, or {@code null} if no message is waiting
     * @throws QueueException.EphemeralIsFull if a positive ephemeral max size is configured and
     *                                        the ephemeral storage has reached it
     */
    @Override
    public IQueueMessage<ID, DATA> take() throws QueueException.EphemeralIsFull {
        if (isEphemeralDisabled()) {
            return fromDocument(getCollection().findOneAndDelete(FILTER_TAKE, TAKE_EPHEMERAL_DISABLED_OPTIONS));
        }
        int ephemeralMaxSize = getEphemeralMaxSize();
        if (ephemeralMaxSize > 0 && ephemeralSize() >= ephemeralMaxSize) {
            throw new QueueException.EphemeralIsFull(ephemeralMaxSize);
        }
        Bson assignEphemeralKey = Updates.set(COLLECTION_FIELD_EPHEMERAL_KEY, QueueUtils.IDGEN.generateId128Hex());
        return fromDocument(getCollection().findOneAndUpdate(FILTER_TAKE, assignEphemeralKey, TAKE_OPTIONS));
    }

    /**
     * Lists taken-but-unfinished messages whose queue timestamp is at or before
     * {@code now - thresholdTimestampMs}.
     *
     * @param thresholdTimestampMs age threshold in milliseconds, subtracted from the current time
     * @return the matching messages; empty when ephemeral storage is disabled
     */
    @Override
    public Collection<IQueueMessage<ID, DATA>> getOrphanMessages(long thresholdTimestampMs) {
        Collection<IQueueMessage<ID, DATA>> orphans = new HashSet<>();
        if (!isEphemeralDisabled()) {
            Date cutoff = new Date(System.currentTimeMillis() - thresholdTimestampMs);
            Bson filter = Filters.and(FILTER_EPHEMERAL, Filters.lte(COLLECTION_FIELD_QUEUE_TIME, cutoff));
            // The explicit Consumer cast picks Iterable.forEach(Consumer) on driver versions
            // that also declare forEach(Block), where a bare lambda would be ambiguous.
            getCollection().find(filter).forEach((Consumer<Document>) doc -> orphans.add(fromDocument(doc)));
        }
        return orphans;
    }

    /**
     * @return number of messages waiting in the queue, capped at {@link Integer#MAX_VALUE}
     */
    @Override
    public int queueSize() {
        return clampToInt(getCollection().countDocuments(FILTER_TAKE));
    }

    /**
     * @return number of taken-but-unfinished messages, capped at {@link Integer#MAX_VALUE};
     *         always {@code 0} when ephemeral storage is disabled
     */
    @Override
    public int ephemeralSize() {
        if (isEphemeralDisabled()) {
            return 0;
        }
        return clampToInt(getCollection().countDocuments(FILTER_EPHEMERAL));
    }

    /** Narrows a document count to {@code int} without wrapping to a negative value. */
    private static int clampToInt(long count) {
        return (int) Math.min(count, Integer.MAX_VALUE);
    }
}
