package net.openhft.chronicle.engine.map;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Stream;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.util.ThrowingConsumer;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.map.KeyValueStore;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.SubscriptionConsumer;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.fs.Clusters;
import net.openhft.chronicle.engine.fs.EngineCluster;
import net.openhft.chronicle.engine.fs.EngineHostDetails;
import net.openhft.chronicle.engine.server.internal.MapReplicationHandler;
import net.openhft.chronicle.engine.tree.HostIdentifier;
import net.openhft.chronicle.hash.replication.EngineReplicationLangBytesConsumer;
import net.openhft.chronicle.hash.replication.SingleChronicleHashReplication;
import net.openhft.chronicle.map.BytesMapEventListener;
import net.openhft.chronicle.map.ChronicleMap;
import net.openhft.chronicle.map.ChronicleMapBuilder;
import net.openhft.chronicle.map.MapEventListener;
import net.openhft.chronicle.map.Replica;
import net.openhft.chronicle.map.SharedSegment;
import net.openhft.chronicle.map.UpdateResult;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.api.session.SessionProvider;
import net.openhft.chronicle.network.cluster.ConnectionManager;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.lang.io.Bytes;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/chronicle-engine-1.14.2.jar:net/openhft/chronicle/engine/map/ChronicleMapKeyValueStore.class */
public class ChronicleMapKeyValueStore<K, V> implements ObjectKeyValueStore<K, V>, Closeable, Supplier<EngineReplication> {
    // Shared single-thread scheduler; close() uses it to close the map after a one-second delay.
    private static final ScheduledExecutorService DELAYED_CLOSER;
    // Class logger; in this file it is only used to gate debug-level messages.
    private static final Logger LOG;
    // The ChronicleMap that stores the entries served by this key-value store.
    private final ChronicleMap<K, V> chronicleMap;

    // Subscription view acquired from the asset; map events are published through it.
    @NotNull
    private final ObjectSubscription<K, V> subscriptions;

    // Replication view acquired from the asset; stays null when the lookup throws AssetNotFoundException.
    @Nullable
    private final EngineReplication engineReplicator;

    // The asset this store was created for.
    @NotNull
    private final Asset asset;

    // Cached asset.fullName(); passed as the first argument of every published event.
    @NotNull
    private final String assetFullName;

    // Event loop view of the asset; started by the constructor and stopped by close().
    @Nullable
    private final EventLoop eventLoop;
    // Set to true by close(); consulted by getAndPut, getAndRemove and accept.
    private final AtomicBoolean isClosed = new AtomicBoolean();

    // Session provider view of the asset (findView, so it may be absent).
    @Nullable
    private final SessionProvider sessionProvider;
    // Key and value classes taken from the request context.
    private Class keyType;
    private Class valueType;

    // SessionDetails view found on the asset's root; installed into sessionProvider by the event listener.
    @Nullable
    private SessionDetails replicationSessionDetails;
    // Decompiled form of the compiler-generated assertion flag; assigned in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:WEB-INF/lib/chronicle-engine-1.14.2.jar:net/openhft/chronicle/engine/map/ChronicleMapKeyValueStore$NullOldValuePublishingOperations.class */
    /**
     * Byte-level map listener that publishes insert, update and remove events to
     * {@code subscriptions}. Update events are always published with a {@code null} old value.
     *
     * <p>The key and value are decoded from the entry bytes while the segment lock is still held;
     * the lock is then released for the duration of the notification and re-acquired in a
     * {@code finally} block, so it is taken back exactly once whether or not a subscriber throws.
     */
    private class NullOldValuePublishingOperations extends BytesMapEventListener {
        private NullOldValuePublishingOperations() {
        }

