package io.atomix.protocols.gossip.value;

import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.Subscription;
import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.protocol.value.ValueDelegate;
import io.atomix.primitive.protocol.value.ValueDelegateEvent;
import io.atomix.primitive.protocol.value.ValueDelegateEventListener;
import io.atomix.protocols.gossip.CrdtProtocolConfig;
import io.atomix.protocols.gossip.TimestampProvider;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/atomix/protocols/gossip/value/CrdtValueDelegate.class */
public class CrdtValueDelegate<V> implements ValueDelegate<V> {
    private static final Serializer SERIALIZER = Serializer.using(Namespace.builder().register(Namespaces.BASIC).register(new Class[]{Value.class}).build());
    private final ClusterEventService clusterEventService;
    private final ScheduledExecutorService executorService;
    private final Serializer valueSerializer;
    private final TimestampProvider<V> timestampProvider;
    private final String subject;
    private volatile CompletableFuture<Subscription> subscribeFuture;
    private volatile ScheduledFuture<?> broadcastFuture;
    private final AtomicReference<Value> currentValue = new AtomicReference<>();
    private final Set<ValueDelegateEventListener<V>> eventListeners = Sets.newCopyOnWriteArraySet();

    public CrdtValueDelegate(String str, Serializer serializer, CrdtProtocolConfig crdtProtocolConfig, PrimitiveManagementService primitiveManagementService) {
        this.clusterEventService = primitiveManagementService.getEventService();
        this.executorService = primitiveManagementService.getExecutorService();
        this.valueSerializer = serializer;
        this.timestampProvider = crdtProtocolConfig.getTimestampProvider();
        this.subject = String.format("atomix-crdt-value-%s", str);
        ClusterEventService clusterEventService = this.clusterEventService;
        String str2 = this.subject;
        Serializer serializer2 = SERIALIZER;
        Objects.requireNonNull(serializer2);
        this.subscribeFuture = clusterEventService.subscribe(str2, serializer2::decode, this::updateValue, this.executorService);
        this.broadcastFuture = this.executorService.scheduleAtFixedRate(this::broadcastValue, crdtProtocolConfig.getGossipInterval().toMillis(), crdtProtocolConfig.getGossipInterval().toMillis(), TimeUnit.MILLISECONDS);
    }

    public V get() {
        Value value = this.currentValue.get();
        if (value != null) {
            return decode(value.value());
        }
        return null;
    }

    public V getAndSet(V v) {
        Value value;
        Value value2 = new Value(encode(v), this.timestampProvider.get(v));
        do {
            value = this.currentValue.get();
            if (!value2.isNewerThan(value)) {
                return v;
            }
        } while (!this.currentValue.compareAndSet(value, value2));
        if (value == null || !Objects.equals(value.value(), value2.value())) {
            this.eventListeners.forEach(valueDelegateEventListener -> {
                valueDelegateEventListener.event(new ValueDelegateEvent(ValueDelegateEvent.Type.UPDATE, v));
            });
        }
        if (value != null) {
            return decode(value.value());
        }
        return null;
    }

    public void set(V v) {
        Value value;
        Value value2 = new Value(encode(v), this.timestampProvider.get(v));
        do {
            value = this.currentValue.get();
            if (!value2.isNewerThan(value)) {
                return;
            }
        } while (!this.currentValue.compareAndSet(value, value2));
        if (value == null || !Objects.equals(value.value(), value2.value())) {
            this.eventListeners.forEach(valueDelegateEventListener -> {
                valueDelegateEventListener.event(new ValueDelegateEvent(ValueDelegateEvent.Type.UPDATE, v));
            });
        }
    }

    public void addListener(ValueDelegateEventListener<V> valueDelegateEventListener) {
        this.eventListeners.add(valueDelegateEventListener);
    }

    public void removeListener(ValueDelegateEventListener<V> valueDelegateEventListener) {
        this.eventListeners.remove(valueDelegateEventListener);
    }

    private String encode(Object obj) {
        return BaseEncoding.base16().encode(this.valueSerializer.encode(obj));
    }

    protected V decode(String str) {
        return (V) this.valueSerializer.decode(BaseEncoding.base16().decode(str));
    }

    private void updateValue(Value value) {
        Value value2;
        do {
            value2 = this.currentValue.get();
            if (!value.isNewerThan(value2)) {
                return;
            }
        } while (!this.currentValue.compareAndSet(value2, value));
        if (value2 == null || !Objects.equals(value2.value(), value.value())) {
            this.eventListeners.forEach(valueDelegateEventListener -> {
                valueDelegateEventListener.event(new ValueDelegateEvent(ValueDelegateEvent.Type.UPDATE, decode(value.value())));
            });
        }
    }

    private void broadcastValue() {
        ClusterEventService clusterEventService = this.clusterEventService;
        String str = this.subject;
        Value value = this.currentValue.get();
        Serializer serializer = SERIALIZER;
        Objects.requireNonNull(serializer);
        clusterEventService.broadcast(str, value, (v1) -> {
            return r3.encode(v1);
        });
    }

    public void close() {
        this.broadcastFuture.cancel(false);
        this.subscribeFuture.thenAccept(subscription -> {
            subscription.close();
        });
    }
}
