package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-hdfs-2.5.2-tests.jar:org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.class
  input_file:hadoop-hdfs-2.5.2/share/hadoop/hdfs/hadoop-hdfs-2.5.2-tests.jar:org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.class
 */
/* loaded from: input_file:test-classes/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.class */
public class TestFsDatasetCache {
    // Value written to dfs.datanode.max.locked.memory in setUp(); several tests
    // assert that the dataset reports exactly this cache capacity.
    private static final long CACHE_CAPACITY = 65536;
    // Per-test fixtures. They are static but are re-created by setUp() before
    // every test and released by tearDown().
    private static Configuration conf;
    private static FileSystem fs;
    private static NameNode nn;
    private static FSImage fsImage;
    private static DataNode dn;
    private static FsDatasetSpi<?> fsd;
    // Spy on the DataNode -> NameNode translator; setHeartbeatResponse() stubs
    // its sendHeartbeat() to inject commands.
    private static DatanodeProtocolClientSideTranslatorPB spyNN;
    // Cache manipulator that was installed before setUp() replaced it;
    // tearDown() puts it back.
    private static NativeIO.POSIX.CacheManipulator prevCacheManipulator;
    private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
    // Page size reported by whichever cache manipulator is installed when this
    // class is initialized (i.e. before setUp() swaps in the no-mlock one).
    private static final long PAGE_SIZE = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
    // Blocks are sized to exactly one page; setUp() writes this to the DFS
    // block-size setting.
    private static final long BLOCK_SIZE = PAGE_SIZE;
    private static MiniDFSCluster cluster = null;
    // Used by testFilesExceedMaxLockedMemory to round expected byte counts.
    // NOTE(review): presumably rounds up to a page multiple - confirm against
    // FsDatasetCache.PageRounder.
    private static final FsDatasetCache.PageRounder rounder = new FsDatasetCache.PageRounder();

