package com.fasterxml.clustermate.service.cluster;

import com.fasterxml.clustermate.api.ClusterStatusAccessor;
import com.fasterxml.clustermate.api.EntryKey;
import com.fasterxml.clustermate.api.KeyRange;
import com.fasterxml.clustermate.api.NodeState;
import com.fasterxml.clustermate.service.SharedServiceStuff;
import com.fasterxml.clustermate.service.state.ActiveNodeState;
import com.fasterxml.clustermate.service.store.StoredEntry;
import com.fasterxml.clustermate.service.store.StoredEntryConverter;
import com.fasterxml.clustermate.service.sync.SyncListAccessor;
import com.fasterxml.clustermate.service.sync.SyncListResponse;
import com.fasterxml.clustermate.service.sync.SyncListResponseEntry;
import com.fasterxml.clustermate.service.sync.SyncPullEntry;
import com.fasterxml.clustermate.service.sync.SyncPullRequest;
import com.fasterxml.clustermate.service.util.StoreUtil;
import com.fasterxml.storemate.shared.ByteContainer;
import com.fasterxml.storemate.shared.IpAndPort;
import com.fasterxml.storemate.shared.StartAndStoppable;
import com.fasterxml.storemate.shared.StorableKey;
import com.fasterxml.storemate.shared.TimeMaster;
import com.fasterxml.storemate.shared.util.IOUtil;
import com.fasterxml.storemate.store.Storable;
import com.fasterxml.storemate.store.StorableCreationMetadata;
import com.fasterxml.storemate.store.StorableCreationResult;
import com.fasterxml.storemate.store.StorableStore;
import com.fasterxml.storemate.store.StoreException;
import com.fasterxml.storemate.store.StoreOperationSource;
import com.fasterxml.storemate.store.state.NodeStateStore;
import com.fasterxml.storemate.store.util.BoundedInputStream;
import com.fasterxml.storemate.store.util.OperationDiagnostics;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.skife.config.TimeSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/fasterxml/clustermate/service/cluster/ClusterPeerImpl.class */
public class ClusterPeerImpl<K extends EntryKey, E extends StoredEntry<K>> extends ClusterPeer implements StartAndStoppable {
    /** Pause (msecs) taken by {@code doRealSync()} when no sync list could be obtained. */
    private static final long SLEEP_FOR_SYNCLIST_ERRORS_MSECS = 10000;
    /** Pause (msecs) taken by {@code syncLoop()} after an unexpected exception. */
    private static final long SLEEP_FOR_SYNCPULL_ERRORS_MSECS = 3000;
    /** Pause (msecs) for empty sync lists; presumably the fallback when the peer suggests no usable wait -- TODO confirm. */
    private static final long SLEEP_FOR_EMPTY_SYNCLIST_MSECS = 1000;
    /** Upper bound (bytes) on the summed entry sizes packed into one sync-pull request. */
    private static final long MAX_TOTAL_PAYLOAD = 250000000;
    /** Not referenced by the code visible in this file; presumably a failure threshold -- TODO confirm. */
    private static final int MAX_SYNC_FAILURES = 8;
    /** Smallest peer-suggested wait (msecs) that {@code doRealSync()} accepts as-is. */
    private static final long MINIMAL_SLEEP_MSECS = 10;
    /**
     * Timeout passed when fetching a sync list. Spelled out as a literal number of seconds:
     * it is unrelated to {@link #MINIMAL_SLEEP_MSECS}, which merely has the same numeric value.
     */
    private static final TimeSpan TIMEOUT_FOR_SYNCLIST = new TimeSpan(10L, TimeUnit.SECONDS);
    /** Timeout for the "inactive" status update sent when the sync thread ends. */
    private static final TimeSpan TIMEOUT_FOR_BYEBYE = new TimeSpan(250, TimeUnit.MILLISECONDS);
    /** Note: logs under the {@code ClusterPeer} category, not {@code ClusterPeerImpl}. */
    private static final Logger LOG = LoggerFactory.getLogger(ClusterPeer.class);
    /** Constant shared by all instances, hence static; presumably bounds fetch retries -- TODO confirm. */
    private static final int MAX_FETCH_TRIES = 20;

