package com.fasterxml.clustermate.service.remote;

import com.fasterxml.clustermate.api.EntryKey;
import com.fasterxml.clustermate.api.NodeState;
import com.fasterxml.clustermate.service.SharedServiceStuff;
import com.fasterxml.clustermate.service.Stores;
import com.fasterxml.clustermate.service.cluster.ConflictOverwriteChecker;
import com.fasterxml.clustermate.service.state.ActiveNodeState;
import com.fasterxml.clustermate.service.store.StoredEntry;
import com.fasterxml.clustermate.service.store.StoredEntryConverter;
import com.fasterxml.clustermate.service.sync.SyncListAccessor;
import com.fasterxml.clustermate.service.sync.SyncListResponse;
import com.fasterxml.clustermate.service.sync.SyncListResponseEntry;
import com.fasterxml.clustermate.service.sync.SyncPullEntry;
import com.fasterxml.clustermate.service.sync.SyncPullRequest;
import com.fasterxml.clustermate.service.util.StoreUtil;
import com.fasterxml.storemate.shared.ByteContainer;
import com.fasterxml.storemate.shared.IpAndPort;
import com.fasterxml.storemate.shared.StartAndStoppable;
import com.fasterxml.storemate.shared.StorableKey;
import com.fasterxml.storemate.shared.util.IOUtil;
import com.fasterxml.storemate.store.Storable;
import com.fasterxml.storemate.store.StorableCreationMetadata;
import com.fasterxml.storemate.store.StorableCreationResult;
import com.fasterxml.storemate.store.StorableStore;
import com.fasterxml.storemate.store.StoreException;
import com.fasterxml.storemate.store.StoreOperationSource;
import com.fasterxml.storemate.store.util.BoundedInputStream;
import com.fasterxml.storemate.store.util.OperationDiagnostics;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.skife.config.TimeSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/fasterxml/clustermate/service/remote/RemoteClusterHandler.class */
public class RemoteClusterHandler<K extends EntryKey, E extends StoredEntry<K>> implements StartAndStoppable {
    private static final long MSECS_TO_WAIT_AFTER_FAILED_STATUS = 30000;
    private static final long MSECS_TO_WAIT_AFTER_NO_REMOTE_PEERS = 30000;
    private static final int MAX_WAIT_SECS_FOR_REMOTE_STATUS = 10;
    private static final long SLEEP_FOR_SYNCPULL_ERRORS_MSECS = 3000;
    private static final long SLEEP_INITIAL = 10000;
    private static final TimeSpan TIMEOUT_FOR_INITIAL_SYNCLIST_MSECS = new TimeSpan(10, TimeUnit.SECONDS);
    private static final TimeSpan TIMEOUT_FOR_SYNCLIST = new TimeSpan(8, TimeUnit.SECONDS);
    private static final long MAX_TIME_FOR_SYNCPULL_MSECS = 60000;
    private static final long MAX_TOTAL_PAYLOAD = 250000000;
    private static final int MAX_SYNC_FAILURES = 8;
    protected final SharedServiceStuff _stuff;
    protected final NodeState _localState;
    protected final RemoteClusterStateFetcher _remoteFetcher;
    protected final Stores<K, E> _stores;
    protected final StoredEntryConverter<K, E, ?> _entryConverter;
    protected Thread _syncThread;
    protected final SyncListAccessor _syncListAccessor;
    private final Logger LOG = LoggerFactory.getLogger(getClass());
    private final int MAX_FETCH_TRIES = 20;
    protected final AtomicBoolean _running = new AtomicBoolean(false);
    protected final AtomicReference<RemoteCluster> _remoteCluster = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/fasterxml/clustermate/service/remote/RemoteClusterHandler$PullProblems.class */
    public static class PullProblems {
        public int redundant;
        public int missing;
        public int other;

        private PullProblems() {
            this.redundant = 0;
            this.missing = 0;
            this.other = 0;
        }

