package io.scalecube.configuration.repository.couchbase;

import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.document.JsonArrayDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.DocumentAlreadyExistsException;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.error.subdoc.PathNotFoundException;
import com.couchbase.client.java.view.ViewQuery;
import io.scalecube.configuration.repository.ConfigurationRepository;
import io.scalecube.configuration.repository.Document;
import io.scalecube.configuration.repository.HistoryDocument;
import io.scalecube.configuration.repository.Repository;
import io.scalecube.configuration.repository.exception.DataAccessException;
import io.scalecube.configuration.repository.exception.KeyNotFoundException;
import io.scalecube.configuration.repository.exception.KeyVersionNotFoundException;
import io.scalecube.configuration.repository.exception.RepositoryAlreadyExistsException;
import io.scalecube.configuration.repository.exception.RepositoryKeyAlreadyExistsException;
import io.scalecube.configuration.repository.exception.RepositoryNotFoundException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.RxReactiveStreams;

/* loaded from: input_file:io/scalecube/configuration/repository/couchbase/CouchbaseRepository.class */
public class CouchbaseRepository implements ConfigurationRepository {
    private static final String REPOSITORY_ALREADY_EXISTS = "Repository with name: '%s' already exists";
    private static final String REPOSITORY_NOT_FOUND = "Repository '%s' not found";
    private static final String REPOSITORY_OR_ITS_KEY_NOT_FOUND = "Repository '%s' or its key '%s' not found";
    private static final String DELIMITER = "::";
    private static final String REPOS = "repos";
    private static final int DEFAULT_LATEST_VERSION = -1;
    private static final int INDEX_OF_KEY = 2;
    private final AsyncBucket bucket;

    public CouchbaseRepository(AsyncBucket asyncBucket) {
        this.bucket = asyncBucket;
    }

    @Override // io.scalecube.configuration.repository.ConfigurationRepository
    public Mono<Boolean> createRepository(Repository repository) {
        return Mono.from(RxReactiveStreams.toPublisher(this.bucket.setAdd(REPOS, repository.namespace() + DELIMITER + repository.name()))).filter(bool -> {
            return bool.booleanValue();
        }).switchIfEmpty(Mono.error(() -> {
            return new RepositoryAlreadyExistsException(String.format(REPOSITORY_ALREADY_EXISTS, repository.name()));
        })).onErrorMap(DocumentAlreadyExistsException.class, documentAlreadyExistsException -> {
            return new RepositoryAlreadyExistsException(String.format(REPOSITORY_ALREADY_EXISTS, repository.name()));
        }).onErrorMap(CouchbaseExceptionTranslator::translateExceptionIfPossible);
    }

    @Override // io.scalecube.configuration.repository.ConfigurationRepository
    public Mono<Document> read(String str, String str2, String str3, Integer num) {
        return Mono.from(RxReactiveStreams.toPublisher(this.bucket.listGet(docId(str, str2, str3), num != null ? num.intValue() - 1 : DEFAULT_LATEST_VERSION, Object.class))).onErrorMap(DocumentDoesNotExistException.class, documentDoesNotExistException -> {
            return new KeyNotFoundException(String.format(REPOSITORY_OR_ITS_KEY_NOT_FOUND, str2, str3));
        }).onErrorMap(PathNotFoundException.class, pathNotFoundException -> {
            Object[] objArr = new Object[INDEX_OF_KEY];
            objArr[0] = str3;
            objArr[1] = num != null ? num : "latest";
            return new KeyVersionNotFoundException(String.format("Key '%s' version '%s' not found", objArr));
        }).onErrorMap(CouchbaseExceptionTranslator::translateExceptionIfPossible).map(obj -> {
            return new Document(str3, readJsonValue(obj));
        });
    }

    @Override // io.scalecube.configuration.repository.ConfigurationRepository
    public Flux<Document> readAll(String str, String str2, Integer num) {
        return Mono.from(RxReactiveStreams.toPublisher(this.bucket.setContains(REPOS, str + DELIMITER + str2))).filter(bool -> {
            return bool.booleanValue();
        }).switchIfEmpty(Mono.error(() -> {
            return new RepositoryNotFoundException(String.format(REPOSITORY_NOT_FOUND, str2));
        })).thenMany(Flux.from(RxReactiveStreams.toPublisher(this.bucket.query(ViewQuery.from("keys", "by_keys").key(str + DELIMITER + str2))))).flatMap(asyncViewResult -> {
            return RxReactiveStreams.toPublisher(asyncViewResult.rows());
        }).flatMap(asyncViewRow -> {
            return read(str, str2, asyncViewRow.id().split(DELIMITER)[INDEX_OF_KEY], num);
        }).onErrorContinue(KeyVersionNotFoundException.class, (th, obj) -> {
        }).onErrorMap(CouchbaseExceptionTranslator::translateExceptionIfPossible);
    }

