package com.github.ddth.pubsub.impl;

import com.github.ddth.pubsub.ISubscriber;
import com.github.ddth.pubsub.internal.utils.MongoUtils;
import com.github.ddth.queue.IMessage;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.mongodb.CursorType;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.Sorts;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.types.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ddth/pubsub/impl/MongodbPubSubHub.class */
public class MongodbPubSubHub<ID, DATA> extends AbstractPubSubHub<ID, DATA> {
    public static final String DEFAULT_CONN_STR = "mongodb://localhost:27017/local";
    public static final String DEFAULT_DATABASE_NAME = "ddth_pubsub";
    public static final long DEFAULT_MAX_DOCUMENTS = Runtime.getRuntime().availableProcessors() * 2;
    public static final long DEFAULT_MAX_COLLECTION_SIZE = DEFAULT_MAX_DOCUMENTS * 1024;
    private MongoClient mongoClient;
    private MongoDatabase database;
    public static final String COLLECTION_FIELD_ID = "id";
    public static final String COLLECTION_FIELD_TIME = "time";
    public static final String COLLECTION_FIELD_DATA = "data";
    private final Logger LOGGER = LoggerFactory.getLogger(MongodbPubSubHub.class);
    private boolean myOwnMongoClient = true;
    private String connectionString = "mongodb://localhost:27017/local";
    private String databaseName = DEFAULT_DATABASE_NAME;
    private long maxDocuments = DEFAULT_MAX_DOCUMENTS;
    private long maxCollectionSize = DEFAULT_MAX_COLLECTION_SIZE;
    private LoadingCache<String, MongodbPubSubHub<ID, DATA>.MongoPubSubGateway> cursors = CacheBuilder.newBuilder().removalListener(removalNotification -> {
        ((MongoPubSubGateway) removalNotification.getValue()).destroy();
    }).build(new CacheLoader<String, MongodbPubSubHub<ID, DATA>.MongoPubSubGateway>() { // from class: com.github.ddth.pubsub.impl.MongodbPubSubHub.1
        public MongodbPubSubHub<ID, DATA>.MongoPubSubGateway load(String str) {
            MongodbPubSubHub<ID, DATA>.MongoPubSubGateway mongoPubSubGateway = new MongoPubSubGateway(str);
            mongoPubSubGateway.run();
            return mongoPubSubGateway;
        }
    });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/ddth/pubsub/impl/MongodbPubSubHub$MongoPubSubGateway.class */
    public class MongoPubSubGateway {
        private MongoCollection<Document> collection;
        private MongoCursor<Document> cursor;
        private String channel;
        private boolean stopped = false;
        private Set<ISubscriber<ID, DATA>> subscriptions = new HashSet();

        public MongoPubSubGateway(String str) {
            this.channel = str;
        }

        public boolean publish(IMessage<ID, DATA> iMessage) {
            this.collection.insertOne(MongodbPubSubHub.this.toDocument(iMessage));
            return true;
        }

        public void subscribe(ISubscriber<ID, DATA> iSubscriber) {
            synchronized (this.subscriptions) {
                this.subscriptions.add(iSubscriber);
            }
        }

        public void unsubscribe(ISubscriber<ID, DATA> iSubscriber) {
            synchronized (this.subscriptions) {
                this.subscriptions.remove(iSubscriber);
            }
        }

        public void run() {
            if (MongoUtils.collectionExists(MongodbPubSubHub.this.getDatabase(), this.channel)) {
                this.collection = MongodbPubSubHub.this.getDatabase().getCollection(this.channel);
            } else {
                this.collection = MongoUtils.createCollection(MongodbPubSubHub.this.getDatabase(), this.channel, new CreateCollectionOptions().capped(true).maxDocuments(MongodbPubSubHub.this.getMaxDocuments()).sizeInBytes(MongodbPubSubHub.this.getMaxCollectionSize()));
                publish(MongodbPubSubHub.this.createMessage(null, null));
            }
            this.cursor = this.collection.find().cursorType(CursorType.TailableAwait).sort(Sorts.ascending(new String[]{"$natural"})).iterator();
            new Thread(() -> {
                LinkedList linkedList;
                while (!this.stopped) {
                    Document document = null;
                    try {
                        try {
                            document = (Document) this.cursor.next();
                        } catch (IllegalStateException e) {
                            MongodbPubSubHub.this.LOGGER.warn(e.getMessage(), e);
                        }
                        IMessage<ID, DATA> fromDocument = MongodbPubSubHub.this.fromDocument(document);
                        if (fromDocument != null && (fromDocument.getId() != null || fromDocument.getData() != null)) {
                            synchronized (this.subscriptions) {
                                linkedList = new LinkedList(this.subscriptions);
                            }
                            try {
                                linkedList.forEach(iSubscriber -> {
                                    iSubscriber.onMessage(this.channel, fromDocument);
                                });
                            } catch (Exception e2) {
                                MongodbPubSubHub.this.LOGGER.warn(e2.getMessage(), e2);
                            }
                        }
                    } catch (Exception e3) {
                        MongodbPubSubHub.this.LOGGER.error(e3.getMessage(), e3);
                    }
                }
            }).start();
        }

