package de.caluga.morphium.changestream;

import de.caluga.morphium.AnnotationAndReflectionHelper;
import de.caluga.morphium.Morphium;
import de.caluga.morphium.MorphiumObjectMapper;
import de.caluga.morphium.ObjectMapperImpl;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.driver.DriverTailableIterationCallback;
import de.caluga.morphium.driver.MorphiumDriverException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/caluga/morphium/changestream/ChangeStreamMonitor.class */
/**
 * Watches a MongoDB change stream on a background daemon thread and forwards every
 * incoming event to the registered {@link ChangeStreamListener}s.
 *
 * <p>A monitor watches either a single collection or, when created through one of the
 * constructors that take no collection, the whole database configured in Morphium.
 * The monitor registers itself as a {@link ShutdownListener}, so it is terminated when
 * Morphium shuts down.
 */
public class ChangeStreamMonitor implements Runnable, ShutdownListener {
    /** Key under which the driver delivers the full document of a change event. */
    private static final String FULL_DOCUMENT_KEY = "fullDocument";
    /** Polling interval (ms) used while waiting for the monitor thread to end. */
    private static final long TERMINATION_POLL_MILLIS = 100L;

    private final Collection<ChangeStreamListener> listeners;
    private final Morphium morphium;
    private final Logger log;
    private final String collectionName;
    private final boolean fullDocument;
    private final int maxWait;
    private volatile boolean running;
    private Thread changeStreamThread;
    private final MorphiumObjectMapper mapper;
    private boolean dbOnly;
    private final List<Map<String, Object>> pipeline;

    /**
     * Creates a monitor that watches the whole database, without a pipeline.
     *
     * @param morphium the Morphium instance providing driver and configuration
     */
    public ChangeStreamMonitor(Morphium morphium) {
        this(morphium, null, false, null);
        this.dbOnly = true;
    }

    /**
     * Creates a monitor that watches the whole database.
     *
     * @param morphium the Morphium instance providing driver and configuration
     * @param list     pipeline handed to the driver's watch call, may be {@code null}
     */
    public ChangeStreamMonitor(Morphium morphium, List<Map<String, Object>> list) {
        this(morphium, null, false, list);
        this.dbOnly = true;
    }

    /**
     * Creates a monitor for the collection the given entity class is mapped to.
     *
     * @param morphium the Morphium instance providing driver and configuration
     * @param cls      entity class whose collection name is resolved through the mapper
     */
    public ChangeStreamMonitor(Morphium morphium, Class<?> cls) {
        this(morphium, morphium.getMapper().getCollectionName(cls), false, null);
    }

    /**
     * Creates a monitor for the collection the given entity class is mapped to.
     *
     * @param morphium the Morphium instance providing driver and configuration
     * @param cls      entity class whose collection name is resolved through the mapper
     * @param list     pipeline handed to the driver's watch call, may be {@code null}
     */
    public ChangeStreamMonitor(Morphium morphium, Class<?> cls, List<Map<String, Object>> list) {
        // The pipeline used to be dropped here (null was passed on instead of list).
        this(morphium, morphium.getMapper().getCollectionName(cls), false, list);
    }

    /**
     * Creates a monitor for a named collection without a pipeline.
     *
     * @param morphium the Morphium instance providing driver and configuration
     * @param str      name of the collection to watch
     * @param z        value of the full-document flag passed to the driver
     */
    public ChangeStreamMonitor(Morphium morphium, String str, boolean z) {
        this(morphium, str, z, null);
    }

    /**
     * Creates a monitor for a named collection using the configured max wait time.
     *
     * @param morphium the Morphium instance providing driver and configuration
     * @param str      name of the collection to watch
     * @param z        value of the full-document flag passed to the driver
     * @param list     pipeline handed to the driver's watch call, may be {@code null}
     */
    public ChangeStreamMonitor(Morphium morphium, String str, boolean z, List<Map<String, Object>> list) {
        this(morphium, str, z, morphium.getConfig().getMaxWaitTime(), list);
    }

    /**
     * Creates a monitor for a named collection.
     *
     * @param morphium the Morphium instance providing driver and configuration
     * @param str      name of the collection to watch
     * @param z        value of the full-document flag passed to the driver
     * @param i        max wait time passed to the driver; {@code 0} selects the value
     *                 from the Morphium configuration
     * @param list     pipeline handed to the driver's watch call, may be {@code null}
     */
    public ChangeStreamMonitor(Morphium morphium, String str, boolean z, int i, List<Map<String, Object>> list) {
        this.log = LoggerFactory.getLogger(ChangeStreamMonitor.class);
        this.running = true;
        this.dbOnly = false;
        this.morphium = morphium;
        this.listeners = new ConcurrentLinkedDeque<>();
        this.pipeline = list;
        this.collectionName = str;
        this.fullDocument = z;
        if (i != 0) {
            this.maxWait = i;
        } else {
            this.maxWait = morphium.getConfig().getMaxWaitTime();
        }
        this.mapper = new ObjectMapperImpl();
        this.mapper.setAnnotationHelper(new AnnotationAndReflectionHelper(false));
        // Register last so the shutdown hook never sees a partially initialised monitor.
        this.morphium.addShutdownListener(this);
    }

