package software.amazon.kinesis.lifecycle;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

/**
 * Consumer task that shuts down processing of a single shard.
 *
 * <p>Depending on the {@link ShutdownReason}, the task either drives the shard-end path (child
 * lease creation, recording the child shards on the current lease, invoking
 * {@link ShardRecordProcessor#shardEnded}, and enqueueing the lease for cleanup) or notifies the
 * record processor through {@link ShardRecordProcessor#leaseLost}. In both cases the
 * {@link RecordsPublisher} is shut down afterwards.
 */
@KinesisClientInternalApi
public class ShutdownTask implements ConsumerTask {
    private static final Logger log = LoggerFactory.getLogger(ShutdownTask.class);

    private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
    private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";

    /** Denominator N of the "one in N" probability used when the parent lease state is inconsistent. */
    @VisibleForTesting
    static final int RETRY_RANDOM_MAX_RANGE = 30;

    /** Resolves the lease key of a shard; used for lease lookups and in log messages. */
    private static final Function<ShardInfo, String> leaseKeyProvider =
            shardInfo -> ShardInfo.getLeaseKey(shardInfo);

    @NonNull
    private final ShardInfo shardInfo;

    @NonNull
    private final ShardDetector shardDetector;

    @NonNull
    private final ShardRecordProcessor shardRecordProcessor;

    @NonNull
    private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;

    @NonNull
    private final ShutdownReason reason;

    // The next three fields are stored by the constructor but not read by any code in this class.
    @NonNull
    private final InitialPositionInStreamExtended initialPositionInStream;
    private final boolean cleanupLeasesOfCompletedShards;
    private final boolean ignoreUnexpectedChildShards;

    @NonNull
    private final LeaseCoordinator leaseCoordinator;

    /** Time slept in {@link #call()} after a failure, before the failed result is returned. */
    private final long backoffTimeMillis;

    @NonNull
    private final RecordsPublisher recordsPublisher;

    @NonNull
    private final HierarchicalShardSyncer hierarchicalShardSyncer;

    @NonNull
    private final MetricsFactory metricsFactory;
    private final TaskType taskType = TaskType.SHUTDOWN;

    /** Child shards of the shard being shut down; may be {@code null} or empty. */
    private final List<ChildShard> childShards;

    @NonNull
    private final StreamIdentifier streamIdentifier;

    @NonNull
    private final LeaseCleanupManager leaseCleanupManager;

    /**
     * Creates a shutdown task. Every argument annotated {@code @NonNull} is validated in
     * declaration order and a {@link NullPointerException} is thrown for the first {@code null} one.
     *
     * @param shardInfo shard being shut down
     * @param shardDetector supplies the stream identifier used when creating child shard leases
     * @param shardRecordProcessor application processor that receives the shutdown callback
     * @param shardRecordProcessorCheckpointer checkpointer handed to the processor at shard end
     * @param shutdownReason why the shard consumer is shutting down
     * @param initialPositionInStreamExtended stored only; not read by this class
     * @param cleanupLeasesOfCompletedShards stored only; not read by this class
     * @param ignoreUnexpectedChildShards stored only; not read by this class
     * @param leaseCoordinator used to look up, refresh, create, update and drop leases
     * @param backoffTimeMillis sleep applied in {@link #call()} after a failure
     * @param recordsPublisher publisher shut down once the processor has been notified
     * @param hierarchicalShardSyncer builds the lease objects for child shards
     * @param metricsFactory factory for the metrics scope of this task
     * @param childShards child shards of the shard; may be {@code null} or empty
     * @param streamIdentifier stream the shard belongs to
     * @param leaseCleanupManager receives the lease for deletion after shard end
     */
    public ShutdownTask(
            @NonNull ShardInfo shardInfo,
            @NonNull ShardDetector shardDetector,
            @NonNull ShardRecordProcessor shardRecordProcessor,
            @NonNull ShardRecordProcessorCheckpointer shardRecordProcessorCheckpointer,
            @NonNull ShutdownReason shutdownReason,
            @NonNull InitialPositionInStreamExtended initialPositionInStreamExtended,
            boolean cleanupLeasesOfCompletedShards,
            boolean ignoreUnexpectedChildShards,
            @NonNull LeaseCoordinator leaseCoordinator,
            long backoffTimeMillis,
            @NonNull RecordsPublisher recordsPublisher,
            @NonNull HierarchicalShardSyncer hierarchicalShardSyncer,
            @NonNull MetricsFactory metricsFactory,
            List<ChildShard> childShards,
            @NonNull StreamIdentifier streamIdentifier,
            @NonNull LeaseCleanupManager leaseCleanupManager) {
        // Validate everything first, in the same order and with the same messages as before,
        // so a failed construction never leaves partially assigned state behind.
        Objects.requireNonNull(shardInfo, "shardInfo is marked non-null but is null");
        Objects.requireNonNull(shardDetector, "shardDetector is marked non-null but is null");
        Objects.requireNonNull(shardRecordProcessor, "shardRecordProcessor is marked non-null but is null");
        Objects.requireNonNull(
                shardRecordProcessorCheckpointer, "recordProcessorCheckpointer is marked non-null but is null");
        Objects.requireNonNull(shutdownReason, "reason is marked non-null but is null");
        Objects.requireNonNull(
                initialPositionInStreamExtended, "initialPositionInStream is marked non-null but is null");
        Objects.requireNonNull(leaseCoordinator, "leaseCoordinator is marked non-null but is null");
        Objects.requireNonNull(recordsPublisher, "recordsPublisher is marked non-null but is null");
        Objects.requireNonNull(hierarchicalShardSyncer, "hierarchicalShardSyncer is marked non-null but is null");
        Objects.requireNonNull(metricsFactory, "metricsFactory is marked non-null but is null");
        Objects.requireNonNull(streamIdentifier, "streamIdentifier is marked non-null but is null");
        Objects.requireNonNull(leaseCleanupManager, "leaseCleanupManager is marked non-null but is null");

        this.shardInfo = shardInfo;
        this.shardDetector = shardDetector;
        this.shardRecordProcessor = shardRecordProcessor;
        this.recordProcessorCheckpointer = shardRecordProcessorCheckpointer;
        this.reason = shutdownReason;
        this.initialPositionInStream = initialPositionInStreamExtended;
        this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
        this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
        this.leaseCoordinator = leaseCoordinator;
        this.backoffTimeMillis = backoffTimeMillis;
        this.recordsPublisher = recordsPublisher;
        this.hierarchicalShardSyncer = hierarchicalShardSyncer;
        this.metricsFactory = metricsFactory;
        this.childShards = childShards;
        this.streamIdentifier = streamIdentifier;
        this.leaseCleanupManager = leaseCleanupManager;
    }

