package com.amazonaws.services.dynamodbv2.streamsadapter.tasks;

import com.amazonaws.services.dynamodbv2.streamsadapter.util.KinesisMapperUtil;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

/* loaded from: input_file:com/amazonaws/services/dynamodbv2/streamsadapter/tasks/DynamoDBStreamsShutdownTask.class */
/**
 * Consumer task that shuts down processing of a single shard.
 *
 * <p>When the shutdown reason is {@link ShutdownReason#SHARD_END} the task creates leases for any
 * child shards that do not have one yet, records the child shard ids on the current lease, and
 * then asks the record processor to checkpoint at shard end. For every other reason (or when the
 * shard-end path hits an {@link InvalidStateException}) the record processor is told that its
 * lease was lost. In all successful cases the records publisher is shut down afterwards.
 *
 * <p>Failures are not thrown from {@link #call()}; they are logged, followed by a backoff sleep,
 * and returned wrapped in a {@link TaskResult}.
 */
public class DynamoDBStreamsShutdownTask implements ConsumerTask {
    private static final String SHUTDOWN_TASK_OPERATION = "DynamoDBStreamsShutdownTask";
    private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";

    // Not read anywhere in this class; kept because it is package-visible for tests.
    @VisibleForTesting
    static final int RETRY_RANDOM_MAX_RANGE = 30;

    @NonNull
    private final ShardInfo shardInfo;

    @NonNull
    private final ShardDetector dynamoDBStreamsShardDetector;

    @NonNull
    private final ShardRecordProcessor shardRecordProcessor;

    @NonNull
    private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;

    @NonNull
    private final ShutdownReason reason;

    // The next three fields are stored by the constructor but not read within this class.
    @NonNull
    private final InitialPositionInStreamExtended initialPositionInStream;
    private final boolean cleanupLeasesOfCompletedShards;
    private final boolean ignoreUnexpectedChildShards;

    @NonNull
    private final LeaseCoordinator leaseCoordinator;
    private final long backoffTimeMillis;

    @NonNull
    private final RecordsPublisher recordsPublisher;

    @NonNull
    private final HierarchicalShardSyncer dynamoDBStreamsShardSyncer;

    @NonNull
    private final MetricsFactory metricsFactory;
    private final TaskType taskType = TaskType.SHUTDOWN;

    // May be null or empty; both are treated as "no child shards" by takeShardEndAction.
    private final List<ChildShard> childShards;

    @NonNull
    private final StreamIdentifier streamIdentifier;

    // Stored by the constructor but not read within this class.
    @NonNull
    private final LeaseCleanupManager leaseCleanupManager;