    protected final SharedServiceStuff _stuff;
    protected final TimeMaster _timeMaster;
    /** Thread running {@code syncLoop()}; null when no sync thread has been started (or after stop). Guarded by {@code synchronized (this)}. */
    protected Thread _syncThread;
    protected final StorableStore _entryStore;
    protected final StoredEntryConverter<K, E, ?> _entryConverter;
    protected final ClusterViewByServerUpdatable _cluster;
    protected final ClusterStatusAccessor _statusAccessor;
    protected final SyncListAccessor _syncListAccessor;
    protected final NodeStateStore<IpAndPort, ActiveNodeState> _stateStore;
    /** Cluster hash taken from the last sync-list response that carried a cluster status. */
    protected long _lastClusterHash;
    /**
     * Current state of the peer; replaced (never mutated in place) by {@code markDisabled()}
     * and {@code _updatePersistentState()}.
     * NOTE(review): written without synchronization; confirm which threads call the writers.
     */
    protected ActiveNodeState _syncState;
    /** Loop condition of {@code syncLoop()}: set by {@code startSyncing()}, cleared by {@code _stop()}. */
    protected AtomicBoolean _running = new AtomicBoolean(false);
    /** Number of unexpected exceptions seen by {@code syncLoop()} since the last reset. */
    protected AtomicInteger _failCount = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/fasterxml/clustermate/service/cluster/ClusterPeerImpl$PullProblems.class */
    /**
     * Simple tally of problems encountered while pulling entries from a peer.
     * All counters start at zero.
     */
    public static class PullProblems {
        /** Pulls that did not result in a successful store. */
        public int redundant = 0;
        /** Count of missing entries. */
        public int missing = 0;
        /** Any other kind of problem. */
        public int other = 0;

        private PullProblems() {
        }

        /**
         * @return true if at least one of the counters is positive
         */
        public boolean hasIssues() {
            if (redundant > 0) {
                return true;
            }
            if (missing > 0) {
                return true;
            }
            return other > 0;
        }

        /**
         * @return human-readable summary of the three counters
         */
        @Override
        public String toString() {
            return redundant + " redundant, " + missing + " missing entries and " + other + " other problems";
        }
    }

    /**
     * Wires up a peer handle; no thread is started here (see {@code start()}).
     *
     * @param sharedServiceStuff source of the time master and entry converter; also used to build the sync-list accessor
     * @param clusterViewByServerUpdatable cluster view updated from sync-list responses
     * @param nodeStateStore store into which changed peer state is upserted
     * @param storableStore local entry store
     * @param activeNodeState initial state of the peer
     * @param clusterStatusAccessor status accessor retained by this peer
     */
    public ClusterPeerImpl(SharedServiceStuff sharedServiceStuff,
            ClusterViewByServerUpdatable clusterViewByServerUpdatable,
            NodeStateStore<IpAndPort, ActiveNodeState> nodeStateStore,
            StorableStore storableStore,
            ActiveNodeState activeNodeState,
            ClusterStatusAccessor clusterStatusAccessor) {
        // Collaborators handed in directly
        _stuff = sharedServiceStuff;
        _cluster = clusterViewByServerUpdatable;
        _stateStore = nodeStateStore;
        _entryStore = storableStore;
        _syncState = activeNodeState;
        _statusAccessor = clusterStatusAccessor;

        // Collaborators derived from the shared service object
        _syncListAccessor = new SyncListAccessor(sharedServiceStuff);
        _timeMaster = sharedServiceStuff.getTimeMaster();
        _entryConverter = sharedServiceStuff.getEntryConverter();
    }

    /**
     * Lifecycle entry point: starts the sync thread via {@code startSyncing()},
     * ignoring its result.
     */
    public void start() {
        startSyncing();
    }

    /**
     * Lifecycle hook called ahead of {@code stop()}; delegates to {@code _stop(false)}.
     */
    public void prepareForStop() {
        _stop(false);
    }

    /**
     * Lifecycle hook for shutdown; delegates to {@code _stop(true)}.
     */
    public void stop() {
        _stop(true);
    }

