package stream.io;

import java.util.Collection;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;

/* loaded from: input_file:stream/io/SnappyBlockingQueue.class */
public class SnappyBlockingQueue extends AbstractQueue {
    private static final Logger log = LoggerFactory.getLogger(SnappyBlockingQueue.class);
    protected boolean closed;
    private int count;
    private final Data[] data;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    private int last;
    private int head;
    protected boolean writeSnap;
    protected boolean readSnap;

    public SnappyBlockingQueue() {
        this(10000);
    }

    public SnappyBlockingQueue(int i) {
        this.closed = false;
        this.count = 0;
        this.last = 0;
        this.head = 0;
        this.writeSnap = false;
        this.readSnap = false;
        if (i <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = i;
        this.data = new Data[i];
        this.lock = new ReentrantLock();
        this.notEmpty = this.lock.newCondition();
        this.notFull = this.lock.newCondition();
    }

    public int size() {
        return this.count;
    }

    public int remainingCapacity() {
        return this.capacity - this.count;
    }

    final int inc(int i) {
        int i2 = i + 1;
        if (i2 == this.data.length) {
            return 0;
        }
        return i2;
    }

    final int dec(int i) {
        return (i == 0 ? this.data.length : i) - 1;
    }

    protected boolean conditionWriteSnap() {
        return (this.writeSnap && this.count * 3 > this.capacity) || (!this.writeSnap && this.count == this.capacity);
    }

    protected boolean conditionReadNotSnap() {
        return this.readSnap && (this.count * 3) / 2 > this.capacity;
    }

    @Override // stream.io.Sink, stream.io.Source
    public void init() throws Exception {
        if (getCapacity().intValue() < 1) {
            throw new IllegalArgumentException("Invalid queue-capacity '" + getCapacity() + "'!");
        }
    }

    @Override // stream.io.Sink, stream.io.Source
    public void close() throws Exception {
        log.debug("Closing queue '{}'...", getId());
        this.lock.lockInterruptibly();
        try {
            if (this.closed) {
                log.debug("Queue '{}' already closed.", getId());
            } else {
                this.closed = true;
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // stream.io.Source
    public Data read() throws Exception {
        log.trace("Reading from queue {}", getId());
        this.lock.lockInterruptibly();
        try {
            if (this.closed && this.count == 0) {
                log.debug("Queue '{}' is closed and empty => null", getId());
                return null;
            }
            while (this.count == 0) {
                this.readSnap = true;
                this.notEmpty.await();
            }
            return extract();
        } finally {
            this.lock.unlock();
        }
    }

    private Data extract() {
        Data[] dataArr = this.data;
        Data data = dataArr[this.last];
        dataArr[this.last] = null;
        this.last = inc(this.last);
        this.count--;
        log.trace("last: {}", Integer.valueOf(this.last));
        log.trace("take size: {}", Integer.valueOf(this.count));
        log.trace("took item from queue: {}", data);
        if (!this.writeSnap) {
            this.notFull.signal();
        } else if (conditionWriteSnap()) {
            this.writeSnap = false;
            this.notFull.signal();
        }
        return data;
    }

    @Override // stream.io.Sink
    public boolean write(Data data) throws Exception {
        log.trace("Queue {}: Enqueuing event {}", getId(), data);
        if (data == null) {
            throw new NullPointerException();
        }
        if (this.closed) {
            return false;
        }
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (conditionWriteSnap()) {
            try {
                this.writeSnap = true;
                this.notFull.await();
            } catch (Throwable th) {
                reentrantLock.unlock();
                throw th;
            }
        }
        boolean insert = insert(data);
        reentrantLock.unlock();
        return insert;
    }

    private boolean insert(Data data) {
        this.data[this.head] = data;
        this.head = inc(this.head);
        this.count++;
        if (!this.readSnap) {
            this.notEmpty.signal();
            return true;
        }
        if (!conditionReadNotSnap()) {
            return true;
        }
        this.readSnap = false;
        this.notEmpty.signal();
        return true;
    }

    @Override // stream.io.Sink
    public boolean write(Collection<Data> collection) throws Exception {
        throw new IllegalAccessError("Not Implemented");
    }

    @Override // stream.io.Barrel
    public int clear() {
        try {
            this.lock.lockInterruptibly();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            int i = this.count;
            for (int i2 = 0; i2 < this.capacity; i2++) {
                this.data[i2] = null;
            }
            this.last = 0;
            this.head = 0;
            this.count = 0;
            return i;
        } finally {
            this.lock.unlock();
        }
    }

    @Override // stream.io.QueueService
    public int level() {
        return this.count;
    }

    @Override // stream.io.QueueService
    public int capacity() {
        return this.capacity;
    }

    @Override // stream.io.Queue
    public Integer getSize() {
        return Integer.valueOf(this.count);
    }

    @Override // stream.service.Service
    public void reset() throws Exception {
    }

    public String toString() {
        return "stream.io.BlockingQueue['" + this.id + "']";
    }

    @Override // stream.io.QueueService
    public Data poll() {
        return null;
    }

    @Override // stream.io.QueueService
    public Data take() {
        return null;
    }

    @Override // stream.io.QueueService
    public boolean enqueue(Data data) {
        return false;
    }
}