        public boolean hasIssues() {
            return this.redundant > 0 || this.missing > 0 || this.other > 0;
        }

        public String toString() {
            return new StringBuilder(60).append(this.redundant).append(" redundant, ").append(this.missing).append(" missing entries and ").append(this.other).append(" other problems").toString();
        }
    }

    public RemoteClusterHandler(SharedServiceStuff sharedServiceStuff, Stores<K, E> stores, Set<IpAndPort> set, NodeState nodeState) {
        this._stuff = sharedServiceStuff;
        this._localState = nodeState;
        this._stores = stores;
        this._entryConverter = sharedServiceStuff.getEntryConverter();
        this._remoteFetcher = new RemoteClusterStateFetcher(sharedServiceStuff, this._running, set, nodeState);
        this._syncListAccessor = new SyncListAccessor(sharedServiceStuff);
    }

    public RemoteCluster getRemoteCluster() {
        return this._remoteCluster.get();
    }

    public void start() throws Exception {
        synchronized (this) {
            if (this._syncThread == null) {
                this._running.set(true);
                Thread thread = new Thread(new Runnable() { // from class: com.fasterxml.clustermate.service.remote.RemoteClusterHandler.1
                    @Override // java.lang.Runnable
                    public void run() {
                        if (RemoteClusterHandler.this._remoteFetcher.init()) {
                            RemoteClusterHandler.this.syncLoop();
                        } else {
                            RemoteClusterHandler.this.LOG.error("No valid end points found for {}: CAN NOT PROCEED WITH REMOTE SYNC", RemoteClusterHandler.this.getName());
                        }
                    }
                });
                this._syncThread = thread;
                this._syncThread.setDaemon(true);
                this._syncThread.setName(getName());
                thread.start();
            }
        }
    }

    public void prepareForStop() throws Exception {
        _stop(false);
    }

    public void stop() throws Exception {
        _stop(true);
    }

    protected void _stop(boolean z) {
        Thread thread;
        synchronized (this) {
            this._running.set(false);
            thread = this._syncThread;
            if (thread != null) {
                this._syncThread = null;
                this.LOG.info("Stop requested (force? {}) for {} thread", Boolean.valueOf(z), getName());
            }
        }
        if (thread != null) {
            thread.interrupt();
        }
    }

    protected void syncLoop() {
        this.LOG.info("Starting {} thread, will sleep for {} msec before operation", getName(), Long.valueOf(SLEEP_INITIAL));
        try {
            Thread.sleep(SLEEP_INITIAL);
        } catch (InterruptedException e) {
        }
        while (this._running.get()) {
            try {
                RemoteCluster _remoteCluster = _remoteCluster();
                if (_remoteCluster != null) {
                    this._stuff.sleep(_sleepForMsecs(_syncListPull(_remoteCluster)));
                }
            } catch (InterruptedException e2) {
                if (this._running.get()) {
                    this.LOG.warn("syncLoop() interrupted without clearing '_running' flag; ignoring");
                }
            } catch (Exception e3) {
                this.LOG.warn("Uncaught processing exception during syncLoop(): ({}) {}", e3.getClass().getName(), e3.getMessage());
                if (this._running.get()) {
                    try {
                        this._stuff.getTimeMaster().sleep(SLEEP_FOR_SYNCPULL_ERRORS_MSECS);
                    } catch (InterruptedException e4) {
                    }
                }
            }
        }
    }

