package io.github.thunderz99.cosmos.impl.mongo;

import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.implementation.guava25.collect.Lists;
import com.azure.cosmos.models.CosmosBatch;
import com.azure.cosmos.models.CosmosBatchOperationResult;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.google.common.base.Preconditions;
import com.mongodb.MongoException;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.ClientSession;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndReplaceOptions;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import io.github.thunderz99.cosmos.Cosmos;
import io.github.thunderz99.cosmos.CosmosDatabase;
import io.github.thunderz99.cosmos.CosmosDocument;
import io.github.thunderz99.cosmos.CosmosDocumentList;
import io.github.thunderz99.cosmos.CosmosException;
import io.github.thunderz99.cosmos.condition.Aggregate;
import io.github.thunderz99.cosmos.condition.Condition;
import io.github.thunderz99.cosmos.dto.CosmosBatchResponseWrapper;
import io.github.thunderz99.cosmos.dto.CosmosBulkResult;
import io.github.thunderz99.cosmos.dto.CosmosSqlQuerySpec;
import io.github.thunderz99.cosmos.dto.FilterOptions;
import io.github.thunderz99.cosmos.dto.PartialUpdateOption;
import io.github.thunderz99.cosmos.util.AggregateUtil;
import io.github.thunderz99.cosmos.util.Checker;
import io.github.thunderz99.cosmos.util.ConditionUtil;
import io.github.thunderz99.cosmos.util.JoinUtil;
import io.github.thunderz99.cosmos.util.JsonPatchUtil;
import io.github.thunderz99.cosmos.util.JsonUtil;
import io.github.thunderz99.cosmos.util.LinkFormatUtil;
import io.github.thunderz99.cosmos.util.MapUtil;
import io.github.thunderz99.cosmos.util.NumberUtil;
import io.github.thunderz99.cosmos.util.RetryUtil;
import io.github.thunderz99.cosmos.v4.PatchOperations;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonDocument;
import org.bson.BsonObjectId;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/thunderz99/cosmos/impl/mongo/MongoDatabaseImpl.class */
public class MongoDatabaseImpl implements CosmosDatabase {
    static final int MAX_BATCH_NUMBER_OF_OPERATION = 100;
    String db;
    MongoClient client;
    Cosmos cosmosAccount;
    private static Logger log = LoggerFactory.getLogger(MongoDatabaseImpl.class);
    static final LinkedHashMap<String, Object> mapInstance = new LinkedHashMap<>();

