package io.atomix.protocols.gossip.map;

import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.protocol.map.MapDelegate;
import io.atomix.primitive.protocol.map.MapDelegateEvent;
import io.atomix.primitive.protocol.map.MapDelegateEventListener;
import io.atomix.protocols.gossip.AntiEntropyProtocolConfig;
import io.atomix.protocols.gossip.PeerSelector;
import io.atomix.protocols.gossip.TimestampProvider;
import io.atomix.protocols.gossip.map.MapValue;
import io.atomix.utils.concurrent.AbstractAccumulator;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.misc.SlidingWindowCounter;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.Timestamp;
import io.atomix.utils.time.WallClockTimestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/protocols/gossip/map/AntiEntropyMapDelegate.class */
public class AntiEntropyMapDelegate<K, V> implements MapDelegate<K, V> {
    private static final String ERROR_DESTROYED = " map is already destroyed";
    private static final String ERROR_NULL_KEY = "Key cannot be null";
    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
    private static final int WINDOW_SIZE = 5;
    private static final int HIGH_LOAD_THRESHOLD = 2;
    private static final int LOAD_WINDOW = 2;
    private final ClusterCommunicationService clusterCommunicator;
    private final ClusterMembershipService membershipService;
    private final Serializer entrySerializer;
    private final Serializer serializer;
    private final TimestampProvider<Map.Entry<K, V>> timestampProvider;
    private final String bootstrapMessageSubject;
    private final String initializeMessageSubject;
    private final String updateMessageSubject;
    private final String antiEntropyAdvertisementSubject;
    private final String updateRequestSubject;
    private final ExecutorService executor;
    private final ScheduledExecutorService backgroundExecutor;
    private final PeerSelector<Map.Entry<K, V>> peerUpdateFunction;
    private final ExecutorService communicationExecutor;
    private final String mapName;
    private final String destroyedMessage;
    private final boolean tombstonesDisabled;
    private final Supplier<List<MemberId>> peersSupplier;
    private final Supplier<List<MemberId>> bootstrapPeersSupplier;
    private final MemberId localMemberId;
    private long previousTombstonePurgeTime;
    private static final int DEFAULT_MAX_EVENTS = 1000;
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    private static final Logger log = LoggerFactory.getLogger(AntiEntropyMapDelegate.class);
    private static final Timer TIMER = new Timer("atomix-anti-entropy-map-sender-events");
    private final Set<MapDelegateEventListener<K, V>> listeners = Sets.newCopyOnWriteArraySet();
    private final Map<MemberId, Long> antiEntropyTimes = Maps.newConcurrentMap();
    private final long initialDelaySec = 5;
    private volatile boolean closed = false;
    private SlidingWindowCounter counter = new SlidingWindowCounter(5);
    private final Map<String, MapValue> items = Maps.newConcurrentMap();
    private final Map<MemberId, AntiEntropyMapDelegate<K, V>.EventAccumulator> senderPending = Maps.newConcurrentMap();

    /* loaded from: input_file:io/atomix/protocols/gossip/map/AntiEntropyMapDelegate$EventAccumulator.class */
    private final class EventAccumulator extends AbstractAccumulator<UpdateEntry> {
        private final MemberId peer;

        private EventAccumulator(MemberId memberId) {
            super(AntiEntropyMapDelegate.TIMER, 1000, 50, 10);
            this.peer = memberId;
        }

        @Override // io.atomix.utils.concurrent.Accumulator
        public void processItems(List<UpdateEntry> list) {
            HashMap newHashMap = Maps.newHashMap();
            list.forEach(updateEntry -> {
            });
            AntiEntropyMapDelegate.this.communicationExecutor.execute(() -> {
                try {
                    ClusterCommunicationService clusterCommunicationService = AntiEntropyMapDelegate.this.clusterCommunicator;
                    String str = AntiEntropyMapDelegate.this.updateMessageSubject;
                    ImmutableList copyOf = ImmutableList.copyOf((Collection) newHashMap.values());
                    Serializer serializer = AntiEntropyMapDelegate.this.serializer;
                    serializer.getClass();
                    clusterCommunicationService.unicast(str, copyOf, (v1) -> {
                        return r3.encode(v1);
                    }, this.peer).whenComplete((r6, th) -> {
                        if (th != null) {
                            AntiEntropyMapDelegate.log.debug("Failed to send to {}", this.peer, th);
                        }
                    });
                } catch (Exception e) {
                    AntiEntropyMapDelegate.log.warn("Failed to send to {}", this.peer, e);
                }
            });
        }
    }

