package de.caluga.morphium.changestream;

import de.caluga.morphium.AnnotationAndReflectionHelper;
import de.caluga.morphium.Morphium;
import de.caluga.morphium.ObjectMapperImpl;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.driver.Doc;
import de.caluga.morphium.driver.DriverTailableIterationCallback;
import de.caluga.morphium.driver.MorphiumDriver;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.driver.commands.WatchCommand;
import de.caluga.morphium.driver.inmem.InMemoryDriver;
import de.caluga.morphium.driver.wire.ConnectionType;
import de.caluga.morphium.driver.wire.SingleMongoConnectDriver;
import de.caluga.morphium.objectmapping.MorphiumObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/caluga/morphium/changestream/ChangeStreamMonitor.class */
public class ChangeStreamMonitor implements Runnable, ShutdownListener {
    private final Collection<ChangeStreamListener> listeners;
    private final Morphium morphium;
    private final Logger log;
    private final String collectionName;
    private final boolean fullDocument;
    private final int maxWait;
    private volatile boolean running;
    private Thread changeStreamThread;
    private final MorphiumObjectMapper mapper;
    private boolean dbOnly;
    private final List<Map<String, Object>> pipeline;
    private MorphiumDriver dedicatedConnection;

    public ChangeStreamMonitor(Morphium morphium) {
        this(morphium, null, false, null);
        this.dbOnly = true;
    }

    public ChangeStreamMonitor(Morphium morphium, List<Map<String, Object>> list) {
        this(morphium, null, false, list);
        this.dbOnly = true;
    }

    public ChangeStreamMonitor(Morphium morphium, Class<?> cls) {
        this(morphium, morphium.getMapper().getCollectionName(cls), false, null);
    }

    public ChangeStreamMonitor(Morphium morphium, Class<?> cls, List<Map<String, Object>> list) {
        this(morphium, morphium.getMapper().getCollectionName(cls), false, null);
    }

    public ChangeStreamMonitor(Morphium morphium, String str, boolean z) {
        this(morphium, str, z, null);
    }

    public ChangeStreamMonitor(Morphium morphium, String str, boolean z, List<Map<String, Object>> list) {
        this(morphium, str, z, morphium.getConfig().getMaxWaitTime(), list);
    }

    public ChangeStreamMonitor(Morphium morphium, String str, boolean z, int i, List<Map<String, Object>> list) {
        this.log = LoggerFactory.getLogger(ChangeStreamMonitor.class);
        this.running = true;
        this.dbOnly = false;
        this.morphium = morphium;
        try {
            if (morphium.getDriver() instanceof InMemoryDriver) {
                this.dedicatedConnection = morphium.getDriver();
            } else {
                this.dedicatedConnection = new SingleMongoConnectDriver().setConnectionType(ConnectionType.PRIMARY);
                this.dedicatedConnection.setDefaultBatchSize(this.morphium.getConfig().getCursorBatchSize());
                this.dedicatedConnection.setMaxWaitTime(this.morphium.getConfig().getMaxWaitTime());
                this.dedicatedConnection.setHostSeed(this.morphium.getConfig().getHostSeed());
                this.dedicatedConnection.setMinConnections(1);
                this.dedicatedConnection.setMaxConnections(3);
                this.dedicatedConnection.setCredentials(this.morphium.getConfig().decryptAuthDb(), this.morphium.getConfig().decryptMongoLogin(), this.morphium.getConfig().decryptMongoPassword());
                this.dedicatedConnection.connect();
                Thread.sleep(1000L);
            }
        } catch (Exception e) {
            if (!e.getMessage().contains("sleep interrupted")) {
                throw new RuntimeException(e);
            }
        }
        this.listeners = new ConcurrentLinkedDeque();
        this.morphium.addShutdownListener(this);
        this.pipeline = list;
        this.collectionName = str;
        this.fullDocument = z;
        if (i != 0) {
            this.maxWait = i;
        } else {
            this.maxWait = morphium.getConfig().getMaxWaitTime();
        }
        this.mapper = new ObjectMapperImpl();
        this.mapper.setAnnotationHelper(new AnnotationAndReflectionHelper(false));
    }

    public void addListener(ChangeStreamListener changeStreamListener) {
        this.listeners.add(changeStreamListener);
    }

    public void removeListener(ChangeStreamListener changeStreamListener) {
        this.listeners.remove(changeStreamListener);
    }

    public boolean isFullDocument() {
        return this.fullDocument;
    }

    public void start() {
        if (this.changeStreamThread != null) {
            throw new RuntimeException("Already running!");
        }
        this.changeStreamThread = new Thread(this);
        this.changeStreamThread.setDaemon(true);
        this.changeStreamThread.setName("changeStream");
        this.changeStreamThread.start();
        this.running = true;
    }

    public boolean isRunning() {
        return this.running;
    }

