package tachyon.worker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import tachyon.IntegrationTestConstants;
import tachyon.TachyonURI;
import tachyon.TestUtils;
import tachyon.client.RemoteBlockReader;
import tachyon.client.TachyonFS;
import tachyon.client.TachyonFSTestUtils;
import tachyon.client.WriteType;
import tachyon.conf.TachyonConf;
import tachyon.master.LocalTachyonCluster;
import tachyon.network.protocol.RPCResponse;
import tachyon.thrift.ClientBlockInfo;
import tachyon.thrift.FileAlreadyExistException;
import tachyon.thrift.InvalidPathException;
import tachyon.thrift.NetAddress;
import tachyon.util.CommonUtils;

@RunWith(Parameterized.class)
/* loaded from: input_file:tachyon/worker/DataServerIntegrationTest.class */
public class DataServerIntegrationTest {
    private static final int WORKER_CAPACITY_BYTES = 1000;
    private static final int USER_QUOTA_UNIT_BYTES = 100;
    private final String mDataServerClass;
    private final String mNettyTransferType;
    private final String mBlockReader;
    private LocalTachyonCluster mLocalTachyonCluster = null;
    private TachyonFS mTFS = null;
    private TachyonConf mWorkerTachyonConf;

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Object[]{IntegrationTestConstants.NETTY_DATA_SERVER, IntegrationTestConstants.MAPPED_TRANSFER, IntegrationTestConstants.TCP_BLOCK_READER});
        arrayList.add(new Object[]{IntegrationTestConstants.NETTY_DATA_SERVER, IntegrationTestConstants.MAPPED_TRANSFER, IntegrationTestConstants.NETTY_BLOCK_READER});
        arrayList.add(new Object[]{IntegrationTestConstants.NETTY_DATA_SERVER, IntegrationTestConstants.FILE_CHANNEL_TRANSFER, IntegrationTestConstants.TCP_BLOCK_READER});
        arrayList.add(new Object[]{IntegrationTestConstants.NETTY_DATA_SERVER, IntegrationTestConstants.FILE_CHANNEL_TRANSFER, IntegrationTestConstants.NETTY_BLOCK_READER});
        arrayList.add(new Object[]{IntegrationTestConstants.NIO_DATA_SERVER, IntegrationTestConstants.UNUSED_TRANSFER, IntegrationTestConstants.TCP_BLOCK_READER});
        arrayList.add(new Object[]{IntegrationTestConstants.NIO_DATA_SERVER, IntegrationTestConstants.UNUSED_TRANSFER, IntegrationTestConstants.NETTY_BLOCK_READER});
        return arrayList;
    }

    public DataServerIntegrationTest(String str, String str2, String str3) {
        this.mDataServerClass = str;
        this.mNettyTransferType = str2;
        this.mBlockReader = str3;
    }

    @After
    public final void after() throws Exception {
        this.mLocalTachyonCluster.stop();
        System.clearProperty("tachyon.worker.data.server.class");
        System.clearProperty("tachyon.worker.network.netty.file.transfer");
        System.clearProperty("tachyon.user.remote.block.reader.class");
    }

    private void assertError(DataServerMessage dataServerMessage, long j) {
        Assert.assertEquals(j, dataServerMessage.getBlockId());
        Assert.assertEquals(0L, dataServerMessage.getLength());
        Assert.assertNotEquals(dataServerMessage.getStatus().getId(), RPCResponse.Status.SUCCESS.getId());
    }

    private void assertValid(DataServerMessage dataServerMessage, ByteBuffer byteBuffer, long j, long j2, long j3) {
        Assert.assertEquals(byteBuffer, dataServerMessage.getReadOnlyData());
        Assert.assertEquals(j, dataServerMessage.getBlockId());
        Assert.assertEquals(j2, dataServerMessage.getOffset());
        Assert.assertEquals(j3, dataServerMessage.getLength());
    }

    private void assertValid(DataServerMessage dataServerMessage, int i, long j, long j2, long j3) {
        assertValid(dataServerMessage, TestUtils.getIncreasingByteBuffer(i), j, j2, j3);
    }

    @Before
    public final void before() throws IOException {
        System.setProperty("tachyon.worker.data.server.class", this.mDataServerClass);
        System.setProperty("tachyon.worker.network.netty.file.transfer", this.mNettyTransferType);
        System.setProperty("tachyon.user.remote.block.reader.class", this.mBlockReader);
        this.mLocalTachyonCluster = new LocalTachyonCluster(1000L, USER_QUOTA_UNIT_BYTES, 1073741824);
        this.mLocalTachyonCluster.start();
        this.mWorkerTachyonConf = this.mLocalTachyonCluster.getWorkerTachyonConf();
        this.mTFS = this.mLocalTachyonCluster.getClient();
    }

    @Test
    public void lengthTooSmall() throws IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/readTooLarge", WriteType.MUST_CACHE, 20)).get(0);
        assertError(request(clientBlockInfo, 0L, -40L), clientBlockInfo.blockId);
    }

    @Test
    public void multiReadTest() throws IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/multiReadTest", WriteType.MUST_CACHE, 20)).get(0);
        for (int i = 0; i < 10; i++) {
            assertValid(request(clientBlockInfo), 20, clientBlockInfo.getBlockId(), 0L, 20L);
        }
    }

    @Test
    public void negativeOffset() throws IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/readTooLarge", WriteType.MUST_CACHE, 10)).get(0);
        assertError(request(clientBlockInfo, -20L, 1L), clientBlockInfo.blockId);
    }

    @Test
    public void readMultiFiles() throws IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/readFile1", WriteType.MUST_CACHE, 501)).get(0);
        assertValid(request(clientBlockInfo), 501, clientBlockInfo.getBlockId(), 0L, 501L);
        ClientBlockInfo clientBlockInfo2 = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/readFile2", WriteType.MUST_CACHE, 501)).get(0);
        assertValid(request(clientBlockInfo2), 501, clientBlockInfo2.getBlockId(), 0L, 501L);
        CommonUtils.sleepMs((Logger) null, (TestUtils.getToMasterHeartBeatIntervalMs(this.mWorkerTachyonConf) * 2) + 10);
        Assert.assertEquals(0L, this.mTFS.getFileStatus(-1, new TachyonURI("/readFile1")).inMemoryPercentage);
    }

    @Test
    public void readPartialTest1() throws InvalidPathException, FileAlreadyExistException, IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/testFile", WriteType.MUST_CACHE, 10)).get(0);
        assertValid(request(clientBlockInfo, 0L, 6L), 6, clientBlockInfo.getBlockId(), 0L, 6L);
    }

    @Test
    public void readPartialTest2() throws InvalidPathException, FileAlreadyExistException, IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/testFile", WriteType.MUST_CACHE, 10)).get(0);
        assertValid(request(clientBlockInfo, 2L, 6L), TestUtils.getIncreasingByteBuffer(2, 6), clientBlockInfo.getBlockId(), 2L, 6L);
    }

    @Test
    public void readTest() throws InvalidPathException, FileAlreadyExistException, IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/testFile", WriteType.MUST_CACHE, 10)).get(0);
        assertValid(request(clientBlockInfo), 10, clientBlockInfo.getBlockId(), 0L, 10L);
    }

    @Test
    public void readThroughClientTest() throws InvalidPathException, FileAlreadyExistException, IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/testFile", WriteType.MUST_CACHE, 10)).get(0);
        Assert.assertEquals(TestUtils.getIncreasingByteBuffer(10), RemoteBlockReader.Factory.createRemoteBlockReader(this.mWorkerTachyonConf).readRemoteBlock(((NetAddress) clientBlockInfo.getLocations().get(0)).mHost, ((NetAddress) clientBlockInfo.getLocations().get(0)).mSecondaryPort, clientBlockInfo.getBlockId(), 0L, 10L));
    }

    public void readThroughClientNonExistentTest() throws InvalidPathException, FileAlreadyExistException, IOException {
        List<ClientBlockInfo> fileBlocks = this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/testFile", WriteType.MUST_CACHE, 10));
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) fileBlocks.get(0);
        long blockId = clientBlockInfo.getBlockId();
        for (ClientBlockInfo clientBlockInfo2 : fileBlocks) {
            if (clientBlockInfo2.getBlockId() > blockId) {
                blockId = clientBlockInfo2.getBlockId();
            }
        }
        Assert.assertNull(RemoteBlockReader.Factory.createRemoteBlockReader(this.mWorkerTachyonConf).readRemoteBlock(((NetAddress) clientBlockInfo.getLocations().get(0)).mHost, ((NetAddress) clientBlockInfo.getLocations().get(0)).mSecondaryPort, blockId + 1, 0L, 10L));
    }

    @Test
    public void readTooLarge() throws IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/readTooLarge", WriteType.MUST_CACHE, 20)).get(0);
        assertError(request(clientBlockInfo, 0L, 40L), clientBlockInfo.blockId);
    }

    private DataServerMessage request(ClientBlockInfo clientBlockInfo) throws IOException {
        return request(clientBlockInfo, 0L, -1L);
    }

    private DataServerMessage request(ClientBlockInfo clientBlockInfo, long j, long j2) throws IOException {
        DataServerMessage createBlockRequestMessage = DataServerMessage.createBlockRequestMessage(clientBlockInfo.blockId, j, j2);
        SocketChannel open = SocketChannel.open(new InetSocketAddress(((NetAddress) clientBlockInfo.getLocations().get(0)).mHost, ((NetAddress) clientBlockInfo.getLocations().get(0)).mSecondaryPort));
        while (!createBlockRequestMessage.finishSending()) {
            try {
                createBlockRequestMessage.send(open);
            } finally {
                open.close();
            }
        }
        DataServerMessage createBlockResponseMessage = DataServerMessage.createBlockResponseMessage(false, clientBlockInfo.blockId, j, j2, (ByteBuffer) null);
        while (!createBlockResponseMessage.isMessageReady() && createBlockResponseMessage.recv(open) != -1) {
        }
        return createBlockResponseMessage;
    }

    @Test
    public void tooLargeOffset() throws IOException {
        ClientBlockInfo clientBlockInfo = (ClientBlockInfo) this.mTFS.getFileBlocks(TachyonFSTestUtils.createByteFile(this.mTFS, "/readTooLarge", WriteType.MUST_CACHE, 10)).get(0);
        assertError(request(clientBlockInfo, 20L, 1L), clientBlockInfo.blockId);
    }
}
