package io.nosqlbench.engine.core.lifecycle.activity;

import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityController;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.Motor;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.class */
public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> {
    /** Class logger used for general lifecycle and diagnostic messages. */
    private static final Logger logger = LogManager.getLogger(ActivityExecutor.class);
    /** Logger registered under the fixed name "ACTIVITY"; used for START / STOP trace points. */
    private static final Logger activitylogger = LogManager.getLogger("ACTIVITY");
    /** The activity whose motors this executor drives. */
    private final Activity activity;
    /** Definition obtained from {@link #activity} at construction time. */
    private final ActivityDef activityDef;
    /** Run-state tally obtained from the activity; used to wait on motor states. */
    private final RunStateTally tally;
    /** Thread pool that runs the motors; created by {@code startMotorExecutorService()}. */
    private ExecutorService executorService;
    /** Most recently recorded failure for this activity, or {@code null} when none was recorded. */
    private Exception exception;
    /** Session identifier attached to the annotations recorded on start and stop. */
    private String sessionId;
    /** Motors currently owned by this executor; the list index is passed as the slot number when a motor is created. */
    private final List<Motor<?>> motors = new ArrayList<>();
    /** Wall-clock millis captured in {@code startRunningActivityThreads()}; 0 until then. */
    private long startedAt = 0;
    /** Wall-clock millis captured in {@code shutdownExecutorService(int)}; 0 until then. */
    private long stoppedAt = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor$1, reason: invalid class name */
    /* loaded from: input_file:io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor$1.class */
    /**
     * Compiler-generated lookup table that backs the {@code switch} statements over
     * {@link RunState} in the enclosing class.
     *
     * <p>The array is indexed by {@code RunState.ordinal()} and holds the case number
     * (1 to 6) assigned below. Slots that are never assigned keep the {@code int} default
     * of 0, which matches no {@code case} label and therefore reaches {@code default}.
     *
     * <p>The case numbers are coupled to the {@code case 1..6} labels in
     * {@code alignMotorStateToIntendedActivityState()} and
     * {@code awaitAlignmentOfMotorStateToActivityState()}; do not renumber one without the other.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by RunState.ordinal(); value is the switch case number, 0 when unmapped.
        static final /* synthetic */ int[] $SwitchMap$io$nosqlbench$engine$api$activityapi$core$RunState = new int[RunState.values().length];

        static {
            // Each assignment is guarded separately so that one failing constant lookup
            // does not prevent the remaining entries from being populated.
            // NOTE(review): presumably NoSuchFieldError covers an enum constant that is
            // missing from the RunState class loaded at runtime - confirm.
            try {
                $SwitchMap$io$nosqlbench$engine$api$activityapi$core$RunState[RunState.Uninitialized.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // intentionally ignored: the slot stays 0 and falls through to the default branch
            }
            try {
                $SwitchMap$io$nosqlbench$engine$api$activityapi$core$RunState[RunState.Running.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // intentionally ignored: the slot stays 0 and falls through to the default branch
            }
            try {
                $SwitchMap$io$nosqlbench$engine$api$activityapi$core$RunState[RunState.Starting.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // intentionally ignored: the slot stays 0 and falls through to the default branch
            }
            try {
                $SwitchMap$io$nosqlbench$engine$api$activityapi$core$RunState[RunState.Stopped.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // intentionally ignored: the slot stays 0 and falls through to the default branch
            }
            try {
                $SwitchMap$io$nosqlbench$engine$api$activityapi$core$RunState[RunState.Finished.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // intentionally ignored: the slot stays 0 and falls through to the default branch
            }
            try {
                $SwitchMap$io$nosqlbench$engine$api$activityapi$core$RunState[RunState.Stopping.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
                // intentionally ignored: the slot stays 0 and falls through to the default branch
            }
        }
    }

    /**
     * Creates an executor for the given activity and wires it into that activity.
     *
     * <p>The executor registers itself as a listener on the activity's parameter map and as
     * the activity's controller, then captures the activity's run-state tally.
     *
     * @param activity the activity to control
     * @param str      the session id attached to annotations recorded by this executor
     */
    public ActivityExecutor(Activity activity, String str) {
        // Assign the session id exactly once, and before `this` is handed to the
        // parameter map and the activity below, so no callback can observe a placeholder value.
        this.sessionId = str;
        this.activity = activity;
        this.activityDef = activity.getActivityDef();
        activity.getActivityDef().getParams().addListener(this);
        activity.setActivityController(this);
        this.tally = activity.getRunStateTally();
    }

    /**
     * Stops the activity cooperatively.
     *
     * <p>Marks the activity as Stopping, asks every motor to stop, waits on the tally for the
     * Stopped/Finished states both before and after shutting down the motor executor (with a
     * timeout of {@code Integer.MAX_VALUE} seconds), marks the activity Stopped and finally
     * records an interval annotation covering {@code startedAt..stoppedAt}.
     */
    public void stopActivity() {
        logger.info(() -> "stopping activity in progress: " + getActivityDef().getAlias());

        this.activity.setRunState(RunState.Stopping);
        this.motors.forEach(Motor::requestStop);

        final RunState[] settledBeforeShutdown = {RunState.Stopped, RunState.Finished};
        this.tally.awaitNoneOther(settledBeforeShutdown);

        shutdownExecutorService(Integer.MAX_VALUE);

        final RunState[] settledAfterShutdown = {RunState.Stopped, RunState.Finished};
        this.tally.awaitNoneOther(settledAfterShutdown);

        this.activity.setRunState(RunState.Stopped);
        logger.info(() -> "stopped: " + getActivityDef().getAlias() + " with " + this.motors.size() + " slots");

        Annotators.recordAnnotation(
            Annotation.newBuilder()
                .session(this.sessionId)
                .interval(this.startedAt, this.stoppedAt)
                .layer(Layer.Activity)
                .label("alias", getActivityDef().getAlias())
                .label("driver", getActivityDef().getActivityType())
                .label("workload", (String) getActivityDef().getParams().getOptionalString(new String[]{"workload"}).orElse("none"))
                .detail("params", getActivityDef().toString())
                .build()
        );
    }

    /**
     * Stops the activity without first waiting for the motors to settle.
     *
     * <p>Marks the activity as Stopping, asks every motor to stop, shuts down the motor
     * executor (timeout of {@code Integer.MAX_VALUE} seconds), then waits on the tally for the
     * Stopped/Finished states, marks the activity Stopped and records an interval annotation
     * covering {@code startedAt..stoppedAt}.
     */
    public void forceStopActivity() {
        logger.info(() -> "force stopping activity in progress: " + getActivityDef().getAlias());

        this.activity.setRunState(RunState.Stopping);
        this.motors.forEach(Motor::requestStop);

        shutdownExecutorService(Integer.MAX_VALUE);

        final RunState[] settled = {RunState.Stopped, RunState.Finished};
        this.tally.awaitNoneOther(settled);

        this.activity.setRunState(RunState.Stopped);
        logger.info(() -> "stopped: " + getActivityDef().getAlias() + " with " + this.motors.size() + " slots");

        Annotators.recordAnnotation(
            Annotation.newBuilder()
                .session(this.sessionId)
                .interval(this.startedAt, this.stoppedAt)
                .layer(Layer.Activity)
                .label("alias", getActivityDef().getAlias())
                .label("driver", getActivityDef().getActivityType())
                .label("workload", (String) getActivityDef().getParams().getOptionalString(new String[]{"workload"}).orElse("none"))
                .detail("params", getActivityDef().toString())
                .build()
        );
    }

    /**
     * Stops the activity, escalating to a forced executor shutdown when the motors do not
     * terminate within the grace period.
     *
     * <p>Sequence: mark the activity Stopped, shut the executor down, request all motors to
     * stop, wait up to {@code i} milliseconds for termination, call {@code shutdownNow()} if
     * the executor is still alive, then run the activity's shutdown hooks and close its
     * auto-closeables.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is cut short, the
     * remaining cleanup still runs, and the interrupt flag is restored before returning.
     *
     * @param i grace period in milliseconds; values of zero or less skip the wait
     * @return the exception recorded for this activity, or {@code null} if none was recorded
     */
    public Exception forceStopActivity(int i) {
        activitylogger.debug("FORCE STOP/before alias=(" + this.activity.getAlias() + ")");
        this.activity.setRunState(RunState.Stopped);
        this.executorService.shutdown();
        requestStopMotors();

        // Block on the executor itself rather than polling with sleep(): this returns as soon
        // as the pool terminates and cannot degrade into a busy loop for small grace periods.
        boolean interrupted = false;
        final long gracefulStartedAt = System.currentTimeMillis();
        try {
            this.executorService.awaitTermination(i, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Remember the interrupt; cleanup below must still run, so the flag is restored at the end.
            interrupted = true;
        }
        final long gracefulEndedAt = System.currentTimeMillis();
        logger.debug("took " + (gracefulEndedAt - gracefulStartedAt) + " ms to shutdown gracefully");

        try {
            if (!this.executorService.isTerminated()) {
                logger.info(() -> "stopping activity forcibly " + this.activity.getAlias());
                final List<Runnable> neverStarted = this.executorService.shutdownNow();
                final long forcedEndedAt = System.currentTimeMillis();
                logger.debug(() -> "took " + (forcedEndedAt - gracefulEndedAt) + " ms to shutdown forcibly");
                logger.debug(() -> neverStarted.size() + " tasks never started.");
            }

            final long hooksStartedAt = System.currentTimeMillis();
            logger.debug("invoking activity-specific shutdown hooks");
            this.activity.shutdownActivity();
            this.activity.closeAutoCloseables();
            logger.debug("took " + (System.currentTimeMillis() - hooksStartedAt) + " ms to shutdown activity threads");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        activitylogger.debug("FORCE STOP/after alias=(" + this.activity.getAlias() + ")");
        if (this.exception != null) {
            activitylogger.debug("FORCE STOP/exception alias=(" + this.activity.getAlias() + ")");
        }
        return this.exception;
    }

    /**
     * Force-stops the activity and optionally rethrows the failure it recorded.
     *
     * @param i grace period in milliseconds, forwarded to {@link #forceStopActivity(int)}
     * @param z when {@code true}, a recorded exception is rethrown wrapped in a {@link RuntimeException}
     * @throws RuntimeException if {@code z} is set and the forced stop returned an exception
     */
    public synchronized void forceStopScenarioAndThrow(int i, boolean z) {
        final Exception recorded = forceStopActivity(i);
        if (recorded == null) {
            return;
        }
        if (z) {
            throw new RuntimeException(recorded);
        }
    }

    /**
     * Propagates a parameter change to the activity and its motors.
     *
     * <p>The activity is always notified. Nothing further happens while the activity is
     * Uninitialized; otherwise the motor count is re-aligned with the thread parameter when
     * the activity is Running, and every motor that is an {@link ActivityDefObserver} is
     * notified with the current activity definition.
     *
     * @param parameterMap the map that changed (not read here; the stored definition is used)
     */
    public void handleParameterMapUpdate(ParameterMap parameterMap) {
        this.activity.onActivityDefUpdate(this.activityDef);

        if (this.activity.getRunState() == RunState.Uninitialized) {
            return;
        }
        if (this.activity.getRunState() == RunState.Running) {
            adjustMotorCountToThreadParam(this.activity.getActivityDef());
        }

        this.motors.stream()
            .filter(ActivityDefObserver.class::isInstance)
            .map(ActivityDefObserver.class::cast)
            .forEach(observer -> observer.onActivityDefUpdate(this.activityDef));
    }

    /**
     * @return the activity definition captured from the activity at construction time
     */
    public ActivityDef getActivityDef() {
        final ActivityDef definition = this.activityDef;
        return definition;
    }

    /**
     * @return the simple class name and the activity alias, joined by {@code "~"}
     */
    @Override
    public String toString() {
        final String simpleName = getClass().getSimpleName();
        return simpleName + "~" + this.activityDef.getAlias();
    }

    /**
     * Renders the state code of every motor, in slot order, as a bracketed comma-separated
     * list such as {@code [a,b,c]}. Used only for trace logging.
     *
     * @return the joined state codes, {@code "[]"} when there are no motors
     */
    private String getSlotStatus() {
        final String joinedCodes = this.motors.stream()
            .map(motor -> motor.getState().get().getCode())
            .collect(Collectors.joining(",", "[", "]"));
        return joinedCodes;
    }

    /**
     * Brings the motor list in line with the thread count of the given definition.
     *
     * <p>Surplus motors are removed first, missing ones are added, then motor states are
     * aligned with the activity's run state and the method waits for that alignment.
     * The slot status is traced before and after.
     *
     * @param activityDef definition supplying the target thread count
     */
    private void adjustMotorCountToThreadParam(ActivityDef activityDef) {
        logger.trace(() -> ">-pre-adjust->" + getSlotStatus());

        // shrink, then grow, then bring motor states in line with the activity
        reduceActiveMotorCountDownToThreadParam(activityDef);
        increaseActiveMotorCountUpToThreadParam(activityDef);
        alignMotorStateToIntendedActivityState();
        awaitAlignmentOfMotorStateToActivityState();

        logger.trace(() -> ">post-adjust->" + getSlotStatus());
    }

    /**
     * Appends new motors until the motor list is as long as the definition's thread count.
     * Each new motor is obtained from the activity's motor dispenser with the next free
     * list index as its slot number. Motors are only created here, not started.
     *
     * @param activityDef definition supplying the target thread count
     */
    private void increaseActiveMotorCountUpToThreadParam(ActivityDef activityDef) {
        while (this.motors.size() < activityDef.getThreads()) {
            final int nextSlot = this.motors.size();
            final Motor<?> added = this.activity.getMotorDispenserDelegate().getMotor(activityDef, nextSlot);
            logger.trace(() -> "Starting cycle motor thread:" + added);
            this.motors.add(added);
        }
    }

    /**
     * Removes motors from the end of the list until it is no longer than the definition's
     * thread count. Each removed motor is asked to stop and has its state removed before it
     * is dropped from the list.
     *
     * @param activityDef definition supplying the target thread count
     */
    private void reduceActiveMotorCountDownToThreadParam(ActivityDef activityDef) {
        while (this.motors.size() > activityDef.getThreads()) {
            final int lastSlot = this.motors.size() - 1;
            final Motor<?> surplus = this.motors.get(lastSlot);
            logger.trace(() -> "Stopping cycle motor thread:" + surplus);
            surplus.requestStop();
            surplus.removeState();
            this.motors.remove(lastSlot);
        }
    }

    /**
     * Nudges every motor towards the activity's current run state.
     *
     * <ul>
     *   <li>Uninitialized: nothing to do.</li>
     *   <li>Running / Starting: every motor that is not Running, Finished or Starting is
     *       submitted to the executor.</li>
     *   <li>Stopped: every motor that is not already Stopped is asked to stop.</li>
     *   <li>Finished / Stopping: rejected as an invalid requested state.</li>
     * </ul>
     *
     * @throws RuntimeException for Finished, Stopping, or any state not listed above
     */
    private void alignMotorStateToIntendedActivityState() {
        final RunState intended = this.activity.getRunState();
        logger.trace(() -> "ADJUSTING to INTENDED " + intended);

        switch (intended) {
            case Uninitialized:
                break;

            case Running:
            case Starting:
                // The state is re-read for each comparison, exactly as a chain of filters would.
                this.motors.stream()
                    .filter(motor -> motor.getState().get() != RunState.Running
                        && motor.getState().get() != RunState.Finished
                        && motor.getState().get() != RunState.Starting)
                    .forEach(motor -> this.executorService.execute(motor));
                break;

            case Stopped:
                this.motors.stream()
                    .filter(motor -> motor.getState().get() != RunState.Stopped)
                    .forEach(Motor::requestStop);
                break;

            case Finished:
            case Stopping:
                throw new RuntimeException("Invalid requested state in activity executor:" + this.activity.getRunState());

            default:
                throw new RuntimeException("Unmatched run state:" + this.activity.getRunState());
        }
    }

    /**
     * Waits on the tally until the motor states are compatible with the activity's run state.
     *
     * <ul>
     *   <li>Uninitialized: no wait.</li>
     *   <li>Running / Starting: waits for Running/Finished.</li>
     *   <li>Stopped: waits for Stopped/Finished.</li>
     *   <li>Finished: waits for Finished.</li>
     *   <li>Stopping: rejected as an invalid requested state.</li>
     * </ul>
     *
     * @throws RuntimeException for Stopping, or any state not listed above
     */
    private void awaitAlignmentOfMotorStateToActivityState() {
        logger.debug(() -> "awaiting state alignment from " + this.activity.getRunState());

        switch (this.activity.getRunState()) {
            case Uninitialized:
                break;

            case Running:
            case Starting:
                this.tally.awaitNoneOther(new RunState[]{RunState.Running, RunState.Finished});
                break;

            case Stopped:
                this.tally.awaitNoneOther(new RunState[]{RunState.Stopped, RunState.Finished});
                break;

            case Finished:
                this.tally.awaitNoneOther(new RunState[]{RunState.Finished});
                break;

            case Stopping:
                throw new RuntimeException("Invalid requested state in activity executor:" + this.activity.getRunState());

            default:
                throw new RuntimeException("Unmatched run state:" + this.activity.getRunState());
        }

        logger.debug("activity and threads are aligned to state " + this.activity.getRunState() + " for " + getActivity().getAlias());
    }

    /**
     * Marks the activity as Stopping and asks every motor to stop. Does not wait for the
     * motors to react.
     */
    private void requestStopMotors() {
        logger.info(() -> "stopping activity " + this.activity);
        this.activity.setRunState(RunState.Stopping);
        this.motors.forEach(Motor::requestStop);
    }

    /**
     * @return {@code true} if at least one motor currently reports the Running state
     */
    public boolean isRunning() {
        return this.motors.stream()
            .map(motor -> motor.getState().get())
            .anyMatch(state -> state == RunState.Running);
    }

    /**
     * @return the activity controlled by this executor
     */
    public Activity getActivity() {
        final Activity controlled = this.activity;
        return controlled;
    }

    /**
     * Records a failure raised on an activity thread and asks all motors to stop.
     * Any previously recorded exception is replaced.
     *
     * @param thread the thread on which the failure occurred; its name goes into the message
     * @param th     the failure, kept as the cause of the recorded exception
     */
    public void notifyException(Thread thread, Throwable th) {
        logger.debug(() -> "Uncaught exception in activity thread forwarded to activity executor: " + th.getMessage());

        final String description = "Error in activity thread " + thread.getName();
        this.exception = new RuntimeException(description, th);

        requestStopMotors();
    }

    /**
     * Records the given reason as this activity's exception and asks all motors to stop,
     * without waiting for them. Any previously recorded exception is replaced.
     *
     * @param str human-readable reason for stopping
     */
    public synchronized void stopActivityWithReasonAsync(String str) {
        logger.info(() -> "Stopping activity " + this.activityDef.getAlias() + ": " + str);

        final String reason = "Stopping activity " + this.activityDef.getAlias() + ": " + str;
        this.exception = new RuntimeException(reason);
        logger.error("stopping with reason: " + this.exception);

        requestStopMotors();
    }

    /**
     * Reports an error and asks all motors to stop, without waiting for them.
     *
     * <p>Only the first error is recorded as this activity's exception. Later errors are
     * logged only: with stack trace when the {@code fullerrors} parameter is true, otherwise
     * as a one-line warning.
     *
     * @param th the error that triggered the stop
     */
    public synchronized void stopActivityWithErrorAsync(Throwable th) {
        if (this.exception != null) {
            final boolean fullErrors = (Boolean) this.activityDef.getParams().getOptionalBoolean("fullerrors").orElse(false);
            if (fullErrors) {
                logger.error("additional error: " + th.toString(), th);
            } else {
                logger.warn("summarized error (fullerrors=false): " + th.toString());
            }
        } else {
            this.exception = new RuntimeException(th);
            logger.error("stopping on error: " + th.toString(), th);
        }
        requestStopMotors();
    }

    /**
     * @return the progress meter exposed by the underlying activity
     */
    public ProgressMeterDisplay getProgressMeter() {
        final ProgressMeterDisplay meter = this.activity.getProgressMeter();
        return meter;
    }

    /**
     * Runs the activity to completion and reports the outcome.
     *
     * <p>Initializes the activity, waits for the motors to reach at least Running, then waits
     * for completion. The activity's shutdown hook and auto-closeables are always invoked
     * afterwards, including when one of the earlier steps failed, so that resources are not
     * left open on the error path.
     *
     * <p>Failures are not thrown from this method; they are recorded and returned inside the
     * result. A failure during cleanup is recorded as the result's exception when nothing
     * failed earlier, and is otherwise attached to the earlier exception as suppressed.
     *
     * @return a result carrying the start and stop timestamps and the recorded exception, if any
     */
    @Override // java.util.concurrent.Callable
    public ExecutionResult call() throws Exception {
        try {
            this.activity.initActivity();
            awaitMotorsAtLeastRunning();
            awaitActivityCompletion();
        } catch (Exception e) {
            this.exception = e;
        } finally {
            // Each step is guarded on its own so a failing shutdown hook cannot skip the close step.
            runCleanupStep(this.activity::shutdownActivity);
            runCleanupStep(this.activity::closeAutoCloseables);
        }
        return new ExecutionResult(this.startedAt, this.stoppedAt, "", this.exception);
    }

    /** Runs one cleanup action, recording (not throwing) any exception it raises. */
    private void runCleanupStep(Runnable step) {
        try {
            step.run();
        } catch (Exception cleanupFailure) {
            if (this.exception == null) {
                this.exception = cleanupFailure;
            } else if (this.exception != cleanupFailure) {
                // keep the primary failure; attach the cleanup failure for diagnosis
                this.exception.addSuppressed(cleanupFailure);
            }
        }
    }

    /**
     * Waits on the tally for any of the Running, Stopped, Finished or Errored states and
     * fails if the highest state reported is Errored.
     *
     * @throws RuntimeException if the tally reports Errored; the activity's run state is set
     *                          to Errored first, and the exception recorded by this executor
     *                          (if any) is attached as the cause
     */
    private void awaitMotorsAtLeastRunning() {
        RunState maxState = this.tally.awaitAny(new RunState[]{RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored}).getMaxState();
        if (maxState == RunState.Errored) {
            this.activity.setRunState(maxState);
            // Chain the recorded failure so callers see why the activity errored; a null cause is permitted.
            throw new RuntimeException("Error in activity", this.exception);
        }
    }

    /**
     * Starts the activity: creates the motor executor, starts the motor threads, then waits
     * until the motors have reached at least the Running state.
     */
    public void startActivity() {
        // 1. thread pool, 2. motors, 3. wait for them to come up
        startMotorExecutorService();
        startRunningActivityThreads();
        awaitMotorsAtLeastRunning();
    }

    /**
     * Shuts the motor executor down and waits for it to terminate.
     *
     * <p>Whatever happens while waiting, the stop timestamp is captured and the activity is
     * marked Stopped exactly once, in a single {@code finally} block.
     *
     * @param i maximum time to wait for termination, in seconds
     * @return {@code true} if the executor terminated within the timeout, or if a runtime
     *         exception interrupted the wait; {@code false} on timeout or thread interruption
     * @throws RuntimeException wrapping the recorded exception, if one is present after the
     *                          wait (whether recorded here or earlier by another method)
     */
    private boolean shutdownExecutorService(int i) {
        activitylogger.debug(() -> "Shutting down motor executor for (" + this.activity.getAlias() + ")");
        boolean wasStopped = false;
        try {
            this.executorService.shutdown();
            logger.trace(() -> "awaiting termination with timeout of " + i + " seconds");
            wasStopped = this.executorService.awaitTermination(i, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Best-effort: an interrupt ends the wait but the shutdown bookkeeping still runs.
            // NOTE(review): the interrupt flag is not restored here, matching prior behavior;
            // callers go on to block on the tally afterwards - confirm before changing.
            logger.trace("interrupted while awaiting termination");
            logger.warn("while waiting termination of shutdown " + this.activity.getAlias() + ", " + e.getMessage());
            activitylogger.debug("REQUEST STOP/exception alias=(" + this.activity.getAlias() + ") wasstopped=" + wasStopped);
        } catch (RuntimeException e) {
            logger.trace("Received exception while awaiting termination: " + e.getMessage());
            wasStopped = true;
            this.exception = e;
        } finally {
            logger.trace(() -> "finally shutting down activity " + getActivity().getAlias());
            this.stoppedAt = System.currentTimeMillis();
            this.activity.setRunState(RunState.Stopped);
        }

        if (this.exception != null) {
            logger.trace(() -> "an exception caused the activity to stop:" + this.exception.getMessage());
            logger.warn("Setting ERROR on motor executor for activity '" + this.activity.getAlias() + "': " + this.exception.getMessage());
            throw new RuntimeException(this.exception);
        }

        activitylogger.debug("motor executor for " + this.activity.getAlias() + ") wasstopped=" + wasStopped);
        return wasStopped;
    }

    /**
     * Waits on the tally for the Stopped, Finished or Errored states, then copies the highest
     * state reported onto the activity.
     *
     * @throws RuntimeException if that state is Errored; the exception recorded by this
     *                          executor (if any) is included in the message and attached as the cause
     */
    private void awaitActivityCompletion() {
        RunState maxState = this.tally.awaitNoneOther(new RunState[]{RunState.Stopped, RunState.Finished, RunState.Errored}).getMaxState();
        this.activity.setRunState(maxState);
        if (maxState == RunState.Errored) {
            // Chain the recorded failure instead of only stringifying it, so its stack trace survives.
            throw new RuntimeException("Error while waiting for activity completion:" + this.exception, this.exception);
        }
    }

    /**
     * Creates the thread pool that runs the motors.
     *
     * <p>The pool has no core threads, an upper bound of {@code Integer.MAX_VALUE} threads, a
     * zero keep-alive and a {@link SynchronousQueue}, so submitted motors are handed directly
     * to a thread rather than queued. Threads come from an {@link IndexedThreadFactory} named
     * after the activity alias and given an {@code ActivityExceptionHandler} bound to this executor.
     */
    private void startMotorExecutorService() {
        this.executorService = new ThreadPoolExecutor(
            0,
            Integer.MAX_VALUE,
            0L,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new IndexedThreadFactory(this.activity.getAlias(), new ActivityExceptionHandler(this))
        );
    }

    /**
     * Records a start annotation, then brings the motors up.
     *
     * <p>Inside the guarded section the activity is marked Starting, the start timestamp is
     * captured, the activity is notified of its definition, the motor count is aligned with
     * the thread parameter, the tally is awaited for Running/Finished/Stopped, and the
     * activity is marked Running.
     *
     * @throws RuntimeException if any step in the guarded section throws; the failure is also
     *                          recorded as this executor's exception
     */
    private void startRunningActivityThreads() {
        logger.info(() -> "starting activity " + this.activity.getAlias() + " for cycles " + this.activity.getCycleSummary());

        Annotators.recordAnnotation(
            Annotation.newBuilder()
                .session(this.sessionId)
                .now()
                .layer(Layer.Activity)
                .label("alias", getActivityDef().getAlias())
                .label("driver", getActivityDef().getActivityType())
                .label("workload", (String) getActivityDef().getParams().getOptionalString(new String[]{"workload"}).orElse("none"))
                .detail("params", getActivityDef().toString())
                .build()
        );

        activitylogger.debug("START/before alias=(" + this.activity.getAlias() + ")");
        try {
            this.activity.setRunState(RunState.Starting);
            this.startedAt = System.currentTimeMillis();
            this.activity.onActivityDefUpdate(this.activityDef);
            adjustMotorCountToThreadParam(this.activity.getActivityDef());

            final RunState[] startedOrDone = {RunState.Running, RunState.Finished, RunState.Stopped};
            this.tally.awaitAny(startedOrDone);

            this.activity.setRunState(RunState.Running);
            activitylogger.debug("START/after alias=(" + this.activity.getAlias() + ")");
        } catch (Exception e) {
            final String description = "Error initializing activity '" + this.activity.getAlias() + "':\n" + e.getMessage();
            this.exception = new RuntimeException(description, e);
            activitylogger.error(() -> "error initializing activity '" + this.activity.getAlias() + "': " + this.exception);
            throw new RuntimeException(this.exception);
        }
    }
}
