package com.amazonaws.services.dynamodbv2.streamsadapter;

import com.amazonaws.services.dynamodbv2.streamsadapter.util.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.streamsadapter.util.KinesisMapperUtil;
import com.amazonaws.services.dynamodbv2.streamsadapter.util.ShardGraphTracker;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.retrieval.AWSExceptionManager;

/* loaded from: input_file:com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardDetector.class */
public class DynamoDBStreamsShardDetector implements ShardDetector {

    @NonNull
    private final KinesisAsyncClient kinesisAsyncClient;

    @NonNull
    private final StreamIdentifier streamIdentifier;
    private final String streamArn;
    private final long listShardsCacheAllowedAgeInSeconds;
    private final int maxCacheMissesBeforeReload;
    private final int cacheMissWarningModulus;
    private final Duration kinesisRequestTimeout;
    private volatile Instant lastCacheUpdateTime;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DynamoDBStreamsShardDetector.class);
    private static final AWSExceptionManager AWS_EXCEPTION_MANAGER = new AWSExceptionManager();

    @Generated
    private final Object $lock = new Object[0];
    private volatile Map<String, Shard> cachedShardMap = null;
    private final AtomicInteger cacheMisses = new AtomicInteger(0);

    public DynamoDBStreamsShardDetector(@NonNull KinesisAsyncClient kinesisAsyncClient, @NonNull StreamIdentifier streamIdentifier, long j, int i, int i2, Duration duration) {
        if (kinesisAsyncClient == null) {
            throw new NullPointerException("kinesisAsyncClient is marked non-null but is null");
        }
        if (streamIdentifier == null) {
            throw new NullPointerException("streamIdentifier is marked non-null but is null");
        }
        this.kinesisAsyncClient = kinesisAsyncClient;
        this.streamIdentifier = streamIdentifier;
        this.listShardsCacheAllowedAgeInSeconds = j;
        this.maxCacheMissesBeforeReload = i;
        this.cacheMissWarningModulus = i2;
        this.kinesisRequestTimeout = duration;
        this.streamArn = KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName(this.streamIdentifier.streamName());
    }

    public Shard shard(String str) {
        if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
            synchronized (this) {
                if (CollectionUtils.isNullOrEmpty(this.cachedShardMap)) {
                    listShards();
                }
            }
        }
        Shard shard = this.cachedShardMap.get(str);
        if (shard == null && (this.cacheMisses.incrementAndGet() > this.maxCacheMissesBeforeReload || shouldRefreshCache())) {
            synchronized (this) {
                shard = this.cachedShardMap.get(str);
                if (shard == null) {
                    log.info("Too many shard map cache misses for stream: {} or cache is out of date -- forcing a refresh", this.streamArn);
                    describeStream(null);
                    shard = this.cachedShardMap.get(str);
                    if (shard == null) {
                        log.warn("Even after cache refresh shard '{}' wasn't found. This could indicate a bigger problem.", str);
                    }
                }
                this.cacheMisses.set(0);
            }
        }
        if (shard == null) {
            String format = String.format("Cannot find the shard given the shardId %s. Cache misses: %s", str, this.cacheMisses);
            if (this.cacheMisses.get() % this.cacheMissWarningModulus == 0) {
                log.warn(format);
            } else {
                log.debug(format);
            }
        }
        return shard;
    }

    public List<Shard> listShards() {
        List<Shard> shards;
        synchronized (this.$lock) {
            shards = describeStream(null).getShards();
        }
        return shards;
    }

    public DescribeStreamResult describeStream(String str) {
        DescribeStreamResult describeStreamResult;
        DescribeStreamResponse describeStreamResponse;
        synchronized (this.$lock) {
            ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
            String str2 = str;
            describeStreamResult = new DescribeStreamResult();
            do {
                describeStreamResponse = describeStreamResponse(str2);
                shardGraphTracker.collectShards(describeStreamResponse.streamDescription().shards());
                List shards = describeStreamResponse.streamDescription().shards();
                describeStreamResult.addStatus(describeStreamResponse.streamDescription().streamStatusAsString());
                if (!shards.isEmpty()) {
                    str2 = ((Shard) shards.get(shards.size() - 1)).shardId();
                }
            } while (describeStreamResponse.streamDescription().hasMoreShards().booleanValue());
            if (Objects.equals(describeStreamResult.getStreamStatus(), "ENABLING")) {
                log.warn("Stream: {} is in ENABLING state, new shards will not be discovered until stream gets enabled.", this.streamArn);
            }
            shardGraphTracker.closeOpenParents();
            if (Objects.equals(describeStreamResult.getStreamStatus(), "DISABLED")) {
                shardGraphTracker.markLeafShardsActive();
            }
            List<Shard> shards2 = shardGraphTracker.getShards();
            describeStreamResult.addShards(shards2);
            cachedShardMap(shards2);
        }
        return describeStreamResult;
    }

    private DescribeStreamResponse describeStreamResponse(String str) {
        try {
            DescribeStreamResponse describeStreamResponse = (DescribeStreamResponse) this.kinesisAsyncClient.describeStream((DescribeStreamRequest) DescribeStreamRequest.builder().streamName(this.streamArn).exclusiveStartShardId(str).build()).get();
            if (describeStreamResponse == null) {
                throw new IllegalStateException("Received null from DescribeStream call.");
            }
            return describeStreamResponse;
        } catch (InterruptedException e) {
            log.debug("Interrupted exception caught, shutdown initiated, returning null");
            return null;
        } catch (ExecutionException e2) {
            throw AWS_EXCEPTION_MANAGER.apply(e2.getCause());
        }
    }

    private boolean shouldRefreshCache() {
        Duration between = Duration.between(this.lastCacheUpdateTime, Instant.now());
        String format = String.format("Shard map cache for stream: %s is %d seconds old", this.streamArn, Long.valueOf(between.getSeconds()));
        if (between.compareTo(Duration.of(this.listShardsCacheAllowedAgeInSeconds, ChronoUnit.SECONDS)) > 0) {
            log.info("{}. Age exceeds limit of {} seconds -- Refreshing.", format, Long.valueOf(this.listShardsCacheAllowedAgeInSeconds));
            return true;
        }
        log.debug("{}. Age doesn't exceed limit of {} seconds.", format, Long.valueOf(this.listShardsCacheAllowedAgeInSeconds));
        return false;
    }

    private void cachedShardMap(List<Shard> list) {
        this.cachedShardMap = (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.shardId();
        }, Function.identity()));
        this.lastCacheUpdateTime = Instant.now();
    }

    @NonNull
    @Generated
    public StreamIdentifier streamIdentifier() {
        return this.streamIdentifier;
    }

    @Generated
    AtomicInteger cacheMisses() {
        return this.cacheMisses;
    }

    static {
        AWS_EXCEPTION_MANAGER.add(KinesisException.class, kinesisException -> {
            return kinesisException;
        });
        AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, limitExceededException -> {
            return limitExceededException;
        });
        AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, resourceInUseException -> {
            return resourceInUseException;
        });
        AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, resourceNotFoundException -> {
            return resourceNotFoundException;
        });
    }
}
