package ratpack.exec.internal;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.reflect.TypeToken;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ratpack.exec.Downstream;
import ratpack.exec.ExecController;
import ratpack.exec.ExecInitializer;
import ratpack.exec.ExecInterceptor;
import ratpack.exec.Execution;
import ratpack.exec.ExecutionException;
import ratpack.exec.OverlappingExecutionException;
import ratpack.exec.UnmanagedThreadException;
import ratpack.exec.Upstream;
import ratpack.func.Action;
import ratpack.func.Block;
import ratpack.registry.MutableRegistry;
import ratpack.registry.NotInRegistryException;
import ratpack.registry.RegistrySpec;
import ratpack.registry.internal.SimpleMutableRegistry;
import ratpack.stream.TransformablePublisher;

/* loaded from: input_file:ratpack/exec/internal/DefaultExecution.class */
public class DefaultExecution implements Execution {
    // Shared logger. Note that it is created for the Execution interface, not for this implementation class.
    public static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) Execution.class);
    // Execution bound to the calling thread: drain() sets it before running the interceptor chain and removes it afterwards.
    public static final FastThreadLocal<DefaultExecution> THREAD_BINDING = new FastThreadLocal<>();
    // Stream currently feeding exec(); replaced as delimited streams are installed or finish, and set to TerminalExecStream.INSTANCE at the end.
    private ExecStream execStream;
    // Controller supplied at construction; provides the global interceptors and initializers and is returned by getController().
    private final ExecControllerInternal controller;
    // Event loop that drain() requires to be the calling thread; otherwise the drain is rescheduled onto it.
    private final EventLoop eventLoop;
    // Error handler invoked by InitialExecStream.error(...).
    private final Action<? super Throwable> onError;
    // Callback invoked by exec() once the current stream is the terminal stream.
    private final Action<? super Execution> onComplete;
    // Resources registered through onComplete(AutoCloseable); created lazily, so null until the first registration.
    private List<AutoCloseable> closeables;
    // Backing registry that the Registry/MutableRegistry methods of this class delegate to.
    private final MutableRegistry registry = new SimpleMutableRegistry();
    // Interceptors added through addInterceptor(...); created lazily, so null until the first call.
    private List<ExecInterceptor> adhocInterceptors;
    // Interceptor chain used by drain(): controller interceptors, then registry interceptors, then (once created) the ad hoc ones.
    private Iterable<? extends ExecInterceptor> interceptors;

    /* loaded from: input_file:ratpack/exec/internal/DefaultExecution$ExecStream.class */
    /**
     * A source of execution segments that the enclosing execution runs one step at a time.
     */
    public abstract static class ExecStream {
        /**
         * Performs the next step of this stream.
         *
         * @return {@code true} if the caller should keep looping, {@code false} if it should stop for now
         * @throws Exception any failure raised by the executed step
         */
        abstract boolean exec() throws Exception;

        /**
         * Adds a segment to this stream.
         *
         * @param segment the segment to add
         */
        abstract void enqueue(Block segment);

        /**
         * Notifies this stream of a failure.
         *
         * @param error the failure
         */
        abstract void error(Throwable error);
    }

    /* loaded from: input_file:ratpack/exec/internal/DefaultExecution$InitialExecStream.class */
    /**
     * Outermost stream of an execution: runs the initial action first, then any enqueued segments,
     * and switches the execution to {@link TerminalExecStream#INSTANCE} when nothing is left.
     */
    private class InitialExecStream extends ExecStream {
        /** Initial action; nulled after it has run, or when an error is reported. */
        Action<? super Execution> initial;
        /** Segments enqueued on this stream; created lazily by {@link #enqueue(Block)}. */
        Queue<Block> segments;

        public InitialExecStream(Action<? super Execution> action) {
            this.initial = action;
        }

        /**
         * Runs the initial action if it is still pending, otherwise the next queued segment.
         *
         * @return {@code true} if something was executed; {@code false} once the stream is exhausted,
         *         in which case the execution has been switched to the terminal stream
         */
        @Override
        boolean exec() throws Exception {
            if (this.initial != null) {
                // Cleared only after a successful run; on failure error() clears it instead.
                this.initial.execute(DefaultExecution.this);
                this.initial = null;
                return true;
            }
            Block next = this.segments == null ? null : this.segments.poll();
            if (next != null) {
                next.execute();
                return true;
            }
            DefaultExecution.this.execStream = TerminalExecStream.INSTANCE;
            return false;
        }

        /**
         * Appends a segment, creating the queue on first use.
         */
        @Override
        void enqueue(Block block) {
            if (this.segments == null) {
                this.segments = new ArrayDeque<>(1);
            }
            this.segments.add(block);
        }

        /**
         * Discards all pending work and hands the failure to the execution's error handler.
         * If the handler itself throws, the failure is logged and the execution is terminated.
         */
        @Override
        void error(Throwable th) {
            this.initial = null;
            if (this.segments != null) {
                this.segments.clear();
            }
            try {
                DefaultExecution.this.onError.execute(th);
            } catch (Throwable th2) {
                DefaultExecution.LOGGER.error("error handler " + DefaultExecution.this.onError + " threw error (this execution will terminate):", th2);
                DefaultExecution.this.execStream = TerminalExecStream.INSTANCE;
            }
        }
    }

    /* loaded from: input_file:ratpack/exec/internal/DefaultExecution$MultiEventExecStream.class */
    /**
     * Stream for a delimited region that can receive any number of events followed by a completion.
     * Each event gets its own segment queue; the queues are held in a queue obtained from
     * {@code PlatformDependent.newMpscQueue()}.
     */
    private class MultiEventExecStream extends ExecStream implements ContinuationStream {
        /** Stream that was current when this one was created; restored on completion or error. */
        final ExecStream parent;
        private final Action<? super Throwable> onError;
        /** One segment queue per event; the head is the event currently being processed. */
        final Queue<Queue<Block>> events = PlatformDependent.newMpscQueue();
        /** Completion block; {@code null} until {@link #complete(Block)} is called. */
        Block complete;

        /**
         * @param execStream the stream to return to when this one finishes
         * @param action the error handler for this region
         * @param action2 the action that receives this stream; it is scheduled as the first event
         */
        public MultiEventExecStream(ExecStream execStream, Action<? super Throwable> action, Action<? super ContinuationStream> action2) {
            this.parent = execStream;
            this.onError = action;
            event(() -> {
                action2.execute(this);
            });
        }

        /**
         * Adds a new event whose first segment is {@code block}, then requests a drain.
         */
        @Override
        public void event(Block block) {
            Queue<Block> eventSegments = new ArrayDeque<>();
            eventSegments.add(block);
            this.events.add(eventSegments);
            DefaultExecution.this.drain();
        }

        /**
         * Records the completion block, then requests a drain.
         */
        @Override
        public void complete(Block block) {
            this.complete = block;
            DefaultExecution.this.drain();
        }

        /**
         * Runs the next segment of the head event. When the head event is empty, either advances to
         * the next event or, if it is the only one left, runs the completion block (when present)
         * after restoring the parent stream.
         *
         * @return {@code true} if a block was executed; {@code false} if the last event is empty and
         *         no completion has been recorded yet
         */
        @Override
        boolean exec() throws Exception {
            Block next = this.events.peek().poll();
            if (next != null) {
                next.execute();
                return true;
            }
            if (this.events.size() != 1) {
                // Head event is exhausted and another one is queued: drop it and try again.
                this.events.poll();
                return exec();
            }
            if (this.complete == null) {
                return false;
            }
            // Restore the parent before running the completion so that its segments go to the parent.
            DefaultExecution.this.execStream = this.parent;
            this.complete.execute();
            return true;
        }

        /**
         * Appends a segment to the event at the head of the queue.
         */
        @Override
        void enqueue(Block block) {
            this.events.peek().add(block);
        }

        /**
         * Restores the parent stream and passes the failure to this region's error handler;
         * an exception thrown by the handler is forwarded to the parent stream.
         */
        @Override
        void error(Throwable th) {
            DefaultExecution.this.execStream = this.parent;
            try {
                this.onError.execute(th);
            } catch (Exception e) {
                DefaultExecution.this.execStream.error(e);
            }
        }
    }

    /* loaded from: input_file:ratpack/exec/internal/DefaultExecution$SingleEventExecStream.class */
    /**
     * Stream for a delimited region that is continued through {@link Continuation#resume(Block)}.
     * It runs the initial action, then any enqueued segments, then the resume block, and finally
     * hands control back to the parent stream.
     */
    private class SingleEventExecStream extends ExecStream implements Continuation {
        /** Stream that was current when this one was created; restored when this one finishes or fails. */
        final ExecStream parent;
        private final Action<? super Throwable> onError;
        /** Action that receives this continuation; nulled after it has run successfully. */
        Action<? super Continuation> initial;
        /** Block supplied to {@link #resume(Block)}; nulled after it has run successfully. */
        Block resume;
        /** Set once {@link #resume(Block)} has been called. */
        boolean resumed;
        /** Segments enqueued on this stream; created lazily by {@link #enqueue(Block)}. */
        Queue<Block> segments;

        /**
         * @param execStream the stream to return to when this one finishes
         * @param action the error handler for this region
         * @param action2 the action that receives this continuation
         */
        public SingleEventExecStream(ExecStream execStream, Action<? super Throwable> action, Action<? super Continuation> action2) {
            this.parent = execStream;
            this.onError = action;
            this.initial = action2;
        }

        /**
         * Executes the next pending piece of work in the order: initial action, queued segments,
         * resume block. Once resumed with nothing left, delegates to the parent stream.
         *
         * @return {@code true} if something was executed; {@code false} if this stream is still waiting
         *         to be resumed; otherwise whatever the parent stream returns
         */
        @Override
        boolean exec() throws Exception {
            if (this.initial != null) {
                this.initial.execute(this);
                this.initial = null;
                return true;
            }
            if (this.segments != null && !this.segments.isEmpty()) {
                Block next = this.segments.poll();
                if (next != null) {
                    next.execute();
                    return true;
                }
                DefaultExecution.this.execStream = this.parent;
                return DefaultExecution.this.execStream.exec();
            }
            if (this.resume != null) {
                // Cleared only after a successful run: error() uses a non-null resume block to
                // recognise a failure raised while the resume block itself was executing.
                this.resume.execute();
                this.resume = null;
                return true;
            }
            if (!this.resumed) {
                return false;
            }
            DefaultExecution.this.execStream = this.parent;
            return DefaultExecution.this.execStream.exec();
        }

        /**
         * Appends a segment, creating the queue on first use.
         */
        @Override
        void enqueue(Block block) {
            if (this.segments == null) {
                this.segments = new ArrayDeque<>(1);
            }
            this.segments.add(block);
        }

        /**
         * Marks this stream as resumed, stores the block to run, and requests a drain.
         */
        @Override
        public void resume(Block block) {
            this.resumed = true;
            this.resume = block;
            DefaultExecution.this.drain();
        }

        /**
         * Restores the parent stream and reports the failure. If the resume block has already run to
         * completion, the failure goes straight to the parent; otherwise it goes to this region's
         * error handler, and anything that handler throws is forwarded to the parent.
         */
        @Override
        void error(Throwable th) {
            DefaultExecution.this.execStream = this.parent;
            if (this.resumed && this.resume == null) {
                DefaultExecution.this.execStream.error(th);
                return;
            }
            try {
                this.onError.execute(th);
            } catch (Throwable th2) {
                DefaultExecution.this.execStream.error(th2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ratpack/exec/internal/DefaultExecution$TerminalExecStream.class */
    /**
     * Stream installed once an execution has finished: it never has work to do and rejects
     * any attempt to enqueue a segment or report an error.
     */
    public static class TerminalExecStream extends ExecStream {
        private static final String COMPLETED_MESSAGE = "this execution has completed (you may be trying to use a promise in a cleanup method)";

        /** The single shared instance. */
        static final ExecStream INSTANCE = new TerminalExecStream();

        private TerminalExecStream() {
        }

        /** Always reports that there is nothing to execute. */
        @Override
        boolean exec() {
            return false;
        }

        /** Always throws, because segments cannot be added to a completed execution. */
        @Override
        void enqueue(Block segment) {
            throw new ExecutionException(COMPLETED_MESSAGE);
        }

        /** Always throws, because errors cannot be reported to a completed execution. */
        @Override
        void error(Throwable error) {
            throw new ExecutionException(COMPLETED_MESSAGE);
        }
    }

    public DefaultExecution(ExecControllerInternal execControllerInternal, EventLoop eventLoop, Action<? super RegistrySpec> action, Action<? super Execution> action2, Action<? super Throwable> action3, Action<? super Execution> action4, Action<? super Execution> action5) throws Exception {
        this.controller = execControllerInternal;
        this.eventLoop = eventLoop;
        this.onError = action3;
        this.onComplete = action5;
        action.execute(this.registry);
        action4.execute(this);
        this.execStream = new InitialExecStream(action2);
        this.interceptors = Iterables.concat(execControllerInternal.getInterceptors(), ImmutableList.copyOf(this.registry.getAll(ExecInterceptor.class)));
        UnmodifiableIterator<? extends ExecInitializer> it = execControllerInternal.getInitializers().iterator();
        while (it.hasNext()) {
            it.next().init(this);
        }
        Iterator it2 = this.registry.getAll(ExecInitializer.class).iterator();
        while (it2.hasNext()) {
            ((ExecInitializer) it2.next()).init(this);
        }
        drain();
    }

    /**
     * Looks up the execution bound to the calling thread.
     *
     * @return the value held by {@link #THREAD_BINDING} for the calling thread
     */
    public static DefaultExecution get() throws UnmanagedThreadException {
        DefaultExecution bound = THREAD_BINDING.get();
        return bound;
    }

    /**
     * Returns the execution bound to the calling thread, failing if there is none.
     *
     * @return the bound execution, never {@code null}
     * @throws UnmanagedThreadException if {@link #get()} returns {@code null}
     */
    public static DefaultExecution require() throws UnmanagedThreadException {
        DefaultExecution current = get();
        if (current != null) {
            return current;
        }
        throw new UnmanagedThreadException();
    }

    /**
     * Wraps a publisher so that every signal it emits is delivered to the subscriber as part of
     * the execution bound to the subscribing thread.
     * <p>
     * On subscription, a stream region is delimited on the current execution. {@code onSubscribe}
     * and {@code onNext} signals are forwarded as events of that region; {@code onComplete} and
     * {@code onError} are forwarded as its completion. Errors raised within the region itself are
     * reported to the subscriber's {@code onError}.
     *
     * @param publisher the publisher to wrap
     * @param <T> the type of emitted item
     * @return a publisher that relays {@code publisher}'s signals through the current execution
     */
    public static <T> TransformablePublisher<T> stream(Publisher<T> publisher) {
        return subscriber -> {
            require().delimitStream(subscriber::onError, continuationStream -> {
                publisher.subscribe(new Subscriber<T>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        continuationStream.event(() -> {
                            subscriber.onSubscribe(subscription);
                        });
                    }

                    @Override
                    public void onNext(T t) {
                        continuationStream.event(() -> {
                            subscriber.onNext(t);
                        });
                    }

                    @Override
                    public void onComplete() {
                        continuationStream.complete(subscriber::onComplete);
                    }

                    @Override
                    public void onError(Throwable th) {
                        continuationStream.complete(() -> {
                            subscriber.onError(th);
                        });
                    }
                });
            });
        };
    }

    /**
     * Wraps an upstream so that its single outcome is delivered to the downstream as part of the
     * execution bound to the connecting thread.
     * <p>
     * On connection, a region is delimited on the current execution and {@code upstream} is
     * connected to a guarding downstream. The first signal (error, success, complete, or an
     * exception thrown by {@code connect} itself) resumes the region and is relayed; any later
     * signal is only logged as an {@link OverlappingExecutionException}.
     *
     * @param upstream the upstream to wrap
     * @param <T> the type of value produced
     * @return an upstream that relays {@code upstream}'s outcome through the current execution
     */
    public static <T> Upstream<T> upstream(Upstream<T> upstream) {
        return downstream -> {
            // Flipped by whichever signal arrives first; all later signals are rejected.
            AtomicBoolean fired = new AtomicBoolean();
            require().delimit(downstream::error, continuation -> {
                try {
                    upstream.connect(new Downstream<T>() {
                        @Override
                        public void error(Throwable th) {
                            if (!fired.compareAndSet(false, true)) {
                                DefaultExecution.LOGGER.error("", (Throwable) new OverlappingExecutionException("promise already fulfilled", th));
                                return;
                            }
                            continuation.resume(() -> {
                                downstream.error(th);
                            });
                        }

                        @Override
                        public void success(T t) {
                            if (!fired.compareAndSet(false, true)) {
                                DefaultExecution.LOGGER.error("", (Throwable) new OverlappingExecutionException("promise already fulfilled"));
                                return;
                            }
                            continuation.resume(() -> {
                                downstream.success(t);
                            });
                        }

                        @Override
                        public void complete() {
                            if (!fired.compareAndSet(false, true)) {
                                DefaultExecution.LOGGER.error("", (Throwable) new OverlappingExecutionException("promise already fulfilled"));
                                return;
                            }
                            continuation.resume(downstream::complete);
                        }
                    });
                } catch (Throwable th) {
                    // A failure while connecting counts as the outcome unless one was already delivered.
                    if (fired.compareAndSet(false, true)) {
                        continuation.resume(() -> {
                            downstream.error(th);
                        });
                    } else {
                        LOGGER.error("", (Throwable) new OverlappingExecutionException("promise already fulfilled", th));
                    }
                }
            });
        };
    }

    /**
     * @return the event loop supplied when this execution was constructed
     */
    @Override
    public EventLoop getEventLoop() {
        return eventLoop;
    }

    /**
     * Enqueues a segment on the current stream that, when run, installs a new
     * {@link SingleEventExecStream} on top of whichever stream is current at that time,
     * and then requests a drain.
     *
     * @param action the error handler for the new region
     * @param action2 the action that will receive the region's continuation
     */
    public void delimit(Action<? super Throwable> action, Action<? super Continuation> action2) {
        Block installRegion = () -> {
            ExecStream enclosing = this.execStream;
            this.execStream = new SingleEventExecStream(enclosing, action, action2);
        };
        this.execStream.enqueue(installRegion);
        drain();
    }

    /**
     * Enqueues a segment on the current stream that, when run, installs a new
     * {@link MultiEventExecStream} on top of whichever stream is current at that time,
     * and then requests a drain.
     *
     * @param action the error handler for the new region
     * @param action2 the action that will receive the region's continuation stream
     */
    public void delimitStream(Action<? super Throwable> action, Action<? super ContinuationStream> action2) {
        Block installRegion = () -> {
            ExecStream enclosing = this.execStream;
            this.execStream = new MultiEventExecStream(enclosing, action, action2);
        };
        this.execStream.enqueue(installRegion);
        drain();
    }

    /**
     * Submits a call to {@link #drain()} to this execution's event loop.
     */
    public void eventLoopDrain() {
        Runnable drainTask = this::drain;
        eventLoop.execute(drainTask);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs this execution's pending work on the calling thread when that is possible.
     * <ul>
     * <li>Does nothing if the execution has terminated, or is already bound to the calling thread.</li>
     * <li>Reschedules itself on the event loop if the caller is not on that loop, or if another
     * execution is bound to the calling thread.</li>
     * <li>Otherwise binds this execution to the thread, runs the interceptor chain (which ends in
     * the segment loop), and always unbinds afterwards.</li>
     * </ul>
     * Anything thrown out of the interceptor chain is passed to {@link #interceptorError(Throwable)}.
     */
    public void drain() {
        if (this.execStream == TerminalExecStream.INSTANCE) {
            return;
        }
        DefaultExecution bound = THREAD_BINDING.get();
        if (this == bound) {
            return;
        }
        if (!this.eventLoop.inEventLoop() || bound != null) {
            eventLoopDrain();
            return;
        }
        try {
            THREAD_BINDING.set(this);
            intercept(this.interceptors.iterator());
        } catch (Throwable th) {
            interceptorError(th);
        } finally {
            // Single cleanup point: the binding is released on every exit path.
            THREAD_BINDING.remove();
        }
    }

    /**
     * Logs, at warn level, a failure that escaped the interceptor chain.
     *
     * @param th the failure to log
     */
    public static void interceptorError(Throwable th) {
        final String message = "exception was thrown by an execution interceptor (which will be ignored):";
        LOGGER.warn(message, th);
    }

    /**
     * @return the interceptor chain currently used when draining this execution
     */
    public Iterable<? extends ExecInterceptor> getAllInterceptors() {
        return interceptors;
    }

    /**
     * Invokes the remaining interceptors one inside another; the innermost continuation is the
     * segment loop {@link #exec()}.
     *
     * @param it the interceptors still to be applied
     * @throws Exception any failure raised by an interceptor
     */
    private void intercept(Iterator<? extends ExecInterceptor> it) throws Exception {
        if (!it.hasNext()) {
            exec();
            return;
        }
        ExecInterceptor current = it.next();
        current.intercept(this, ExecInterceptor.ExecType.COMPUTE, () -> intercept(it));
    }

    /**
     * Runs segments from the current stream until it reports that nothing more can be run now.
     * A failure raised by a segment is handed to whichever stream is current at that moment, and
     * the loop then continues. If the execution has reached the terminal stream afterwards, the
     * completion callback is invoked and every registered closeable is closed; failures in either
     * are logged and do not stop the remaining cleanup.
     */
    private void exec() {
        while (true) {
            try {
                if (!this.execStream.exec()) {
                    break;
                }
            } catch (Throwable th) {
                // Re-read the field: the failing stream may already have replaced itself.
                this.execStream.error(th);
            }
        }
        if (this.execStream == TerminalExecStream.INSTANCE) {
            try {
                this.onComplete.execute(this);
            } catch (Throwable th2) {
                LOGGER.warn("exception raised during onComplete action", th2);
            }
            if (this.closeables != null) {
                for (AutoCloseable autoCloseable : this.closeables) {
                    try {
                        autoCloseable.close();
                    } catch (Throwable th3) {
                        LOGGER.warn("exception raised by execution closeable " + autoCloseable, th3);
                    }
                }
            }
        }
    }

    /**
     * @return the controller supplied when this execution was constructed
     */
    @Override
    public ExecController getController() {
        return controller;
    }

    /**
     * Registers a resource to be closed after the execution has reached its terminal stream.
     *
     * @param autoCloseable the resource to close
     */
    @Override
    public void onComplete(AutoCloseable autoCloseable) {
        List<AutoCloseable> registered = this.closeables;
        if (registered == null) {
            // Created on first use.
            registered = Lists.newArrayList();
            this.closeables = registered;
        }
        registered.add(autoCloseable);
    }

    /**
     * Forwards a lazily supplied entry to this execution's backing registry.
     *
     * @param typeToken the type to register the entry under
     * @param supplier the supplier of the entry
     * @return this execution
     */
    @Override
    public <O> Execution addLazy(TypeToken<O> typeToken, Supplier<? extends O> supplier) {
        registry.addLazy(typeToken, supplier);
        return this;
    }

    /**
     * Appends an interceptor to this execution's chain and immediately passes {@code block} to
     * that interceptor's {@code intercept} method.
     *
     * @param execInterceptor the interceptor to add
     * @param block the block handed to the interceptor right away
     * @throws Exception any failure raised by the interceptor
     */
    @Override
    public void addInterceptor(ExecInterceptor execInterceptor, Block block) throws Exception {
        if (adhocInterceptors == null) {
            // First ad hoc interceptor: create the list and append a live view of it to the chain.
            adhocInterceptors = Lists.newArrayList();
            interceptors = Iterables.concat(interceptors, adhocInterceptors);
        }
        adhocInterceptors.add(execInterceptor);
        execInterceptor.intercept(this, ExecInterceptor.ExecType.COMPUTE, block);
    }

    /**
     * Forwards the removal of the given type to this execution's backing registry.
     *
     * @param typeToken the type to remove
     */
    @Override
    public <T> void remove(TypeToken<T> typeToken) throws NotInRegistryException {
        registry.remove(typeToken);
    }

    /**
     * Looks up the given type in this execution's backing registry.
     *
     * @param typeToken the type to look up
     * @return the backing registry's result for that type
     */
    @Override
    public <O> Optional<O> maybeGet(TypeToken<O> typeToken) {
        Optional<O> found = registry.maybeGet(typeToken);
        return found;
    }

    /**
     * Looks up all entries of the given type in this execution's backing registry.
     *
     * @param typeToken the type to look up
     * @return the backing registry's result for that type
     */
    @Override
    public <O> Iterable<? extends O> getAll(TypeToken<O> typeToken) {
        Iterable<? extends O> matches = registry.getAll(typeToken);
        return matches;
    }
}
