package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.cachelistener.cluster.ClusterCacheNotifier;
import org.infinispan.notifications.cachelistener.cluster.ClusterListenerReplicateCallable;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.impl.TransactionOriginatorChecker;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.CacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:META-INF/bundled-dependencies/infinispan-core-12.1.6.Final.jar:org/infinispan/statetransfer/StateProviderImpl.class */
public class StateProviderImpl implements StateProvider {
    private static final Log log = LogFactory.getLog(StateProviderImpl.class);

    @ComponentName(KnownComponentNames.CACHE_NAME)
    @Inject
    protected String cacheName;

    @Inject
    Configuration configuration;

    @Inject
    protected RpcManager rpcManager;

    @Inject
    protected CommandsFactory commandsFactory;

    @Inject
    ClusterCacheNotifier clusterCacheNotifier;

    @Inject
    TransactionTable transactionTable;

    @Inject
    protected InternalDataContainer<Object, Object> dataContainer;

    @Inject
    protected PersistenceManager persistenceManager;

    @Inject
    protected StateTransferLock stateTransferLock;

    @Inject
    protected InternalEntryFactory entryFactory;

    @Inject
    protected KeyPartitioner keyPartitioner;

    @Inject
    protected DistributionManager distributionManager;

    @Inject
    protected TransactionOriginatorChecker transactionOriginatorChecker;

