package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.EndOfSegmentException;
import io.pravega.client.segment.impl.NoSuchEventException;
import io.pravega.client.segment.impl.NoSuchSegmentException;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentInputStream;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentTruncatedException;
import io.pravega.client.stream.EventPointer;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.Sequence;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.TruncatedDataException;
import io.pravega.common.Exceptions;
import io.pravega.common.Timer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/EventStreamReaderImpl.class */
public class EventStreamReaderImpl<Type> implements EventStreamReader<Type> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger((Class<?>) EventStreamReaderImpl.class);
    private final Serializer<Type> deserializer;
    private final SegmentInputStreamFactory inputStreamFactory;
    private final SegmentMetadataClientFactory metadataClientFactory;
    private final Orderer orderer;
    private final ReaderConfig config;

    @GuardedBy("readers")
    private Sequence lastRead;

    @GuardedBy("readers")
    private boolean atCheckpoint;
    private final ReaderGroupStateManager groupState;
    private final Supplier<Long> clock;

    @SuppressFBWarnings(justification = "generated code")
    private final Object $lock = new Object[0];

    @GuardedBy("readers")
    private final List<SegmentInputStream> readers = new ArrayList();

    @GuardedBy("readers")
    private boolean closed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventStreamReaderImpl(SegmentInputStreamFactory segmentInputStreamFactory, SegmentMetadataClientFactory segmentMetadataClientFactory, Serializer<Type> serializer, ReaderGroupStateManager readerGroupStateManager, Orderer orderer, Supplier<Long> supplier, ReaderConfig readerConfig) {
        this.deserializer = serializer;
        this.inputStreamFactory = segmentInputStreamFactory;
        this.metadataClientFactory = segmentMetadataClientFactory;
        this.groupState = readerGroupStateManager;
        this.orderer = orderer;
        this.clock = supplier;
        this.config = readerConfig;
    }

    @Override // io.pravega.client.stream.EventStreamReader
    public EventRead<Type> readNextEvent(long j) throws ReinitializationRequiredException, TruncatedDataException {
        ByteBuffer byteBuffer;
        synchronized (this.readers) {
            Preconditions.checkState(!this.closed, "Reader is closed");
            long min = Math.min(j, ReaderGroupStateManager.TIME_UNIT.toMillis());
            Timer timer = new Timer();
            Segment segment = null;
            long j2 = -1;
            do {
                String updateGroupStateIfNeeded = updateGroupStateIfNeeded();
                if (updateGroupStateIfNeeded == null) {
                    SegmentInputStream nextSegment = this.orderer.nextSegment(this.readers);
                    if (nextSegment == null) {
                        Exceptions.handleInterrupted(() -> {
                            Thread.sleep(min);
                        });
                        byteBuffer = null;
                    } else {
                        segment = nextSegment.getSegmentId();
                        j2 = nextSegment.getOffset();
                        try {
                            byteBuffer = nextSegment.read(min);
                        } catch (EndOfSegmentException e) {
                            handleEndOfSegment(nextSegment, e.getErrorType().equals(EndOfSegmentException.ErrorType.END_OF_SEGMENT_REACHED));
                            byteBuffer = null;
                        } catch (SegmentTruncatedException e2) {
                            handleSegmentTruncated(nextSegment);
                            byteBuffer = null;
                        }
                    }
                    if (byteBuffer != null) {
                        break;
                    }
                } else {
                    return createEmptyEvent(updateGroupStateIfNeeded);
                }
            } while (timer.getElapsedMillis() < j);
            if (byteBuffer == null) {
                return createEmptyEvent(null);
            }
            this.lastRead = Sequence.create(segment.getSegmentId(), j2);
            return new EventReadImpl(this.lastRead, this.deserializer.deserialize(byteBuffer), getPosition(), new EventPointerImpl(segment, j2, byteBuffer.remaining() + 8), null);
        }
    }

    private EventRead<Type> createEmptyEvent(String str) {
        return new EventReadImpl(this.lastRead, null, getPosition(), null, str);
    }

    private PositionInternal getPosition() {
        return new PositionImpl((Map) this.readers.stream().collect(Collectors.toMap(segmentInputStream -> {
            return segmentInputStream.getSegmentId();
        }, segmentInputStream2 -> {
            return Long.valueOf(segmentInputStream2.getOffset());
        })));
    }

    @GuardedBy("readers")
    private String updateGroupStateIfNeeded() throws ReinitializationRequiredException {
        try {
            String checkpoint = this.groupState.getCheckpoint();
            if (checkpoint != null) {
                log.info("{} at checkpoint {}", this, checkpoint);
                this.groupState.checkpoint(checkpoint, getPosition());
                this.atCheckpoint = true;
                return checkpoint;
            }
            if (this.atCheckpoint) {
                releaseSegmentsIfNeeded();
                this.atCheckpoint = false;
            }
            acquireSegmentsIfNeeded();
            return null;
        } catch (ReinitializationRequiredException e) {
            close();
            throw e;
        }
    }

    @GuardedBy("readers")
    private void releaseSegmentsIfNeeded() throws ReinitializationRequiredException {
        Segment findSegmentToReleaseIfRequired = this.groupState.findSegmentToReleaseIfRequired();
        if (findSegmentToReleaseIfRequired != null) {
            log.info("{} releasing segment {}", this, findSegmentToReleaseIfRequired);
            SegmentInputStream orElse = this.readers.stream().filter(segmentInputStream -> {
                return segmentInputStream.getSegmentId().equals(findSegmentToReleaseIfRequired);
            }).findAny().orElse(null);
            if (orElse == null || !this.groupState.releaseSegment(findSegmentToReleaseIfRequired, orElse.getOffset(), getLag())) {
                return;
            }
            this.readers.remove(orElse);
            orElse.close();
        }
    }

    @GuardedBy("readers")
    private void acquireSegmentsIfNeeded() throws ReinitializationRequiredException {
        Map<Segment, Long> acquireNewSegmentsIfNeeded = this.groupState.acquireNewSegmentsIfNeeded(getLag());
        if (acquireNewSegmentsIfNeeded.isEmpty()) {
            return;
        }
        log.info("{} acquiring segments {}", this, acquireNewSegmentsIfNeeded);
        for (Map.Entry<Segment, Long> entry : acquireNewSegmentsIfNeeded.entrySet()) {
            SegmentInputStream createInputStreamForSegment = this.inputStreamFactory.createInputStreamForSegment(entry.getKey(), this.groupState.getEndOffsetForSegment(entry.getKey()));
            createInputStreamForSegment.setOffset(entry.getValue().longValue());
            this.readers.add(createInputStreamForSegment);
        }
    }

    private long getLag() {
        if (this.lastRead == null) {
            return 0L;
        }
        return this.clock.get().longValue() - this.lastRead.getHighOrder();
    }

    private void handleEndOfSegment(SegmentInputStream segmentInputStream, boolean z) throws ReinitializationRequiredException {
        try {
            log.info("{} encountered end of segment {} ", this, segmentInputStream.getSegmentId());
            this.readers.remove(segmentInputStream);
            segmentInputStream.close();
            this.groupState.handleEndOfSegment(segmentInputStream.getSegmentId(), z);
        } catch (ReinitializationRequiredException e) {
            close();
            throw e;
        }
    }

    private void handleSegmentTruncated(SegmentInputStream segmentInputStream) throws ReinitializationRequiredException, TruncatedDataException {
        Segment segmentId = segmentInputStream.getSegmentId();
        log.info("{} encountered truncation for segment {} ", this, segmentId);
        SegmentMetadataClient createSegmentMetadataClient = this.metadataClientFactory.createSegmentMetadataClient(segmentId, this.groupState.getOrRefreshDelegationTokenFor(segmentId));
        try {
            try {
                segmentInputStream.setOffset(createSegmentMetadataClient.getSegmentInfo().getStartingOffset());
            } catch (NoSuchSegmentException e) {
                handleEndOfSegment(segmentInputStream, true);
            }
            throw new TruncatedDataException();
        } catch (Throwable th) {
            if (Collections.singletonList(createSegmentMetadataClient).get(0) != null) {
                createSegmentMetadataClient.close();
            }
            throw th;
        }
    }

    @Override // io.pravega.client.stream.EventStreamReader
    public ReaderConfig getConfig() {
        return this.config;
    }

    @Override // io.pravega.client.stream.EventStreamReader, java.lang.AutoCloseable
    public void close() {
        synchronized (this.readers) {
            if (!this.closed) {
                log.info("Closing reader {} ", this);
                this.closed = true;
                this.groupState.readerShutdown(getPosition());
                Iterator<SegmentInputStream> it = this.readers.iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
                this.readers.clear();
                this.groupState.close();
            }
        }
    }

    @Override // io.pravega.client.stream.EventStreamReader
    public Type fetchEvent(EventPointer eventPointer) throws NoSuchEventException {
        Preconditions.checkNotNull(eventPointer);
        SegmentInputStream createInputStreamForSegment = this.inputStreamFactory.createInputStreamForSegment(eventPointer.asImpl().getSegment(), eventPointer.asImpl().getEventLength());
        try {
            createInputStreamForSegment.setOffset(eventPointer.asImpl().getEventStartOffset());
            try {
                try {
                    Type deserialize = this.deserializer.deserialize(createInputStreamForSegment.read());
                    if (Collections.singletonList(createInputStreamForSegment).get(0) != null) {
                        createInputStreamForSegment.close();
                    }
                    return deserialize;
                } catch (EndOfSegmentException e) {
                    throw new NoSuchEventException(e.getMessage());
                }
            } catch (NoSuchSegmentException | SegmentTruncatedException e2) {
                throw new NoSuchEventException("Event no longer exists.");
            }
        } catch (Throwable th) {
            if (Collections.singletonList(createInputStreamForSegment).get(0) != null) {
                createInputStreamForSegment.close();
            }
            throw th;
        }
    }

    @VisibleForTesting
    List<SegmentInputStream> getReaders() {
        List<SegmentInputStream> unmodifiableList;
        synchronized (this.$lock) {
            unmodifiableList = Collections.unmodifiableList(this.readers);
        }
        return unmodifiableList;
    }

    public String toString() {
        return "EventStreamReaderImpl( id=" + this.groupState.getReaderId() + ")";
    }
}
