package cloud.localstack.awssdkv2;

import cloud.localstack.awssdkv2.consumer.DeliveryStatusRecordProcessorFactory;
import cloud.localstack.awssdkv2.consumer.EventProcessor;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.metrics.NullMetricsFactory;

/**
 * Integration test that runs a Kinesis Client Library {@link Scheduler} against a freshly
 * created Kinesis stream, publishes a single record, and verifies that the
 * {@link EventProcessor} observed both the consumer start-up and the record.
 *
 * <p>The Kinesis, DynamoDB and CloudWatch async clients are mocked through
 * {@link PowerMockLocalStack} before each test (presumably so that {@code create()} returns
 * clients pointed at LocalStack -- confirm against {@code PowerMockLocalStack}).
 */
@LocalstackDockerProperties(ignoreDockerRunErrors = true)
public class KinesisSchedulerTest extends PowerMockLocalStack {
    /** Seconds to wait after requesting stream creation before building the scheduler. */
    private static final long STREAM_CREATION_WAIT_SECONDS = 2L;

    /** Seconds to wait after putting the record before shutting the scheduler down. */
    private static final long RECORD_DELIVERY_WAIT_SECONDS = 5L;

    /** Unique stream name per test instance; also reused as the KCL application name. */
    String streamName = "test" + UUID.randomUUID();

    /** Unique worker identifier handed to the KCL {@link ConfigsBuilder}. */
    String workerId = UUID.randomUUID().toString();

    /** Payload that is published to the stream and expected back from the processor. */
    String testMessage = "hello, world";

    /** Seconds to wait after starting the scheduler before a record is published. */
    Integer consumerCreationTime = 15;

    /** Installs the PowerMock-based client mocks required by the scheduler. */
    @Before
    public void mockServicesForScheduler() {
        PowerMockLocalStack.mockCloudWatchAsyncClient();
        PowerMockLocalStack.mockDynamoDBAsync();
        PowerMockLocalStack.mockKinesisAsync();
    }

    /**
     * Creates the stream, starts a scheduler on a background thread, publishes one record and
     * asserts that the {@link EventProcessor} recorded the consumer creation and the message.
     *
     * @throws Exception if a client call fails or the test thread is interrupted while waiting
     */
    @Test
    public void schedulerTest() throws Exception {
        KinesisAsyncClient kinesisClient = KinesisAsyncClient.create();
        DynamoDbAsyncClient dynamoDbClient = DynamoDbAsyncClient.create();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.create();

        createStream(kinesisClient);
        TimeUnit.SECONDS.sleep(STREAM_CREATION_WAIT_SECONDS);

        EventProcessor eventProcessor = new EventProcessor();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                this.streamName,
                this.streamName,
                kinesisClient,
                dynamoDbClient,
                cloudWatchClient,
                this.workerId,
                new DeliveryStatusRecordProcessorFactory(eventProcessor));
        Scheduler scheduler = createScheduler(configsBuilder);

        Thread schedulerThread = new Thread(scheduler, "kcl-scheduler-" + this.workerId);
        // Daemon so that a scheduler which fails to stop cannot keep the test JVM alive.
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        try {
            TimeUnit.SECONDS.sleep(this.consumerCreationTime);
            putRecord(kinesisClient);
            TimeUnit.SECONDS.sleep(RECORD_DELIVERY_WAIT_SECONDS);
        } finally {
            // Always request shutdown, even when publishing fails or the wait is interrupted,
            // so the background scheduler does not outlive this test.
            scheduler.shutdown();
        }

        Assert.assertTrue("consumer was not created", eventProcessor.CONSUMER_CREATED);
        Assert.assertTrue("no record was received", eventProcessor.RECORD_RECEIVED);
        Assert.assertTrue("processor collected no messages", eventProcessor.messages.size() > 0);
        // JUnit's contract is (expected, actual); the reversed order yields misleading failures.
        Assert.assertEquals(this.testMessage, eventProcessor.messages.get(0));
    }

    /**
     * Builds a {@link Scheduler} from the given configuration, replacing the metrics factory
     * with a {@link NullMetricsFactory}.
     *
     * @param configsBuilder source of the individual KCL configuration objects
     * @return a new scheduler that has not been started
     */
    public Scheduler createScheduler(ConfigsBuilder configsBuilder) {
        return new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig().metricsFactory(new NullMetricsFactory()),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig());
    }

    /**
     * Creates the single-shard test stream and blocks until the request completes.
     *
     * @param kinesisAsyncClient client used to issue the create-stream call
     * @throws Exception if the request fails or waiting for it is interrupted
     */
    public void createStream(KinesisAsyncClient kinesisAsyncClient) throws Exception {
        CreateStreamRequest request = CreateStreamRequest.builder()
                .streamName(this.streamName)
                .shardCount(1)
                .build();
        CreateStreamResponse response = kinesisAsyncClient.createStream(request).get();
        Assert.assertNotNull(response);
    }

    /**
     * Publishes {@link #testMessage} to the test stream and blocks until the request completes.
     *
     * @param kinesisAsyncClient client used to issue the put-record call
     * @throws Exception if the request fails or waiting for it is interrupted
     */
    public void putRecord(KinesisAsyncClient kinesisAsyncClient) throws Exception {
        System.out.println("PUTTING RECORD");
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey("partitionkey")
                .streamName(this.streamName)
                .data(SdkBytes.fromUtf8String(this.testMessage))
                .build();
        PutRecordResponse response = kinesisAsyncClient.putRecord(request).get();
        Assert.assertNotNull(response);
    }
}
