package org.elasticsoftware.akcestest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.victools.jsonschema.generator.Option;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaVersion;
import com.github.victools.jsonschema.module.jackson.JacksonModule;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import jakarta.inject.Inject;
import java.io.IOException;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.elasticsoftware.akces.AggregateServiceApplication;
import org.elasticsoftware.akces.AkcesAggregateController;
import org.elasticsoftware.akces.client.AkcesClientController;
import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.control.AggregateServiceCommandType;
import org.elasticsoftware.akces.control.AggregateServiceDomainEventType;
import org.elasticsoftware.akces.control.AggregateServiceRecord;
import org.elasticsoftware.akces.control.AkcesControlRecord;
import org.elasticsoftware.akces.errors.AggregateAlreadyExistsErrorEvent;
import org.elasticsoftware.akces.events.DomainEvent;
import org.elasticsoftware.akces.gdpr.jackson.AkcesGDPRModule;
import org.elasticsoftware.akces.protocol.AggregateStateRecord;
import org.elasticsoftware.akces.protocol.CommandRecord;
import org.elasticsoftware.akces.protocol.DomainEventRecord;
import org.elasticsoftware.akces.protocol.PayloadEncoding;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.serialization.AkcesControlRecordSerde;
import org.elasticsoftware.akces.serialization.BigDecimalSerializer;
import org.elasticsoftware.akcestest.aggregate.account.AccountCreatedEvent;
import org.elasticsoftware.akcestest.aggregate.account.AccountState;
import org.elasticsoftware.akcestest.aggregate.account.CreateAccountCommand;
import org.elasticsoftware.akcestest.aggregate.orders.BuyOrderCreatedEvent;
import org.elasticsoftware.akcestest.aggregate.orders.FxMarket;
import org.elasticsoftware.akcestest.aggregate.orders.PlaceBuyOrderCommand;
import org.elasticsoftware.akcestest.aggregate.wallet.CreditWalletCommand;
import org.elasticsoftware.akcestest.aggregate.wallet.WalletCreatedEvent;
import org.elasticsoftware.akcestest.old.BalanceCreatedEvent;
import org.elasticsoftware.akcestest.old.BuyOrderPlacedEvent;
import org.elasticsoftware.akcestest.old.CreateWalletCommand;
import org.elasticsoftware.akcestest.old.WalletCreditedEvent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContextException;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.support.TestPropertySourceUtils;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(classes = {AggregateServiceApplication.class}, args = {"org.elasticsoftware.akcestest.RuntimeConfiguration"}, useMainMethod = SpringBootTest.UseMainMethod.ALWAYS)
@DirtiesContext
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Testcontainers
@ContextConfiguration(initializers = {DataSourceInitializer.class})
/* loaded from: input_file:org/elasticsoftware/akcestest/RuntimeTests.class */
public class RuntimeTests {
    private static final String CONFLUENT_PLATFORM_VERSION = "7.8.1";
    private static final Network network = Network.newNetwork();

