package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:lib/hadoop-hdfs-2.2.0-tests.jar:org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.class */
public class TestBPOfferService {
    private static final String FAKE_CLUSTERID = "fake cluster";
    private DatanodeProtocolClientSideTranslatorPB mockNN1;
    private DatanodeProtocolClientSideTranslatorPB mockNN2;
    private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
    private int[] heartbeatCounts = new int[2];
    private DataNode mockDn;
    private FsDatasetSpi<?> mockFSDataset;
    protected static final Log LOG = LogFactory.getLog(TestBPOfferService.class);
    private static final String FAKE_BPID = "fake bpid";
    private static final ExtendedBlock FAKE_BLOCK = new ExtendedBlock(FAKE_BPID, 12345);
    private static final String TEST_BUILD_DATA = System.getProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "build/test/data");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/hadoop-hdfs-2.2.0-tests.jar:org/apache/hadoop/hdfs/server/datanode/TestBPOfferService$HeartbeatAnswer.class */
    public class HeartbeatAnswer implements Answer<HeartbeatResponse> {
        private final int nnIdx;

        public HeartbeatAnswer(int i) {
            this.nnIdx = i;
        }

        /* renamed from: answer, reason: merged with bridge method [inline-methods] */
        public HeartbeatResponse m2357answer(InvocationOnMock invocationOnMock) throws Throwable {
            int[] iArr = TestBPOfferService.this.heartbeatCounts;
            int i = this.nnIdx;
            iArr[i] = iArr[i] + 1;
            return new HeartbeatResponse(new DatanodeCommand[0], TestBPOfferService.this.mockHaStatuses[this.nnIdx]);
        }
    }

    @Before
    public void setupMocks() throws Exception {
        this.mockNN1 = setupNNMock(0);
        this.mockNN2 = setupNNMock(1);
        this.mockDn = (DataNode) Mockito.mock(DataNode.class);
        ((DataNode) Mockito.doReturn(true).when(this.mockDn)).shouldRun();
        Configuration configuration = new Configuration();
        configuration.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, new File(new File(TEST_BUILD_DATA, "dfs"), MapFile.DATA_FILE_NAME).toURI().toString());
        ((DataNode) Mockito.doReturn(configuration).when(this.mockDn)).getConf();
        ((DataNode) Mockito.doReturn(new DNConf(configuration)).when(this.mockDn)).getDnConf();
        ((DataNode) Mockito.doReturn(DataNodeMetrics.create(configuration, "fake dn")).when(this.mockDn)).getMetrics();
        this.mockFSDataset = (FsDatasetSpi) Mockito.spy(new SimulatedFSDataset(null, null, configuration));
        this.mockFSDataset.addBlockPool(FAKE_BPID, configuration);
        ((DataNode) Mockito.doReturn(this.mockFSDataset).when(this.mockDn)).getFSDataset();
    }

    private DatanodeProtocolClientSideTranslatorPB setupNNMock(int i) throws Exception {
        DatanodeProtocolClientSideTranslatorPB datanodeProtocolClientSideTranslatorPB = (DatanodeProtocolClientSideTranslatorPB) Mockito.mock(DatanodeProtocolClientSideTranslatorPB.class);
        ((DatanodeProtocolClientSideTranslatorPB) Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0L)).when(datanodeProtocolClientSideTranslatorPB)).versionRequest();
        ((DatanodeProtocolClientSideTranslatorPB) Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration()).when(datanodeProtocolClientSideTranslatorPB)).registerDatanode((DatanodeRegistration) Mockito.any(DatanodeRegistration.class));
        ((DatanodeProtocolClientSideTranslatorPB) Mockito.doAnswer(new HeartbeatAnswer(i)).when(datanodeProtocolClientSideTranslatorPB)).sendHeartbeat((DatanodeRegistration) Mockito.any(DatanodeRegistration.class), (StorageReport[]) Mockito.any(StorageReport[].class), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt());
        this.mockHaStatuses[i] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.STANDBY, 0L);
        return datanodeProtocolClientSideTranslatorPB;
    }

    @Test
    public void testBasicFunctionality() throws Exception {
        BPOfferService bPOfferService = setupBPOSForNNs(this.mockNN1, this.mockNN2);
        bPOfferService.start();
        try {
            waitForInitialization(bPOfferService);
            ((DatanodeProtocolClientSideTranslatorPB) Mockito.verify(this.mockNN1)).registerDatanode((DatanodeRegistration) Mockito.any(DatanodeRegistration.class));
            ((DatanodeProtocolClientSideTranslatorPB) Mockito.verify(this.mockNN2)).registerDatanode((DatanodeRegistration) Mockito.any(DatanodeRegistration.class));
            waitForBlockReport(this.mockNN1);
            waitForBlockReport(this.mockNN2);
            bPOfferService.notifyNamenodeReceivedBlock(FAKE_BLOCK, "");
            ReceivedDeletedBlockInfo[] waitForBlockReceived = waitForBlockReceived(FAKE_BLOCK, this.mockNN1);
            Assert.assertEquals(1L, waitForBlockReceived.length);
            Assert.assertEquals(FAKE_BLOCK.getLocalBlock(), waitForBlockReceived[0].getBlock());
            ReceivedDeletedBlockInfo[] waitForBlockReceived2 = waitForBlockReceived(FAKE_BLOCK, this.mockNN2);
            Assert.assertEquals(1L, waitForBlockReceived2.length);
            Assert.assertEquals(FAKE_BLOCK.getLocalBlock(), waitForBlockReceived2[0].getBlock());
            bPOfferService.stop();
        } catch (Throwable th) {
            bPOfferService.stop();
            throw th;
        }
    }

    @Test
    public void testIgnoreDeletionsFromNonActive() throws Exception {
        BPOfferService bPOfferService = setupBPOSForNNs(this.mockNN1, this.mockNN2);
        ((DatanodeProtocolClientSideTranslatorPB) Mockito.doReturn(new BlockCommand(2, FAKE_BPID, new Block[]{FAKE_BLOCK.getLocalBlock()})).when(this.mockNN2)).blockReport((DatanodeRegistration) Mockito.anyObject(), (String) Mockito.eq(FAKE_BPID), (StorageBlockReport[]) Mockito.anyObject());
        bPOfferService.start();
        try {
            waitForInitialization(bPOfferService);
            waitForBlockReport(this.mockNN1);
            waitForBlockReport(this.mockNN2);
            bPOfferService.stop();
            ((FsDatasetSpi) Mockito.verify(this.mockFSDataset, Mockito.never())).invalidate((String) Mockito.eq(FAKE_BPID), (Block[]) Mockito.anyObject());
        } catch (Throwable th) {
            bPOfferService.stop();
            throw th;
        }
    }

    @Test
    public void testNNsFromDifferentClusters() throws Exception {
        ((DatanodeProtocolClientSideTranslatorPB) Mockito.doReturn(new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID, 0L)).when(this.mockNN1)).versionRequest();
        BPOfferService bPOfferService = setupBPOSForNNs(this.mockNN1, this.mockNN2);
        bPOfferService.start();
        try {
            waitForOneToFail(bPOfferService);
            bPOfferService.stop();
        } catch (Throwable th) {
            bPOfferService.stop();
            throw th;
        }
    }

    @Test
    public void testPickActiveNameNode() throws Exception {
        BPOfferService bPOfferService = setupBPOSForNNs(this.mockNN1, this.mockNN2);
        bPOfferService.start();
        try {
            waitForInitialization(bPOfferService);
            Assert.assertNull(bPOfferService.getActiveNN());
            this.mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1L);
            bPOfferService.triggerHeartbeatForTests();
            Assert.assertSame(this.mockNN1, bPOfferService.getActiveNN());
            this.mockHaStatuses[1] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 2L);
            bPOfferService.triggerHeartbeatForTests();
            Assert.assertSame(this.mockNN2, bPOfferService.getActiveNN());
            bPOfferService.triggerHeartbeatForTests();
            Assert.assertSame(this.mockNN2, bPOfferService.getActiveNN());
            this.mockHaStatuses[1] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.STANDBY, 2L);
            bPOfferService.triggerHeartbeatForTests();
            Assert.assertNull(bPOfferService.getActiveNN());
            this.mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 3L);
            bPOfferService.triggerHeartbeatForTests();
            Assert.assertSame(this.mockNN1, bPOfferService.getActiveNN());
            bPOfferService.stop();
        } catch (Throwable th) {
            bPOfferService.stop();
            throw th;
        }
    }

    private void waitForOneToFail(final BPOfferService bPOfferService) throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.hdfs.server.datanode.TestBPOfferService.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.google.common.base.Supplier
            public Boolean get() {
                return Boolean.valueOf(bPOfferService.countNameNodes() == 1);
            }
        }, 100, 10000);
    }

    private BPOfferService setupBPOSForNNs(DatanodeProtocolClientSideTranslatorPB... datanodeProtocolClientSideTranslatorPBArr) throws IOException {
        LinkedHashMap newLinkedHashMap = Maps.newLinkedHashMap();
        for (int i = 0; i < datanodeProtocolClientSideTranslatorPBArr.length; i++) {
            newLinkedHashMap.put(new InetSocketAddress(i), datanodeProtocolClientSideTranslatorPBArr[i]);
            ((DataNode) Mockito.doReturn(datanodeProtocolClientSideTranslatorPBArr[i]).when(this.mockDn)).connectToNN((InetSocketAddress) Mockito.eq(new InetSocketAddress(i)));
        }
        return new BPOfferService(Lists.newArrayList(newLinkedHashMap.keySet()), this.mockDn);
    }

    private void waitForInitialization(final BPOfferService bPOfferService) throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.hdfs.server.datanode.TestBPOfferService.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.google.common.base.Supplier
            public Boolean get() {
                return Boolean.valueOf(bPOfferService.isAlive() && bPOfferService.isInitialized());
            }
        }, 100, 10000);
    }

    private void waitForBlockReport(final DatanodeProtocolClientSideTranslatorPB datanodeProtocolClientSideTranslatorPB) throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.hdfs.server.datanode.TestBPOfferService.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.google.common.base.Supplier
            public Boolean get() {
                try {
                    ((DatanodeProtocolClientSideTranslatorPB) Mockito.verify(datanodeProtocolClientSideTranslatorPB)).blockReport((DatanodeRegistration) Mockito.anyObject(), (String) Mockito.eq(TestBPOfferService.FAKE_BPID), (StorageBlockReport[]) Mockito.anyObject());
                    return true;
                } catch (Throwable th) {
                    TestBPOfferService.LOG.info("waiting on block report: " + th.getMessage());
                    return false;
                }
            }
        }, 500, 10000);
    }

    private ReceivedDeletedBlockInfo[] waitForBlockReceived(ExtendedBlock extendedBlock, DatanodeProtocolClientSideTranslatorPB datanodeProtocolClientSideTranslatorPB) throws Exception {
        final ArgumentCaptor forClass = ArgumentCaptor.forClass(StorageReceivedDeletedBlocks[].class);
        GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.hdfs.server.datanode.TestBPOfferService.4
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.google.common.base.Supplier
            public Boolean get() {
                try {
                    ((DatanodeProtocolClientSideTranslatorPB) Mockito.verify(TestBPOfferService.this.mockNN1)).blockReceivedAndDeleted((DatanodeRegistration) Mockito.anyObject(), (String) Mockito.eq(TestBPOfferService.FAKE_BPID), (StorageReceivedDeletedBlocks[]) forClass.capture());
                    return true;
                } catch (Throwable th) {
                    return false;
                }
            }
        }, 100, 10000);
        return ((StorageReceivedDeletedBlocks[]) forClass.getValue())[0].getBlocks();
    }

    static {
        ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
    }
}
