package com.github.davidmoten.rx.internal.operators;

import com.github.davidmoten.rx.buffertofile.CacheType;
import com.github.davidmoten.rx.buffertofile.DataSerializer;
import com.github.davidmoten.rx.buffertofile.Options;
import com.github.davidmoten.rx.internal.operators.RollingQueue;
import com.github.davidmoten.util.Preconditions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import rx.Observable;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.internal.operators.BackpressureUtils;
import rx.observers.Subscribers;

/* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile.class */
public final class OperatorBufferToFile<T> implements Observable.Operator<T, T> {
    /** Name passed to {@code DB.createQueue} when a queue is created inside each MapDB database. */
    private static final String QUEUE_NAME = "q";
    /** MapDB serializer built from the {@code DataSerializer} given to the constructor. */
    private final Serializer<T> serializer;
    /** Scheduler from which {@code call} creates the worker handed to the queue producer. */
    private final Scheduler scheduler;
    /** Buffering options: file factory, cache settings, storage limit, rollover and delayError. */
    private final Options options;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile$MapDbSerializer.class */
    /**
     * Adapts a {@link DataSerializer} to the MapDB {@link Serializer} interface by delegating
     * every read and write to it.
     *
     * @param <T> type of the items being serialized
     */
    public static final class MapDbSerializer<T> implements Serializer<T>, Serializable {
        private static final long serialVersionUID = -4992031045087289671L;

        /** Value reported by {@link #fixedSize()}; no fixed serialized size is declared. */
        private static final int VARIABLE_SIZE = -1;

        /** Delegate doing the actual work; transient, so Java serialization skips it. */
        private final transient DataSerializer<T> dataSerializer;

        MapDbSerializer(DataSerializer<T> dataSerializer) {
            this.dataSerializer = dataSerializer;
        }

        /**
         * Writes {@code t} to {@code dataOutput} using the wrapped serializer.
         *
         * @throws IOException if the wrapped serializer fails to write
         */
        public void serialize(DataOutput dataOutput, T t) throws IOException {
            dataSerializer.serialize(dataOutput, t);
        }

        /**
         * Reads one item from {@code dataInput} using the wrapped serializer; {@code i} is
         * forwarded unchanged.
         *
         * @throws IOException if the wrapped serializer fails to read
         */
        public T deserialize(DataInput dataInput, int i) throws IOException {
            final T item = dataSerializer.deserialize(dataInput, i);
            return item;
        }

