package io.atomix.protocols.gossip.counter;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AtomicLongMap;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.protocol.counter.CounterDelegate;
import io.atomix.protocols.gossip.CrdtProtocolConfig;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/atomix/protocols/gossip/counter/CrdtCounterDelegate.class */
public class CrdtCounterDelegate implements CounterDelegate {
    private static final Serializer SERIALIZER = Serializer.using(Namespace.builder().register(Namespaces.BASIC).register(new Class[]{MemberId.class}).build());
    private final MemberId localMemberId;
    private final ClusterCommunicationService clusterCommunicator;
    private final ScheduledExecutorService executorService;
    private final String subject;
    private volatile ScheduledFuture<?> broadcastFuture;
    private final AtomicLongMap<MemberId> increments = AtomicLongMap.create();
    private final AtomicLongMap<MemberId> decrements = AtomicLongMap.create();

    public CrdtCounterDelegate(String str, CrdtProtocolConfig crdtProtocolConfig, PrimitiveManagementService primitiveManagementService) {
        this.localMemberId = primitiveManagementService.getMembershipService().getLocalMember().id();
        this.clusterCommunicator = primitiveManagementService.getCommunicationService();
        this.executorService = primitiveManagementService.getExecutorService();
        this.subject = String.format("atomix-crdt-counter-%s", str);
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str2 = this.subject;
        Serializer serializer = SERIALIZER;
        Objects.requireNonNull(serializer);
        clusterCommunicationService.subscribe(str2, serializer::decode, this::updateCounters, this.executorService);
        this.broadcastFuture = this.executorService.scheduleAtFixedRate(this::broadcastCounters, crdtProtocolConfig.getGossipInterval().toMillis(), crdtProtocolConfig.getGossipInterval().toMillis(), TimeUnit.MILLISECONDS);
    }

    public long get() {
        return this.increments.sum() - this.decrements.sum();
    }

    public long incrementAndGet() {
        return getIncrement(this.increments.incrementAndGet(this.localMemberId));
    }

    public long decrementAndGet() {
        return getDecrement(this.decrements.incrementAndGet(this.localMemberId));
    }

    public long getAndIncrement() {
        return getIncrement(this.increments.getAndIncrement(this.localMemberId));
    }

    public long getAndDecrement() {
        return getDecrement(this.decrements.getAndIncrement(this.localMemberId));
    }

    public long getAndAdd(long j) {
        return getIncrement(this.increments.getAndAdd(this.localMemberId, j));
    }

    public long addAndGet(long j) {
        return getIncrement(this.increments.addAndGet(this.localMemberId, j));
    }

    private long getIncrement(long j) {
        return (this.increments.asMap().entrySet().stream().filter(entry -> {
            return !((MemberId) entry.getKey()).equals(this.localMemberId);
        }).mapToLong(entry2 -> {
            return ((Long) entry2.getValue()).longValue();
        }).sum() + j) - this.decrements.sum();
    }

    private long getDecrement(long j) {
        return this.increments.sum() - (this.decrements.asMap().entrySet().stream().filter(entry -> {
            return !((MemberId) entry.getKey()).equals(this.localMemberId);
        }).mapToLong(entry2 -> {
            return ((Long) entry2.getValue()).longValue();
        }).sum() + j);
    }

    private void updateCounters(List<Map<MemberId, Long>> list) {
        for (Map.Entry<MemberId, Long> entry : list.get(0).entrySet()) {
            this.increments.accumulateAndGet(entry.getKey(), entry.getValue().longValue(), Math::max);
        }
        for (Map.Entry<MemberId, Long> entry2 : list.get(1).entrySet()) {
            this.decrements.accumulateAndGet(entry2.getKey(), entry2.getValue().longValue(), Math::max);
        }
    }

    private void broadcastCounters() {
        ArrayList newArrayList = Lists.newArrayList(new Map[]{Maps.newHashMap(this.increments.asMap()), Maps.newHashMap(this.decrements.asMap())});
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str = this.subject;
        Serializer serializer = SERIALIZER;
        Objects.requireNonNull(serializer);
        clusterCommunicationService.broadcast(str, newArrayList, (v1) -> {
            return r3.encode(v1);
        });
    }

    public void close() {
        this.broadcastFuture.cancel(false);
        this.clusterCommunicator.unsubscribe(this.subject);
    }
}
