package org.apache.james.backends.cassandra.init;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import java.util.function.Predicate;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.components.CassandraTable;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/backends/cassandra/init/CassandraTableManager.class */
/**
 * Creates and empties the Cassandra tables declared by a {@link CassandraModule},
 * using the injected {@link Session}.
 */
public class CassandraTableManager {
    /** Maximum number of table truncations subscribed to concurrently by {@link #clearAllTables()}. */
    private static final int TRUNCATE_CONCURRENCY = 16;

    private final Session session;
    private final CassandraModule module;

    /**
     * @param cassandraModule module whose tables are managed
     * @param session         session used to run every statement
     */
    @Inject
    public CassandraTableManager(CassandraModule cassandraModule, Session session) {
        this.session = session;
        this.module = cassandraModule;
    }

    /**
     * Initializes every table of the module against the keyspace the session is logged into.
     *
     * @return the per-table statuses folded together with
     *         {@code InitializationStatus.reduce}, or {@code ALREADY_DONE} when the
     *         module declares no table
     */
    public CassandraTable.InitializationStatus initializeTables() {
        KeyspaceMetadata keyspaceMetadata = session.getCluster()
            .getMetadata()
            .getKeyspace(session.getLoggedKeyspace());

        return module.moduleTables()
            .stream()
            .map(table -> table.initialize(keyspaceMetadata, session))
            .reduce((accumulated, next) -> accumulated.reduce(next))
            .orElse(CassandraTable.InitializationStatus.ALREADY_DONE);
    }

    /**
     * Empties every table of the module and blocks the calling thread until all
     * truncations have completed. Work is published on Reactor's elastic scheduler and
     * at most {@value #TRUNCATE_CONCURRENCY} tables are processed concurrently.
     */
    public void clearAllTables() {
        CassandraAsyncExecutor executor = new CassandraAsyncExecutor(session);

        Flux.fromIterable(module.moduleTables())
            .publishOn(Schedulers.elastic())
            .map(CassandraTable::getName)
            .flatMap(tableName -> truncate(executor, tableName), TRUNCATE_CONCURRENCY)
            .then()
            .block();
    }

    /**
     * Truncates the given table only when a one-row probe shows it holds data.
     * If the probe (or the truncation that follows it) fails, a truncation is
     * attempted regardless.
     *
     * @param executor  executor used for both the probe and the truncation
     * @param tableName name of the table to empty
     * @return a {@code Mono} completing once the table has been handled
     */
    private Mono<Void> truncate(CassandraAsyncExecutor executor, String tableName) {
        // Probe with the smallest possible read: one row, fetch size one.
        return executor.execute(QueryBuilder.select().from(tableName).limit(1).setFetchSize(1))
            // An exhausted result set means the table is already empty: nothing to do.
            .filter(rows -> !rows.isExhausted())
            .flatMap(rows -> issueTruncate(executor, tableName))
            // Best effort: on any upstream error, fall back to truncating anyway.
            .onErrorResume(error -> issueTruncate(executor, tableName));
    }

    /** Builds and runs a {@code TRUNCATE} statement for the given table. */
    private Mono<Void> issueTruncate(CassandraAsyncExecutor executor, String tableName) {
        return executor.executeVoid(QueryBuilder.truncate(tableName));
    }
}
