package tech.ydb.yoj.repository.ydb;

import com.google.common.annotations.VisibleForTesting;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Status;
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
import tech.ydb.yoj.repository.db.exception.QueryInterruptedException;
import tech.ydb.yoj.repository.ydb.client.YdbValidator;

/* loaded from: input_file:tech/ydb/yoj/repository/ydb/YdbSpliterator.class */
/**
 * A {@link Spliterator} that hands elements from a supplier thread to a consumer thread
 * through a bounded queue of capacity one.
 *
 * <p>The supplier side calls {@link #onNext(Object)} for every element and
 * {@link #onSupplierThreadComplete(Status, Throwable)} once at the end. The consumer side reads
 * through {@link #tryAdvance(Consumer)}, normally via the stream returned by {@link #makeStream()}.
 * All blocking operations share a single deadline that is fixed when the spliterator is created.
 *
 * <p>Traversal state ({@code tryAdvance}) is not synchronized and is meant to be driven by one
 * consumer thread at a time; the supplier-facing methods communicate with it only through the
 * queue and the volatile "stream closed" flag.
 *
 * @param <V> element type
 */
public class YdbSpliterator<V> implements Spliterator<V> {
    private static final Logger log = LoggerFactory.getLogger(YdbSpliterator.class);
    private static final Duration DEFAULT_STREAM_WORK_TIMEOUT = Duration.ofMinutes(5);

    /** Absolute deadline on the {@link System#nanoTime()} clock shared by every offer/poll. */
    private final long streamWorkDeadlineNanos;
    /** Value reported by {@link #characteristics()}. */
    private final int flags;
    /** Single-slot hand-off buffer: the supplier blocks until the consumer takes the previous item. */
    private final BlockingQueue<QueueValue<V>> queue = new ArrayBlockingQueue<>(1);
    /** Checks the final status/error pair and throws if it denotes a failure. */
    private final BiConsumer<Status, Throwable> validateResponse;
    /** Set by {@link #onStreamClose()}; read by the supplier-facing methods. */
    private volatile boolean streamClosed;
    /**
     * The end-of-data marker once it has been taken from the queue, otherwise {@code null}.
     * Read and written only by {@link #tryAdvance(Consumer)}.
     */
    private QueueValue<V> endMarker;

    /**
     * Thrown from {@link #onNext(Object)} when the stream has already been closed.
     *
     * <p>{@link #INSTANCE} is shared, so instances are created without a writable stack trace and
     * with suppression disabled: a trace captured at class initialisation would be misleading and
     * suppressed exceptions would otherwise pile up on the shared object.
     */
    @VisibleForTesting
    protected static class ConsumerDoneException extends RuntimeException {
        public static final ConsumerDoneException INSTANCE = new ConsumerDoneException();

        protected ConsumerDoneException() {
            super(null, null, false, false);
        }
    }

    /**
     * Thrown from {@link #onNext(Object)} when the element could not be queued before the deadline.
     *
     * <p>Shared singleton; created without a writable stack trace and with suppression disabled
     * for the same reasons as {@link ConsumerDoneException}.
     */
    private static class OfferDeadlineExceededException extends RuntimeException {
        public static final OfferDeadlineExceededException INSTANCE = new OfferDeadlineExceededException();

        private OfferDeadlineExceededException() {
            super(null, null, false, false);
        }
    }

    /**
     * Immutable queue entry: either a data element ({@code endData == false}) or the terminal
     * marker carrying the final status and error ({@code endData == true}).
     *
     * @param <V> element type
     */
    public static final class QueueValue<V> {
        private final V value;
        private final Status status;
        private final Throwable error;
        private final boolean endData;

        /** Creates a data entry wrapping {@code v}; status and error are {@code null}. */
        public static <V> QueueValue<V> of(V v) {
            return new QueueValue<>(v, null, null, false);
        }

        /** Creates the terminal entry carrying the given status and error; the value is {@code null}. */
        public static <V> QueueValue<V> ofEndData(Status status, Throwable th) {
            return new QueueValue<>(null, status, th, true);
        }

        @Generated
        @ConstructorProperties({"value", "status", "error", "endData"})
        public QueueValue(V v, Status status, Throwable th, boolean z) {
            this.value = v;
            this.status = status;
            this.error = th;
            this.endData = z;
        }

        @Generated
        public V getValue() {
            return this.value;
        }

        @Generated
        public Status getStatus() {
            return this.status;
        }

