package org.neo4j.fabric.executor;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.neo4j.bolt.protocol.common.message.AccessMode;
import org.neo4j.bolt.protocol.v41.message.request.RoutingContext;
import org.neo4j.cypher.internal.FullyParsedQuery;
import org.neo4j.cypher.internal.ast.GraphSelection;
import org.neo4j.exceptions.InvalidSemanticsException;
import org.neo4j.fabric.config.FabricConfig;
import org.neo4j.fabric.eval.Catalog;
import org.neo4j.fabric.eval.CatalogManager;
import org.neo4j.fabric.eval.UseEvaluation;
import org.neo4j.fabric.executor.FabricStatementLifecycles;
import org.neo4j.fabric.executor.Location;
import org.neo4j.fabric.planning.FabricPlan;
import org.neo4j.fabric.planning.FabricPlanner;
import org.neo4j.fabric.planning.FabricQuery;
import org.neo4j.fabric.planning.Fragment;
import org.neo4j.fabric.planning.QueryType;
import org.neo4j.fabric.stream.CompletionDelegatingOperator;
import org.neo4j.fabric.stream.Prefetcher;
import org.neo4j.fabric.stream.Record;
import org.neo4j.fabric.stream.Records;
import org.neo4j.fabric.stream.StatementResult;
import org.neo4j.fabric.stream.StatementResults;
import org.neo4j.fabric.stream.summary.MergedQueryStatistics;
import org.neo4j.fabric.stream.summary.MergedSummary;
import org.neo4j.fabric.stream.summary.Summary;
import org.neo4j.fabric.transaction.FabricTransaction;
import org.neo4j.fabric.transaction.TransactionMode;
import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.Notification;
import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.database.DatabaseReference;
import org.neo4j.logging.InternalLog;
import org.neo4j.logging.InternalLogProvider;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.MapValue;
import org.neo4j.values.virtual.MapValueBuilder;
import org.neo4j.values.virtual.PathValue;
import org.neo4j.values.virtual.VirtualNodeValue;
import org.neo4j.values.virtual.VirtualRelationshipValue;
import org.neo4j.values.virtual.VirtualValues;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import scala.jdk.javaapi.CollectionConverters;

/* loaded from: input_file:org/neo4j/fabric/executor/FabricExecutor.class */
public class FabricExecutor {
    /** Message text describing a write attempted while in read access mode. */
    public static final String WRITING_IN_READ_NOT_ALLOWED_MSG = "Writing in read access mode not allowed";
    /** Data-stream settings: supplies the flatMap concurrency for Apply fragments and is handed to each statement's Prefetcher. */
    private final FabricConfig.DataStream dataStreamConfig;
    /** Produces a planner instance per submitted statement and owns the query cache cleared by {@code clearQueryCachesForDatabase}. */
    private final FabricPlanner planner;
    /** Factory for the per-statement USE evaluator. */
    private final UseEvaluation useEvaluation;
    /** Consulted to compute the location of a target graph when a fragment is executed. */
    private final CatalogManager catalogManager;
    /** Debug log used for plan output and, in the logging execution variant, record tracing. */
    private final InternalLog log;
    /** Creates the lifecycle object that tracks each submitted statement. */
    private final FabricStatementLifecycles statementLifecycles;
    /** Executor passed to the CompletionDelegatingOperator that wraps remote record streams. */
    private final Executor fabricWorkerExecutor;

    /* loaded from: input_file:org/neo4j/fabric/executor/FabricExecutor$FabricLoggingStatementExecution.class */
    /**
     * Statement execution variant that writes a debug-level trace line for every local or remote
     * query it dispatches, and for every signal (record, error, cancel, completion) emitted by the
     * resulting record stream.
     */
    private class FabricLoggingStatementExecution extends FabricStatementExecution {
        /** Counter providing the second half of each execution id; incremented once per dispatched query. */
        private final AtomicInteger step = new AtomicInteger(0);
        /** Destination of the trace lines. */
        private final InternalLog log;

        FabricLoggingStatementExecution(FabricPlan fabricPlan, FabricPlanner.PlannerInstance plannerInstance, UseEvaluation.Instance instance, MapValue mapValue, AccessMode accessMode, RoutingContext routingContext, FabricTransaction.FabricExecutionContext fabricExecutionContext, InternalLog internalLog, FabricStatementLifecycles.StatementLifecycle statementLifecycle, FabricConfig.DataStream dataStream) {
            super(fabricPlan, plannerInstance, instance, mapValue, accessMode, routingContext, fabricExecutionContext, statementLifecycle, dataStream);
            this.log = internalLog;
        }

