package io.streamnative.oxia.client.shard;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.common.Attributes;
import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.grpc.CustomStatusCode;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.Counter;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.util.Backoff;
import io.streamnative.oxia.proto.NamespaceShardsAssignment;
import io.streamnative.oxia.proto.ShardAssignments;
import io.streamnative.oxia.proto.ShardAssignmentsRequest;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/oxia-client-0.4.5.jar:io/streamnative/oxia/client/shard/ShardManager.class */
public class ShardManager implements AutoCloseable, StreamObserver<ShardAssignments> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ShardManager.class);
    // Executor used to schedule delayed re-subscription attempts after the stream fails or completes.
    private final ScheduledExecutorService executor;
    // Stub whose async API is used to open the shard-assignments stream.
    private final OxiaStub stub;

    // Holds the namespace name and the currently known shards; updated on every received assignment.
    @NonNull
    private final ShardAssignmentsContainer assignments;

    // Listeners notified with the computed changes after each assignment update.
    @NonNull
    private final CompositeConsumer<ShardAssignmentChanges> callbacks;
    // Counter incremented once per received assignment event.
    private final Counter shardAssignmentsEvents;
    // Supplies the delay before each re-subscription; reset whenever an assignment is received.
    private final Backoff backoff;
    // Set by close(); read by the stream callbacks and by scheduled retry tasks, which stop once it is true.
    private volatile boolean closed;
    // Returned by start(); completed on the first received assignment, or failed when the namespace is not found.
    private final CompletableFuture<Void> initialAssignmentsFuture;

    /* loaded from: input_file:META-INF/bundled-dependencies/oxia-client-0.4.5.jar:io/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges.class */
    /**
     * Immutable summary of how the shard set differs between the previously known assignments and
     * a newly computed set of assignments.
     *
     * <p>Declared as a record: the canonical constructor, the {@code added()}, {@code removed()}
     * and {@code reassigned()} accessors, and the component-based {@code equals}, {@code hashCode}
     * and {@code toString} are generated by the compiler.
     *
     * @param added shards whose id appears only in the new assignments
     * @param removed shards whose id appears only in the previous assignments
     * @param reassigned shards present in both whose leader differs, in their new form
     */
    public record ShardAssignmentChanges(Set<Shard> added, Set<Shard> removed, Set<Shard> reassigned) {}

    /**
     * Creates a manager from explicitly supplied collaborators; intended for tests.
     *
     * @param scheduledExecutorService executor used to schedule stream re-subscriptions
     * @param oxiaStub stub used to open the shard-assignments stream
     * @param shardAssignmentsContainer container holding the namespace and the known shards
     * @param compositeConsumer listeners notified of assignment changes
     * @param instrumentProvider factory for the received-events counter
     * @throws NullPointerException if any argument is {@code null}
     */
    @VisibleForTesting
    ShardManager(@NonNull ScheduledExecutorService scheduledExecutorService, @NonNull OxiaStub oxiaStub, @NonNull ShardAssignmentsContainer shardAssignmentsContainer, @NonNull CompositeConsumer<ShardAssignmentChanges> compositeConsumer, @NonNull InstrumentProvider instrumentProvider) {
        this.backoff = new Backoff();
        this.initialAssignmentsFuture = new CompletableFuture<>();
        // Each argument is validated, in declaration order, as it is stored.
        this.executor = java.util.Objects.requireNonNull(scheduledExecutorService, "executor is marked non-null but is null");
        this.stub = java.util.Objects.requireNonNull(oxiaStub, "stub is marked non-null but is null");
        this.assignments = java.util.Objects.requireNonNull(shardAssignmentsContainer, "assignments is marked non-null but is null");
        this.callbacks = java.util.Objects.requireNonNull(compositeConsumer, "callbacks is marked non-null but is null");
        java.util.Objects.requireNonNull(instrumentProvider, "instrumentProvider is marked non-null but is null");
        this.shardAssignmentsEvents = instrumentProvider.newCounter(
                "oxia.client.shard.assignments.count",
                Unit.None,
                "The total count of received shard assignment events",
                Attributes.empty());
    }

    /**
     * Creates a manager for the given namespace, with a new assignments container using
     * {@code HashRangeShardStrategy.Xxh332HashRangeShardStrategy} and an empty callback set.
     *
     * @param scheduledExecutorService executor used to schedule stream re-subscriptions
     * @param oxiaStub stub used to open the shard-assignments stream
     * @param instrumentProvider factory for the received-events counter
     * @param str the namespace whose shard assignments are tracked
     * @throws NullPointerException if any argument is {@code null}
     */
    public ShardManager(ScheduledExecutorService scheduledExecutorService, @NonNull OxiaStub oxiaStub, @NonNull InstrumentProvider instrumentProvider, @NonNull String str) {
        // The delegate constructor already rejects a null executor, stub and instrument provider
        // with the same messages, so those checks are not repeated here.
        this(scheduledExecutorService, oxiaStub, new ShardAssignmentsContainer(HashRangeShardStrategy.Xxh332HashRangeShardStrategy, str), new CompositeConsumer<>(), instrumentProvider);
        if (str == null) {
            throw new NullPointerException("namespace is marked non-null but is null");
        }
    }

    /**
     * Marks this manager as closed. Only the flag is set here; the stream callbacks and any
     * already scheduled retry task check it and stop re-subscribing.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        closed = true;
    }

    /**
     * Opens the shard-assignments stream for this manager's namespace, registering this object
     * as the stream observer.
     *
     * @return the future that is completed on the first received assignment, or failed when the
     *     namespace is reported as not found
     */
    public CompletableFuture<Void> start() {
        final ShardAssignmentsRequest request = ShardAssignmentsRequest.newBuilder()
                .setNamespace(assignments.getNamespace())
                .build();
        stub.async().getShardAssignments(request, this);
        return initialAssignmentsFuture;
    }

    /**
     * Handles one assignment event: counts it, applies it, resets the retry backoff, and
     * completes the initial-assignments future if it is still pending.
     *
     * @param shardAssignments the assignments received from the stream
     */
    @Override // io.grpc.stub.StreamObserver
    public void onNext(ShardAssignments shardAssignments) {
        shardAssignmentsEvents.increment();
        updateAssignments(shardAssignments);
        backoff.reset();
        if (!initialAssignmentsFuture.isDone()) {
            initialAssignmentsFuture.complete(null);
        }
    }

    /**
     * Handles a stream failure.
     *
     * <p>Nothing happens once the manager is closed. When the failure is an {@code UNKNOWN}
     * status whose description maps to {@code ErrorNamespaceNotFound}, the error is logged and,
     * if the initial-assignments future is still pending, it is failed with a
     * {@link NamespaceNotFoundException} and the manager closes itself without retrying. In every
     * other case the failure is logged and a re-subscription is scheduled after the next backoff
     * delay.
     *
     * @param th the error reported by the stream
     */
    @Override // io.grpc.stub.StreamObserver
    public void onError(Throwable th) {
        if (this.closed) {
            return;
        }
        if (th instanceof StatusRuntimeException) {
            Status status = ((StatusRuntimeException) th).getStatus();
            String description;
            if (status.getCode() == Status.Code.UNKNOWN && (description = status.getDescription()) != null && CustomStatusCode.fromDescription(description) == CustomStatusCode.ErrorNamespaceNotFound) {
                log.error("Namespace not found: {}", this.assignments.getNamespace());
                if (!this.initialAssignmentsFuture.isDone() && this.initialAssignmentsFuture.completeExceptionally(new NamespaceNotFoundException(this.assignments.getNamespace()))) {
                    close();
                    // The manager has just closed itself: a retry task could only be a no-op,
                    // so neither the generic warning nor the reschedule below applies.
                    return;
                }
            }
        }
        log.warn("Failed receiving shard assignments.", Throwables.getRootCause(th));
        this.executor.schedule(() -> {
            // The manager may have been closed while the retry was waiting.
            if (this.closed) {
                return;
            }
            log.info("Retry creating stream for shard assignments namespace={}", this.assignments.getNamespace());
            start();
        }, this.backoff.nextDelayMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Handles the stream being completed: unless the manager is closed, logs a warning and
     * schedules a re-subscription after the next backoff delay.
     */
    @Override // io.grpc.stub.StreamObserver
    public void onCompleted() {
        if (!closed) {
            log.warn("Stream closed while receiving shard assignments");
            final Runnable resubscribe = () -> {
                // The manager may have been closed while the retry was waiting.
                if (!closed) {
                    log.info("Retry creating stream for shard assignments after stream closed namespace={}", assignments.getNamespace());
                    start();
                }
            };
            executor.schedule(resubscribe, backoff.nextDelayMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Applies a received assignment event: merges the shards listed for this manager's namespace
     * into the known shards, stores the resulting changes and notifies the callbacks.
     *
     * @param shardAssignments the event received from the stream
     * @throws NamespaceNotFoundException if the event has no entry for this manager's namespace
     */
    private void updateAssignments(ShardAssignments shardAssignments) {
        final NamespaceShardsAssignment forNamespace =
                shardAssignments.getNamespacesMap().get(this.assignments.getNamespace());
        if (forNamespace == null) {
            throw new NamespaceNotFoundException(this.assignments.getNamespace(), true);
        }
        // The known shards are queried once per step: once as the baseline for the diff and once
        // as the base of the merge.
        final Map<Long, Shard> diffBaseline = this.assignments.allShards();
        final Map<Long, Shard> mergeBase = this.assignments.allShards();
        final Set<Shard> received = forNamespace.getAssignmentsList().stream()
                .map(Shard::fromProto)
                .collect(Collectors.toSet());
        final Map<Long, Shard> merged = recomputeShardHashBoundaries(mergeBase, received);
        final ShardAssignmentChanges changes = computeShardLeaderChanges(diffBaseline, merged);
        this.assignments.update(changes);
        this.callbacks.accept(changes);
    }

    /**
     * Merges newly received shards into the currently known shards.
     *
     * <p>Every known shard returned by {@code findOverlapping} for some received shard is
     * dropped. The remaining known shards and all received shards are then combined into a map
     * keyed by shard id; when an id occurs on both sides, the received shard wins.
     *
     * @param map the currently known shards, keyed by id
     * @param set the shards received in the latest assignment
     * @return an unmodifiable map of the merged shards, keyed by id
     */
    @VisibleForTesting
    static Map<Long, Shard> recomputeShardHashBoundaries(Map<Long, Shard> map, Set<Shard> set) {
        // Ids of known shards displaced by an overlapping update; a hash set keeps the
        // membership test in the filter below constant-time per entry.
        final Set<Long> displacedIds = new HashSet<>();
        set.forEach(update -> {
            update.findOverlapping(map.values()).forEach(existing -> {
                log.info("Deleting shard {} as it overlaps with {}", existing, update);
                displacedIds.add(existing.id());
            });
        });
        final Stream<Shard> survivors = map.entrySet().stream()
                .filter(entry -> !displacedIds.contains(entry.getKey()))
                .map(Map.Entry::getValue);
        final Map<Long, Shard> merged = Stream.concat(survivors, set.stream())
                .collect(Collectors.toMap(Shard::id, Function.identity(), (existing, update) -> update));
        return Collections.unmodifiableMap(merged);
    }

    /**
     * Computes the difference between two shard maps keyed by shard id.
     *
     * @param map the previously known shards
     * @param map2 the newly computed shards
     * @return the changes: shards only in {@code map2} as added, shards only in {@code map} as
     *     removed, and, for ids present in both with a different leader, the {@code map2} shard
     *     as reassigned; each set is unmodifiable
     */
    @VisibleForTesting
    static ShardAssignmentChanges computeShardLeaderChanges(Map<Long, Shard> map, Map<Long, Shard> map2) {
        final Set<Shard> removed = map.values().stream()
                .filter(previous -> !map2.containsKey(previous.id()))
                .collect(Collectors.toSet());
        final Set<Shard> added = map2.values().stream()
                .filter(current -> !map.containsKey(current.id()))
                .collect(Collectors.toSet());
        final Set<Shard> reassigned = map.values().stream()
                .filter(previous -> map2.containsKey(previous.id()))
                .filter(previous -> !map2.get(previous.id()).leader().equals(previous.leader()))
                // Report the shard as it appears in the new assignments.
                .map(previous -> map2.get(previous.id()))
                .collect(Collectors.toSet());
        return new ShardAssignmentChanges(
                Collections.unmodifiableSet(added),
                Collections.unmodifiableSet(removed),
                Collections.unmodifiableSet(reassigned));
    }

    /**
     * Looks up the shard for a key by delegating to the assignments container.
     *
     * @param str the key
     * @return the value returned by the container's {@code getShardForKey}
     */
    public long getShardForKey(String str) {
        return assignments.getShardForKey(str);
    }

    /**
     * Returns the values of the container's shard map.
     *
     * @return the currently known shards
     */
    public Collection<Shard> allShards() {
        return assignments.allShards().values();
    }

    /**
     * Returns the shard ids reported by the assignments container.
     *
     * @return the value returned by the container's {@code allShardIds}
     */
    public Set<Long> allShardIds() {
        return assignments.allShardIds();
    }

    /**
     * Looks up the leader of a shard by delegating to the assignments container.
     *
     * @param j the shard id
     * @return the value returned by the container's {@code leader}
     */
    public String leader(long j) {
        return assignments.leader(j);
    }

    /**
     * Registers a listener to be notified with the changes of each assignment update.
     *
     * @param consumer the listener to add
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public void addCallback(@NonNull Consumer<ShardAssignmentChanges> consumer) {
        callbacks.add(java.util.Objects.requireNonNull(consumer, "callback is marked non-null but is null"));
    }

    /**
     * Tells whether an error should be retried: a {@link NamespaceNotFoundException} answers
     * through its own {@code isRetryable()}, every other error is retryable.
     *
     * @param th the error to classify
     * @return {@code true} if the error is retryable
     * @throws NullPointerException if {@code th} is {@code null}
     */
    private boolean isErrorRetryable(@NonNull Throwable th) {
        if (th == null) {
            throw new NullPointerException("ex is marked non-null but is null");
        }
        return !(th instanceof NamespaceNotFoundException) || ((NamespaceNotFoundException) th).isRetryable();
    }
}
