package org.apache.hadoop.ipc;

import com.google.common.base.Supplier;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRpcBase;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC.class */
public class TestRPC extends TestRpcBase {
    /** Logger used by the tests in this class and by the worker threads they start. */
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) TestRPC.class);
    /** Number of Integer values each {@link Transactions} worker puts into its exchange() request. */
    int datasize = 102400;
    /** Number of concurrent {@link Transactions} threads started by testCallsInternal. */
    int numThreads = 50;
    /** Configuration key holding the ACL for the test service; see {@link TestPolicyProvider}. */
    private static final String ACL_CONFIG = "test.protocol.acl";

    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$FakeRequestClass.class */
    /**
     * Request type whose deserialization always fails and whose serialization is
     * unsupported.
     */
    public static class FakeRequestClass extends RpcWritable {
        /**
         * Exception rethrown by {@link #readFrom(ByteBuffer)}. It is never assigned
         * inside this class, so it has to be set by the code using it beforehand.
         */
        static volatile IOException exception;

        /** Always fails by throwing the exception currently stored in {@link #exception}. */
        @Override
        <T> T readFrom(ByteBuffer byteBuffer) throws IOException {
            throw exception;
        }

        /** Serialization is not supported for this fake request. */
        @Override
        void writeTo(ResponseBuffer responseBuffer) throws IOException {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$SlowRPC.class */
    /**
     * Runnable that issues one {@code slowPing(true)} call through the given proxy
     * and records when that call has returned.
     */
    static class SlowRPC implements Runnable {
        private final TestRpcBase.TestRpcService proxy;
        /** Set once the ping issued by {@link #run()} has returned; read from other threads. */
        private volatile boolean done;

        SlowRPC(TestRpcBase.TestRpcService testRpcService) {
            proxy = testRpcService;
        }

        /**
         * Sends a slowPing request built from the given flag.
         * NOTE(review): the meaning of the flag is defined by
         * TestRpcBase.newSlowPingRequest, which is not visible here.
         */
        void ping(boolean z) throws ServiceException {
            proxy.slowPing(null, TestRpcBase.newSlowPingRequest(z));
        }

        /** Issues {@code ping(true)}; any ServiceException is turned into a JUnit failure. */
        @Override
        public void run() {
            try {
                ping(true);
                done = true;
            } catch (ServiceException e) {
                Assert.fail("SlowRPC ping exception " + e);
            }
        }

        /** @return true once the ping started by {@link #run()} has completed */
        boolean isDone() {
            return done;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$StoppedInvocationHandler.class */
    /**
     * Invocation handler that ignores every proxied call and counts how often it
     * has been closed.
     */
    public static class StoppedInvocationHandler implements InvocationHandler, Closeable {
        /** Number of times {@link #close()} has been invoked. */
        private int closeCalled = 0;

        private StoppedInvocationHandler() {
        }

        /** @return how many times {@link #close()} has been called so far */
        public int getCloseCalled() {
            return closeCalled;
        }

        /** Records one more close request; nothing is actually released. */
        @Override
        public void close() throws IOException {
            closeCalled += 1;
        }

        /** Every proxied method is a no-op that yields {@code null}. */
        @Override
        public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
            return null;
        }
    }

    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$StoppedProtocol.class */
    /** Minimal protocol with a single method, used by the stopProxy tests. */
    private interface StoppedProtocol {
        /** Protocol version passed to RPC.getProxy by the tests (implicitly public static final). */
        long versionID = 0L;

        void stop();
    }

    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$StoppedRpcEngine.class */
    /**
     * RpcEngine stub whose proxies are backed by {@link StoppedInvocationHandler},
     * so tests can observe whether closing a proxy reaches the handler.
     */
    private static class StoppedRpcEngine implements RpcEngine {
        private StoppedRpcEngine() {
        }

        /** Delegates to the ten-argument overload with the two trailing arguments set to null. */
        @Override
        public <T> ProtocolProxy<T> getProxy(Class<T> cls, long j, InetSocketAddress inetSocketAddress, UserGroupInformation userGroupInformation, Configuration configuration, SocketFactory socketFactory, int i, RetryPolicy retryPolicy) throws IOException {
            return getProxy(cls, j, inetSocketAddress, userGroupInformation, configuration, socketFactory, i, retryPolicy, null, null);
        }

        /**
         * Builds a dynamic proxy for {@code cls} whose calls all go to a fresh
         * {@link StoppedInvocationHandler}; every argument other than {@code cls} is ignored.
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T> ProtocolProxy<T> getProxy(Class<T> cls, long j, InetSocketAddress inetSocketAddress, UserGroupInformation userGroupInformation, Configuration configuration, SocketFactory socketFactory, int i, RetryPolicy retryPolicy, AtomicBoolean atomicBoolean, AlignmentContext alignmentContext) throws IOException {
            // Proxy.newProxyInstance returns Object; the proxy implements cls, so the
            // unchecked cast to T is safe and lets the ProtocolProxy<T> type-check.
            T stub = (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, new StoppedInvocationHandler());
            return new ProtocolProxy<T>(cls, stub, false);
        }

        /** This stub never creates servers; it always returns {@code null}. */
        @Override
        public RPC.Server getServer(Class<?> cls, Object obj, String str, int i, int i2, int i3, int i4, boolean z, Configuration configuration, SecretManager<? extends TokenIdentifier> secretManager, String str2, AlignmentContext alignmentContext) throws IOException {
            return null;
        }

        /** Not supported by this stub. */
        @Override
        public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(Client.ConnectionId connectionId, Configuration configuration, SocketFactory socketFactory) throws IOException {
            throw new UnsupportedOperationException("This proxy is not supported");
        }
    }

    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$TestImpl.class */
    /** Trivial in-process implementation of {@link TestProtocol}. */
    public static class TestImpl implements TestProtocol {
        /** Not read or updated by any method of this class. */
        int fastPingCounter = 0;

        /** Always reports version 1, regardless of the arguments. */
        @Override
        public long getProtocolVersion(String str, long j) {
            return 1L;
        }

        /** Returns a signature for version 1 with a null second constructor argument. */
        @Override
        public ProtocolSignature getProtocolSignature(String str, long j, int i) {
            return new ProtocolSignature(1L, null);
        }

        /** Does nothing. */
        @Override
        public void ping() {
        }

        /** Blocks the calling thread for {@code j} milliseconds. */
        @Override
        public void sleep(long j) throws InterruptedException {
            Thread.sleep(j);
        }

        /** Returns the argument unchanged. */
        @Override
        public String echo(String str) throws IOException {
            return str;
        }

        /** Returns the very same array instance that was passed in. */
        @Override
        public String[] echo(String[] strArr) throws IOException {
            return strArr;
        }

        /** Returns the argument unchanged. */
        @Override
        public Writable echo(Writable writable) {
            return writable;
        }

        /** Returns the sum of the two operands (plain int arithmetic). */
        @Override
        public int add(int i, int i2) {
            return i + i2;
        }

        /** Returns the sum of all array elements; an empty array yields 0. */
        @Override
        public int add(int[] iArr) {
            int total = 0;
            for (int index = 0; index < iArr.length; index++) {
                total += iArr[index];
            }
            return total;
        }

        /** Always fails with an IOException carrying the message "bobo". */
        @Override
        public int error() throws IOException {
            throw new IOException("bobo");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$TestPolicyProvider.class */
    /**
     * Policy provider exposing a single service entry that ties the
     * {@code test.protocol.acl} key to the test RPC service interface.
     */
    public static class TestPolicyProvider extends PolicyProvider {
        private TestPolicyProvider() {
        }

        /** @return a one-element array containing the ACL entry for TestRpcService */
        @Override
        public Service[] getServices() {
            Service aclEntry = new Service(TestRPC.ACL_CONFIG, TestRpcBase.TestRpcService.class);
            return new Service[] {aclEntry};
        }
    }

    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$TestProtocol.class */
    /** Versioned test protocol offering ping, sleep, echo, add and error operations. */
    public interface TestProtocol extends VersionedProtocol {
        /** Protocol version (implicitly public static final). */
        long versionID = 1L;

        void ping() throws IOException;

        void sleep(long j) throws IOException, InterruptedException;

        /* echo overloads: String, String[] and Writable */

        String echo(String str) throws IOException;

        String[] echo(String[] strArr) throws IOException;

        Writable echo(Writable writable) throws IOException;

        /* add overloads: two operands or an array of operands */

        int add(int i, int i2) throws IOException;

        int add(int[] iArr) throws IOException;

        int error() throws IOException;
    }

    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$TestReaderException.class */
    /**
     * IOException with value semantics: two instances are equal when both are
     * exactly of this class and carry equal messages.
     */
    public static class TestReaderException extends IOException {
        public TestReaderException(String str) {
            super(str);
        }

        /**
         * Compares by exact class and message.
         *
         * @param obj object to compare with; may be {@code null}
         * @return {@code true} only for another TestReaderException with an equal
         *         (or equally absent) message
         */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            // Guard against null, which Object.equals requires to yield false.
            if (obj == null || obj.getClass() != TestReaderException.class) {
                return false;
            }
            String mine = getMessage();
            String theirs = ((TestReaderException) obj).getMessage();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        /** Hash derived from the message so that equal instances hash equally. */
        @Override
        public int hashCode() {
            String message = getMessage();
            return message == null ? 0 : message.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/hadoop-common-2.10.1-tests.jar:org/apache/hadoop/ipc/TestRPC$Transactions.class */
    /**
     * Worker that performs one exchange() call and one add() call through the
     * given proxy and asserts on both responses.
     */
    public static class Transactions implements Runnable {
        /** Number of values placed into the exchange() request. */
        int datasize;
        TestRpcBase.TestRpcService proxy;

        Transactions(TestRpcBase.TestRpcService testRpcService, int i) {
            this.proxy = testRpcService;
            this.datasize = i;
        }

        @Override
        public void run() {
            // Request payload: datasize copies of 123.
            Integer[] sent = new Integer[datasize];
            Arrays.fill(sent, 123);
            TestProtos.ExchangeRequestProto exchangeRequest =
                TestProtos.ExchangeRequestProto.newBuilder()
                    .addAllValues(Arrays.asList(sent))
                    .build();
            TestProtos.AddRequestProto addRequest =
                TestProtos.AddRequestProto.newBuilder()
                    .setParam1(1)
                    .setParam2(2)
                    .build();

            Integer[] received = null;
            int sum = 0;
            try {
                TestProtos.ExchangeResponseProto exchangeResponse = proxy.exchange(null, exchangeRequest);
                received = (Integer[]) exchangeResponse.getValuesList()
                    .toArray(new Integer[exchangeResponse.getValuesCount()]);
                sum = proxy.add(null, addRequest).getResult();
            } catch (ServiceException e) {
                Assert.fail("Exception from RPC exchange() " + e);
            }

            Assert.assertEquals(sent.length, received.length);
            Assert.assertEquals(3L, sum);
            // Each returned value must equal its own position in the response.
            // NOTE(review): presumably the server answers with index values; confirm in TestRpcBase.
            int position = 0;
            for (Integer value : received) {
                Assert.assertEquals(value.intValue(), position);
                position++;
            }
        }
    }

    /**
     * Per-test initialization: delegates to setupConf(), which is not defined in
     * the visible part of this file (presumably inherited from TestRpcBase).
     */
    @Before
    public void setup() {
        setupConf();
    }

    /**
     * Checks that the server builder takes queue size and reader count from the
     * configuration by default and honours explicit builder overrides.
     */
    @Test
    public void testConfRpc() throws IOException {
        // Only the handler count is set: both values come from the configuration.
        RPC.Server defaultsServer = newServerBuilder(conf).setNumHandlers(1).setVerbose(false).build();
        int configuredQueueSize = conf.getInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, 100);
        Assert.assertEquals(configuredQueueSize, defaultsServer.getMaxQueueSize());
        int configuredReaders = conf.getInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 1);
        Assert.assertEquals(configuredReaders, defaultsServer.getNumReaders());

        // Explicit reader count and per-handler queue size with one handler.
        RPC.Server explicitServer = newServerBuilder(conf)
            .setNumHandlers(1)
            .setnumReaders(3)
            .setQueueSizePerHandler(200)
            .setVerbose(false)
            .build();
        Assert.assertEquals(3L, explicitServer.getNumReaders());
        Assert.assertEquals(200L, explicitServer.getMaxQueueSize());

        // Two handlers with a per-handler queue size of 10 give a max queue size of 20.
        RPC.Server twoHandlerServer = newServerBuilder(conf)
            .setQueueSizePerHandler(10)
            .setNumHandlers(2)
            .setVerbose(false)
            .build();
        Assert.assertEquals(20L, twoHandlerServer.getMaxQueueSize());
    }

    /**
     * Verifies that RPC.getServerAddress reports the address the client proxy was
     * created for.
     */
    @Test
    public void testProxyAddress() throws Exception {
        RPC.Server server = null;
        TestRpcBase.TestRpcService proxy = null;
        try {
            server = setupTestServer(conf, -1);
            proxy = getClient(addr, conf);
            Assert.assertEquals(addr, RPC.getServerAddress(proxy));
        } finally {
            // Single cleanup point: runs exactly once on success and on failure.
            stop(server, proxy);
        }
    }

    /**
     * Starts a slow ping on a background thread, sends two further pings from the
     * test thread and then waits until the background call has completed.
     * NOTE(review): the server-side interplay between slowPing(true) and
     * slowPing(false) is implemented outside this file.
     */
    @Test
    public void testSlowRpc() throws IOException, ServiceException {
        TestRpcBase.TestRpcService proxy = null;
        System.out.println("Testing Slow RPC");
        RPC.Server server = setupTestServer(conf, 2);
        try {
            proxy = getClient(addr, conf);
            SlowRPC slowRpc = new SlowRPC(proxy);
            new Thread(slowRpc, "SlowRPC").start();
            Assert.assertTrue("Slow RPC should not have finished1.", !slowRpc.isDone());

            slowRpc.ping(false);
            Assert.assertTrue("Slow RPC should not have finished2.", !slowRpc.isDone());

            slowRpc.ping(false);
            while (!slowRpc.isDone()) {
                System.out.println("Waiting for slow RPC to get done.");
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: keep polling until the slow RPC reports completion.
                }
            }
        } finally {
            // Single cleanup point: runs exactly once on success and on failure.
            System.out.println("Down slow rpc testing");
            stop(server, proxy);
        }
    }

    /** Runs the shared call scenario against the configuration prepared for this test. */
    @Test
    public void testCalls() throws Exception {
        Configuration testConf = conf;
        testCallsInternal(testConf);
    }

    /**
     * Shared call scenario: exercises ping, echo, echo2, add, add2 and error over
     * one proxy, checks the resulting server metrics, and finally runs
     * {@code numThreads} concurrent {@link Transactions} workers.
     *
     * @param configuration configuration used for both the server and the client
     * @throws Exception if server setup, an RPC or an assertion fails
     */
    private void testCallsInternal(Configuration configuration) throws Exception {
        TestRpcBase.TestRpcService proxy = null;
        RPC.Server server = setupTestServer(configuration, -1);
        try {
            proxy = getClient(addr, configuration);

            proxy.ping(null, newEmptyRequest());
            Assert.assertEquals("foo", proxy.echo(null, newEchoRequest("foo")).getMessage());
            Assert.assertEquals("", proxy.echo(null, newEchoRequest("")).getMessage());

            // One ping plus two echoes so far.
            MetricsRecordBuilder rpcMetrics = MetricsAsserts.getMetrics(server.rpcMetrics.name());
            MetricsAsserts.assertCounter("RpcProcessingTimeNumOps", 3L, rpcMetrics);
            MetricsAsserts.assertCounterGt("SentBytes", 0L, rpcMetrics);
            MetricsAsserts.assertCounterGt("ReceivedBytes", 0L, rpcMetrics);
            MetricsRecordBuilder detailedMetrics = MetricsAsserts.getMetrics(server.rpcDetailedMetrics.name());
            MetricsAsserts.assertCounter("EchoNumOps", 2L, detailedMetrics);
            MetricsAsserts.assertCounter("PingNumOps", 1L, detailedMetrics);

            String[] strings = {"foo", "bar"};
            Assert.assertArrayEquals(strings,
                proxy.echo2(null, TestProtos.EchoRequestProto2.newBuilder().addAllMessage(Arrays.asList(strings)).build()).getMessageList().toArray());
            Assert.assertArrayEquals(new String[0],
                proxy.echo2(null, TestProtos.EchoRequestProto2.newBuilder().addAllMessage(Collections.emptyList()).build()).getMessageList().toArray());

            Assert.assertEquals(3L, proxy.add(null, TestProtos.AddRequestProto.newBuilder().setParam1(1).setParam2(2).build()).getResult());
            Assert.assertEquals(3L, proxy.add2(null, TestProtos.AddRequestProto2.newBuilder().addAllParams(Arrays.asList(1, 2)).build()).getResult());

            // error() must surface as a ServiceException on the client.
            boolean caught = false;
            try {
                proxy.error(null, newEmptyRequest());
            } catch (ServiceException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Caught " + e);
                }
                caught = true;
            }
            Assert.assertTrue(caught);
            MetricsAsserts.assertCounter("RpcServerExceptionNumOps", 1L, MetricsAsserts.getMetrics(server.rpcDetailedMetrics.name()));

            System.out.println("Starting multi-threaded RPC test...");
            server.setSocketSendBufSize(1024);
            Thread[] workers = new Thread[this.numThreads];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Thread(new Transactions(proxy, this.datasize), "TransactionThread-" + i);
                workers[i].start();
            }

            System.out.println("Waiting for all threads to finish RPCs...");
            for (Thread worker : workers) {
                boolean joined = false;
                while (!joined) {
                    try {
                        worker.join();
                        joined = true;
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: retry the join until this worker has finished.
                    }
                }
            }
        } finally {
            // Single cleanup point: runs exactly once on success and on failure.
            stop(server, proxy);
        }
    }

    /**
     * Calls echo through a proxy for 0.0.0.0:20 and expects a ServiceException
     * whose cause is a ConnectException.
     */
    @Test
    public void testClientWithoutServer() throws Exception {
        InetSocketAddress target = new InetSocketAddress("0.0.0.0", 20);
        try {
            TestRpcBase.TestRpcService proxy =
                (TestRpcBase.TestRpcService) RPC.getProxy(TestRpcBase.TestRpcService.class, 1L, target, conf);
            proxy.echo(null, newEchoRequest("hello"));
            Assert.fail("We should not have reached here");
        } catch (ServiceException e) {
            boolean causedByConnectFailure = e.getCause() instanceof ConnectException;
            if (!causedByConnectFailure) {
                Assert.fail("We should not have reached here");
            }
        }
    }

    /**
     * Starts a server with the given configuration, refreshes its service ACL and
     * pings it once, then checks the authorization/authentication counters.
     *
     * @param configuration server configuration, also used for the ACL refresh
     * @param z {@code true} when the ping is expected to be rejected with an
     *          AuthorizationException, {@code false} when it must succeed
     * @throws Exception if setup fails, the outcome differs from the expectation,
     *                   or a metric assertion fails
     */
    private void doRPCs(Configuration configuration, boolean z) throws Exception {
        TestRpcBase.TestRpcService proxy = null;
        RPC.Server server = setupTestServer(configuration, 5);
        try {
            server.refreshServiceAcl(configuration, new TestPolicyProvider());
            TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.newBuilder().build();
            try {
                proxy = getClient(addr, conf);
                proxy.ping(null, emptyRequest);
                if (z) {
                    Assert.fail("Expect RPC.getProxy to fail with AuthorizationException!");
                }
            } catch (ServiceException e) {
                if (!z) {
                    throw e;
                }
                RemoteException remote = (RemoteException) e.getCause();
                Assert.assertTrue(remote.unwrapRemoteException() instanceof AuthorizationException);
                Assert.assertEquals("RPC error code should be UNAUTHORIZED", RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, remote.getErrorCode());
            } finally {
                // The counters are checked on every path, exactly once.
                MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(server.rpcMetrics.name());
                if (z) {
                    MetricsAsserts.assertCounter("RpcAuthorizationFailures", 1L, metrics);
                } else {
                    MetricsAsserts.assertCounter("RpcAuthorizationSuccesses", 1L, metrics);
                }
                MetricsAsserts.assertCounter("RpcAuthenticationFailures", 0L, metrics);
                MetricsAsserts.assertCounter("RpcAuthenticationSuccesses", 0L, metrics);
            }
        } finally {
            // Outer finally so the server and proxy are released even when a
            // metric assertion above fails.
            stop(server, proxy);
        }
    }

    /**
     * Verifies that the connect address derived from the running server resolves
     * to the local host address.
     */
    @Test
    public void testServerAddress() throws IOException {
        RPC.Server server = setupTestServer(conf, 5);
        try {
            Assert.assertEquals(InetAddress.getLocalHost(), NetUtils.getConnectAddress(server).getAddress());
        } finally {
            // Single cleanup point: runs exactly once on success and on failure.
            stop(server, null);
        }
    }

    /**
     * Runs the permissive ("*") and the rejecting ("invalid invalid") ACL
     * scenarios twice: first with the default reader setting, then with the
     * reader-thread key set to 2.
     */
    @Test
    public void testAuthorization() throws Exception {
        Configuration authConf = new Configuration();
        authConf.setBoolean("hadoop.security.authorization", true);

        for (int pass = 0; pass < 2; pass++) {
            if (pass == 1) {
                // Second pass repeats both scenarios with two reader threads configured.
                authConf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
            }
            authConf.set(ACL_CONFIG, "*");
            doRPCs(authConf, false);
            authConf.set(ACL_CONFIG, "invalid invalid");
            doRPCs(authConf, true);
        }
    }

    /**
     * Runs the shared call scenario with client pings disabled, first with the
     * default reader setting and then with the reader-thread key set to 2.
     * NOTE(review): this method carries no {@code @Test} annotation, so a JUnit 4
     * runner will not execute it; confirm whether that is intentional.
     */
    public void testNoPings() throws Exception {
        Configuration noPingConf = new Configuration();
        noPingConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);

        TestRPC defaultReadersRun = new TestRPC();
        defaultReadersRun.testCallsInternal(noPingConf);

        noPingConf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
        TestRPC twoReadersRun = new TestRPC();
        twoReadersRun.testCallsInternal(noPingConf);
    }

    /** Stopping a {@code null} proxy must raise HadoopIllegalArgumentException. */
    @Test(expected = HadoopIllegalArgumentException.class)
    public void testStopNonRegisteredProxy() throws IOException {
        Object missingProxy = null;
        RPC.stopProxy(missingProxy);
    }

    /** Stopping a mocked protocol object must complete without throwing. */
    @Test
    public void testStopMockObject() throws IOException {
        Object mockedProtocol = MockitoUtil.mockProtocol(TestProtocol.class);
        RPC.stopProxy(mockedProtocol);
    }

    /**
     * Registers {@link StoppedRpcEngine} for {@link StoppedProtocol} and checks
     * that RPC.stopProxy closes the proxy's invocation handler exactly once.
     */
    @Test
    public void testStopProxy() throws IOException {
        RPC.setProtocolEngine(conf, StoppedProtocol.class, StoppedRpcEngine.class);

        StoppedProtocol proxy =
            (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, StoppedProtocol.versionID, null, conf);
        StoppedInvocationHandler handler =
            (StoppedInvocationHandler) Proxy.getInvocationHandler(proxy);

        int closesBeforeStop = handler.getCloseCalled();
        Assert.assertEquals(0L, closesBeforeStop);

        RPC.stopProxy(proxy);

        int closesAfterStop = handler.getCloseCalled();
        Assert.assertEquals(1L, closesAfterStop);
    }

    /**
     * Checks that stopping a RetryProxy wrapper closes the invocation handler of
     * the proxy it wraps exactly once.
     */
    @Test
    public void testWrappedStopProxy() throws IOException {
        // Register the stub engine here as well: the cast to StoppedInvocationHandler
        // below only works for proxies created by StoppedRpcEngine, and this test
        // must not depend on another test having performed the registration.
        RPC.setProtocolEngine(conf, StoppedProtocol.class, StoppedRpcEngine.class);

        StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, StoppedProtocol.versionID, null, conf);
        StoppedInvocationHandler handler = (StoppedInvocationHandler) Proxy.getInvocationHandler(wrappedProxy);
        StoppedProtocol retryingProxy = (StoppedProtocol) RetryProxy.create((Class<StoppedProtocol>) StoppedProtocol.class, wrappedProxy, RetryPolicies.RETRY_FOREVER);

        Assert.assertEquals(0L, handler.getCloseCalled());
        RPC.stopProxy(retryingProxy);
        Assert.assertEquals(1L, handler.getCloseCalled());
    }

    /**
     * Verifies that a client using the plain test configuration is rejected by a
     * Kerberos-configured server with FATAL_UNAUTHORIZED / AccessControlException,
     * first with the default reader setting and then with the reader-thread key
     * set to 2 on the shared configuration.
     */
    @Test
    public void testErrorMsgForInsecureClient() throws IOException {
        Configuration serverConf = new Configuration(conf);
        SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, serverConf);

        assertInsecureClientRejected(serverConf);

        conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
        assertInsecureClientRejected(serverConf);
    }

    /**
     * Starts a server under {@code serverConf}, calls echo with a client built
     * from the plain test configuration and asserts the call is rejected as
     * unauthorized; the server and proxy are always stopped.
     */
    private void assertInsecureClientRejected(Configuration serverConf) throws IOException {
        UserGroupInformation.setConfiguration(serverConf);
        RPC.Server server = setupTestServer(serverConf, 5);
        TestRpcBase.TestRpcService proxy = null;
        boolean rejected = false;
        try {
            // Switch back to the plain configuration before creating the client.
            UserGroupInformation.setConfiguration(conf);
            proxy = getClient(addr, conf);
            proxy.echo(null, newEchoRequest(""));
        } catch (ServiceException e) {
            Assert.assertTrue(e.getCause() instanceof RemoteException);
            RemoteException remote = (RemoteException) e.getCause();
            LOG.info("LOGGING MESSAGE: " + remote.getLocalizedMessage());
            Assert.assertEquals("RPC error code should be UNAUTHORIZED", RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, remote.getErrorCode());
            Assert.assertTrue(remote.unwrapRemoteException() instanceof AccessControlException);
            rejected = true;
        } finally {
            // Runs even when one of the assertions in the catch block fails.
            stop(server, proxy);
        }
        Assert.assertTrue(rejected);
    }

    /**
     * Verifies that reader threads appear after the server is started and that
     * none are left once it has been stopped.
     */
    @Test
    public void testStopsAllThreads() throws IOException, InterruptedException {
        final String readerThreadName = "Server$Listener$Reader";
        Assert.assertEquals("Expect no Reader threads running before test", 0L, countThreads(readerThreadName));

        RPC.Server server = setupTestServer(conf, 5);
        try {
            // Poll in 10 ms steps, for at most 5 seconds, until a reader thread shows up.
            long totalSleepMillis = 0;
            do {
                totalSleepMillis += 10;
                Thread.sleep(10L);
            } while (countThreads(readerThreadName) == 0 && totalSleepMillis < 5000);

            // Inside the try so the server is stopped even when no reader thread started.
            Assert.assertTrue(countThreads(readerThreadName) > 0);
        } finally {
            server.stop();
        }

        Assert.assertEquals("Expect no Reader threads left running after test", 0L, countThreads(readerThreadName));
    }

    /**
     * Checks that building an RPC server fails with HadoopIllegalArgumentException
     * when the configuration is null, when no protocol is set, and when no
     * instance is set.
     */
    @Test
    public void testRPCBuilder() throws IOException {
        // Case 1: null configuration.
        try {
            new RPC.Builder(null)
                .setProtocol(TestProtocol.class)
                .setInstance(new TestImpl())
                .setBindAddress("0.0.0.0")
                .setPort(0)
                .setNumHandlers(5)
                .setVerbose(true)
                .build();
            Assert.fail("Didn't throw HadoopIllegalArgumentException");
        } catch (Exception noConfFailure) {
            boolean expectedType = noConfFailure instanceof HadoopIllegalArgumentException;
            if (!expectedType) {
                Assert.fail("Expecting HadoopIllegalArgumentException but caught " + noConfFailure);
            }
        }

        // Case 2: protocol not set.
        try {
            new RPC.Builder(conf)
                .setInstance(new TestImpl())
                .setBindAddress("0.0.0.0")
                .setPort(0)
                .setNumHandlers(5)
                .setVerbose(true)
                .build();
            Assert.fail("Didn't throw HadoopIllegalArgumentException");
        } catch (Exception noProtocolFailure) {
            boolean expectedType = noProtocolFailure instanceof HadoopIllegalArgumentException;
            if (!expectedType) {
                Assert.fail("Expecting HadoopIllegalArgumentException but caught " + noProtocolFailure);
            }
        }

        // Case 3: instance not set.
        try {
            new RPC.Builder(conf)
                .setProtocol(TestProtocol.class)
                .setBindAddress("0.0.0.0")
                .setPort(0)
                .setNumHandlers(5)
                .setVerbose(true)
                .build();
            Assert.fail("Didn't throw HadoopIllegalArgumentException");
        } catch (Exception noInstanceFailure) {
            boolean expectedType = noInstanceFailure instanceof HadoopIllegalArgumentException;
            if (!expectedType) {
                Assert.fail("Expecting HadoopIllegalArgumentException but caught " + noInstanceFailure);
            }
        }
    }

    /**
     * Interrupts the calling thread before an RPC and expects the call to fail
     * with a ServiceException that mentions InterruptedException or is caused by
     * an InterruptedIOException.
     */
    @Test(timeout = 90000)
    public void testRPCInterruptedSimple() throws Exception {
        TestRpcBase.TestRpcService proxy = null;
        RPC.Server server = setupTestServer(newServerBuilder(conf).setNumHandlers(5).setVerbose(true).setSecretManager(null));
        try {
            proxy = getClient(addr, conf);
            // First call runs without interruption.
            proxy.ping(null, newEmptyRequest());

            // Interrupt this thread, then try another call.
            Thread.currentThread().interrupt();
            try {
                proxy.ping(null, newEmptyRequest());
                Assert.fail("Interruption did not cause IPC to fail");
            } catch (ServiceException e) {
                boolean interruptionReported = e.toString().contains("InterruptedException")
                    || e.getCause() instanceof InterruptedIOException;
                if (!interruptionReported) {
                    throw e;
                }
            }
        } finally {
            // Clear the interrupt status set above on every path, so neither the
            // shutdown below nor later tests run on an interrupted thread.
            Thread.interrupted();
            stop(server, proxy);
        }
    }

    /**
     * Verifies that interrupting one RPC caller does not make concurrent callers fail.
     *
     * <p>200 threads, each with its own client proxy, meet at a barrier and then call
     * {@code slowPing}. Thread 0 (the "leader") keeps calling until it gets an exception, at which
     * point it clears {@code leaderRunning}. Every other thread keeps calling while the leader is
     * running, makes one final call, and records any exception it sees. The test interrupts the
     * leader until it stops and then asserts that no other thread recorded an exception.
     *
     * @throws Exception if setup fails, the test thread is interrupted, or the assertion fails
     */
    @Test(timeout = 30000)
    public void testRPCInterrupted() throws Exception {
        final int numConcurrentRpc = 200;
        RPC.Server server = setupTestServer(
                newServerBuilder(conf).setNumHandlers(5).setVerbose(true).setSecretManager(null));
        final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRpc);
        final CountDownLatch latch = new CountDownLatch(numConcurrentRpc);
        final AtomicBoolean leaderRunning = new AtomicBoolean(true);
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
        Thread leaderThread = null;
        try {
            for (int i = 0; i < numConcurrentRpc; i++) {
                final int num = i;
                final TestRpcBase.TestRpcService proxy = getClient(addr, conf);
                Thread rpcThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            barrier.await();
                            // The leader (num == 0) only leaves this loop via an exception;
                            // followers leave once the leader has stopped.
                            while (num == 0 || leaderRunning.get()) {
                                proxy.slowPing(null, TestRpcBase.newSlowPingRequest(false));
                            }
                            // One more call after the leader was interrupted.
                            proxy.slowPing(null, TestRpcBase.newSlowPingRequest(false));
                        } catch (Exception e) {
                            if (num == 0) {
                                leaderRunning.set(false);
                            } else {
                                error.set(e);
                            }
                            TestRPC.LOG.error("thread " + num, e);
                        } finally {
                            // Count down exactly once per thread, whatever the outcome.
                            latch.countDown();
                        }
                    }
                });
                rpcThread.start();
                if (leaderThread == null) {
                    leaderThread = rpcThread;
                }
            }
            // Presumably gives the threads time to get past the barrier.
            Thread.sleep(1000L);
            // Keep interrupting the leader until it reports that it has stopped.
            while (leaderRunning.get()) {
                leaderThread.interrupt();
            }
            latch.await();
            // The leader's interruption must not have caused an error in any other thread.
            Assert.assertTrue("rpc got exception " + error.get(), error.get() == null);
        } finally {
            // Stop the server exactly once, including when the wait or the assertion fails.
            server.stop();
        }
    }

    /**
     * Enables client pings with a 50-unit ping interval and issues a {@code sleep} RPC lasting
     * four ping intervals; the test passes when that call returns without throwing.
     *
     * @throws Exception if server setup or the RPC fails
     */
    @Test
    public void testConnectionPing() throws Exception {
        final int pingInterval = 50;
        TestRpcBase.TestRpcService proxy = null;
        conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
        conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
        RPC.Server server = setupTestServer(conf, 5);
        try {
            proxy = getClient(addr, conf);
            // Sleep on the server for several ping intervals.
            proxy.sleep(null, newSleepRequest(pingInterval * 4));
        } finally {
            // Single cleanup point for both success and failure.
            stop(server, proxy);
        }
    }

    /**
     * Queues three {@link ExternalCall}s directly on the server and checks how they are executed.
     *
     * <p>A "barrier" call blocks inside its action so that the two calls queued behind it can be
     * observed in the call queue. Once released, one call must return the name of the user it was
     * created for, and the other must surface the {@link IOException} thrown by its action as the
     * cause of an {@link ExecutionException}.
     *
     * @throws Exception if server setup fails or a call fails unexpectedly
     */
    @Test(timeout = 30000)
    public void testExternalCall() throws Exception {
        final UserGroupInformation ugi =
                UserGroupInformation.createUserForTesting("user123", new String[0]);
        final IOException expectedIoe = new IOException("boom");
        // NOTE(review): the second argument is presumably the handler count; the queue-length
        // assertion below relies on only one call running at a time.
        RPC.Server server = setupTestServer(conf, 1);
        try {
            ExternalCall<String> remoteUserCall = newExtCall(ugi, new PrivilegedExceptionAction<String>() {
                @Override
                public String run() throws Exception {
                    return UserGroupInformation.getCurrentUser().getUserName();
                }
            });
            ExternalCall<String> exceptionCall = newExtCall(ugi, new PrivilegedExceptionAction<String>() {
                @Override
                public String run() throws Exception {
                    throw expectedIoe;
                }
            });
            final CountDownLatch enteredHandler = new CountDownLatch(1);
            final CyclicBarrier release = new CyclicBarrier(2);
            ExternalCall<Void> barrierCall = newExtCall(ugi, new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    // Signal that the action is running, then block until the test releases it.
                    enteredHandler.countDown();
                    release.await();
                    return null;
                }
            });
            server.queueCall(barrierCall);
            server.queueCall(exceptionCall);
            server.queueCall(remoteUserCall);
            // Wait until the barrier call is executing; the other two must still be queued.
            enteredHandler.await();
            Assert.assertEquals(2L, server.getCallQueueLen());
            // Release the barrier call and wait for it to finish.
            release.await();
            barrierCall.get();
            // The action must have run as the user the call was created for.
            Assert.assertEquals(ugi.getUserName(), remoteUserCall.get());
            try {
                exceptionCall.get();
                Assert.fail("didn't throw");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof IOException);
                Assert.assertEquals(expectedIoe.getMessage(), e.getCause().getMessage());
            }
        } finally {
            server.stop();
        }
    }

    /**
     * Builds an {@link ExternalCall} around the given action.
     *
     * <p>The returned call reports {@code "test"} as its protocol and the supplied user as its
     * remote user.
     *
     * @param userGroupInformation user returned by the call's {@code getRemoteUser()}
     * @param privilegedExceptionAction action handed to the {@link ExternalCall} constructor
     * @param <T> result type of the action
     * @return the new call
     */
    private <T> ExternalCall<T> newExtCall(final UserGroupInformation userGroupInformation, PrivilegedExceptionAction<T> privilegedExceptionAction) {
        final ExternalCall<T> call = new ExternalCall<T>(privilegedExceptionAction) {
            @Override
            public UserGroupInformation getRemoteUser() {
                return userGroupInformation;
            }

            @Override
            public String getProtocol() {
                return "test";
            }
        };
        return call;
    }

    /**
     * Issues 3000 RPCs through two proxies (the current user and {@code testUser}) and checks the
     * server's RPC metrics: queue/processing/lock-wait operation counts, quantile gauges, the
     * per-user open-connection metric, and the lock-wait average after a {@code lockAndSleep}
     * call.
     *
     * @throws Exception if setup, an RPC, or an assertion fails
     */
    @Test
    public void testRpcMetrics() throws Exception {
        TestRpcBase.TestRpcService proxy = null;
        TestRpcBase.TestRpcService proxy2 = null;
        conf.setBoolean(CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE, true);
        conf.set(CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY, "1");
        final RPC.Server server = setupTestServer(conf, 5);
        try {
            // Second proxy is created as a different user.
            proxy2 = UserGroupInformation.createRemoteUser("testUser").doAs(new PrivilegedAction<TestRpcBase.TestRpcService>() {
                @Override
                public TestRpcBase.TestRpcService run() {
                    try {
                        return RPC.getProxy(TestRpcBase.TestRpcService.class, 0L, server.getListenerAddress(), TestRpcBase.conf);
                    } catch (IOException e) {
                        // Fail here with the real cause instead of returning null and
                        // hitting a NullPointerException on first use.
                        throw new IllegalStateException("Unable to create RPC proxy for testUser", e);
                    }
                }
            });
            proxy = getClient(addr, conf);
            // Three calls per iteration -> 3000 calls in total.
            for (int i = 0; i < 1000; i++) {
                proxy.ping(null, newEmptyRequest());
                proxy.echo(null, newEchoRequest("" + i));
                proxy2.echo(null, newEchoRequest("" + i));
            }
            MetricsRecordBuilder rpcMetrics = MetricsAsserts.getMetrics(server.getRpcMetrics().name());
            Assert.assertEquals("Expected correct rpc queue count", 3000L, MetricsAsserts.getLongCounter("RpcQueueTimeNumOps", rpcMetrics));
            Assert.assertEquals("Expected correct rpc processing count", 3000L, MetricsAsserts.getLongCounter("RpcProcessingTimeNumOps", rpcMetrics));
            Assert.assertEquals("Expected correct rpc lock wait count", 3000L, MetricsAsserts.getLongCounter("RpcLockWaitTimeNumOps", rpcMetrics));
            Assert.assertEquals("Expected zero rpc lock wait time", 0.0d, MetricsAsserts.getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001d);
            MetricsAsserts.assertQuantileGauges("RpcQueueTime1s", rpcMetrics);
            MetricsAsserts.assertQuantileGauges("RpcProcessingTime1s", rpcMetrics);
            // Each user holds exactly one open connection.
            String connectionsPerUser = MetricsAsserts.getStringMetric("NumOpenConnectionsPerUser", rpcMetrics);
            Assert.assertTrue(connectionsPerUser.contains("\"" + UserGroupInformation.getCurrentUser().getShortUserName() + "\":1"));
            Assert.assertTrue(connectionsPerUser.contains("\"testUser\":1"));
            proxy.lockAndSleep(null, newSleepRequest(5));
            // Re-read the metrics after the lockAndSleep call.
            MetricsAsserts.assertGauge("RpcLockWaitTimeAvgTime", RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS), MetricsAsserts.getMetrics(server.getRpcMetrics().name()));
        } finally {
            // proxy2 is still null if its creation failed.
            if (proxy2 != null) {
                RPC.stopProxy(proxy2);
            }
            stop(server, proxy);
        }
    }

    /**
     * Verifies that a client receives a {@code RetriableException} once the server, configured
     * with back-off enabled, one handler and a queue size of one per handler, is saturated.
     *
     * <p>Two long {@code sleep} RPCs are submitted in the background; after each submission the
     * test waits (up to 500 ms) for the spied call queue to have seen the expected number of
     * {@code addInternal} invocations. A further short {@code sleep} RPC is then expected to be
     * rejected with a {@code RetriableException}.
     *
     * @throws Exception if setup or verification fails
     */
    @Test(timeout = 30000)
    public void testClientBackOff() throws Exception {
        final int numClients = 2;
        boolean succeeded = false;
        // Futures are retained but never inspected; the background calls are ended by stop().
        ArrayList<Object> pendingCalls = new ArrayList<Object>();
        ExecutorService executorService = Executors.newFixedThreadPool(numClients);
        conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
        conf.setBoolean("ipc.0.backoff.enable", true);
        RPC.Server server = setupTestServer(newServerBuilder(conf).setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true));
        // Replace the server's call queue with a spy so enqueue attempts can be observed.
        CallQueueManager callQueueSpy = (CallQueueManager) Mockito.spy((CallQueueManager) Whitebox.getInternalState(server, "callQueue"));
        Whitebox.setInternalState(server, "callQueue", callQueueSpy);
        IOException lastException = null;
        final TestRpcBase.TestRpcService client = getClient(addr, conf);
        try {
            for (int i = 0; i < numClients; i++) {
                pendingCalls.add(executorService.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws ServiceException, InterruptedException {
                        client.sleep(null, TestRpcBase.newSleepRequest(100000));
                        return null;
                    }
                }));
                // Wait until the server has tried to enqueue this call before sending the next.
                ((CallQueueManager) Mockito.verify(callQueueSpy, Mockito.timeout(500).times(i + 1))).addInternal((Schedulable) Mockito.anyObject(), Mockito.eq(false));
            }
            // With the background calls in flight, this call is expected to be backed off.
            try {
                client.sleep(null, newSleepRequest(100));
            } catch (ServiceException e) {
                IOException unwrapped = ((RemoteException) e.getCause()).unwrapRemoteException();
                if (unwrapped instanceof RetriableException) {
                    succeeded = true;
                } else {
                    lastException = unwrapped;
                }
            }
        } finally {
            // Cleanup must run once, after all calls were issued -- not inside the loop, where it
            // would stop the executor, server and client after the first submission.
            executorService.shutdown();
            stop(server, client);
        }
        if (lastException != null) {
            LOG.error("Last received non-RetriableException:", (Throwable) lastException);
        }
        Assert.assertTrue("RetriableException not received", succeeded);
    }

    /**
     * Verifies that a client receives a {@code RetriableException} from a server set up by
     * {@code setupDecayRpcSchedulerandTestServer} (response-time back-off enabled with thresholds
     * {@code "2s, 4s"}) after a slow call has been processed.
     *
     * <p>One {@code sleep(3000)} RPC is submitted in the background. After waiting 5.5 seconds,
     * a short {@code sleep} RPC is issued and is expected to be rejected with a
     * {@code RetriableException}.
     *
     * @throws Exception if setup or verification fails
     */
    @Test(timeout = 30000)
    public void testClientBackOffByResponseTime() throws Exception {
        final int numClients = 1;
        boolean succeeded = false;
        GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG);
        GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG);
        // Futures are retained but never inspected.
        ArrayList<Object> pendingCalls = new ArrayList<Object>();
        ExecutorService executorService = Executors.newFixedThreadPool(numClients);
        conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
        Server server = setupDecayRpcSchedulerandTestServer("ipc.0.");
        // Replace the server's call queue with a spy so enqueue attempts can be observed.
        CallQueueManager callQueueSpy = (CallQueueManager) Mockito.spy((CallQueueManager) Whitebox.getInternalState(server, "callQueue"));
        Whitebox.setInternalState(server, "callQueue", callQueueSpy);
        IOException lastException = null;
        final TestRpcBase.TestRpcService client = getClient(addr, conf);
        try {
            for (int i = 0; i < numClients; i++) {
                pendingCalls.add(executorService.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws ServiceException, InterruptedException {
                        client.sleep(null, TestRpcBase.newSleepRequest(3000));
                        return null;
                    }
                }));
                // Wait until the server has tried to enqueue the slow call.
                ((CallQueueManager) Mockito.verify(callQueueSpy, Mockito.timeout(500).times(i + 1))).addInternal((Schedulable) Mockito.anyObject(), Mockito.eq(false));
            }
            try {
                // Presumably long enough for the slow call to finish and for the scheduler to
                // refresh its response-time statistics -- TODO confirm against DecayRpcScheduler.
                Thread.sleep(5500L);
                client.sleep(null, newSleepRequest(100));
            } catch (ServiceException e) {
                IOException unwrapped = ((RemoteException) e.getCause()).unwrapRemoteException();
                if (unwrapped instanceof RetriableException) {
                    succeeded = true;
                } else {
                    lastException = unwrapped;
                }
            }
        } finally {
            // Cleanup must happen after the probe call; stopping the server and client earlier
            // would make the probe run against a stopped server.
            executorService.shutdown();
            stop(server, client);
        }
        if (lastException != null) {
            LOG.error("Last received non-RetriableException:", (Throwable) lastException);
        }
        Assert.assertTrue("RetriableException not received", succeeded);
    }

    /**
     * Checks that the {@code DecayRpcSchedulerMetrics2.ipc.0} metrics advance after RPC traffic.
     *
     * <p>The {@code DecayedCallVolume}, {@code CallVolume} and {@code UniqueCallers} counters are
     * sampled before two {@code sleep} RPCs are issued; the test then polls until all three
     * counters are strictly greater than their initial values.
     *
     * @throws Exception if setup or an RPC fails, or the metrics do not advance in time
     */
    @Test(timeout = 30000)
    public void testDecayRpcSchedulerMetrics() throws Exception {
        Server server = setupDecayRpcSchedulerandTestServer("ipc.0.");
        MetricsRecordBuilder initialMetrics = MetricsAsserts.getMetrics("DecayRpcSchedulerMetrics2.ipc.0");
        final long beginDecayedCallVolume = MetricsAsserts.getLongCounter("DecayedCallVolume", initialMetrics);
        final long beginRawCallVolume = MetricsAsserts.getLongCounter("CallVolume", initialMetrics);
        final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers", initialMetrics);
        TestRpcBase.TestRpcService client = getClient(addr, conf);
        try {
            for (int i = 0; i < 2; i++) {
                client.sleep(null, newSleepRequest(100));
            }
            // Poll every 30 ms; NOTE(review): the 60000 ms limit exceeds the 30000 ms test timeout.
            GenericTestUtils.waitFor(new Supplier<Boolean>() {
                @Override
                public Boolean get() {
                    MetricsRecordBuilder current = MetricsAsserts.getMetrics("DecayRpcSchedulerMetrics2.ipc.0");
                    long decayedCallVolume = MetricsAsserts.getLongCounter("DecayedCallVolume", current);
                    long rawCallVolume = MetricsAsserts.getLongCounter("CallVolume", current);
                    int uniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers", current);
                    long completedVolume0 = MetricsAsserts.getLongGauge("Priority.0.CompletedCallVolume", current);
                    long completedVolume1 = MetricsAsserts.getLongGauge("Priority.1.CompletedCallVolume", current);
                    double avgResponseTime0 = MetricsAsserts.getDoubleGauge("Priority.0.AvgResponseTime", current);
                    double avgResponseTime1 = MetricsAsserts.getDoubleGauge("Priority.1.AvgResponseTime", current);
                    TestRPC.LOG.info("DecayedCallVolume: {}", decayedCallVolume);
                    TestRPC.LOG.info("CallVolume: {}", rawCallVolume);
                    TestRPC.LOG.info("UniqueCaller: {}", uniqueCaller);
                    TestRPC.LOG.info("Priority.0.CompletedCallVolume: {}", completedVolume0);
                    TestRPC.LOG.info("Priority.1.CompletedCallVolume: {}", completedVolume1);
                    TestRPC.LOG.info("Priority.0.AvgResponseTime: {}", avgResponseTime0);
                    TestRPC.LOG.info("Priority.1.AvgResponseTime: {}", avgResponseTime1);
                    // Only the three counters decide the outcome; the gauges are logged for diagnosis.
                    return decayedCallVolume > beginDecayedCallVolume
                            && rawCallVolume > beginRawCallVolume
                            && uniqueCaller > beginUniqueCaller;
                }
            }, 30, 60000);
        } finally {
            // Also runs when the wait times out, so the server and client are not leaked.
            stop(server, client);
        }
    }

    /**
     * Configures the shared {@code conf} for a FairCallQueue / DecayRpcScheduler server and starts
     * a test server from it.
     *
     * <p>Sets, under the given key prefix: back-off enabled, the call queue and scheduler
     * implementations, two priority levels, and response-time based back-off with thresholds
     * {@code "2s, 4s"}. Client connect retries are set to zero. The server is built with a queue
     * size of 3 per handler and a single handler.
     *
     * @param str configuration key prefix prepended to every scheduler/queue key
     * @return the server returned by {@code setupTestServer}
     * @throws Exception if the server cannot be set up
     */
    private Server setupDecayRpcSchedulerandTestServer(String str) throws Exception {
        final String backoffEnableKey = str + CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
        final String callQueueImplKey = str + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
        final String schedulerImplKey = str + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY;
        final String priorityLevelsKey = str + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY;
        final String responseTimeEnableKey = str + DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY;
        final String responseTimeThresholdsKey = str + DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY;

        conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
        conf.setBoolean(backoffEnableKey, true);
        conf.setStrings(callQueueImplKey, "org.apache.hadoop.ipc.FairCallQueue");
        conf.setStrings(schedulerImplKey, "org.apache.hadoop.ipc.DecayRpcScheduler");
        conf.setInt(priorityLevelsKey, 2);
        conf.setBoolean(responseTimeEnableKey, true);
        conf.set(responseTimeThresholdsKey, "2s, 4s");

        RPC.Builder builder = newServerBuilder(conf)
                .setQueueSizePerHandler(3)
                .setNumHandlers(1)
                .setVerbose(true);
        return setupTestServer(builder);
    }

    /**
     * Exercises the client RPC timeout setting in four configurations against a one-handler
     * server:
     * <ol>
     *   <li>timeout 1000 with the ping setting untouched: {@code sleep(3000)} must time out;</li>
     *   <li>timeout 1000 with ping disabled: {@code sleep(3000)} must time out;</li>
     *   <li>timeout -1: {@code sleep(2000)} must not time out;</li>
     *   <li>timeout 1000 with ping enabled at interval 800: {@code sleep(1300)} must not time
     *       out, {@code sleep(2000)} must.</li>
     * </ol>
     * A timeout is recognised by a {@link SocketTimeoutException} cause on the
     * {@link ServiceException}.
     *
     * @throws Exception if server setup fails or an expectation is violated
     */
    @Test(timeout = 30000)
    public void testClientRpcTimeout() throws Exception {
        TestRpcBase.TestRpcService proxy = null;
        RPC.Server server = setupTestServer(
                newServerBuilder(conf).setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true));
        try {
            // Case 1: RPC timeout set, ping setting left as inherited from conf.
            try {
                Configuration defaultPingConf = new Configuration(conf);
                defaultPingConf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
                proxy = getClient(addr, defaultPingConf);
                proxy.sleep(null, newSleepRequest(3000));
                Assert.fail("RPC should time out.");
            } catch (ServiceException expected) {
                Assert.assertTrue(expected.getCause() instanceof SocketTimeoutException);
                LOG.info("got expected timeout.", (Throwable) expected);
            }

            // Case 2: RPC timeout set, ping explicitly disabled.
            try {
                Configuration noPingConf = new Configuration(conf);
                noPingConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
                noPingConf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
                proxy = getClient(addr, noPingConf);
                proxy.sleep(null, newSleepRequest(3000));
                Assert.fail("RPC should time out.");
            } catch (ServiceException expected) {
                Assert.assertTrue(expected.getCause() instanceof SocketTimeoutException);
                LOG.info("got expected timeout.", (Throwable) expected);
            }

            // Case 3: negative RPC timeout; the call must complete.
            try {
                Configuration negativeTimeoutConf = new Configuration(conf);
                negativeTimeoutConf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, -1);
                proxy = getClient(addr, negativeTimeoutConf);
                proxy.sleep(null, newSleepRequest(2000));
            } catch (ServiceException unexpected) {
                LOG.info("got unexpected exception.", (Throwable) unexpected);
                Assert.fail("RPC should not time out.");
            }

            // Case 4: ping enabled with an interval shorter than the RPC timeout.
            try {
                Configuration pingAndTimeoutConf = new Configuration(conf);
                pingAndTimeoutConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
                pingAndTimeoutConf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
                pingAndTimeoutConf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
                proxy = getClient(addr, pingAndTimeoutConf);
                try {
                    // This shorter sleep is expected to finish without a timeout.
                    proxy.sleep(null, newSleepRequest(1300));
                } catch (ServiceException unexpected) {
                    LOG.info("got unexpected exception.", (Throwable) unexpected);
                    Assert.fail("RPC should not time out.");
                }
                // This longer sleep is expected to time out.
                proxy.sleep(null, newSleepRequest(2000));
                Assert.fail("RPC should time out.");
            } catch (ServiceException expected) {
                Assert.assertTrue(expected.getCause() instanceof SocketTimeoutException);
                LOG.info("got expected timeout.", (Throwable) expected);
            }
        } finally {
            stop(server, proxy);
        }
    }

    /**
     * Checks how the client and server behave when request reading fails on the server.
     *
     * <p>The server's {@code rpcRequestClass} is replaced with {@code FakeRequestClass}, whose
     * static {@code exception} field is set before every call to either an {@code ERROR}-status
     * ("keepalive") or a {@code FATAL}-status ("disconnect") {@code RpcServerException}, chosen
     * at random. For each of 128 calls the test asserts that:
     * <ul>
     *   <li>the call fails and the unwrapped remote exception equals the injected cause;</li>
     *   <li>the {@code rpcAuthorizationSuccesses} counter grows only on calls that follow a
     *       "disconnect" call (and on the first call);</li>
     *   <li>after a "keepalive" failure exactly one connection exists, and it is a new one only
     *       if the previous call was a "disconnect";</li>
     *   <li>after a "disconnect" failure the last known connection reports
     *       {@code shouldClose()}.</li>
     * </ul>
     *
     * @throws Exception if setup fails or an expectation is violated
     */
    @Test(timeout = 30000)
    public void testReaderExceptions() throws Exception {
        RPC.Server server = null;
        TestRpcBase.TestRpcService proxy = null;
        TestReaderException readerException = new TestReaderException("testing123");
        RpcServerException keepAliveException = new RpcServerException("keepalive", readerException) {
            @Override
            public RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto getRpcStatusProto() {
                return RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.ERROR;
            }
        };
        RpcServerException disconnectException = new RpcServerException("disconnect", readerException) {
            @Override
            public RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto getRpcStatusProto() {
                return RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL;
            }
        };
        try {
            server = setupTestServer(newServerBuilder(conf).setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true));
            Whitebox.setInternalState(server, "rpcRequestClass", FakeRequestClass.class);
            MutableCounterLong authSuccesses = (MutableCounterLong) Whitebox.getInternalState(server.getRpcMetrics(), "rpcAuthorizationSuccesses");
            proxy = getClient(addr, conf);
            // Starts true: the very first call has to establish a connection.
            boolean isDisconnected = true;
            Server.Connection lastConnection = null;
            long expectedAuths = 0;
            for (int i = 0; i < 128; i++) {
                String reqName = "request[" + i + "]";
                boolean doDisconnect = ThreadLocalRandom.current().nextInt() % 4 == 0;
                LOG.info("TestDisconnect request[" + i + "]  shouldConnect=" + isDisconnected + " willDisconnect=" + doDisconnect);
                if (isDisconnected) {
                    // A reconnect is expected, and with it one more authorization success.
                    expectedAuths++;
                }
                try {
                    FakeRequestClass.exception = doDisconnect ? disconnectException : keepAliveException;
                    proxy.ping(null, newEmptyRequest());
                    Assert.fail(reqName + " didn't fail");
                } catch (ServiceException e) {
                    Assert.assertEquals(reqName, readerException, ((RemoteException) e.getCause()).unwrapRemoteException());
                }
                Assert.assertEquals(reqName, expectedAuths, authSuccesses.value());
                if (!doDisconnect) {
                    Server.Connection[] connections = server.getConnections();
                    Assert.assertEquals(reqName, 1L, connections.length);
                    if (isDisconnected) {
                        Assert.assertNotSame(reqName, lastConnection, connections[0]);
                    } else {
                        Assert.assertSame(reqName, lastConnection, connections[0]);
                    }
                    lastConnection = connections[0];
                } else if (lastConnection != null) {
                    Assert.assertTrue(reqName, lastConnection.shouldClose());
                }
                isDisconnected = doDisconnect;
            }
        } finally {
            // server and proxy may still be null if setup failed part-way.
            stop(server, proxy);
        }
    }

    /**
     * Command-line entry point: runs {@code testCallsInternal} on a fresh {@code TestRPC}
     * instance with the shared configuration.
     *
     * @param strArr command-line arguments (unused)
     * @throws Exception if the test run fails
     */
    public static void main(String[] strArr) throws Exception {
        TestRPC test = new TestRPC();
        test.testCallsInternal(conf);
    }
}
