package io.pravega.client.batch.impl;

import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.batch.BatchClient;
import io.pravega.client.batch.SegmentIterator;
import io.pravega.client.batch.SegmentRange;
import io.pravega.client.batch.StreamInfo;
import io.pravega.client.batch.StreamSegmentsIterator;
import io.pravega.client.batch.impl.SegmentRangeImpl;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentInfo;
import io.pravega.client.segment.impl.SegmentInputStreamFactory;
import io.pravega.client.segment.impl.SegmentInputStreamFactoryImpl;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.client.stream.impl.StreamSegmentSuccessors;
import io.pravega.common.concurrent.Futures;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Beta
/* loaded from: input_file:io/pravega/client/batch/impl/BatchClientImpl.class */
public class BatchClientImpl implements BatchClient {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BatchClientImpl.class);
    private final Controller controller;
    private final SegmentInputStreamFactory inputStreamFactory;
    private final SegmentMetadataClientFactory segmentMetadataClientFactory;

    @GuardedBy("this")
    private final AtomicReference<String> latestDelegationToken = new AtomicReference<>();

    public BatchClientImpl(Controller controller, ConnectionFactory connectionFactory) {
        this.controller = controller;
        this.inputStreamFactory = new SegmentInputStreamFactoryImpl(controller, connectionFactory);
        this.segmentMetadataClientFactory = new SegmentMetadataClientFactoryImpl(controller, connectionFactory);
    }

    @Override // io.pravega.client.batch.BatchClient
    public CompletableFuture<StreamInfo> getStreamInfo(Stream stream) {
        Preconditions.checkNotNull(stream, "stream");
        return fetchTailStreamCut(stream).thenCombine((CompletionStage) fetchHeadStreamCut(stream), (streamCut, streamCut2) -> {
            return new StreamInfo(stream.getScope(), stream.getStreamName(), streamCut, streamCut2);
        });
    }

    @Override // io.pravega.client.batch.BatchClient
    public StreamSegmentsIterator getSegments(Stream stream, StreamCut streamCut, StreamCut streamCut2) {
        Preconditions.checkNotNull(stream, "stream");
        return listSegments(stream, Optional.ofNullable(streamCut), Optional.ofNullable(streamCut2));
    }

    @Override // io.pravega.client.batch.BatchClient
    public <T> SegmentIterator<T> readSegment(SegmentRange segmentRange, Serializer<T> serializer) {
        return new SegmentIteratorImpl(this.inputStreamFactory, segmentRange.asImpl().getSegment(), serializer, segmentRange.asImpl().getStartOffset(), segmentRange.asImpl().getEndOffset());
    }

    private StreamSegmentsIterator listSegments(Stream stream, Optional<StreamCut> optional, Optional<StreamCut> optional2) {
        Optional<StreamCut> filter = optional.filter(streamCut -> {
            return !streamCut.equals(StreamCut.UNBOUNDED);
        });
        Optional<StreamCut> filter2 = optional2.filter(streamCut2 -> {
            return !streamCut2.equals(StreamCut.UNBOUNDED);
        });
        filter.ifPresent(streamCut3 -> {
            Preconditions.checkArgument(stream.equals(streamCut3.asImpl().getStream()));
        });
        filter2.ifPresent(streamCut4 -> {
            Preconditions.checkArgument(stream.equals(streamCut4.asImpl().getStream()));
        });
        return (StreamSegmentsIterator) Futures.getAndHandleExceptions((filter.isPresent() ? CompletableFuture.completedFuture(filter.get()) : fetchHeadStreamCut(stream)).thenCombine((CompletionStage) (filter2.isPresent() ? CompletableFuture.completedFuture(filter2.get()) : fetchTailStreamCut(stream)), (streamCut5, streamCut6) -> {
            return getStreamSegmentInfo(streamCut5, streamCut6);
        }), RuntimeException::new);
    }

    private CompletableFuture<StreamCut> fetchHeadStreamCut(Stream stream) {
        return this.controller.getSegmentsAtTime(new StreamImpl(stream.getScope(), stream.getStreamName()), 0L).thenApply(map -> {
            return new StreamCutImpl(stream, (Map) map.keySet().stream().map(this::segmentToInfo).collect(Collectors.toMap((v0) -> {
                return v0.getSegment();
            }, (v0) -> {
                return v0.getStartingOffset();
            })));
        });
    }

    private CompletableFuture<StreamCut> fetchTailStreamCut(Stream stream) {
        return this.controller.getCurrentSegments(stream.getScope(), stream.getStreamName()).thenApply(streamSegments -> {
            return new StreamCutImpl(stream, (Map) streamSegments.getSegments().stream().map(this::segmentToInfo).collect(Collectors.toMap((v0) -> {
                return v0.getSegment();
            }, (v0) -> {
                return v0.getWriteOffset();
            })));
        });
    }

    private StreamSegmentsIterator getStreamSegmentInfo(StreamCut streamCut, StreamCut streamCut2) {
        log.debug("Start stream cut: {}, End stream cut: {}", streamCut, streamCut2);
        StreamSegmentsInfoImpl.validateStreamCuts(streamCut, streamCut2);
        TreeSet treeSet = new TreeSet();
        StreamSegmentSuccessors streamSegmentSuccessors = (StreamSegmentSuccessors) Futures.getAndHandleExceptions(this.controller.getSegments(streamCut, streamCut2), RuntimeException::new);
        treeSet.addAll(streamSegmentSuccessors.getSegments());
        synchronized (this) {
            this.latestDelegationToken.set(streamSegmentSuccessors.getDelegationToken());
        }
        log.debug("List of Segments between the start and end stream cuts : {}", treeSet);
        return StreamSegmentsInfoImpl.builder().segmentRangeIterator(Iterators.transform(treeSet.iterator(), segment -> {
            return getSegmentRange(segment, streamCut, streamCut2);
        })).startStreamCut(streamCut).endStreamCut(streamCut2).build();
    }

    private SegmentInfo segmentToInfo(Segment segment) {
        String str;
        synchronized (this) {
            str = this.latestDelegationToken.get();
        }
        SegmentMetadataClient createSegmentMetadataClient = this.segmentMetadataClientFactory.createSegmentMetadataClient(segment, str);
        try {
            SegmentInfo segmentInfo = createSegmentMetadataClient.getSegmentInfo();
            if (Collections.singletonList(createSegmentMetadataClient).get(0) != null) {
                createSegmentMetadataClient.close();
            }
            return segmentInfo;
        } catch (Throwable th) {
            if (Collections.singletonList(createSegmentMetadataClient).get(0) != null) {
                createSegmentMetadataClient.close();
            }
            throw th;
        }
    }

    private SegmentRange getSegmentRange(Segment segment, StreamCut streamCut, StreamCut streamCut2) {
        SegmentRangeImpl.SegmentRangeImplBuilder segment2 = SegmentRangeImpl.builder().segment(segment);
        if (streamCut.asImpl().getPositions().containsKey(segment) && streamCut2.asImpl().getPositions().containsKey(segment)) {
            segment2.startOffset(streamCut.asImpl().getPositions().get(segment).longValue()).endOffset(streamCut2.asImpl().getPositions().get(segment).longValue());
        } else {
            SegmentInfo segmentToInfo = segmentToInfo(segment);
            segment2.startOffset(streamCut.asImpl().getPositions().getOrDefault(segment, Long.valueOf(segmentToInfo.getStartingOffset())).longValue()).endOffset(streamCut2.asImpl().getPositions().getOrDefault(segment, Long.valueOf(segmentToInfo.getWriteOffset())).longValue());
        }
        return segment2.build();
    }
}