    public AntiEntropyMapDelegate(String str, Serializer serializer, AntiEntropyProtocolConfig antiEntropyProtocolConfig, PrimitiveManagementService primitiveManagementService) {
        this.localMemberId = primitiveManagementService.getMembershipService().getLocalMember().id();
        this.mapName = str;
        this.entrySerializer = serializer;
        this.serializer = Serializer.using(Namespace.builder().nextId(600).register(Namespaces.BASIC).register(LogicalTimestamp.class).register(WallClockTimestamp.class).register(AntiEntropyAdvertisement.class).register(AntiEntropyResponse.class).register(UpdateEntry.class).register(MapValue.class).register(MapValue.Digest.class).register(UpdateRequest.class).register(MemberId.class).build(str + "-anti-entropy-map"));
        this.destroyedMessage = this.mapName + ERROR_DESTROYED;
        this.clusterCommunicator = primitiveManagementService.getCommunicationService();
        this.membershipService = primitiveManagementService.getMembershipService();
        this.timestampProvider = antiEntropyProtocolConfig.getTimestampProvider();
        List list = antiEntropyProtocolConfig.getPeers() != null ? (List) antiEntropyProtocolConfig.getPeers().stream().map(MemberId::from).collect(Collectors.toList()) : null;
        this.peersSupplier = () -> {
            return list != null ? list : (List) primitiveManagementService.getMembershipService().getMembers().stream().map((v0) -> {
                return v0.id();
            }).sorted().collect(Collectors.toList());
        };
        this.bootstrapPeersSupplier = this.peersSupplier;
        PeerSelector peerSelector = antiEntropyProtocolConfig.getPeerSelector();
        this.peerUpdateFunction = (entry, clusterMembershipService) -> {
            Collection<MemberId> select = peerSelector.select(entry, clusterMembershipService);
            Stream<MemberId> stream = this.peersSupplier.get().stream();
            select.getClass();
            return (Collection) stream.filter((v1) -> {
                return r1.contains(v1);
            }).collect(Collectors.toList());
        };
        this.executor = Executors.newFixedThreadPool(8, Threads.namedThreads("atomix-anti-entropy-map-" + this.mapName + "-fg-%d", log));
        this.communicationExecutor = Executors.newFixedThreadPool(8, Threads.namedThreads("atomix-anti-entropy-map-" + this.mapName + "-publish-%d", log));
        this.backgroundExecutor = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads("atomix-anti-entropy-map-" + this.mapName + "-bg-%d", log));
        this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement, 5L, antiEntropyProtocolConfig.getAntiEntropyInterval().toMillis(), TimeUnit.MILLISECONDS);
        this.bootstrapMessageSubject = "atomix-gossip-map-" + this.mapName + "-bootstrap";
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str2 = this.bootstrapMessageSubject;
        Serializer serializer2 = this.serializer;
        serializer2.getClass();
        Function function = serializer2::decode;
        Function function2 = this::handleBootstrap;
        Serializer serializer3 = this.serializer;
        serializer3.getClass();
        clusterCommunicationService.subscribe(str2, function, function2, (v1) -> {
            return r4.encode(v1);
        });
        this.initializeMessageSubject = "atomix-gossip-map-" + this.mapName + "-initialize";
        ClusterCommunicationService clusterCommunicationService2 = this.clusterCommunicator;
        String str3 = this.initializeMessageSubject;
        Serializer serializer4 = this.serializer;
        serializer4.getClass();
        Function function3 = serializer4::decode;
        Function function4 = collection -> {
            processUpdates(collection);
            return null;
        };
        Serializer serializer5 = this.serializer;
        serializer5.getClass();
        clusterCommunicationService2.subscribe(str3, function3, function4, (v1) -> {
            return r4.encode(v1);
        }, this.executor);
        this.updateMessageSubject = "atomix-gossip-map-" + this.mapName + "-update";
        ClusterCommunicationService clusterCommunicationService3 = this.clusterCommunicator;
        String str4 = this.updateMessageSubject;
        Serializer serializer6 = this.serializer;
        serializer6.getClass();
        clusterCommunicationService3.subscribe(str4, serializer6::decode, this::processUpdates, this.executor);
        this.antiEntropyAdvertisementSubject = "atomix-gossip-map-" + this.mapName + "-anti-entropy";
        ClusterCommunicationService clusterCommunicationService4 = this.clusterCommunicator;
        String str5 = this.antiEntropyAdvertisementSubject;
        Serializer serializer7 = this.serializer;
        serializer7.getClass();
        Function function5 = serializer7::decode;
        Function function6 = this::handleAntiEntropyAdvertisement;
        Serializer serializer8 = this.serializer;
        serializer8.getClass();
        clusterCommunicationService4.subscribe(str5, function5, function6, (v1) -> {
            return r4.encode(v1);
        }, this.backgroundExecutor);
        this.updateRequestSubject = "atomix-gossip-map-" + this.mapName + "-update-request";
        ClusterCommunicationService clusterCommunicationService5 = this.clusterCommunicator;
        String str6 = this.updateRequestSubject;
        Serializer serializer9 = this.serializer;
        serializer9.getClass();
        clusterCommunicationService5.subscribe(str6, serializer9::decode, this::handleUpdateRequests, this.backgroundExecutor);
        if (!antiEntropyProtocolConfig.isTombstonesDisabled()) {
            this.previousTombstonePurgeTime = 0L;
            this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones, 5L, antiEntropyProtocolConfig.getAntiEntropyInterval().toMillis(), TimeUnit.MILLISECONDS);
        }
        this.tombstonesDisabled = antiEntropyProtocolConfig.isTombstonesDisabled();
        bootstrap();
    }

    private String encodeKey(Object obj) {
        return BaseEncoding.base16().encode(this.entrySerializer.encode(obj));
    }

    private byte[] encodeValue(Object obj) {
        if (obj != null) {
            return this.entrySerializer.encode(obj);
        }
        return null;
    }

    private K decodeKey(String str) {
        return (K) this.entrySerializer.decode(BaseEncoding.base16().decode(str));
    }

    private V decodeValue(byte[] bArr) {
        if (bArr != null) {
            return (V) this.entrySerializer.decode(bArr);
        }
        return null;
    }

    @Override // java.util.Map
    public int size() {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        return Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).size();
    }

    @Override // java.util.Map
    public boolean isEmpty() {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        return size() == 0;
    }

    @Override // java.util.Map
    public boolean containsKey(Object obj) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Preconditions.checkNotNull(obj, ERROR_NULL_KEY);
        return get(obj) != null;
    }

    @Override // java.util.Map
    public boolean containsValue(Object obj) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Preconditions.checkNotNull(obj, ERROR_NULL_VALUE);
        return this.items.values().stream().filter((v0) -> {
            return v0.isAlive();
        }).anyMatch(mapValue -> {
            return Arrays.equals(encodeValue(obj), mapValue.get());
        });
    }

    @Override // java.util.Map
    public V get(Object obj) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Preconditions.checkNotNull(obj, ERROR_NULL_KEY);
        MapValue mapValue = this.items.get(encodeKey(obj));
        if (mapValue == null || mapValue.isTombstone()) {
            return null;
        }
        return (V) mapValue.get(this::decodeValue);
    }

    @Override // java.util.Map
    public V put(K k, V v) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        Preconditions.checkNotNull(v, ERROR_NULL_VALUE);
        String encodeKey = encodeKey(k);
        MapValue mapValue = new MapValue(encodeValue(v), this.timestampProvider.get(Maps.immutableEntry(k, v)));
        this.counter.incrementCount();
        AtomicReference atomicReference = new AtomicReference();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.items.compute(encodeKey, (str, mapValue2) -> {
            if (mapValue2 != null && !mapValue.isNewerThan(mapValue2)) {
                return mapValue2;
            }
            atomicBoolean.set(true);
            atomicReference.set(mapValue2 != null ? mapValue2.get() : null);
            return mapValue;
        });
        if (!atomicBoolean.get()) {
            return v;
        }
        notifyPeers(new UpdateEntry(encodeKey, mapValue), this.peerUpdateFunction.select(Maps.immutableEntry(k, v), this.membershipService));
        if (atomicReference.get() == null) {
            notifyListeners(new MapDelegateEvent<>(MapDelegateEvent.Type.INSERT, k, v));
        } else {
            notifyListeners(new MapDelegateEvent<>(MapDelegateEvent.Type.UPDATE, k, v));
        }
        return decodeValue((byte[]) atomicReference.get());
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.Map
    public V remove(Object obj) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Preconditions.checkNotNull(obj, ERROR_NULL_KEY);
        return removeAndNotify(obj, null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.Map
    public boolean remove(Object obj, Object obj2) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Preconditions.checkNotNull(obj, ERROR_NULL_KEY);
        Preconditions.checkNotNull(obj2, ERROR_NULL_VALUE);
        return removeAndNotify(obj, obj2) != null;
    }

    private V removeAndNotify(K k, V v) {
        String encodeKey = encodeKey(k);
        byte[] encodeValue = encodeValue(v);
        Timestamp timestamp = this.timestampProvider.get(Maps.immutableEntry(k, v));
        Optional<MapValue> empty = (this.tombstonesDisabled || timestamp == null) ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
        MapValue removeInternal = removeInternal(encodeKey, Optional.ofNullable(encodeValue), empty);
        Object obj = null;
        if (removeInternal != null) {
            obj = removeInternal.get(this::decodeValue);
            notifyPeers(new UpdateEntry(encodeKey, empty.orElse(null)), this.peerUpdateFunction.select(Maps.immutableEntry(k, obj), this.membershipService));
            if (removeInternal.isAlive()) {
                notifyListeners(new MapDelegateEvent<>(MapDelegateEvent.Type.REMOVE, k, obj));
            }
        }
        return (V) obj;
    }

    private MapValue removeInternal(String str, Optional<byte[]> optional, Optional<MapValue> optional2) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Preconditions.checkNotNull(str, ERROR_NULL_KEY);
        Preconditions.checkNotNull(optional, ERROR_NULL_VALUE);
        optional2.ifPresent(mapValue -> {
            Preconditions.checkState(mapValue.isTombstone());
        });
        this.counter.incrementCount();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference atomicReference = new AtomicReference();
        this.items.compute(str, (str2, mapValue2) -> {
            boolean z = true;
            if (optional.isPresent() && mapValue2 != null && mapValue2.isAlive()) {
                z = Arrays.equals((byte[]) optional.get(), mapValue2.get());
            }
            if (mapValue2 == null) {
                log.trace("ECMap Remove: Existing value for key {} is already null", str2);
            }
            if (z) {
                if (mapValue2 == null) {
                    atomicBoolean.set(optional2.isPresent());
                } else {
                    atomicBoolean.set(!optional2.isPresent() || ((MapValue) optional2.get()).isNewerThan(mapValue2));
                }
            }
            if (!atomicBoolean.get()) {
                return mapValue2;
            }
            atomicReference.set(mapValue2);
            return (MapValue) optional2.orElse(null);
        });
        return (MapValue) atomicReference.get();
    }

    @Override // java.util.Map
    public V compute(K k, BiFunction<? super K, ? super V, ? extends V> biFunction) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Preconditions.checkNotNull(k, ERROR_NULL_KEY);
        Preconditions.checkNotNull(biFunction, "Recompute function cannot be null");
        String encodeKey = encodeKey(k);
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        MapValue compute = this.items.compute(encodeKey, (str, mapValue) -> {
            atomicReference2.set(mapValue);
            Object apply = biFunction.apply(k, mapValue == null ? null : mapValue.get(this::decodeValue));
            byte[] encodeValue = encodeValue(apply);
            if (mapValue != null && Arrays.equals(encodeValue, mapValue.get())) {
                return mapValue;
            }
            MapValue mapValue = new MapValue(encodeValue, this.timestampProvider.get(Maps.immutableEntry(k, apply)));
            if (mapValue == null) {
                atomicReference.set(MapDelegateEvent.Type.INSERT);
                return mapValue;
            }
            if (!mapValue.isNewerThan(mapValue)) {
                return mapValue;
            }
            atomicReference.set(MapDelegateEvent.Type.UPDATE);
            return mapValue;
        });
        if (atomicReference.get() == null) {
            return (V) compute.get(this::decodeValue);
        }
        notifyPeers(new UpdateEntry(encodeKey, compute), this.peerUpdateFunction.select(Maps.immutableEntry(k, compute.get(this::decodeValue)), this.membershipService));
        MapDelegateEvent.Type type = compute.isTombstone() ? MapDelegateEvent.Type.REMOVE : (MapDelegateEvent.Type) atomicReference.get();
        V v = (V) (compute.isTombstone() ? atomicReference2.get() == null ? null : ((MapValue) atomicReference2.get()).get(this::decodeValue) : compute.get(this::decodeValue));
        if (v != null) {
            notifyListeners(new MapDelegateEvent<>(type, k, v));
        }
        return v;
    }

    @Override // java.util.Map
    public void putAll(Map<? extends K, ? extends V> map) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        map.forEach(this::put);
    }

    @Override // java.util.Map
    public void clear() {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).forEach((str, mapValue) -> {
            remove(decodeKey(str));
        });
    }

    @Override // java.util.Map
    public Set<K> keySet() {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        return (Set) Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).keySet().stream().map(this::decodeKey).collect(Collectors.toSet());
    }

    @Override // java.util.Map
    public Collection<V> values() {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        return Collections2.transform(Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).values(), mapValue -> {
            return mapValue.get(this::decodeValue);
        });
    }

    @Override // java.util.Map
    public Set<Map.Entry<K, V>> entrySet() {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        return (Set) Maps.filterValues(this.items, (v0) -> {
            return v0.isAlive();
        }).entrySet().stream().map(entry -> {
            return Pair.of(decodeKey((String) entry.getKey()), decodeValue(((MapValue) entry.getValue()).get()));
        }).collect(Collectors.toSet());
    }

    @Override // io.atomix.primitive.protocol.map.MapDelegate
    public void addListener(MapDelegateEventListener<K, V> mapDelegateEventListener) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        this.listeners.add(Preconditions.checkNotNull(mapDelegateEventListener));
        this.items.forEach((str, mapValue) -> {
            if (mapValue.isAlive()) {
                mapDelegateEventListener.event(new MapDelegateEvent(MapDelegateEvent.Type.INSERT, decodeKey(str), mapValue.get(this::decodeValue)));
            }
        });
    }

    @Override // io.atomix.primitive.protocol.map.MapDelegate
    public void removeListener(MapDelegateEventListener<K, V> mapDelegateEventListener) {
        Preconditions.checkState(!this.closed, this.destroyedMessage);
        this.listeners.remove(Preconditions.checkNotNull(mapDelegateEventListener));
    }

    @Override // io.atomix.primitive.protocol.map.MapDelegate
    public void close() {
        this.closed = true;
        this.executor.shutdown();
        this.backgroundExecutor.shutdown();
        this.communicationExecutor.shutdown();
        this.listeners.clear();
        this.clusterCommunicator.unsubscribe(this.bootstrapMessageSubject);
        this.clusterCommunicator.unsubscribe(this.initializeMessageSubject);
        this.clusterCommunicator.unsubscribe(this.updateMessageSubject);
        this.clusterCommunicator.unsubscribe(this.updateRequestSubject);
        this.clusterCommunicator.unsubscribe(this.antiEntropyAdvertisementSubject);
    }

    private void notifyListeners(MapDelegateEvent<K, V> mapDelegateEvent) {
        this.listeners.forEach(mapDelegateEventListener -> {
            mapDelegateEventListener.event(mapDelegateEvent);
        });
    }

    private void notifyPeers(UpdateEntry updateEntry, Collection<MemberId> collection) {
        queueUpdate(updateEntry, collection);
    }

    private void queueUpdate(UpdateEntry updateEntry, Collection<MemberId> collection) {
        if (collection == null) {
            return;
        }
        collection.forEach(memberId -> {
            this.senderPending.computeIfAbsent(memberId, memberId -> {
                return new EventAccumulator(memberId);
            }).add(updateEntry);
        });
    }

    private boolean underHighLoad() {
        return this.counter.get(2) > 2;
    }

    private void sendAdvertisement() {
        try {
            if (underHighLoad() || this.closed) {
                return;
            }
            pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
        } catch (Exception e) {
            log.error("Exception thrown while sending advertisement", (Throwable) e);
        }
    }

    private Optional<MemberId> pickRandomActivePeer() {
        List<MemberId> list = this.peersSupplier.get();
        Collections.shuffle(list);
        return list.stream().findFirst();
    }

    private void sendAdvertisementToPeer(MemberId memberId) {
        long currentTimeMillis = System.currentTimeMillis();
        AntiEntropyAdvertisement createAdvertisement = createAdvertisement();
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str = this.antiEntropyAdvertisementSubject;
        Serializer serializer = this.serializer;
        serializer.getClass();
        Function function = (v1) -> {
            return r3.encode(v1);
        };
        Serializer serializer2 = this.serializer;
        serializer2.getClass();
        clusterCommunicationService.send(str, createAdvertisement, function, serializer2::decode, memberId).whenComplete((obj, th) -> {
            if (th != null) {
                log.debug("Failed to send anti-entropy advertisement to {}: {}", memberId, th.getMessage());
            } else if (obj == AntiEntropyResponse.PROCESSED) {
                this.antiEntropyTimes.put(memberId, Long.valueOf(currentTimeMillis));
            }
        });
    }

    private void sendUpdateRequestToPeer(MemberId memberId, Set<String> set) {
        UpdateRequest updateRequest = new UpdateRequest(this.localMemberId, set);
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str = this.updateRequestSubject;
        Serializer serializer = this.serializer;
        serializer.getClass();
        clusterCommunicationService.unicast(str, updateRequest, (v1) -> {
            return r3.encode(v1);
        }, memberId).whenComplete((r6, th) -> {
            if (th != null) {
                log.debug("Failed to send update request to {}: {}", memberId, th.getMessage());
            }
        });
    }

    private AntiEntropyAdvertisement createAdvertisement() {
        return new AntiEntropyAdvertisement(this.localMemberId, ImmutableMap.copyOf(Maps.transformValues(this.items, (v0) -> {
            return v0.digest();
        })));
    }

    private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement antiEntropyAdvertisement) {
        if (this.closed || underHighLoad()) {
            return AntiEntropyResponse.IGNORED;
        }
        try {
            if (log.isTraceEnabled()) {
                log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it", antiEntropyAdvertisement.sender(), this.mapName, Integer.valueOf(antiEntropyAdvertisement.digest().size()));
            }
            antiEntropyCheckLocalItems(antiEntropyAdvertisement).forEach(this::notifyListeners);
            return AntiEntropyResponse.PROCESSED;
        } catch (Exception e) {
            log.warn("Error handling anti-entropy advertisement", (Throwable) e);
            return AntiEntropyResponse.FAILED;
        }
    }

    private List<MapDelegateEvent<K, V>> antiEntropyCheckLocalItems(AntiEntropyAdvertisement antiEntropyAdvertisement) {
        LinkedList newLinkedList = Lists.newLinkedList();
        MemberId sender = antiEntropyAdvertisement.sender();
        ImmutableList of = ImmutableList.of(sender);
        Set<String> hashSet = new HashSet<>();
        HashSet hashSet2 = new HashSet(antiEntropyAdvertisement.digest().keySet());
        this.items.forEach((str, mapValue) -> {
            hashSet2.remove(str);
            MapValue.Digest digest = antiEntropyAdvertisement.digest().get(str);
            if (digest == null || mapValue.isNewerThan(digest.timestamp())) {
                queueUpdate(new UpdateEntry(str, mapValue), of);
                return;
            }
            if (!digest.isNewerThan(mapValue.digest()) || !digest.isTombstone()) {
                if (digest.isNewerThan(mapValue.digest())) {
                    hashSet.add(str);
                }
            } else {
                MapValue removeInternal = removeInternal(str, Optional.empty(), Optional.of(MapValue.tombstone(digest.timestamp())));
                if (removeInternal == null || !removeInternal.isAlive()) {
                    return;
                }
                newLinkedList.add(new MapDelegateEvent(MapDelegateEvent.Type.REMOVE, decodeKey(str), removeInternal.get(this::decodeValue)));
            }
        });
        hashSet.addAll(hashSet2);
        sendUpdateRequestToPeer(sender, hashSet);
        return newLinkedList;
    }

    private void handleUpdateRequests(UpdateRequest<String> updateRequest) {
        Set<String> keys = updateRequest.keys();
        ImmutableList of = ImmutableList.of(updateRequest.sender());
        keys.forEach(str -> {
            queueUpdate(new UpdateEntry(str, this.items.get(str)), of);
        });
    }

    private void purgeTombstones() {
        long longValue = ((Long) this.peersSupplier.get().stream().map(memberId -> {
            return this.antiEntropyTimes.getOrDefault(memberId, 0L);
        }).reduce((v0, v1) -> {
            return Math.min(v0, v1);
        }).orElse(0L)).longValue();
        if (longValue == this.previousTombstonePurgeTime) {
            return;
        }
        List list = (List) this.items.entrySet().stream().filter(entry -> {
            return ((MapValue) entry.getValue()).isTombstone();
        }).filter(entry2 -> {
            return ((MapValue) entry2.getValue()).creationTime() <= longValue;
        }).collect(Collectors.toList());
        this.previousTombstonePurgeTime = longValue;
        list.forEach(entry3 -> {
            this.items.remove(entry3.getKey(), entry3.getValue());
        });
    }

    private void processUpdates(Collection<UpdateEntry> collection) {
        if (this.closed) {
            return;
        }
        collection.forEach(updateEntry -> {
            String key = updateEntry.key();
            MapValue copy = updateEntry.value() == null ? null : updateEntry.value().copy();
            if (copy == null || copy.isTombstone()) {
                MapValue removeInternal = removeInternal(key, Optional.empty(), Optional.ofNullable(copy));
                if (removeInternal == null || !removeInternal.isAlive()) {
                    return;
                }
                notifyListeners(new MapDelegateEvent<>(MapDelegateEvent.Type.REMOVE, decodeKey(key), removeInternal.get(this::decodeValue)));
                return;
            }
            this.counter.incrementCount();
            AtomicReference atomicReference = new AtomicReference();
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            this.items.compute(key, (str, mapValue) -> {
                if (mapValue != null && !copy.isNewerThan(mapValue)) {
                    return mapValue;
                }
                atomicBoolean.set(true);
                atomicReference.set(mapValue != null ? mapValue.get() : null);
                return copy;
            });
            if (atomicBoolean.get()) {
                if (atomicReference.get() == null) {
                    notifyListeners(new MapDelegateEvent<>(MapDelegateEvent.Type.INSERT, decodeKey(key), decodeValue(copy.get())));
                } else {
                    notifyListeners(new MapDelegateEvent<>(MapDelegateEvent.Type.UPDATE, decodeKey(key), decodeValue(copy.get())));
                }
            }
        });
    }

    private void bootstrap() {
        List<MemberId> list = this.bootstrapPeersSupplier.get();
        if (list.isEmpty()) {
            return;
        }
        try {
            requestBootstrapFromPeers(list).get(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.debug("Failed to bootstrap ec map {}: {}", this.mapName, ExceptionUtils.getStackTrace(e));
        }
    }

    private CompletableFuture<Void> requestBootstrapFromPeers(List<MemberId> list) {
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        int size = list.size();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicReference atomicReference = new AtomicReference();
        Iterator<MemberId> it = list.iterator();
        while (it.hasNext()) {
            requestBootstrapFromPeer(it.next()).whenComplete((r9, th) -> {
                Throwable th;
                if (th != null) {
                    if (atomicBoolean.get() || atomicInteger.incrementAndGet() != size) {
                        atomicReference.set(th);
                        return;
                    } else {
                        completableFuture.completeExceptionally(th);
                        return;
                    }
                }
                if (atomicBoolean.compareAndSet(false, true)) {
                    completableFuture.complete(null);
                } else {
                    if (atomicInteger.incrementAndGet() != size || (th = (Throwable) atomicReference.get()) == null) {
                        return;
                    }
                    completableFuture.completeExceptionally(th);
                }
            });
        }
        return completableFuture;
    }

    private CompletableFuture<Void> requestBootstrapFromPeer(MemberId memberId) {
        log.trace("Sending bootstrap request to {} for {}", memberId, this.mapName);
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str = this.bootstrapMessageSubject;
        MemberId memberId2 = this.localMemberId;
        Serializer serializer = this.serializer;
        serializer.getClass();
        Function function = (v1) -> {
            return r3.encode(v1);
        };
        Serializer serializer2 = this.serializer;
        serializer2.getClass();
        return clusterCommunicationService.send(str, memberId2, function, serializer2::decode, memberId).whenComplete((r6, th) -> {
            if (th != null) {
                log.debug("Bootstrap request to {} failed: {}", memberId, th.getMessage());
            }
        });
    }

    private CompletableFuture<Void> handleBootstrap(MemberId memberId) {
        log.trace("Received bootstrap request from {} for {}", memberId, this.bootstrapMessageSubject);
        Function function = list -> {
            log.trace("Initializing {} with {} entries", memberId, Integer.valueOf(list.size()));
            ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
            String str = this.initializeMessageSubject;
            ImmutableList copyOf = ImmutableList.copyOf((Collection) list);
            Serializer serializer = this.serializer;
            serializer.getClass();
            Function function2 = (v1) -> {
                return r3.encode(v1);
            };
            Serializer serializer2 = this.serializer;
            serializer2.getClass();
            return clusterCommunicationService.send(str, copyOf, function2, serializer2::decode, memberId).whenComplete((r6, th) -> {
                if (th != null) {
                    log.debug("Failed to initialize {}", memberId, th);
                }
            });
        };
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        for (Map.Entry<String, MapValue> entry : this.items.entrySet()) {
            String key = entry.getKey();
            MapValue value = entry.getValue();
            if (value.isAlive()) {
                newArrayList2.add(new UpdateEntry(key, value));
                if (newArrayList2.size() == 1000) {
                    newArrayList.add(function.apply(newArrayList2));
                    newArrayList2 = new ArrayList();
                }
            }
        }
        if (!newArrayList2.isEmpty()) {
            newArrayList.add(function.apply(newArrayList2));
        }
        return CompletableFuture.allOf((CompletableFuture[]) newArrayList.toArray(new CompletableFuture[newArrayList.size()]));
    }
}