    /**
     * Runs the shutdown sequence for the shard.
     *
     * <p>For {@link ShutdownReason#SHARD_END} the shard-end path is attempted; if it fails with an
     * {@link InvalidStateException} the lease is dropped and the processor is notified through
     * {@code leaseLost} instead. For any other reason {@code leaseLost} is invoked directly.
     * Afterwards the records publisher is shut down.
     *
     * @return a {@link TaskResult} with a {@code null} exception on success, or carrying the caught
     *         exception after logging it and sleeping for the configured backoff
     */
    @Override
    public TaskResult call() {
        this.recordProcessorCheckpointer.checkpointer().operation(SHUTDOWN_TASK_OPERATION);
        final MetricsScope scope =
                MetricsUtil.createMetricsWithOperation(this.metricsFactory, SHUTDOWN_TASK_OPERATION);
        try {
            log.debug(
                    "Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
                    leaseKeyProvider.apply(this.shardInfo),
                    this.childShards,
                    this.shardInfo.concurrencyToken(),
                    this.reason);

            final long startTime = System.currentTimeMillis();
            final Lease currentShardLease =
                    this.leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(this.shardInfo));
            final Runnable leaseLostAction =
                    () -> this.shardRecordProcessor.leaseLost(LeaseLostInput.builder().build());

            if (this.reason == ShutdownReason.SHARD_END) {
                try {
                    takeShardEndAction(currentShardLease, scope, startTime);
                } catch (InvalidStateException e) {
                    // Shard-end handling is not possible in the current lease state:
                    // give up the lease and fall back to the lease-lost notification.
                    log.warn(
                            "Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. ",
                            leaseKeyProvider.apply(this.shardInfo),
                            e);
                    dropLease(currentShardLease);
                    throwOnApplicationException(leaseLostAction, scope, startTime);
                }
            } else {
                throwOnApplicationException(leaseLostAction, scope, startTime);
            }

            log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(this.shardInfo));
            this.recordsPublisher.shutdown();
            log.debug("Record processor completed shutdown() for shard {}", leaseKeyProvider.apply(this.shardInfo));
            return new TaskResult((Exception) null);
        } catch (Exception e) {
            if (e instanceof CustomerApplicationException) {
                log.error("Shard {}: Application exception. ", leaseKeyProvider.apply(this.shardInfo), e);
            } else {
                log.error("Shard {}: Caught exception: ", leaseKeyProvider.apply(this.shardInfo), e);
            }
            // Back off before reporting the failure to the caller.
            try {
                Thread.sleep(this.backoffTimeMillis);
            } catch (InterruptedException ie) {
                log.debug("Shard {}: Interrupted sleep", leaseKeyProvider.apply(this.shardInfo), ie);
                // Restore the interrupt status so the owner of this thread can observe it.
                Thread.currentThread().interrupt();
            }
            return new TaskResult(e);
        } finally {
            // Close the metrics scope exactly once on every exit path.
            MetricsUtil.endScope(scope);
        }
    }

    /**
     * Performs the shard-end path: ensures child leases exist, records the child shards on the
     * current lease, lets the application checkpoint at shard end, and enqueues the lease for
     * deletion.
     *
     * @param currentShardLease lease currently held for the shard, or {@code null} if none is held
     * @param scope metrics scope that receives the processor shutdown latency
     * @param startTime start timestamp (epoch millis) used for the latency metric
     * @throws InvalidStateException if {@code currentShardLease} is {@code null}, or propagated
     *         from the lease operations
     * @throws CustomerApplicationException if the application's shard-end handling throws
     */
    private void takeShardEndAction(Lease currentShardLease, MetricsScope scope, long startTime)
            throws DependencyException, ProvisionedThroughputException, InvalidStateException,
                    CustomerApplicationException {
        if (currentShardLease == null) {
            throw new InvalidStateException(leaseKeyProvider.apply(this.shardInfo)
                    + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
        }
        if (!CollectionUtils.isNullOrEmpty(this.childShards)) {
            createLeasesForChildShardsIfNotExist();
            updateLeaseWithChildShards(currentShardLease);
        }

        final LeasePendingDeletion leasePendingDeletion =
                new LeasePendingDeletion(this.streamIdentifier, currentShardLease, this.shardInfo, this.shardDetector);
        if (this.leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
            return;
        }

        boolean isSuccess = false;
        try {
            isSuccess = attemptShardEndCheckpointing(scope, startTime);
        } finally {
            // Enqueue when checkpointing succeeded, or regardless of the outcome when there are
            // no child shards. Using finally guarantees a single enqueue attempt.
            if (isSuccess || CollectionUtils.isNullOrEmpty(this.childShards)) {
                this.leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
            }
        }
    }

    /**
     * Makes sure the shard's lease is checkpointed at {@link ExtendedSequenceNumber#SHARD_END},
     * invoking the application's shard-end handling when it is not yet.
     *
     * @return always {@code true}; every failure is signalled through an exception
     * @throws InvalidStateException if the lease for the shard cannot be found
     * @throws CustomerApplicationException if the application's shard-end handling throws
     */
    private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
            throws DependencyException, ProvisionedThroughputException, InvalidStateException,
                    CustomerApplicationException {
        final Lease latestLease =
                this.leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(this.shardInfo));
        if (latestLease == null) {
            throw new InvalidStateException(
                    "Lease for shard " + leaseKeyProvider.apply(this.shardInfo) + " does not exist.");
        }
        // Skip the application callback when the lease is already at SHARD_END.
        if (!latestLease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
            throwOnApplicationException(this::applicationCheckpointAndVerification, scope, startTime);
        }
        return true;
    }

    /**
     * Invokes {@link ShardRecordProcessor#shardEnded} and verifies that the application
     * checkpointed at {@link ExtendedSequenceNumber#SHARD_END}.
     *
     * @throws IllegalArgumentException if the last checkpoint value is not {@code SHARD_END}
     */
    private void applicationCheckpointAndVerification() {
        // Remember the previous permitted value as the shard-end sequence number, then raise the
        // permitted value to SHARD_END before handing the checkpointer to the application.
        this.recordProcessorCheckpointer.sequenceNumberAtShardEnd(
                this.recordProcessorCheckpointer.largestPermittedCheckpointValue());
        this.recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
        this.shardRecordProcessor.shardEnded(
                ShardEndedInput.builder().checkpointer(this.recordProcessorCheckpointer).build());

        final ExtendedSequenceNumber lastCheckpointValue = this.recordProcessorCheckpointer.lastCheckpointValue();
        if (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
            throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
                    + leaseKeyProvider.apply(this.shardInfo)
                    + ". Application must checkpoint upon shard end. See ShardRecordProcessor.shardEnded javadocs for more information.");
        }
    }

    /**
     * Runs an application callback, wrapping any {@link Exception} it throws in a
     * {@link CustomerApplicationException}, and records the shutdown latency once on every exit
     * path.
     *
     * @param action callback to run
     * @param metricsScope scope that receives the latency metric
     * @param startTime start timestamp (epoch millis) used for the latency metric
     * @throws CustomerApplicationException if {@code action} throws an {@link Exception}
     */
    private void throwOnApplicationException(Runnable action, MetricsScope metricsScope, long startTime)
            throws CustomerApplicationException {
        try {
            action.run();
        } catch (Exception e) {
            throw new CustomerApplicationException(
                    "Customer application throws exception for shard " + leaseKeyProvider.apply(this.shardInfo) + ": ",
                    e);
        } finally {
            // Kept out of the try body so a metrics failure is never reported as an application error.
            MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
        }
    }

    /**
     * Creates a lease for every child shard that does not have one yet.
     *
     * <p>When there is exactly one child shard, it must list two parent shards and the leases of
     * those parents must either both exist or both be absent; otherwise lease creation is deferred
     * by throwing.
     *
     * @throws InvalidStateException if the only child does not list two parents, or (with a
     *         probability of one in {@link #RETRY_RANDOM_MAX_RANGE}) when exactly one parent lease
     *         exists
     * @throws BlockedOnParentShardException in the remaining cases where exactly one parent lease
     *         exists
     */
    private void createLeasesForChildShardsIfNotExist()
            throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        if (!CollectionUtils.isNullOrEmpty(this.childShards) && this.childShards.size() == 1) {
            final ChildShard childShard = this.childShards.get(0);
            final List<String> parentLeaseKeys = childShard.parentShards().stream()
                    .map(parentShardId -> ShardInfo.getLeaseKey(this.shardInfo, parentShardId))
                    .collect(Collectors.toList());
            if (parentLeaseKeys.size() != 2) {
                throw new InvalidStateException("Shard " + this.shardInfo.shardId() + "'s only child shard "
                        + childShard + " does not contain other parent information.");
            }

            final Lease firstParentLease = this.leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0));
            final Lease secondParentLease = this.leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1));
            // Exactly one of the two parent leases exists.
            if (Objects.isNull(firstParentLease) != Objects.isNull(secondParentLease)) {
                final String message = "Shard " + this.shardInfo.shardId() + "'s only child shard " + childShard
                        + " has partial parent information in lease table. Hence deferring lease creation of child shard.";
                if (!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
                    throw new BlockedOnParentShardException(message);
                }
                throw new InvalidStateException(message);
            }
        }

        for (ChildShard childShard : this.childShards) {
            final String childLeaseKey = ShardInfo.getLeaseKey(this.shardInfo, childShard.shardId());
            if (this.leaseCoordinator.leaseRefresher().getLease(childLeaseKey) == null) {
                final Lease childLease = this.hierarchicalShardSyncer.createLeaseForChildShard(
                        childShard, this.shardDetector.streamIdentifier());
                this.leaseCoordinator.leaseRefresher().createLeaseIfNotExists(childLease);
                log.info("Shard {}: Created child shard lease: {}", this.shardInfo.shardId(), childLease.leaseKey());
            }
        }
    }

    /**
     * Returns {@code true} with a probability of one in {@code n}.
     *
     * @param n number of equally likely outcomes; must be positive
     * @return {@code true} for exactly one of the {@code n} outcomes
     * @throws IllegalArgumentException if {@code n} is not positive
     */
    @VisibleForTesting
    boolean isOneInNProbability(int n) {
        return ThreadLocalRandom.current().nextInt(n) == 0;
    }

    /**
     * Stores the ids of the child shards on a copy of the given lease and writes that copy
     * through the lease refresher with {@link UpdateField#CHILD_SHARDS}.
     *
     * @param currentLease lease of the shard being shut down; it is copied, not modified
     */
    private void updateLeaseWithChildShards(Lease currentLease)
            throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        final Set<String> childShardIds =
                this.childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
        final Lease updatedLease = currentLease.copy();
        updatedLease.childShardIds(childShardIds);
        this.leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
        log.info(
                "Shard {}: Updated current lease {} with child shard information: {}",
                this.shardInfo.shardId(),
                currentLease.leaseKey(),
                childShardIds);
    }

    /** {@inheritDoc} Always {@link TaskType#SHUTDOWN} for this task. */
    @Override
    public TaskType taskType() {
        return this.taskType;
    }

    /** Returns the reason this task was created with. */
    @VisibleForTesting
    public ShutdownReason getReason() {
        return this.reason;
    }

    /**
     * Drops the given lease through the lease coordinator, or only logs a warning when there is
     * no lease to drop.
     *
     * @param currentLease lease to drop; may be {@code null}
     */
    private void dropLease(Lease currentLease) {
        if (currentLease == null) {
            log.warn(
                    "Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.",
                    leaseKeyProvider.apply(this.shardInfo));
            return;
        }
        this.leaseCoordinator.dropLease(currentLease);
        log.info("Dropped lease for shutting down ShardConsumer: {}", currentLease.leaseKey());
    }
}