        /** Traces the whitespace-compacted query description, then delegates and traces the produced records. */
        @Override
        FragmentResult runLocalQueryAt(Location.Local local, TransactionMode transactionMode, FullyParsedQuery fullyParsedQuery, MapValue mapValue, boolean z, Flux<Record> flux) {
            String id = executionId();
            trace(id, "local " + local.getGraphId(), compact(fullyParsedQuery.description()));
            FragmentResult delegated = super.runLocalQueryAt(local, transactionMode, fullyParsedQuery, mapValue, z, flux);
            return traceRecords(id, delegated);
        }

        /** Traces the whitespace-compacted query text, then delegates and traces the produced records. */
        @Override
        FragmentResult runRemoteQueryAt(Location.Remote remote, TransactionMode transactionMode, String str, MapValue mapValue) {
            String id = executionId();
            trace(id, "remote " + remote.getGraphId(), compact(str));
            FragmentResult delegated = super.runRemoteQueryAt(remote, transactionMode, str, mapValue);
            return traceRecords(id, delegated);
        }

        /** Replaces line breaks with spaces and collapses every whitespace run into a single space. */
        private String compact(String str) {
            String singleLine = str.replaceAll("\\r?\\n", " ");
            return singleLine.replaceAll("\\s+", " ");
        }

        /** Returns a result whose record stream logs each record, error, cancellation and completion under the given id. */
        private FragmentResult traceRecords(String str, FragmentResult fragmentResult) {
            Flux<Record> traced = fragmentResult.records
                    .doOnNext(record -> trace(str, "output", render(record)))
                    .doOnError(failure -> trace(str, "error", failure.getClass().getSimpleName() + ": " + failure.getMessage()))
                    .doOnCancel(() -> trace(str, "cancel", "cancel"))
                    .doOnComplete(() -> trace(str, "complete", "complete"));
            return new FragmentResult(traced, fragmentResult.planDescription, fragmentResult.executionType);
        }

        /** Renders the record's values, in index order, as {@code [v0, v1, ...]}. */
        private String render(Record record) {
            int width = record.size();
            StringBuilder text = new StringBuilder("[");
            for (int index = 0; index < width; index++) {
                if (index > 0) {
                    text.append(", ");
                }
                text.append(record.getValue(index).toString());
            }
            return text.append("]").toString();
        }

        /** Emits one debug line of the form {@code id: event: data}. */
        private void trace(String str, String str2, String str3) {
            String line = String.format("%s: %s: %s", str, str2, str3);
            this.log.debug(line);
        }

        /** Builds an id from this object's hash code and the next step number, both as 8-digit hex. */
        private String executionId() {
            String instancePart = idString(hashCode());
            String stepPart = idString(this.step.getAndIncrement());
            return String.format("%s/%s", instancePart, stepPart);
        }

        /** Formats an int as zero-padded, upper-case, 8-digit hexadecimal. */
        private String idString(int i) {
            return String.format("%08X", i);
        }
    }

    /* loaded from: input_file:org/neo4j/fabric/executor/FabricExecutor$FabricStatementExecution.class */
    private class FabricStatementExecution {
        /** The plan being executed; provides the root fragment, execution type and plan-time notifications. */
        private final FabricPlan plan;
        /** Converts Exec fragments into their local or remote query form. */
        private final FabricPlanner.PlannerInstance plannerInstance;
        /** Evaluates graph selections and answers the graph-relationship checks used for validation. */
        private final UseEvaluation.Instance useEvaluator;
        /** Parameters supplied with the statement. */
        private final MapValue queryParams;
        /** Transaction-side context: statement-type validation, location lookup and the local/remote executors. */
        private final FabricTransaction.FabricExecutionContext ctx;
        /** Accumulates the query statistics of every summary passed to updateSummary. */
        private final MergedQueryStatistics statistics = new MergedQueryStatistics();
        /** Notifications from the plan and from every summary passed to updateSummary; backed by a ConcurrentHashMap key set. */
        private final Set<Notification> notifications = ConcurrentHashMap.newKeySet();
        /** Lifecycle of the statement being executed. */
        private final FabricStatementLifecycles.StatementLifecycle lifecycle;
        /** Adds prefetching to remote record streams. */
        private final Prefetcher prefetcher;
        /** Access mode passed in by the creator of this execution. */
        private final AccessMode accessMode;
        /** Routing context; its server-routing flag is forwarded when a graph location is computed. */
        private final RoutingContext routingContext;

