package io.streamnative.oxia.client.notify;

import io.opentelemetry.api.common.Attributes;
import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.metrics.Counter;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.notify.ShardNotificationReceiver;
import io.streamnative.oxia.client.shard.ShardManager;
import java.util.Collections;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/oxia-client-0.3.1.jar:io/streamnative/oxia/client/notify/NotificationManager.class */
/**
 * Keeps one {@link ShardNotificationReceiver} per shard and re-wires them whenever the shard
 * assignments change.
 *
 * <p>The manager is lazy: no receiver is created until the first callback is registered through
 * {@link #registerCallback(Consumer)}. Until then (and after {@link #close()}) shard assignment
 * changes delivered through {@link #accept(ShardManager.ShardAssignmentChanges)} are ignored.
 */
public class NotificationManager implements AutoCloseable, Consumer<ShardManager.ShardAssignmentChanges> {
    private static final Logger log = LoggerFactory.getLogger(NotificationManager.class);

    /** Active receivers keyed by shard id. */
    private final ConcurrentMap<Long, ShardNotificationReceiver> shardReceivers = new ConcurrentHashMap<>();

    @NonNull
    private final ShardNotificationReceiver.Factory receiverFactory;

    @NonNull
    private final ShardManager shardManager;
    private final CompositeConsumer<Notification> compositeCallback;
    private final ScheduledExecutorService executor;

    /** Set once the initial receivers have been created by the first registered callback. */
    private volatile boolean started;

    /** Set by {@link #close()}; once set, assignment changes and new callbacks are rejected. */
    private volatile boolean closed;
    private final Counter counterNotificationsReceived;
    private final Counter counterNotificationsBatchesReceived;

    /**
     * Creates a manager whose receivers are built by a new {@link ShardNotificationReceiver.Factory}
     * wrapping the given stub manager.
     *
     * @param scheduledExecutorService executor exposed through {@link #getExecutor()}
     * @param oxiaStubManager stub manager handed to the receiver factory
     * @param shardManager source of the initial shard list
     * @param instrumentProvider provider used to create the notification counters
     * @throws NullPointerException if any argument is {@code null}
     */
    public NotificationManager(@NonNull ScheduledExecutorService scheduledExecutorService, @NonNull OxiaStubManager oxiaStubManager, @NonNull ShardManager shardManager, @NonNull InstrumentProvider instrumentProvider) {
        this(scheduledExecutorService, new ShardNotificationReceiver.Factory(oxiaStubManager), shardManager, instrumentProvider);
        // The executor, shard manager and instrument provider are already validated (with the same
        // messages) by the delegated constructor; only the stub manager is specific to this one.
        if (oxiaStubManager == null) {
            throw new NullPointerException("stubManager is marked non-null but is null");
        }
    }

