package org.apache.kafka.connect.runtime.isolation;

import java.lang.Thread;
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.storage.Converter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/isolation/SynchronizationTest.class */
public class SynchronizationTest {
    /** Logger used by {@code dumpThreads} to emit thread dumps when debug logging is enabled. */
    public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class);
    /** Name prefix given to the executor's threads in {@code setup}; also used to filter thread info down to this test's threads. */
    private String threadPrefix;
    /** Plugins instance created in {@code setup} with a {@code SynchronizedClassLoaderFactory}. */
    private Plugins plugins;
    /** Two-thread pool, created in {@code setup}, that runs the concurrent class-loading tasks. */
    private ThreadPoolExecutor exec;
    /** Breakpoint handed to every {@code SynchronizedDelegatingClassLoader} the factory creates. */
    private Breakpoint<String> dclBreakpoint;
    /** Breakpoint handed to every {@code SynchronizedPluginClassLoader} the factory creates. */
    private Breakpoint<String> pclBreakpoint;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.connect.runtime.isolation.SynchronizationTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/isolation/SynchronizationTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        /**
         * Lookup table indexed by {@link Thread.State#ordinal()}: BLOCKED maps to 1, WAITING to 2 and
         * TIMED_WAITING to 3. Every other slot keeps the array default of 0.
         */
        static final /* synthetic */ int[] $SwitchMap$java$lang$Thread$State;

        static {
            int[] caseByOrdinal = new int[Thread.State.values().length];
            try {
                caseByOrdinal[Thread.State.BLOCKED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at run time: its slot stays 0.
            }
            try {
                caseByOrdinal[Thread.State.WAITING.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at run time: its slot stays 0.
            }
            try {
                caseByOrdinal[Thread.State.TIMED_WAITING.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at run time: its slot stays 0.
            }
            $SwitchMap$java$lang$Thread$State = caseByOrdinal;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/isolation/SynchronizationTest$Breakpoint.class */
    /**
     * A two-party rendezvous point between a thread under test and the thread orchestrating the test.
     *
     * <p>While armed via {@link #set(Predicate)}, a call to {@link #await(Object)} whose argument passes
     * the predicate blocks on a two-party {@link CyclicBarrier} until the orchestrator calls
     * {@link #testAwait()} (or the barrier is reset or broken). While cleared, {@code await} returns
     * without blocking.
     *
     * @param <T> type of the value tested by the predicate
     */
    public static class Breakpoint<T> {
        private Predicate<T> predicate;
        private CyclicBarrier barrier;

        private Breakpoint() {
        }

        /**
         * Decompiler-emitted accessor constructor; the argument is ignored. Kept so that existing
         * {@code new Breakpoint<>(null)} call sites keep compiling.
         */
        /* synthetic */ Breakpoint(AnonymousClass1 anonymousClass1) {
            this();
        }

        /**
         * Disarms the breakpoint. Any current barrier is reset first, which releases parties
         * waiting on it with a {@link BrokenBarrierException}.
         */
        public synchronized void clear() {
            if (this.barrier != null) {
                this.barrier.reset();
            }
            this.predicate = null;
            this.barrier = null;
        }

        /**
         * Arms the breakpoint with a fresh two-party barrier, clearing any previous state first.
         *
         * @param predicate selects which {@code await} arguments should stop at the breakpoint;
         *                  {@code null} means every call stops
         */
        public synchronized void set(Predicate<T> predicate) {
            clear();
            this.predicate = predicate;
            this.barrier = new CyclicBarrier(2);
        }

        /**
         * Called from a thread under test: waits, without a timeout, for the other party if the
         * breakpoint is armed and {@code t} passes the predicate.
         *
         * @param t value to test against the breakpoint's current predicate
         * @throws RuntimeException if the wait is interrupted or the barrier is broken or reset; the
         *                          thread's interrupt status is restored in the interrupted case
         */
        public void await(T t) {
            final Predicate<T> activePredicate;
            final CyclicBarrier activeBarrier;
            // Snapshot both fields under the lock so a concurrent set()/clear() cannot tear them.
            synchronized (this) {
                activePredicate = this.predicate;
                activeBarrier = this.barrier;
            }
            if (activePredicate != null && !activePredicate.test(t)) {
                return;
            }
            if (activeBarrier == null) {
                return;
            }
            try {
                activeBarrier.await();
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for load gate", e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException("Interrupted while waiting for load gate", e);
            }
        }

        /**
         * Called from the orchestrating thread: waits up to one second for the other party to
         * reach the barrier.
         *
         * @throws NullPointerException   if the breakpoint has not been armed
         * @throws InterruptedException   if the current thread is interrupted while waiting
         * @throws BrokenBarrierException if the barrier is broken or reset while waiting
         * @throws TimeoutException       if the other party does not arrive within one second
         */
        public void testAwait() throws InterruptedException, BrokenBarrierException, TimeoutException {
            final CyclicBarrier activeBarrier;
            synchronized (this) {
                activeBarrier = this.barrier;
            }
            Objects.requireNonNull(activeBarrier, "Barrier must be set up before awaiting");
            activeBarrier.await(1L, TimeUnit.SECONDS);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/isolation/SynchronizationTest$SynchronizedClassLoaderFactory.class */
    /**
     * Class loader factory that creates the breakpoint-instrumented loader variants, wiring in the
     * enclosing test's {@code dclBreakpoint} and {@code pclBreakpoint}.
     * NOTE(review): both factory methods presumably override {@code ClassLoaderFactory} — confirm
     * and add {@code @Override}.
     */
    private class SynchronizedClassLoaderFactory extends ClassLoaderFactory {
        private SynchronizedClassLoaderFactory() {
        }

        /**
         * Decompiler-emitted accessor constructor; both arguments are ignored. Kept so that existing
         * call sites passing {@code (this, null)} keep compiling.
         */
        /* synthetic */ SynchronizedClassLoaderFactory(SynchronizationTest synchronizationTest, AnonymousClass1 anonymousClass1) {
            this();
        }

        /**
         * Creates a {@code SynchronizedDelegatingClassLoader} inside a privileged action.
         *
         * @param classLoader parent class loader handed to the new loader
         * @return the instrumented delegating loader
         */
        public DelegatingClassLoader newDelegatingClassLoader(ClassLoader classLoader) {
            // The explicit PrivilegedAction target type is required: a bare lambda is ambiguous
            // between the PrivilegedAction and PrivilegedExceptionAction overloads of doPrivileged.
            return AccessController.doPrivileged(
                (PrivilegedAction<DelegatingClassLoader>) () ->
                    new SynchronizedDelegatingClassLoader(classLoader, SynchronizationTest.this.dclBreakpoint));
        }

        /**
         * Creates a {@code SynchronizedPluginClassLoader} inside a privileged action.
         *
         * @param url         first constructor argument of the plugin loader
         * @param urlArr      URL array handed to the plugin loader
         * @param classLoader parent class loader handed to the new loader
         * @return the instrumented plugin loader
         */
        public PluginClassLoader newPluginClassLoader(URL url, URL[] urlArr, ClassLoader classLoader) {
            return AccessController.doPrivileged(
                (PrivilegedAction<PluginClassLoader>) () ->
                    new SynchronizedPluginClassLoader(url, urlArr, classLoader, SynchronizationTest.this.pclBreakpoint));
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/isolation/SynchronizationTest$SynchronizedDelegatingClassLoader.class */
    /**
     * Delegating class loader that rendezvouses twice on its breakpoint before every
     * {@code pluginClassLoader} lookup.
     */
    private static class SynchronizedDelegatingClassLoader extends DelegatingClassLoader {
        static {
            ClassLoader.registerAsParallelCapable();
        }

        private final Breakpoint<String> gate;

        public SynchronizedDelegatingClassLoader(ClassLoader classLoader, Breakpoint<String> breakpoint) {
            super(classLoader);
            this.gate = breakpoint;
        }

        /**
         * Awaits the breakpoint twice with the requested name, then defers to the superclass.
         *
         * @param str name passed both to the breakpoint and to the superclass lookup
         * @return whatever the superclass lookup returns
         */
        public PluginClassLoader pluginClassLoader(String str) {
            // Two consecutive rendezvous per call; the orchestrating test counts on exactly two.
            for (int rendezvous = 0; rendezvous < 2; rendezvous++) {
                this.gate.await(str);
            }
            return super.pluginClassLoader(str);
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/isolation/SynchronizationTest$SynchronizedPluginClassLoader.class */
    /**
     * Plugin class loader that rendezvouses once on its breakpoint every time a class loading
     * lock is requested.
     */
    private static class SynchronizedPluginClassLoader extends PluginClassLoader {
        static {
            ClassLoader.registerAsParallelCapable();
        }

        private final Breakpoint<String> gate;

        public SynchronizedPluginClassLoader(URL url, URL[] urlArr, ClassLoader classLoader, Breakpoint<String> breakpoint) {
            super(url, urlArr, classLoader);
            this.gate = breakpoint;
        }

        /**
         * Awaits the breakpoint with the requested name, then returns the superclass's lock object.
         *
         * @param str name passed both to the breakpoint and to the superclass
         * @return the lock object supplied by the superclass
         */
        protected Object getClassLoadingLock(String str) {
            this.gate.await(str);
            Object lock = super.getClassLoadingLock(str);
            return lock;
        }
    }

    /**
     * Builds the per-test fixtures: the thread-name prefix, two fresh breakpoints, a
     * {@code Plugins} instance wired to the breakpoint-instrumented class loader factory, and a
     * fixed two-thread executor whose threads carry the prefix.
     *
     * @param testInfo supplies the display name that becomes part of the thread-name prefix
     */
    @BeforeEach
    public void setup(TestInfo testInfo) {
        Map<String, String> pluginProps = Collections.singletonMap("plugin.path", TestPlugins.pluginPathJoined());
        this.threadPrefix = SynchronizationTest.class.getSimpleName() + "." + testInfo.getDisplayName() + "-";
        // The breakpoints must exist before Plugins is built: the factory reads these fields
        // whenever it creates a loader.
        this.dclBreakpoint = new Breakpoint<>();
        this.pclBreakpoint = new Breakpoint<>();
        this.plugins = new Plugins(pluginProps, Plugins.class.getClassLoader(), new SynchronizedClassLoaderFactory());
        this.exec = new ThreadPoolExecutor(
            2,
            2,
            1000L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(),
            threadFactoryWithNamedThreads(this.threadPrefix));
    }

    /**
     * Releases threads parked on the shared breakpoints and stops the executor.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    @AfterEach
    public void tearDown() throws InterruptedException {
        // Clearing resets the barriers, so workers parked on them fail fast instead of hanging.
        this.dclBreakpoint.clear();
        this.pclBreakpoint.clear();
        this.exec.shutdown();
        if (!this.exec.awaitTermination(1L, TimeUnit.SECONDS)) {
            // A worker may still be parked on a breakpoint this method does not know about (the
            // tests also create local ones). Interrupt it so the non-daemon thread does not leak.
            this.exec.shutdownNow();
        }
    }

    /**
     * Steps two pool threads through class loading under different loaders and then asserts that
     * none of this test's threads are deadlocked.
     *
     * <p>T1 parses a CLASS-typed config naming the sampling-converter plugin while running under
     * {@code plugins.delegatingLoader()}; T2 parses one naming {@code Mockito} while running under
     * the plugin's connector loader. The main thread releases them one rendezvous at a time via
     * {@code dclBreakpoint} and {@code pclBreakpoint}.
     *
     * @throws Exception if a rendezvous times out, is interrupted, or the barrier is broken
     */
    @Timeout(15)
    @Test
    public void testSimultaneousUpwardAndDownwardDelegating() throws Exception {
        String t1Class = TestPlugins.TestPlugin.SAMPLING_CONVERTER.className();
        // Resolve the connector loader before any breakpoint is armed.
        ClassLoader connectorLoader = this.plugins.connectorLoader(t1Class);

        // THREAD 1: the delegating-loader breakpoint only stops lookups for t1Class.
        this.dclBreakpoint.set(t1Class::equals);
        Runnable thread1 = () -> parseClassConfigUnder(this.plugins.delegatingLoader(), t1Class);

        // THREAD 2: the plugin-loader breakpoint only stops lock requests for t2Class.
        String t2Class = Mockito.class.getName();
        this.pclBreakpoint.set(t2Class::equals);
        Runnable thread2 = () -> parseClassConfigUnder(connectorLoader, t2Class);

        // STEP 1: start T1. The instrumented delegating loader rendezvouses twice per matching
        // pluginClassLoader call, so three rendezvous carry T1 through one full call and past the
        // first await of a second call.
        this.exec.submit(thread1);
        this.dclBreakpoint.testAwait();
        this.dclBreakpoint.testAwait();
        this.dclBreakpoint.testAwait();
        dumpThreads("step 1, T1 waiting in DelegatingClassLoader");

        // STEP 2: start T2. The instrumented plugin loader rendezvouses once per matching
        // getClassLoadingLock call; two rendezvous are expected here.
        this.exec.submit(thread2);
        this.pclBreakpoint.testAwait();
        this.pclBreakpoint.testAwait();
        dumpThreads("step 2, T2 entered DelegatingClassLoader and is loading class from parent");

        // STEP 3: release T1 from its remaining delegating-loader rendezvous.
        this.dclBreakpoint.testAwait();
        dumpThreads("step 3, T1 entered PluginClassLoader and is/was loading class from isolated jar");

        assertNoDeadlocks();
    }

    /**
     * Parses a one-entry config whose CLASS-typed value is {@code className}, while running inside
     * {@code plugins.withClassLoader(loader)}. The {@code LoaderSwap} is closed on exit.
     */
    private void parseClassConfigUnder(ClassLoader loader, String className) {
        try (LoaderSwap swap = this.plugins.withClassLoader(loader)) {
            new AbstractConfig(
                new ConfigDef().define("a.class", ConfigDef.Type.CLASS, ConfigDef.Importance.HIGH, ""),
                Collections.singletonMap("a.class", className));
        }
    }

    /**
     * Checks for a deadlock between a thread that holds the connector loader's monitor while
     * taking an external lock, and a thread that holds that external lock while loading the
     * sampling-converter class.
     *
     * @throws InterruptedException   if the orchestrating thread is interrupted at a rendezvous
     * @throws TimeoutException       if a worker does not reach a rendezvous within the timeout
     * @throws BrokenBarrierException if a barrier is broken while the orchestrator waits on it
     */
    @Timeout(15)
    @Test
    public void testPluginClassLoaderDoesntHoldMonitorLock() throws InterruptedException, TimeoutException, BrokenBarrierException {
        ClassLoader connectorLoader = this.plugins.connectorLoader(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className());
        Object externalTestLock = new Object();
        Breakpoint<Object> progress1 = new Breakpoint<>();
        Breakpoint<Object> progress2 = new Breakpoint<>();

        // THREAD 1: hold the loader's monitor across two rendezvous, then take the external lock.
        progress1.set(null);
        Runnable thread1 = () -> {
            synchronized (connectorLoader) {
                progress1.await(null);
                progress1.await(null);
                synchronized (externalTestLock) {
                    // Intentionally empty: acquiring the lock is the whole point.
                }
            }
        };

        // THREAD 2: hold the external lock, rendezvous once, then load the plugin class.
        progress2.set(null);
        Runnable thread2 = () -> {
            synchronized (externalTestLock) {
                try {
                    progress2.await(null);
                    Utils.loadClass(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(), Converter.class);
                } catch (ClassNotFoundException e) {
                    throw new RuntimeException("Failed to load test plugin", e);
                }
            }
        };

        // STEP 1: T1 owns the loader monitor and parks on its second rendezvous.
        this.exec.submit(thread1);
        progress1.testAwait();
        dumpThreads("step 1, T1 holding classloader monitor lock");

        // STEP 2: T2 owns the external lock and proceeds to load the class.
        this.exec.submit(thread2);
        progress2.testAwait();
        dumpThreads("step 2, T2 holding external lock");

        // STEP 3: release T1 so it attempts to take the external lock.
        progress1.testAwait();
        dumpThreads("step 3, T1 grabbed external lock");

        assertNoDeadlocks();
    }

    /**
     * Tells whether a thread belongs to the current test, judged by its name.
     *
     * @param threadInfo thread to classify
     * @return {@code true} if the thread's name starts with {@code threadPrefix}
     */
    private boolean threadFromCurrentTest(ThreadInfo threadInfo) {
        String candidateName = threadInfo.getThreadName();
        return candidateName.startsWith(this.threadPrefix);
    }

    /**
     * Fails the test if the JVM reports deadlocked threads and at least one of them belongs to
     * the current test. Deadlocks among unrelated threads are ignored.
     */
    private void assertNoDeadlocks() {
        long[] deadlockedIds = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
        boolean deadlockReported = deadlockedIds != null && deadlockedIds.length > 0;
        if (deadlockReported) {
            ThreadInfo[] deadlocked = ManagementFactory.getThreadMXBean().getThreadInfo(deadlockedIds);
            String report = Arrays.stream(deadlocked)
                .filter(this::threadFromCurrentTest)
                .map(SynchronizationTest::threadInfoToString)
                .collect(Collectors.joining(""));
            if (!report.isEmpty()) {
                Assertions.fail("Found deadlocked threads while classloading\n" + report);
            }
        }
    }

    /**
     * Logs, at debug level, a dump of the current test's threads under the given heading.
     * Does nothing, and takes no thread dump, when debug logging is disabled.
     *
     * @param str heading that precedes the dump in the log message
     */
    private void dumpThreads(String str) {
        if (!log.isDebugEnabled()) {
            return;
        }
        ThreadInfo[] snapshot = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
        String rendered = Arrays.stream(snapshot)
            .filter(this::threadFromCurrentTest)
            .map(SynchronizationTest::threadInfoToString)
            .collect(Collectors.joining("\n"));
        log.debug("{}:\n{}", str, rendered);
    }

    /**
     * Renders one thread as text: a header line (name, id, state, plus lock, lock owner and
     * suspended/native markers when present), the stack trace, and any locked synchronizers.
     *
     * @param threadInfo thread to render
     * @return the multi-line description, ending with a blank line
     */
    private static String threadInfoToString(ThreadInfo threadInfo) {
        StringBuilder out = new StringBuilder();
        out.append("\"").append(threadInfo.getThreadName())
            .append("\" Id=").append(threadInfo.getThreadId())
            .append(" ").append(threadInfo.getThreadState());
        if (threadInfo.getLockName() != null) {
            out.append(" on ").append(threadInfo.getLockName());
        }
        if (threadInfo.getLockOwnerName() != null) {
            out.append(" owned by \"").append(threadInfo.getLockOwnerName())
                .append("\" Id=").append(threadInfo.getLockOwnerId());
        }
        if (threadInfo.isSuspended()) {
            out.append(" (suspended)");
        }
        if (threadInfo.isInNative()) {
            out.append(" (in native)");
        }
        out.append('\n');

        printStacktrace(threadInfo, out);

        LockInfo[] synchronizers = threadInfo.getLockedSynchronizers();
        if (synchronizers.length > 0) {
            out.append("\n\tNumber of locked synchronizers = ").append(synchronizers.length).append('\n');
            for (LockInfo synchronizer : synchronizers) {
                out.append("\t- ").append(synchronizer).append('\n');
            }
        }
        return out.append('\n').toString();
    }

    /**
     * Appends the thread's stack trace to {@code sb}, one frame per line.
     *
     * <p>After the top frame, if the thread has lock info, a line names the lock it is blocked
     * on (BLOCKED) or waiting on (WAITING, TIMED_WAITING). After each frame, a line is added for
     * every monitor that was locked at that stack depth.
     *
     * @param threadInfo thread whose stack trace is rendered
     * @param sb         builder that receives the text
     */
    private static void printStacktrace(ThreadInfo threadInfo, StringBuilder sb) {
        StackTraceElement[] stackTrace = threadInfo.getStackTrace();
        // Loop-invariant: fetch the monitor list once rather than once per frame.
        MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
        for (int depth = 0; depth < stackTrace.length; depth++) {
            sb.append("\tat ").append(stackTrace[depth]).append('\n');
            if (depth == 0 && threadInfo.getLockInfo() != null) {
                switch (threadInfo.getThreadState()) {
                    case BLOCKED:
                        sb.append("\t-  blocked on ").append(threadInfo.getLockInfo()).append('\n');
                        break;
                    case WAITING:
                    case TIMED_WAITING:
                        sb.append("\t-  waiting on ").append(threadInfo.getLockInfo()).append('\n');
                        break;
                    default:
                        // Other states get no lock line.
                        break;
                }
            }
            for (MonitorInfo monitor : lockedMonitors) {
                if (monitor.getLockedStackDepth() == depth) {
                    sb.append("\t-  locked ").append(monitor).append('\n');
                }
            }
        }
    }

    /**
     * Creates a factory for non-daemon, normal-priority threads named {@code str + N}, where N
     * counts up from 1 per factory. The thread group comes from the security manager when one is
     * installed, otherwise from the creating thread.
     *
     * @param str prefix for every thread name
     * @return the thread factory
     */
    private static ThreadFactory threadFactoryWithNamedThreads(String str) {
        final AtomicInteger nextThreadNumber = new AtomicInteger(1);
        return task -> {
            SecurityManager securityManager = System.getSecurityManager();
            ThreadGroup group;
            if (securityManager == null) {
                group = Thread.currentThread().getThreadGroup();
            } else {
                group = securityManager.getThreadGroup();
            }
            Thread worker = new Thread(group, task, str + nextThreadNumber.getAndIncrement(), 0L);
            // Daemon status and priority are inherited from the creating thread; normalise both.
            if (worker.isDaemon()) {
                worker.setDaemon(false);
            }
            if (worker.getPriority() != Thread.NORM_PRIORITY) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        };
    }
}