    public void terminate() {
        this.running = false;
        try {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    this.dedicatedConnection.close();
                } catch (Exception e) {
                    this.log.warn("Closing mongo connection error", e.getMessage());
                }
                this.dedicatedConnection = null;
                while (true) {
                    if (this.changeStreamThread == null || !this.changeStreamThread.isAlive()) {
                        break;
                    }
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e2) {
                    }
                    if (System.currentTimeMillis() - currentTimeMillis > this.morphium.getConfig().getReadTimeout()) {
                        this.log.debug("Changestream monitor did not finish before max wait time is over! Interrupting");
                        this.changeStreamThread.interrupt();
                        try {
                            Thread.sleep(100L);
                            break;
                        } catch (InterruptedException e3) {
                        }
                    }
                }
                this.changeStreamThread = null;
                this.listeners.clear();
                this.morphium.removeShutdownListener(this);
            } catch (Throwable th) {
                this.listeners.clear();
                this.morphium.removeShutdownListener(this);
                throw th;
            }
        } catch (Exception e4) {
            this.log.warn("Exception when closing changestreamMonitor", e4.getMessage());
            this.listeners.clear();
            this.morphium.removeShutdownListener(this);
        }
    }

    public String getcollectionName() {
        return this.collectionName;
    }

    /* JADX WARN: Finally extract failed */
    @Override // java.lang.Runnable
    public void run() {
        DriverTailableIterationCallback driverTailableIterationCallback;
        WatchCommand watchCommand = null;
        while (this.running) {
            try {
                try {
                    driverTailableIterationCallback = new DriverTailableIterationCallback() { // from class: de.caluga.morphium.changestream.ChangeStreamMonitor.1
                        @Override // de.caluga.morphium.driver.DriverTailableIterationCallback
                        public void incomingData(Map<String, Object> map, long j) {
                            if (ChangeStreamMonitor.this.running) {
                                Map<String, Object> map2 = (Map) map.get("fullDocument");
                                map.remove("fullDocument");
                                if ((map.get("documentKey") instanceof MorphiumId) || (map.get("documentKey") instanceof ObjectId)) {
                                    map.put("documentKey", Doc.of("_id", map.get("documentKey")));
                                }
                                ChangeStreamEvent changeStreamEvent = (ChangeStreamEvent) ChangeStreamMonitor.this.mapper.deserialize(ChangeStreamEvent.class, map);
                                changeStreamEvent.setFullDocument(map2);
                                ArrayList arrayList = new ArrayList();
                                for (ChangeStreamListener changeStreamListener : ChangeStreamMonitor.this.listeners) {
                                    try {
                                        if (!changeStreamListener.incomingData(changeStreamEvent)) {
                                            arrayList.add(changeStreamListener);
                                        }
                                    } catch (Exception e) {
                                        ChangeStreamMonitor.this.log.error("listener threw exception", e);
                                    }
                                }
                                ChangeStreamMonitor.this.listeners.removeAll(arrayList);
                            }
                        }

                        @Override // de.caluga.morphium.driver.DriverTailableIterationCallback
                        public boolean isContinued() {
                            return ChangeStreamMonitor.this.running;
                        }
                    };
                } catch (Exception e) {
                    if (e.getMessage() == null) {
                        this.log.warn("Restarting changestream", e);
                    } else if (e.getMessage().contains("Network error error: state should be: open")) {
                        this.log.warn("Changstream connection broke - restarting");
                    } else if (e.getMessage().contains("Did not receive OpMsg-Reply in time")) {
                        this.log.debug("changestream iteration");
                    } else if (e.getMessage().contains("closed")) {
                        this.log.warn("connection closed!", e);
                        if (watchCommand != null) {
                            watchCommand.releaseConnection();
                        }
                    } else {
                        this.log.warn("Error in changestream monitor - restarting", e);
                    }
                    if (watchCommand != null) {
                        watchCommand.releaseConnection();
                    }
                }
                if (this.dedicatedConnection != null) {
                    watchCommand = new WatchCommand(this.dedicatedConnection.getPrimaryConnection(null)).setCb(driverTailableIterationCallback).setDb(this.morphium.getDatabase()).setBatchSize(1).setMaxTimeMS(Integer.valueOf(this.morphium.getConfig().getMaxConnectionIdleTime())).setFullDocument(this.fullDocument ? WatchCommand.FullDocumentEnum.updateLookup : WatchCommand.FullDocumentEnum.defaultValue).setPipeline(this.pipeline);
                    if (!this.dbOnly) {
                        watchCommand.setColl(this.collectionName);
                    }
                    watchCommand.watch();
                    if (watchCommand != null) {
                        watchCommand.releaseConnection();
                    }
                } else if (watchCommand != null) {
                    watchCommand.releaseConnection();
                }
            } catch (Throwable th) {
                if (watchCommand != null) {
                    watchCommand.releaseConnection();
                }
                throw th;
            }
        }
        try {
            if (this.dedicatedConnection != null && !(this.dedicatedConnection instanceof InMemoryDriver)) {
                this.dedicatedConnection.close();
            }
        } catch (IOException e2) {
        }
        this.log.debug("ChangeStreamMonitor finished gracefully!");
    }

    @Override // de.caluga.morphium.ShutdownListener
    public void onShutdown(Morphium morphium) {
        terminate();
    }
}