        /**
         * Publishes an {@code InsertedEvent} when {@code z} is set, otherwise an
         * {@code UpdatedEvent} whose old value is {@code null}.
         *
         * <p>Nothing is published when {@code b == b2}, {@code j4 == j5} and {@code z3} is false.
         * NOTE(review): these look like the new/replaced identifier and timestamp pairs plus a
         * "value changed" flag — confirm against {@code BytesMapEventListener}.
         *
         * @param bytes         the entry bytes the key and value are read from
         * @param j2            position passed to {@code readKey}
         * @param j3            position passed to {@code readValue}
         * @param sharedSegment segment whose write lock is released while subscribers run
         */
        @Override // net.openhft.chronicle.map.BytesMapEventListener
        @SuppressWarnings("unchecked") // the map decodes its own entries, so these are K and V
        public void onPut(Bytes bytes, long j, long j2, long j3, boolean z, boolean z2, boolean z3, byte b, byte b2, long j4, long j5, @NotNull SharedSegment sharedSegment) {
            if (b == b2 && j4 == j5 && !z3) {
                return;
            }
            final ChronicleMapKeyValueStore<K, V> store = ChronicleMapKeyValueStore.this;
            final K key = (K) store.chronicleMap.readKey(bytes, j2);
            final V value = (V) store.chronicleMap.readValue(bytes, j3);
            // The write lock is dropped around the callback (presumably so subscribers can touch
            // the map) and is always re-acquired before returning to the caller.
            sharedSegment.writeUnlock();
            try {
                if (z) {
                    store.subscriptions.notifyEvent(InsertedEvent.of(store.assetFullName, key, value, z2));
                } else {
                    store.subscriptions.notifyEvent(UpdatedEvent.of(store.assetFullName, key, null, value, z2, z3));
                }
            } finally {
                sharedSegment.writeLock();
            }
        }

        /**
         * Publishes a {@code RemovedEvent} for the entry, unless {@code b == b2} and
         * {@code j4 == j5}.
         *
         * @param bytes         the entry bytes the key and value are read from
         * @param j2            position passed to {@code readKey}
         * @param j3            position passed to {@code readValue}
         * @param z             forwarded as the last argument of {@code RemovedEvent.of}
         * @param sharedSegment segment whose write lock is released while subscribers run
         */
        @Override // net.openhft.chronicle.map.BytesMapEventListener
        @SuppressWarnings("unchecked") // the map decodes its own entries, so these are K and V
        public void onRemove(Bytes bytes, long j, long j2, long j3, boolean z, byte b, byte b2, long j4, long j5, @NotNull SharedSegment sharedSegment) {
            if (b == b2 && j4 == j5) {
                return;
            }
            final ChronicleMapKeyValueStore<K, V> store = ChronicleMapKeyValueStore.this;
            final K key = (K) store.chronicleMap.readKey(bytes, j2);
            final V value = (V) store.chronicleMap.readValue(bytes, j3);
            sharedSegment.writeUnlock();
            try {
                store.subscriptions.notifyEvent(RemovedEvent.of(store.assetFullName, key, value, z));
            } finally {
                sharedSegment.writeLock();
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/chronicle-engine-1.14.2.jar:net/openhft/chronicle/engine/map/ChronicleMapKeyValueStore$PublishingOperations.class */
    /**
     * Object-level map listener that turns map mutations into {@code InsertedEvent},
     * {@code UpdatedEvent} and {@code RemovedEvent} notifications on {@code subscriptions}.
     *
     * <p>Before publishing an event whose first boolean flag is set, the listener installs
     * {@code replicationSessionDetails} into the session provider if the provider currently
     * holds no session. Both the provider and the details are nullable, so that step is skipped
     * when either is missing.
     */
    private class PublishingOperations extends MapEventListener<K, V> {
        private PublishingOperations() {
        }

        /** @return whether {@code subscriptions} currently has any subscriber */
        @Override // net.openhft.chronicle.map.MapEventListener
        public boolean isActive() {
            return ChronicleMapKeyValueStore.this.subscriptions.hasSubscribers();
        }

        /** @return whether {@code subscriptions} currently has any value subscriber */
        @Override // net.openhft.chronicle.map.MapEventListener
        public boolean usesValue() {
            return ChronicleMapKeyValueStore.this.subscriptions.hasValueSubscribers();
        }

        /**
         * Publishes a {@code RemovedEvent} for {@code k}/{@code v}.
         *
         * @param z forwarded to the event; when set, the replication session is installed first
         *          (presumably it marks an event that arrived via replication — confirm)
         */
        @Override // net.openhft.chronicle.map.MapEventListener
        public void onRemove(@NotNull K k, V v, boolean z, byte b, byte b2, long j, long j2) {
            if (z) {
                installReplicationSessionIfAbsent();
            }
            onRemove0(k, v, z);
        }

        /** Publishes a {@code RemovedEvent} without touching the session provider. */
        public void onRemove0(@NotNull K k, V v, boolean z) {
            ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(RemovedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, k, v, z));
        }

        /**
         * Publishes an {@code InsertedEvent} when {@code inserted} is set, otherwise an
         * {@code UpdatedEvent} when {@code changed} is set; publishes nothing when neither is.
         */
        private void onPut0(@NotNull K key, V newValue, @Nullable V oldValue, boolean z, boolean inserted, boolean changed) {
            if (inserted) {
                ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(InsertedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, key, newValue, z));
            } else if (changed) {
                ChronicleMapKeyValueStore.this.subscriptions.notifyEvent(UpdatedEvent.of(ChronicleMapKeyValueStore.this.assetFullName, key, oldValue, newValue, z, changed));
            }
        }

        /**
         * Publishes an insert or update event for {@code k}.
         *
         * <p>The call is ignored (with a debug message) when {@code z2} and {@code z3} are both
         * false, {@code j2 == j} and {@code b == b2}.
         *
         * @param v  passed as the value of an insert, or the new value of an update
         * @param v2 passed as the old value of an update
         * @param z  forwarded to the event; when set, the replication session is installed first
         * @param z2 selects an {@code InsertedEvent}
         * @param z3 selects an {@code UpdatedEvent} when {@code z2} is false
         */
        @Override // net.openhft.chronicle.map.MapEventListener
        public void onPut(@NotNull K k, V v, @Nullable V v2, boolean z, boolean z2, boolean z3, byte b, byte b2, long j, long j2) {
            if (!z2 && !z3 && j2 == j && b == b2) {
                Jvm.debug().on(getClass(), "ignore update as nothing has changed");
                return;
            }
            if (z) {
                installReplicationSessionIfAbsent();
            }
            onPut0(k, v, v2, z, z2, z3);
        }

        /**
         * Sets {@code replicationSessionDetails} on the session provider when the provider has no
         * current session. Does nothing if the provider or the details are {@code null}.
         */
        private void installReplicationSessionIfAbsent() {
            final SessionProvider provider = ChronicleMapKeyValueStore.this.sessionProvider;
            final SessionDetails details = ChronicleMapKeyValueStore.this.replicationSessionDetails;
            if (provider != null && details != null && provider.get() == null) {
                provider.set(details);
            }
        }
    }

