package org.elasticsoftware.akces.state;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.elasticsoftware.akces.kafka.RecordAndMetadata;
import org.elasticsoftware.akces.protocol.AggregateStateRecord;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/akces/state/InMemoryAggregateStateRepository.class */
public class InMemoryAggregateStateRepository implements AggregateStateRepository {
    private static final Logger log = LoggerFactory.getLogger(InMemoryAggregateStateRepository.class);
    private final Map<String, AggregateStateRecord> stateRecordMap = new HashMap();
    private final Map<String, RecordAndMetadata<AggregateStateRecord>> transactionStateRecordMap = new HashMap();
    private long offset = -1;

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.stateRecordMap.clear();
        this.transactionStateRecordMap.clear();
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public void prepare(AggregateStateRecord aggregateStateRecord, Future<RecordMetadata> future) {
        this.transactionStateRecordMap.put(aggregateStateRecord.aggregateId(), new RecordAndMetadata<>(aggregateStateRecord, future));
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public void commit() {
        if (this.transactionStateRecordMap.isEmpty()) {
            return;
        }
        this.offset = ((Long) this.transactionStateRecordMap.values().stream().map((v0) -> {
            return v0.metadata();
        }).map(future -> {
            try {
                return (RecordMetadata) future.get();
            } catch (Exception e) {
                log.error("Error getting offset. Exception: '{}', message: '{}'", new Object[]{e.getCause(), e.getMessage(), e});
                return null;
            }
        }).map(recordMetadata -> {
            return Long.valueOf(recordMetadata != null ? recordMetadata.offset() : -1L);
        }).max((v0, v1) -> {
            return v0.compareTo(v1);
        }).orElse(-1L)).longValue();
        log.trace("Committing {} records and offset {}", Integer.valueOf(this.transactionStateRecordMap.size()), Long.valueOf(this.offset));
        this.transactionStateRecordMap.values().forEach(recordAndMetadata -> {
            this.stateRecordMap.put(recordAndMetadata.record().aggregateId(), (AggregateStateRecord) recordAndMetadata.record());
        });
        this.transactionStateRecordMap.clear();
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public void rollback() {
        this.transactionStateRecordMap.clear();
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public void process(List<ConsumerRecord<String, ProtocolRecord>> list) {
        for (ConsumerRecord<String, ProtocolRecord> consumerRecord : list) {
            AggregateStateRecord aggregateStateRecord = (AggregateStateRecord) consumerRecord.value();
            if (aggregateStateRecord != null) {
                this.stateRecordMap.put(aggregateStateRecord.aggregateId(), aggregateStateRecord);
            } else {
                this.stateRecordMap.remove(consumerRecord.key());
            }
            this.offset = consumerRecord.offset();
        }
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public AggregateStateRecord get(String str) {
        return this.transactionStateRecordMap.containsKey(str) ? this.transactionStateRecordMap.get(str).record() : this.stateRecordMap.get(str);
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public long getOffset() {
        return this.offset;
    }
}