    /**
     * Clears the running flag, detaches and interrupts the sync thread (if any),
     * and stops the sync-list accessor.
     *
     * @param z not consulted by this implementation
     */
    protected void _stop(boolean z) {
        final Thread toInterrupt;
        synchronized (this) {
            _running.set(false);
            toInterrupt = _syncThread;
            // Detach the thread reference; harmless when it is already null
            _syncThread = null;
            if (toInterrupt != null) {
                LOG.info("Stop requested for sync thread for peer at {}", _syncState.getAddress());
            }
        }
        // Interrupt outside of the lock
        if (toInterrupt != null) {
            toInterrupt.interrupt();
        }
        _syncListAccessor.stop();
    }

    /**
     * Starts a daemon thread that runs {@code syncLoop()}, unless one is already registered.
     *
     * @return true if a new thread was started; false if a sync thread already existed
     */
    public boolean startSyncing() {
        synchronized (this) {
            if (_syncThread != null) {
                return false;
            }
            _running.set(true);
            final Runnable loop = new Runnable() {
                @Override
                public void run() {
                    syncLoop();
                }
            };
            final Thread worker = new Thread(loop);
            _syncThread = worker;
            worker.setDaemon(true);
            worker.setName("NodeSync-" + _syncState.getAddress());
            worker.start();
        }
        return true;
    }

    /**
     * @return current value of the failure counter
     */
    @Override
    public int getFailCount() {
        return _failCount.get();
    }

    /**
     * Sets the failure counter back to zero.
     */
    @Override
    public void resetFailCount() {
        _failCount.set(0);
    }

    /**
     * @return the "synced up to" timestamp of the current peer state
     */
    @Override
    public long getSyncedUpTo() {
        return _syncState.getSyncedUpTo();
    }

    /**
     * @return address recorded in the current peer state
     */
    @Override
    public IpAndPort getAddress() {
        return _syncState.getAddress();
    }

    /**
     * @return active key range of the current peer state
     */
    @Override
    public KeyRange getActiveRange() {
        return _syncState.getRangeActive();
    }

    /**
     * @return total key range of the current peer state
     */
    @Override
    public KeyRange getTotalRange() {
        return _syncState.totalRange();
    }

    /**
     * @return sync key range of the current peer state
     */
    @Override
    public KeyRange getSyncRange() {
        return _syncState.getRangeSync();
    }

    /**
     * @return the peer state object currently held by this instance
     */
    public ActiveNodeState getSyncState() {
        return _syncState;
    }

    /**
     * @return whether the current peer state is flagged as disabled
     */
    public boolean isDisabled() {
        return _syncState.isDisabled();
    }

    /**
     * Body of the sync thread. Repeats a sync round for as long as the running flag is set:
     * a real sync when the local node's total range overlaps the peer's, a minimal one otherwise.
     * Once the loop ends, an "inactive" status update is sent to the peer, unless running
     * in test mode or the peer is disabled.
     */
    protected void syncLoop() {
        LOG.info("Starting sync thread for peer at {}", _syncState.getAddress());
        if (_stuff.isRunningTests()) {
            try {
                _timeMaster.sleep(1L);
            } catch (InterruptedException ignored) {
                // deliberately ignored; the loop below checks the running flag
            }
        }

        while (_running.get()) {
            try {
                final boolean overlapping = hasOverlap(_cluster.getLocalState(), _syncState);
                if (!overlapping) {
                    doMinimalSync();
                } else {
                    doRealSync();
                }
            } catch (InterruptedException interrupted) {
                // Expected during shutdown; only noteworthy if we are still meant to run
                if (_running.get()) {
                    LOG.warn("syncLoop() interrupted without clearing '_running' flag; ignoring");
                }
            } catch (Exception failure) {
                LOG.warn("Uncaught processing exception during syncLoop(): ({}) {}", failure.getClass().getName(), failure.getMessage());
                if (!_running.get()) {
                    continue;
                }
                // Count the failure, then back off a bit before the next round
                _failCount.incrementAndGet();
                try {
                    _timeMaster.sleep(SLEEP_FOR_SYNCPULL_ERRORS_MSECS);
                } catch (InterruptedException ignored) {
                    // deliberately ignored; loop condition decides whether to go on
                }
            }
        }

        if (_stuff.isRunningTests()) {
            LOG.info("Stopped sync thread for peer at {} -- testing, all done!", _syncState.getAddress());
            return;
        }
        if (_syncState.isDisabled()) {
            LOG.info("Stopped sync thread for peer at {}: is disabled, no need to send bye-bye", _syncState.getAddress());
            return;
        }
        LOG.info("Stopped sync thread for peer at {}: let's send bye-bye", _syncState.getAddress());
        final long sendStart = System.currentTimeMillis();
        _syncListAccessor.sendStatusUpdate(_cluster, TIMEOUT_FOR_BYEBYE, _syncState.getAddress(), "inactive");
        LOG.info("Bye-bye message to {} sent in {} msec", _syncState.getAddress(), Long.valueOf(System.currentTimeMillis() - sendStart));
    }

