package org.enodeframework.mysql;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.enodeframework.ObjectContainer;
import org.enodeframework.common.exception.ENodeRuntimeException;
import org.enodeframework.common.exception.IORuntimeException;
import org.enodeframework.common.function.Action2;
import org.enodeframework.common.io.IOHelper;
import org.enodeframework.common.serializing.JsonTool;
import org.enodeframework.common.utilities.Ensure;
import org.enodeframework.configurations.DataSourceKey;
import org.enodeframework.configurations.DefaultDBConfigurationSetting;
import org.enodeframework.configurations.OptionSetting;
import org.enodeframework.eventing.DomainEventStream;
import org.enodeframework.eventing.EventAppendResult;
import org.enodeframework.eventing.EventAppendStatus;
import org.enodeframework.eventing.IDomainEvent;
import org.enodeframework.eventing.IEventSerializer;
import org.enodeframework.eventing.IEventStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/enodeframework/mysql/MysqlEventStore.class */
public class MysqlEventStore implements IEventStore {
    private static final String EVENT_TABLE_NAME_FORMAT = "%s_%s";
    private final String tableName;
    private final int tableCount;
    private final String versionIndexName;
    private final String commandIndexName;
    private final int bulkCopyBatchSize;
    private final int bulkCopyTimeout;
    private final SQLClient sqlClient;
    private final String INSERT_EVENT_SQL = "INSERT INTO %s(AggregateRootId,AggregateRootTypeName,CommandId,Version,CreatedOn,Events) VALUES(?,?,?,?,?,?)";

    @Autowired
    private IEventSerializer eventSerializer;
    private static final Logger logger = LoggerFactory.getLogger(MysqlEventStore.class);
    private static final Pattern pattern = Pattern.compile("^Duplicate entry '(.*)-(.*)' for key 'IX_EventStream_AggId_CommandId'");

    public MysqlEventStore(DataSource dataSource, OptionSetting optionSetting) {
        Ensure.notNull(dataSource, "ds");
        if (optionSetting != null) {
            this.tableName = optionSetting.getOptionValue(DataSourceKey.EVENT_TABLE_NAME);
            this.tableCount = Integer.parseInt(optionSetting.getOptionValue(DataSourceKey.EVENT_TABLE_COUNT));
            this.versionIndexName = optionSetting.getOptionValue(DataSourceKey.EVENT_TABLE_VERSION_UNIQUE_INDEX_NAME);
            this.commandIndexName = optionSetting.getOptionValue(DataSourceKey.COMMAND_TABLE_COMMANDID_UNIQUE_INDEX_NAME);
            this.bulkCopyBatchSize = Integer.parseInt(optionSetting.getOptionValue(DataSourceKey.EVENT_TABLE_BULKCOPY_BATCHSIZE));
            this.bulkCopyTimeout = Integer.parseInt(optionSetting.getOptionValue(DataSourceKey.EVENT_TABLE_BULKCOPY_TIMEOUT));
        } else {
            DefaultDBConfigurationSetting defaultDBConfigurationSetting = new DefaultDBConfigurationSetting();
            this.tableName = defaultDBConfigurationSetting.getEventTableName();
            this.tableCount = defaultDBConfigurationSetting.getEventTableCount();
            this.versionIndexName = defaultDBConfigurationSetting.getEventTableVersionUniqueIndexName();
            this.commandIndexName = defaultDBConfigurationSetting.getEventTableCommandIdUniqueIndexName();
            this.bulkCopyBatchSize = defaultDBConfigurationSetting.getEventTableBulkCopyBatchSize();
            this.bulkCopyTimeout = defaultDBConfigurationSetting.getEventTableBulkCopyTimeout();
        }
        Ensure.notNull(this.tableName, "tableName");
        Ensure.notNull(this.versionIndexName, "eventIndexName");
        Ensure.notNull(this.commandIndexName, "commandIndexName");
        Ensure.positive(this.bulkCopyBatchSize, "bulkCopyBatchSize");
        Ensure.positive(this.bulkCopyTimeout, "bulkCopyTimeout");
        this.sqlClient = JDBCClient.create(ObjectContainer.vertx, dataSource);
    }

