package com.hazelcast.wan.impl;

import com.hazelcast.config.AbstractWanPublisherConfig;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.config.WanBatchPublisherConfig;
import com.hazelcast.config.WanCustomPublisherConfig;
import com.hazelcast.config.WanReplicationConfig;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.management.events.WanAddConfigurationIgnoredEvent;
import com.hazelcast.internal.management.events.WanConsistencyCheckIgnoredEvent;
import com.hazelcast.internal.management.events.WanSyncIgnoredEvent;
import com.hazelcast.internal.monitor.LocalWanStats;
import com.hazelcast.internal.monitor.WanSyncState;
import com.hazelcast.internal.nio.ClassLoaderUtil;
import com.hazelcast.internal.partition.ChunkedMigrationAwareService;
import com.hazelcast.internal.partition.PartitionMigrationEvent;
import com.hazelcast.internal.partition.PartitionReplicationEvent;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.ObjectNamespace;
import com.hazelcast.internal.services.ServiceNamespace;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.ConstructorFunction;
import com.hazelcast.internal.util.MapUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.version.Version;
import com.hazelcast.wan.WanEventCounters;
import com.hazelcast.wan.WanMigrationAwarePublisher;
import com.hazelcast.wan.WanPublisher;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/* loaded from: input_file:BOOT-INF/lib/hazelcast-5.1.2.jar:com/hazelcast/wan/impl/WanReplicationServiceImpl.class */
public class WanReplicationServiceImpl implements WanReplicationService, ChunkedMigrationAwareService, ManagedService {
    private final Node node;
    private final WanEventCounterRegistry receivedWanEventCounters = new WanEventCounterRegistry();
    private final WanEventCounterRegistry sentWanEventCounters = new WanEventCounterRegistry();
    private final ConcurrentMap<String, DelegatingWanScheme> wanReplications = MapUtil.createConcurrentHashMap(1);
    private final ConstructorFunction<String, DelegatingWanScheme> publisherDelegateConstructor;

