package org.apache.pulsar.io.dynamodb;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.google.common.base.Preconditions;
import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = "dynamodb", type = IOType.SOURCE, help = "A source connector that copies messages from DynamoDB Streams to Pulsar", configClass = DynamoDBSourceConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/dynamodb/DynamoDBSource.class */
public class DynamoDBSource extends AbstractAwsConnector implements Source<byte[]> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DynamoDBSource.class);
    private LinkedBlockingQueue<StreamsRecord> queue;
    private DynamoDBSourceConfig dynamodbSourceConfig;
    private KinesisClientLibConfiguration kinesisClientLibConfig;
    private IRecordProcessorFactory recordProcessorFactory;
    private String workerId;
    private Worker worker;
    private Thread workerThread;
    private Throwable threadEx;

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.worker.shutdown();
    }

    @Override // org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        this.dynamodbSourceConfig = DynamoDBSourceConfig.load(map, sourceContext);
        Preconditions.checkArgument(StringUtils.isNotBlank(this.dynamodbSourceConfig.getAwsDynamodbStreamArn()), "empty dynamo-stream arn");
        Preconditions.checkArgument(StringUtils.isNotBlank(this.dynamodbSourceConfig.getAwsRegion()), "The aws-region must be set");
        Preconditions.checkArgument(StringUtils.isNotBlank(this.dynamodbSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param");
        if (this.dynamodbSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
            Preconditions.checkArgument(this.dynamodbSourceConfig.getStartAtTime() != null, "Timestamp must be specified");
        }
        this.queue = new LinkedBlockingQueue<>(this.dynamodbSourceConfig.getReceiveQueueSize());
        this.workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + String.valueOf(UUID.randomUUID());
        AwsCredentialProviderPlugin createCredentialProvider = createCredentialProvider(this.dynamodbSourceConfig.getAwsCredentialPluginName(), this.dynamodbSourceConfig.getAwsCredentialPluginParam());
        AmazonDynamoDBStreamsAdapterClient amazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(this.dynamodbSourceConfig.buildDynamoDBStreamsClient(createCredentialProvider));
        this.recordProcessorFactory = new StreamsRecordProcessorFactory(this.queue, this.dynamodbSourceConfig);
        this.kinesisClientLibConfig = new KinesisClientLibConfiguration(this.dynamodbSourceConfig.getApplicationName(), this.dynamodbSourceConfig.getAwsDynamodbStreamArn(), createCredentialProvider.getCredentialProvider(), this.workerId).withRegionName(this.dynamodbSourceConfig.getAwsRegion()).withInitialPositionInStream(this.dynamodbSourceConfig.getInitialPositionInStream());
        if (this.kinesisClientLibConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
            this.kinesisClientLibConfig.withTimestampAtInitialPositionInStream(this.dynamodbSourceConfig.getStartAtTime());
        }
        this.worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(this.recordProcessorFactory, this.kinesisClientLibConfig, amazonDynamoDBStreamsAdapterClient, this.dynamodbSourceConfig.buildDynamoDBClient(createCredentialProvider), this.dynamodbSourceConfig.buildCloudwatchClient(createCredentialProvider));
        this.workerThread = new Thread(this.worker);
        this.workerThread.setDaemon(true);
        this.threadEx = null;
        this.workerThread.setUncaughtExceptionHandler((thread, th) -> {
            this.threadEx = th;
            log.error("Worker died with error", th);
        });
        this.workerThread.start();
    }

    @Override // org.apache.pulsar.io.core.Source
    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public Record<byte[]> read2() throws Exception {
        try {
            return this.queue.take();
        } catch (InterruptedException e) {
            log.warn("Got interrupted when trying to fetch out of the queue");
            if (this.threadEx != null) {
                log.error("error from scheduler", this.threadEx);
            }
            throw e;
        }
    }
}