        @Generated
        public Throwable getError() {
            return this.error;
        }

        @Generated
        public boolean isEndData() {
            return this.endData;
        }

        @Generated
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof QueueValue)) {
                return false;
            }
            QueueValue<?> other = (QueueValue<?>) obj;
            return this.endData == other.endData
                    && Objects.equals(this.value, other.value)
                    && Objects.equals(this.status, other.status)
                    && Objects.equals(this.error, other.error);
        }

        @Generated
        @Override
        public int hashCode() {
            // Same multiplier and null/boolean constants as the previous implementation,
            // so hash values are unchanged.
            int result = 59 + (this.endData ? 79 : 97);
            result = result * 59 + (this.value == null ? 43 : this.value.hashCode());
            result = result * 59 + (this.status == null ? 43 : this.status.hashCode());
            result = result * 59 + (this.error == null ? 43 : this.error.hashCode());
            return result;
        }

        @Generated
        @Override
        public String toString() {
            return "YdbSpliterator.QueueValue(value=" + this.value
                    + ", status=" + this.status
                    + ", error=" + this.error
                    + ", endData=" + this.endData + ")";
        }
    }

    /**
     * Creates a spliterator with the default five-minute work timeout.
     *
     * @param str passed as the first argument to {@code YdbValidator.validate} when the final
     *            status is checked (presumably a description of the request - confirm)
     * @param z   whether {@link Spliterator#ORDERED} is reported by {@link #characteristics()}
     */
    public YdbSpliterator(String str, boolean z) {
        this(str, z, DEFAULT_STREAM_WORK_TIMEOUT);
    }

    /**
     * Creates a spliterator whose deadline is {@code duration} from now.
     *
     * @param str      passed as the first argument to {@code YdbValidator.validate}
     * @param z        whether {@link Spliterator#ORDERED} is reported
     * @param duration time budget shared by all queue operations; values that do not fit into
     *                 a {@code long} of nanoseconds are saturated
     */
    @VisibleForTesting
    protected YdbSpliterator(String str, boolean z, Duration duration) {
        this.flags = (z ? Spliterator.ORDERED : 0) | Spliterator.NONNULL;
        // nanoTime arithmetic may wrap; only differences against nanoTime() are ever used.
        this.streamWorkDeadlineNanos = System.nanoTime() + saturatedToNanos(duration);
        this.validateResponse = (status, th) -> {
            if (th != null) {
                throw YdbOperations.convertToRepositoryException(th);
            }
            YdbValidator.validate(str, status.getCode(), status.toString());
        };
    }

    /** Returns the nanoseconds left until the deadline; zero or negative once it has passed. */
    private long calculateTimeout() {
        return this.streamWorkDeadlineNanos - System.nanoTime();
    }

    /**
     * Wraps this spliterator into a sequential stream whose close handler is
     * {@link #onStreamClose()}.
     */
    public Stream<V> makeStream() {
        return StreamSupport.stream(this, false).onClose(this::onStreamClose);
    }

    /**
     * Supplier-side callback: queues one element, waiting until the deadline for a free slot.
     *
     * @param v the element to hand over to the consumer
     * @throws ConsumerDoneException     if the stream has already been closed
     * @throws QueryInterruptedException if the calling thread is interrupted while waiting;
     *                                   the interrupt flag is restored first
     * @throws RuntimeException          an internal deadline exception if the slot did not become
     *                                   free in time
     */
    public void onNext(V v) {
        if (this.streamClosed) {
            throw ConsumerDoneException.INSTANCE;
        }
        boolean accepted;
        try {
            accepted = this.queue.offer(QueueValue.of(v), calculateTimeout(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new QueryInterruptedException("Supplier thread interrupted", e);
        }
        if (!accepted) {
            log.warn("Supplier thread was closed because consumer didn't poll an element of stream on timeout");
            throw OfferDeadlineExceededException.INSTANCE;
        }
    }

    /**
     * Supplier-side callback: publishes the terminal marker with the final outcome.
     *
     * <p>Nothing is published when the supplier stopped because of its own offer deadline or
     * when the stream is already closed. The offer ignores interrupts (restoring the flag
     * afterwards) and gives up at the shared deadline.
     *
     * @param status final status, forwarded to the consumer as-is
     * @param th     failure, if any; {@link CompletionException}/{@link ExecutionException}
     *               wrappers are unwrapped first
     */
    public void onSupplierThreadComplete(Status status, Throwable th) {
        Throwable failure = unwrapException(th);
        if (failure instanceof OfferDeadlineExceededException || this.streamClosed) {
            return;
        }
        boolean delivered = offerUninterruptibly(
                this.queue, QueueValue.ofEndData(status, failure), this.streamWorkDeadlineNanos);
        if (!delivered) {
            log.warn("Supplier thread was closed because consumer didn't poll the last element of stream on timeout");
        }
    }

    /**
     * Takes the next queue entry, waiting until the deadline.
     *
     * @return the entry, or {@code null} if none arrived in time
     * @throws QueryInterruptedException if interrupted while waiting; the interrupt flag is restored
     */
    @Nullable
    private QueueValue<V> poll() {
        try {
            return this.queue.poll(calculateTimeout(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new QueryInterruptedException("Consumer thread interrupted", e);
        }
    }

    /**
     * Delivers the next element to {@code consumer}, or finishes the traversal.
     *
     * <p>Once the terminal marker has been seen it is remembered, so later calls neither block
     * on the empty queue nor time out: they re-run the outcome check (re-throwing a failure, if
     * there was one) and return {@code false}.
     *
     * @return {@code true} if an element was delivered, {@code false} at end of data
     * @throws NullPointerException      if {@code consumer} is {@code null}
     * @throws DeadlineExceededException if nothing arrived before the deadline
     * @throws QueryInterruptedException if interrupted while waiting
     */
    @Override // java.util.Spliterator
    public boolean tryAdvance(Consumer<? super V> consumer) {
        Objects.requireNonNull(consumer, "consumer");
        if (this.endMarker == null) {
            QueueValue<V> next = poll();
            if (next == null) {
                throw new DeadlineExceededException("Stream deadline exceeded on poll");
            }
            if (!next.isEndData()) {
                consumer.accept(next.getValue());
                return true;
            }
            // Remember the marker before validating so a failing outcome stays sticky.
            this.endMarker = next;
        }
        this.validateResponse.accept(this.endMarker.getStatus(), this.endMarker.getError());
        return false;
    }

    /**
     * Close handler of the stream: marks the stream closed and drops any queued entry, which
     * also frees the slot for a supplier that may be waiting in an offer.
     */
    @VisibleForTesting
    protected void onStreamClose() {
        this.streamClosed = true;
        this.queue.clear();
    }

    /** Always {@code null}: this spliterator cannot be split. */
    @Override // java.util.Spliterator
    public Spliterator<V> trySplit() {
        return null;
    }

    /** Always {@link Long#MAX_VALUE}: the number of elements is unknown. */
    @Override // java.util.Spliterator
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    /** Always {@code -1}: the exact size is never known. */
    @Override // java.util.Spliterator
    public long getExactSizeIfKnown() {
        return -1L;
    }

    /** Returns {@link Spliterator#NONNULL}, plus {@link Spliterator#ORDERED} if requested at construction. */
    @Override // java.util.Spliterator
    public int characteristics() {
        return this.flags;
    }

    /**
     * Strips a {@link CompletionException}/{@link ExecutionException} wrapper.
     *
     * <p>A wrapper without a cause is returned unchanged so that a failure is never turned
     * into {@code null} ("no error").
     */
    private static Throwable unwrapException(Throwable failure) {
        boolean wrapper = failure instanceof CompletionException || failure instanceof ExecutionException;
        if (wrapper && failure.getCause() != null) {
            return failure.getCause();
        }
        return failure;
    }

    /** Converts to nanoseconds, clamping to {@code Long.MIN_VALUE}/{@code Long.MAX_VALUE} on overflow. */
    private static long saturatedToNanos(Duration timeout) {
        try {
            return timeout.toNanos();
        } catch (ArithmeticException overflow) {
            return timeout.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
    }

    /**
     * Offers {@code element} until {@code deadlineNanos} (on the {@link System#nanoTime()} clock),
     * retrying with the remaining time whenever the wait is interrupted. The interrupt flag is
     * restored before returning or propagating an exception.
     *
     * @return {@code true} if the element was queued, {@code false} if the deadline passed first
     */
    private static <E> boolean offerUninterruptibly(BlockingQueue<E> target, E element, long deadlineNanos) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return target.offer(element, deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
                } catch (InterruptedException ignored) {
                    // Deliberately not propagated: remember the interrupt and retry.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