    /**
     * Creates the store for {@code asset}: acquires the subscription, event-loop and session
     * views, builds the backing ChronicleMap (in memory, or persisted under
     * {@code requestContext.basePath()}), and, when a host identifier is available, registers
     * connection listeners for the other hosts of the configured cluster.
     *
     * <p>Replication is only configured on the map builder when the replication views could be
     * resolved. If the lookup throws {@code AssetNotFoundException} the store is created without
     * replication and no cluster listeners are registered.
     *
     * @param requestContext supplies key/value types, sizing hints, base path, name and cluster
     * @param asset          the asset this store belongs to
     * @throws IORuntimeException if the persisted map file cannot be created or opened
     */
    public ChronicleMapKeyValueStore(@NotNull RequestContext requestContext, @NotNull Asset asset) {
        String basePath = requestContext.basePath();
        this.keyType = requestContext.keyType();
        this.valueType = requestContext.valueType();
        double averageValueSize = requestContext.getAverageValueSize();
        long entries = requestContext.getEntries();
        this.asset = asset;
        this.assetFullName = asset.fullName();
        this.subscriptions = (ObjectSubscription) asset.acquireView(ObjectSubscription.class, requestContext);
        this.subscriptions.setKvStore(this);
        this.eventLoop = (EventLoop) asset.findOrCreateView(EventLoop.class);
        if (!$assertionsDisabled && this.eventLoop == null) {
            throw new AssertionError();
        }
        this.sessionProvider = (SessionProvider) asset.findView(SessionProvider.class);
        this.eventLoop.start();
        this.replicationSessionDetails = (SessionDetails) asset.root().findView(SessionDetails.class);
        ChronicleMapBuilder builder = ChronicleMapBuilder.of(requestContext.keyType(), requestContext.valueType());

        // All three replication views start out null so the failure path below is well defined.
        HostIdentifier hostIdentifier = null;
        EngineReplication engineReplication = null;
        EngineReplicationLangBytesConsumer langBytesConsumer = null;
        try {
            engineReplication = (EngineReplication) asset.acquireView(EngineReplication.class);
            langBytesConsumer = (EngineReplicationLangBytesConsumer) asset.findView(EngineReplicationLangBytesConsumer.class);
            hostIdentifier = (HostIdentifier) asset.findOrCreateView(HostIdentifier.class);
            // Only meaningful when the lookup itself succeeded.
            if (!$assertionsDisabled && hostIdentifier == null) {
                throw new AssertionError();
            }
        } catch (AssetNotFoundException e) {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(getClass(), "replication not enabled ", e);
            }
        }

