package io.atomix.protocols.gossip;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.event.AbstractListenerManager;
import io.atomix.protocols.gossip.GossipService;
import io.atomix.protocols.gossip.protocol.GossipMessage;
import io.atomix.protocols.gossip.protocol.GossipProtocol;
import io.atomix.protocols.gossip.protocol.GossipUpdate;
import io.atomix.time.LogicalClock;
import io.atomix.time.LogicalTimestamp;
import io.atomix.utils.Identifier;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/* loaded from: input_file:io/atomix/protocols/gossip/DisseminationService.class */
public class DisseminationService<K, V> extends AbstractListenerManager<GossipEvent<K, V>, GossipEventListener<K, V>> implements GossipService<K, V> {
    private final GossipProtocol protocol;
    private final Supplier<Collection<Identifier>> peerProvider;
    private final Executor eventExecutor;
    private final boolean fastConvergence;
    private final boolean tombstonesDisabled;
    private final ScheduledFuture<?> updateFuture;
    private final ScheduledFuture<?> purgeFuture;
    private final Map<K, GossipUpdate<K, V>> updates = Maps.newLinkedHashMap();
    private final LogicalClock logicalClock = new LogicalClock();
    private final Map<Identifier, Long> peerUpdateTimes = Maps.newConcurrentMap();
    private final Map<Identifier, LogicalTimestamp> peerTimestamps = Maps.newHashMap();

    /* loaded from: input_file:io/atomix/protocols/gossip/DisseminationService$Builder.class */
    public static class Builder<K, V> implements GossipService.Builder<K, V> {
        protected GossipProtocol protocol;
        protected Supplier<Collection<Identifier>> peerProvider;
        protected ScheduledExecutorService communicationExecutor;
        protected Executor eventExecutor = MoreExecutors.directExecutor();
        protected Duration updateInterval = Duration.ofSeconds(1);
        protected boolean fastConvergence = false;
        protected boolean tombstonesDisabled = false;
        protected Duration purgeInterval = Duration.ofMinutes(1);

        public Builder<K, V> withProtocol(GossipProtocol gossipProtocol) {
            this.protocol = (GossipProtocol) Preconditions.checkNotNull(gossipProtocol, "protocol");
            return this;
        }

        public Builder<K, V> withPeerProvider(Supplier<Collection<Identifier>> supplier) {
            this.peerProvider = (Supplier) Preconditions.checkNotNull(supplier, "peerProvider cannot be null");
            return this;
        }

        public Builder<K, V> withEventExecutor(Executor executor) {
            this.eventExecutor = (Executor) Preconditions.checkNotNull(executor, "executor cannot be null");
            return this;
        }

        public Builder<K, V> withCommunicationExecutor(ScheduledExecutorService scheduledExecutorService) {
            this.communicationExecutor = (ScheduledExecutorService) Preconditions.checkNotNull(scheduledExecutorService, "executor cannot be null");
            return this;
        }

        public Builder<K, V> withUpdateInterval(Duration duration) {
            this.updateInterval = (Duration) Preconditions.checkNotNull(duration, "updateInterval cannot be null");
            return this;
        }

        public Builder<K, V> withFastConvergence(boolean z) {
            this.fastConvergence = z;
            return this;
        }

        public Builder<K, V> withTombstonesDisabled(boolean z) {
            this.tombstonesDisabled = z;
            return this;
        }