    /**
     * Registers a listener that receives all subsequent change stream events.
     *
     * @param changeStreamListener the listener to add
     */
    public void addListener(ChangeStreamListener changeStreamListener) {
        this.listeners.add(changeStreamListener);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param changeStreamListener the listener to remove
     */
    public void removeListener(ChangeStreamListener changeStreamListener) {
        this.listeners.remove(changeStreamListener);
    }

    /**
     * @return the full-document flag this monitor passes to the driver
     */
    public boolean isFullDocument() {
        return this.fullDocument;
    }

    /**
     * Starts the background daemon thread that runs the watch loop.
     *
     * @throws IllegalStateException if a monitor thread already exists
     */
    public void start() {
        if (this.changeStreamThread != null) {
            throw new IllegalStateException("Already running!");
        }
        // Must be set BEFORE the thread starts: after a previous terminate() the flag is
        // false and the new thread would otherwise be able to leave run() immediately.
        this.running = true;
        Thread thread = new Thread(this);
        thread.setDaemon(true);
        thread.setName("changeStream");
        this.changeStreamThread = thread;
        thread.start();
    }

    /**
     * @return the current value of the running flag
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Stops the monitor.
     *
     * <p>Clears the running flag and waits for the monitor thread to end. If the thread is
     * still alive once the configured max wait time has passed it is interrupted and, if
     * that does not help either, forcibly stopped. Finally all listeners are removed and
     * the monitor unregisters itself as shutdown listener, even if waiting failed.
     */
    public void terminate() {
        this.running = false;
        boolean interrupted = false;
        try {
            final Thread worker = this.changeStreamThread;
            final long startedAt = System.currentTimeMillis();
            // A thread cannot wait for its own end: when terminate() is invoked from the
            // monitor thread (e.g. by a listener) the cleared flag alone ends the loop.
            while (worker != null && worker != Thread.currentThread() && worker.isAlive()) {
                try {
                    Thread.sleep(TERMINATION_POLL_MILLIS);
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the request so it can be restored below.
                    interrupted = true;
                }
                if (System.currentTimeMillis() - startedAt > this.morphium.getConfig().getMaxWaitTime()) {
                    this.log.debug("Changestream monitor did not finish before max wait time is over! Interrupting");
                    worker.interrupt();
                    try {
                        Thread.sleep(TERMINATION_POLL_MILLIS);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                    if (worker.isAlive()) {
                        try {
                            // Last resort, kept from the original implementation. Newer JDKs
                            // reject this call with UnsupportedOperationException (caught below).
                            worker.stop();
                        } catch (Exception e) {
                            this.log.warn("Could not forcibly stop changestream monitor thread", e);
                        }
                    }
                    break;
                }
            }
            this.changeStreamThread = null;
        } finally {
            this.listeners.clear();
            this.morphium.removeShutdownListener(this);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the name of the watched collection; {@code null} for monitors created
     *         without a collection
     */
    public String getcollectionName() {
        return this.collectionName;
    }

    /**
     * Watch loop executed on the monitor thread: (re-)opens the change stream as long as
     * the running flag is set. A {@link MorphiumDriverException} is logged and leads to
     * the stream being opened again.
     */
    @Override // java.lang.Runnable
    public void run() {
        // The callback holds no state of its own, so one instance serves all restarts.
        final DriverTailableIterationCallback callback = new DriverTailableIterationCallback() {
            @Override // de.caluga.morphium.driver.DriverTailableIterationCallback
            public void incomingData(Map<String, Object> map, long unusedArg) {
                if (!ChangeStreamMonitor.this.running) {
                    return;
                }
                // The full document is taken out before mapping and attached to the event
                // as a plain map afterwards.
                @SuppressWarnings("unchecked")
                Map<String, Object> fullDoc = (Map<String, Object>) map.remove(FULL_DOCUMENT_KEY);
                ChangeStreamEvent event = (ChangeStreamEvent) ChangeStreamMonitor.this.mapper.deserialize(ChangeStreamEvent.class, map);
                event.setFullDocument(fullDoc);
                notifyListeners(event);
            }

            @Override // de.caluga.morphium.driver.DriverTailableIterationCallback
            public boolean isContinued() {
                return ChangeStreamMonitor.this.running;
            }
        };

        while (this.running) {
            try {
                if (this.dbOnly) {
                    this.morphium.getDriver().watch(this.morphium.getConfig().getDatabase(), this.maxWait, this.fullDocument, this.pipeline, callback);
                } else {
                    this.morphium.getDriver().watch(this.morphium.getConfig().getDatabase(), this.collectionName, this.maxWait, this.fullDocument, this.pipeline, callback);
                }
            } catch (MorphiumDriverException e) {
                // getMessage() may be null; an NPE here would kill the monitor thread.
                String message = e.getMessage();
                if (message != null && message.contains("Network error error: state should be: open")) {
                    this.log.warn("Changstream connection broke - restarting");
                } else {
                    this.log.warn("Error in changestream monitor - restarting", e);
                }
            }
            this.log.debug("ChangeStreamMonitor finished gracefully!!!!!!!!!!!");
        }
    }

    /**
     * Hands the event to every registered listener. Listeners returning {@code false} are
     * unregistered afterwards; a listener that throws is logged and stays registered.
     */
    private void notifyListeners(ChangeStreamEvent event) {
        List<ChangeStreamListener> finished = new ArrayList<>();
        for (ChangeStreamListener listener : this.listeners) {
            try {
                if (!listener.incomingData(event)) {
                    finished.add(listener);
                }
            } catch (Exception e) {
                this.log.error("listener threw exception", e);
            }
        }
        this.listeners.removeAll(finished);
    }

    /**
     * Terminates the monitor when Morphium shuts down.
     *
     * @param morphium the Morphium instance that is shutting down
     */
    @Override // de.caluga.morphium.ShutdownListener
    public void onShutdown(Morphium morphium) {
        terminate();
    }
}