    public WanReplicationServiceImpl(Node node) {
        this.node = node;
        this.publisherDelegateConstructor = str -> {
            WanReplicationConfig wanReplicationConfig = node.getConfig().getWanReplicationConfig(str);
            if (wanReplicationConfig == null) {
                return null;
            }
            if (wanReplicationConfig.getBatchPublisherConfigs().isEmpty()) {
                return new DelegatingWanScheme(str, createPublishers(wanReplicationConfig));
            }
            throw new InvalidConfigurationException("Built-in batching WAN replication implementation is only available in Hazelcast enterprise edition.");
        };
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public DelegatingWanScheme getWanReplicationPublishers(String str) {
        if (this.wanReplications.containsKey(str) || this.node.getConfig().getWanReplicationConfig(str) != null) {
            return (DelegatingWanScheme) ConcurrencyUtil.getOrPutSynchronized((ConcurrentMap<String, V>) this.wanReplications, str, (Object) this, (ConstructorFunction<String, V>) this.publisherDelegateConstructor);
        }
        return null;
    }

    private ConcurrentMap<String, WanPublisher> createPublishers(WanReplicationConfig wanReplicationConfig) {
        List<WanCustomPublisherConfig> customPublisherConfigs = wanReplicationConfig.getCustomPublisherConfigs();
        int size = customPublisherConfigs.size();
        if (size == 0) {
            return MapUtil.createConcurrentHashMap(1);
        }
        ConcurrentMap<String, WanPublisher> createConcurrentHashMap = MapUtil.createConcurrentHashMap(size);
        Map createHashMap = MapUtil.createHashMap(size);
        customPublisherConfigs.forEach(wanCustomPublisherConfig -> {
            String wanPublisherId = getWanPublisherId(wanCustomPublisherConfig);
            if (createConcurrentHashMap.containsKey(wanPublisherId)) {
                throw new InvalidConfigurationException("Detected duplicate publisher ID '" + wanPublisherId + "' for a single WAN replication config");
            }
            createConcurrentHashMap.put(wanPublisherId, createPublisher(wanCustomPublisherConfig));
            createHashMap.put(wanPublisherId, wanCustomPublisherConfig);
        });
        for (Map.Entry<String, WanPublisher> entry : createConcurrentHashMap.entrySet()) {
            String key = entry.getKey();
            ((WanPublisher) this.node.getSerializationService().getManagedContext().initialize(entry.getValue())).init(wanReplicationConfig, (AbstractWanPublisherConfig) createHashMap.get(key));
        }
        return createConcurrentHashMap;
    }

    private WanPublisher createPublisher(AbstractWanPublisherConfig abstractWanPublisherConfig) {
        WanPublisher wanPublisher = (WanPublisher) ClassLoaderUtil.getOrCreate(abstractWanPublisherConfig.getImplementation(), this.node.getConfigClassLoader(), abstractWanPublisherConfig.getClassName());
        if (wanPublisher == null) {
            throw new InvalidConfigurationException("Either 'implementation' or 'className' attribute need to be set in the WAN publisher configuration for publisher " + abstractWanPublisherConfig);
        }
        return wanPublisher;
    }

    @Nonnull
    public static String getWanPublisherId(AbstractWanPublisherConfig abstractWanPublisherConfig) {
        String str = null;
        if (!StringUtil.isNullOrEmptyAfterTrim(abstractWanPublisherConfig.getPublisherId())) {
            str = abstractWanPublisherConfig.getPublisherId();
        } else if (abstractWanPublisherConfig instanceof WanBatchPublisherConfig) {
            str = ((WanBatchPublisherConfig) abstractWanPublisherConfig).getClusterName();
        }
        if (str == null) {
            throw new InvalidConfigurationException("Publisher ID or group name is not specified for " + abstractWanPublisherConfig);
        }
        return str;
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public void shutdown() {
        synchronized (this) {
            Iterator<DelegatingWanScheme> it = this.wanReplications.values().iterator();
            while (it.hasNext()) {
                for (WanPublisher wanPublisher : it.next().getPublishers()) {
                    if (wanPublisher != null) {
                        wanPublisher.shutdown();
                    }
                }
            }
            this.wanReplications.clear();
        }
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public void pause(String str, String str2) {
        throw new UnsupportedOperationException("Pausing WAN replication is not supported.");
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public void stop(String str, String str2) {
        throw new UnsupportedOperationException("Stopping WAN replication is not supported");
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public void resume(String str, String str2) {
        throw new UnsupportedOperationException("Resuming WAN replication is not supported");
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public UUID syncMap(String str, String str2, String str3) {
        this.node.getManagementCenterService().log(WanSyncIgnoredEvent.enterpriseOnly(str, str2, str3));
        throw new UnsupportedOperationException("WAN sync for map is not supported.");
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public UUID syncAllMaps(String str, String str2) {
        this.node.getManagementCenterService().log(WanSyncIgnoredEvent.enterpriseOnly(str, str2, null));
        throw new UnsupportedOperationException("WAN sync is not supported.");
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public UUID consistencyCheck(String str, String str2, String str3) {
        this.node.getManagementCenterService().log(WanConsistencyCheckIgnoredEvent.enterpriseOnly(str, str2, str3));
        throw new UnsupportedOperationException("Consistency check is not supported.");
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public void removeWanEvents(String str, String str2) {
        throw new UnsupportedOperationException("Clearing WAN replication queues is not supported.");
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public AddWanConfigResult addWanReplicationConfig(WanReplicationConfig wanReplicationConfig) {
        this.node.getManagementCenterService().log(WanAddConfigurationIgnoredEvent.enterpriseOnly(wanReplicationConfig.getName()));
        throw new UnsupportedOperationException("Adding new WAN config is not supported.");
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public void addWanReplicationConfigLocally(WanReplicationConfig wanReplicationConfig) {
        throw new UnsupportedOperationException("Adding new WAN config is not supported.");
    }

    @Override // com.hazelcast.internal.services.StatisticsAwareService
    public Map<String, LocalWanStats> getStats() {
        return null;
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public WanSyncState getWanSyncState() {
        return null;
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public WanEventCounters getReceivedEventCounters(String str) {
        return this.receivedWanEventCounters.getWanEventCounter("", "", str);
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public WanEventCounters getSentEventCounters(String str, String str2, String str3) {
        return this.sentWanEventCounters.getWanEventCounter(str, str2, str3);
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public void removeWanEventCounters(String str, String str2) {
        this.receivedWanEventCounters.removeCounter(str, str2);
        this.sentWanEventCounters.removeCounter(str, str2);
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public List<Version> getSupportedWanProtocolVersions() {
        return Collections.emptyList();
    }

    @Override // com.hazelcast.internal.partition.MigrationAwareService
    public void beforeMigration(PartitionMigrationEvent partitionMigrationEvent) {
        notifyMigrationAwarePublishers(wanMigrationAwarePublisher -> {
            wanMigrationAwarePublisher.onMigrationStart(partitionMigrationEvent);
        });
    }

    @Override // com.hazelcast.internal.partition.MigrationAwareService
    public void commitMigration(PartitionMigrationEvent partitionMigrationEvent) {
        notifyMigrationAwarePublishers(wanMigrationAwarePublisher -> {
            wanMigrationAwarePublisher.onMigrationCommit(partitionMigrationEvent);
        });
    }

    @Override // com.hazelcast.internal.partition.MigrationAwareService
    public void rollbackMigration(PartitionMigrationEvent partitionMigrationEvent) {
        notifyMigrationAwarePublishers(wanMigrationAwarePublisher -> {
            wanMigrationAwarePublisher.onMigrationRollback(partitionMigrationEvent);
        });
    }

    @Override // com.hazelcast.internal.services.ManagedService
    public void init(NodeEngine nodeEngine, Properties properties) {
    }

    @Override // com.hazelcast.internal.services.ManagedService
    public void reset() {
        Iterator<DelegatingWanScheme> it = this.wanReplications.values().iterator();
        while (it.hasNext()) {
            Iterator<WanPublisher> it2 = it.next().getPublishers().iterator();
            while (it2.hasNext()) {
                it2.next().reset();
            }
        }
    }

    @Override // com.hazelcast.internal.services.ManagedService
    public void shutdown(boolean z) {
        reset();
    }

    @Override // com.hazelcast.internal.partition.FragmentedMigrationAwareService
    public Collection<ServiceNamespace> getAllServiceNamespaces(PartitionReplicationEvent partitionReplicationEvent) {
        if (this.wanReplications.isEmpty()) {
            return Collections.emptyList();
        }
        HashSet hashSet = new HashSet();
        Iterator<DelegatingWanScheme> it = this.wanReplications.values().iterator();
        while (it.hasNext()) {
            it.next().collectAllServiceNamespaces(partitionReplicationEvent, hashSet);
        }
        return hashSet;
    }

    @Override // com.hazelcast.internal.partition.FragmentedMigrationAwareService
    public boolean isKnownServiceNamespace(ServiceNamespace serviceNamespace) {
        return (serviceNamespace instanceof ObjectNamespace) && MapService.SERVICE_NAME.equals(serviceNamespace.getServiceName());
    }

    @Override // com.hazelcast.internal.partition.FragmentedMigrationAwareService
    public Operation prepareReplicationOperation(PartitionReplicationEvent partitionReplicationEvent, Collection<ServiceNamespace> collection) {
        if (this.wanReplications.isEmpty() || collection.isEmpty()) {
            return null;
        }
        Map createHashMap = MapUtil.createHashMap(this.wanReplications.size());
        for (Map.Entry<String, DelegatingWanScheme> entry : this.wanReplications.entrySet()) {
            String key = entry.getKey();
            Map<String, Object> prepareEventContainerReplicationData = entry.getValue().prepareEventContainerReplicationData(partitionReplicationEvent, collection);
            if (!prepareEventContainerReplicationData.isEmpty()) {
                createHashMap.put(key, prepareEventContainerReplicationData);
            }
        }
        if (createHashMap.isEmpty()) {
            return null;
        }
        return new WanEventContainerReplicationOperation(Collections.emptyList(), createHashMap, partitionReplicationEvent.getPartitionId(), partitionReplicationEvent.getReplicaIndex());
    }

    @Override // com.hazelcast.internal.partition.MigrationAwareService
    public Operation prepareReplicationOperation(PartitionReplicationEvent partitionReplicationEvent) {
        return prepareReplicationOperation(partitionReplicationEvent, getAllServiceNamespaces(partitionReplicationEvent));
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public WanPublisher getPublisherOrFail(String str, String str2) {
        WanPublisher publisherOrNull = getPublisherOrNull(str, str2);
        if (publisherOrNull == null) {
            throw new InvalidConfigurationException("WAN Replication Config doesn't exist with WAN configuration name " + str + " and publisher ID " + str2);
        }
        return publisherOrNull;
    }

    @Override // com.hazelcast.wan.impl.WanReplicationService
    public boolean appendWanReplicationConfig(WanReplicationConfig wanReplicationConfig) {
        return false;
    }

    private WanPublisher getPublisherOrNull(String str, String str2) {
        DelegatingWanScheme wanReplicationPublishers = getWanReplicationPublishers(str);
        if (wanReplicationPublishers != null) {
            return wanReplicationPublishers.getPublisher(str2);
        }
        return null;
    }

    private void notifyMigrationAwarePublishers(Consumer<WanMigrationAwarePublisher> consumer) {
        Iterator<DelegatingWanScheme> it = this.wanReplications.values().iterator();
        while (it.hasNext()) {
            for (WanPublisher wanPublisher : it.next().getPublishers()) {
                if (wanPublisher instanceof WanMigrationAwarePublisher) {
                    consumer.accept((WanMigrationAwarePublisher) wanPublisher);
                }
            }
        }
    }
}