    /**
     * Performs one full sync round against the peer: fetches the sync list, applies any
     * cluster status it carries, processes tombstones, drops entries that need no pull,
     * fetches what remains, persists progress and finally sleeps for a while.
     *
     * @throws Exception on interruption or on any failure of the steps above
     */
    protected void doRealSync() throws Exception {
        final long roundStart = _timeMaster.currentTimeMillis();
        final SyncListResponse<?> response = _fetchSyncList();
        if (!_running.get()) {
            return;
        }
        if (response == null) {
            // Nothing usable came back: back off before the next attempt
            _timeMaster.sleep(SLEEP_FOR_SYNCLIST_ERRORS_MSECS);
            return;
        }

        if (response.clusterStatus == null) {
            // No status included; only worth a warning when the hashes disagree
            if (response.clusterHash != _lastClusterHash) {
                LOG.warn("Did not get cluster status from {} even though hashes differ 0x{} (old) vs 0x{} (response)", new Object[]{_syncState.getAddress(), Long.toHexString(_lastClusterHash), Long.toHexString(response.clusterHash)});
            }
        } else {
            _cluster.updateWith(response.clusterStatus);
            _lastClusterHash = response.clusterHash;
        }

        final long lastSeenTimestamp = response.lastSeen();
        if (lastSeenTimestamp <= 0) {
            LOG.error("Invalid lastSeen timestamp value {} for SyncList, from {}", Long.valueOf(lastSeenTimestamp), _syncState.getAddress());
            // brief pause, then carry on with the round regardless
            Thread.sleep(100L);
        }

        final List<SyncListResponseEntry> entries = response.entries;
        final int listedCount = entries.size();
        if (listedCount == 0) {
            _updatePersistentState(roundStart, lastSeenTimestamp);
            long pause = response.clientWait;
            if (pause < MINIMAL_SLEEP_MSECS) {
                pause = SLEEP_FOR_EMPTY_SYNCLIST_MSECS;
            }
            _timeMaster.sleep(pause);
            return;
        }

        _handleTombstones(entries);
        _filterSeen(entries);
        if (!_running.get()) {
            return;
        }

        if (!entries.isEmpty()) {
            final int toFetch = entries.size();
            final AtomicInteger rounds = new AtomicInteger(0);
            final long syncedUpTo = _fetchMissing(entries, rounds);
            // Fetched count is derived from how much shorter the list is afterwards
            final Integer fetched = Integer.valueOf(toFetch - entries.size());
            final IpAndPort peer = getAddress();
            final String seconds = String.format("%.2f", Double.valueOf((_timeMaster.currentTimeMillis() - roundStart) / 1000.0d));
            LOG.info("Fetched {}/{} missing entries from {} in {} seconds ({} rounds)", new Object[]{fetched, Integer.valueOf(toFetch), peer, seconds, Integer.valueOf(rounds.get())});
            _updatePersistentState(roundStart, syncedUpTo);
        } else {
            _updatePersistentState(roundStart, lastSeenTimestamp);
        }

        final long behindMsecs = _timeMaster.currentTimeMillis() - _syncState.getSyncedUpTo();
        final long sleepMsecs = _calculateSleepBetweenSync(listedCount, behindMsecs);
        if (sleepMsecs <= 0) {
            return;
        }
        if (sleepMsecs >= 50) {
            LOG.info("With {} listed entries, {} seconds behind, will do {} msec sleep", new Object[]{Integer.valueOf(listedCount), String.format("%.2f", Double.valueOf(sleepMsecs / 1000.0d)), Long.valueOf(sleepMsecs)});
        }
        _timeMaster.sleep(sleepMsecs);
    }