    public CompletableFuture<EventAppendResult> batchAppendAsync(List<DomainEventStream> list) {
        CompletableFuture<EventAppendResult> completableFuture = new CompletableFuture<>();
        EventAppendResult eventAppendResult = new EventAppendResult();
        if (list.size() == 0) {
            completableFuture.complete(eventAppendResult);
            return completableFuture;
        }
        Map map = (Map) list.stream().distinct().collect(Collectors.groupingBy((v0) -> {
            return v0.getAggregateRootId();
        }));
        BatchAggregateEventAppendResult batchAggregateEventAppendResult = new BatchAggregateEventAppendResult(map.keySet().size());
        for (Map.Entry entry : map.entrySet()) {
            batchAppendAggregateEventsAsync((String) entry.getKey(), (List) entry.getValue(), batchAggregateEventAppendResult, 0);
        }
        return batchAggregateEventAppendResult.taskCompletionSource;
    }

    private void batchAppendAggregateEventsAsync(String str, List<DomainEventStream> list, BatchAggregateEventAppendResult batchAggregateEventAppendResult, int i) {
        IOHelper.tryAsyncActionRecursively("BatchAppendAggregateEventsAsync", () -> {
            return batchAppendAggregateEventsAsync(str, list);
        }, aggregateEventAppendResult -> {
            batchAggregateEventAppendResult.addCompleteAggregate(str, aggregateEventAppendResult);
        }, () -> {
            return String.format("[aggregateRootId: %s, eventStreamCount: %s]", str, Integer.valueOf(list.size()));
        }, (Action2) null, i, true);
    }

