package org.elasticsoftware.akces.state;

import com.google.common.base.Charsets;
import com.google.common.primitives.Longs;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.elasticsoftware.akces.kafka.RecordAndMetadata;
import org.elasticsoftware.akces.protocol.AggregateStateRecord;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/akces/state/RocksDBAggregateStateRepository.class */
public class RocksDBAggregateStateRepository implements AggregateStateRepository {
    private static final Logger log = LoggerFactory.getLogger(RocksDBAggregateStateRepository.class);
    private static final byte[] OFFSET = {79, 70, 70, 83, 69, 84};
    private final TransactionDB db;
    private final File rocksDBDataDir;
    private final String topicName;
    private final Serializer<ProtocolRecord> serializer;
    private final Deserializer<ProtocolRecord> deserializer;
    private final Map<String, RecordAndMetadata<AggregateStateRecord>> transactionStateRecordMap = new HashMap();
    private boolean aggregateIdIsUUID = false;
    private boolean aggregateIdTypeCheckDone = false;
    private long lastOffset = -1;

    public RocksDBAggregateStateRepository(String str, String str2, String str3, Serializer<ProtocolRecord> serializer, Deserializer<ProtocolRecord> deserializer) {
        this.topicName = str3;
        this.serializer = serializer;
        this.deserializer = deserializer;
        RocksDB.loadLibrary();
        Options options = new Options();
        TransactionDBOptions transactionDBOptions = new TransactionDBOptions();
        options.setCreateIfMissing(true);
        this.rocksDBDataDir = new File(str, str2);
        try {
            Files.createDirectories(this.rocksDBDataDir.getParentFile().toPath(), new FileAttribute[0]);
            Files.createDirectories(this.rocksDBDataDir.getAbsoluteFile().toPath(), new FileAttribute[0]);
            this.db = TransactionDB.open(options, transactionDBOptions, this.rocksDBDataDir.getAbsolutePath());
            initializeOffset();
            log.info("RocksDB for partition {} initialized in folder {}", str2, this.rocksDBDataDir.getAbsolutePath());
        } catch (IOException | RocksDBException e) {
            throw new AggregateStateRepositoryException("Error initializing RocksDB", e);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.db.syncWal();
        } catch (RocksDBException e) {
            log.error("Error syncing WAL. Exception: '{}', message: '{}'", new Object[]{e.getCause(), e.getMessage(), e});
        }
        this.db.close();
    }

    private void initializeOffset() {
        try {
            byte[] bArr = this.db.get(OFFSET);
            if (bArr != null) {
                this.lastOffset = Longs.fromByteArray(bArr);
            }
        } catch (RocksDBException e) {
            throw new AggregateStateRepositoryException("Error initializing offset", e);
        }
    }

    private void updateOffset(long j) {
        this.lastOffset = j;
        log.trace("Updated offset to {}", Long.valueOf(j));
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public long getOffset() {
        return this.lastOffset;
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public void prepare(AggregateStateRecord aggregateStateRecord, Future<RecordMetadata> future) {
        checkAggregateIdType(aggregateStateRecord.aggregateId());
        this.transactionStateRecordMap.put(aggregateStateRecord.aggregateId(), new RecordAndMetadata<>(aggregateStateRecord, future));
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public void commit() {
        if (this.transactionStateRecordMap.isEmpty()) {
            return;
        }
        Transaction beginTransaction = this.db.beginTransaction(new WriteOptions());
        try {
            try {
                for (RecordAndMetadata<AggregateStateRecord> recordAndMetadata : this.transactionStateRecordMap.values()) {
                    beginTransaction.put(keyBytes(recordAndMetadata.record().aggregateId()), this.serializer.serialize(this.topicName, recordAndMetadata.record()));
                }
                long longValue = ((Long) this.transactionStateRecordMap.values().stream().map((v0) -> {
                    return v0.metadata();
                }).map(future -> {
                    try {
                        return (RecordMetadata) future.get();
                    } catch (Exception e) {
                        log.error("Error getting offset. Exception: '{}', message: '{}'", new Object[]{e.getCause(), e.getMessage(), e});
                        return null;
                    }
                }).map(recordMetadata -> {
                    return Long.valueOf(recordMetadata != null ? recordMetadata.offset() : -1L);
                }).max((v0, v1) -> {
                    return v0.compareTo(v1);
                }).orElse(-1L)).longValue();
                beginTransaction.put(OFFSET, Longs.toByteArray(longValue));
                beginTransaction.commit();
                beginTransaction.close();
                updateOffset(longValue);
                this.transactionStateRecordMap.clear();
            } catch (RocksDBException e) {
                throw new AggregateStateRepositoryException("Error committing records", e);
            }
        } catch (Throwable th) {
            this.transactionStateRecordMap.clear();
            throw th;
        }
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public void rollback() {
        this.transactionStateRecordMap.clear();
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public void process(List<ConsumerRecord<String, ProtocolRecord>> list) {
        long longValue = ((Long) list.stream().map((v0) -> {
            return v0.offset();
        }).max((v0, v1) -> {
            return v0.compareTo(v1);
        }).orElse(-1L)).longValue();
        if (longValue > this.lastOffset) {
            Transaction beginTransaction = this.db.beginTransaction(new WriteOptions());
            try {
                for (ConsumerRecord<String, ProtocolRecord> consumerRecord : list) {
                    beginTransaction.put(keyBytes((String) consumerRecord.key()), this.serializer.serialize(this.topicName, (ProtocolRecord) consumerRecord.value()));
                }
                beginTransaction.put(OFFSET, Longs.toByteArray(longValue));
                beginTransaction.commit();
                beginTransaction.close();
                updateOffset(longValue);
            } catch (RocksDBException e) {
                throw new AggregateStateRepositoryException("Error processing records", e);
            }
        }
    }

    @Override // org.elasticsoftware.akces.state.AggregateStateRepository
    public AggregateStateRecord get(String str) {
        checkAggregateIdType(str);
        if (this.transactionStateRecordMap.containsKey(str)) {
            return this.transactionStateRecordMap.get(str).record();
        }
        byte[] keyBytes = keyBytes(str);
        if (!this.db.keyExists(keyBytes)) {
            return null;
        }
        try {
            return (AggregateStateRecord) this.deserializer.deserialize(this.topicName, this.db.get(keyBytes));
        } catch (RocksDBException | SerializationException e) {
            throw new AggregateStateRepositoryException("Problem reading record with aggregateId " + str, e);
        }
    }

    private byte[] keyBytes(String str) {
        checkAggregateIdType(str);
        if (!this.aggregateIdIsUUID) {
            return str.getBytes(Charsets.UTF_8);
        }
        UUID fromString = UUID.fromString(str);
        return ByteBuffer.wrap(new byte[16]).putLong(fromString.getMostSignificantBits()).putLong(fromString.getLeastSignificantBits()).array();
    }

    private void checkAggregateIdType(String str) {
        if (this.aggregateIdTypeCheckDone) {
            return;
        }
        try {
            UUID.fromString(str);
            this.aggregateIdIsUUID = true;
        } catch (IllegalArgumentException e) {
            this.aggregateIdIsUUID = false;
        }
        this.aggregateIdTypeCheckDone = true;
    }
}