        /**
         * Captures everything needed to execute one planned statement.
         *
         * @param fabricPlan the plan to execute
         * @param plannerInstance converter from Exec fragments to local/remote queries
         * @param instance the USE evaluator for this statement
         * @param mapValue the statement parameters
         * @param accessMode the access mode to execute under
         * @param routingContext the routing context to execute under
         * @param fabricExecutionContext the transaction-side execution context
         * @param statementLifecycle the lifecycle tracking this statement
         * @param dataStream data-stream settings used to build the prefetcher
         */
        FabricStatementExecution(FabricPlan fabricPlan, FabricPlanner.PlannerInstance plannerInstance, UseEvaluation.Instance instance, MapValue mapValue, AccessMode accessMode, RoutingContext routingContext, FabricTransaction.FabricExecutionContext fabricExecutionContext, FabricStatementLifecycles.StatementLifecycle statementLifecycle, FabricConfig.DataStream dataStream) {
            // Planning artefacts.
            this.plan = fabricPlan;
            this.plannerInstance = plannerInstance;
            this.useEvaluator = instance;
            this.queryParams = mapValue;

            // Transaction and session state.
            this.ctx = fabricExecutionContext;
            this.lifecycle = statementLifecycle;
            this.accessMode = accessMode;
            this.routingContext = routingContext;

            // Streaming support.
            this.prefetcher = new Prefetcher(dataStream);
        }

        /**
         * Executes the plan and returns its result.
         *
         * <p>An EXPLAIN plan in Fabric context is not executed: the lifecycle is ended successfully
         * right away and an empty record stream is returned together with the plan's own description.
         * Otherwise the root fragment is run; when it produces no results the records are drained and
         * an empty stream with no columns is exposed instead. The lifecycle is ended on completion,
         * cancellation (both as success) or error (as failure) of the record stream.
         *
         * @return the statement result for the whole plan
         */
        StatementResult run() {
            this.notifications.addAll(CollectionConverters.asJava(this.plan.notifications()));
            this.lifecycle.startExecution(false);
            Fragment query = this.plan.query();

            boolean explainOnly = this.plan.executionType() == FabricPlan.EXPLAIN() && this.plan.inFabricContext();
            if (explainOnly) {
                this.lifecycle.endSuccess();
                return StatementResults.create(
                        CollectionConverters.asJava(query.outputColumns()),
                        Flux.empty(),
                        Mono.just(new MergedSummary(Mono.just(this.plan.query().description()), this.statistics, this.notifications)),
                        Mono.just(EffectiveQueryType.queryExecutionType(this.plan, this.accessMode)));
            }

            FragmentResult fragmentResult = run(query, null);

            List<String> columns;
            Flux<Record> records;
            if (query.producesResults()) {
                columns = CollectionConverters.asJava(query.outputColumns());
                records = fragmentResult.records;
            } else {
                columns = Collections.emptyList();
                // Still subscribe to the fragment's records so the query runs, but expose none of them.
                records = fragmentResult.records.then(Mono.<Record>empty()).flux();
            }

            Mono<Summary> summary = Mono.just(new MergedSummary(fragmentResult.planDescription, this.statistics, this.notifications));
            Flux<Record> lifecycleAwareRecords = records
                    .doOnComplete(this.lifecycle::endSuccess)
                    .doOnCancel(this.lifecycle::endSuccess)
                    .doOnError(this.lifecycle::endFailure);
            return StatementResults.create(columns, lifecycleAwareRecords, summary, fragmentResult.executionType);
        }

        /**
         * Dispatches a fragment to the handler matching its concrete type.
         *
         * @param fragment the fragment to execute (Init, Apply, Union or Exec)
         * @param record the argument record handed to the fragment; may be {@code null}
         * @return the fragment's result
         * @throws RuntimeException the exception built by {@code notImplemented} when the fragment is of none of the handled types
         */
        FragmentResult run(Fragment fragment, Record record) {
            final FragmentResult outcome;
            if (fragment instanceof Fragment.Init) {
                outcome = runInit();
            } else if (fragment instanceof Fragment.Apply) {
                outcome = runApply((Fragment.Apply) fragment, record);
            } else if (fragment instanceof Fragment.Union) {
                outcome = runUnion((Fragment.Union) fragment, record);
            } else if (fragment instanceof Fragment.Exec) {
                outcome = runExec((Fragment.Exec) fragment, record);
            } else {
                throw notImplemented("Invalid query fragment", fragment);
            }
            return outcome;
        }

        /**
         * Result of an Init fragment: a stream holding exactly one empty record, with no plan
         * description and no execution type.
         */
        FragmentResult runInit() {
            Flux<Record> singleEmptyRecord = Flux.just(Records.empty());
            return new FragmentResult(singleEmptyRecord, Mono.empty(), Mono.empty());
        }