    /**
     * Sync round used when there is no range overlap with the peer: currently
     * just logs and sleeps for 30 seconds.
     *
     * @throws Exception if the sleep is interrupted
     */
    protected void doMinimalSync() throws Exception {
        final long napMsecs = 30000L;
        LOG.info("doMinimalSync(): let's just... Sleep for a bit (TBD)");
        Thread.sleep(napMsecs);
    }

    /**
     * Updates the disabled flag of the peer state and, if that produced a different
     * state object, stores it in the node state store. Store failures are logged, not thrown.
     *
     * @param j timestamp of the change; when not positive, the state's existing
     *     "disabled updated" timestamp is used instead
     * @param z new value of the disabled flag
     */
    public void markDisabled(long j, boolean z) {
        final long timestamp = (j > 0) ? j : _syncState.getDisabledUpdated();
        final ActiveNodeState updated = _syncState.withDisabled(timestamp, z);
        if (updated == _syncState) {
            // same instance back: nothing to record
            return;
        }
        _syncState = updated;
        try {
            _stateStore.upsertEntry(updated.getAddress(), updated);
        } catch (Exception e) {
            LOG.error("Failed to update node state (disabled to {}) for {}. Problem ({}): {}", new Object[]{Boolean.valueOf(z), _syncState, e.getClass().getName(), e.getMessage()});
        }
    }

    /**
     * Records the time of the latest sync attempt and, if it moved forward, the
     * "synced up to" timestamp; the state is written to the node state store only if
     * a different state object resulted. Store failures are logged, not thrown.
     *
     * @param j time of the sync attempt
     * @param j2 candidate "synced up to" timestamp; applied only if greater than the current one
     */
    private void _updatePersistentState(long j, long j2) {
        final ActiveNodeState before = _syncState;
        _syncState = before.withLastSyncAttempt(j);
        if (_syncState.getSyncedUpTo() < j2) {
            _syncState = _syncState.withSyncedUpTo(j2);
        }
        if (_syncState == before) {
            return;
        }
        try {
            _stateStore.upsertEntry(_syncState.getAddress(), _syncState);
        } catch (Exception e) {
            LOG.error("Failed to update node state for {}. Problem ({}): {}", new Object[]{_syncState, e.getClass().getName(), e.getMessage()});
        }
    }

    /**
     * Asks the peer for its sync list.
     *
     * @return the response, or null if the fetch was interrupted (a warning is logged
     *     only if this peer is still marked as running)
     * @throws InterruptedException declared, although interruption of the fetch itself is handled here
     */
    private SyncListResponse<?> _fetchSyncList() throws InterruptedException {
        SyncListResponse<?> response = null;
        try {
            response = _syncListAccessor.fetchSyncList(_cluster, TIMEOUT_FOR_SYNCLIST, _syncState, _lastClusterHash);
        } catch (InterruptedException e) {
            // Stay quiet when we are being shut down
            if (_running.get()) {
                LOG.warn("Failed to fetch syncList from {} ({}): {}", new Object[]{_syncState.getAddress(), e.getClass().getName(), e.getMessage()});
            }
        }
        return response;
    }

    /**
     * Removes every entry marked as deleted from the given list and soft-deletes
     * the matching key in the local store.
     *
     * @param list sync-list entries; modified in place
     * @return number of deleted entries that were processed
     * @throws IOException if the store reports an I/O problem
     * @throws StoreException if the store rejects the operation
     */
    protected int _handleTombstones(List<SyncListResponseEntry> list) throws IOException, StoreException {
        int tombstones = 0;
        for (Iterator<SyncListResponseEntry> it = list.iterator(); it.hasNext(); ) {
            final SyncListResponseEntry entry = it.next();
            if (!entry.deleted()) {
                continue;
            }
            ++tombstones;
            // drop from the list first, then apply the deletion locally
            it.remove();
            _entryStore.softDelete(StoreOperationSource.SYNC, (OperationDiagnostics) null, entry.key, true, true);
        }
        return tombstones;
    }

