package tachyon.network.protocol;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import tachyon.TestUtils;
import tachyon.network.protocol.RPCResponse;
import tachyon.network.protocol.databuffer.DataBuffer;
import tachyon.network.protocol.databuffer.DataByteBuffer;
import tachyon.network.protocol.databuffer.DataFileChannel;
import tachyon.util.NetworkUtils;

/* loaded from: input_file:tachyon/network/protocol/RPCMessageIntegrationTest.class */
public class RPCMessageIntegrationTest {
    private static final long USER_ID = 10;
    private static final long BLOCK_ID = 11;
    private static final long OFFSET = 22;
    private static final long LENGTH = 33;

    @Rule
    public TemporaryFolder mFolder = new TemporaryFolder();
    private Channel mOutgoingChannel;
    private static NioEventLoopGroup sEventClient;
    private static NioEventLoopGroup sEventServer;
    private static MessageSavingHandler sIncomingHandler;
    private static Bootstrap sBootstrapClient;
    private static SocketAddress sLocalAddress;

    /* loaded from: input_file:tachyon/network/protocol/RPCMessageIntegrationTest$PipelineInitializer.class */
    private static class PipelineInitializer extends ChannelInitializer<SocketChannel> {
        private MessageSavingHandler mHandler;

        public PipelineInitializer(MessageSavingHandler messageSavingHandler) {
            this.mHandler = null;
            this.mHandler = messageSavingHandler;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast("frameDecoder", RPCMessage.createFrameDecoder());
            pipeline.addLast("RPCMessageDecoder", new RPCMessageDecoder());
            pipeline.addLast("RPCMessageEncoder", new RPCMessageEncoder());
            pipeline.addLast("handler", this.mHandler);
        }
    }