        /**
         * Runs the Apply's input and, for each input record, runs the inner fragment with that record
         * as argument. When the inner fragment has no output columns the input record is passed
         * through unchanged; otherwise each inner record is joined with the input record. Inner runs
         * are flat-mapped with the configured concurrency and a prefetch of 1.
         *
         * @param apply the Apply fragment
         * @param record the argument record for the Apply's input
         * @return the combined records, no plan description, and the plan-level execution type
         */
        FragmentResult runApply(Fragment.Apply apply, Record record) {
            Flux<Record> inputRecords = run(apply.input(), record).records;

            Flux<Record> applied;
            if (apply.inner().outputColumns().isEmpty()) {
                applied = inputRecords.flatMap(
                        outer -> runAndProduceOnlyRecord(apply.inner(), outer),
                        FabricExecutor.this.dataStreamConfig.getConcurrency(),
                        1);
            } else {
                applied = inputRecords.flatMap(
                        outer -> runAndProduceJoinedResult(apply.inner(), outer),
                        FabricExecutor.this.dataStreamConfig.getConcurrency(),
                        1);
            }

            Mono<QueryExecutionType> executionType = Mono.just(EffectiveQueryType.queryExecutionType(this.plan, this.accessMode));
            return new FragmentResult(applied, Mono.empty(), executionType);
        }

        /** Runs the fragment with the given argument record and joins that record with every record produced. */
        private Flux<Record> runAndProduceJoinedResult(Fragment fragment, Record record) {
            Flux<Record> produced = run(fragment, record).records;
            return produced.map(inner -> Records.join(record, inner));
        }

        /** Runs the fragment with the given argument record, discards its records, and emits the argument record once it completes. */
        private Mono<Record> runAndProduceOnlyRecord(Fragment fragment, Record record) {
            Flux<Record> produced = run(fragment, record).records;
            return produced.then(Mono.just(record));
        }

        /**
         * Runs both sides of a Union with the same argument record and merges their record streams,
         * de-duplicating when the union is distinct. The execution types of both sides are merged;
         * no plan description is provided.
         */
        FragmentResult runUnion(Fragment.Union union, Record record) {
            FragmentResult left = run(union.lhs(), record);
            FragmentResult right = run(union.rhs(), record);

            Flux<Record> combined = Flux.merge(left.records, right.records);
            Mono<QueryExecutionType> executionType = mergeExecutionType(left.executionType, right.executionType);

            if (union.distinct()) {
                return new FragmentResult(combined.distinct(), Mono.empty(), executionType);
            }
            return new FragmentResult(combined, Mono.empty(), executionType);
        }

        /**
         * Executes an Exec fragment against the graph selected by its USE clause.
         *
         * <p>Steps: validate the statement type, evaluate and validate the target graph, derive the
         * transaction mode, look up (or compute) the graph's location, and build the query
         * parameters. A local location first runs the fragment's input and, if the fragment is
         * executable, feeds those records into the local query; a remote location sends the remote
         * form of the query together with its extracted literals.
         *
         * @param exec the fragment to execute
         * @param record the argument record; may be {@code null}
         * @return the fragment's result
         */
        FragmentResult runExec(Fragment.Exec exec, Record record) {
            this.ctx.validateStatementType(exec.statementType());

            Map<String, AnyValue> arguments = argumentValues(exec, record);
            Catalog.Graph graph = evalUse(exec.use().graphSelection(), arguments);
            validateCanUseGraph(graph, this.ctx.getSessionDatabaseReference());

            TransactionMode mode = getTransactionMode(exec.queryType(), graph.toString());
            Location location = this.ctx.getOrComputeLocation(
                    graph,
                    () -> FabricExecutor.this.catalogManager.locationOf(
                            this.ctx.getSessionDatabaseReference(),
                            graph,
                            mode.requiresWrite(),
                            this.routingContext.isServerRoutingEnabled()));
            MapValue parameters = addParamsFromRecord(this.queryParams, arguments, CollectionConverters.asJava(exec.parameters()));

            if (location instanceof Location.Local) {
                Location.Local localLocation = (Location.Local) location;
                FragmentResult input = run(exec.input(), record);
                if (!exec.executable()) {
                    // Nothing to run for this fragment itself; its input is the result.
                    return input;
                }
                FabricQuery.LocalQuery localQuery = this.plannerInstance.asLocal(exec);
                FragmentResult local = runLocalQueryAt(
                        localLocation,
                        mode,
                        localQuery.query(),
                        parameters,
                        this.plannerInstance.targetsComposite(exec),
                        input.records);
                return new FragmentResult(local.records, local.planDescription, mergeExecutionType(input.executionType, local.executionType));
            }

            if (location instanceof Location.Remote) {
                Location.Remote remoteLocation = (Location.Remote) location;
                FabricQuery.RemoteQuery remoteQuery = this.plannerInstance.asRemote(exec);
                return runRemoteQueryAt(
                        remoteLocation,
                        mode,
                        remoteQuery.query(),
                        addParams(parameters, CollectionConverters.asJava(remoteQuery.extractedLiterals())));
            }

            throw notImplemented("Invalid graph location", location);
        }