    public MongoDatabaseImpl(Cosmos cosmos, String str) {
        this.cosmosAccount = cosmos;
        this.db = str;
        if (cosmos instanceof MongoImpl) {
            this.client = ((MongoImpl) cosmos).getClient();
        }
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument create(String str, Object obj, String str2) throws Exception {
        Checker.checkNotBlank(str, "coll");
        Checker.checkNotBlank(str2, "partition");
        Checker.checkNotNull(obj, "create data " + str + " " + str2);
        Map<String, Object> map = JsonUtil.toMap(obj);
        map.put(MongoImpl.getDefaultPartitionKey(), str2);
        addId4Mongo(map);
        addTimestamp(map);
        String collectionLink = LinkFormatUtil.getCollectionLink(str, str2);
        checkValidId(map);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        log.info("created Document:{}/docs/{}, partition:{}, account:{}", new Object[]{collectionLink, getId(((InsertOneResult) RetryUtil.executeWithRetry(() -> {
            return collection.insertOne(new Document(map));
        })).getInsertedId()), str2, getAccount()});
        return new CosmosDocument(map);
    }

    static String addId4Mongo(Map<String, Object> map) {
        String obj = map.getOrDefault("id", UUID.randomUUID()).toString();
        checkValidId(obj);
        map.put("id", obj);
        map.put("_id", obj);
        return obj;
    }

    static String getId(Object obj) {
        return obj instanceof String ? (String) obj : obj instanceof BsonObjectId ? ((BsonObjectId) obj).getValue().toHexString() : JsonUtil.toMap(obj).getOrDefault("id", "").toString();
    }

    static void checkValidId(List<?> list) {
        for (Object obj : list) {
            if (obj instanceof String) {
                checkValidId((String) obj);
            } else {
                checkValidId(JsonUtil.toMap(obj));
            }
        }
    }

    static void checkValidId(Map<String, Object> map) {
        if (map == null) {
            return;
        }
        checkValidId(getId(map));
    }

    static void checkValidId(String str) {
        if (StringUtils.containsAny(str, new CharSequence[]{"\t", "\n", "\r", "/"})) {
            throw new IllegalArgumentException("id cannot contain \\t or \\n or \\r or /. id:" + str);
        }
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument read(String str, String str2, String str3) throws Exception {
        Checker.checkNotBlank(str2, "id");
        Checker.checkNotBlank(str, "coll");
        Checker.checkNotBlank(str3, "partition");
        String documentLink = LinkFormatUtil.getDocumentLink(str, str3, str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str3);
        Document document = (Document) RetryUtil.executeWithRetry(() -> {
            return (Document) collection.find(Filters.eq("_id", str2)).first();
        });
        log.info("read Document:{}, partition:{}, account:{}", new Object[]{documentLink, str3, getAccount()});
        return checkAndGetCosmosDocument(document);
    }

    static CosmosDocument checkAndGetCosmosDocument(Document document) {
        if (document == null) {
            throw new CosmosException(404, "404", "Resource Not Found");
        }
        return new CosmosDocument((Map<String, Object>) document);
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument readSuppressing404(String str, String str2, String str3) throws Exception {
        try {
            return read(str, str2, str3);
        } catch (Exception e) {
            if (MongoImpl.isResourceNotFoundException(e)) {
                return null;
            }
            throw e;
        }
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument update(String str, Object obj, String str2) throws Exception {
        Checker.checkNotBlank(str, "coll");
        Checker.checkNotBlank(str2, "partition");
        Checker.checkNotNull(obj, "update data " + str + " " + str2);
        Map<String, Object> map = JsonUtil.toMap(obj);
        String id = getId(map);
        Checker.checkNotBlank(id, "id");
        checkValidId(id);
        map.put("_id", id);
        addTimestamp(map);
        String documentLink = LinkFormatUtil.getDocumentLink(str, str2, id);
        map.put(MongoImpl.getDefaultPartitionKey(), str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        Document document = (Document) RetryUtil.executeWithRetry(() -> {
            return (Document) collection.findOneAndReplace(Filters.eq("_id", id), new Document(map), new FindOneAndReplaceOptions().upsert(false).returnDocument(ReturnDocument.AFTER));
        });
        log.info("updated Document:{}, id:{}, partition:{}, account:{}", new Object[]{documentLink, id, str2, getAccount()});
        return checkAndGetCosmosDocument(document);
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument updatePartial(String str, String str2, Object obj, String str3) throws Exception {
        return updatePartial(str, str2, obj, str3, new PartialUpdateOption());
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument updatePartial(String str, String str2, Object obj, String str3, PartialUpdateOption partialUpdateOption) throws Exception {
        Checker.checkNotBlank(str2, "id");
        Checker.checkNotBlank(str, "coll");
        Checker.checkNotBlank(str3, "partition");
        Checker.checkNotNull(obj, "updatePartial data " + str + " " + str3);
        checkValidId(str2);
        Map<String, Object> map = JsonUtil.toMap(obj);
        addTimestamp(map);
        map.remove(MongoImpl.getDefaultPartitionKey());
        Map<String, Object> flatMapWithPeriod = MapUtil.toFlatMapWithPeriod(map);
        String documentLink = LinkFormatUtil.getDocumentLink(str, str3, str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str3);
        Document document = (Document) RetryUtil.executeWithRetry(() -> {
            return (Document) collection.findOneAndUpdate(Filters.eq("_id", str2), new Document("$set", new Document(flatMapWithPeriod)), new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
        });
        log.info("updated Document:{}, id:{}, partition:{}, account:{}", new Object[]{documentLink, str2, str3, getAccount()});
        return checkAndGetCosmosDocument(document);
    }

    static void addTimestamp(Map<String, Object> map) {
        map.put("_ts", Long.valueOf(Instant.now().getEpochSecond()));
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument upsert(String str, Object obj, String str2) throws Exception {
        Map<String, Object> map = JsonUtil.toMap(obj);
        String obj2 = map.getOrDefault("id", "").toString();
        Checker.checkNotBlank(obj2, "id");
        checkValidId(obj2);
        Checker.checkNotBlank(str, "coll");
        Checker.checkNotBlank(str2, "partition");
        Checker.checkNotNull(obj, "upsert data " + str + " " + str2);
        map.put("_id", obj2);
        addTimestamp(map);
        String collectionLink = LinkFormatUtil.getCollectionLink(str, str2);
        map.put(MongoImpl.getDefaultPartitionKey(), str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        Document document = (Document) RetryUtil.executeWithRetry(() -> {
            return (Document) collection.findOneAndReplace(Filters.eq("_id", obj2), new Document(map), new FindOneAndReplaceOptions().upsert(true).returnDocument(ReturnDocument.AFTER));
        });
        log.info("upsert Document:{}/docs/{}, partition:{}, account:{}", new Object[]{collectionLink, obj2, str2, getAccount()});
        return checkAndGetCosmosDocument(document);
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDatabase delete(String str, String str2, String str3) throws Exception {
        Checker.checkNotBlank(str, "coll");
        Checker.checkNotBlank(str2, "id");
        Checker.checkNotBlank(str3, "partition");
        String documentLink = LinkFormatUtil.getDocumentLink(str, str3, str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str3);
        log.info("deleted Document:{}, partition:{}, account:{}", new Object[]{documentLink, str3, getAccount()});
        return this;
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocumentList find(String str, Condition condition, String str2) throws Exception {
        if (condition == null) {
            condition = new Condition();
        }
        if (CollectionUtils.isNotEmpty(condition.join) && !condition.returnAllSubArray) {
            return findWithJoin(str, condition, str2);
        }
        String collectionLink = LinkFormatUtil.getCollectionLink(this.db, str);
        Bson processNor = ConditionUtil.processNor(ConditionUtil.toBsonFilter(condition));
        Bson bsonSort = ConditionUtil.toBsonSort(condition.sort);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        new CosmosDocumentList();
        FindIterable limit = collection.find(processNor).sort(bsonSort).skip(condition.offset).limit(condition.limit);
        List<String> processFields = ConditionUtil.processFields(condition.fields);
        if (!processFields.isEmpty()) {
            limit.projection(Projections.fields(new Bson[]{Projections.excludeId(), Projections.include(processFields)}));
        }
        CosmosDocumentList cosmosDocumentList = new CosmosDocumentList((ArrayList) RetryUtil.executeWithRetry(() -> {
            return (ArrayList) limit.into(new ArrayList());
        }));
        if (log.isInfoEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[4];
            objArr[0] = collectionLink;
            objArr[1] = condition;
            objArr[2] = condition.crossPartition ? "crossPartition" : str2;
            objArr[3] = getAccount();
            logger.info("find Document:{}, cond:{}, partition:{}, account:{}", objArr);
        }
        return cosmosDocumentList;
    }

    CosmosDocumentList findWithJoin(String str, Condition condition, String str2) {
        Checker.check(CollectionUtils.isNotEmpty(condition.join), "join cannot be empty in findWithJoin");
        Checker.check(!condition.negative, "Top negative condition is not supported for findWithJoin");
        Checker.check(!condition.returnAllSubArray, "findWithJoin should be used when returnAllSubArray = false");
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        ArrayList arrayList = new ArrayList();
        Bson processNor = ConditionUtil.processNor(ConditionUtil.toBsonFilter(condition));
        if (processNor != null) {
            arrayList.add(Aggregates.match(processNor));
        }
        Bson bsonSort = ConditionUtil.toBsonSort(condition.sort);
        if (bsonSort != null) {
            arrayList.add(Aggregates.sort(bsonSort));
        }
        arrayList.add(Aggregates.skip(condition.offset));
        arrayList.add(Aggregates.limit(condition.limit));
        Map<String, Object> extractJoinFilters = JoinUtil.extractJoinFilters(condition.filter, condition.join);
        Document document = new Document();
        document.append("original", "$$ROOT");
        for (String str3 : condition.join) {
            String str4 = "matching_" + AggregateUtil.convertFieldNameIncludingDot(str3);
            String str5 = "$" + str3;
            Map map = (Map) extractJoinFilters.entrySet().stream().filter(entry -> {
                return ((String) entry.getKey()).startsWith(str3);
            }).collect(Collectors.toMap(entry2 -> {
                return ((String) entry2.getKey()).substring(str3.length() + 1);
            }, (v0) -> {
                return v0.getValue();
            }));
            ArrayList arrayList2 = new ArrayList();
            map.forEach((str6, obj) -> {
                Bson bsonFilter = ConditionUtil.toBsonFilter("$$this." + str6, obj, FilterOptions.create().join(condition.join).innerCond(true));
                if (bsonFilter != null) {
                    arrayList2.add(bsonFilter);
                }
            });
            document.append(str4, new Document("$filter", new Document("input", str5).append("cond", arrayList2.size() == 1 ? arrayList2.get(0) : Filters.and(arrayList2))));
        }
        arrayList.add(Aggregates.project(document));
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add("$original");
        Iterator<String> it = condition.join.iterator();
        while (it.hasNext()) {
            String str7 = "matching_" + AggregateUtil.convertFieldNameIncludingDot(it.next());
            arrayList3.add(new Document(str7, "$" + str7));
        }
        arrayList.add(Aggregates.replaceRoot(new Document("$mergeObjects", arrayList3)));
        ArrayList newArrayList = Lists.newArrayList(new Object[]{"$$ROOT"});
        for (String str8 : condition.join) {
            newArrayList.add(ConditionUtil.generateMergeObjects(str8, "matching_" + AggregateUtil.convertFieldNameIncludingDot(str8)));
        }
        arrayList.add(Aggregates.replaceWith(new Document("$mergeObjects", newArrayList)));
        List<String> processFields = ConditionUtil.processFields(condition.fields);
        if (!processFields.isEmpty()) {
            arrayList.add(Aggregates.project(Projections.fields(new Bson[]{Projections.excludeId(), Projections.include(processFields)})));
        }
        return new CosmosDocumentList((ArrayList) collection.aggregate(arrayList).into(new ArrayList()));
    }

    static List<? extends LinkedHashMap> convertAggregateResultsToInteger(List<? extends LinkedHashMap> list) {
        if (CollectionUtils.isEmpty(list)) {
            return list;
        }
        Iterator<? extends LinkedHashMap> it = list.iterator();
        while (it.hasNext()) {
            it.next().replaceAll((obj, obj2) -> {
                return obj2 instanceof Number ? NumberUtil.convertNumberToIntIfCompatible((Number) obj2) : obj2;
            });
        }
        return list;
    }

    List<Map<String, Object>> mergeSubArrayToDoc(String str, Condition condition, CosmosSqlQuerySpec cosmosSqlQuerySpec, CosmosQueryRequestOptions cosmosQueryRequestOptions) throws Exception {
        throw new NotImplementedException();
    }

    List<Map<String, Object>> mergeArrayValueToDoc(List<Map<String, Object>> list, Map<String, String[]> map) {
        ArrayList arrayList = new ArrayList();
        for (Map<String, Object> map2 : list) {
            Map<String, Object> map3 = JsonUtil.toMap(map2.get("c"));
            for (Map.Entry<String, String[]> entry : map.entrySet()) {
                if (Objects.nonNull(map2.get(entry.getKey()))) {
                    traverseListValueToDoc(map3, Map.of(entry.getKey(), map2.get(entry.getKey())), entry, 0);
                }
            }
            arrayList.add(map3);
        }
        return arrayList;
    }

    void traverseListValueToDoc(Map<String, Object> map, Map<String, Object> map2, Map.Entry<String, String[]> entry, int i) {
        String key = entry.getKey();
        String[] value = entry.getValue();
        if (i == value.length - 1) {
            if (map2.get(key) instanceof List) {
                map.put(value[entry.getValue().length - 1], map2.get(key));
            }
        } else if (map.get(value[i]) instanceof Map) {
            traverseListValueToDoc((Map) map.get(value[i]), map2, entry, i + 1);
        }
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocumentList aggregate(String str, Aggregate aggregate, String str2) throws Exception {
        return aggregate(str, aggregate, Condition.filter(new Object[0]), str2);
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocumentList aggregate(String str, Aggregate aggregate, Condition condition, String str2) throws Exception {
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        Bson processNor = ConditionUtil.processNor(ConditionUtil.toBsonFilter(condition));
        ArrayList arrayList = new ArrayList();
        if (processNor != null) {
            arrayList.add(Aggregates.match(processNor));
        }
        arrayList.add(AggregateUtil.createProjectStage(aggregate));
        Bson createGroupStage = AggregateUtil.createGroupStage(aggregate);
        if (createGroupStage != null) {
            arrayList.add(createGroupStage);
        }
        arrayList.add(AggregateUtil.createFinalProjectStage(aggregate));
        Condition condition2 = aggregate.condAfterAggregate;
        if (condition2 != null) {
            Bson processNor2 = ConditionUtil.processNor(ConditionUtil.toBsonFilter(condition2));
            if (processNor2 != null) {
                arrayList.add(Aggregates.match(processNor2));
            }
            Bson bsonSort = ConditionUtil.toBsonSort(condition2.sort);
            if (bsonSort != null) {
                arrayList.add(Aggregates.sort(bsonSort));
            }
            arrayList.add(Aggregates.skip(condition2.offset));
            arrayList.add(Aggregates.limit(condition2.limit));
        }
        return new CosmosDocumentList((ArrayList) collection.aggregate(arrayList).into(new ArrayList()));
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public int count(String str, Condition condition, String str2) throws Exception {
        String collectionLink = LinkFormatUtil.getCollectionLink(this.db, str);
        Bson processNor = ConditionUtil.processNor(ConditionUtil.toBsonFilter(condition));
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        Long l = (Long) RetryUtil.executeWithRetry(() -> {
            return Long.valueOf(collection.countDocuments(processNor));
        });
        if (log.isInfoEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[5];
            objArr[0] = l;
            objArr[1] = collectionLink;
            objArr[2] = condition;
            objArr[3] = condition.crossPartition ? "crossPartition" : str2;
            objArr[4] = getAccount();
            logger.info("count:{}, Document:{}, cond:{}, partition:{}, account:{}", objArr);
        }
        return Math.toIntExact(l.longValue());
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument increment(String str, String str2, String str3, int i, String str4) throws Exception {
        String documentLink = LinkFormatUtil.getDocumentLink(str, str4, str2);
        CosmosDocument patch = patch(str, str2, PatchOperations.create().increment(str3, i), str4);
        log.info("increment Document:{}, partition:{}, account:{}", new Object[]{documentLink, str4, getAccount()});
        return patch;
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosDocument patch(String str, String str2, PatchOperations patchOperations, String str3) throws Exception {
        String documentLink = LinkFormatUtil.getDocumentLink(str, str3, str2);
        Checker.checkNotEmpty("id", "id");
        Checker.checkNotNull(patchOperations, "operations");
        Preconditions.checkArgument(patchOperations.size() <= PatchOperations.LIMIT, "Size of operations should be less or equal to 10. We got: %d, which exceed the limit 10", patchOperations.size());
        patchOperations.set("/_ts", Long.valueOf(Instant.now().getEpochSecond()));
        List<BsonDocument> mongoPatchData = JsonPatchUtil.toMongoPatchData(patchOperations);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str3);
        Document document = (Document) RetryUtil.executeWithRetry(() -> {
            return (Document) collection.findOneAndUpdate(Filters.eq("_id", str2), Updates.combine(mongoPatchData), new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
        });
        log.info("patched Document:{}, id:{}, partition:{}, account:{}", new Object[]{documentLink, str2, str3, getAccount()});
        return checkAndGetCosmosDocument(document);
    }

    static Map<String, Object> merge(Map<String, Object> map, Map<String, Object> map2) {
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            Object obj = map2.get(key);
            if (value != null && (value instanceof Map) && obj != null && (obj instanceof Map)) {
                map2.put(key, merge((Map) value, (Map) obj));
            }
        }
        map.putAll(map2);
        return map;
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public List<CosmosDocument> batchCreate(String str, List<?> list, String str2) throws Exception {
        doCheckBeforeBatch(str, list, str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        ArrayList arrayList = new ArrayList();
        Iterator<?> it = list.iterator();
        while (it.hasNext()) {
            Map<String, Object> map = JsonUtil.toMap(it.next());
            map.put(MongoImpl.getDefaultPartitionKey(), str2);
            addId4Mongo(map);
            addTimestamp(map);
            arrayList.add(new Document(map));
        }
        ClientSession startSession = this.client.startSession();
        try {
            startSession.startTransaction();
            try {
                collection.insertMany(startSession, arrayList);
                startSession.commitTransaction();
                log.info("Batch created Documents in collection:{}, partition:{}, insertedCount:{}, account:{}", new Object[]{str, str2, Integer.valueOf(arrayList.size()), getAccount()});
                if (startSession != null) {
                    startSession.close();
                }
                return (List) arrayList.stream().map(document -> {
                    return new CosmosDocument((Map<String, Object>) document);
                }).collect(Collectors.toList());
            } catch (Exception e) {
                startSession.abortTransaction();
                if (e instanceof MongoException) {
                    throw new CosmosException(e);
                }
                throw new CosmosException(500, "500", "batchCreate Transaction failed: " + e.getMessage(), (Exception) e);
            }
        } catch (Throwable th) {
            if (startSession != null) {
                try {
                    startSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public List<CosmosDocument> batchUpsert(String str, List<?> list, String str2) throws Exception {
        doCheckBeforeBatch(str, list, str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Iterator<?> it = list.iterator();
        while (it.hasNext()) {
            Map<String, Object> map = JsonUtil.toMap(it.next());
            map.put(MongoImpl.getDefaultPartitionKey(), str2);
            String addId4Mongo = addId4Mongo(map);
            addTimestamp(map);
            Document document = new Document(map);
            arrayList.add(new UpdateOneModel(Filters.eq("_id", addId4Mongo), new Document("$set", document), new UpdateOptions().upsert(true)));
            arrayList2.add(document);
        }
        ClientSession startSession = this.client.startSession();
        try {
            startSession.startTransaction();
            try {
                collection.bulkWrite(startSession, arrayList);
                startSession.commitTransaction();
                log.info("Batch created Documents in collection:{}, partition:{}, insertedCount:{}, account:{}", new Object[]{str, str2, Integer.valueOf(arrayList2.size()), getAccount()});
                if (startSession != null) {
                    startSession.close();
                }
                return (List) arrayList2.stream().map(document2 -> {
                    return new CosmosDocument((Map<String, Object>) document2);
                }).collect(Collectors.toList());
            } catch (Exception e) {
                startSession.abortTransaction();
                if (e instanceof MongoException) {
                    throw new CosmosException(e);
                }
                throw new CosmosException(500, "500", "batchUpsert Transaction failed: " + e.getMessage(), (Exception) e);
            }
        } catch (Throwable th) {
            if (startSession != null) {
                try {
                    startSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public List<CosmosDocument> batchDelete(String str, List<?> list, String str2) throws Exception {
        doCheckBeforeBatch(str, list, str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        ArrayList arrayList = new ArrayList();
        Iterator<?> it = list.iterator();
        while (it.hasNext()) {
            String obj = JsonUtil.toMap(it.next()).getOrDefault("id", "").toString();
            checkValidId(obj);
            if (StringUtils.isNotEmpty(obj)) {
                arrayList.add(obj);
            }
        }
        ClientSession startSession = this.client.startSession();
        try {
            startSession.startTransaction();
            try {
                collection.deleteMany(startSession, Filters.in("_id", arrayList));
                startSession.commitTransaction();
                log.info("Batch created Documents in collection:{}, partition:{}, insertedCount:{}, account:{}", new Object[]{str, str2, Integer.valueOf(arrayList.size()), getAccount()});
                if (startSession != null) {
                    startSession.close();
                }
                return (List) arrayList.stream().map(str3 -> {
                    return new CosmosDocument((Map<String, Object>) Map.of("id", str3));
                }).collect(Collectors.toList());
            } catch (Exception e) {
                startSession.abortTransaction();
                if (e instanceof MongoException) {
                    throw new CosmosException(e);
                }
                throw new CosmosException(500, "500", "batchDelete Transaction failed: " + e.getMessage(), (Exception) e);
            }
        } catch (Throwable th) {
            if (startSession != null) {
                try {
                    startSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    static void doCheckBeforeBatch(String str, List<?> list, String str2) {
        Checker.checkNotBlank(str, "coll");
        Checker.checkNotBlank(str2, "partition");
        Checker.checkNotEmpty(list, "create data " + str + " " + str2);
        checkBatchMaxOperations(list);
        checkValidId(list);
    }

    static void doCheckBeforeBulk(String str, List<?> list, String str2) {
        Checker.checkNotBlank(str, "coll");
        Checker.checkNotBlank(str2, "partition");
        Checker.checkNotEmpty(list, "create data " + str + " " + str2);
    }

    private List<CosmosDocument> doBatchWithRetry(CosmosContainer cosmosContainer, CosmosBatch cosmosBatch) throws Exception {
        CosmosBatchResponseWrapper executeBatchWithRetry = RetryUtil.executeBatchWithRetry(() -> {
            return new CosmosBatchResponseWrapper(cosmosContainer.executeCosmosBatch(cosmosBatch));
        });
        ArrayList arrayList = new ArrayList();
        Iterator it = executeBatchWithRetry.cosmosBatchReponse.getResults().iterator();
        while (it.hasNext()) {
            LinkedHashMap linkedHashMap = (LinkedHashMap) ((CosmosBatchOperationResult) it.next()).getItem(mapInstance.getClass());
            if (linkedHashMap != null) {
                arrayList.add(new CosmosDocument(linkedHashMap));
            }
        }
        return arrayList;
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosBulkResult bulkCreate(String str, List<?> list, String str2) throws Exception {
        doCheckBeforeBulk(str, list, str2);
        ArrayList arrayList = new ArrayList();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        Iterator<?> it = list.iterator();
        while (it.hasNext()) {
            Map<String, Object> map = JsonUtil.toMap(it.next());
            map.put(MongoImpl.getDefaultPartitionKey(), str2);
            String addId4Mongo = addId4Mongo(map);
            addTimestamp(map);
            Document document = new Document(map);
            arrayList.add(document);
            linkedHashMap.put(addId4Mongo, document);
        }
        Map insertedIds = this.client.getDatabase(str).getCollection(str2).insertMany(arrayList).getInsertedIds();
        log.info("Bulk created Documents in collection:{}, partition:{}, insertedCount:{}, account:{}", new Object[]{str, str2, Integer.valueOf(insertedIds.size()), getAccount()});
        CosmosBulkResult cosmosBulkResult = new CosmosBulkResult();
        LinkedHashSet linkedHashSet = (LinkedHashSet) insertedIds.entrySet().stream().map(entry -> {
            return ((BsonValue) entry.getValue()).asString().getValue();
        }).collect(Collectors.toCollection(LinkedHashSet::new));
        for (String str3 : linkedHashMap.keySet()) {
            if (linkedHashSet.contains(str3)) {
                cosmosBulkResult.successList.add(new CosmosDocument((Map<String, Object>) linkedHashMap.get(str3)));
            } else {
                cosmosBulkResult.fatalList.add(new CosmosException(500, str3, "Failed to insert"));
            }
        }
        return cosmosBulkResult;
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosBulkResult bulkUpsert(String str, List<?> list, String str2) throws Exception {
        doCheckBeforeBulk(str, list, str2);
        MongoCollection collection = this.client.getDatabase(str).getCollection(str2);
        ArrayList arrayList = new ArrayList();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        Iterator<?> it = list.iterator();
        while (it.hasNext()) {
            Map<String, Object> map = JsonUtil.toMap(it.next());
            map.put(MongoImpl.getDefaultPartitionKey(), str2);
            String addId4Mongo = addId4Mongo(map);
            addTimestamp(map);
            Document document = new Document(map);
            arrayList.add(new UpdateOneModel(Filters.eq("_id", addId4Mongo), new Document("$set", document), new UpdateOptions().upsert(true)));
            linkedHashMap.put(addId4Mongo, document);
        }
        BulkWriteResult bulkWrite = collection.bulkWrite(arrayList, new BulkWriteOptions().ordered(true));
        log.info("Bulk created Documents in collection:{}, partition:{}, insertedCount:{}, account:{}", new Object[]{str, str2, Integer.valueOf(bulkWrite.getModifiedCount()), getAccount()});
        CosmosBulkResult cosmosBulkResult = new CosmosBulkResult();
        int modifiedCount = bulkWrite.getModifiedCount();
        int i = 0;
        for (Map.Entry entry : linkedHashMap.entrySet()) {
            if (i < modifiedCount) {
                cosmosBulkResult.successList.add(new CosmosDocument((Map<String, Object>) entry.getValue()));
            } else {
                cosmosBulkResult.fatalList.add(new CosmosException(500, (String) entry.getKey(), "Failed to upsert"));
            }
            i++;
        }
        return cosmosBulkResult;
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public CosmosBulkResult bulkDelete(String str, List<?> list, String str2) throws Exception {
        doCheckBeforeBulk(str, list, str2);
        ArrayList arrayList = new ArrayList();
        Iterator<?> it = list.iterator();
        while (it.hasNext()) {
            String obj = JsonUtil.toMap(it.next()).getOrDefault("id", "").toString();
            checkValidId(obj);
            if (StringUtils.isNotEmpty(obj)) {
                arrayList.add(obj);
            }
        }
        DeleteResult deleteMany = this.client.getDatabase(str).getCollection(str2).deleteMany(Filters.in("_id", arrayList));
        log.info("Bulk deleted Documents in collection:{}, partition:{}, deletedCount:{}, account:{}", new Object[]{str, str2, Long.valueOf(deleteMany.getDeletedCount()), getAccount()});
        CosmosBulkResult cosmosBulkResult = new CosmosBulkResult();
        if (deleteMany.getDeletedCount() > 0) {
            cosmosBulkResult.successList = (List) arrayList.stream().map(str3 -> {
                return new CosmosDocument((Map<String, Object>) Map.of("id", str3));
            }).collect(Collectors.toList());
        } else {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                cosmosBulkResult.fatalList.add(new CosmosException(500, (String) it2.next(), "Failed to delete"));
            }
        }
        return cosmosBulkResult;
    }

    static void checkBatchMaxOperations(List<?> list) {
        if (list.size() > MAX_BATCH_NUMBER_OF_OPERATION) {
            throw new IllegalArgumentException("The number of data operations should not exceed 100.");
        }
    }

    String getAccount() throws Exception {
        return this.cosmosAccount.getAccount();
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public Cosmos getCosmosAccount() {
        return this.cosmosAccount;
    }

    @Override // io.github.thunderz99.cosmos.CosmosDatabase
    public String getDatabaseName() {
        return this.db;
    }
}
