package org.elasticsoftware.akces.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.WakeupException;
import org.elasticsoftware.akces.aggregate.AggregateRuntime;
import org.elasticsoftware.akces.aggregate.CommandType;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.aggregate.IndexParams;
import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.commands.CommandBus;
import org.elasticsoftware.akces.control.AkcesRegistry;
import org.elasticsoftware.akces.gdpr.GDPRContextHolder;
import org.elasticsoftware.akces.gdpr.GDPRContextRepository;
import org.elasticsoftware.akces.gdpr.GDPRContextRepositoryFactory;
import org.elasticsoftware.akces.gdpr.GDPRKeyUtils;
import org.elasticsoftware.akces.protocol.AggregateStateRecord;
import org.elasticsoftware.akces.protocol.CommandRecord;
import org.elasticsoftware.akces.protocol.CommandResponseRecord;
import org.elasticsoftware.akces.protocol.DomainEventRecord;
import org.elasticsoftware.akces.protocol.GDPRKeyRecord;
import org.elasticsoftware.akces.protocol.PayloadEncoding;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.state.AggregateStateRepository;
import org.elasticsoftware.akces.state.AggregateStateRepositoryFactory;
import org.elasticsoftware.akces.util.HostUtils;
import org.elasticsoftware.akces.util.KafkaSender;
import org.elasticsoftware.akces.util.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;

/* loaded from: input_file:org/elasticsoftware/akces/kafka/AggregatePartition.class */
public class AggregatePartition implements Runnable, AutoCloseable, CommandBus {
    private static final Logger logger = LoggerFactory.getLogger(AggregatePartition.class);
    private final ConsumerFactory<String, ProtocolRecord> consumerFactory;
    private final ProducerFactory<String, ProtocolRecord> producerFactory;
    private final AggregateRuntime runtime;
    private final AggregateStateRepository stateRepository;
    private final GDPRContextRepository gdprContextRepository;
    private final Integer id;
    private final TopicPartition commandPartition;
    private final TopicPartition domainEventPartition;
    private final TopicPartition statePartition;
    private final TopicPartition gdprKeyPartition;
    private final Collection<DomainEventType<?>> externalDomainEventTypes;
    private final AkcesRegistry ackesRegistry;
    private final BiFunction<String, String, Boolean> indexTopicCreator;
    private Consumer<String, ProtocolRecord> consumer;
    private Producer<String, ProtocolRecord> producer;
    private final Set<TopicPartition> externalEventPartitions = new HashSet();
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private Map<TopicPartition, Long> initializedEndOffsets = Collections.emptyMap();
    private volatile Thread aggregatePartitionThread = null;
    private volatile AggregatePartitionState processState = AggregatePartitionState.INITIALIZING;

    public AggregatePartition(ConsumerFactory<String, ProtocolRecord> consumerFactory, ProducerFactory<String, ProtocolRecord> producerFactory, AggregateRuntime aggregateRuntime, AggregateStateRepositoryFactory aggregateStateRepositoryFactory, GDPRContextRepositoryFactory gDPRContextRepositoryFactory, Integer num, TopicPartition topicPartition, TopicPartition topicPartition2, TopicPartition topicPartition3, TopicPartition topicPartition4, Collection<DomainEventType<?>> collection, AkcesRegistry akcesRegistry, BiFunction<String, String, Boolean> biFunction) {
        this.gdprKeyPartition = topicPartition4;
        this.ackesRegistry = akcesRegistry;
        this.consumerFactory = consumerFactory;
        this.producerFactory = producerFactory;
        this.runtime = aggregateRuntime;
        this.indexTopicCreator = biFunction;
        this.stateRepository = aggregateStateRepositoryFactory.create(aggregateRuntime, num);
        this.id = num;
        this.commandPartition = topicPartition;
        this.domainEventPartition = topicPartition2;
        this.statePartition = topicPartition3;
        this.externalDomainEventTypes = collection;
        this.gdprContextRepository = gDPRContextRepositoryFactory.create(aggregateRuntime.getName(), num);
    }