    @ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR)
    @Inject
    ScheduledExecutorService timeoutExecutor;
    protected long timeout;
    protected int chunkSize;
    private final Map<Address, List<OutboundTransferTask>> transfersByDestination = new HashMap();

    @Override // org.infinispan.statetransfer.StateProvider
    public boolean isStateTransferInProgress() {
        boolean z;
        synchronized (this.transfersByDestination) {
            z = !this.transfersByDestination.isEmpty();
        }
        return z;
    }

    @Override // org.infinispan.statetransfer.StateProvider
    public CompletableFuture<Void> onTopologyUpdate(CacheTopology cacheTopology, boolean z) {
        HashSet hashSet = new HashSet(cacheTopology.getWriteConsistentHash().getMembers());
        synchronized (this.transfersByDestination) {
            Iterator<Map.Entry<Address, List<OutboundTransferTask>>> it = this.transfersByDestination.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Address, List<OutboundTransferTask>> next = it.next();
                if (!hashSet.contains(next.getKey())) {
                    List<OutboundTransferTask> value = next.getValue();
                    it.remove();
                    Iterator<OutboundTransferTask> it2 = value.iterator();
                    while (it2.hasNext()) {
                        it2.next().cancel();
                    }
                }
            }
        }
        return CompletableFutures.completedNull();
    }

    @Override // org.infinispan.statetransfer.StateProvider
    @Start(priority = 50)
    public void start() {
        this.timeout = this.configuration.clustering().stateTransfer().timeout();
        this.chunkSize = this.configuration.clustering().stateTransfer().chunkSize();
    }

    @Override // org.infinispan.statetransfer.StateProvider
    @Stop(priority = 0)
    public void stop() {
        if (log.isTraceEnabled()) {
            log.tracef("Shutting down StateProvider of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
        try {
            synchronized (this.transfersByDestination) {
                Iterator<List<OutboundTransferTask>> it = this.transfersByDestination.values().iterator();
                while (it.hasNext()) {
                    List<OutboundTransferTask> next = it.next();
                    it.remove();
                    Iterator<OutboundTransferTask> it2 = next.iterator();
                    while (it2.hasNext()) {
                        it2.next().cancel();
                    }
                }
            }
        } catch (Throwable th) {
            log.errorf(th, "Failed to stop StateProvider of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
    }

    @Override // org.infinispan.statetransfer.StateProvider
    public CompletionStage<List<TransactionInfo>> getTransactionsForSegments(Address address, int i, IntSet intSet) {
        if (log.isTraceEnabled()) {
            log.tracef("Received request for transactions from node %s for cache %s, topology id %d, segments %s", address, this.cacheName, Integer.valueOf(i), intSet);
        }
        return getCacheTopology(i, address, true).thenApply(cacheTopology -> {
            IntSet from = IntSets.from(cacheTopology.getReadConsistentHash().getSegmentsForOwner(this.rpcManager.getAddress()));
            if (!from.containsAll(intSet)) {
                intSet.removeAll(from);
                throw new IllegalArgumentException("Segments " + intSet + " are not owned by " + this.rpcManager.getAddress());
            }
            ArrayList arrayList = new ArrayList();
            if (this.configuration.transaction().transactionMode().isTransactional()) {
                collectTransactionsToTransfer(address, arrayList, this.transactionTable.getRemoteTransactions(), intSet, cacheTopology);
                collectTransactionsToTransfer(address, arrayList, this.transactionTable.getLocalTransactions(), intSet, cacheTopology);
                if (log.isTraceEnabled()) {
                    log.tracef("Found %d transaction(s) to transfer", arrayList.size());
                }
            }
            return arrayList;
        });
    }

    @Override // org.infinispan.statetransfer.StateProvider
    public Collection<ClusterListenerReplicateCallable<Object, Object>> getClusterListenersToInstall() {
        return this.clusterCacheNotifier.retrieveClusterListenerCallablesToInstall();
    }

    private CompletionStage<CacheTopology> getCacheTopology(int i, Address address, boolean z) {
        LocalizedCacheTopology cacheTopology = this.distributionManager.getCacheTopology();
        int topologyId = cacheTopology.getTopologyId();
        if (i < topologyId) {
            if (z) {
                log.debugf("Transactions were requested by node %s with topology %d, older than the local topology (%d)", address, Integer.valueOf(i), Integer.valueOf(topologyId));
            } else {
                log.debugf("Segments were requested by node %s with topology %d, older than the local topology (%d)", address, Integer.valueOf(i), Integer.valueOf(topologyId));
            }
        } else if (i > topologyId) {
            if (log.isTraceEnabled()) {
                Log log2 = log;
                Object[] objArr = new Object[5];
                objArr[0] = z ? "Transactions" : "Segments";
                objArr[1] = address;
                objArr[2] = Integer.valueOf(i);
                objArr[3] = Integer.valueOf(topologyId);
                objArr[4] = Integer.valueOf(i);
                log2.tracef("%s were requested by node %s with topology %d, greater than the local topology (%d). Waiting for topology %d to be installed locally.", objArr);
            }
            return this.stateTransferLock.topologyFuture(i).exceptionally(th -> {
                throw Log.CLUSTER.failedWaitingForTopology(i);
            }).thenApply(r3 -> {
                return this.distributionManager.getCacheTopology();
            });
        }
        return CompletableFuture.completedFuture(cacheTopology);
    }

    private void collectTransactionsToTransfer(Address address, List<TransactionInfo> list, Collection<? extends CacheTransaction> collection, IntSet intSet, CacheTopology cacheTopology) {
        int topologyId = cacheTopology.getTopologyId();
        HashSet hashSet = new HashSet(cacheTopology.getMembers());
        for (CacheTransaction cacheTransaction : collection) {
            GlobalTransaction globalTransaction = cacheTransaction.getGlobalTransaction();
            if (cacheTransaction.getTopologyId() != topologyId && !this.transactionOriginatorChecker.isOriginatorMissing(globalTransaction, hashSet)) {
                HashSet hashSet2 = new HashSet();
                Consumer<Object> consumer = obj -> {
                    if (intSet.contains(this.keyPartitioner.getSegment(obj))) {
                        hashSet2.add(obj);
                    }
                };
                cacheTransaction.forEachLock(consumer);
                cacheTransaction.forEachBackupLock(consumer);
                if (!hashSet2.isEmpty()) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Sending transaction %s to new owner %s", cacheTransaction, address);
                    }
                    List<WriteCommand> modifications = cacheTransaction.getModifications();
                    WriteCommand[] writeCommandArr = modifications.isEmpty() ? null : (WriteCommand[]) modifications.toArray(new WriteCommand[0]);
                    if (cacheTransaction instanceof LocalTransaction) {
                        ((LocalTransaction) cacheTransaction).locksAcquired(Collections.singleton(address));
                        if (log.isTraceEnabled()) {
                            log.tracef("Adding affected node %s to transferred transaction %s (keys %s)", address, globalTransaction, hashSet2);
                        }
                    }
                    list.add(new TransactionInfo(globalTransaction, cacheTransaction.getTopologyId(), writeCommandArr, hashSet2));
                } else if (log.isTraceEnabled()) {
                    log.tracef("Skipping transaction %s because the state requestor %s doesn't own any key", cacheTransaction, address);
                }
            } else if (log.isTraceEnabled()) {
                log.tracef("Skipping transaction %s as it was started in the current topology or by a leaver", cacheTransaction);
            }
        }
    }

    @Override // org.infinispan.statetransfer.StateProvider
    public void startOutboundTransfer(Address address, int i, IntSet intSet, boolean z) {
        if (log.isTraceEnabled()) {
            log.tracef("Starting outbound transfer to node %s for cache %s, topology id %d, segments %s", address, this.cacheName, Integer.valueOf(i), intSet);
        }
        OutboundTransferTask outboundTransferTask = new OutboundTransferTask(address, intSet, this.configuration.clustering().hash().numSegments(), this.chunkSize, i, this.keyPartitioner, collection -> {
        }, this.rpcManager, this.commandsFactory, this.timeout, this.cacheName, z, false);
        addTransfer(outboundTransferTask);
        outboundTransferTask.execute(Flowable.concat(publishDataContainerEntries(intSet), publishStoreEntries(intSet))).whenComplete((r6, th) -> {
            if (th != null) {
                logError(outboundTransferTask, th);
            }
            onTaskCompletion(outboundTransferTask);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Flowable<InternalCacheEntry<Object, Object>> publishDataContainerEntries(IntSet intSet) {
        return Flowable.fromIterable(() -> {
            return this.dataContainer.iterator(intSet);
        }).filter(internalCacheEntry -> {
            return !internalCacheEntry.isL1Entry();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Flowable<InternalCacheEntry<Object, Object>> publishStoreEntries(IntSet intSet) {
        return Flowable.fromPublisher(this.persistenceManager.publishEntries(intSet, obj -> {
            return !this.dataContainer.containsKey(obj);
        }, true, true, Configurations::isStateTransferStore)).map(this::defaultMapEntryFromStore);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addTransfer(OutboundTransferTask outboundTransferTask) {
        if (log.isTraceEnabled()) {
            log.tracef("Adding outbound transfer to %s for segments %s", outboundTransferTask.getDestination(), outboundTransferTask.getSegments());
        }
        synchronized (this.transfersByDestination) {
            this.transfersByDestination.computeIfAbsent(outboundTransferTask.getDestination(), address -> {
                return new ArrayList();
            }).add(outboundTransferTask);
        }
    }

    @Override // org.infinispan.statetransfer.StateProvider
    public void cancelOutboundTransfer(Address address, int i, IntSet intSet) {
        if (log.isTraceEnabled()) {
            log.tracef("Cancelling outbound transfer to node %s for cache %s, topology id %d, segments %s", address, this.cacheName, Integer.valueOf(i), intSet);
        }
        synchronized (this.transfersByDestination) {
            List<OutboundTransferTask> list = this.transfersByDestination.get(address);
            if (list != null) {
                for (OutboundTransferTask outboundTransferTask : (OutboundTransferTask[]) list.toArray(new OutboundTransferTask[0])) {
                    if (outboundTransferTask.getTopologyId() == i) {
                        outboundTransferTask.cancelSegments(intSet);
                    }
                }
            }
        }
    }

    private void removeTransfer(OutboundTransferTask outboundTransferTask) {
        synchronized (this.transfersByDestination) {
            List<OutboundTransferTask> list = this.transfersByDestination.get(outboundTransferTask.getDestination());
            if (list != null) {
                list.remove(outboundTransferTask);
                if (list.isEmpty()) {
                    this.transfersByDestination.remove(outboundTransferTask.getDestination());
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onTaskCompletion(OutboundTransferTask outboundTransferTask) {
        if (log.isTraceEnabled()) {
            Log log2 = log;
            Object[] objArr = new Object[4];
            objArr[0] = outboundTransferTask.isCancelled() ? "cancelled" : "completed";
            objArr[1] = outboundTransferTask.getDestination();
            objArr[2] = this.cacheName;
            objArr[3] = outboundTransferTask.getSegments();
            log2.tracef("Removing %s outbound transfer of segments to %s for cache %s, segments %s", objArr);
        }
        removeTransfer(outboundTransferTask);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void logError(OutboundTransferTask outboundTransferTask, Throwable th) {
        if (!outboundTransferTask.isCancelled()) {
            log.failedOutBoundTransferExecution(th);
        } else if (log.isTraceEnabled()) {
            log.tracef("Ignoring error in already cancelled transfer to node %s, segments %s", outboundTransferTask.getDestination(), outboundTransferTask.getSegments());
        }
    }

    private InternalCacheEntry<Object, Object> defaultMapEntryFromStore(MarshallableEntry<Object, Object> marshallableEntry) {
        InternalCacheEntry<Object, Object> create = this.entryFactory.create((InternalEntryFactory) marshallableEntry.getKey(), marshallableEntry.getValue(), marshallableEntry.getMetadata());
        create.setInternalMetadata(marshallableEntry.getInternalMetadata());
        return create;
    }
}