        /** Always returns {@code -1} ({@code VARIABLE_SIZE}). */
        public int fixedSize() {
            return VARIABLE_SIZE;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile$OnSubscribeFromQueue.class */
    /**
     * {@code OnSubscribe} that, for each subscriber, creates a {@link QueueProducer} reading from
     * the shared queue, publishes it through the shared reference and installs it as the
     * subscriber's producer.
     *
     * @param <T> item type
     */
    public static final class OnSubscribeFromQueue<T> implements Observable.OnSubscribe<T> {
        /** Holder through which the created producer is made available to other components. */
        private final AtomicReference<QueueProducer<T>> queueProducer;
        private final CloseableQueue<T> queue;
        private final Scheduler.Worker worker;
        private final Options options;

        OnSubscribeFromQueue(AtomicReference<QueueProducer<T>> atomicReference, CloseableQueue<T> closeableQueue, Scheduler.Worker worker, Options options) {
            this.queueProducer = atomicReference;
            this.queue = closeableQueue;
            this.worker = worker;
            this.options = options;
        }

        /**
         * Builds the producer for {@code subscriber}, stores it in the shared reference first and
         * only then registers it with the subscriber.
         */
        public void call(Subscriber<? super T> subscriber) {
            final boolean delayError = options.delayError();
            final QueueProducer<T> producer = new QueueProducer<T>(queue, subscriber, worker, delayError);
            queueProducer.set(producer);
            subscriber.setProducer(producer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile$ParentSubscriber.class */
    /**
     * Subscriber attached to the upstream source. It requests everything up front and forwards
     * every signal to the {@link QueueProducer} currently held in the shared reference.
     *
     * @param <T> item type
     */
    public static final class ParentSubscriber<T> extends Subscriber<T> {
        private final AtomicReference<QueueProducer<T>> queueProducer;

        ParentSubscriber(AtomicReference<QueueProducer<T>> atomicReference) {
            this.queueProducer = atomicReference;
        }

        /** Requests {@code Long.MAX_VALUE} items from upstream. */
        public void onStart() {
            request(Long.MAX_VALUE);
        }

        public void onNext(T t) {
            producer().onNext(t);
        }

        public void onError(Throwable th) {
            producer().onError(th);
        }

        public void onCompleted() {
            producer().onCompleted();
        }

        /** Returns whatever producer the shared reference currently holds. */
        private QueueProducer<T> producer() {
            return queueProducer.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile$Q2.class */
    /**
     * Queue backed by a MapDB {@link DB} whose database is closed once {@link #close()} has been
     * requested and no {@code poll}/{@code offer}/{@code isEmpty} call is still in flight.
     *
     * <p>The inherited {@link AtomicBoolean} value records whether the database has already been
     * closed, so the close happens at most once.
     *
     * <p>Coordination between callers and {@code close()}: a caller first increments
     * {@code currentCalls} and then reads {@code closing}; {@code close()} first sets
     * {@code closing} and then reads {@code currentCalls}. For this handshake to work the write
     * to {@code closing} must be visible to callers, which is why the field is {@code volatile}.
     *
     * @param <T> item type
     */
    public static final class Q2<T> extends AtomicBoolean implements RollingQueue.Queue2<T> {
        private static final long serialVersionUID = -950306777716863302L;
        private DB db;
        private Queue<T> queue;
        /** Number of queue operations currently executing. */
        private final AtomicInteger currentCalls = new AtomicInteger(0);
        /**
         * Set once {@link #close()} has been called. Volatile so that the unsynchronized reads in
         * the queue operations and in {@link #checkClosed()} are guaranteed to observe it.
         */
        private volatile boolean closing = false;

        Q2(DB db, Queue<T> queue) {
            this.db = db;
            this.queue = queue;
            lazySet(false);
        }

        /** Returns the next item, or {@code null} if the queue is empty or closing. */
        @Override // com.github.davidmoten.rx.internal.operators.RollingQueue.Queue2
        public T poll() {
            currentCalls.incrementAndGet();
            try {
                if (closing) {
                    return null;
                }
                return queue.poll();
            } finally {
                exit();
            }
        }

        /**
         * Offers {@code t} to the underlying queue. When closing, the item is dropped and
         * {@code true} is returned.
         */
        @Override // com.github.davidmoten.rx.internal.operators.RollingQueue.Queue2
        public boolean offer(T t) {
            currentCalls.incrementAndGet();
            try {
                if (closing) {
                    return true;
                }
                return queue.offer(t);
            } finally {
                exit();
            }
        }

        /** Returns {@code true} if the underlying queue is empty or this queue is closing. */
        @Override // com.github.davidmoten.rx.internal.operators.RollingQueue.Queue2
        public boolean isEmpty() {
            currentCalls.incrementAndGet();
            try {
                if (closing) {
                    return true;
                }
                return queue.isEmpty();
            } finally {
                exit();
            }
        }

        /**
         * Requests closure. The database is closed immediately if no operation is in flight,
         * otherwise by the last operation to finish.
         */
        @Override // com.github.davidmoten.rx.internal.operators.RollingQueue.Queue2
        public void close() {
            // A volatile write is sufficient here; no other state is updated with it.
            closing = true;
            checkClosed();
        }

        /** Marks the end of a queue operation and closes the database if that is now due. */
        private void exit() {
            currentCalls.decrementAndGet();
            checkClosed();
        }

        /**
         * Closes the database exactly once, when closing has been requested and no operation is
         * in flight, then drops the references to the database and the queue.
         */
        private void checkClosed() {
            if (closing && currentCalls.get() == 0 && compareAndSet(false, true)) {
                db.close();
                db = null;
                queue = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile$QueueProducer.class */
    /**
     * Producer that moves items from the queue to the child subscriber on the supplied worker,
     * honouring the child's requests.
     *
     * <p>The inherited {@link AtomicLong} value is the number of outstanding (requested but not
     * yet emitted) items. {@code drainRequested} serializes draining: a drain is scheduled only
     * when the counter moves from 0, and it is deliberately left above 0 after termination or
     * unsubscription so that no further drains are scheduled.
     *
     * @param <T> item type
     */
    public static final class QueueProducer<T> extends AtomicLong implements Producer, Action0 {
        private static final long serialVersionUID = 2521533710633950102L;
        private final CloseableQueue<T> queue;
        private final Subscriber<? super T> child;
        private final Scheduler.Worker worker;
        /** When true, an upstream error is reported only after the queue has been emptied. */
        private final boolean delayError;
        private final AtomicInteger drainRequested = new AtomicInteger(0);
        /** Written before {@code done}, read after it, so the volatile flag publishes it. */
        private Throwable error = null;
        private volatile boolean done = false;

        QueueProducer(CloseableQueue<T> closeableQueue, Subscriber<? super T> subscriber, Scheduler.Worker worker, boolean z) {
            this.queue = closeableQueue;
            this.child = subscriber;
            this.worker = worker;
            this.delayError = z;
        }

        /** Enqueues {@code t} and requests a drain; a rejected offer is turned into an error. */
        void onNext(T t) {
            if (queue.offer(t)) {
                drain();
            } else {
                onError(new RuntimeException("could not place item on queue (queue.offer(item) returned false), item= " + t));
            }
        }

        /** Records the terminal error and requests a drain. */
        void onError(Throwable th) {
            error = th;
            done = true;
            drain();
        }

        /** Records normal completion and requests a drain. */
        void onCompleted() {
            done = true;
            drain();
        }

        /** Adds {@code j} to the outstanding requests and requests a drain; ignores {@code j <= 0}. */
        public void request(long j) {
            if (j > 0) {
                BackpressureUtils.getAndAddRequest(this, j);
                drain();
            }
        }

        /** Schedules a drain on the worker unless one is already scheduled or running. */
        private void drain() {
            if (drainRequested.getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        /** Worker entry point: runs a drain and reports any failure to the child. */
        public void call() {
            try {
                drainNow();
            } catch (Throwable th) {
                child.onError(th);
            }
        }

        /**
         * Emits queued items to the child while there are outstanding requests, looping until
         * {@link #finished(boolean)} says draining can stop.
         *
         * <p>Reconstructed from the bytecode, with one correction: the emitted count is always
         * subtracted from the outstanding requests before the drain may be released, including
         * when the queue runs empty. Returning without that subtraction would leave the request
         * counter too high and let later drains emit more than was requested.
         */
        private void drainNow() {
            long requests = get();
            while (true) {
                // Collapse all drain requests seen so far into this pass.
                drainRequested.set(1);
                long emitted = 0;
                boolean queueExhausted = false;
                while (requests > 0) {
                    if (child.isUnsubscribed()) {
                        // drainRequested stays > 0 so no further drains get scheduled.
                        return;
                    }
                    T item = queue.poll();
                    if (item == null) {
                        queueExhausted = true;
                        break;
                    }
                    child.onNext(item);
                    requests--;
                    emitted++;
                }
                // Account for what was emitted and pick up any requests that arrived meanwhile.
                requests = addAndGet(-emitted);
                if (queueExhausted) {
                    if (finished(true)) {
                        return;
                    }
                } else if (requests == 0L && finished(queue.isEmpty())) {
                    return;
                }
                // Otherwise another drain was requested in the meantime: go round again.
            }
        }

        /**
         * Decides whether the current drain should stop.
         *
         * @param z whether the queue was observed to be empty
         * @return {@code true} if a terminal event was delivered, or if no further drain request
         *         arrived and the drain was released; {@code false} if draining must continue
         */
        private boolean finished(boolean z) {
            if (done) {
                Throwable th = error;
                // Terminate once the queue is empty, or straight away on error when not delaying.
                if (z || (th != null && !delayError)) {
                    terminate(th);
                    return true;
                }
            }
            return drainRequested.compareAndSet(1, 0);
        }

        /**
         * Closes the queue, delivers the terminal event ({@code onError} if {@code th} is
         * non-null, otherwise {@code onCompleted}) and always unsubscribes the worker.
         */
        private void terminate(Throwable th) {
            try {
                queue.close();
                if (th != null) {
                    child.onError(th);
                } else {
                    child.onCompleted();
                }
            } finally {
                worker.unsubscribe();
            }
        }
    }

    /**
     * Creates the operator.
     *
     * @param dataSerializer serializer used to write items to and read items from the file queue
     * @param scheduler scheduler supplying the worker used by the operator
     * @param options buffering options
     */
    public OperatorBufferToFile(DataSerializer<T> dataSerializer, Scheduler scheduler, Options options) {
        // Validate every argument before touching any state.
        Preconditions.checkNotNull(dataSerializer);
        Preconditions.checkNotNull(scheduler);
        Preconditions.checkNotNull(options);

        this.serializer = createSerializer(dataSerializer);
        this.scheduler = scheduler;
        this.options = options;
    }

    /**
     * Wraps {@code dataSerializer} in a MapDB {@link Serializer}.
     *
     * @param dataSerializer serializer to delegate to
     * @return a MapDB serializer delegating to {@code dataSerializer}
     */
    private static <T> Serializer<T> createSerializer(DataSerializer<T> dataSerializer) {
        // Parameterized rather than raw, so the conversion is type-checked.
        return new MapDbSerializer<T>(dataSerializer);
    }

    /**
     * Wires the operator: creates the file-backed queue and a worker, subscribes the downstream
     * {@code subscriber} to an observable that reads from the queue, and returns the subscriber
     * that feeds upstream signals into it.
     *
     * @param subscriber downstream subscriber
     * @return subscriber to be attached to the upstream source
     */
    public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
        RollingQueue<T> queue = createQueue(this.serializer, this.options);
        // Filled in when the queue-backed observable is subscribed to below, i.e. before the
        // returned parent subscriber is handed back to the caller.
        AtomicReference<QueueProducer<T>> queueProducer = new AtomicReference<QueueProducer<T>>();
        Scheduler.Worker worker = this.scheduler.createWorker();
        Observable<T> source = Observable.create(
                new OnSubscribeFromQueue<T>(queueProducer, queue, worker, this.options));
        ParentSubscriber<T> parentSubscriber = new ParentSubscriber<T>(queueProducer);

        // Unsubscribing downstream releases the worker, the upstream subscription and the queue.
        subscriber.add(worker);
        subscriber.add(parentSubscriber);
        subscriber.add(queue);

        source.unsafeSubscribe(Subscribers.<T>wrap(subscriber));
        return parentSubscriber;
    }

    /**
     * Creates a {@link RollingQueue} whose underlying queues are each backed by their own MapDB
     * database, created on a file obtained from {@code options.fileFactory()}.
     *
     * @param serializer MapDB serializer for the queued items
     * @param options supplies the file factory, database settings and rollover size
     * @return the rolling queue
     */
    private static <T> RollingQueue<T> createQueue(final Serializer<T> serializer, final Options options) {
        Func0<RollingQueue.Queue2<T>> queueFactory = new Func0<RollingQueue.Queue2<T>>() {
            // Must be named call() to implement Func0; reads the captured options parameter.
            public RollingQueue.Queue2<T> call() {
                File file = (File) options.fileFactory().call();
                DB db = createDb(file, options);
                Queue<T> queue = db.createQueue(QUEUE_NAME, serializer, false);
                return new Q2<T>(db, queue);
            }
        };
        return new RollingQueue<T>(queueFactory, options.rolloverEvery());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a file-based MapDB database on {@code file}, configured from {@code options}. The
     * database is created with transactions disabled and with its files deleted after close.
     *
     * @param file file backing the database
     * @param options supplies cache type, optional cache size and optional storage size limit
     * @return the opened database
     * @throws RuntimeException if the cache type is not one of the handled values
     */
    public static DB createDb(File file, Options options) {
        final CacheType cacheType = options.cacheType();
        final DBMaker fileDb = DBMaker.newFileDB(file);

        // Pick the cache implementation.
        DBMaker maker;
        if (cacheType == CacheType.NO_CACHE) {
            maker = fileDb.cacheDisable();
        } else if (cacheType == CacheType.HARD_REF) {
            maker = fileDb.cacheHardRefEnable();
        } else if (cacheType == CacheType.SOFT_REF) {
            maker = fileDb.cacheSoftRefEnable();
        } else if (cacheType == CacheType.WEAK_REF) {
            maker = fileDb.cacheWeakRefEnable();
        } else if (cacheType == CacheType.LEAST_RECENTLY_USED) {
            maker = fileDb.cacheLRUEnable();
        } else {
            throw new RuntimeException("unknown cacheType " + cacheType);
        }

        // Optional tuning.
        if (options.cacheSizeItems().isPresent()) {
            maker = maker.cacheSize(options.cacheSizeItems().get().intValue());
        }
        if (options.storageSizeLimitMB().isPresent()) {
            // The MB value is divided by 1024 before being handed to sizeLimit
            // (presumably because sizeLimit expects GB -- confirm against the MapDB docs).
            maker = maker.sizeLimit(options.storageSizeLimitMB().get().doubleValue() / 1024.0d);
        }

        return maker.deleteFilesAfterClose().transactionDisable().make();
    }
}