    protected long _syncListPull(RemoteCluster remoteCluster) throws InterruptedException {
        long currentTimeMillis;
        SyncListResponse<?> fetchRemoteSyncList;
        for (RemoteClusterNode remoteClusterNode : remoteCluster.getRemotePeers()) {
            ActiveNodeState persisted = remoteClusterNode.persisted();
            if (persisted == null) {
                try {
                    persisted = (ActiveNodeState) this._stores.getRemoteNodeStore().findEntry(remoteClusterNode.getAddress());
                    if (persisted == null) {
                        persisted = new ActiveNodeState(this._localState, remoteClusterNode.asNodeState(this._localState), this._stuff.currentTimeMillis());
                    }
                    remoteClusterNode.setPersisted(persisted);
                } catch (Exception e) {
                    this.LOG.warn("Failed to load Remote Node State for {}; must skip node", remoteClusterNode);
                }
            }
            try {
                currentTimeMillis = System.currentTimeMillis();
                fetchRemoteSyncList = this._syncListAccessor.fetchRemoteSyncList(this._localState, remoteClusterNode.getAddress(), persisted.getSyncedUpTo(), TIMEOUT_FOR_INITIAL_SYNCLIST_MSECS);
            } catch (InterruptedException e2) {
                throw e2;
            } catch (Exception e3) {
                this.LOG.warn("Failure to fetch initial remote sync list from " + remoteClusterNode.getAddress() + ", skipping peer:  (" + e3.getClass().getName() + ") " + e3.getMessage(), e3);
            }
            if (fetchRemoteSyncList != null) {
                return _syncPull(currentTimeMillis, remoteClusterNode, fetchRemoteSyncList);
            }
            continue;
        }
        this.LOG.warn("Failed to contact any of remote peers ({}) to do remote sync", remoteCluster.getRemotePeers());
        this._stuff.getTimeMaster().sleep(30000L);
        return 0L;
    }

    protected long _syncPull(long j, RemoteClusterNode remoteClusterNode, SyncListResponse<?> syncListResponse) throws InterruptedException, IOException {
        long currentTimeMillis = this._stuff.currentTimeMillis() + MAX_TIME_FOR_SYNCPULL_MSECS;
        long j2 = 0;
        ActiveNodeState activeNodeState = null;
        int i = 0;
        ActiveNodeState persisted = remoteClusterNode.persisted();
        while (true) {
            if (!this._running.get()) {
                break;
            }
            i++;
            if (i >= 1000) {
                break;
            }
            int size = syncListResponse.size();
            j2 += size;
            if (size != 0) {
                List<SyncListResponseEntry> list = syncListResponse.entries;
                _handleTombstones(list);
                _filterSeen(list);
                if (!this._running.get()) {
                    break;
                }
                if (!list.isEmpty()) {
                    int size2 = list.size();
                    AtomicInteger atomicInteger = new AtomicInteger(0);
                    _fetchMissing(remoteClusterNode.getAddress(), list, atomicInteger);
                    this.LOG.info("Fetched {}/{} missing entries ({} listed) from {} in {} seconds ({} rounds)", new Object[]{Integer.valueOf(size2 - list.size()), Integer.valueOf(size2), Integer.valueOf(size), remoteClusterNode.getAddress(), String.format("%.2f", Double.valueOf((this._stuff.currentTimeMillis() - j) / 1000.0d)), Integer.valueOf(atomicInteger.get())});
                }
                long lastSeen = syncListResponse.lastSeen();
                if (lastSeen > 0) {
                    persisted = persisted.withSyncedUpTo(lastSeen);
                } else {
                    this.LOG.warn("Missing lastSeenTimestamp from sync-list to {}", remoteClusterNode.getAddress());
                }
                remoteClusterNode.setPersisted(persisted);
                if (this._stuff.currentTimeMillis() < currentTimeMillis) {
                    if (activeNodeState == null) {
                        activeNodeState = persisted;
                        this._stores.getRemoteNodeStore().upsertEntry(remoteClusterNode.getAddress(), persisted);
                    }
                    if (size < 50 && syncListResponse.clientWait > 0) {
                        break;
                    }
                    syncListResponse = this._syncListAccessor.fetchRemoteSyncList(this._localState, remoteClusterNode.getAddress(), persisted.getSyncedUpTo(), TIMEOUT_FOR_INITIAL_SYNCLIST_MSECS);
                    syncListResponse.lastSeen();
                } else {
                    j2 = -j2;
                    break;
                }
            } else {
                break;
            }
        }
        if (activeNodeState != persisted) {
            this._stores.getRemoteNodeStore().upsertEntry(remoteClusterNode.getAddress(), persisted);
        }
        return j2;
    }

