package de.otto.synapse.endpoint.receiver.kinesis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.ShardResponse;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/* loaded from: input_file:de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReader.class */
public class KinesisMessageLogReader {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisMessageLogReader.class);
    private final String channelName;
    private final KinesisAsyncClient kinesisClient;
    private final Clock clock;
    private List<KinesisShardReader> kinesisShardReaders;
    private ExecutorService executorService;
    public static final int SKIP_NEXT_PARTS = 8;

    public KinesisMessageLogReader(String str, KinesisAsyncClient kinesisAsyncClient, Clock clock) {
        this.channelName = str;
        this.kinesisClient = kinesisAsyncClient;
        this.clock = clock;
    }

    public String getChannelName() {
        return this.channelName;
    }

    public List<String> getOpenShards() {
        if (Objects.isNull(this.executorService)) {
            initExecutorService();
        }
        return (List) this.kinesisShardReaders.stream().map((v0) -> {
            return v0.getShardName();
        }).collect(Collectors.toList());
    }

    public KinesisMessageLogIterator getMessageLogIterator(ChannelPosition channelPosition) {
        if (Objects.isNull(this.executorService)) {
            initExecutorService();
        }
        try {
            return new KinesisMessageLogIterator((List) ((List) this.kinesisShardReaders.stream().map(kinesisShardReader -> {
                return CompletableFuture.supplyAsync(() -> {
                    return new KinesisShardIterator(this.kinesisClient, this.channelName, channelPosition.shard(kinesisShardReader.getShardName()));
                }, this.executorService);
            }).collect(Collectors.toList())).stream().map((v0) -> {
                return v0.join();
            }).collect(Collectors.toList()));
        } catch (RuntimeException e) {
            shutdownExecutor();
            throw e;
        }
    }

    public CompletableFuture<ChannelResponse> read(KinesisMessageLogIterator kinesisMessageLogIterator) {
        if (Objects.isNull(this.executorService)) {
            initExecutorService();
        }
        try {
            List list = (List) this.kinesisShardReaders.stream().map(kinesisShardReader -> {
                return CompletableFuture.supplyAsync(() -> {
                    return fetchNext(kinesisMessageLogIterator.getShardIterator(kinesisShardReader.getShardName()), 8);
                }, this.executorService);
            }).collect(Collectors.toList());
            return CompletableFuture.supplyAsync(() -> {
                return ChannelResponse.channelResponse(this.channelName, (ImmutableList) list.stream().map((v0) -> {
                    return v0.join();
                }).collect(ImmutableList.toImmutableList()));
            }, this.executorService);
        } catch (RuntimeException e) {
            shutdownExecutor();
            throw e;
        }
    }

    private ShardResponse fetchNext(KinesisShardIterator kinesisShardIterator, int i) {
        String id = kinesisShardIterator.getId();
        ShardResponse next = kinesisShardIterator.next();
        return (!next.getMessages().isEmpty() || kinesisShardIterator.isPoison() || Objects.equals(kinesisShardIterator.getId(), id) || i <= 0) ? next : fetchNext(kinesisShardIterator, i - 1);
    }

    public CompletableFuture<ChannelPosition> consumeUntil(ChannelPosition channelPosition, Predicate<ShardResponse> predicate, Consumer<ShardResponse> consumer) {
        if (Objects.isNull(this.executorService)) {
            initExecutorService();
        }
        try {
            List list = (List) this.kinesisShardReaders.stream().map(kinesisShardReader -> {
                return kinesisShardReader.consumeUntil(channelPosition.shard(kinesisShardReader.getShardName()), predicate, consumer);
            }).collect(Collectors.toList());
            return CompletableFuture.supplyAsync(() -> {
                return ChannelPosition.channelPosition((Iterable) list.stream().map((v0) -> {
                    return v0.join();
                }).collect(Collectors.toList()));
            }).exceptionally(th -> {
                shutdownExecutor();
                throw new RuntimeException(th.getMessage(), th);
            });
        } catch (RuntimeException e) {
            shutdownExecutor();
            throw e;
        }
    }

    private void initExecutorService() {
        Set<String> retrieveAllOpenShards = retrieveAllOpenShards();
        if (retrieveAllOpenShards.isEmpty()) {
            this.executorService = Executors.newSingleThreadExecutor();
        } else {
            this.executorService = Executors.newFixedThreadPool(retrieveAllOpenShards.size() + 1, new ThreadFactoryBuilder().setNameFormat("kinesis-message-log-%d").build());
        }
        this.kinesisShardReaders = (List) retrieveAllOpenShards.stream().map(str -> {
            return new KinesisShardReader(this.channelName, str, this.kinesisClient, this.executorService, this.clock);
        }).collect(Collectors.toList());
    }

    private Set<String> retrieveAllOpenShards() {
        return (Set) new KinesisStreamInfoProvider(this.kinesisClient).getStreamInfo(this.channelName).getShardInfo().stream().filter((v0) -> {
            return v0.isOpen();
        }).map((v0) -> {
            return v0.getShardName();
        }).collect(ImmutableSet.toImmutableSet());
    }

    private void shutdownExecutor() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            try {
                if (!this.executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                    LOG.error("Kinesis Thread for stream {} is still running", getChannelName());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.executorService = null;
        }
    }

    public void stop() {
        LOG.info("Channel {} received stop signal.", getChannelName());
        this.kinesisShardReaders.forEach((v0) -> {
            v0.stop();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public List<KinesisShardReader> getCurrentKinesisShards() {
        if (Objects.isNull(this.executorService)) {
            initExecutorService();
        }
        return this.kinesisShardReaders;
    }
}