    /**
     * Starts a single-DataNode mini cluster configured for fast cache reports
     * and heartbeats, installs a no-mlock cache manipulator, and spies on the
     * DataNode's NameNode proxy so tests can inject heartbeat responses.
     * Skipped entirely on Windows.
     */
    @Before
    public void setUp() throws Exception {
        Assume.assumeTrue(!Path.WINDOWS);

        // Build the configuration locally, then publish it to the shared field.
        Configuration testConf = new HdfsConfiguration();
        testConf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100L);
        testConf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500L);
        testConf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
        testConf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
        testConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
        conf = testConf;

        // Remember the current manipulator so tearDown() can restore it, then
        // swap in the implementation that does not call the real mlock.
        prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
        NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.NoMlockCacheManipulator());

        MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf);
        clusterBuilder.numDataNodes(1);
        cluster = clusterBuilder.build();
        cluster.waitActive();

        fs = cluster.getFileSystem();
        nn = cluster.getNameNode();
        fsImage = nn.getFSImage();

        DataNode onlyDataNode = cluster.getDataNodes().get(0);
        dn = onlyDataNode;
        fsd = onlyDataNode.getFSDataset();

        spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
    }

    /**
     * Releases the per-test fixtures: closes the file system, shuts the mini
     * cluster down and restores the cache manipulator saved by {@link #setUp()}.
     *
     * <p>The steps are chained with {@code finally} so that a failure in an
     * earlier step cannot leave the cluster running or the global cache
     * manipulator replaced for subsequent tests.
     *
     * @throws Exception if closing the file system or stopping the cluster fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (fs != null) {
                fs.close();
            }
        } finally {
            try {
                if (cluster != null) {
                    cluster.shutdown();
                }
            } finally {
                // setUp() may have been skipped by its Assume before it saved a
                // manipulator; never install null as the process-wide one.
                if (prevCacheManipulator != null) {
                    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
                }
            }
        }
    }

    /**
     * Stubs the spied NameNode proxy so that every subsequent heartbeat from
     * the DataNode is answered with the given commands.
     *
     * @param datanodeCommandArr commands to hand back in the heartbeat response
     * @throws IOException declared by the stubbed {@code sendHeartbeat} call
     */
    private static void setHeartbeatResponse(DatanodeCommand[] datanodeCommandArr) throws IOException {
        NNHAStatusHeartbeat activeStatus = new NNHAStatusHeartbeat(
                HAServiceProtocol.HAServiceState.ACTIVE,
                fsImage.getLastAppliedOrWrittenTxId());
        HeartbeatResponse cannedResponse = new HeartbeatResponse(datanodeCommandArr, activeStatus, null);

        // doReturn(..).when(spy) hands back the spy in "stubbing" mode; the call
        // below records the stub rather than sending a real heartbeat.
        DatanodeProtocolClientSideTranslatorPB stubbing = Mockito.doReturn(cannedResponse).when(spyNN);
        stubbing.sendHeartbeat(
                (DatanodeRegistration) Matchers.any(),
                (StorageReport[]) Matchers.any(),
                Matchers.anyLong(),
                Matchers.anyLong(),
                Matchers.anyInt(),
                Matchers.anyInt(),
                Matchers.anyInt());
    }

    /**
     * Builds a one-element command array asking the DataNode to cache a single
     * block.
     *
     * @param hdfsBlockLocation location of the block to cache
     * @return command array suitable for {@code setHeartbeatResponse}
     */
    private static DatanodeCommand[] cacheBlock(HdfsBlockLocation hdfsBlockLocation) {
        return cacheBlocks(new HdfsBlockLocation[]{hdfsBlockLocation});
    }

    /**
     * Builds a one-element command array asking the DataNode to cache all of
     * the given blocks.
     *
     * @param hdfsBlockLocationArr locations of the blocks to cache; must be non-empty
     * @return command array suitable for {@code setHeartbeatResponse}
     */
    private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] hdfsBlockLocationArr) {
        // Named protocol constant instead of the bare action code 9.
        return new DatanodeCommand[]{getResponse(hdfsBlockLocationArr, DatanodeProtocol.DNA_CACHE)};
    }

    /**
     * Builds a one-element command array asking the DataNode to uncache a
     * single block.
     *
     * @param hdfsBlockLocation location of the block to uncache
     * @return command array suitable for {@code setHeartbeatResponse}
     */
    private static DatanodeCommand[] uncacheBlock(HdfsBlockLocation hdfsBlockLocation) {
        return uncacheBlocks(new HdfsBlockLocation[]{hdfsBlockLocation});
    }

    /**
     * Builds a one-element command array asking the DataNode to uncache all of
     * the given blocks.
     *
     * @param hdfsBlockLocationArr locations of the blocks to uncache; must be non-empty
     * @return command array suitable for {@code setHeartbeatResponse}
     */
    private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] hdfsBlockLocationArr) {
        // Named protocol constant instead of the bare action code 10.
        return new DatanodeCommand[]{getResponse(hdfsBlockLocationArr, DatanodeProtocol.DNA_UNCACHE)};
    }

    /**
     * Packs the block ids of the given locations into a single
     * {@link BlockIdCommand}. The block pool id is taken from the first
     * location, so the array must contain at least one element.
     *
     * @param hdfsBlockLocationArr locations whose block ids go into the command
     * @param i action code passed through to the command
     * @return the assembled command
     */
    private static DatanodeCommand getResponse(HdfsBlockLocation[] hdfsBlockLocationArr, int i) {
        final String poolId = hdfsBlockLocationArr[0].getLocatedBlock().getBlock().getBlockPoolId();
        final long[] blockIds = new long[hdfsBlockLocationArr.length];
        int next = 0;
        for (HdfsBlockLocation location : hdfsBlockLocationArr) {
            blockIds[next++] = location.getLocatedBlock().getBlock().getBlockId();
        }
        return new BlockIdCommand(i, poolId, blockIds);
    }

    /**
     * Returns the on-disk length of each block, read from the size of the
     * block file's channel on the DataNode.
     *
     * <p>Each block input stream is closed as soon as its size has been read,
     * so the helper does not leak a file descriptor per block.
     *
     * @param hdfsBlockLocationArr locations of the blocks to measure
     * @return block file sizes in bytes, in the same order as the input
     * @throws Exception if a block stream cannot be opened, measured or closed
     */
    private static long[] getBlockSizes(HdfsBlockLocation[] hdfsBlockLocationArr) throws Exception {
        long[] sizes = new long[hdfsBlockLocationArr.length];
        for (int i = 0; i < hdfsBlockLocationArr.length; i++) {
            ExtendedBlock located = hdfsBlockLocationArr[i].getLocatedBlock().getBlock();
            ExtendedBlock extBlock = new ExtendedBlock(located.getBlockPoolId(), located.getLocalBlock());
            FileInputStream blockInputStream = (FileInputStream) fsd.getBlockInputStream(extBlock, 0L);
            try {
                sizes[i] = blockInputStream.getChannel().size();
            } finally {
                // Closing the stream also closes the channel obtained from it.
                blockInputStream.close();
            }
        }
        return sizes;
    }

    /**
     * Shared body for the cache/uncache tests: creates a five-block file,
     * caches its blocks one at a time and then uncaches them one at a time,
     * checking cache usage and the DataNode's BlocksCached / BlocksUncached
     * counters after every step.
     */
    private void testCacheAndUncacheBlock() throws Exception {
        LOG.info("beginning testCacheAndUncacheBlock");
        final int numBlocks = 5;

        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
        Assert.assertEquals(0L, fsd.getNumBlocksCached());

        // One file made of numBlocks page-sized blocks.
        final Path testFile = new Path("/testCacheBlock");
        final long testFileLen = BLOCK_SIZE * numBlocks;
        DFSTestUtil.createFile(fs, testFile, testFileLen, (short) 1, 43962L);
        final HdfsBlockLocation[] locs =
                (HdfsBlockLocation[]) fs.getFileBlockLocations(testFile, 0L, testFileLen);
        Assert.assertEquals("Unexpected number of blocks", (long) numBlocks, locs.length);
        final long[] blockSizes = getBlockSizes(locs);

        // Nothing should be cached yet.
        final long cacheCapacity = fsd.getCacheCapacity();
        final long cacheUsed = fsd.getCacheUsed();
        Assert.assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
        Assert.assertEquals("Unexpected amount of cache used", 0L, cacheUsed);

        long current = 0;
        long lastCachedCount = 0;
        long lastUncachedCount = 0;

        // Cache the blocks one by one; usage and the counter must both grow.
        for (int idx = 0; idx < numBlocks; idx++) {
            setHeartbeatResponse(cacheBlock(locs[idx]));
            current = DFSTestUtil.verifyExpectedCacheUsage(current + blockSizes[idx], idx + 1, fsd);
            MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics(dn.getMetrics().name());
            long cachedCount = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
            Assert.assertTrue("Expected more cache requests from the NN (" + cachedCount + " <= " + lastCachedCount + ")",
                    cachedCount > lastCachedCount);
            lastCachedCount = cachedCount;
        }

        // Uncache in the same order; usage must shrink and the counter grow.
        for (int idx = 0; idx < numBlocks; idx++) {
            setHeartbeatResponse(uncacheBlock(locs[idx]));
            current = DFSTestUtil.verifyExpectedCacheUsage(current - blockSizes[idx], numBlocks - 1 - idx, fsd);
            MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics(dn.getMetrics().name());
            long uncachedCount = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
            Assert.assertTrue("Expected more uncache requests from the NN", uncachedCount > lastUncachedCount);
            lastUncachedCount = uncachedCount;
        }

        LOG.info("finishing testCacheAndUncacheBlock");
    }

    /**
     * Runs the shared cache/uncache scenario with the cache manipulator that
     * setUp() installed, i.e. without any injected mlock failures.
     */
    @Test(timeout = 600000)
    public void testCacheAndUncacheBlockSimple() throws Exception {
        testCacheAndUncacheBlock();
    }

    /**
     * Runs the shared cache/uncache scenario with a cache manipulator that
     * fails the first mlock of every identifier and lets later attempts for
     * the same identifier through, so the scenario only passes if failed
     * caching attempts are retried.
     */
    @Test(timeout = 600000)
    public void testCacheAndUncacheBlockWithRetries() throws Exception {
        NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.NoMlockCacheManipulator() {
            // Identifiers whose first mlock attempt has already been failed.
            private final Set<String> seenIdentifiers = new HashSet<String>();

            @Override
            public void mlock(String identifier, ByteBuffer mmap, long length) throws IOException {
                // Set.add returns true only the first time an identifier is seen.
                if (seenIdentifiers.add(identifier)) {
                    throw new IOException("injecting IOException during mlock of " + identifier);
                }
                TestFsDatasetCache.LOG.info("mlocking " + identifier);
            }
        });
        testCacheAndUncacheBlock();
    }

    /**
     * Creates five files sized so that four of them exactly fill the cache,
     * caches the first four, and checks that caching the fifth is rejected:
     * the DataNode must log the max-locked-memory message and report at least
     * one block that failed to cache. Finally uncaches the first four files
     * and checks that cache usage drops back accordingly.
     */
    @Test(timeout = 600000)
    public void testFilesExceedMaxLockedMemory() throws Exception {
        LOG.info("beginning testFilesExceedMaxLockedMemory");

        final int numFiles = 5;
        // numFiles - 1 files of this size add up to exactly CACHE_CAPACITY.
        final long fileSize = CACHE_CAPACITY / (numFiles - 1);
        final Path[] testFiles = new Path[numFiles];
        // One array of block locations per file.
        final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
        final long[] fileSizes = new long[numFiles];
        for (int i = 0; i < numFiles; i++) {
            testFiles[i] = new Path("/testFilesExceedMaxLockedMemory-" + i);
            DFSTestUtil.createFile(fs, testFiles[i], fileSize, (short) 1, 3578L);
            fileLocs[i] = (HdfsBlockLocation[]) fs.getFileBlockLocations(testFiles[i], 0L, fileSize);
            // On-disk file size is the sum of its block sizes.
            for (long blockSize : getBlockSizes(fileLocs[i])) {
                fileSizes[i] += blockSize;
            }
        }

        // Cache the first numFiles - 1 files; these must all fit.
        long total = 0;
        long cachedBlocks = 0;
        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
        for (int i = 0; i < numFiles - 1; i++) {
            setHeartbeatResponse(cacheBlocks(fileLocs[i]));
            cachedBlocks += fileLocs[i].length;
            total = DFSTestUtil.verifyExpectedCacheUsage(rounder.round(total + fileSizes[i]), cachedBlocks, fsd);
        }

        // The last file must be rejected. Watch the root logger for the
        // rejection message, and always detach the appender afterwards so it
        // does not keep collecting log events for the rest of the JVM's life.
        final LogVerificationAppender appender = new LogVerificationAppender();
        final Logger rootLogger = Logger.getRootLogger();
        rootLogger.addAppender(appender);
        try {
            setHeartbeatResponse(cacheBlocks(fileLocs[numFiles - 1]));
            GenericTestUtils.waitFor(new Supplier<Boolean>() {
                @Override
                public Boolean get() {
                    int lines = appender.countLinesWithMessage(
                            "more bytes in the cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
                    return lines > 0;
                }
            }, 500, 30000);
        } finally {
            rootLogger.removeAppender(appender);
        }
        // The failure must also be visible through the dataset's counter.
        Assert.assertTrue("Expected more than 0 failed cache attempts", fsd.getNumBlocksFailedToCache() > 0);

        // Uncache the files that were cached and watch usage fall.
        for (int i = 0; i < numFiles - 1; i++) {
            setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
            total -= rounder.round(fileSizes[i]);
            cachedBlocks -= fileLocs[i].length;
            DFSTestUtil.verifyExpectedCacheUsage(total, cachedBlocks, fsd);
        }
        LOG.info("finishing testFilesExceedMaxLockedMemory");
    }

    /**
     * Installs a cache manipulator whose mlock stalls for three seconds,
     * requests caching of five blocks one at a time, then sends a single
     * uncache command for all five blocks and checks that cache usage returns
     * to zero.
     */
    @Test(timeout = 600000)
    public void testUncachingBlocksBeforeCachingFinishes() throws Exception {
        LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
        final int numBlocks = 5;

        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);

        // One file made of numBlocks page-sized blocks.
        final Path testFile = new Path("/testCacheBlock");
        final long testFileLen = BLOCK_SIZE * numBlocks;
        DFSTestUtil.createFile(fs, testFile, testFileLen, (short) 1, 43962L);
        final HdfsBlockLocation[] locs =
                (HdfsBlockLocation[]) fs.getFileBlockLocations(testFile, 0L, testFileLen);
        Assert.assertEquals("Unexpected number of blocks", (long) numBlocks, locs.length);
        final long[] blockSizes = getBlockSizes(locs);

        // Nothing should be cached yet.
        final long cacheCapacity = fsd.getCacheCapacity();
        final long cacheUsed = fsd.getCacheUsed();
        long current = 0;
        Assert.assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
        Assert.assertEquals("Unexpected amount of cache used", 0L, cacheUsed);

        NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.NoMlockCacheManipulator() {
            @Override
            public void mlock(String identifier, ByteBuffer mmap, long length) throws IOException {
                TestFsDatasetCache.LOG.info("An mlock operation is starting on " + identifier);
                try {
                    // Plain literal: this delay is a test stall, not a socket-cache setting.
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the calling thread before failing.
                    Thread.currentThread().interrupt();
                    Assert.fail();
                }
            }
        });

        // Queue up caching of every block.
        for (int i = 0; i < numBlocks; i++) {
            setHeartbeatResponse(cacheBlock(locs[i]));
            current = DFSTestUtil.verifyExpectedCacheUsage(current + blockSizes[i], i + 1, fsd);
        }

        // Uncache all blocks with one command and expect the cache to drain.
        setHeartbeatResponse(uncacheBlocks(locs));
        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);

        LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
    }

    /**
     * Sends an uncache command for blocks that were never cached and waits
     * until the dataset reports at least one block that failed to uncache.
     */
    @Test(timeout = 60000)
    public void testUncacheUnknownBlock() throws Exception {
        final Path fileName = new Path("/testUncacheUnknownBlock");
        final int fileLen = 4096;
        DFSTestUtil.createFile(fs, fileName, fileLen, (short) 1, 65021L);

        // The file's blocks exist on the DataNode but were never cached.
        final HdfsBlockLocation[] locs =
                (HdfsBlockLocation[]) fs.getFileBlockLocations(fileName, 0L, fileLen);
        final DatanodeCommand[] uncacheCommands = uncacheBlocks(locs);
        setHeartbeatResponse(uncacheCommands);

        final Supplier<Boolean> uncacheFailureSeen = new Supplier<Boolean>() {
            @Override
            public Boolean get() {
                long failedToUncache = TestFsDatasetCache.fsd.getNumBlocksFailedToUncache();
                return Boolean.valueOf(failedToUncache > 0);
            }
        };
        GenericTestUtils.waitFor(uncacheFailureSeen, 100, 10000);
    }

    /**
     * Writes a file made of five 512-byte blocks (smaller than a page), caches
     * it and checks that the reported cache usage is five whole pages, then
     * uncaches it and checks that usage returns to zero.
     */
    @Test(timeout = 60000)
    public void testPageRounder() throws Exception {
        final Path fileName = new Path("/testPageRounder");
        // Must be smaller than the page size for the rounding to be observable.
        final int smallBlocks = 512;
        Assert.assertTrue("Page size should be greater than smallBlocks!", PAGE_SIZE > smallBlocks);
        final int numBlocks = 5;
        final int fileLen = smallBlocks * numBlocks;

        FSDataOutputStream out = fs.create(fileName, false, 4096, (short) 1, smallBlocks);
        try {
            out.write(new byte[fileLen]);
        } finally {
            // Close even when the write fails so the stream is not leaked.
            out.close();
        }

        HdfsBlockLocation[] locs = (HdfsBlockLocation[]) fs.getFileBlockLocations(fileName, 0L, fileLen);

        // Each sub-page block is expected to occupy a full page in the cache.
        setHeartbeatResponse(cacheBlocks(locs));
        DFSTestUtil.verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks, fsd);

        setHeartbeatResponse(uncacheBlocks(locs));
        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
    }

    /**
     * Caches a one-block file through a cache directive, removes the
     * directive, and then verifies that the DataNode settles: ten seconds
     * after the first uncache is observed, exactly one block has been cached
     * and exactly one uncached.
     */
    @Test(timeout = 60000)
    public void testUncacheQuiesces() throws Exception {
        // Create a file and cache it through a directive.
        final Path fileName = new Path("/testUncacheQuiesces");
        final int fileLen = 4096;
        DFSTestUtil.createFile(fs, fileName, fileLen, (short) 1, 65021L);
        DistributedFileSystem dfs = cluster.getFileSystem();
        dfs.addCachePool(new CachePoolInfo("pool"));
        // Keep the id the NameNode assigned rather than assuming it is 1.
        final long directiveId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
                .setPool("pool").setPath(fileName).setReplication((short) 3).build());
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
            @Override
            public Boolean get() {
                MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics(TestFsDatasetCache.dn.getMetrics().name());
                long blocksCached = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
                return blocksCached > 0;
            }
        }, 1000, 30000);

        // Remove the directive and wait for the block to be uncached.
        dfs.removeCacheDirective(directiveId);
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
            @Override
            public Boolean get() {
                MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics(TestFsDatasetCache.dn.getMetrics().name());
                long blocksUncached = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
                return blocksUncached > 0;
            }
        }, 1000, 30000);

        // Give any spurious follow-up cache/uncache work time to show up; the
        // counters must not have moved past one each.
        Thread.sleep(10000L);
        MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics(dn.getMetrics().name());
        MetricsAsserts.assertCounter("BlocksCached", 1L, dnMetrics);
        MetricsAsserts.assertCounter("BlocksUncached", 1L, dnMetrics);
    }

    /**
     * Fills the cache with a big file, adds a directive for a small file that
     * cannot fit, verifies that nothing more is cached while the cache is
     * full, then removes the big file's directive and waits until the small
     * file's directive reports one cached file.
     */
    @Test(timeout = 60000)
    public void testReCacheAfterUncache() throws Exception {
        final int totalBlocksPerCache = Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
        BlockReaderTestUtil.enableHdfsCachingTracing();
        Assert.assertEquals(0L, CACHE_CAPACITY % BLOCK_SIZE);

        // A one-block file, and a file that occupies the whole cache.
        final Path smallFile = new Path("/smallFile");
        DFSTestUtil.createFile(fs, smallFile, BLOCK_SIZE, (short) 1, 51966L);
        final Path bigFile = new Path("/bigFile");
        DFSTestUtil.createFile(fs, bigFile, totalBlocksPerCache * BLOCK_SIZE, (short) 1, 48879L);

        final DistributedFileSystem dfs = cluster.getFileSystem();
        dfs.addCachePool(new CachePoolInfo("pool"));
        final long bigCacheDirectiveId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
                .setPool("pool").setPath(bigFile).setReplication((short) 1).build());
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
            @Override
            public Boolean get() {
                MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics(TestFsDatasetCache.dn.getMetrics().name());
                long blocksCached = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
                if (blocksCached != totalBlocksPerCache) {
                    TestFsDatasetCache.LOG.info("waiting for " + totalBlocksPerCache + " to be cached.   Right now only " + blocksCached + " blocks are cached.");
                    return false;
                }
                TestFsDatasetCache.LOG.info(totalBlocksPerCache + " blocks are now cached.");
                return true;
            }
        }, 1000, 30000);

        // Ask for the small file as well. The cache is full, so after a grace
        // period the cached-block counter must not have moved.
        final long shortCacheDirectiveId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
                .setPool("pool").setPath(smallFile).setReplication((short) 1).build());
        Thread.sleep(10000L);
        MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics(dn.getMetrics().name());
        Assert.assertEquals(totalBlocksPerCache, MetricsAsserts.getLongCounter("BlocksCached", dnMetrics));

        // Free the cache; the small file's directive should now be satisfied.
        dfs.removeCacheDirective(bigCacheDirectiveId);
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
            @Override
            public Boolean get() {
                try {
                    RemoteIterator<CacheDirectiveEntry> iter =
                            dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().build());
                    // Find the small file's directive without running off the
                    // end of the listing.
                    CacheDirectiveEntry entry = null;
                    while (iter.hasNext()) {
                        CacheDirectiveEntry candidate = iter.next();
                        if (candidate.getInfo().getId().longValue() == shortCacheDirectiveId) {
                            entry = candidate;
                            break;
                        }
                    }
                    if (entry == null) {
                        throw new AssertionError("cache directive " + shortCacheDirectiveId + " is not listed");
                    }
                    if (entry.getStats().getFilesCached() != 1) {
                        TestFsDatasetCache.LOG.info("waiting for directive " + shortCacheDirectiveId + " to be cached.  stats = " + entry.getStats());
                        return false;
                    }
                    TestFsDatasetCache.LOG.info("directive " + shortCacheDirectiveId + " has been cached.");
                    return true;
                } catch (IOException e) {
                    // Fail the test but keep the original exception as the cause.
                    AssertionError failure = new AssertionError("unexpected exception" + e.toString());
                    failure.initCause(e);
                    throw failure;
                }
            }
        }, 1000, 30000);
    }

    static {
        EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
        LogManager.getLogger(FsDatasetCache.class).setLevel(Level.DEBUG);
    }
}
