package de.otto.synapse.endpoint.receiver.kinesis;

import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.ShardResponse;
import java.time.Clock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/**
 * Reads the responses of a single Kinesis shard, starting at a given position, until a
 * stop condition is met or the reader is asked to stop.
 *
 * <p>The only mutable state of a reader is its stop flag, which is held in an
 * {@link AtomicBoolean}.
 */
@ThreadSafe
public class KinesisShardReader {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShardReader.class);

    /** MDC keys set for the duration of a {@link #consumeUntil} run. */
    private static final String MDC_CHANNEL_NAME = "channelName";
    private static final String MDC_SHARD_NAME = "shardName";

    /** Pause between two consecutive reads from the shard iterator, in milliseconds. */
    private static final long WAIT_MILLIS = 1000L;

    private final String shardName;
    private final String channelName;
    private final KinesisAsyncClient kinesisClient;
    private final ExecutorService executorService;
    private final Clock clock;
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    /**
     * Creates a reader for one shard of a channel.
     *
     * @param channelName     name of the channel the shard belongs to
     * @param shardName       name of the shard to read
     * @param kinesisAsyncClient client handed to the {@code KinesisShardIterator}
     * @param executorService executor on which {@link #consumeUntil} runs its read loop
     * @param clock           clock stored with the reader (not used by the code in this class)
     */
    public KinesisShardReader(String channelName, String shardName, KinesisAsyncClient kinesisAsyncClient, ExecutorService executorService, Clock clock) {
        this.shardName = shardName;
        this.channelName = channelName;
        this.kinesisClient = kinesisAsyncClient;
        this.executorService = executorService;
        this.clock = clock;
    }

    /**
     * @return the name of the channel this reader consumes from
     */
    public String getChannelName() {
        return this.channelName;
    }

    /**
     * @return the name of the shard this reader consumes from
     */
    public String getShardName() {
        return this.shardName;
    }

    /**
     * Asynchronously consumes the shard, starting at {@code shardPosition}.
     *
     * <p>The read loop runs on this reader's executor. Each response obtained from the
     * iterator is first passed to {@code consumer}; the loop then ends as soon as
     * {@code predicate} accepts that response, the reader {@link #isStopping() is stopping},
     * or the thread is interrupted while pausing between reads. The loop also ends when the
     * iterator reports a poison pill.
     *
     * <p>If a {@link RuntimeException} is thrown while consuming, it is logged, the reader's
     * stop signal is set via {@link #stop()}, and the exception is rethrown so that the
     * returned future completes exceptionally.
     *
     * @param shardPosition position in the shard to start reading from
     * @param predicate     stop condition, tested against every response after it was consumed
     * @param consumer      receives every response read from the shard
     * @return a future completing with the iterator's shard position once the loop has ended
     */
    public CompletableFuture<ShardPosition> consumeUntil(ShardPosition shardPosition, Predicate<ShardResponse> predicate, Consumer<ShardResponse> consumer) {
        return CompletableFuture.supplyAsync(() -> {
            MDC.put(MDC_CHANNEL_NAME, this.channelName);
            MDC.put(MDC_SHARD_NAME, this.shardName);
            LOG.info("Reading from channel={}, shard={}, position={}", this.channelName, this.shardName, shardPosition);
            try {
                KinesisShardIterator kinesisShardIterator = new KinesisShardIterator(this.kinesisClient, this.channelName, shardPosition);
                while (true) {
                    if (kinesisShardIterator.isPoison()) {
                        LOG.warn("Received Poison-Pill - This should only happen during tests!");
                        break;
                    }
                    ShardResponse response = kinesisShardIterator.next();
                    consumer.accept(response);
                    // Order matters: only pause when neither the predicate nor the stop signal ends the loop.
                    if (predicate.test(response) || isStopping() || waitABit()) {
                        break;
                    }
                }
                return kinesisShardIterator.getShardPosition();
            } catch (RuntimeException e) {
                // Pass the exception as the trailing argument so SLF4J also logs the stack trace.
                LOG.error("Failed to consume from Kinesis channel {}, shard {}: {}", this.channelName, this.shardName, e.getMessage(), e);
                stop();
                throw e;
            } finally {
                // Always clean up the MDC entries, whether the loop ended normally or not.
                MDC.remove(MDC_CHANNEL_NAME);
                MDC.remove(MDC_SHARD_NAME);
            }
        }, this.executorService);
    }

    /**
     * Pauses the current thread between two reads.
     *
     * @return {@code true} if the thread was interrupted while sleeping (the interrupt flag
     *         is restored in that case), {@code false} if the pause completed normally
     */
    private boolean waitABit() {
        try {
            Thread.sleep(WAIT_MILLIS);
            return false;
        } catch (InterruptedException e) {
            LOG.warn("Thread got interrupted");
            // Restore the interrupt status so that the executor / callers can observe it.
            Thread.currentThread().interrupt();
            return true;
        }
    }

    /**
     * Sets the stop signal; a running read loop ends after the response it is currently
     * processing.
     */
    public void stop() {
        LOG.info("Shard {} received stop signal.", this.shardName);
        this.stopSignal.set(true);
    }

    /**
     * @return {@code true} once {@link #stop()} has been called
     */
    public boolean isStopping() {
        return this.stopSignal.get();
    }
}
