package io.pravega.client.state.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.state.InitialUpdate;
import io.pravega.client.state.Revision;
import io.pravega.client.state.Revisioned;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.Update;
import io.pravega.client.stream.TruncatedDataException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/state/impl/StateSynchronizerImpl.class */
/**
 * {@link StateSynchronizer} implementation that keeps a local copy of a shared state and brings
 * it up to date by replaying the {@link UpdateOrInit} entries read from a
 * {@link RevisionedStreamClient}.
 *
 * <p>The local copy lives in {@code currentState}. It is only read or replaced while holding
 * {@code $lock}, and it is only ever replaced by a state whose revision is strictly newer than
 * the one currently held.
 *
 * @param <StateT> type of the state being synchronized
 */
public class StateSynchronizerImpl<StateT extends Revisioned> implements StateSynchronizer<StateT> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(StateSynchronizerImpl.class);

    /** Monitor guarding {@link #currentState}. */
    @SuppressFBWarnings(justification = "generated code")
    private final Object $lock = new Object[0];
    private final RevisionedStreamClient<UpdateOrInit<StateT>> client;

    @GuardedBy("$lock")
    private StateT currentState;
    private final Segment segment;

    /**
     * Creates a synchronizer with no local state; the state stays {@code null} until an initial
     * update has been written or read.
     *
     * @param segment segment used to build revisions and to name the stream passed to
     *                {@link InitialUpdate#create}
     * @param revisionedStreamClient client used for every read, write, mark and truncate call
     */
    public StateSynchronizerImpl(Segment segment, RevisionedStreamClient<UpdateOrInit<StateT>> revisionedStreamClient) {
        this.segment = segment;
        this.client = revisionedStreamClient;
    }

    /**
     * Returns the locally held state without contacting the client.
     *
     * @return the current local state, or {@code null} if none has been set yet
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public StateT getState() {
        synchronized ($lock) {
            return currentState;
        }
    }

    /**
     * Picks the revision from which the next read should start.
     *
     * @param useState when {@code true} and a local state exists, its revision is used;
     *                 otherwise the client's mark is used, falling back to the oldest revision
     *                 when no mark is set
     * @return the revision to pass to {@code readFrom}
     */
    private Revision getRevisionToReadFrom(boolean useState) {
        StateT state = getState();
        if (useState && state != null) {
            return state.getRevision();
        }
        Revision mark = client.getMark();
        return mark == null ? client.fetchOldestRevision() : mark;
    }

    /**
     * Reads every entry after the local state's revision and applies it: initial updates replace
     * the local state when they are newer, other entries are applied update by update. If the
     * read hits truncated data, recovery is delegated to {@link #handleTruncation()}.
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public void fetchUpdates() {
        Revision startRevision = getRevisionToReadFrom(true);
        log.trace("Fetching updates after {} ", startRevision);
        try {
            Iterator<Map.Entry<Revision, UpdateOrInit<StateT>>> entries = client.readFrom(startRevision);
            while (entries.hasNext()) {
                Map.Entry<Revision, UpdateOrInit<StateT>> entry = entries.next();
                UpdateOrInit<StateT> value = entry.getValue();
                log.trace("Found entry {} ", value);
                if (value.isInit()) {
                    InitialUpdate<StateT> init = value.getInit();
                    if (isNewer(entry.getKey())) {
                        updateCurrentState(init.create(segment.getScopedStreamName(), entry.getKey()));
                    }
                } else {
                    applyUpdates(entry.getKey().asImpl(), value.getUpdates());
                }
            }
        } catch (TruncatedDataException e) {
            log.warn("{} encountered truncation on segment {}", this, segment);
            handleTruncation();
        }
    }

    /**
     * Recovers from truncation by re-reading from the mark (or the oldest revision), ignoring the
     * local state's revision, until an initial update newer than the local state is found and
     * installed. It then resumes the regular {@link #fetchUpdates()}.
     *
     * @throws IllegalStateException if no such initial update is found
     */
    private void handleTruncation() {
        log.info("{} Encountered truncation", this);
        Revision startRevision = getRevisionToReadFrom(false);
        log.trace("Fetching updates after {} ", startRevision);
        boolean foundInit = false;
        Iterator<Map.Entry<Revision, UpdateOrInit<StateT>>> entries = client.readFrom(startRevision);
        while (!foundInit && entries.hasNext()) {
            Map.Entry<Revision, UpdateOrInit<StateT>> entry = entries.next();
            UpdateOrInit<StateT> value = entry.getValue();
            if (value.isInit()) {
                log.trace("Found entry {} ", value);
                InitialUpdate<StateT> init = value.getInit();
                if (isNewer(entry.getKey())) {
                    updateCurrentState(init.create(segment.getScopedStreamName(), entry.getKey()));
                    foundInit = true;
                }
            }
        }
        if (!foundInit) {
            throw new IllegalStateException("Data was truncated but there is not init after the truncation point.");
        }
        fetchUpdates();
    }

    /**
     * Applies the updates of one stream entry, in order, to the local state.
     *
     * <p>Each update gets its own revision: the entry's offset in the segment plus the update's
     * position in the list. An update is skipped when the state it would be applied to already
     * has that revision or a later one.
     *
     * @param readRevision revision of the entry that carried the updates
     * @param updates updates contained in that entry
     * @throws IllegalStateException if there is no local state to apply an update to
     */
    private void applyUpdates(Revision readRevision, List<? extends Update<StateT>> updates) {
        int index = 0;
        for (Update<StateT> update : updates) {
            StateT state = getState();
            if (state == null) {
                // Fail with a descriptive error instead of the bare NullPointerException that
                // synchronizing on a null state would raise.
                throw new IllegalStateException(
                        "Found an update at " + readRevision + " but no state has been initialized to apply it to.");
            }
            synchronized (state) {
                // The position is consumed even when the update is skipped, so later updates in
                // the same entry keep their revision numbers.
                Revision newRevision = new RevisionImpl(segment, readRevision.asImpl().getOffsetInSegment(), index++);
                if (newRevision.compareTo(state.getRevision()) > 0) {
                    updateCurrentState(update.applyTo(state, newRevision));
                }
            }
        }
    }

    /**
     * Lets the generator produce updates against the current state and writes them
     * conditionally, retrying against refreshed state until the write succeeds or the generator
     * produces no updates.
     *
     * @param updateGenerator callback that receives the current state and a list to fill with
     *                        updates
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public void updateState(StateSynchronizer.UpdateGenerator<StateT> updateGenerator) {
        conditionallyWrite(state -> {
            List<Update<StateT>> updates = new ArrayList<>();
            updateGenerator.accept(state, updates);
            if (updates.isEmpty()) {
                return null;
            }
            return new UpdateOrInit<>(updates);
        });
    }

    /**
     * Same as {@link #updateState(StateSynchronizer.UpdateGenerator)} but also hands back the
     * value returned by the last invocation of the generator function.
     *
     * @param updateGeneratorFunction callback that receives the current state and a list to fill
     *                                with updates, and returns a result
     * @param <ReturnT> type of the result
     * @return the result of the last invocation of {@code updateGeneratorFunction}
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public <ReturnT> ReturnT updateState(StateSynchronizer.UpdateGeneratorFunction<StateT, ReturnT> updateGeneratorFunction) {
        AtomicReference<ReturnT> result = new AtomicReference<>();
        conditionallyWrite(state -> {
            List<Update<StateT>> updates = new ArrayList<>();
            result.set(updateGeneratorFunction.apply(state, updates));
            if (updates.isEmpty()) {
                return null;
            }
            return new UpdateOrInit<>(updates);
        });
        return result.get();
    }

    /**
     * Writes a single update without any revision check. The local state is not touched here.
     *
     * @param update update to write
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public void updateStateUnconditionally(Update<StateT> update) {
        log.trace("Unconditionally Writing {} ", update);
        client.writeUnconditionally(new UpdateOrInit<>(Collections.singletonList(update)));
    }

    /**
     * Writes a list of updates as one entry without any revision check. The local state is not
     * touched here.
     *
     * @param list updates to write
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public void updateStateUnconditionally(List<? extends Update<StateT>> list) {
        log.trace("Unconditionally Writing {} ", list);
        client.writeUnconditionally(new UpdateOrInit<>(list));
    }

    /**
     * Conditionally writes the initial update at the oldest revision. When the write yields a
     * revision, the local state is created from it; when it yields {@code null}, the existing
     * entries are fetched instead.
     *
     * @param initialUpdate initial update to write
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public void initialize(InitialUpdate<StateT> initialUpdate) {
        Revision written = client.writeConditionally(client.fetchOldestRevision(), new UpdateOrInit<>(initialUpdate));
        if (written == null) {
            fetchUpdates();
        } else {
            updateCurrentState(initialUpdate.create(segment.getScopedStreamName(), written));
        }
    }

    /**
     * Computes the distance between the local state's offset and the mark's offset.
     *
     * @return the difference of the two segment offsets, never negative; a missing state or mark
     *         counts as offset 0
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public long bytesWrittenSinceCompaction() {
        Revision mark = client.getMark();
        StateT state = getState();
        long stateOffset = state == null ? 0L : state.getRevision().asImpl().getOffsetInSegment();
        long markOffset = mark == null ? 0L : mark.asImpl().getOffsetInSegment();
        return Math.max(0L, stateOffset - markOffset);
    }

    /**
     * Conditionally writes the initial update produced by {@code function} for the current
     * state. If one was produced, the mark is then advanced to the revision of the state that
     * was compacted (when the mark is unset or older), and the stream is truncated to the
     * previous mark if there was one.
     *
     * @param function produces the initial update that replaces the history of the given state;
     *                 returning {@code null} skips compaction
     */
    @Override // io.pravega.client.state.StateSynchronizer
    public void compact(Function<StateT, InitialUpdate<StateT>> function) {
        AtomicReference<Revision> compactedRevision = new AtomicReference<>(null);
        conditionallyWrite(state -> {
            InitialUpdate<StateT> init = function.apply(state);
            if (init == null) {
                compactedRevision.set(null);
                return null;
            }
            compactedRevision.set(state.getRevision());
            return new UpdateOrInit<>(init);
        });
        Revision newMark = compactedRevision.get();
        if (newMark != null) {
            Revision oldMark = client.getMark();
            if (oldMark == null || oldMark.compareTo(newMark) < 0) {
                client.compareAndSetMark(oldMark, newMark);
            }
            if (oldMark != null) {
                client.truncateToRevision(oldMark);
            }
        }
    }

    /**
     * Repeatedly asks {@code generator} for an entry based on the current state and writes it
     * conditionally on that state's revision. The loop ends when the generator returns
     * {@code null} or when a write yields a revision; after a write that yields {@code null} the
     * state is refreshed and the generator is invoked again.
     *
     * @param generator produces the entry to write for a given state, or {@code null} for none
     * @throws IllegalStateException if there is still no state after fetching updates
     */
    private void conditionallyWrite(Function<StateT, UpdateOrInit<StateT>> generator) {
        while (true) {
            StateT state = getState();
            if (state == null) {
                fetchUpdates();
                state = getState();
                if (state == null) {
                    throw new IllegalStateException("Write was called before the state was initialized.");
                }
            }
            log.trace("Conditionally Writing {} ", state);
            Revision expectedRevision = state.getRevision();
            UpdateOrInit<StateT> toWrite = generator.apply(state);
            if (toWrite == null) {
                return;
            }
            Revision newRevision = client.writeConditionally(expectedRevision, toWrite);
            log.trace("Conditionally write returned {} ", newRevision);
            if (newRevision != null) {
                // Only plain updates are applied locally here; a written init leaves the local
                // state as it is.
                if (!toWrite.isInit()) {
                    applyUpdates(newRevision, toWrite.getUpdates());
                }
                return;
            }
            fetchUpdates();
        }
    }

    /**
     * Tells whether the given revision is newer than the local state.
     *
     * @param revision revision to compare
     * @return {@code true} if there is no local state or its revision is older than
     *         {@code revision}
     */
    private boolean isNewer(Revision revision) {
        synchronized ($lock) {
            return currentState == null || currentState.getRevision().compareTo(revision) < 0;
        }
    }

    /**
     * Replaces the local state with {@code newState} if it is non-null and newer than the state
     * currently held; otherwise does nothing.
     *
     * @param newState candidate state
     */
    private void updateCurrentState(StateT newState) {
        synchronized ($lock) {
            if (newState != null && isNewer(newState.getRevision())) {
                log.trace("Updating new state to {} ", newState.getRevision());
                currentState = newState;
            }
        }
    }

    /** Closes the underlying client. */
    @Override // io.pravega.client.state.StateSynchronizer, java.lang.AutoCloseable
    public void close() {
        log.info("Closing stateSynchronizer {}", this);
        client.close();
    }

    /**
     * Describes this synchronizer by its current state and segment. The state is read through
     * {@link #getState()} so the guarded field is not accessed without its lock.
     */
    @Override
    @SuppressFBWarnings(justification = "generated code")
    public String toString() {
        return "StateSynchronizerImpl(currentState=" + getState() + ", segment=" + segment + ")";
    }
}