        public void destroy() {
            try {
                this.stopped = true;
                this.cursor.close();
            } catch (Exception e) {
                MongodbPubSubHub.this.LOGGER.warn(e.getMessage(), e);
            }
        }
    }

    public long getMaxDocuments() {
        return this.maxDocuments;
    }

    public MongodbPubSubHub<ID, DATA> setMaxDocuments(long j) {
        this.maxDocuments = j;
        return this;
    }

    public long getMaxCollectionSize() {
        return this.maxCollectionSize;
    }

    public MongodbPubSubHub<ID, DATA> setMaxCollectionSize(long j) {
        this.maxCollectionSize = j;
        return this;
    }

    public String getConnectionString() {
        return this.connectionString;
    }

    public MongodbPubSubHub<ID, DATA> setConnectionString(String str) {
        this.connectionString = str;
        return this;
    }

    public String getDatabaseName() {
        return this.databaseName;
    }

    public MongodbPubSubHub<ID, DATA> setDatabaseName(String str) {
        this.databaseName = str;
        return this;
    }

    protected MongoClient getMongoClient() {
        return this.mongoClient;
    }

    public MongodbPubSubHub<ID, DATA> setMongoClient(MongoClient mongoClient) {
        return setMongoClient(mongoClient, false);
    }

    protected MongodbPubSubHub<ID, DATA> setMongoClient(MongoClient mongoClient, boolean z) {
        if (this.myOwnMongoClient && this.mongoClient != null) {
            this.mongoClient.close();
        }
        this.mongoClient = mongoClient;
        this.myOwnMongoClient = z;
        return this;
    }

    protected MongoDatabase getDatabase() {
        if (this.database == null) {
            synchronized (this) {
                if (this.database == null) {
                    this.database = this.mongoClient.getDatabase(getDatabaseName());
                }
            }
        }
        return this.database;
    }

    protected Document toDocument(IMessage<ID, DATA> iMessage) {
        return new Document("id", iMessage.getId()).append("time", iMessage.getTimestamp()).append("data", serialize(iMessage));
    }

    protected IMessage<ID, DATA> fromDocument(Document document) {
        if (document != null) {
            return deserialize(((Binary) document.get("data", Binary.class)).getData());
        }
        return null;
    }

    protected MongoClient buildMongoClient() {
        String connectionString = getConnectionString();
        if (StringUtils.isBlank(connectionString)) {
            throw new IllegalStateException("MongoDB ConnectionString is not defined.");
        }
        return MongoClients.create(connectionString);
    }

    @Override // com.github.ddth.pubsub.impl.AbstractPubSubHub
    public MongodbPubSubHub<ID, DATA> init() {
        if (getMongoClient() == null) {
            setMongoClient(buildMongoClient(), true);
        }
        super.init();
        if (getMongoClient() == null) {
            throw new IllegalStateException("MongoDB Client is null.");
        }
        return this;
    }

    @Override // com.github.ddth.pubsub.impl.AbstractPubSubHub
    public void destroy() {
        try {
            super.destroy();
            try {
                this.cursors.invalidateAll();
            } catch (Exception e) {
                this.LOGGER.warn(e.getMessage(), e);
            }
            if (this.mongoClient == null || !this.myOwnMongoClient) {
                return;
            }
            try {
                this.mongoClient.close();
            } catch (Exception e2) {
                this.LOGGER.warn(e2.getMessage(), e2);
            } finally {
            }
        } catch (Throwable th) {
            try {
                this.cursors.invalidateAll();
            } catch (Exception e3) {
                this.LOGGER.warn(e3.getMessage(), e3);
            }
            if (this.mongoClient != null && this.myOwnMongoClient) {
                try {
                    try {
                        this.mongoClient.close();
                    } catch (Exception e4) {
                        this.LOGGER.warn(e4.getMessage(), e4);
                        this.mongoClient = null;
                        throw th;
                    }
                } finally {
                }
            }
            throw th;
        }
    }

    @Override // com.github.ddth.pubsub.IPubSubHub
    public boolean publish(String str, IMessage<ID, DATA> iMessage) {
        try {
            return ((MongoPubSubGateway) this.cursors.get(str)).publish(iMessage);
        } catch (ExecutionException e) {
            this.LOGGER.error(e.getMessage(), e);
            return false;
        }
    }

    @Override // com.github.ddth.pubsub.IPubSubHub
    public void subscribe(String str, ISubscriber<ID, DATA> iSubscriber) {
        try {
            ((MongoPubSubGateway) this.cursors.get(str)).subscribe(iSubscriber);
        } catch (ExecutionException e) {
            this.LOGGER.error(e.getMessage(), e);
        }
    }

    @Override // com.github.ddth.pubsub.IPubSubHub
    public void unsubscribe(String str, ISubscriber<ID, DATA> iSubscriber) {
        try {
            ((MongoPubSubGateway) this.cursors.get(str)).unsubscribe(iSubscriber);
        } catch (ExecutionException e) {
            this.LOGGER.error(e.getMessage(), e);
        }
    }
}