        public Builder<K, V> withPurgeInterval(Duration duration) {
            this.purgeInterval = (Duration) Preconditions.checkNotNull(duration, "purgeInterval cannot be null");
            return this;
        }

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public GossipService<K, V> m2build() {
            return new DisseminationService(this.protocol, this.peerProvider, this.eventExecutor, this.communicationExecutor, this.updateInterval, this.fastConvergence, this.tombstonesDisabled, this.purgeInterval);
        }
    }

    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    public DisseminationService(GossipProtocol<?> gossipProtocol, Supplier<Collection<Identifier>> supplier, Executor executor, ScheduledExecutorService scheduledExecutorService, Duration duration, boolean z, boolean z2, Duration duration2) {
        this.protocol = (GossipProtocol) Preconditions.checkNotNull(gossipProtocol, "protocol cannot be null");
        this.peerProvider = (Supplier) Preconditions.checkNotNull(supplier, "peerProvider cannot be null");
        this.eventExecutor = (Executor) Preconditions.checkNotNull(executor, "eventExecutor cannot be null");
        this.fastConvergence = z;
        this.tombstonesDisabled = z2;
        gossipProtocol.registerGossipListener(this::update);
        this.updateFuture = scheduledExecutorService.scheduleAtFixedRate(this::gossip, 0L, duration.toMillis(), TimeUnit.MILLISECONDS);
        this.purgeFuture = !z2 ? scheduledExecutorService.scheduleAtFixedRate(this::purgeTombstones, 0L, duration2.toMillis(), TimeUnit.MILLISECONDS) : null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void post(GossipEvent<K, V> gossipEvent) {
        this.eventExecutor.execute(() -> {
            super.post(gossipEvent);
        });
    }

    public void process(GossipEvent<K, V> gossipEvent) {
        GossipUpdate<K, V> gossipUpdate = new GossipUpdate<>(gossipEvent.subject(), gossipEvent.value(), this.logicalClock.increment().asVersion());
        if (gossipEvent.value() != null) {
            this.updates.put(gossipEvent.subject(), gossipUpdate);
            if (this.fastConvergence) {
                updatePeers();
            }
        } else if (this.tombstonesDisabled) {
            this.updates.remove(gossipEvent.subject());
        } else {
            this.updates.put(gossipEvent.subject(), gossipUpdate);
            if (this.fastConvergence) {
                updatePeers();
            }
        }
        post((GossipEvent) gossipEvent);
    }

    private synchronized void update(GossipMessage<K, V> gossipMessage) {
        this.logicalClock.update(gossipMessage.timestamp());
        for (GossipUpdate<K, V> gossipUpdate : gossipMessage.updates()) {
            GossipUpdate<K, V> gossipUpdate2 = this.updates.get(gossipUpdate.subject());
            if (gossipUpdate2 == null || ((gossipUpdate2.isTombstone() && !gossipUpdate.isTombstone()) || gossipUpdate2.timestamp().isOlderThan(gossipUpdate.timestamp()))) {
                if (!this.tombstonesDisabled) {
                    this.updates.put(gossipUpdate.subject(), gossipUpdate);
                }
                post((GossipEvent) new GossipEvent<>(gossipUpdate.creationTime(), gossipUpdate.subject(), gossipUpdate.value()));
            }
        }
    }

    private synchronized void gossip() {
        ArrayList newArrayList = Lists.newArrayList(this.peerProvider.get());
        if (newArrayList.isEmpty()) {
            return;
        }
        Collections.shuffle(newArrayList);
        updatePeer((Identifier) newArrayList.get(0));
    }

    private void updatePeers() {
        Iterator<Identifier> it = this.peerProvider.get().iterator();
        while (it.hasNext()) {
            updatePeer(it.next());
        }
    }

    private synchronized void updatePeer(Identifier identifier) {
        LogicalTimestamp increment = this.logicalClock.increment();
        long currentTimeMillis = System.currentTimeMillis();
        LogicalTimestamp computeIfAbsent = this.peerTimestamps.computeIfAbsent(identifier, identifier2 -> {
            return new LogicalTimestamp(0L);
        });
        this.protocol.gossip(identifier, new GossipMessage<>(increment, (Collection) this.updates.values().stream().filter(gossipUpdate -> {
            return gossipUpdate.timestamp().isNewerThan(computeIfAbsent);
        }).collect(Collectors.toList())));
        this.peerTimestamps.put(identifier, increment);
        this.peerUpdateTimes.put(identifier, Long.valueOf(currentTimeMillis));
    }

    private synchronized void purgeTombstones() {
        long longValue = ((Long) this.peerProvider.get().stream().map(identifier -> {
            return this.peerUpdateTimes.getOrDefault(identifier, 0L);
        }).reduce((v0, v1) -> {
            return Math.min(v0, v1);
        }).orElse(0L)).longValue();
        Iterator<Map.Entry<K, GossipUpdate<K, V>>> it = this.updates.entrySet().iterator();
        while (it.hasNext()) {
            GossipUpdate<K, V> value = it.next().getValue();
            if (value.isTombstone() && value.creationTime() < longValue) {
                it.remove();
            }
        }
    }

    @Override // io.atomix.protocols.gossip.GossipService
    public void close() {
        this.protocol.unregisterGossipListener();
        this.updateFuture.cancel(false);
        if (this.purgeFuture != null) {
            this.purgeFuture.cancel(false);
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("protocol", this.protocol).toString();
    }
}
