package io.streamnative.oxia.client.batch;

import io.grpc.stub.StreamObserver;
import io.streamnative.oxia.proto.OxiaClientGrpc;
import io.streamnative.oxia.proto.WriteRequest;
import io.streamnative.oxia.proto.WriteResponse;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/oxia-client-0.4.5.jar:io/streamnative/oxia/client/batch/WriteStreamWrapper.class */
public final class WriteStreamWrapper implements StreamObserver<WriteResponse> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WriteStreamWrapper.class);
    private final StreamObserver<WriteRequest> clientStream;
    private final Deque<CompletableFuture<WriteResponse>> pendingWrites = new ArrayDeque();
    private volatile Throwable failed = null;

    public WriteStreamWrapper(OxiaClientGrpc.OxiaClientStub oxiaClientStub) {
        this.clientStream = oxiaClientStub.writeStream(this);
    }

    public boolean isValid() {
        return this.failed == null;
    }

    @Override // io.grpc.stub.StreamObserver
    public void onNext(WriteResponse writeResponse) {
        synchronized (this) {
            CompletableFuture<WriteResponse> poll = this.pendingWrites.poll();
            if (poll != null) {
                poll.complete(writeResponse);
            }
        }
    }

    @Override // io.grpc.stub.StreamObserver
    public void onError(Throwable th) {
        synchronized (this) {
            if (!this.pendingWrites.isEmpty()) {
                log.warn("Got Error", th);
            }
            this.pendingWrites.forEach(completableFuture -> {
                completableFuture.completeExceptionally(th);
            });
            this.pendingWrites.clear();
            this.failed = th;
        }
    }

    @Override // io.grpc.stub.StreamObserver
    public void onCompleted() {
    }

    public CompletableFuture<WriteResponse> send(WriteRequest writeRequest) {
        synchronized (this) {
            if (this.failed != null) {
                return CompletableFuture.failedFuture(this.failed);
            }
            CompletableFuture<WriteResponse> completableFuture = new CompletableFuture<>();
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Sending request {}", writeRequest);
                }
                this.clientStream.onNext(writeRequest);
                this.pendingWrites.add(completableFuture);
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
            return completableFuture;
        }
    }
}