        /**
         * Checks that the target graph may be accessed from the session database.
         *
         * <p>When the session database resolves to a composite, the target must be that composite,
         * one of its constituents, or the system graph. Otherwise the target must satisfy
         * {@code isDatabaseOrAliasInRoot}.
         *
         * @param graph the graph the query wants to use
         * @param databaseReference the session database
         * @throws InvalidSemanticsException if access is not permitted
         */
        private void validateCanUseGraph(Catalog.Graph graph, DatabaseReference databaseReference) {
            Catalog.Graph sessionGraph = this.useEvaluator.resolveGraph(databaseReference.alias());

            if (sessionGraph instanceof Catalog.Composite) {
                boolean reachable = this.useEvaluator.isConstituentOrSelf(graph, sessionGraph) || this.useEvaluator.isSystem(graph);
                if (!reachable) {
                    throw new InvalidSemanticsException(cantAccessOutsideCompositeMessage(sessionGraph, graph));
                }
                return;
            }

            if (!this.useEvaluator.isDatabaseOrAliasInRoot(graph)) {
                throw new InvalidSemanticsException(cantAccessCompositeConstituentsMessage(sessionGraph, graph));
            }
        }

        /**
         * Builds the error text for a target outside the connected composite database.
         *
         * @param graph the graph the session is connected to
         * @param graph2 the graph the query attempted to access
         */
        private String cantAccessOutsideCompositeMessage(Catalog.Graph graph, Catalog.Graph graph2) {
            String attempted = this.useEvaluator.qualifiedNameString(graph2);
            String connected = this.useEvaluator.qualifiedNameString(graph);
            String detail = String.format("Attempted to access '%s' while connected to '%s'", attempted, connected);
            return "When connected to a composite database, access is allowed only to its constituents. " + detail;
        }

        /**
         * Builds the error text for accessing a composite database or its constituents while
         * connected to a different database.
         *
         * @param graph the graph the session is connected to
         * @param graph2 the graph the query attempted to access
         */
        private String cantAccessCompositeConstituentsMessage(Catalog.Graph graph, Catalog.Graph graph2) {
            String attempted = this.useEvaluator.qualifiedNameString(graph2);
            String connected = this.useEvaluator.qualifiedNameString(graph);
            String detail = String.format("Attempted to access '%s' while connected to '%s'", attempted, connected);
            return "Accessing a composite database and its constituents is only allowed when connected to it. " + detail;
        }

        /**
         * Runs a query at a local location.
         *
         * <p>The execution options carry the location's graph id only when the plan is in Fabric
         * context and {@code z} is false. When the record stream completes, the result's summary is
         * folded into this execution's statistics and notifications. The plan description is wrapped
         * so that it is tagged with the location's database name.
         *
         * @param local where to run
         * @param transactionMode the transaction mode to run with
         * @param fullyParsedQuery the query to run
         * @param mapValue the query parameters
         * @param z the caller's "targets composite" flag for the fragment
         * @param flux the input records for the query
         * @return records, tagged plan description and execution type of the local run
         */
        FragmentResult runLocalQueryAt(Location.Local local, TransactionMode transactionMode, FullyParsedQuery fullyParsedQuery, MapValue mapValue, boolean z, Flux<Record> flux) {
            ExecutionOptions options;
            if (this.plan.inFabricContext() && !z) {
                options = new ExecutionOptions(local.getGraphId());
            } else {
                options = new ExecutionOptions();
            }

            StatementResult localResult = this.ctx.getLocal().run(local, transactionMode, this.lifecycle, fullyParsedQuery, mapValue, flux, options);

            Flux<Record> records = localResult.records()
                    .doOnComplete(() -> localResult.summary().subscribe(this::updateSummary));
            Mono<ExecutionPlanDescription> planDescription = localResult.summary()
                    .map(Summary::executionPlanDescription)
                    .map(description -> new TaggingPlanDescriptionWrapper(description, local.getDatabaseName()));
            return new FragmentResult(records, planDescription, localResult.executionType());
        }