    @Override // io.scalecube.configuration.repository.ConfigurationRepository
    public Flux<HistoryDocument> readHistory(String str, String str2, String str3) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        return Mono.from(RxReactiveStreams.toPublisher(this.bucket.get(docId(str, str2, str3), JsonArrayDocument.class))).switchIfEmpty(Mono.error(() -> {
            return new KeyNotFoundException(String.format(REPOSITORY_OR_ITS_KEY_NOT_FOUND, str2, str3));
        })).map((v0) -> {
            return v0.content();
        }).flatMapIterable(jsonArray -> {
            return (List) jsonArray.toList().stream().map(obj -> {
                return new HistoryDocument(Integer.valueOf(atomicInteger.incrementAndGet()), readJsonValue(obj));
            }).collect(Collectors.toList());
        }).onErrorMap(CouchbaseExceptionTranslator::translateExceptionIfPossible);
    }

    @Override // io.scalecube.configuration.repository.ConfigurationRepository
    public Mono<Document> save(String str, String str2, Document document) {
        return Mono.from(RxReactiveStreams.toPublisher(this.bucket.setContains(REPOS, str + DELIMITER + str2))).filter(bool -> {
            return bool.booleanValue();
        }).switchIfEmpty(Mono.error(() -> {
            return new RepositoryNotFoundException(String.format(REPOSITORY_NOT_FOUND, str2));
        })).then(Mono.from(RxReactiveStreams.toPublisher(this.bucket.insert(JsonArrayDocument.create(docId(str, str2, document.key()), JsonArray.create().add(savedJsonValue(document.value()))))))).onErrorMap(DocumentAlreadyExistsException.class, documentAlreadyExistsException -> {
            return new RepositoryKeyAlreadyExistsException(String.format("Repository '%s' key '%s' already exists", str2, document.key()));
        }).onErrorMap(CouchbaseExceptionTranslator::translateExceptionIfPossible).switchIfEmpty(Mono.error(() -> {
            return new DataAccessException("Save operation is failed because of unknown reason");
        })).thenReturn(document);
    }

    @Override // io.scalecube.configuration.repository.ConfigurationRepository
    public Mono<Document> update(String str, String str2, Document document) {
        return Mono.from(RxReactiveStreams.toPublisher(this.bucket.listAppend(docId(str, str2, document.key()), document.value()))).filter(bool -> {
            return bool.booleanValue();
        }).switchIfEmpty(Mono.error(() -> {
            return new DataAccessException("Save operation is failed because of unknown reason");
        })).then(Mono.from(RxReactiveStreams.toPublisher(this.bucket.listSize(docId(str, str2, document.key()))))).onErrorMap(DocumentDoesNotExistException.class, documentDoesNotExistException -> {
            return new DocumentDoesNotExistException(String.format(REPOSITORY_OR_ITS_KEY_NOT_FOUND, str2, document.key()));
        }).onErrorMap(CouchbaseExceptionTranslator::translateExceptionIfPossible).map(num -> {
            return new Document(document.key(), savedJsonValue(document.value()), num);
        });
    }

    @Override // io.scalecube.configuration.repository.ConfigurationRepository
    public Mono<Void> delete(String str, String str2, String str3) {
        return Mono.from(RxReactiveStreams.toPublisher(this.bucket.remove(docId(str, str2, str3)))).onErrorMap(DocumentDoesNotExistException.class, documentDoesNotExistException -> {
            return new KeyNotFoundException(String.format(REPOSITORY_OR_ITS_KEY_NOT_FOUND, str2, str3));
        }).onErrorMap(CouchbaseExceptionTranslator::translateExceptionIfPossible).switchIfEmpty(Mono.error(() -> {
            return new DataAccessException("Delete operation is failed because of unknown reason");
        })).then();
    }

    private String docId(String str, String str2, String str3) {
        return str + DELIMITER + str2 + DELIMITER + str3;
    }

    private Object savedJsonValue(Object obj) {
        return obj instanceof LinkedHashMap ? JsonObject.from((Map) obj) : obj instanceof ArrayList ? JsonArray.from((ArrayList) obj) : obj == null ? JsonObject.NULL : obj;
    }

    private Object readJsonValue(Object obj) {
        return obj instanceof JsonObject ? ((JsonObject) obj).toMap() : obj instanceof JsonArray ? ((JsonArray) obj).toList() : (obj == null || obj == JsonObject.NULL) ? JsonObject.NULL : obj;
    }
}
