package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.impl.buffer.RoundRobinMultiReadonlyBuffer;
import io.axoniq.axonserver.grpc.ErrorMessage;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/FlowControlledReplyChannelWriter.class */
/**
 * Forwards items polled from a {@link DisposableReadonlyBuffer} to a {@link ReplyChannel}, sending no more
 * items than have been requested through {@link #request(long)}.
 * <p>
 * Outstanding permits are tracked in an {@link AtomicLong}. An {@link AtomicBoolean} "flow gate" is acquired with
 * {@code compareAndSet} before draining, so at most one caller drains the buffer at a time; a caller that fails to
 * acquire the gate returns immediately instead of blocking. Once the buffer reports both {@code closed()} and
 * {@code isEmpty()}, the reply channel is completed exactly once (normally, or with the buffer's error).
 *
 * @param <T> the type of item moved from the buffer to the reply channel
 */
public class FlowControlledReplyChannelWriter<T> implements FlowControl {
    /** Source the items are polled from. */
    private final DisposableReadonlyBuffer<T> buffer;
    /** Destination the polled items are sent to. */
    private final ReplyChannel<T> replyChannel;
    /** Number of items that may still be sent; saturates at {@link Long#MAX_VALUE}. */
    private final AtomicLong requestedRef;
    /** {@code true} while some caller is inside the draining section of {@link #stream()}. */
    private final AtomicBoolean flowGate;
    /** Set once the reply channel has been completed, so completion is signalled only once. */
    private final AtomicBoolean completed;

    /**
     * Creates a writer that reads from several buffers, wrapped in a {@link RoundRobinMultiReadonlyBuffer}.
     *
     * @param list         the buffers to read from
     * @param replyChannel the channel to which polled items are written
     */
    public FlowControlledReplyChannelWriter(List<? extends DisposableReadonlyBuffer<T>> list, ReplyChannel<T> replyChannel) {
        // NOTE(review): raw construction kept as-is; the generic signature of RoundRobinMultiReadonlyBuffer
        // is not visible from this file - switch to the diamond operator once confirmed.
        this(new RoundRobinMultiReadonlyBuffer(list), replyChannel);
    }

    /**
     * Creates a writer that reads from the given buffer and registers {@link #stream()} as the buffer's
     * {@code onAvailable} callback.
     *
     * @param disposableReadonlyBuffer the buffer to read from
     * @param replyChannel             the channel to which polled items are written
     */
    public FlowControlledReplyChannelWriter(DisposableReadonlyBuffer<T> disposableReadonlyBuffer, ReplyChannel<T> replyChannel) {
        this.requestedRef = new AtomicLong();
        this.flowGate = new AtomicBoolean();
        this.completed = new AtomicBoolean();
        this.buffer = disposableReadonlyBuffer;
        this.replyChannel = replyChannel;
        // Registered last so that every field is assigned before the callback can possibly run.
        // Presumably the buffer invokes it when new items can be polled - confirm against the buffer contract.
        disposableReadonlyBuffer.onAvailable(this::stream);
    }

    /**
     * Adds {@code j} permits and attempts to send buffered items. Non-positive values are ignored. If adding the
     * permits would overflow, the permit count is pinned at {@link Long#MAX_VALUE}.
     *
     * @param j the number of additional items that may be sent
     */
    @Override // io.axoniq.axonserver.connector.FlowControl
    public void request(long j) {
        if (j <= 0) {
            return;
        }
        this.requestedRef.getAndUpdate(current -> {
            try {
                return Math.addExact(j, current);
            } catch (ArithmeticException overflow) {
                // Saturate rather than wrap around to a negative permit count.
                return Long.MAX_VALUE;
            }
        });
        stream();
    }

    /**
     * Disposes the underlying buffer.
     */
    @Override // io.axoniq.axonserver.connector.FlowControl
    public void cancel() {
        this.buffer.dispose();
    }

    /**
     * Drains the buffer into the reply channel while permits remain and the buffer is not empty.
     * <p>
     * Each pass acquires the flow gate, sends up to the currently requested number of items, subtracts the number
     * actually sent from the permit count and releases the gate (also when sending throws). The state is re-checked
     * after the gate is released so that permits or items which arrived during the pass are picked up by another
     * pass. If the gate is already held, this method returns without doing anything.
     */
    private void stream() {
        while (this.flowGate.compareAndSet(false, true)) {
            try {
                long sent = send(this.requestedRef.get());
                this.requestedRef.getAndAccumulate(sent, (current, delivered) -> current - delivered);
            } finally {
                this.flowGate.set(false);
            }
            if (this.requestedRef.get() <= 0 || this.buffer.isEmpty()) {
                return;
            }
        }
    }

    /**
     * Sends at most {@code requested} items from the buffer to the reply channel, completing the channel when the
     * buffer turns out to be closed and empty.
     *
     * @param requested the maximum number of items to send
     * @return the number of items sent, or {@code requested} itself when the loop ran to its bound
     */
    private long send(long requested) {
        completeIfBufferIsClosedAndEmpty();
        // A long counter is required: 'requested' may be as large as Long.MAX_VALUE, and an int counter would wrap
        // to a negative value after Integer.MAX_VALUE items, corrupting the returned count.
        for (long sent = 0; sent < requested; sent++) {
            Optional<T> next = this.buffer.poll();
            if (!next.isPresent()) {
                completeIfBufferIsClosedAndEmpty();
                return sent;
            }
            this.replyChannel.send(next.get());
        }
        return requested;
    }

    /**
     * Completes the reply channel if the buffer is closed and empty and completion has not been signalled yet.
     * The channel is completed with the buffer's error when one is present, and normally otherwise.
     */
    private void completeIfBufferIsClosedAndEmpty() {
        if (this.buffer.closed() && this.buffer.isEmpty() && this.completed.compareAndSet(false, true)) {
            Optional<ErrorMessage> error = this.buffer.error();
            if (error.isPresent()) {
                this.replyChannel.completeWithError(error.get());
            } else {
                this.replyChannel.complete();
            }
        }
    }
}