    @BeforeClass
    public static void beforeClass() {
        sEventClient = new NioEventLoopGroup(1);
        sEventServer = new NioEventLoopGroup(1);
        sIncomingHandler = new MessageSavingHandler();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(sEventServer);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new PipelineInitializer(sIncomingHandler));
        sLocalAddress = serverBootstrap.bind(new InetSocketAddress(NetworkUtils.getLocalHostName(100), 19998)).syncUninterruptibly().channel().localAddress();
        sBootstrapClient = new Bootstrap();
        sBootstrapClient.group(sEventClient);
        sBootstrapClient.channel(NioSocketChannel.class);
        sBootstrapClient.handler(new PipelineInitializer(new MessageSavingHandler()));
    }

    @AfterClass
    public static void afterClass() {
        sEventClient.shutdownGracefully(0L, 0L, TimeUnit.SECONDS);
        sEventServer.shutdownGracefully(0L, 0L, TimeUnit.SECONDS);
        sEventClient.terminationFuture().syncUninterruptibly();
        sEventServer.terminationFuture().syncUninterruptibly();
    }

    @Before
    public final void before() {
        sIncomingHandler.reset();
        this.mOutgoingChannel = sBootstrapClient.connect(sLocalAddress).syncUninterruptibly().channel();
    }

    @After
    public final void after() {
        this.mOutgoingChannel.close().syncUninterruptibly();
    }

    private void assertValid(RPCBlockRequest rPCBlockRequest, RPCBlockRequest rPCBlockRequest2) {
        Assert.assertEquals(rPCBlockRequest.getType(), rPCBlockRequest2.getType());
        Assert.assertEquals(rPCBlockRequest.getEncodedLength(), rPCBlockRequest2.getEncodedLength());
        Assert.assertEquals(rPCBlockRequest.getBlockId(), rPCBlockRequest2.getBlockId());
        Assert.assertEquals(rPCBlockRequest.getOffset(), rPCBlockRequest2.getOffset());
        Assert.assertEquals(rPCBlockRequest.getLength(), rPCBlockRequest2.getLength());
    }

    private void assertValid(RPCBlockResponse rPCBlockResponse, RPCBlockResponse rPCBlockResponse2) {
        Assert.assertEquals(rPCBlockResponse.getType(), rPCBlockResponse2.getType());
        Assert.assertEquals(rPCBlockResponse.getEncodedLength(), rPCBlockResponse2.getEncodedLength());
        Assert.assertEquals(rPCBlockResponse.getBlockId(), rPCBlockResponse2.getBlockId());
        Assert.assertEquals(rPCBlockResponse.getOffset(), rPCBlockResponse2.getOffset());
        Assert.assertEquals(rPCBlockResponse.getLength(), rPCBlockResponse2.getLength());
        Assert.assertEquals(rPCBlockResponse.getStatus(), rPCBlockResponse2.getStatus());
        if (rPCBlockResponse.getLength() != 0) {
            Assert.assertTrue(TestUtils.equalIncreasingByteBuffer(22, 33, rPCBlockResponse2.getPayloadDataBuffer().getReadOnlyByteBuffer()));
        } else {
            Assert.assertNull(rPCBlockResponse.getPayloadDataBuffer());
            Assert.assertNull(rPCBlockResponse2.getPayloadDataBuffer());
        }
    }

    private void assertValid(RPCBlockWriteRequest rPCBlockWriteRequest, RPCBlockWriteRequest rPCBlockWriteRequest2) {
        Assert.assertEquals(rPCBlockWriteRequest.getType(), rPCBlockWriteRequest2.getType());
        Assert.assertEquals(rPCBlockWriteRequest.getEncodedLength(), rPCBlockWriteRequest2.getEncodedLength());
        Assert.assertEquals(rPCBlockWriteRequest.getBlockId(), rPCBlockWriteRequest2.getBlockId());
        Assert.assertEquals(rPCBlockWriteRequest.getOffset(), rPCBlockWriteRequest2.getOffset());
        Assert.assertEquals(rPCBlockWriteRequest.getLength(), rPCBlockWriteRequest2.getLength());
        Assert.assertEquals(rPCBlockWriteRequest.getUserId(), rPCBlockWriteRequest2.getUserId());
        if (rPCBlockWriteRequest.getLength() > 0) {
            Assert.assertTrue(TestUtils.equalIncreasingByteBuffer(22, 33, rPCBlockWriteRequest2.getPayloadDataBuffer().getReadOnlyByteBuffer()));
        }
    }

    private void assertValid(RPCBlockWriteResponse rPCBlockWriteResponse, RPCBlockWriteResponse rPCBlockWriteResponse2) {
        Assert.assertEquals(rPCBlockWriteResponse.getType(), rPCBlockWriteResponse2.getType());
        Assert.assertEquals(rPCBlockWriteResponse.getEncodedLength(), rPCBlockWriteResponse2.getEncodedLength());
        Assert.assertEquals(rPCBlockWriteResponse.getBlockId(), rPCBlockWriteResponse2.getBlockId());
        Assert.assertEquals(rPCBlockWriteResponse.getOffset(), rPCBlockWriteResponse2.getOffset());
        Assert.assertEquals(rPCBlockWriteResponse.getLength(), rPCBlockWriteResponse2.getLength());
        Assert.assertEquals(rPCBlockWriteResponse.getUserId(), rPCBlockWriteResponse2.getUserId());
        Assert.assertEquals(rPCBlockWriteResponse.getStatus(), rPCBlockWriteResponse2.getStatus());
    }

    private void assertValid(RPCErrorResponse rPCErrorResponse, RPCErrorResponse rPCErrorResponse2) {
        Assert.assertEquals(rPCErrorResponse.getType(), rPCErrorResponse2.getType());
        Assert.assertEquals(rPCErrorResponse.getEncodedLength(), rPCErrorResponse2.getEncodedLength());
        Assert.assertEquals(rPCErrorResponse.getStatus(), rPCErrorResponse2.getStatus());
    }

    private FileChannel getTempFileChannel() throws IOException {
        File newFile = this.mFolder.newFile("temp.txt");
        FileOutputStream fileOutputStream = new FileOutputStream(newFile.getAbsolutePath());
        fileOutputStream.write(TestUtils.getIncreasingByteArray(55));
        fileOutputStream.close();
        return new FileInputStream(newFile).getChannel();
    }

    private RPCMessage encodeThenDecode(RPCMessage rPCMessage) {
        this.mOutgoingChannel.writeAndFlush(rPCMessage);
        return sIncomingHandler.getMessage();
    }

    @Test
    public void RPCBlockRequestTest() {
        RPCBlockRequest rPCBlockRequest = new RPCBlockRequest(BLOCK_ID, OFFSET, LENGTH);
        assertValid(rPCBlockRequest, (RPCBlockRequest) encodeThenDecode(rPCBlockRequest));
    }

    @Test
    public void RPCBlockResponseTest() {
        RPCBlockResponse rPCBlockResponse = new RPCBlockResponse(BLOCK_ID, OFFSET, LENGTH, new DataByteBuffer(TestUtils.getIncreasingByteBuffer(22, 33), LENGTH), RPCResponse.Status.SUCCESS);
        assertValid(rPCBlockResponse, (RPCBlockResponse) encodeThenDecode(rPCBlockResponse));
    }

    @Test
    public void RPCBlockResponseEmptyPayloadTest() {
        RPCBlockResponse rPCBlockResponse = new RPCBlockResponse(BLOCK_ID, OFFSET, 0L, (DataBuffer) null, RPCResponse.Status.SUCCESS);
        assertValid(rPCBlockResponse, (RPCBlockResponse) encodeThenDecode(rPCBlockResponse));
    }

    @Test
    public void RPCBlockResponseErrorTest() {
        RPCBlockResponse createErrorResponse = RPCBlockResponse.createErrorResponse(new RPCBlockRequest(BLOCK_ID, OFFSET, LENGTH), RPCResponse.Status.FILE_DNE);
        assertValid(createErrorResponse, (RPCBlockResponse) encodeThenDecode(createErrorResponse));
    }

    @Test
    public void RPCBlockResponseFileChannelTest() throws IOException {
        RPCBlockResponse rPCBlockResponse = new RPCBlockResponse(BLOCK_ID, OFFSET, LENGTH, new DataFileChannel(getTempFileChannel(), OFFSET, LENGTH), RPCResponse.Status.SUCCESS);
        assertValid(rPCBlockResponse, (RPCBlockResponse) encodeThenDecode(rPCBlockResponse));
    }

    @Test
    public void RPCBlockWriteRequestTest() {
        RPCBlockWriteRequest rPCBlockWriteRequest = new RPCBlockWriteRequest(USER_ID, BLOCK_ID, OFFSET, LENGTH, new DataByteBuffer(TestUtils.getIncreasingByteBuffer(22, 33), LENGTH));
        assertValid(rPCBlockWriteRequest, (RPCBlockWriteRequest) encodeThenDecode(rPCBlockWriteRequest));
    }

    @Test
    public void RPCBlockWriteResponseTest() {
        RPCBlockWriteResponse rPCBlockWriteResponse = new RPCBlockWriteResponse(USER_ID, BLOCK_ID, OFFSET, LENGTH, RPCResponse.Status.SUCCESS);
        assertValid(rPCBlockWriteResponse, (RPCBlockWriteResponse) encodeThenDecode(rPCBlockWriteResponse));
    }

    @Test
    public void RPCErrorResponseTest() {
        for (RPCResponse.Status status : RPCResponse.Status.values()) {
            RPCErrorResponse rPCErrorResponse = new RPCErrorResponse(status);
            assertValid(rPCErrorResponse, (RPCErrorResponse) encodeThenDecode(rPCErrorResponse));
        }
    }
}