        // Both flags default to true unless the request context explicitly says FALSE.
        builder.putReturnsNull(requestContext.putReturnsNull() != Boolean.FALSE).removeReturnsNull(requestContext.removeReturnsNull() != Boolean.FALSE);
        if (hostIdentifier != null) {
            builder.replication(SingleChronicleHashReplication.builder().engineReplication(langBytesConsumer).createWithId(hostIdentifier.hostId()));
        }
        this.engineReplicator = engineReplication;

        Boolean nullOldValueOnUpdateEvent = requestContext.nullOldValueOnUpdateEvent();
        if (nullOldValueOnUpdateEvent != null && nullOldValueOnUpdateEvent.booleanValue()) {
            builder.bytesEventListener(new NullOldValuePublishingOperations());
        } else {
            builder.eventListener(new PublishingOperations());
        }
        if (averageValueSize > 0.0d) {
            builder.averageValueSize(averageValueSize);
        }
        if (entries > 0) {
            builder.entries(entries + 1);
        }

        if (basePath == null) {
            this.chronicleMap = builder.create();
        } else {
            String str = basePath + "/" + requestContext.name();
            // A failed mkdirs() surfaces as the IOException handled below.
            new File(basePath).mkdirs();
            try {
                this.chronicleMap = builder.createPersistedTo(new File(str));
            } catch (IOException e2) {
                IORuntimeException iORuntimeException = new IORuntimeException("Could not access " + str);
                iORuntimeException.initCause(e2);
                throw iORuntimeException;
            }
        }

        if (hostIdentifier != null) {
            registerRemoteHostListeners(requestContext, asset, hostIdentifier);
        }
    }

    /**
     * Adds a connection listener for every host of the request's cluster other than the local
     * one. When the listener's flag is set for a non-acceptor connection, it publishes a
     * {@code MapReplicationHandler} carrying this map's last modification time for that host.
     * Missing clusters or connection managers are reported as warnings and skipped.
     */
    private void registerRemoteHostListeners(@NotNull RequestContext requestContext, @NotNull Asset asset, @NotNull HostIdentifier hostIdentifier) {
        Clusters clusters = (Clusters) asset.findView(Clusters.class);
        if (clusters == null) {
            Jvm.warn().on(getClass(), "no clusters found.");
            return;
        }
        EngineCluster engineCluster = clusters.get(requestContext.cluster());
        if (engineCluster == null) {
            Jvm.warn().on(getClass(), "no cluster found, name=" + requestContext.cluster());
            return;
        }
        byte localId = hostIdentifier.hostId();
        if (LOG.isDebugEnabled()) {
            Jvm.debug().on(getClass(), "hostDetails : localIdentifier=" + ((int) localId) + ",cluster=" + engineCluster.hostDetails());
        }
        for (EngineHostDetails engineHostDetails : engineCluster.hostDetails()) {
            try {
                byte remoteId = (byte) engineHostDetails.hostId();
                if (remoteId == localId) {
                    continue;
                }
                ConnectionManager connectionManager = engineCluster.findConnectionManager(remoteId);
                if (connectionManager == null) {
                    Jvm.warn().on(getClass(), "connectionManager==null for remoteIdentifier=" + ((int) remoteId));
                    continue;
                }
                // NOTE(review): the boolean is presumably "connection established" — confirm
                // against ConnectionManager's listener contract.
                connectionManager.addListener((networkContext, isConnected) -> {
                    if (isConnected && !networkContext.isAcceptor()) {
                        String fullName = requestContext.fullName();
                        networkContext.wireOutPublisher().publish(MapReplicationHandler.newMapReplicationHandler(((Replica) this.chronicleMap).lastModificationTime(remoteId), this.keyType, this.valueType, fullName, networkContext.newCid()));
                    }
                });
            } catch (Exception e3) {
                // One misconfigured host must not prevent wiring the remaining ones.
                Jvm.warn().on(getClass(), "hostDetails=" + engineHostDetails, e3);
            }
        }
    }

    /**
     * Returns the subscription view of this store.
     *
     * @param z ignored; the same subscription object is returned for either value
     * @return the subscription acquired from the asset at construction time
     */
    @Override // net.openhft.chronicle.engine.api.map.SubscriptionKeyValueStore
    @NotNull
    public KVSSubscription<K, V> subscription(boolean z) {
        final KVSSubscription<K, V> view = subscriptions;
        return view;
    }