    private CompletableFuture<Void> tryFindEventByCommandIdAsync(String str, String str2, List<String> list, int i) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        IOHelper.tryAsyncActionRecursively("TryFindEventByCommandIdAsync", () -> {
            return findAsync(str, str2);
        }, domainEventStream -> {
            if (domainEventStream != null) {
                list.add(domainEventStream.getCommandId());
            }
            completableFuture.complete(null);
        }, () -> {
            return String.format("[aggregateRootId:%s, commandId:%s]", str, str2);
        }, (Action2) null, i, true);
        return completableFuture;
    }

    private CompletableFuture<AggregateEventAppendResult> batchAppendAggregateEventsAsync(String str, List<DomainEventStream> list) {
        CompletableFuture completableFuture = new CompletableFuture();
        String format = String.format("INSERT INTO %s(AggregateRootId,AggregateRootTypeName,CommandId,Version,CreatedOn,Events) VALUES(?,?,?,?,?,?)", getTableName(str));
        ArrayList newArrayList = Lists.newArrayList();
        for (DomainEventStream domainEventStream : list) {
            JsonArray jsonArray = new JsonArray();
            jsonArray.add(domainEventStream.getAggregateRootId());
            jsonArray.add(domainEventStream.getAggregateRootTypeName());
            jsonArray.add(domainEventStream.getCommandId());
            jsonArray.add(Integer.valueOf(domainEventStream.getVersion()));
            jsonArray.add(domainEventStream.getTimestamp().toInstant());
            jsonArray.add(JsonTool.serialize(this.eventSerializer.serialize(domainEventStream.events())));
            newArrayList.add(jsonArray);
        }
        batchWithParams(format, newArrayList, asyncResult -> {
            if (!asyncResult.succeeded()) {
                completableFuture.completeExceptionally(asyncResult.cause());
                return;
            }
            AggregateEventAppendResult aggregateEventAppendResult = new AggregateEventAppendResult();
            aggregateEventAppendResult.setEventAppendStatus(EventAppendStatus.Success);
            completableFuture.complete(aggregateEventAppendResult);
        });
        return completableFuture.exceptionally(th -> {
            if (!(th instanceof SQLException)) {
                logger.error("Batch append event has unknown exception.", th);
                throw new ENodeRuntimeException(th);
            }
            SQLException sQLException = (SQLException) th;
            if (sQLException.getErrorCode() == 1062 && sQLException.getMessage().contains(this.versionIndexName)) {
                AggregateEventAppendResult aggregateEventAppendResult = new AggregateEventAppendResult();
                aggregateEventAppendResult.setEventAppendStatus(EventAppendStatus.DuplicateEvent);
                return aggregateEventAppendResult;
            }
            if (sQLException.getErrorCode() != 1062 || !sQLException.getMessage().contains(this.commandIndexName)) {
                logger.error("Batch append event has sql exception.", th);
                throw new IORuntimeException(th);
            }
            AggregateEventAppendResult aggregateEventAppendResult2 = new AggregateEventAppendResult();
            aggregateEventAppendResult2.setEventAppendStatus(EventAppendStatus.DuplicateCommand);
            aggregateEventAppendResult2.setDuplicateCommandIds(Lists.newArrayList(new String[]{parseCommandIdInException(sQLException.getMessage())}));
            return aggregateEventAppendResult2;
        });
    }

    private String parseCommandIdInException(String str) {
        Matcher matcher = pattern.matcher(str);
        return (!matcher.find() || matcher.groupCount() < 2) ? "" : matcher.group(2);
    }

    public CompletableFuture<List<DomainEventStream>> queryAggregateEventsAsync(String str, String str2, int i, int i2) {
        return IOHelper.tryIOFuncAsync(() -> {
            CompletableFuture completableFuture = new CompletableFuture();
            String format = String.format("SELECT * FROM `%s` WHERE AggregateRootId = ? AND Version >= ? AND Version <= ? ORDER BY Version", getTableName(str));
            JsonArray jsonArray = new JsonArray();
            jsonArray.add(str);
            jsonArray.add(Integer.valueOf(i));
            jsonArray.add(Integer.valueOf(i2));
            this.sqlClient.queryWithParams(format, jsonArray, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                    return;
                }
                ArrayList newArrayList = Lists.newArrayList();
                ((ResultSet) asyncResult.result()).getRows().forEach(jsonObject -> {
                    newArrayList.add(jsonObject.mapTo(StreamRecord.class));
                });
                completableFuture.complete((List) newArrayList.stream().map(this::convertFrom).collect(Collectors.toList()));
            });
            return completableFuture.exceptionally(th -> {
                if (!(th instanceof SQLException)) {
                    logger.error("Failed to query aggregate events async, aggregateRootId: {}, aggregateRootType: {}", new Object[]{str, str2, th});
                    throw new ENodeRuntimeException(th);
                }
                logger.error(String.format("Failed to query aggregate events async, aggregateRootId: %s, aggregateRootType: %s", str, str2), (SQLException) th);
                throw new IORuntimeException(th);
            });
        }, "QueryAggregateEventsAsync");
    }

    public CompletableFuture<DomainEventStream> findAsync(String str, int i) {
        return IOHelper.tryIOFuncAsync(() -> {
            CompletableFuture completableFuture = new CompletableFuture();
            String format = String.format("select * from `%s` where AggregateRootId=? and Version=?", getTableName(str));
            JsonArray jsonArray = new JsonArray();
            jsonArray.add(str);
            jsonArray.add(Integer.valueOf(i));
            this.sqlClient.queryWithParams(format, jsonArray, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                    return;
                }
                DomainEventStream domainEventStream = null;
                Optional findFirst = ((ResultSet) asyncResult.result()).getRows().stream().findFirst();
                if (findFirst.isPresent()) {
                    domainEventStream = convertFrom((StreamRecord) ((JsonObject) findFirst.get()).mapTo(StreamRecord.class));
                }
                completableFuture.complete(domainEventStream);
            });
            return completableFuture.exceptionally(th -> {
                if (th instanceof SQLException) {
                    logger.error("Find event by version has sql exception, aggregateRootId: {}, version: {}", new Object[]{str, Integer.valueOf(i), (SQLException) th});
                    throw new IORuntimeException(th);
                }
                logger.error("Find event by version has unknown exception, aggregateRootId: {}, version: {}", new Object[]{str, Integer.valueOf(i), th});
                throw new ENodeRuntimeException(th);
            });
        }, "FindEventByVersionAsync");
    }

    public CompletableFuture<DomainEventStream> findAsync(String str, String str2) {
        return IOHelper.tryIOFuncAsync(() -> {
            CompletableFuture completableFuture = new CompletableFuture();
            String format = String.format("select * from `%s` where AggregateRootId=? and CommandId=?", getTableName(str));
            JsonArray jsonArray = new JsonArray();
            jsonArray.add(str);
            jsonArray.add(str2);
            this.sqlClient.queryWithParams(format, jsonArray, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                    return;
                }
                DomainEventStream domainEventStream = null;
                Optional findFirst = ((ResultSet) asyncResult.result()).getRows().stream().findFirst();
                if (findFirst.isPresent()) {
                    domainEventStream = convertFrom((StreamRecord) ((JsonObject) findFirst.get()).mapTo(StreamRecord.class));
                }
                completableFuture.complete(domainEventStream);
            });
            return completableFuture.exceptionally(th -> {
                if (th instanceof SQLException) {
                    logger.error("Find event by commandId has sql exception, aggregateRootId: {}, commandId: {}", new Object[]{str, str2, (SQLException) th});
                    throw new IORuntimeException(th);
                }
                logger.error("Find event by commandId has unknown exception, aggregateRootId: {}, commandId: {}", new Object[]{str, str2, th});
                throw new ENodeRuntimeException(th);
            });
        }, "FindEventByCommandIdAsync");
    }

    private int getTableIndex(String str) {
        int hashCode = str.hashCode();
        if (hashCode < 0) {
            hashCode = Math.abs(hashCode);
        }
        return hashCode % this.tableCount;
    }

    private String getTableName(String str) {
        return this.tableCount <= 1 ? this.tableName : String.format(EVENT_TABLE_NAME_FORMAT, this.tableName, Integer.valueOf(getTableIndex(str)));
    }

    private DomainEventStream convertFrom(StreamRecord streamRecord) {
        return new DomainEventStream(streamRecord.CommandId, streamRecord.AggregateRootId, streamRecord.AggregateRootTypeName, streamRecord.CreatedOn, this.eventSerializer.deserialize((Map) JsonTool.deserialize(streamRecord.Events, Map.class), IDomainEvent.class), Maps.newHashMap());
    }

    private SQLClient batchWithParams(String str, List<JsonArray> list, Handler<AsyncResult<List<Integer>>> handler) {
        this.sqlClient.getConnection(asyncResult -> {
            if (asyncResult.failed()) {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            } else {
                SQLConnection sQLConnection = (SQLConnection) asyncResult.result();
                sQLConnection.setAutoCommit(false, asyncResult -> {
                    if (!asyncResult.failed()) {
                        sQLConnection.batchWithParams(str, list, asyncResult -> {
                            if (asyncResult.succeeded()) {
                                sQLConnection.commit(asyncResult -> {
                                    if (asyncResult.succeeded()) {
                                        handler.handle(Future.succeededFuture(asyncResult.result()));
                                    } else {
                                        handler.handle(Future.failedFuture(asyncResult.cause()));
                                    }
                                    resetAutoCommitAndCloseConnection(sQLConnection);
                                });
                            } else {
                                sQLConnection.rollback(asyncResult2 -> {
                                    if (asyncResult2.succeeded()) {
                                        handler.handle(Future.failedFuture(asyncResult.cause()));
                                    } else {
                                        handler.handle(Future.failedFuture(asyncResult2.cause()));
                                    }
                                    resetAutoCommitAndCloseConnection(sQLConnection);
                                });
                            }
                        });
                    } else {
                        handler.handle(Future.failedFuture(asyncResult.cause()));
                        resetAutoCommitAndCloseConnection(sQLConnection);
                    }
                });
            }
        });
        return this.sqlClient;
    }

    private void resetAutoCommitAndCloseConnection(SQLConnection sQLConnection) {
        sQLConnection.setAutoCommit(true, asyncResult -> {
            if (asyncResult.failed()) {
                logger.error("mysql driver set autocommit true failed", asyncResult.cause());
            }
            sQLConnection.close();
        });
    }
}
