package io.streamnative.oxia.client.notify;

import io.grpc.stub.StreamObserver;
import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.util.Backoff;
import io.streamnative.oxia.proto.NotificationBatch;
import io.streamnative.oxia.proto.NotificationsRequest;
import java.io.Closeable;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/oxia-client-0.4.10.jar:io/streamnative/oxia/client/notify/ShardNotificationReceiver.class */
/**
 * Receives the notification stream of a single shard and forwards each recognized notification
 * to a callback.
 *
 * <p>The receiver subscribes itself as the {@link StreamObserver} of the
 * {@code getNotifications} call as soon as it is constructed. The offset of the last accepted
 * batch is remembered and, when present, sent as the exclusive start offset of the next
 * subscription. When the stream fails, a re-subscription is scheduled on the
 * {@link NotificationManager}'s executor after a delay obtained from {@link Backoff}; when the
 * stream completes, the receiver re-subscribes immediately.
 *
 * <p>{@link #close()} only sets a flag that suppresses further re-subscriptions; it does not
 * cancel a call that is already in flight, and {@link #onNext(NotificationBatch)} does not
 * consult the flag.
 */
public class ShardNotificationReceiver implements Closeable, StreamObserver<NotificationBatch> {
    private static final Logger log = LoggerFactory.getLogger(ShardNotificationReceiver.class);

    private final OxiaStub stub;
    private final NotificationManager notificationManager;
    private final long shardId;

    @NonNull
    private final Consumer<Notification> callback;

    /** Offset of the last accepted batch; empty until the first batch has been accepted. */
    @NonNull
    private volatile OptionalLong offset;

    /** Set by {@link #close()}; checked before every re-subscription. */
    private volatile boolean closed = false;

    private final Backoff backoff = new Backoff();

    /**
     * Creates receivers that all deliver to the same {@link CompositeConsumer} and obtain their
     * stubs from the same {@link OxiaStubManager}.
     */
    public static class Factory {

        @NonNull
        private final OxiaStubManager stubManager;

        @NonNull
        private final CompositeConsumer<Notification> callback = new CompositeConsumer<>();

        /**
         * Creates a receiver for one shard. The receiver's constructor subscribes immediately.
         *
         * @param j the shard id
         * @param str the leader passed to {@link OxiaStubManager#getStub} to look up the stub
         * @param notificationManager supplies the counters and the retry executor
         * @param optionalLong the offset to resume after, or empty to send no start offset
         * @return the new receiver
         * @throws NullPointerException if {@code str}, {@code notificationManager} or
         *     {@code optionalLong} is null
         */
        @NonNull
        public ShardNotificationReceiver newReceiver(long j, @NonNull String str, @NonNull NotificationManager notificationManager, @NonNull OptionalLong optionalLong) {
            Objects.requireNonNull(str, "leader is marked non-null but is null");
            Objects.requireNonNull(notificationManager, "notificationManager is marked non-null but is null");
            Objects.requireNonNull(optionalLong, "offset is marked non-null but is null");
            return new ShardNotificationReceiver(this.stubManager.getStub(str), j, this.callback, notificationManager, optionalLong);
        }

        /**
         * @param oxiaStubManager the source of stubs for new receivers
         * @throws NullPointerException if {@code oxiaStubManager} is null
         */
        public Factory(@NonNull OxiaStubManager oxiaStubManager) {
            this.stubManager = Objects.requireNonNull(oxiaStubManager, "stubManager is marked non-null but is null");
        }

        /** Returns the composite callback handed to every receiver created by this factory. */
        @NonNull
        public CompositeConsumer<Notification> getCallback() {
            return this.callback;
        }
    }

    /**
     * Stores the collaborators and immediately subscribes via {@link #start()}.
     *
     * @param oxiaStub the stub used to open the notification stream
     * @param j the shard id
     * @param consumer receives every recognized notification
     * @param notificationManager supplies the counters and the retry executor; not null-checked
     *     here
     * @param optionalLong the offset to resume after, or empty to send no start offset
     * @throws NullPointerException if {@code oxiaStub}, {@code consumer} or {@code optionalLong}
     *     is null
     */
    ShardNotificationReceiver(@NonNull OxiaStub oxiaStub, long j, @NonNull Consumer<Notification> consumer, NotificationManager notificationManager, @NonNull OptionalLong optionalLong) {
        // Checks run in the same order as before so the first failing argument is unchanged.
        Objects.requireNonNull(oxiaStub, "stub is marked non-null but is null");
        Objects.requireNonNull(consumer, "callback is marked non-null but is null");
        Objects.requireNonNull(optionalLong, "offset is marked non-null but is null");
        this.stub = oxiaStub;
        this.notificationManager = notificationManager;
        this.shardId = j;
        this.callback = consumer;
        this.offset = optionalLong;
        start();
    }

