package de.otto.synapse.endpoint.receiver.kinesis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.ShardResponse;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/* loaded from: input_file:de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReader.class */
/**
 * Reads a Kinesis channel by fanning every operation out to one {@link KinesisShardReader}
 * (or {@link KinesisShardIterator}) per open shard.
 *
 * <p>The list of shard readers is created lazily on first use. Whenever an operation fails
 * with a {@link RuntimeException}, all readers are stopped and the list is discarded, so the
 * next call looks up the open shards again.
 */
public class KinesisMessageLogReader {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisMessageLogReader.class);

    /** Default number of additional fetch attempts {@link #read} makes per shard after an empty response. */
    public static final int SKIP_NEXT_PARTS = 8;
    /** Default value handed to every {@link KinesisShardReader}; presumably milliseconds (confirm in KinesisShardReader). */
    public static final int DEFAULT_WAITING_TIME_ON_EMPTY_RECORDS = 10000;
    /** Default pause in milliseconds between two fetch attempts while skipping empty responses. */
    public static final int DEFAULT_WAITING_TIME_ON_SKIP_EMPTY_PARTS = 200;

    private final String channelName;
    private final KinesisAsyncClient kinesisClient;
    private final ExecutorService executorService;
    private final Clock clock;
    /** Lazily created shard readers; {@code null} until first use and again after a failure. */
    private final AtomicReference<List<KinesisShardReader>> kinesisShardReaders = new AtomicReference<>();
    private final int waitingTimeOnEmptyRecords;
    private final int skipNextEmptyParts;
    private final int waitingTimeOnSkipEmptyParts;
    private final Marker marker;

    /**
     * Creates a reader using {@link #DEFAULT_WAITING_TIME_ON_EMPTY_RECORDS}, the default skip
     * settings and no log marker.
     */
    public KinesisMessageLogReader(String channelName,
                                   KinesisAsyncClient kinesisClient,
                                   ExecutorService executorService,
                                   Clock clock) {
        this(channelName, kinesisClient, executorService, clock, DEFAULT_WAITING_TIME_ON_EMPTY_RECORDS, null);
    }

    /**
     * Creates a reader using {@link #SKIP_NEXT_PARTS} and
     * {@link #DEFAULT_WAITING_TIME_ON_SKIP_EMPTY_PARTS} for skipping empty responses.
     */
    public KinesisMessageLogReader(String channelName,
                                   KinesisAsyncClient kinesisClient,
                                   ExecutorService executorService,
                                   Clock clock,
                                   int waitingTimeOnEmptyRecords,
                                   Marker marker) {
        this(channelName, kinesisClient, executorService, clock, waitingTimeOnEmptyRecords,
                SKIP_NEXT_PARTS, DEFAULT_WAITING_TIME_ON_SKIP_EMPTY_PARTS, marker);
    }

    /**
     * @param channelName                 name of the Kinesis channel to read
     * @param kinesisClient               client used for all Kinesis calls
     * @param executorService             executor running the per-shard tasks
     * @param clock                       clock handed to every shard reader
     * @param waitingTimeOnEmptyRecords   value handed to every shard reader
     * @param skipNextEmptyParts          maximum number of additional fetch attempts per shard in {@link #read}
     * @param waitingTimeOnSkipEmptyParts pause in milliseconds between those attempts
     * @param marker                      marker used for logging and handed to every shard reader; may be {@code null}
     */
    public KinesisMessageLogReader(String channelName,
                                   KinesisAsyncClient kinesisClient,
                                   ExecutorService executorService,
                                   Clock clock,
                                   int waitingTimeOnEmptyRecords,
                                   int skipNextEmptyParts,
                                   int waitingTimeOnSkipEmptyParts,
                                   Marker marker) {
        this.channelName = channelName;
        this.kinesisClient = kinesisClient;
        this.executorService = executorService;
        this.clock = clock;
        this.waitingTimeOnEmptyRecords = waitingTimeOnEmptyRecords;
        this.skipNextEmptyParts = skipNextEmptyParts;
        this.waitingTimeOnSkipEmptyParts = waitingTimeOnSkipEmptyParts;
        this.marker = marker;
    }

    /** @return the name of the channel this reader was created for */
    public String getChannelName() {
        return this.channelName;
    }

    /**
     * @return the names of the shards of the current shard readers, creating the readers first
     *         if necessary
     */
    public List<String> getOpenShards() {
        return shardReaders().stream()
                .map(KinesisShardReader::getShardName)
                .collect(Collectors.toList());
    }

    /**
     * Creates one {@link KinesisShardIterator} per shard, positioned according to
     * {@code channelPosition}. The iterators are constructed concurrently on the executor.
     *
     * @param channelPosition position supplying the per-shard start positions
     * @return an iterator over all shards
     * @throws RuntimeException if creating any shard iterator fails; the shard readers are
     *                          stopped and discarded before the exception is rethrown
     */
    public KinesisMessageLogIterator getMessageLogIterator(ChannelPosition channelPosition) {
        final List<KinesisShardReader> readers = shardReaders();
        try {
            // Submit all tasks before joining any of them: joining inside the same stream
            // pipeline would create the iterators one after another.
            final List<CompletableFuture<KinesisShardIterator>> pending = readers.stream()
                    .map(reader -> CompletableFuture.supplyAsync(
                            () -> new KinesisShardIterator(
                                    this.kinesisClient,
                                    this.channelName,
                                    channelPosition.shard(reader.getShardName())),
                            this.executorService))
                    .collect(Collectors.toList());
            final List<KinesisShardIterator> shardIterators = pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            return new KinesisMessageLogIterator(shardIterators);
        } catch (RuntimeException e) {
            stopAndDiscardShardReaders();
            throw e;
        }
    }

    /**
     * Fetches the next response from every shard of {@code kinesisMessageLogIterator} concurrently
     * and combines them into a single {@link ChannelResponse}.
     *
     * @param kinesisMessageLogIterator iterator supplying the per-shard iterators
     * @return a future completing with the combined response of all shards
     * @throws RuntimeException if submitting the fetch tasks fails; the shard readers are
     *                          stopped and discarded before the exception is rethrown
     */
    public CompletableFuture<ChannelResponse> read(KinesisMessageLogIterator kinesisMessageLogIterator) {
        final List<KinesisShardReader> readers = shardReaders();
        try {
            final List<CompletableFuture<ShardResponse>> pending = readers.stream()
                    .map(reader -> CompletableFuture.supplyAsync(
                            () -> fetchNext(
                                    kinesisMessageLogIterator.getShardIterator(reader.getShardName()),
                                    this.skipNextEmptyParts),
                            this.executorService))
                    .collect(Collectors.toList());
            return CompletableFuture.supplyAsync(
                    () -> ChannelResponse.channelResponse(
                            this.channelName,
                            pending.stream()
                                    .map(CompletableFuture::join)
                                    .collect(ImmutableList.toImmutableList())),
                    this.executorService);
        } catch (RuntimeException e) {
            stopAndDiscardShardReaders();
            throw e;
        }
    }

    /**
     * Fetches from {@code kinesisShardIterator} until a response should be handed to the caller.
     *
     * <p>A response is returned as soon as it contains messages, the iterator is poisoned, the
     * iterator id did not change during the fetch, or no attempts are left. Otherwise the
     * method sleeps for {@code waitingTimeOnSkipEmptyParts} milliseconds and tries again.
     *
     * @param kinesisShardIterator iterator of the shard to fetch from
     * @param remainingSkips       number of additional attempts allowed after an empty response
     * @return the last response fetched
     */
    private ShardResponse fetchNext(KinesisShardIterator kinesisShardIterator, int remainingSkips) {
        int skipsLeft = remainingSkips;
        while (true) {
            final String idBeforeFetch = kinesisShardIterator.getId();
            final ShardResponse response = kinesisShardIterator.next();
            if (!response.getMessages().isEmpty()
                    || kinesisShardIterator.isPoison()
                    || Objects.equals(kinesisShardIterator.getId(), idBeforeFetch)
                    || skipsLeft <= 0) {
                return response;
            }
            try {
                Thread.sleep(this.waitingTimeOnSkipEmptyParts);
            } catch (InterruptedException e) {
                LOG.warn(this.marker, "Thread got interrupted");
                // Restore the flag for the caller and stop polling: with the flag set every
                // further sleep would fail immediately and the loop would fetch without pausing.
                Thread.currentThread().interrupt();
                return response;
            }
            skipsLeft--;
        }
    }

    /**
     * Delegates to {@link KinesisShardReader#consumeUntil} on every shard reader, starting at the
     * shard's position within {@code channelPosition}, and combines the resulting shard positions.
     *
     * @param channelPosition position supplying the per-shard start positions
     * @param predicate       passed through to every shard reader
     * @param consumer        passed through to every shard reader
     * @return a future completing with the combined position of all shards; if it fails, the
     *         shard readers are stopped and discarded and the future completes exceptionally
     *         with a {@link RuntimeException} wrapping the cause
     * @throws RuntimeException if starting the shard readers fails; the shard readers are
     *                          stopped and discarded before the exception is rethrown
     */
    public CompletableFuture<ChannelPosition> consumeUntil(ChannelPosition channelPosition,
                                                           Predicate<ShardResponse> predicate,
                                                           Consumer<ShardResponse> consumer) {
        final List<KinesisShardReader> readers = shardReaders();
        try {
            final List<CompletableFuture<ShardPosition>> pending = readers.stream()
                    .map(reader -> reader.consumeUntil(
                            channelPosition.shard(reader.getShardName()), predicate, consumer))
                    .collect(Collectors.toList());
            // All shard futures are already running at this point; the joins only wait for them.
            return CompletableFuture
                    .supplyAsync(() -> ChannelPosition.channelPosition(
                            pending.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())))
                    .exceptionally(throwable -> {
                        stopAndDiscardShardReaders();
                        throw new RuntimeException(throwable.getMessage(), throwable);
                    });
        } catch (RuntimeException e) {
            stopAndDiscardShardReaders();
            throw e;
        }
    }

    /**
     * Returns the current shard readers, creating and storing them first if there are none.
     *
     * <p>Callers work on the returned snapshot instead of re-reading the reference, so a
     * concurrent reset (for example from the failure handler in {@link #consumeUntil}) cannot
     * cause a {@link NullPointerException} half-way through an operation.
     */
    private List<KinesisShardReader> shardReaders() {
        List<KinesisShardReader> readers = this.kinesisShardReaders.get();
        if (readers == null) {
            readers = createShardReaders();
            this.kinesisShardReaders.set(readers);
        }
        return readers;
    }

    /** Creates one shard reader for every currently open shard of the channel. */
    private List<KinesisShardReader> createShardReaders() {
        return retrieveAllOpenShards().stream()
                .map(shardName -> new KinesisShardReader(
                        this.channelName,
                        shardName,
                        this.kinesisClient,
                        this.executorService,
                        this.clock,
                        this.waitingTimeOnEmptyRecords,
                        this.marker))
                .collect(Collectors.toList());
    }

    /** Looks up the stream info of the channel and returns the names of all shards reported as open. */
    private Set<String> retrieveAllOpenShards() {
        return new KinesisStreamInfoProvider(this.kinesisClient)
                .getStreamInfo(this.channelName)
                .getShardInfo()
                .stream()
                .filter(shardInfo -> shardInfo.isOpen())
                .map(shardInfo -> shardInfo.getShardName())
                .collect(ImmutableSet.toImmutableSet());
    }

    /** Stops all shard readers and forgets them, so the next operation creates new ones. */
    private void stopAndDiscardShardReaders() {
        stop();
        this.kinesisShardReaders.set(null);
    }

    /** Logs the stop signal and stops every current shard reader; the readers are kept. */
    public void stop() {
        LOG.info("Channel {} received stop signal.", getChannelName());
        final List<KinesisShardReader> readers = this.kinesisShardReaders.get();
        if (readers != null) {
            readers.forEach(KinesisShardReader::stop);
        }
    }

    /**
     * @return the current shard readers, creating them first if necessary; this is the
     *         internally held list, not a copy
     */
    @VisibleForTesting
    public List<KinesisShardReader> getCurrentKinesisShards() {
        return shardReaders();
    }
}