    /**
     * Drops from the list every entry for which a local copy exists and
     * {@code StoreUtil.needToPullRemoteToResolve} says no pull is needed.
     * Entries without a local copy are always kept.
     *
     * @param list sync-list entries; modified in place
     * @throws IOException if the store reports an I/O problem
     * @throws StoreException if the store rejects the lookup
     */
    protected void _filterSeen(List<SyncListResponseEntry> list) throws IOException, StoreException {
        for (Iterator<SyncListResponseEntry> it = list.iterator(); it.hasNext(); ) {
            final SyncListResponseEntry remote = it.next();
            final Storable local = _entryStore.findEntry(StoreOperationSource.SYNC, (OperationDiagnostics) null, remote.key);
            if (local == null) {
                continue;
            }
            final boolean mustPull = StoreUtil.needToPullRemoteToResolve(local.getLastModified(), local.getContentHash(), remote.insertionTime, remote.hash);
            if (!mustPull) {
                it.remove();
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:49:0x022d, code lost:
    
        throw new java.io.IOException("Unexpected end-of-input: got " + r0 + " bytes; needed " + r27);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * NOT RECOVERED: the decompiler failed on this method, so the body below is only a
     * placeholder that always throws {@link UnsupportedOperationException}. The real logic
     * has to be restored from the original source or bytecode before this class can sync.
     * <p>
     * What the visible caller, {@code doRealSync()}, relies on:
     * <ul>
     *   <li>the list shrinks as entries are fetched (the caller logs the size difference as the fetched count);</li>
     *   <li>the counter is read afterwards and logged as the number of "rounds";</li>
     *   <li>the returned value is handed to {@code _updatePersistentState} as the new "synced up to" candidate.</li>
     * </ul>
     *
     * @param r11 entries still to be pulled from the peer
     * @param r12 counter the caller reads back after the call
     * @return timestamp the caller treats as "synced up to"
     * @throws java.lang.InterruptedException declared by the original method
     */
    private long _fetchMissing(java.util.List<com.fasterxml.clustermate.service.sync.SyncListResponseEntry> r11, java.util.concurrent.atomic.AtomicInteger r12) throws java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 949
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.fasterxml.clustermate.service.cluster.ClusterPeerImpl._fetchMissing(java.util.List, java.util.concurrent.atomic.AtomicInteger):long");
    }

    /**
     * Builds a sync-pull request from the head of the given list. The first entry is
     * always included; further entries are added in order until either the request holds
     * {@code i} entries or adding the next one would push the summed size past
     * {@code MAX_TOTAL_PAYLOAD}.
     *
     * @param list entries to pull; must not be empty (the first entry is taken unconditionally,
     *     so an empty list fails with {@link java.util.NoSuchElementException})
     * @param i maximum number of entries to put in the request
     * @param atomicInteger receives the summed size of the included entries, capped at
     *     {@link Integer#MAX_VALUE}
     * @return the populated request
     */
    private SyncPullRequest _buildSyncPullRequest(List<SyncListResponseEntry> list, int i, AtomicInteger atomicInteger) {
        final SyncPullRequest request = new SyncPullRequest();
        final Iterator<SyncListResponseEntry> it = list.iterator();

        final SyncListResponseEntry first = it.next();
        request.addEntry(first.key);
        long totalPayload = first.size;

        while (it.hasNext() && request.size() < i) {
            final SyncListResponseEntry candidate = it.next();
            if (totalPayload + candidate.size > MAX_TOTAL_PAYLOAD) {
                break;
            }
            totalPayload += candidate.size;
            request.addEntry(candidate.key);
        }
        // The first entry is never checked against the payload cap, so a single huge entry
        // could exceed the int range; saturate instead of letting a narrowing cast wrap around.
        atomicInteger.set((int) Math.min(totalPayload, Integer.MAX_VALUE));
        return request;
    }

    /**
     * Decides how long to pause before the next sync round.
     *
     * @param i number of entries the peer listed in this round
     * @param j how far behind the peer we are, in msecs
     * @return 0 when at least 180000 msecs behind or when the list was at least three
     *     quarters of the configured maximum size; otherwise 300, 200, 100 or 50 msecs,
     *     shrinking as the number of listed entries grows
     */
    private long _calculateSleepBetweenSync(int i, long j) {
        final int busyThreshold = (_stuff.getServiceConfig().cfgMaxEntriesPerSyncList * 3) / 4;
        final boolean farBehind = j >= 180000;
        if (farBehind || i >= busyThreshold) {
            return 0L;
        }
        if (i < 5) {
            return 300L;
        }
        if (i <= 10) {
            return 200L;
        }
        if (i < 40) {
            return 100L;
        }
        return 50L;
    }

    /**
     * Applies one pulled entry to the local store. Deleted entries are soft-deleted.
     * Other entries are upserted conditionally: payloads up to the configured inline limit
     * are read fully into memory first, larger ones are streamed through a bounded stream.
     *
     * @param syncListResponseEntry sync-list entry for the item; its insertion time feeds the overwrite check
     * @param syncPullEntry header describing the pulled item
     * @param inputStream stream positioned at the item's payload
     * @param pullProblems tally updated when the pull is redundant or the streamed payload was not fully read
     * @throws IOException on read or store failure, including a payload shorter than announced
     */
    private void _pullEntry(SyncListResponseEntry syncListResponseEntry, SyncPullEntry syncPullEntry, InputStream inputStream, PullProblems pullProblems) throws IOException {
        final StorableKey key = syncPullEntry.key;
        final long storageSize = syncPullEntry.storageSize;
        if (syncPullEntry.isDeleted) {
            _entryStore.softDelete(StoreOperationSource.SYNC, (OperationDiagnostics) null, key, true, true);
            return;
        }

        final StorableCreationMetadata creationMeta = new StorableCreationMetadata(syncPullEntry.compression, syncPullEntry.checksum, syncPullEntry.checksumForCompressed);
        creationMeta.uncompressedSize = syncPullEntry.size;
        creationMeta.storageSize = syncPullEntry.storageSize;
        creationMeta.replicated = true;
        final ByteContainer customMeta = _entryConverter.createMetadata(_timeMaster.currentTimeMillis(), syncPullEntry.lastAccessMethod, syncPullEntry.minTTLSecs, syncPullEntry.maxTTLSecs);

        final StorableCreationResult result;
        if (storageSize > _stuff.getServiceConfig().storeConfig.maxInlinedStorageSize) {
            // Too big to inline: stream it, bounded to the announced storage size
            final BoundedInputStream bounded = new BoundedInputStream(inputStream, creationMeta.storageSize, false);
            result = _entryStore.upsertConditionally(StoreOperationSource.SYNC, (OperationDiagnostics) null, key, bounded, creationMeta, customMeta, true, new ConflictOverwriteChecker(syncListResponseEntry.insertionTime));
            if (result.succeeded() && !bounded.isCompletelyRead()) {
                final Storable stored = result.getNewEntry();
                final long storedLength = (stored == null) ? -1L : stored.getStorageLength();
                pullProblems.other++;
                LOG.warn("Problems with sync-pull for '{}': read {} bytes, should have read {} more; entry storageSize: {}", new Object[]{syncPullEntry.key, Long.valueOf(bounded.bytesRead()), Long.valueOf(bounded.bytesLeft()), Long.valueOf(storedLength)});
            }
        } else {
            final ByteContainer payload;
            if (storageSize == 0) {
                payload = ByteContainer.emptyContainer();
            } else {
                final byte[] buffer = new byte[(int) storageSize];
                final int got = IOUtil.readFully(inputStream, buffer);
                if (got < storageSize) {
                    throw new IOException("Unexpected end-of-input: got " + got + " bytes; needed " + storageSize);
                }
                payload = ByteContainer.simple(buffer);
            }
            result = _entryStore.upsertConditionally(StoreOperationSource.SYNC, (OperationDiagnostics) null, key, payload, creationMeta, customMeta, true, new ConflictOverwriteChecker(syncListResponseEntry.insertionTime));
        }

        if (result.succeeded()) {
            return;
        }
        // Count every unsuccessful pull, but only log the first one
        if (pullProblems.redundant++ != 0) {
            return;
        }
        if (result.getPreviousEntry() == null) {
            LOG.warn("Failed sync-pull for '{}' (from {}): no old entry. Strange! (will only report first)", syncPullEntry.key, _syncState.getAddress());
        } else {
            LOG.info("Redundant sync-pull for '{}' (from {}): entry already existed locally (will only report first)", syncPullEntry.key, _syncState.getAddress());
        }
    }

    /**
     * @param nodeState first node state
     * @param nodeState2 second node state
     * @return whether the total key ranges of the two states overlap
     */
    protected final boolean hasOverlap(NodeState nodeState, NodeState nodeState2) {
        final KeyRange firstRange = nodeState.totalRange();
        final KeyRange secondRange = nodeState2.totalRange();
        return firstRange.overlapsWith(secondRange);
    }
}