    public Integer getId() {
        return this.id;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                this.aggregatePartitionThread = Thread.currentThread();
                AggregatePartitionCommandBus.registerCommandBus(this);
                logger.info("Starting AggregatePartition {} of {}Aggregate", this.id, this.runtime.getName());
                this.consumer = this.consumerFactory.createConsumer(this.runtime.getName() + "Aggregate-partition-" + this.id, this.runtime.getName() + "Aggregate-partition-" + this.id + "-" + HostUtils.getHostName(), (String) null);
                this.producer = this.producerFactory.createProducer(this.runtime.getName() + "Aggregate-partition-" + this.id + "-" + HostUtils.getHostName());
                this.externalDomainEventTypes.forEach(domainEventType -> {
                    this.externalEventPartitions.add(new TopicPartition(this.ackesRegistry.resolveTopic(domainEventType), this.id.intValue()));
                });
                this.consumer.assign(Stream.concat(Stream.concat(Stream.of((Object[]) new TopicPartition[]{this.commandPartition, this.domainEventPartition, this.statePartition}), this.runtime.shouldHandlePIIData() ? Stream.of(this.gdprKeyPartition) : Stream.empty()), this.externalEventPartitions.stream()).toList());
                logger.info("Assigned partitions {} for AggregatePartition {} of {}Aggregate", new Object[]{this.consumer.assignment(), this.id, this.runtime.getName()});
                while (this.processState != AggregatePartitionState.SHUTTING_DOWN) {
                    process();
                }
                logger.info("Shutting down AggregatePartition {} of {}Aggregate", this.id, this.runtime.getName());
                try {
                    this.consumer.close(Duration.ofSeconds(5L));
                    this.producer.close(Duration.ofSeconds(5L));
                } catch (KafkaException e) {
                    logger.error("Error closing consumer/producer", e);
                } catch (InterruptException e2) {
                }
                try {
                    this.stateRepository.close();
                } catch (IOException e3) {
                    logger.error("Error closing state repository", e3);
                }
                try {
                    this.gdprContextRepository.close();
                } catch (IOException e4) {
                    logger.error("Error closing gdpr context repository", e4);
                }
                AggregatePartitionCommandBus.registerCommandBus(null);
            } catch (Throwable th) {
                try {
                    this.consumer.close(Duration.ofSeconds(5L));
                    this.producer.close(Duration.ofSeconds(5L));
                } catch (InterruptException e5) {
                } catch (KafkaException e6) {
                    logger.error("Error closing consumer/producer", e6);
                }
                try {
                    this.stateRepository.close();
                } catch (IOException e7) {
                    logger.error("Error closing state repository", e7);
                }
                try {
                    this.gdprContextRepository.close();
                } catch (IOException e8) {
                    logger.error("Error closing gdpr context repository", e8);
                }
                AggregatePartitionCommandBus.registerCommandBus(null);
                throw th;
            }
        } catch (Throwable th2) {
            logger.error("Unexpected error in AggregatePartition {} of {}Aggregate", new Object[]{this.id, this.runtime.getName(), th2});
            try {
                this.consumer.close(Duration.ofSeconds(5L));
                this.producer.close(Duration.ofSeconds(5L));
            } catch (KafkaException e9) {
                logger.error("Error closing consumer/producer", e9);
            } catch (InterruptException e10) {
            }
            try {
                this.stateRepository.close();
            } catch (IOException e11) {
                logger.error("Error closing state repository", e11);
            }
            try {
                this.gdprContextRepository.close();
            } catch (IOException e12) {
                logger.error("Error closing gdpr context repository", e12);
            }
            AggregatePartitionCommandBus.registerCommandBus(null);
        }
        logger.info("Finished Shutting down AggregatePartition {} of {}Aggregate", this.id, this.runtime.getName());
        this.shutdownLatch.countDown();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws InterruptedException {
        this.processState = AggregatePartitionState.SHUTTING_DOWN;
        try {
            if (this.shutdownLatch.await(10L, TimeUnit.SECONDS)) {
                logger.info("AggregatePartition={} has been shutdown", this.id);
            } else {
                logger.warn("AggregatePartition={} did not shutdown within 10 seconds", this.id);
            }
        } catch (InterruptedException e) {
        }
    }

    public void send(Command command) {
        if (Thread.currentThread() != this.aggregatePartitionThread) {
            throw new IllegalStateException("send() can only be called from the AggregatePartition thread");
        }
        CommandType<?> resolveType = this.ackesRegistry.resolveType(command.getClass());
        try {
            this.runtime.registerAndValidate(resolveType);
            if (resolveType != null) {
                String resolveTopic = this.ackesRegistry.resolveTopic(resolveType);
                CommandRecord commandRecord = new CommandRecord((String) null, resolveType.typeName(), resolveType.version(), this.runtime.serialize(command), PayloadEncoding.JSON, command.getAggregateId(), (String) null, (String) null);
                KafkaSender.send(this.producer, new ProducerRecord(resolveTopic, this.ackesRegistry.resolvePartition(command.getAggregateId()), commandRecord.id(), commandRecord));
            }
        } catch (Exception e) {
            logger.error("Problem registering command {}", resolveType.typeName(), e);
            throw new RuntimeException(e);
        }
    }

    private void send(ProtocolRecord protocolRecord) {
        if (protocolRecord instanceof AggregateStateRecord) {
            AggregateStateRecord aggregateStateRecord = (AggregateStateRecord) protocolRecord;
            logger.trace("Sending AggregateStateRecord with id {} to {}", aggregateStateRecord.aggregateId(), this.statePartition);
            this.stateRepository.prepare(aggregateStateRecord, KafkaSender.send(this.producer, new ProducerRecord(this.statePartition.topic(), Integer.valueOf(this.statePartition.partition()), aggregateStateRecord.aggregateId(), aggregateStateRecord)));
        } else if (protocolRecord instanceof DomainEventRecord) {
            DomainEventRecord domainEventRecord = (DomainEventRecord) protocolRecord;
            logger.trace("Sending DomainEventRecord {}:{} with id {} to {}", new Object[]{domainEventRecord.name(), Integer.valueOf(domainEventRecord.version()), domainEventRecord.id(), this.domainEventPartition});
            KafkaSender.send(this.producer, new ProducerRecord(this.domainEventPartition.topic(), Integer.valueOf(this.domainEventPartition.partition()), domainEventRecord.id(), domainEventRecord));
        } else if (protocolRecord instanceof GDPRKeyRecord) {
            GDPRKeyRecord gDPRKeyRecord = (GDPRKeyRecord) protocolRecord;
            logger.trace("Sending GDPRKeyRecord with id {} to {}", gDPRKeyRecord.aggregateId(), this.gdprKeyPartition);
            this.gdprContextRepository.prepare(gDPRKeyRecord, KafkaSender.send(this.producer, new ProducerRecord(this.gdprKeyPartition.topic(), Integer.valueOf(this.gdprKeyPartition.partition()), gDPRKeyRecord.aggregateId(), gDPRKeyRecord)));
        } else if (protocolRecord instanceof CommandRecord) {
            throw new IllegalArgumentException("send(ProtocolRecord) should not be used for CommandRecord type.\nUse send(commandRecord,commandPartition) instead");
        }
    }

    private void index(DomainEventRecord domainEventRecord, IndexParams indexParams) {
        String indexTopicName = KafkaUtils.getIndexTopicName(indexParams.indexName(), indexParams.indexKey());
        if (indexParams.createIndex() && this.consumer.partitionsFor(indexTopicName).isEmpty() && this.indexTopicCreator.apply(indexParams.indexName(), indexParams.indexKey()).booleanValue()) {
            logger.info("Creating DomainEventIndex topic {}", indexTopicName);
        }
        logger.trace("Indexing DomainEventRecord {}:{} with id {} to topic {}", new Object[]{domainEventRecord.name(), Integer.valueOf(domainEventRecord.version()), domainEventRecord.id(), indexTopicName + "-0"});
        KafkaSender.send(this.producer, new ProducerRecord(indexTopicName, 0, domainEventRecord.id(), domainEventRecord));
    }

    private void setupGDPRContext(String str, String str2, boolean z) {
        if (!this.gdprContextRepository.exists(str2) && z) {
            logger.trace("Generating GDPR key for aggregate {}", str2);
            send((ProtocolRecord) new GDPRKeyRecord(str, str2, GDPRKeyUtils.createKey().getEncoded()));
        }
        GDPRContextHolder.setCurrentGDPRContext(this.gdprContextRepository.get(str2));
    }

    private void tearDownGDPRContext() {
        GDPRContextHolder.resetCurrentGDPRContext();
    }

    private void handleCommand(CommandRecord commandRecord) {
        try {
            try {
                ArrayList arrayList = commandRecord.replyToTopicPartition() != null ? new ArrayList() : null;
                java.util.function.Consumer<ProtocolRecord> consumer = protocolRecord -> {
                    send(protocolRecord);
                    if (arrayList == null || !(protocolRecord instanceof DomainEventRecord)) {
                        return;
                    }
                    arrayList.add((DomainEventRecord) protocolRecord);
                };
                if (this.runtime.requiresGDPRContext(commandRecord)) {
                    setupGDPRContext(commandRecord.tenantId(), commandRecord.aggregateId(), this.runtime.shouldGenerateGDPRKey(commandRecord));
                }
                logger.trace("Handling CommandRecord with type {}", commandRecord.name());
                this.runtime.handleCommandRecord(commandRecord, consumer, this::index, () -> {
                    return this.stateRepository.get(commandRecord.aggregateId());
                });
                if (arrayList != null) {
                    CommandResponseRecord commandResponseRecord = new CommandResponseRecord(commandRecord.tenantId(), commandRecord.aggregateId(), commandRecord.correlationId(), commandRecord.id(), arrayList, GDPRContextHolder.getCurrentGDPRContext() != null ? GDPRContextHolder.getCurrentGDPRContext().getEncryptionKey() : null);
                    TopicPartition parseReplyToTopicPartition = PartitionUtils.parseReplyToTopicPartition(commandRecord.replyToTopicPartition());
                    logger.trace("Sending CommandResponseRecord with commandId {} to {}", commandResponseRecord.commandId(), parseReplyToTopicPartition);
                    KafkaSender.send(this.producer, new ProducerRecord(parseReplyToTopicPartition.topic(), Integer.valueOf(parseReplyToTopicPartition.partition()), commandResponseRecord.commandId(), commandResponseRecord));
                }
            } catch (IOException e) {
                logger.error("Error handling command", e);
                tearDownGDPRContext();
            }
        } finally {
            tearDownGDPRContext();
        }
    }

    private void handleExternalEvent(DomainEventRecord domainEventRecord) {
        try {
            logger.trace("Handling DomainEventRecord with type {} as External Event", domainEventRecord.name());
            if (this.runtime.requiresGDPRContext(domainEventRecord)) {
                setupGDPRContext(domainEventRecord.tenantId(), domainEventRecord.aggregateId(), this.runtime.shouldGenerateGDPRKey(domainEventRecord));
            }
            this.runtime.handleExternalDomainEventRecord(domainEventRecord, this::send, this::index, () -> {
                return this.stateRepository.get(domainEventRecord.aggregateId());
            }, this);
        } catch (IOException e) {
            logger.error("Error handling external event", e);
        } finally {
            tearDownGDPRContext();
        }
    }

    private void process() {
        try {
            if (this.processState == AggregatePartitionState.PROCESSING) {
                ConsumerRecords<String, ProtocolRecord> poll = this.consumer.poll(Duration.ofMillis(10L));
                if (!poll.isEmpty()) {
                    processRecords(poll);
                }
            } else if (this.processState == AggregatePartitionState.LOADING_GDPR_KEYS) {
                ConsumerRecords poll2 = this.consumer.poll(Duration.ofMillis(10L));
                this.gdprContextRepository.process(poll2.records(this.gdprKeyPartition));
                if (poll2.isEmpty() && this.initializedEndOffsets.getOrDefault(this.gdprKeyPartition, 0L).longValue() <= this.consumer.position(this.gdprKeyPartition)) {
                    if (this.initializedEndOffsets.getOrDefault(this.statePartition, 0L).longValue() == 0) {
                        commitInitialOffsetsIfNecessary();
                        logger.info("No state found in Kafka for AggregatePartition {} of {}Aggregate", this.id, this.runtime.getName());
                        this.consumer.resume(Stream.concat(Stream.of((Object[]) new TopicPartition[]{this.statePartition, this.commandPartition, this.domainEventPartition}), this.externalEventPartitions.stream()).toList());
                        this.processState = AggregatePartitionState.PROCESSING;
                    } else {
                        logger.info("Loading state for AggregatePartition {} of {}Aggregate", this.id, this.runtime.getName());
                        this.consumer.resume(Collections.singletonList(this.statePartition));
                        this.consumer.pause(Collections.singletonList(this.gdprKeyPartition));
                        this.processState = AggregatePartitionState.LOADING_STATE;
                    }
                }
            } else if (this.processState == AggregatePartitionState.LOADING_STATE) {
                ConsumerRecords poll3 = this.consumer.poll(Duration.ofMillis(10L));
                this.stateRepository.process(poll3.records(this.statePartition));
                if (poll3.isEmpty() && this.initializedEndOffsets.getOrDefault(this.statePartition, 0L).longValue() <= this.consumer.position(this.statePartition)) {
                    this.consumer.resume(Stream.concat(Stream.concat(Stream.of((Object[]) new TopicPartition[]{this.commandPartition, this.domainEventPartition}), this.runtime.shouldHandlePIIData() ? Stream.of(this.gdprKeyPartition) : Stream.empty()), this.externalEventPartitions.stream()).toList());
                    this.processState = AggregatePartitionState.PROCESSING;
                }
            } else if (this.processState == AggregatePartitionState.INITIALIZING) {
                Logger logger2 = logger;
                Object[] objArr = new Object[3];
                objArr[0] = this.id;
                objArr[1] = this.runtime.getName();
                objArr[2] = this.runtime.shouldHandlePIIData() ? "Handle PII Data" : "Not Handle PII Data";
                logger2.info("Initializing AggregatePartition {} of {}Aggregate. Will {}", objArr);
                long offset = this.stateRepository.getOffset();
                if (offset >= 0) {
                    logger.info("Resuming State from offset {} for AggregatePartition {} of {}Aggregate", new Object[]{Long.valueOf(offset), this.id, this.runtime.getName()});
                    this.consumer.seek(this.statePartition, this.stateRepository.getOffset() + 1);
                } else {
                    this.consumer.seekToBeginning(Collections.singletonList(this.statePartition));
                }
                if (this.runtime.shouldHandlePIIData()) {
                    long offset2 = this.gdprContextRepository.getOffset();
                    if (offset2 >= 0) {
                        logger.info("Resuming GDPRKeys from offset {} for AggregatePartition {} of {}Aggregate", new Object[]{Long.valueOf(offset2), this.id, this.runtime.getName()});
                        this.consumer.seek(this.gdprKeyPartition, this.gdprContextRepository.getOffset() + 1);
                    } else {
                        this.consumer.seekToBeginning(Collections.singletonList(this.gdprKeyPartition));
                    }
                    this.initializedEndOffsets = this.consumer.endOffsets(List.of(this.gdprKeyPartition, this.statePartition));
                    logger.info("Loading GDPR Keys for AggregatePartition {} of {}Aggregate", this.id, this.runtime.getName());
                    this.consumer.pause(Stream.concat(Stream.of((Object[]) new TopicPartition[]{this.statePartition, this.commandPartition, this.domainEventPartition}), this.externalEventPartitions.stream()).toList());
                    this.processState = AggregatePartitionState.LOADING_GDPR_KEYS;
                } else {
                    this.initializedEndOffsets = this.consumer.endOffsets(List.of(this.statePartition));
                    if (this.initializedEndOffsets.getOrDefault(this.statePartition, 0L).longValue() == 0) {
                        commitInitialOffsetsIfNecessary();
                        logger.info("No state found in Kafka for AggregatePartition {} of {}Aggregate", this.id, this.runtime.getName());
                        this.consumer.resume(Stream.concat(Stream.of((Object[]) new TopicPartition[]{this.statePartition, this.commandPartition, this.domainEventPartition}), this.externalEventPartitions.stream()).toList());
                        this.processState = AggregatePartitionState.PROCESSING;
                    } else {
                        logger.info("Loading state for AggregatePartition {} of {}Aggregate", this.id, this.runtime.getName());
                        this.consumer.resume(Collections.singletonList(this.statePartition));
                        this.processState = AggregatePartitionState.LOADING_STATE;
                    }
                }
            }
        } catch (KafkaException e) {
            logger.error("Fatal error during " + String.valueOf(this.processState) + " phase, shutting down AggregatePartition " + this.id + " of " + this.runtime.getName() + "Aggregate", e);
            this.processState = AggregatePartitionState.SHUTTING_DOWN;
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e2) {
            logger.error("Fatal error during " + String.valueOf(this.processState) + " phase, shutting down AggregatePartition " + this.id + " of " + this.runtime.getName() + "Aggregate", e2);
            this.processState = AggregatePartitionState.SHUTTING_DOWN;
        } catch (WakeupException | InterruptException e3) {
        }
    }

    private void commitInitialOffsetsIfNecessary() {
        if ("latest".equals((String) Optional.ofNullable(this.consumerFactory.getConfigurationProperties().get("auto.offset.reset")).orElse("latest"))) {
            List list = Stream.concat(Stream.of((Object[]) new TopicPartition[]{this.commandPartition, this.domainEventPartition, this.statePartition}), this.externalEventPartitions.stream()).toList();
            Map beginningOffsets = this.consumer.beginningOffsets(list);
            Map committed = this.consumer.committed(new HashSet(list));
            HashMap hashMap = new HashMap();
            committed.forEach((topicPartition, offsetAndMetadata) -> {
                if (offsetAndMetadata == null) {
                    logger.info("TopicPartition[{}] has no committed offsets, will commit offset {} to avoid skipping records", topicPartition, beginningOffsets.getOrDefault(topicPartition, 0L));
                    hashMap.put(topicPartition, new OffsetAndMetadata(((Long) beginningOffsets.getOrDefault(topicPartition, 0L)).longValue()));
                }
            });
            if (hashMap.isEmpty()) {
                return;
            }
            this.producer.beginTransaction();
            this.producer.sendOffsetsToTransaction(hashMap, this.consumer.groupMetadata());
            this.producer.commitTransaction();
        }
    }

    private void processRecords(ConsumerRecords<String, ProtocolRecord> consumerRecords) {
        try {
            if (logger.isTraceEnabled()) {
                logger.trace("Processing {} records in a single transaction", Integer.valueOf(consumerRecords.count()));
                logger.trace("Processing {} gdpr key records", Integer.valueOf(consumerRecords.records(this.gdprKeyPartition).size()));
                logger.trace("Processing {} command records", Integer.valueOf(consumerRecords.records(this.commandPartition).size()));
                if (!this.externalEventPartitions.isEmpty()) {
                    logger.trace("Processing {} external event records", Integer.valueOf(this.externalEventPartitions.stream().map(topicPartition -> {
                        return Integer.valueOf(consumerRecords.records(topicPartition).size());
                    }).mapToInt((v0) -> {
                        return v0.intValue();
                    }).sum()));
                }
                logger.trace("Processing {} state records", Integer.valueOf(consumerRecords.records(this.statePartition).size()));
                logger.trace("Processing {} internal event records", Integer.valueOf(consumerRecords.records(this.domainEventPartition).size()));
            }
            this.producer.beginTransaction();
            HashMap hashMap = new HashMap();
            List records = consumerRecords.records(this.gdprKeyPartition);
            if (!records.isEmpty()) {
                this.gdprContextRepository.process(records);
                hashMap.put(this.gdprKeyPartition, Long.valueOf(((ConsumerRecord) records.getLast()).offset()));
            }
            this.externalEventPartitions.forEach(topicPartition2 -> {
                consumerRecords.records(topicPartition2).forEach(consumerRecord -> {
                    handleExternalEvent((DomainEventRecord) consumerRecord.value());
                    hashMap.put(topicPartition2, Long.valueOf(consumerRecord.offset()));
                });
            });
            consumerRecords.records(this.commandPartition).forEach(consumerRecord -> {
                handleCommand((CommandRecord) consumerRecord.value());
                hashMap.put(this.commandPartition, Long.valueOf(consumerRecord.offset()));
            });
            List<ConsumerRecord<String, ProtocolRecord>> records2 = consumerRecords.records(this.statePartition);
            if (!records2.isEmpty()) {
                this.stateRepository.process(records2);
                hashMap.put(this.statePartition, Long.valueOf(((ConsumerRecord) records2.getLast()).offset()));
            }
            consumerRecords.records(this.domainEventPartition).forEach(consumerRecord2 -> {
                hashMap.put(this.domainEventPartition, Long.valueOf(consumerRecord2.offset()));
            });
            this.producer.sendOffsetsToTransaction((Map) hashMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return new OffsetAndMetadata(((Long) entry.getValue()).longValue() + 1);
            })), this.consumer.groupMetadata());
            this.producer.commitTransaction();
            this.stateRepository.commit();
            this.gdprContextRepository.commit();
        } catch (InvalidProducerEpochException e) {
            this.producer.abortTransaction();
            rollbackConsumer(consumerRecords);
            this.stateRepository.rollback();
            this.gdprContextRepository.rollback();
        }
    }

    private void rollbackConsumer(ConsumerRecords<String, ProtocolRecord> consumerRecords) {
        consumerRecords.partitions().forEach(topicPartition -> {
            consumerRecords.records(topicPartition).stream().map((v0) -> {
                return v0.offset();
            }).min((v0, v1) -> {
                return v0.compareTo(v1);
            }).ifPresent(l -> {
                this.consumer.seek(topicPartition, l.longValue());
            });
        });
    }

    public boolean isProcessing() {
        return this.processState == AggregatePartitionState.PROCESSING;
    }
}