    /**
     * Opens a notification stream for this shard with this receiver as its observer. The current
     * offset, when present, is sent as the exclusive start offset.
     */
    void start() {
        NotificationsRequest.Builder request = NotificationsRequest.newBuilder().setShardId(this.shardId);
        this.offset.ifPresent(request::setStartOffsetExclusive);
        this.stub.async().getNotifications(request.build(), this);
    }

    /**
     * Handles one batch. A batch whose offset is not greater than the last accepted offset is
     * ignored. Otherwise the offset is recorded, the counters are updated, and each entry is
     * converted and handed to the callback; entries of type {@code UNRECOGNIZED} are skipped.
     */
    @Override // io.grpc.stub.StreamObserver
    public void onNext(NotificationBatch notificationBatch) {
        OptionalLong lastOffset = this.offset;
        if (lastOffset.isPresent() && lastOffset.getAsLong() >= notificationBatch.getOffset()) {
            return;
        }
        // The offset is advanced before the callbacks run, as in the original code.
        this.offset = OptionalLong.of(notificationBatch.getOffset());
        this.notificationManager.getCounterNotificationsBatchesReceived().increment();
        this.notificationManager.getCounterNotificationsReceived().add(notificationBatch.getNotificationsCount());
        // The lambda parameter and the local it builds must have distinct names: a lambda body
        // may not redeclare one of its own parameters.
        notificationBatch.getNotificationsMap().forEach((key, protoNotification) -> {
            if (log.isDebugEnabled()) {
                log.debug("--- Got notification: {} - {}", key, protoNotification.getType());
            }
            Notification converted;
            switch (protoNotification.getType()) {
                case KEY_CREATED:
                    converted = new Notification.KeyCreated(key, protoNotification.getVersionId());
                    break;
                case KEY_MODIFIED:
                    converted = new Notification.KeyModified(key, protoNotification.getVersionId());
                    break;
                case KEY_DELETED:
                    converted = new Notification.KeyDeleted(key);
                    break;
                case KEY_RANGE_DELETED:
                    converted = new Notification.KeyRangeDelete(key, protoNotification.getKeyRangeLast());
                    break;
                case UNRECOGNIZED:
                    converted = null;
                    break;
                default:
                    // Kept from the original: thrown for any constant not listed above.
                    throw new IncompatibleClassChangeError();
            }
            if (converted != null) {
                this.callback.accept(converted);
            }
        });
    }

    /**
     * Logs the failure and schedules a re-subscription after the next backoff delay. Does nothing
     * once closed; the scheduled task checks the flag again before re-subscribing.
     */
    @Override // io.grpc.stub.StreamObserver
    public void onError(Throwable th) {
        if (this.closed) {
            return;
        }
        long retryDelayMillis = this.backoff.nextDelayMillis();
        log.warn("Error while receiving notifications for shard={}: {} - Retrying in {} seconds", this.shardId, th.getMessage(), retryDelayMillis / 1000.0d);
        this.notificationManager.getExecutor().schedule(() -> {
            if (this.closed) {
                return;
            }
            log.info("Retrying getting notifications for shard={}", this.shardId);
            start();
        }, retryDelayMillis, TimeUnit.MILLISECONDS);
    }

    /** Re-subscribes immediately when the stream completes, unless closed. */
    @Override // io.grpc.stub.StreamObserver
    public void onCompleted() {
        if (this.closed) {
            return;
        }
        start();
    }

    /** Marks the receiver closed so that no further re-subscription is attempted. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
    }

    /** Returns the id of the shard this receiver is subscribed to. */
    long getShardId() {
        return this.shardId;
    }

    /** Returns the offset of the last accepted batch, or the initial offset if none was accepted. */
    @NonNull
    public OptionalLong getOffset() {
        return this.offset;
    }
}