        /**
         * Runs a query at a remote location.
         *
         * <p>The execution options carry the location's graph id only when the plan is in Fabric
         * context. The remote record stream is wrapped in a {@code CompletionDelegatingOperator} using
         * the Fabric worker executor and then given prefetching. When the remote records complete,
         * the remote summary is folded into this execution's statistics and notifications. The
         * execution type reported is the plan-level one.
         *
         * @param remote where to run
         * @param transactionMode the transaction mode to run with
         * @param str the query text
         * @param mapValue the query parameters
         * @return records, remote plan description and execution type
         */
        FragmentResult runRemoteQueryAt(Location.Remote remote, TransactionMode transactionMode, String str, MapValue mapValue) {
            ExecutionOptions options;
            if (this.plan.inFabricContext()) {
                options = new ExecutionOptions(remote.getGraphId());
            } else {
                options = new ExecutionOptions();
            }

            this.lifecycle.startExecution(true);
            Mono<StatementResult> remoteResult = this.ctx.getRemote().run(remote, options, str, transactionMode, mapValue);

            Flux<Record> remoteRecords = remoteResult.flatMapMany(result ->
                    result.records().doOnComplete(() -> result.summary().subscribe(this::updateSummary)));
            Flux<Record> delegated = new CompletionDelegatingOperator(remoteRecords, FabricExecutor.this.fabricWorkerExecutor);
            Flux<Record> prefetched = this.prefetcher.addPrefetch(delegated);

            Mono<ExecutionPlanDescription> planDescription = remoteResult
                    .flatMap(StatementResult::summary)
                    .map(Summary::executionPlanDescription)
                    .map(description -> new RemoteExecutionPlanDescription(description, remote));

            Mono<QueryExecutionType> executionType = Mono.just(EffectiveQueryType.queryExecutionType(this.plan, this.accessMode));
            return new FragmentResult(prefetched, planDescription, executionType);
        }

        /**
         * Extracts the fragment's argument columns from the record as a map.
         *
         * @return an empty map when {@code record} is {@code null}
         */
        private Map<String, AnyValue> argumentValues(Fragment fragment, Record record) {
            if (record == null) {
                return Map.of();
            }
            return Records.asMap(record, CollectionConverters.asJava(fragment.argumentColumns()));
        }

        /** Evaluates the graph selection with the statement parameters and the given argument values. */
        private Catalog.Graph evalUse(GraphSelection graphSelection, Map<String, AnyValue> map) {
            Catalog.Graph selected = this.useEvaluator.evaluate(graphSelection, this.queryParams, map);
            return selected;
        }

        /**
         * Returns the given parameters extended with values taken from the argument record.
         *
         * @param mapValue the base parameters
         * @param map argument values keyed by column name
         * @param map2 bindings from column name (key) to the parameter name (value) to store it under
         * @return {@code VirtualValues.EMPTY_MAP} when there are neither base parameters nor bindings,
         *         otherwise a new map holding the base parameters followed by the bound values
         * @throws FabricException if a bound value is a node, relationship or path
         */
        private MapValue addParamsFromRecord(MapValue mapValue, Map<String, AnyValue> map, Map<String, String> map2) {
            int capacity = mapValue.size() + map2.size();
            if (capacity == 0) {
                return VirtualValues.EMPTY_MAP;
            }

            MapValueBuilder combined = new MapValueBuilder(capacity);
            mapValue.foreach(combined::add);
            for (Map.Entry<String, String> binding : map2.entrySet()) {
                AnyValue imported = validateValue(map.get(binding.getKey()));
                combined.add(binding.getValue(), imported);
            }
            return combined.build();
        }

        /**
         * Returns the given parameters extended with additional plain Java values.
         *
         * @param mapValue the base parameters
         * @param map extra entries; each value is converted with {@code Values.of}
         * @return {@code mapValue} itself when there is nothing to add, otherwise a new map holding
         *         the base parameters followed by the extra entries
         */
        private MapValue addParams(MapValue mapValue, Map<String, Object> map) {
            // An empty 'map' also covers the case where the combined size is zero.
            if (map.size() == 0) {
                return mapValue;
            }

            MapValueBuilder combined = new MapValueBuilder(mapValue.size() + map.size());
            mapValue.foreach(combined::add);
            for (Map.Entry<String, Object> extra : map.entrySet()) {
                combined.add(extra.getKey(), Values.of(extra.getValue()));
            }
            return combined.build();
        }

        /**
         * Rejects values that cannot be imported into a subquery as parameters.
         *
         * @param anyValue the value to check
         * @return the same value when it is acceptable
         * @throws FabricException with status {@code TypeError} for node, relationship and path values
         */
        private AnyValue validateValue(AnyValue anyValue) {
            final String rejection;
            if (anyValue instanceof VirtualNodeValue) {
                rejection = "Importing node values in remote subqueries is currently not supported";
            } else if (anyValue instanceof VirtualRelationshipValue) {
                rejection = "Importing relationship values in remote subqueries is currently not supported";
            } else if (anyValue instanceof PathValue) {
                rejection = "Importing path values in remote subqueries is currently not supported";
            } else {
                return anyValue;
            }
            throw new FabricException((Status) Status.Statement.TypeError, rejection, new Object[0]);
        }