    /* JADX WARN: Code restructure failed: missing block: B:46:0x0224, code lost:
    
        throw new java.io.IOException("Unexpected end-of-input: got " + r0 + " bytes; needed " + r28);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private long _fetchMissing(com.fasterxml.storemate.shared.IpAndPort r11, java.util.List<com.fasterxml.clustermate.service.sync.SyncListResponseEntry> r12, java.util.concurrent.atomic.AtomicInteger r13) throws java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 923
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.fasterxml.clustermate.service.remote.RemoteClusterHandler._fetchMissing(com.fasterxml.storemate.shared.IpAndPort, java.util.List, java.util.concurrent.atomic.AtomicInteger):long");
    }

    private SyncPullRequest _buildRemoteSyncPullRequest(List<SyncListResponseEntry> list, int i, AtomicInteger atomicInteger) {
        SyncPullRequest syncPullRequest = new SyncPullRequest();
        Iterator<SyncListResponseEntry> it = list.iterator();
        SyncListResponseEntry next = it.next();
        syncPullRequest.addEntry(next.key);
        long j = next.size;
        while (true) {
            if (!it.hasNext() || syncPullRequest.size() >= i) {
                break;
            }
            SyncListResponseEntry next2 = it.next();
            j += next2.size;
            if (j > MAX_TOTAL_PAYLOAD) {
                j -= next2.size;
                break;
            }
            syncPullRequest.addEntry(next2.key);
        }
        atomicInteger.set((int) j);
        return syncPullRequest;
    }

    private void _pullEntry(IpAndPort ipAndPort, SyncListResponseEntry syncListResponseEntry, SyncPullEntry syncPullEntry, InputStream inputStream, PullProblems pullProblems) throws IOException {
        StorableCreationResult upsertConditionally;
        ByteContainer simple;
        StorableStore entryStore = this._stores.getEntryStore();
        StorableKey storableKey = syncPullEntry.key;
        long j = syncPullEntry.storageSize;
        if (syncPullEntry.isDeleted) {
            entryStore.softDelete(StoreOperationSource.SYNC, (OperationDiagnostics) null, storableKey, true, true);
            return;
        }
        StorableCreationMetadata storableCreationMetadata = new StorableCreationMetadata(syncPullEntry.compression, syncPullEntry.checksum, syncPullEntry.checksumForCompressed);
        storableCreationMetadata.uncompressedSize = syncPullEntry.size;
        storableCreationMetadata.storageSize = syncPullEntry.storageSize;
        storableCreationMetadata.replicated = true;
        ByteContainer createMetadata = this._entryConverter.createMetadata(this._stuff.currentTimeMillis(), syncPullEntry.lastAccessMethod, syncPullEntry.minTTLSecs, syncPullEntry.maxTTLSecs);
        if (j <= this._stuff.getServiceConfig().storeConfig.maxInlinedStorageSize) {
            if (j == 0) {
                simple = ByteContainer.emptyContainer();
            } else {
                byte[] bArr = new byte[(int) j];
                int readFully = IOUtil.readFully(inputStream, bArr);
                if (readFully < j) {
                    throw new IOException("Unexpected end-of-input: got " + readFully + " bytes; needed " + j);
                }
                simple = ByteContainer.simple(bArr);
            }
            upsertConditionally = entryStore.upsertConditionally(StoreOperationSource.SYNC, (OperationDiagnostics) null, storableKey, simple, storableCreationMetadata, createMetadata, true, new ConflictOverwriteChecker(syncListResponseEntry.insertionTime));
        } else {
            BoundedInputStream boundedInputStream = new BoundedInputStream(inputStream, storableCreationMetadata.storageSize, false);
            upsertConditionally = entryStore.upsertConditionally(StoreOperationSource.SYNC, (OperationDiagnostics) null, storableKey, boundedInputStream, storableCreationMetadata, createMetadata, true, new ConflictOverwriteChecker(syncListResponseEntry.insertionTime));
            if (upsertConditionally.succeeded() && !boundedInputStream.isCompletelyRead()) {
                Storable newEntry = upsertConditionally.getNewEntry();
                long storageLength = newEntry == null ? -1L : newEntry.getStorageLength();
                pullProblems.other++;
                this.LOG.warn("Problems with sync-pull for '{}': read {} bytes, should have read {} more; entry storageSize: {}", new Object[]{syncPullEntry.key, Long.valueOf(boundedInputStream.bytesRead()), Long.valueOf(boundedInputStream.bytesLeft()), Long.valueOf(storageLength)});
            }
        }
        if (upsertConditionally.succeeded()) {
            return;
        }
        int i = pullProblems.redundant;
        pullProblems.redundant = i + 1;
        if (i == 0) {
            if (upsertConditionally.getPreviousEntry() != null) {
                this.LOG.info("Redundant sync-pull for '{}' (from {}): entry already existed locally (will only report first)", syncPullEntry.key, ipAndPort);
            } else {
                this.LOG.warn("Failed sync-pull for '{}' (from {}): no old entry. Strange! (will only report first)", syncPullEntry.key, ipAndPort);
            }
        }
    }

    protected int _handleTombstones(List<SyncListResponseEntry> list) throws IOException, StoreException {
        StorableStore entryStore = this._stores.getEntryStore();
        int i = 0;
        Iterator<SyncListResponseEntry> it = list.iterator();
        while (it.hasNext()) {
            SyncListResponseEntry next = it.next();
            if (next.deleted()) {
                i++;
                it.remove();
                entryStore.softDelete(StoreOperationSource.SYNC, (OperationDiagnostics) null, next.key, true, true);
            }
        }
        return i;
    }

    protected void _filterSeen(List<SyncListResponseEntry> list) throws IOException, StoreException {
        StorableStore entryStore = this._stores.getEntryStore();
        Iterator<SyncListResponseEntry> it = list.iterator();
        while (it.hasNext()) {
            SyncListResponseEntry next = it.next();
            Storable findEntry = entryStore.findEntry(StoreOperationSource.SYNC, (OperationDiagnostics) null, next.key);
            if (findEntry != null && !StoreUtil.needToPullRemoteToResolve(findEntry.getLastModified(), findEntry.getContentHash(), next.insertionTime, next.hash)) {
                it.remove();
            }
        }
    }

    protected RemoteCluster _remoteCluster() throws IOException {
        RemoteCluster remoteCluster = this._remoteCluster.get();
        if (remoteCluster == null || !remoteCluster.isStillValid(this._stuff.currentTimeMillis())) {
            remoteCluster = this._remoteFetcher.fetch(MAX_WAIT_SECS_FOR_REMOTE_STATUS);
            if (this._remoteCluster == null) {
                this.LOG.warn("Failed to access remote cluster status information; will wait for {} msecs", 30000L);
                try {
                    this._stuff.getTimeMaster().sleep(30000L);
                } catch (InterruptedException e) {
                }
            }
            this._remoteCluster.set(remoteCluster);
        }
        return remoteCluster;
    }

    protected long _sleepForMsecs(long j) {
        if (j < 0) {
            return 50L;
        }
        if (j == 0) {
            return SLEEP_INITIAL;
        }
        if (j < 100) {
            return 5000L;
        }
        return j < 500 ? RemoteClusterStateFetcher.BOOTSTRAP_TIMEOUT_MSECS : j < 1000 ? 500L : 500L;
    }

    protected String getName() {
        return "RemoteClusterSync";
    }
}