    /**
     * Stores {@code v} under {@code k} using the map's {@code update} operation.
     *
     * @return {@code false} when the map reports {@code UpdateResult.INSERT}, {@code true} for
     *         any other result
     * @throws RuntimeException whatever the map throws; it is logged at debug level and rethrown
     */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    public boolean put(K k, V v) {
        final UpdateResult outcome;
        try {
            outcome = chronicleMap.update(k, v);
        } catch (RuntimeException failure) {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(getClass(), "Failed to write " + k + ", " + v, failure);
            }
            throw failure;
        }
        return outcome != UpdateResult.INSERT;
    }

    /**
     * Stores {@code v} under {@code k} and returns what the map's {@code put} returns.
     *
     * @return the result of {@code chronicleMap.put}, or {@code null} without touching the map
     *         when this store has been closed
     */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    @Nullable
    public V getAndPut(K k, V v) {
        return isClosed.get() ? null : chronicleMap.put(k, v);
    }

    /**
     * Removes the entry for {@code k}.
     *
     * @return {@code true} if the map's {@code remove} returned a non-null value
     */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    public boolean remove(K k) {
        final Object removed = chronicleMap.remove(k);
        return removed != null;
    }

    /**
     * Removes the entry for {@code k} and returns what the map's {@code remove} returns.
     *
     * @return the result of {@code chronicleMap.remove}, or {@code null} without touching the
     *         map when this store has been closed
     */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    @Nullable
    public V getAndRemove(K k) {
        return isClosed.get() ? null : chronicleMap.remove(k);
    }

    /**
     * Looks up the value stored under {@code k}.
     *
     * @param k   the key to look up
     * @param obj must be {@code null}; reusable (mutable) value instances are not supported
     * @return whatever the map's {@code getUsing} returns for {@code k} with no reusable value
     * @throws UnsupportedOperationException if {@code obj} is not {@code null}
     */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    public V getUsing(K k, @Nullable Object obj) {
        if (obj != null) {
            throw new UnsupportedOperationException("Mutable values not supported");
        }
        // obj is known to be null here; pass a typed null so the call matches getUsing(K, V).
        final V noReusableValue = null;
        return this.chronicleMap.getUsing(k, noReusableValue);
    }

    /** @return the value of {@code chronicleMap.size()}, widened to {@code long} */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    public long longSize() {
        final long entryCount = chronicleMap.size();
        return entryCount;
    }

    /**
     * Feeds every key of the map to {@code subscriptionConsumer}.
     *
     * @param i                    not used by this implementation
     * @param subscriptionConsumer receives each key of {@code chronicleMap.keySet()}
     * @throws InvalidSubscriberException propagated from the notification helper
     */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    public void keysFor(int i, @NotNull SubscriptionConsumer<K> subscriptionConsumer) throws InvalidSubscriberException {
        SubscriptionConsumer.notifyEachEvent(chronicleMap.keySet(), subscriptionConsumer);
    }

    /**
     * Feeds every entry of the map to {@code subscriptionConsumer}, wrapped as an
     * {@code InsertedEvent} whose last flag is {@code false}.
     *
     * @param i                    not used by this implementation
     * @param subscriptionConsumer receives one event per entry of {@code chronicleMap.entrySet()}
     * @throws InvalidSubscriberException declared by the interface; failures raised by the
     *                                    consumer are routed through
     *                                    {@code ThrowingConsumer.asConsumer}
     */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    public void entriesFor(int i, @NotNull SubscriptionConsumer<MapEvent<K, V>> subscriptionConsumer) throws InvalidSubscriberException {
        this.chronicleMap.entrySet().stream()
                .map(entry -> InsertedEvent.of(this.assetFullName, entry.getKey(), entry.getValue(), false))
                // asConsumer adapts the checked-exception accept() to a plain Consumer.
                .forEach(ThrowingConsumer.asConsumer(subscriptionConsumer::accept));
    }

    /** @return a fresh iterator over {@code chronicleMap.entrySet()} */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    @NotNull
    public Iterator<Map.Entry<K, V>> entrySetIterator() {
        final Iterator<Map.Entry<K, V>> entries = chronicleMap.entrySet().iterator();
        return entries;
    }

    /** @return a fresh iterator over {@code chronicleMap.keySet()} */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    @NotNull
    public Iterator<K> keySetIterator() {
        final Iterator<K> keys = chronicleMap.keySet().iterator();
        return keys;
    }

    /** Delegates to {@code chronicleMap.clear()}. */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    public void clear() {
        chronicleMap.clear();
    }

    /**
     * Not implemented for this store.
     *
     * @param v the value to search for (never inspected)
     * @return never returns normally
     * @throws UnsupportedOperationException always, with the message {@code "todo"}
     */
    @Override // net.openhft.chronicle.engine.api.map.KeyValueStore
    public boolean containsValue(V v) {
        throw new UnsupportedOperationException("todo");
    }

    /** @return the asset passed to the constructor */
    @Override // net.openhft.chronicle.engine.api.tree.Assetted
    @NotNull
    public Asset asset() {
        return asset;
    }

    /**
     * Returns the store wrapped by this one; this implementation always returns {@code null}.
     *
     * <p>NOTE(review): the tool-generated comment below says this method was renamed from
     * {@code underlying}, so the {@code @Override} presumably refers to that name — confirm
     * before renaming, since other decompiled callers may use {@code underlying2}.
     *
     * @return always {@code null}
     */
    @Override // net.openhft.chronicle.engine.api.tree.Assetted
    @Nullable
    /* renamed from: underlying */
    public KeyValueStore<K, V> underlying2() {
        return null;
    }

    /**
     * Closes this store: marks it closed, stops the event loop, quietly closes the asset's
     * {@code TcpChannelHub} view and schedules the backing map to be closed one second later on
     * {@code DELAYED_CLOSER}.
     *
     * <p>Only the first call has any effect; later calls return immediately, so the event loop
     * is not stopped twice and the delayed close is not scheduled again.
     */
    @Override // net.openhft.chronicle.core.io.Closeable, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        // Atomically flip the flag so exactly one caller performs the shutdown.
        if (!this.isClosed.compareAndSet(false, true)) {
            return;
        }
        if (!$assertionsDisabled && this.eventLoop == null) {
            throw new AssertionError();
        }
        this.eventLoop.stop();
        Closeable.closeQuietly(this.asset.findView(TcpChannelHub.class));
        // The map itself is closed after a delay rather than immediately.
        DELAYED_CLOSER.schedule(() -> {
            Closeable.closeQuietly(this.chronicleMap);
        }, 1L, TimeUnit.SECONDS);
    }

    /**
     * Applies a replication entry through the engine replicator.
     *
     * <p>The entry is dropped with a warning when this store is closed or when no engine
     * replicator is available.
     *
     * @param replicationEntry the entry to hand to {@code engineReplicator.applyReplication}
     */
    @Override // java.util.function.Consumer
    public void accept(@NotNull EngineReplication.ReplicationEntry replicationEntry) {
        final boolean canApply = !isClosed.get() && engineReplicator != null;
        if (canApply) {
            engineReplicator.applyReplication(replicationEntry);
            return;
        }
        Jvm.warn().on(getClass(), "message skipped as closed replicationEntry=" + replicationEntry);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /** @return the engine replicator resolved at construction time, or {@code null} if none */
    @Override // java.util.function.Supplier
    @Nullable
    public EngineReplication get() {
        return engineReplicator;
    }

    /** @return the key class taken from the request context at construction time */
    @Override // net.openhft.chronicle.engine.map.ObjectKeyValueStore
    public Class<K> keyType() {
        return keyType;
    }

    /** @return the value class taken from the request context at construction time */
    @Override // net.openhft.chronicle.engine.map.ObjectKeyValueStore
    public Class<V> valueType() {
        return valueType;
    }

    // Class initialisation: sets up the assertion flag, the delayed-close scheduler and the
    // logger, then registers MapReplicationHandler with the class alias pool.
    static {
        // True when assertions are disabled for this class; read by the constructor and close().
        $assertionsDisabled = !ChronicleMapKeyValueStore.class.desiredAssertionStatus();
        // Single-thread scheduler with a named thread; close() schedules the map close on it.
        DELAYED_CLOSER = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ChronicleMapKeyValueStore Closer", true));
        LOG = LoggerFactory.getLogger((Class<?>) ChronicleMapKeyValueStore.class);
        // Presumably needed so MapReplicationHandler can be referenced by alias when published
        // by the connection listener — confirm against ClassAliasPool.
        ClassAliasPool.CLASS_ALIASES.addAlias(MapReplicationHandler.class);
    }
}