    @Container
    private static final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.8.1")).withKraft().withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false").withNetwork(network).withNetworkAliases(new String[]{"kafka"});

    @Container
    private static final GenericContainer<?> schemaRegistry = new GenericContainer(DockerImageName.parse("confluentinc/cp-schema-registry:7.8.1")).withNetwork(network).withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "kafka:9092").withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost").withEnv("SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL", "none").withExposedPorts(new Integer[]{8081}).withNetworkAliases(new String[]{"schema-registry"}).dependsOn(new Startable[]{kafka});

    @Inject
    @Qualifier("aggregateServiceKafkaAdmin")
    KafkaAdmin adminClient;

    @Inject
    @Qualifier("aggregateServiceSchemaRegistryClient")
    SchemaRegistryClient schemaRegistryClient;

    @Inject
    @Qualifier("WalletAkcesController")
    AkcesAggregateController walletAggregateController;

    @Inject
    @Qualifier("AccountAkcesController")
    AkcesAggregateController accountAggregateController;

    @Inject
    @Qualifier("OrderProcessManagerAkcesController")
    AkcesAggregateController orderProcessManagerAggregateController;

    @Inject
    @Qualifier("aggregateServiceConsumerFactory")
    ConsumerFactory<String, ProtocolRecord> consumerFactory;

    @Inject
    @Qualifier("aggregateServiceProducerFactory")
    ProducerFactory<String, ProtocolRecord> producerFactory;

    @Inject
    @Qualifier("aggregateServiceControlConsumerFactory")
    ConsumerFactory<String, AkcesControlRecord> controlConsumerFactory;

    @Inject
    @Qualifier("akcesClient")
    AkcesClientController akcesClient;

    @Inject
    ObjectMapper objectMapper;

    /* loaded from: input_file:org/elasticsoftware/akcestest/RuntimeTests$DataSourceInitializer.class */
    public static class DataSourceInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
        public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
            TestUtils.prepareKafka(RuntimeTests.kafka.getBootstrapServers());
            CachedSchemaRegistryClient cachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://" + RuntimeTests.schemaRegistry.getHost() + ":" + RuntimeTests.schemaRegistry.getMappedPort(8081), 100);
            TestUtils.prepareExternalSchemas(cachedSchemaRegistryClient, List.of(AccountCreatedEvent.class));
            RuntimeTests.prepareOldCommandSchemas(cachedSchemaRegistryClient);
            RuntimeTests.prepareOldDomainEventSchemas(cachedSchemaRegistryClient);
            try {
                TestUtils.prepareAggregateServiceRecords(RuntimeTests.kafka.getBootstrapServers());
                TestPropertySourceUtils.addInlinedPropertiesToEnvironment(configurableApplicationContext, new String[]{"akces.aggregate.schemas.forceRegister=true", "akces.rocksdb.baseDir=/tmp/akces", "spring.kafka.enabled=true", "spring.kafka.bootstrap-servers=" + RuntimeTests.kafka.getBootstrapServers(), "akces.schemaregistry.url=http://" + RuntimeTests.schemaRegistry.getHost() + ":" + RuntimeTests.schemaRegistry.getMappedPort(8081)});
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void prepareExternalServices(String str) {
        KafkaProducer kafkaProducer = new KafkaProducer(Map.of("bootstrap.servers", str, "acks", "all", "enable.idempotence", "true", "linger.ms", "0", "max.in.flight.requests.per.connection", "1", "retries", "2147483647", "retry.backoff.ms", "0", "transactional.id", "Test-AkcesControllerProducer", "client.id", "Test-AkcesControllerProducer"), new StringSerializer(), new AkcesControlRecordSerde(new ObjectMapper()).serializer());
        try {
            kafkaProducer.initTransactions();
            AggregateServiceRecord aggregateServiceRecord = new AggregateServiceRecord("Account", "Account-Commands", "Account-DomainEvents", List.of(new AggregateServiceCommandType("CreateAccount", 1, true, "commands.CreateAccount")), List.of(new AggregateServiceDomainEventType("AccountCreated", 1, true, false, "domainevents.AccountCreated")), List.of());
            kafkaProducer.beginTransaction();
            for (int i = 0; i < 3; i++) {
                kafkaProducer.send(new ProducerRecord("Akces-Control", Integer.valueOf(i), "Account", aggregateServiceRecord));
            }
            kafkaProducer.commitTransaction();
            kafkaProducer.close();
        } catch (Throwable th) {
            try {
                kafkaProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @AfterAll
    @BeforeAll
    public static void cleanUp() throws IOException {
        if (Files.exists(Paths.get("/tmp/akces", new String[0]), new LinkOption[0])) {
            Files.walk(Paths.get("/tmp/akces", new String[0]), new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map((v0) -> {
                return v0.toFile();
            }).filter((v0) -> {
                return v0.isDirectory();
            }).forEach(file -> {
                System.out.println(file.getAbsolutePath());
            });
            Files.walk(Paths.get("/tmp/akces", new String[0]), new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map((v0) -> {
                return v0.toFile();
            }).forEach((v0) -> {
                v0.delete();
            });
        }
    }

    public static Stream<TopicPartition> generateTopicPartitions(String str, int i) {
        return IntStream.range(0, i).mapToObj(i2 -> {
            return new TopicPartition(str, i2);
        });
    }

    public static <C extends Command> void prepareOldCommandSchemas(SchemaRegistryClient schemaRegistryClient) {
        Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder = new Jackson2ObjectMapperBuilder();
        jackson2ObjectMapperBuilder.modulesToInstall(new Module[]{new AkcesGDPRModule()});
        jackson2ObjectMapperBuilder.serializerByType(BigDecimal.class, new BigDecimalSerializer());
        SchemaGeneratorConfigBuilder schemaGeneratorConfigBuilder = new SchemaGeneratorConfigBuilder(jackson2ObjectMapperBuilder.build(), SchemaVersion.DRAFT_7, OptionPreset.PLAIN_JSON);
        schemaGeneratorConfigBuilder.with(new JakartaValidationModule(new JakartaValidationOption[]{JakartaValidationOption.INCLUDE_PATTERN_EXPRESSIONS, JakartaValidationOption.NOT_NULLABLE_FIELD_IS_REQUIRED}));
        schemaGeneratorConfigBuilder.with(new JacksonModule());
        schemaGeneratorConfigBuilder.with(Option.FORBIDDEN_ADDITIONAL_PROPERTIES_BY_DEFAULT, new Option[0]);
        schemaGeneratorConfigBuilder.with(Option.NULLABLE_FIELDS_BY_DEFAULT, new Option[0]);
        schemaGeneratorConfigBuilder.with(Option.NULLABLE_METHOD_RETURN_VALUES_BY_DEFAULT, new Option[0]);
        schemaGeneratorConfigBuilder.forTypesInGeneral().withTypeAttributeOverride((objectNode, typeScope, schemaGenerationContext) -> {
            if (typeScope.getType().getTypeName().equals("java.math.BigDecimal")) {
                if (objectNode.get("type").isArray()) {
                    objectNode.get("type").set(0, "string");
                } else {
                    objectNode.put("type", "string");
                }
            }
        });
        try {
            schemaRegistryClient.register("commands.CreateWallet", new JsonSchema(new SchemaGenerator(schemaGeneratorConfigBuilder.build()).generateSchema(CreateWalletCommand.class, new Type[0]), List.of(), Map.of(), 1), 1, -1);
        } catch (IOException | RestClientException e) {
            throw new ApplicationContextException("Problem populating SchemaRegistry", e);
        }
    }

    public static <D extends DomainEvent> void prepareOldDomainEventSchemas(SchemaRegistryClient schemaRegistryClient) {
        Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder = new Jackson2ObjectMapperBuilder();
        jackson2ObjectMapperBuilder.modulesToInstall(new Module[]{new AkcesGDPRModule()});
        jackson2ObjectMapperBuilder.serializerByType(BigDecimal.class, new BigDecimalSerializer());
        SchemaGeneratorConfigBuilder schemaGeneratorConfigBuilder = new SchemaGeneratorConfigBuilder(jackson2ObjectMapperBuilder.build(), SchemaVersion.DRAFT_7, OptionPreset.PLAIN_JSON);
        schemaGeneratorConfigBuilder.with(new JakartaValidationModule(new JakartaValidationOption[]{JakartaValidationOption.INCLUDE_PATTERN_EXPRESSIONS, JakartaValidationOption.NOT_NULLABLE_FIELD_IS_REQUIRED}));
        schemaGeneratorConfigBuilder.with(new JacksonModule());
        schemaGeneratorConfigBuilder.with(Option.FORBIDDEN_ADDITIONAL_PROPERTIES_BY_DEFAULT, new Option[0]);
        schemaGeneratorConfigBuilder.with(Option.NULLABLE_FIELDS_BY_DEFAULT, new Option[0]);
        schemaGeneratorConfigBuilder.with(Option.NULLABLE_METHOD_RETURN_VALUES_BY_DEFAULT, new Option[0]);
        schemaGeneratorConfigBuilder.forTypesInGeneral().withTypeAttributeOverride((objectNode, typeScope, schemaGenerationContext) -> {
            if (typeScope.getType().getTypeName().equals("java.math.BigDecimal")) {
                if (objectNode.get("type").isArray()) {
                    objectNode.get("type").set(0, "string");
                } else {
                    objectNode.put("type", "string");
                }
            }
        });
        SchemaGenerator schemaGenerator = new SchemaGenerator(schemaGeneratorConfigBuilder.build());
        try {
            schemaRegistryClient.register("domainevents.BalanceCreated", new JsonSchema(schemaGenerator.generateSchema(BalanceCreatedEvent.class, new Type[0]), List.of(), Map.of(), 1), 1, -1);
            schemaRegistryClient.register("domainevents.BuyOrderPlaced", new JsonSchema(schemaGenerator.generateSchema(BuyOrderPlacedEvent.class, new Type[0]), List.of(), Map.of(), 1), 1, -1);
            schemaRegistryClient.register("domainevents.WalletCredited", new JsonSchema(schemaGenerator.generateSchema(WalletCreditedEvent.class, new Type[0]), List.of(), Map.of(), 1), 1, -1);
        } catch (IOException | RestClientException e) {
            throw new ApplicationContextException("Problem populating SchemaRegistry", e);
        }
    }

    @Test
    @Order(1)
    public void testKafkaAdminClient() {
        Assertions.assertNotNull(this.adminClient);
        Map describeTopics = this.adminClient.describeTopics(new String[]{"Akces-Control", "Wallet-Commands", "Wallet-DomainEvents", "Account-DomainEvents", "Wallet-AggregateState"});
        Assertions.assertNotNull(describeTopics);
        Assertions.assertFalse(describeTopics.isEmpty());
    }

    @Test
    @Order(2)
    public void createSchemas() throws RestClientException, IOException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                System.out.println(this.schemaRegistryClient.getAllSubjects());
                return;
            }
            Thread.onSpinWait();
        }
    }

    @Test
    @Order(3)
    public void testAkcesControl() throws JsonProcessingException {
        ConsumerRecords consumerRecords;
        ConsumerRecords consumerRecords2;
        Assertions.assertNotNull(this.walletAggregateController);
        Assertions.assertNotNull(this.accountAggregateController);
        Assertions.assertNotNull(this.orderProcessManagerAggregateController);
        Assertions.assertNotNull(this.akcesClient);
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                break;
            } else {
                Thread.onSpinWait();
            }
        }
        Producer createProducer = this.producerFactory.createProducer("test");
        Consumer createConsumer = this.consumerFactory.createConsumer("Test", "test");
        Consumer createConsumer2 = this.controlConsumerFactory.createConsumer("Test-AkcesControl", "test-akces-control");
        TopicPartition topicPartition = new TopicPartition("Akces-Control", 0);
        createConsumer2.assign(List.of(topicPartition));
        createConsumer2.seekToBeginning(createConsumer2.assignment());
        Map endOffsets = createConsumer2.endOffsets(createConsumer2.assignment());
        while (((Long) endOffsets.getOrDefault(topicPartition, 0L)).longValue() > createConsumer2.position(topicPartition)) {
            ConsumerRecords poll = createConsumer2.poll(Duration.ofMillis(1000L));
            if (!poll.isEmpty()) {
                Iterator it = poll.records(topicPartition).iterator();
                while (it.hasNext()) {
                    System.out.println(this.objectMapper.writeValueAsString(((ConsumerRecord) it.next()).value()));
                }
            }
        }
        createConsumer2.close();
        while (!this.walletAggregateController.isRunning()) {
            Thread.onSpinWait();
        }
        org.elasticsoftware.akcestest.aggregate.wallet.CreateWalletCommand createWalletCommand = new org.elasticsoftware.akcestest.aggregate.wallet.CreateWalletCommand("086fe270-f848-4b37-9858-f5311280a32e", "USD");
        CommandRecord commandRecord = new CommandRecord((String) null, "CreateWallet", 1, this.objectMapper.writeValueAsBytes(createWalletCommand), PayloadEncoding.JSON, createWalletCommand.getAggregateId(), (String) null, (String) null);
        String resolveTopic = this.walletAggregateController.resolveTopic(createWalletCommand.getClass());
        int intValue = this.walletAggregateController.resolvePartition(createWalletCommand.getAggregateId()).intValue();
        createProducer.beginTransaction();
        createProducer.send(new ProducerRecord(resolveTopic, Integer.valueOf(intValue), commandRecord.aggregateId(), commandRecord));
        createProducer.commitTransaction();
        TopicPartition topicPartition2 = new TopicPartition("Wallet-AggregateState", intValue);
        TopicPartition topicPartition3 = new TopicPartition("Wallet-DomainEvents", intValue);
        createConsumer.assign(List.of(topicPartition2, topicPartition3));
        createConsumer.seekToBeginning(createConsumer.assignment());
        ConsumerRecords poll2 = createConsumer.poll(Duration.ofMillis(250L));
        while (true) {
            consumerRecords = poll2;
            if (!consumerRecords.isEmpty()) {
                break;
            } else {
                poll2 = createConsumer.poll(Duration.ofMillis(250L));
            }
        }
        Assertions.assertFalse(consumerRecords.isEmpty());
        CreditWalletCommand creditWalletCommand = new CreditWalletCommand("086fe270-f848-4b37-9858-f5311280a32e", "USD", new BigDecimal("100.00"));
        CommandRecord commandRecord2 = new CommandRecord((String) null, "CreditWallet", 1, this.objectMapper.writeValueAsBytes(creditWalletCommand), PayloadEncoding.JSON, creditWalletCommand.getAggregateId(), (String) null, (String) null);
        createProducer.beginTransaction();
        createProducer.send(new ProducerRecord(resolveTopic, Integer.valueOf(intValue), commandRecord2.aggregateId(), commandRecord2));
        createProducer.commitTransaction();
        ConsumerRecords poll3 = createConsumer.poll(Duration.ofMillis(250L));
        while (true) {
            consumerRecords2 = poll3;
            if (!consumerRecords2.isEmpty()) {
                break;
            } else {
                poll3 = createConsumer.poll(Duration.ofMillis(250L));
            }
        }
        Assertions.assertFalse(consumerRecords2.isEmpty());
        CreditWalletCommand creditWalletCommand2 = new CreditWalletCommand("086fe270-f848-4b37-9858-f5311280a32e", "USD", new BigDecimal("-100.00"));
        CommandRecord commandRecord3 = new CommandRecord((String) null, "CreditWallet", 1, this.objectMapper.writeValueAsBytes(creditWalletCommand2), PayloadEncoding.JSON, creditWalletCommand2.getAggregateId(), (String) null, (String) null);
        createProducer.beginTransaction();
        createProducer.send(new ProducerRecord(resolveTopic, Integer.valueOf(intValue), commandRecord3.aggregateId(), commandRecord3));
        createProducer.commitTransaction();
        ConsumerRecords poll4 = createConsumer.poll(Duration.ofMillis(250L));
        while (true) {
            ConsumerRecords consumerRecords3 = poll4;
            if (!consumerRecords3.isEmpty()) {
                Assertions.assertTrue(consumerRecords3.records(topicPartition2).isEmpty());
                Assertions.assertEquals(1, consumerRecords3.records(topicPartition3).size());
                Assertions.assertEquals("InvalidAmountError", ((DomainEventRecord) ((ConsumerRecord) consumerRecords3.records(topicPartition3).getFirst()).value()).name());
                createConsumer.close();
                createProducer.close();
                return;
            }
            poll4 = createConsumer.poll(Duration.ofMillis(250L));
        }
    }

    @Test
    @Order(4)
    public void testBatchedCommands() throws JsonProcessingException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                break;
            } else {
                Thread.onSpinWait();
            }
        }
        List of = List.of("47db2418-dd10-11ed-afa1-0242ac120002", "47db2418-dd10-11ed-afa1-0242ac120003", "47db2418-dd10-11ed-afa1-0242ac120004", "47db2418-dd10-11ed-afa1-0242ac120005", "47db2418-dd10-11ed-afa1-0242ac120006", "47db2418-dd10-11ed-afa1-0242ac120007", "47db2418-dd10-11ed-afa1-0242ac120008", "47db2418-dd10-11ed-afa1-0242ac120009", "47db2418-dd10-11ed-afa1-0242ac120010", "47db2418-dd10-11ed-afa1-0242ac120011");
        Producer createProducer = this.producerFactory.createProducer("test");
        try {
            final Consumer createConsumer = this.consumerFactory.createConsumer("Test", "test");
            try {
                final Map endOffsets = createConsumer.endOffsets(Stream.concat(generateTopicPartitions("Wallet-AggregateState", 3), generateTopicPartitions("Wallet-DomainEvents", 3)).toList());
                createProducer.beginTransaction();
                Iterator it = of.iterator();
                while (it.hasNext()) {
                    org.elasticsoftware.akcestest.aggregate.wallet.CreateWalletCommand createWalletCommand = new org.elasticsoftware.akcestest.aggregate.wallet.CreateWalletCommand((String) it.next(), "USD");
                    CommandRecord commandRecord = new CommandRecord((String) null, "CreateWallet", 1, this.objectMapper.writeValueAsBytes(createWalletCommand), PayloadEncoding.JSON, createWalletCommand.getAggregateId(), (String) null, (String) null);
                    createProducer.send(new ProducerRecord(this.walletAggregateController.resolveTopic(createWalletCommand.getClass()), Integer.valueOf(this.walletAggregateController.resolvePartition(createWalletCommand.getAggregateId()).intValue()), commandRecord.aggregateId(), commandRecord));
                }
                createProducer.commitTransaction();
                createConsumer.subscribe(List.of("Wallet-AggregateState", "Wallet-DomainEvents"), new ConsumerRebalanceListener(this) { // from class: org.elasticsoftware.akcestest.RuntimeTests.1
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    }

                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        Consumer consumer = createConsumer;
                        Map map = endOffsets;
                        collection.forEach(topicPartition -> {
                            consumer.seek(topicPartition, ((Long) map.get(topicPartition)).longValue());
                        });
                    }
                });
                ConsumerRecords poll = createConsumer.poll(Duration.ofMillis(250L));
                ArrayList arrayList = new ArrayList();
                while (arrayList.size() < 40) {
                    poll.forEach(consumerRecord -> {
                        arrayList.add((ProtocolRecord) consumerRecord.value());
                    });
                    poll = createConsumer.poll(Duration.ofMillis(250L));
                }
                Assertions.assertEquals(40, arrayList.size());
                if (createConsumer != null) {
                    createConsumer.close();
                }
                if (createProducer != null) {
                    createProducer.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createProducer != null) {
                try {
                    createProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @Order(5)
    public void testCreateViaExternalDomainEvent() throws JsonProcessingException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                break;
            } else {
                Thread.onSpinWait();
            }
        }
        Producer createProducer = this.producerFactory.createProducer("test");
        try {
            final Consumer createConsumer = this.consumerFactory.createConsumer("Test", "test");
            try {
                final Map endOffsets = createConsumer.endOffsets(Stream.concat(generateTopicPartitions("Wallet-AggregateState", 3), generateTopicPartitions("Wallet-DomainEvents", 3)).toList());
                createProducer.beginTransaction();
                CreateAccountCommand createAccountCommand = new CreateAccountCommand("47db2418-dd10-11ed-afa1-0242ac120012", "NL", "Fahim", "Zuijderwijk", "FahimZuijderwijk@jourrapide.com");
                CommandRecord commandRecord = new CommandRecord((String) null, "CreateAccount", 1, this.objectMapper.writeValueAsBytes(createAccountCommand), PayloadEncoding.JSON, createAccountCommand.getAggregateId(), (String) null, (String) null);
                createProducer.send(new ProducerRecord(this.walletAggregateController.resolveTopic(createAccountCommand.getClass()), Integer.valueOf(this.walletAggregateController.resolvePartition(createAccountCommand.getAggregateId()).intValue()), commandRecord.aggregateId(), commandRecord));
                createProducer.commitTransaction();
                createConsumer.subscribe(List.of("Wallet-AggregateState", "Wallet-DomainEvents"), new ConsumerRebalanceListener(this) { // from class: org.elasticsoftware.akcestest.RuntimeTests.2
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    }

                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        Consumer consumer = createConsumer;
                        Map map = endOffsets;
                        collection.forEach(topicPartition -> {
                            consumer.seek(topicPartition, ((Long) map.get(topicPartition)).longValue());
                        });
                    }
                });
                ConsumerRecords poll = createConsumer.poll(Duration.ofMillis(250L));
                ArrayList arrayList = new ArrayList();
                while (arrayList.size() < 4) {
                    poll.forEach(consumerRecord -> {
                        arrayList.add((ProtocolRecord) consumerRecord.value());
                    });
                    poll = createConsumer.poll(Duration.ofMillis(250L));
                }
                Assertions.assertEquals(4, arrayList.size());
                if (createConsumer != null) {
                    createConsumer.close();
                }
                if (createProducer != null) {
                    createProducer.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createProducer != null) {
                try {
                    createProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @Order(6)
    public void testGDPREncryption() throws IOException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                break;
            } else {
                Thread.onSpinWait();
            }
        }
        Producer createProducer = this.producerFactory.createProducer("test");
        try {
            final Consumer createConsumer = this.consumerFactory.createConsumer("Test", "test");
            try {
                final Map endOffsets = createConsumer.endOffsets(Stream.concat(generateTopicPartitions("Account-AggregateState", 3), generateTopicPartitions("Account-DomainEvents", 3)).toList());
                createProducer.beginTransaction();
                CreateAccountCommand createAccountCommand = new CreateAccountCommand("ca7c8e7f-d1a3-46ba-b400-f543d0c04bc6", "NL", "Fahim", "Zuijderwijk", "FahimZuijderwijk@jourrapide.com");
                CommandRecord commandRecord = new CommandRecord((String) null, "CreateAccount", 1, this.objectMapper.writeValueAsBytes(createAccountCommand), PayloadEncoding.JSON, createAccountCommand.getAggregateId(), (String) null, (String) null);
                createProducer.send(new ProducerRecord(this.walletAggregateController.resolveTopic(createAccountCommand.getClass()), Integer.valueOf(this.walletAggregateController.resolvePartition(createAccountCommand.getAggregateId()).intValue()), commandRecord.aggregateId(), commandRecord));
                createProducer.commitTransaction();
                createConsumer.subscribe(List.of("Account-AggregateState", "Account-DomainEvents"), new ConsumerRebalanceListener(this) { // from class: org.elasticsoftware.akcestest.RuntimeTests.3
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    }

                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        Consumer consumer = createConsumer;
                        Map map = endOffsets;
                        collection.forEach(topicPartition -> {
                            consumer.seek(topicPartition, ((Long) map.get(topicPartition)).longValue());
                        });
                    }
                });
                ConsumerRecords poll = createConsumer.poll(Duration.ofMillis(250L));
                ArrayList arrayList = new ArrayList();
                while (arrayList.size() < 2) {
                    poll.forEach(consumerRecord -> {
                        arrayList.add((ProtocolRecord) consumerRecord.value());
                    });
                    poll = createConsumer.poll(Duration.ofMillis(250L));
                }
                Assertions.assertEquals(2, arrayList.size());
                Assertions.assertInstanceOf(DomainEventRecord.class, arrayList.get(1));
                AccountCreatedEvent accountCreatedEvent = (AccountCreatedEvent) this.objectMapper.readValue(((DomainEventRecord) arrayList.get(1)).payload(), AccountCreatedEvent.class);
                Assertions.assertNotEquals("Fahim", accountCreatedEvent.firstName());
                Assertions.assertNotEquals("Zuijderwijk", accountCreatedEvent.lastName());
                Assertions.assertNotEquals("FahimZuijderwijk@jourrapide.com", accountCreatedEvent.email());
                AccountState accountState = (AccountState) this.objectMapper.readValue(((AggregateStateRecord) arrayList.getFirst()).payload(), AccountState.class);
                Assertions.assertNotEquals("Fahim", accountState.firstName());
                Assertions.assertNotEquals("Zuijderwijk", accountState.lastName());
                Assertions.assertNotEquals("FahimZuijderwijk@jourrapide.com", accountState.email());
                if (createConsumer != null) {
                    createConsumer.close();
                }
                if (createProducer != null) {
                    createProducer.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createProducer != null) {
                try {
                    createProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @Order(7)
    public void testDomainEventIndexing() throws IOException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                break;
            } else {
                Thread.onSpinWait();
            }
        }
        Producer createProducer = this.producerFactory.createProducer("test");
        try {
            Consumer createConsumer = this.consumerFactory.createConsumer("Test", "test");
            try {
                createProducer.beginTransaction();
                CreateAccountCommand createAccountCommand = new CreateAccountCommand("1837552e-45c4-41ff-a833-075c5a5fa49e", "NL", "Fahim", "Zuijderwijk", "FahimZuijderwijk@jourrapide.com");
                CommandRecord commandRecord = new CommandRecord((String) null, "CreateAccount", 1, this.objectMapper.writeValueAsBytes(createAccountCommand), PayloadEncoding.JSON, createAccountCommand.getAggregateId(), (String) null, (String) null);
                createProducer.send(new ProducerRecord(this.walletAggregateController.resolveTopic(createAccountCommand.getClass()), Integer.valueOf(this.walletAggregateController.resolvePartition(createAccountCommand.getAggregateId()).intValue()), commandRecord.aggregateId(), commandRecord));
                createProducer.commitTransaction();
                TopicDescription topicDescription = getTopicDescription("Users-1837552e-45c4-41ff-a833-075c5a5fa49e-DomainEventIndex");
                while (topicDescription == null) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    topicDescription = getTopicDescription("Users-1837552e-45c4-41ff-a833-075c5a5fa49e-DomainEventIndex");
                }
                createConsumer.assign(generateTopicPartitions("Users-1837552e-45c4-41ff-a833-075c5a5fa49e-DomainEventIndex", 1).toList());
                createConsumer.seekToBeginning(createConsumer.assignment());
                ConsumerRecords poll = createConsumer.poll(Duration.ofMillis(250L));
                ArrayList arrayList = new ArrayList();
                while (arrayList.size() < 4) {
                    poll.forEach(consumerRecord -> {
                        arrayList.add((ProtocolRecord) consumerRecord.value());
                    });
                    poll = createConsumer.poll(Duration.ofMillis(250L));
                }
                Assertions.assertEquals(4, arrayList.size());
                List list = arrayList.stream().map((v0) -> {
                    return v0.name();
                }).toList();
                Assertions.assertEquals("AccountCreated", list.getFirst());
                Assertions.assertTrue(list.indexOf("WalletCreated") < list.indexOf("BalanceCreated"));
                Assertions.assertTrue(list.contains("UserOrderProcessesCreated"));
                if (createConsumer != null) {
                    createConsumer.close();
                }
                if (createProducer != null) {
                    createProducer.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createProducer != null) {
                try {
                    createProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @Order(8)
    public void testDomainEventIndexingWithErrorEvents() throws IOException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                break;
            } else {
                Thread.onSpinWait();
            }
        }
        Producer createProducer = this.producerFactory.createProducer("test");
        try {
            Consumer createConsumer = this.consumerFactory.createConsumer("Test", "test");
            try {
                org.elasticsoftware.akcestest.aggregate.wallet.CreateWalletCommand createWalletCommand = new org.elasticsoftware.akcestest.aggregate.wallet.CreateWalletCommand("d3bd665a-6c67-4301-a8f1-4381f8d7d567", "USD");
                CommandRecord commandRecord = new CommandRecord((String) null, "CreateWallet", 1, this.objectMapper.writeValueAsBytes(createWalletCommand), PayloadEncoding.JSON, createWalletCommand.getAggregateId(), (String) null, (String) null);
                String resolveTopic = this.walletAggregateController.resolveTopic(createWalletCommand.getClass());
                int intValue = this.walletAggregateController.resolvePartition(createWalletCommand.getAggregateId()).intValue();
                createProducer.beginTransaction();
                createProducer.send(new ProducerRecord(resolveTopic, Integer.valueOf(intValue), commandRecord.aggregateId(), commandRecord));
                createProducer.commitTransaction();
                CreditWalletCommand creditWalletCommand = new CreditWalletCommand("d3bd665a-6c67-4301-a8f1-4381f8d7d567", "USD", new BigDecimal("100.00"));
                CommandRecord commandRecord2 = new CommandRecord((String) null, "CreditWallet", 1, this.objectMapper.writeValueAsBytes(creditWalletCommand), PayloadEncoding.JSON, creditWalletCommand.getAggregateId(), (String) null, (String) null);
                createProducer.beginTransaction();
                createProducer.send(new ProducerRecord(resolveTopic, Integer.valueOf(intValue), commandRecord2.aggregateId(), commandRecord2));
                createProducer.commitTransaction();
                CreditWalletCommand creditWalletCommand2 = new CreditWalletCommand("d3bd665a-6c67-4301-a8f1-4381f8d7d567", "USD", new BigDecimal("-100.00"));
                CommandRecord commandRecord3 = new CommandRecord((String) null, "CreditWallet", 1, this.objectMapper.writeValueAsBytes(creditWalletCommand2), PayloadEncoding.JSON, creditWalletCommand2.getAggregateId(), (String) null, (String) null);
                createProducer.beginTransaction();
                createProducer.send(new ProducerRecord(resolveTopic, Integer.valueOf(intValue), commandRecord3.aggregateId(), commandRecord3));
                createProducer.commitTransaction();
                TopicDescription topicDescription = getTopicDescription("Users-d3bd665a-6c67-4301-a8f1-4381f8d7d567-DomainEventIndex");
                while (topicDescription == null) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    topicDescription = getTopicDescription("Users-d3bd665a-6c67-4301-a8f1-4381f8d7d567-DomainEventIndex");
                }
                createConsumer.assign(generateTopicPartitions("Users-d3bd665a-6c67-4301-a8f1-4381f8d7d567-DomainEventIndex", 1).toList());
                createConsumer.seekToBeginning(createConsumer.assignment());
                ConsumerRecords poll = createConsumer.poll(Duration.ofMillis(250L));
                ArrayList arrayList = new ArrayList();
                while (arrayList.size() < 3) {
                    poll.forEach(consumerRecord -> {
                        arrayList.add((ProtocolRecord) consumerRecord.value());
                    });
                    poll = createConsumer.poll(Duration.ofMillis(250L));
                }
                Assertions.assertEquals(3, arrayList.size());
                Assertions.assertEquals("WalletCreated", ((ProtocolRecord) arrayList.getFirst()).name());
                Assertions.assertEquals("BalanceCreated", ((ProtocolRecord) arrayList.get(1)).name());
                Assertions.assertEquals("WalletCredited", ((ProtocolRecord) arrayList.getLast()).name());
                if (createConsumer != null) {
                    createConsumer.close();
                }
                if (createProducer != null) {
                    createProducer.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createProducer != null) {
                try {
                    createProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @Order(9)
    public void testWithAkcesClient() throws ExecutionException, InterruptedException, TimeoutException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                List list = (List) this.akcesClient.send("TEST_TENANT", new org.elasticsoftware.akcestest.aggregate.wallet.CreateWalletCommand("243f2482-d81f-42cb-91f1-105a79f35e34", "USD")).toCompletableFuture().get(10L, TimeUnit.SECONDS);
                Assertions.assertNotNull(list);
                Assertions.assertEquals(2, list.size());
                Assertions.assertInstanceOf(WalletCreatedEvent.class, list.getFirst());
                Assertions.assertInstanceOf(org.elasticsoftware.akcestest.aggregate.wallet.BalanceCreatedEvent.class, list.getLast());
                return;
            }
            Thread.onSpinWait();
        }
    }

    @Test
    @Order(10)
    public void testOrderFlowWithAkcesClient() throws ExecutionException, InterruptedException, TimeoutException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                List list = (List) this.akcesClient.send("TEST_TENANT", new CreateAccountCommand("a28d41c4-9f9c-4708-b142-6b83768ee8f3", "NL", "Bella", "Fowler", "bella.fowler@example.com")).toCompletableFuture().get(10L, TimeUnit.SECONDS);
                Assertions.assertNotNull(list);
                Assertions.assertEquals(1, list.size());
                Assertions.assertInstanceOf(AccountCreatedEvent.class, list.getFirst());
                this.akcesClient.sendAndForget("TEST_TENANT", new CreditWalletCommand("a28d41c4-9f9c-4708-b142-6b83768ee8f3", "EUR", new BigDecimal("100.00")));
                List list2 = (List) this.akcesClient.send("TEST_TENANT", new PlaceBuyOrderCommand("a28d41c4-9f9c-4708-b142-6b83768ee8f3", new FxMarket("USDEUR", "USD", "EUR"), new BigDecimal("90.00"), new BigDecimal("1.05"), "trade-1")).toCompletableFuture().get(10L, TimeUnit.SECONDS);
                Assertions.assertNotNull(list2);
                Assertions.assertEquals(1, list2.size());
                Assertions.assertInstanceOf(BuyOrderCreatedEvent.class, list2.getFirst());
                return;
            }
            Thread.onSpinWait();
        }
    }

    @Test
    @Order(11)
    public void testAggregateAlreadyExistsErrorWithAkcesClient() throws ExecutionException, InterruptedException, TimeoutException {
        while (true) {
            if (this.walletAggregateController.isRunning() && this.accountAggregateController.isRunning() && this.orderProcessManagerAggregateController.isRunning() && this.akcesClient.isRunning()) {
                CreateAccountCommand createAccountCommand = new CreateAccountCommand("854c0c16-b3be-4d04-8ce8-a606eec89a1f", "USA", "Angelo", "Plummer", "AngeloRPlummer@rhyta.com");
                List list = (List) this.akcesClient.send("TEST_TENANT", createAccountCommand).toCompletableFuture().get(10L, TimeUnit.SECONDS);
                Assertions.assertNotNull(list);
                Assertions.assertEquals(1, list.size());
                Assertions.assertInstanceOf(AccountCreatedEvent.class, list.getFirst());
                List list2 = (List) this.akcesClient.send("TEST_TENANT", createAccountCommand).toCompletableFuture().get(10L, TimeUnit.SECONDS);
                Assertions.assertNotNull(list2);
                Assertions.assertEquals(1, list2.size());
                Assertions.assertInstanceOf(AggregateAlreadyExistsErrorEvent.class, list2.getFirst());
                return;
            }
            Thread.onSpinWait();
        }
    }

    public TopicDescription getTopicDescription(String str) {
        try {
            return (TopicDescription) this.adminClient.describeTopics(new String[]{str}).get(str);
        } catch (KafkaException e) {
            if (e.getCause().getClass().equals(ExecutionException.class) && e.getCause().getCause().getClass().equals(UnknownTopicOrPartitionException.class)) {
                return null;
            }
            throw e;
        }
    }
}
