package de.otto.synapse.endpoint.receiver.kinesis;

import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.logging.LogHelper;
import java.time.Clock;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

@ThreadSafe
/* loaded from: input_file:de/otto/synapse/endpoint/receiver/kinesis/KinesisShardReader.class */
public class KinesisShardReader {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShardReader.class);
    public static final int LOG_MESSAGE_COUNTER_EVERY_NTH_MESSAGE = 1000;
    private final String shardName;
    private final String channelName;
    private final KinesisAsyncClient kinesisClient;
    private final ExecutorService executorService;
    private final Clock clock;
    private final AtomicBoolean stopSignal;
    private final int waitingTimeOnEmptyRecords;
    private final Marker marker;

    public KinesisShardReader(String str, String str2, KinesisAsyncClient kinesisAsyncClient, ExecutorService executorService, Clock clock) {
        this(str, str2, kinesisAsyncClient, executorService, clock, KinesisMessageLogReader.DEFAULT_WAITING_TIME_ON_EMPTY_RECORDS, null);
    }

    public KinesisShardReader(String str, String str2, KinesisAsyncClient kinesisAsyncClient, ExecutorService executorService, Clock clock, int i, Marker marker) {
        this.stopSignal = new AtomicBoolean(false);
        this.shardName = str2;
        this.channelName = str;
        this.kinesisClient = kinesisAsyncClient;
        this.executorService = executorService;
        this.clock = clock;
        this.waitingTimeOnEmptyRecords = i;
        this.marker = marker;
    }

    public String getChannelName() {
        return this.channelName;
    }

    public String getShardName() {
        return this.shardName;
    }

    public CompletableFuture<ShardPosition> consumeUntil(ShardPosition shardPosition, Predicate<ShardResponse> predicate, Consumer<ShardResponse> consumer) {
        Map copyOfContextMap = MDC.getCopyOfContextMap();
        return CompletableFuture.supplyAsync(() -> {
            if (copyOfContextMap != null) {
                MDC.setContextMap(copyOfContextMap);
            }
            MDC.put("channelName", this.channelName);
            MDC.put("shardName", this.shardName);
            LOG.info(this.marker, "Reading from channel={}, shard={}, position={}", new Object[]{this.channelName, this.shardName, shardPosition});
            try {
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    AtomicLong atomicLong = new AtomicLong(0L);
                    AtomicLong atomicLong2 = new AtomicLong(System.currentTimeMillis());
                    AtomicLong atomicLong3 = new AtomicLong(0L);
                    AtomicLong atomicLong4 = new AtomicLong(0L);
                    KinesisShardIterator kinesisShardIterator = new KinesisShardIterator(this.kinesisClient, this.channelName, sanitizePositionedShardPosition(shardPosition));
                    while (true) {
                        if (kinesisShardIterator.isPoison()) {
                            LOG.warn(this.marker, "Received Poison-Pill - This should only happen during tests!");
                            break;
                        }
                        ShardResponse next = kinesisShardIterator.next();
                        consumer.accept(next);
                        int size = next.getMessages().size();
                        long addAndGet = atomicLong.addAndGet(size);
                        boolean test = predicate.test(next);
                        boolean z = test || isStopping() || waitABit(next.getDurationBehind());
                        if ((addAndGet > 0 && addAndGet > atomicLong3.get() + 1000) || z) {
                            LOG.info(this.marker, "Read {} messages ({} per sec) from '{}:{}', durationBehind={}, totalMessages={}", new Object[]{Integer.valueOf(size), String.format("%.2f", Double.valueOf(LogHelper.calculateMessagesPerSecond(atomicLong2.getAndSet(System.currentTimeMillis()), addAndGet - atomicLong4.get()))), this.channelName, this.shardName, next.getDurationBehind(), Long.valueOf(addAndGet)});
                            if (z) {
                                LOG.info(this.marker, "Stop reading of channel={}, shard={}, stopCondition={}, stopSignal={}, durationBehind={}", new Object[]{this.channelName, this.shardName, Boolean.valueOf(test), Boolean.valueOf(isStopping()), next.getDurationBehind()});
                            }
                            atomicLong3.set(addAndGet - (addAndGet % 1000));
                            atomicLong4.set(addAndGet);
                        }
                        if (z) {
                            break;
                        }
                    }
                    LOG.info(this.marker, "Read a total of {} messages from '{}:{}', totalMessagesPerSecond={}", new Object[]{Long.valueOf(atomicLong.get()), this.channelName, this.shardName, String.format("%.2f", Double.valueOf(LogHelper.calculateMessagesPerSecond(currentTimeMillis, atomicLong.get())))});
                    ShardPosition shardPosition2 = kinesisShardIterator.getShardPosition();
                    MDC.remove("channelName");
                    MDC.remove("shardName");
                    return shardPosition2;
                } catch (RuntimeException e) {
                    LOG.error(this.marker, "Failed to consume from Kinesis shard {}: {}, {}", new Object[]{this.channelName, this.shardName, e.getMessage()});
                    stop();
                    throw e;
                }
            } catch (Throwable th) {
                MDC.remove("channelName");
                MDC.remove("shardName");
                throw th;
            }
        }, this.executorService);
    }

    private ShardPosition sanitizePositionedShardPosition(ShardPosition shardPosition) {
        try {
            StartFrom startFrom = shardPosition.startFrom();
            if (startFrom == StartFrom.AT_POSITION || startFrom == StartFrom.POSITION) {
                this.kinesisClient.getShardIterator((GetShardIteratorRequest) GetShardIteratorRequest.builder().shardId(this.shardName).streamName(this.channelName).shardIteratorType(startFrom == StartFrom.POSITION ? ShardIteratorType.AFTER_SEQUENCE_NUMBER : ShardIteratorType.AT_SEQUENCE_NUMBER).startingSequenceNumber(shardPosition.position()).build()).get();
            }
            return shardPosition;
        } catch (InterruptedException | RuntimeException | ExecutionException e) {
            LOG.warn(this.marker, "given shardposition {} / {} not accessible, falling back to horizon", shardPosition.shardName(), shardPosition.position());
            return ShardPosition.fromHorizon(shardPosition.shardName());
        }
    }

    private boolean waitABit(Duration duration) {
        try {
            if (duration.getSeconds() > 10) {
                Thread.sleep(1000L);
            } else {
                Thread.sleep(this.waitingTimeOnEmptyRecords);
            }
            return false;
        } catch (InterruptedException e) {
            LOG.warn(this.marker, "Thread got interrupted");
            return true;
        }
    }

    public void stop() {
        LOG.info(this.marker, "Shard {} received stop signal.", this.shardName);
        this.stopSignal.set(true);
    }

    public boolean isStopping() {
        return this.stopSignal.get();
    }
}