        /** Folds a summary's statistics and notifications into this execution's totals; a {@code null} summary is ignored. */
        private void updateSummary(Summary summary) {
            if (summary == null) {
                return;
            }
            this.statistics.add(summary.getQueryStatistics());
            this.notifications.addAll(summary.getNotifications());
        }

        /**
         * Combines two execution types. When both are present they are merged with
         * {@code QueryTypes.merge}; when that yields nothing, the first is used, and if that is empty
         * too, the second.
         */
        private Mono<QueryExecutionType> mergeExecutionType(Mono<QueryExecutionType> mono, Mono<QueryExecutionType> mono2) {
            Mono<QueryExecutionType> merged = Mono.zip(mono, mono2)
                    .map(both -> QueryTypes.merge(both.getT1(), both.getT2()));
            Mono<QueryExecutionType> orFirst = merged.switchIfEmpty(mono);
            return orFirst.switchIfEmpty(mono2);
        }

        /**
         * Builds the exception reported for an unsupported fragment or location.
         *
         * @param str description of what was invalid
         * @param obj the offending object; may be {@code null}
         * @return the exception to throw
         */
        private RuntimeException notImplemented(String str, Object obj) {
            // String.valueOf tolerates null: callers reach this after every instanceof check failed,
            // which is also what happens for a null reference. The intended error must not be
            // replaced by a NullPointerException.
            return notImplemented(str, String.valueOf(obj));
        }

        /** Builds an {@code InvalidSemanticsException} whose message is {@code "<str>: <str2>"}. */
        private RuntimeException notImplemented(String str, String str2) {
            String message = str + ": " + str2;
            return new InvalidSemanticsException(message);
        }