    // Stream ARN derived once from the stream identifier's name; used in log messages.
    private final String streamArn;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DynamoDBStreamsShutdownTask.class);
    private static final LeaseLostInput LEASE_LOST_INPUT = LeaseLostInput.builder().build();
    private static final Random RANDOM = new Random();

    /**
     * Creates a shutdown task for one shard.
     *
     * @param shardInfo shard being shut down; must not be null
     * @param shardDetector shard detector, used here only for its stream identifier in logs; must not be null
     * @param shardRecordProcessor application record processor to notify; must not be null
     * @param shardRecordProcessorCheckpointer checkpointer handed to the record processor at shard end; must not be null
     * @param shutdownReason why the shard consumer is shutting down; must not be null
     * @param initialPositionInStreamExtended initial stream position (stored, not used by this class); must not be null
     * @param z stored as {@code cleanupLeasesOfCompletedShards} (not used by this class)
     * @param z2 stored as {@code ignoreUnexpectedChildShards} (not used by this class)
     * @param leaseCoordinator source of the currently held lease and the lease refresher; must not be null
     * @param j backoff, in milliseconds, slept after a failed shutdown attempt
     * @param recordsPublisher publisher to shut down once the record processor has been notified; must not be null
     * @param hierarchicalShardSyncer used to build leases for child shards; must not be null
     * @param metricsFactory factory for the metrics scope of this task; must not be null
     * @param list child shards of the shard being shut down; may be null or empty
     * @param streamIdentifier identifier of the stream the shard belongs to; must not be null
     * @param leaseCleanupManager lease cleanup manager (stored, not used by this class); must not be null
     * @throws NullPointerException if any parameter documented as "must not be null" is null
     */
    public DynamoDBStreamsShutdownTask(@NonNull ShardInfo shardInfo, @NonNull ShardDetector shardDetector, @NonNull ShardRecordProcessor shardRecordProcessor, @NonNull ShardRecordProcessorCheckpointer shardRecordProcessorCheckpointer, @NonNull ShutdownReason shutdownReason, @NonNull InitialPositionInStreamExtended initialPositionInStreamExtended, boolean z, boolean z2, @NonNull LeaseCoordinator leaseCoordinator, long j, @NonNull RecordsPublisher recordsPublisher, @NonNull HierarchicalShardSyncer hierarchicalShardSyncer, @NonNull MetricsFactory metricsFactory, List<ChildShard> list, @NonNull StreamIdentifier streamIdentifier, @NonNull LeaseCleanupManager leaseCleanupManager) {
        // Null checks run in parameter order, so the reported message matches the first null argument.
        this.shardInfo = Objects.requireNonNull(shardInfo, "shardInfo is marked non-null but is null");
        this.dynamoDBStreamsShardDetector = Objects.requireNonNull(shardDetector, "dynamoDBStreamsShardDetector is marked non-null but is null");
        this.shardRecordProcessor = Objects.requireNonNull(shardRecordProcessor, "shardRecordProcessor is marked non-null but is null");
        this.recordProcessorCheckpointer = Objects.requireNonNull(shardRecordProcessorCheckpointer, "recordProcessorCheckpointer is marked non-null but is null");
        this.reason = Objects.requireNonNull(shutdownReason, "reason is marked non-null but is null");
        this.initialPositionInStream = Objects.requireNonNull(initialPositionInStreamExtended, "initialPositionInStream is marked non-null but is null");
        this.leaseCoordinator = Objects.requireNonNull(leaseCoordinator, "leaseCoordinator is marked non-null but is null");
        this.recordsPublisher = Objects.requireNonNull(recordsPublisher, "recordsPublisher is marked non-null but is null");
        this.dynamoDBStreamsShardSyncer = Objects.requireNonNull(hierarchicalShardSyncer, "dynamoDBStreamsShardSyncer is marked non-null but is null");
        this.metricsFactory = Objects.requireNonNull(metricsFactory, "metricsFactory is marked non-null but is null");
        this.streamIdentifier = Objects.requireNonNull(streamIdentifier, "streamIdentifier is marked non-null but is null");
        this.leaseCleanupManager = Objects.requireNonNull(leaseCleanupManager, "leaseCleanupManager is marked non-null but is null");
        this.cleanupLeasesOfCompletedShards = z;
        this.ignoreUnexpectedChildShards = z2;
        this.backoffTimeMillis = j;
        this.childShards = list;
        this.streamArn = KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName(this.streamIdentifier.streamName());
    }

    /**
     * Runs the shutdown sequence for the shard.
     *
     * <p>On success the record processor has been notified (shard end or lease lost), the records
     * publisher has been shut down, and a {@link TaskResult} with a null exception is returned.
     * If any {@link Exception} is raised it is logged, the task sleeps for the configured backoff,
     * and the exception is returned inside the {@link TaskResult}. The metrics scope is ended
     * exactly once, after all of that, on every exit path.
     *
     * @return the task result; carries the caught exception when shutdown failed
     */
    public TaskResult call() {
        recordProcessorCheckpointer.checkpointer().operation(SHUTDOWN_TASK_OPERATION);
        final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHUTDOWN_TASK_OPERATION);
        final String leaseKey = ShardInfo.getLeaseKey(shardInfo);
        try {
            log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}", leaseKey, childShards, shardInfo.concurrencyToken(), reason);
            final long startTime = System.currentTimeMillis();
            final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
            final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LEASE_LOST_INPUT);

            if (reason == ShutdownReason.SHARD_END) {
                try {
                    takeShardEndAction(currentShardLease, leaseKey, scope, startTime);
                } catch (InvalidStateException e) {
                    // Shard-end handling is not possible in this state: give the lease up and
                    // fall back to the lease-lost notification instead.
                    log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. Dropping the lease and shutting down shardConsumer using LEASE_LOST reason.", leaseKey, e);
                    dropLease(currentShardLease, leaseKey);
                    throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime);
                }
            } else {
                throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime);
            }

            log.debug("Shutting down retrieval strategy for shard {}.", leaseKey);
            recordsPublisher.shutdown();
            log.debug("Record processor completed shutdown() for shard {}", leaseKey);
            return new TaskResult((Exception) null);
        } catch (Exception e) {
            if (e instanceof CustomerApplicationException) {
                log.error("Shard {}: Application exception.", leaseKey, e);
            } else {
                log.error("Shard {}: Caught exception:", leaseKey, e);
            }
            // Back off before reporting the failure to the caller.
            try {
                Thread.sleep(backoffTimeMillis);
            } catch (InterruptedException interrupted) {
                log.debug("Shard {}: Interrupted sleep", leaseKey, interrupted);
                // Preserve the interrupt so the owning thread/executor can observe it.
                Thread.currentThread().interrupt();
            }
            return new TaskResult(e);
        } finally {
            // Single exit point for the scope: runs once for success, failure and Errors alike.
            MetricsUtil.endScope(scope);
        }
    }

    /**
     * Alias for {@link #call()} under the name this method previously carried in this file.
     *
     * @return the result of {@link #call()}
     * @deprecated use {@link #call()}
     */
    @Deprecated
    public TaskResult m15call() {
        return call();
    }

    /**
     * Handles the shard-end path: creates/records child shard leases when child shards are known,
     * then attempts the shard-end checkpoint.
     *
     * @param lease the lease currently held for the shard, or null if this worker holds none
     * @param leaseKey lease key of the shard
     * @param scope metrics scope of the running task
     * @param startTime task start time in epoch milliseconds, used for latency metrics
     * @throws InvalidStateException if {@code lease} is null or the lease no longer exists
     */
    private void takeShardEndAction(Lease lease, String leaseKey, MetricsScope scope, long startTime) throws InvalidStateException, ProvisionedThroughputException, DependencyException, CustomerApplicationException {
        if (lease == null) {
            throw new InvalidStateException(leaseKey + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
        }
        if (!CollectionUtils.isNullOrEmpty(childShards)) {
            createLeasesForChildShardsIfNotExist(scope);
            updateLeaseWithChildShards(lease);
        }
        attemptShardEndCheckpointing(leaseKey, scope, startTime);
    }

    /**
     * Re-reads the lease through the lease refresher and, unless its checkpoint is already
     * {@link ExtendedSequenceNumber#SHARD_END}, lets the application checkpoint at shard end.
     *
     * @param leaseKey lease key of the shard
     * @param scope metrics scope of the running task
     * @param startTime task start time in epoch milliseconds, used for latency metrics
     * @throws InvalidStateException if the lease refresher returns no lease for {@code leaseKey}
     * @throws CustomerApplicationException if the application callback or the checkpoint verification fails
     */
    private void attemptShardEndCheckpointing(String leaseKey, MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
        final Lease storedLease = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKey))
                .orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKey + " does not exist."));
        if (storedLease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
            // Already checkpointed at shard end; nothing left for the application to do.
            return;
        }
        throwOnApplicationException(leaseKey, () -> applicationCheckpointAndVerification(leaseKey), scope, startTime);
    }

    /**
     * Invokes {@code shardEnded} on the record processor and verifies that it checkpointed at
     * {@link ExtendedSequenceNumber#SHARD_END}.
     *
     * @param leaseKey lease key of the shard, used in the failure message
     * @throws IllegalArgumentException if the last checkpoint value is not SHARD_END afterwards
     */
    private void applicationCheckpointAndVerification(String leaseKey) {
        // Remember the previous upper bound as the sequence number at shard end, then allow
        // checkpointing at SHARD_END itself.
        recordProcessorCheckpointer.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
        recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
        shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
        if (!ExtendedSequenceNumber.SHARD_END.equals(recordProcessorCheckpointer.lastCheckpointValue())) {
            throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + leaseKey + ". Application must checkpoint upon shard end. See ShardRecordProcessor.shardEnded javadocs for more information.");
        }
    }

    /**
     * Runs an application callback, wrapping any {@link Exception} it throws in a
     * {@link CustomerApplicationException}, and records the shutdown latency metric exactly once
     * whether or not the callback succeeded.
     *
     * @param leaseKey lease key of the shard, used in the wrapper's message
     * @param action application callback to run
     * @param scope metrics scope receiving the latency datum
     * @param startTime start time in epoch milliseconds from which latency is measured
     * @throws CustomerApplicationException if {@code action} throws an {@link Exception}
     */
    private void throwOnApplicationException(String leaseKey, Runnable action, MetricsScope scope, long startTime) throws CustomerApplicationException {
        try {
            action.run();
        } catch (Exception e) {
            throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKey + ": ", e);
        } finally {
            MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
        }
    }

    /**
     * Creates a lease for every child shard that the lease refresher does not already know,
     * emitting "CreateLease" metrics for each attempt.
     *
     * <p>Callers must ensure {@code childShards} is non-null.
     *
     * @param scope metrics scope receiving the lease-creation metrics
     */
    private void createLeasesForChildShardsIfNotExist(MetricsScope scope) throws ProvisionedThroughputException, InvalidStateException, DependencyException {
        final LeaseRefresher leaseRefresher = leaseCoordinator.leaseRefresher();
        for (ChildShard childShard : childShards) {
            final String childLeaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
            if (leaseRefresher.getLease(childLeaseKey) != null) {
                continue;
            }
            log.debug("{} - Shard {} - Attempting to create lease for child shard {}", dynamoDBStreamsShardDetector.streamIdentifier(), shardInfo.shardId(), childLeaseKey);
            final Lease leaseToCreate = dynamoDBStreamsShardSyncer.createLeaseForChildShard(childShard, streamIdentifier);
            final long startTime = System.currentTimeMillis();
            boolean created = false;
            try {
                leaseRefresher.createLeaseIfNotExists(leaseToCreate);
                created = true;
            } finally {
                MetricsUtil.addSuccessAndLatency(scope, "CreateLease", created, startTime, MetricsLevel.DETAILED);
                if (leaseToCreate.checkpoint() != null) {
                    // The per-checkpoint metric is recorded with success=true regardless of the
                    // outcome above. NOTE(review): looks intentional (it counts attempts by
                    // checkpoint kind) - confirm.
                    MetricsUtil.addSuccess(scope, "CreateLease_" + (leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"), true, MetricsLevel.DETAILED);
                }
            }
            log.info("{} - Shard {}: Created child shard lease: {}", streamArn, shardInfo.shardId(), leaseToCreate);
        }
    }

    /**
     * Writes the ids of the child shards onto a copy of the given lease and persists that copy
     * through the lease refresher with {@link UpdateField#CHILD_SHARDS}.
     *
     * <p>Callers must ensure {@code childShards} is non-null.
     *
     * @param lease the lease currently held for the shard being shut down
     */
    private void updateLeaseWithChildShards(Lease lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        final Set<String> childShardIds = childShards.stream()
                .map(ChildShard::shardId)
                .collect(Collectors.toSet());
        // Update a copy so the caller's lease object is left untouched.
        final Lease updatedLease = lease.copy();
        updatedLease.childShardIds(childShardIds);
        leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
        log.info("Shard {} of Stream {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), streamArn, lease.leaseKey(), childShardIds);
    }

    /**
     * @return the type of this task, always {@link TaskType#SHUTDOWN}
     */
    public TaskType taskType() {
        return taskType;
    }

    /**
     * @return the shutdown reason this task was created with
     */
    @VisibleForTesting
    public ShutdownReason getReason() {
        return reason;
    }

    /**
     * Drops the given lease through the lease coordinator, or logs a warning when there is no
     * lease to drop.
     *
     * @param lease the lease to drop; may be null
     * @param leaseKey lease key of the shard, used in the warning message
     */
    private void dropLease(Lease lease, String leaseKey) {
        if (lease == null) {
            log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKey);
            return;
        }
        leaseCoordinator.dropLease(lease);
        log.info("Dropped lease for shutting down ShardConsumer: {} of stream: {}", lease.leaseKey(), streamArn);
    }
}