    /**
     * Creates a manager that builds its receivers with the supplied factory.
     *
     * @param scheduledExecutorService executor exposed through {@link #getExecutor()}
     * @param factory factory used to create per-shard receivers; its callback becomes the
     *     composite callback that registered consumers are added to
     * @param shardManager source of the initial shard list
     * @param instrumentProvider provider used to create the notification counters
     * @throws NullPointerException if any argument is {@code null}
     */
    public NotificationManager(@NonNull ScheduledExecutorService scheduledExecutorService, @NonNull ShardNotificationReceiver.Factory factory, @NonNull ShardManager shardManager, @NonNull InstrumentProvider instrumentProvider) {
        if (scheduledExecutorService == null) {
            throw new NullPointerException("executor is marked non-null but is null");
        }
        if (factory == null) {
            throw new NullPointerException("receiverFactory is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (instrumentProvider == null) {
            throw new NullPointerException("instrumentProvider is marked non-null but is null");
        }
        this.receiverFactory = factory;
        this.compositeCallback = factory.getCallback();
        this.shardManager = shardManager;
        this.executor = scheduledExecutorService;
        this.counterNotificationsReceived = instrumentProvider.newCounter("oxia.client.notifications.received", Unit.Events, "The total number of notification events", Attributes.empty());
        this.counterNotificationsBatchesReceived = instrumentProvider.newCounter("oxia.client.notifications.batches.received", Unit.Events, "The total number of notification batches received", Attributes.empty());
    }

    /**
     * Applies a shard assignment change to the set of receivers.
     *
     * <p>The change is dropped when no callback has been registered yet or when the manager has
     * been closed.
     *
     * @param shardAssignmentChanges the added, removed and reassigned shards
     * @throws NullPointerException if {@code shardAssignmentChanges} is {@code null}
     */
    @Override // java.util.function.Consumer
    public void accept(@NonNull ShardManager.ShardAssignmentChanges shardAssignmentChanges) {
        if (shardAssignmentChanges == null) {
            throw new NullPointerException("changes is marked non-null but is null");
        }
        if (!this.started || this.closed) {
            return;
        }
        connectNotificationReceivers(shardAssignmentChanges);
    }

    /**
     * Adds a notification consumer and, on the first registration, creates receivers for every
     * shard currently known to the shard manager.
     *
     * @param consumer callback added to the composite notification callback
     * @throws NullPointerException if {@code consumer} is {@code null}
     * @throws IllegalStateException if the manager has been closed
     */
    public void registerCallback(@NonNull Consumer<Notification> consumer) {
        if (consumer == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        if (this.closed) {
            throw new IllegalStateException("Notification manager has been closed");
        }
        this.compositeCallback.add(consumer);
        if (this.started) {
            return;
        }
        // Re-check under the lock so that concurrent first registrations bootstrap only once.
        synchronized (this) {
            if (!this.started) {
                bootstrap();
                this.started = true;
            }
        }
    }

    /** Creates receivers for a snapshot of all shards, treating every one of them as "added". */
    private void bootstrap() {
        connectNotificationReceivers(new ShardManager.ShardAssignmentChanges(Set.copyOf(this.shardManager.allShards()), Collections.emptySet(), Collections.emptySet()));
    }

    /**
     * Reconciles the receiver map with an assignment change: closes receivers of removed shards,
     * creates receivers for added shards, and replaces receivers of reassigned shards.
     *
     * @param shardAssignmentChanges the change to apply
     * @throws NullPointerException if {@code shardAssignmentChanges} is {@code null}
     */
    private void connectNotificationReceivers(@NonNull ShardManager.ShardAssignmentChanges shardAssignmentChanges) {
        if (shardAssignmentChanges == null) {
            throw new NullPointerException("changes is marked non-null but is null");
        }
        shardAssignmentChanges.removed().forEach(shard -> {
            final long shardId = shard.id();
            // remove() returns null when no receiver is tracked for this shard; nothing to close then.
            ShardNotificationReceiver stale = this.shardReceivers.remove(shardId);
            if (stale != null) {
                stale.close();
            }
        });
        shardAssignmentChanges.added().forEach(shard -> {
            final long shardId = shard.id();
            this.shardReceivers.computeIfAbsent(
                    shardId,
                    key -> this.receiverFactory.newReceiver(shardId, shard.leader(), this, OptionalLong.empty()));
        });
        shardAssignmentChanges.reassigned().forEach(shard -> {
            final long shardId = shard.id();
            Optional<ShardNotificationReceiver> previous = Optional.ofNullable(this.shardReceivers.remove(shardId));
            previous.ifPresent(ShardNotificationReceiver::close);
            // Hand the previous receiver's offset (if there was a receiver) to its replacement.
            final OptionalLong previousOffset = previous.map(ShardNotificationReceiver::getOffset).orElse(OptionalLong.empty());
            this.shardReceivers.computeIfAbsent(
                    shardId,
                    key -> this.receiverFactory.newReceiver(shardId, shard.leader(), this, previousOffset));
        });
    }

    /**
     * Marks the manager as closed and closes every receiver currently in the map. Calling it again
     * after the flag has been set is a no-op.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.shardReceivers.values().parallelStream().forEach(ShardNotificationReceiver::close);
    }

    /** Returns the executor supplied at construction time. */
    public ScheduledExecutorService getExecutor() {
        return this.executor;
    }

    /** Returns the counter registered as {@code oxia.client.notifications.received}. */
    public Counter getCounterNotificationsReceived() {
        return this.counterNotificationsReceived;
    }

    /** Returns the counter registered as {@code oxia.client.notifications.batches.received}. */
    public Counter getCounterNotificationsBatchesReceived() {
        return this.counterNotificationsBatchesReceived;
    }
}