        /**
         * Derives the transaction mode for a query from the session access mode and the query's
         * effective access mode.
         *
         * @param queryType the type of the query about to run
         * @param str name of the target graph, used only in the error message
         * @return {@code DEFINITELY_WRITE} or {@code MAYBE_WRITE} in write access mode (depending on
         *         whether the effective access mode is write), {@code DEFINITELY_READ} otherwise
         * @throws FabricException with status {@code AccessMode} when the query would write while the
         *         session access mode is not write
         */
        private TransactionMode getTransactionMode(QueryType queryType, String str) {
            AccessMode effectiveAccessMode = EffectiveQueryType.effectiveAccessMode(this.accessMode, this.plan.executionType(), queryType);
            boolean effectivelyWrites = effectiveAccessMode == AccessMode.WRITE;

            if (this.accessMode == AccessMode.WRITE) {
                return effectivelyWrites ? TransactionMode.DEFINITELY_WRITE : TransactionMode.MAYBE_WRITE;
            }
            if (effectivelyWrites) {
                // Built from the shared constant so this message cannot drift from the published text.
                throw new FabricException((Status) Status.Statement.AccessMode, WRITING_IN_READ_NOT_ALLOWED_MSG + ". Attempted write to %s", str);
            }
            return TransactionMode.DEFINITELY_READ;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/fabric/executor/FabricExecutor$FragmentResult.class */
    /**
     * Outcome of executing one fragment: its record stream, plus the plan description and execution
     * type, each of which may be an empty {@code Mono}.
     */
    public static class FragmentResult {
        /** Records produced by the fragment. */
        private final Flux<Record> records;
        /** Plan description for the fragment. */
        private final Mono<ExecutionPlanDescription> planDescription;
        /** Execution type of the fragment. */
        private final Mono<QueryExecutionType> executionType;

        /**
         * @param flux the records produced by the fragment
         * @param mono the plan description
         * @param mono2 the execution type
         */
        FragmentResult(Flux<Record> flux, Mono<ExecutionPlanDescription> mono, Mono<QueryExecutionType> mono2) {
            records = flux;
            planDescription = mono;
            executionType = mono2;
        }
    }

    /**
     * Creates an executor.
     *
     * @param fabricConfig configuration; only its data-stream section is retained
     * @param fabricPlanner planner used for every submitted statement
     * @param useEvaluation factory for per-statement USE evaluators
     * @param catalogManager used to compute graph locations
     * @param internalLogProvider provider of this class's log
     * @param fabricStatementLifecycles factory for statement lifecycles
     * @param executor executor given to the completion-delegating operator of remote streams
     */
    public FabricExecutor(FabricConfig fabricConfig, FabricPlanner fabricPlanner, UseEvaluation useEvaluation, CatalogManager catalogManager, InternalLogProvider internalLogProvider, FabricStatementLifecycles fabricStatementLifecycles, Executor executor) {
        // Values derived from the arguments.
        this.dataStreamConfig = fabricConfig.getDataStream();
        this.log = internalLogProvider.getLog(getClass());

        // Collaborators stored as given.
        this.planner = fabricPlanner;
        this.useEvaluation = useEvaluation;
        this.catalogManager = catalogManager;
        this.statementLifecycles = fabricStatementLifecycles;
        this.fabricWorkerExecutor = executor;
    }

    /**
     * Plans and executes a statement within the given Fabric transaction.
     *
     * <p>A statement lifecycle is created and registered on the transaction as the last submitted
     * statement before planning starts. The plan is logged at debug level when its debug options
     * ask for it, and a record-tracing execution is used when they ask for record logging. The
     * returned result maps {@code FabricSecondaryException} to its primary exception and is given a
     * callback that rolls the transaction back.
     *
     * @param fabricTransaction the transaction to execute in
     * @param str the statement text
     * @param mapValue the statement parameters
     * @return the statement result
     * @throws RuntimeException any runtime failure raised while planning or starting execution; the
     *         lifecycle is ended as failed and a rollback is attempted before it is rethrown
     */
    public StatementResult run(FabricTransaction fabricTransaction, String str, MapValue mapValue) {
        FabricStatementLifecycles.StatementLifecycle lifecycle =
                this.statementLifecycles.create(fabricTransaction.getTransactionInfo(), str, mapValue);
        lifecycle.startProcessing();
        fabricTransaction.setLastSubmittedStatement(lifecycle);

        try {
            String sessionDatabaseName =
                    fabricTransaction.getTransactionInfo().getSessionDatabaseReference().alias().name();
            Catalog catalog = fabricTransaction.getCatalogSnapshot();
            FabricPlanner.PlannerInstance plannerInstance =
                    this.planner.instance(str, mapValue, sessionDatabaseName, catalog, fabricTransaction.cancellationChecker());
            FabricPlan plan = plannerInstance.plan();
            Fragment query = plan.query();
            lifecycle.doneFabricProcessing(plan);

            AccessMode accessMode = fabricTransaction.getTransactionInfo().getAccessMode();
            RoutingContext routingContext = fabricTransaction.getTransactionInfo().getRoutingContext();

            if (plan.debugOptions().logPlan()) {
                this.log.debug(String.format("Fabric plan: %s", Fragment.pretty().asString(query)));
            }

            StatementResult executed = fabricTransaction.execute(fabricExecutionContext -> {
                UseEvaluation.Instance useEvaluator = this.useEvaluation.instance(str, catalog);
                FabricStatementExecution execution;
                if (plan.debugOptions().logRecords()) {
                    execution = new FabricLoggingStatementExecution(plan, plannerInstance, useEvaluator, mapValue, accessMode, routingContext, fabricExecutionContext, this.log, lifecycle, this.dataStreamConfig);
                } else {
                    execution = new FabricStatementExecution(plan, plannerInstance, useEvaluator, mapValue, accessMode, routingContext, fabricExecutionContext, lifecycle, this.dataStreamConfig);
                }
                return execution.run();
            });

            return new FabricExecutionStatementResultImpl(
                    StatementResults.withErrorMapping(executed, FabricSecondaryException.class, FabricSecondaryException::getPrimaryException),
                    failure -> rollbackOnFailure(fabricTransaction, failure));
        } catch (RuntimeException failure) {
            lifecycle.endFailure(failure);
            rollbackOnFailure(fabricTransaction, failure);
            throw failure;
        }
    }

    /**
     * Clears the planner's query cache for the given context.
     *
     * @param str the context passed to the cache's {@code clearByContext}
     * @return the value reported by {@code clearByContext}
     */
    public long clearQueryCachesForDatabase(String str) {
        long cleared = this.planner.queryCache().clearByContext(str);
        return cleared;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Rolls the transaction back after a failure without letting a rollback error replace it.
     *
     * @param fabricTransaction the transaction to roll back
     * @param th the failure that triggered the rollback; an exception thrown by the rollback is
     *        attached to it as suppressed, unless it is that very same instance
     */
    public static void rollbackOnFailure(FabricTransaction fabricTransaction, Throwable th) {
        try {
            fabricTransaction.rollback();
        } catch (Exception rollbackFailure) {
            boolean sameInstance = rollbackFailure == th;
            if (!sameInstance) {
                th.addSuppressed(rollbackFailure);
            }
        }
    }
}
