package org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import javax.security.sasl.SaslException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.jute.BinaryOutputArchive;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.PortAssignment;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.proto.CreateRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.Request;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.ServerCnxn;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.test.ClientBase;
import org.apache.pulsar.functions.runtime.shaded.org.junit.After;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Before;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.class */
public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
    protected static final Logger LOG = LoggerFactory.getLogger((Class<?>) SessionUpgradeQuorumTest.class);
    public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
    public static final int SERVER_COUNT = 3;
    private QuorumPeerTestBase.MainThread[] mt;
    private int[] clientPorts;
    private TestQPMainDropSessionUpgrading[] qpMain;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest$TestQPMainDropSessionUpgrading.class */
    public static class TestQPMainDropSessionUpgrading extends QuorumPeerTestBase.TestQPMain {
        private volatile boolean shouldDrop;
        private volatile int submitDelayMs;

        private TestQPMainDropSessionUpgrading() {
            this.shouldDrop = false;
            this.submitDelayMs = 0;
        }

        public void setDropCreateSession(boolean z) {
            this.shouldDrop = z;
        }

        public void setSubmitDelayMs(int i) {
            this.submitDelayMs = i;
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMain
        protected QuorumPeer getQuorumPeer() throws SaslException {
            return new QuorumPeer() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.SessionUpgradeQuorumTest.TestQPMainDropSessionUpgrading.1
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeer
                protected Leader makeLeader(FileTxnSnapLog fileTxnSnapLog) throws IOException {
                    return new Leader(this, new LeaderZooKeeperServer(fileTxnSnapLog, this, getZkDb()) { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.SessionUpgradeQuorumTest.TestQPMainDropSessionUpgrading.1.1
                        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.ZooKeeperServer
                        public void submitRequestNow(Request request) {
                            if (TestQPMainDropSessionUpgrading.this.submitDelayMs > 0) {
                                try {
                                    Thread.sleep(TestQPMainDropSessionUpgrading.this.submitDelayMs);
                                } catch (Exception e) {
                                }
                            }
                            super.submitRequestNow(request);
                        }
                    });
                }

                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeer
                protected Follower makeFollower(FileTxnSnapLog fileTxnSnapLog) throws IOException {
                    return new Follower(this, new FollowerZooKeeperServer(fileTxnSnapLog, this, getZkDb()) { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.SessionUpgradeQuorumTest.TestQPMainDropSessionUpgrading.1.2
                        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.ZooKeeperServer
                        public void submitRequestNow(Request request) {
                            if (TestQPMainDropSessionUpgrading.this.submitDelayMs > 0) {
                                try {
                                    Thread.sleep(TestQPMainDropSessionUpgrading.this.submitDelayMs);
                                } catch (Exception e) {
                                }
                            }
                            super.submitRequestNow(request);
                        }
                    }) { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.SessionUpgradeQuorumTest.TestQPMainDropSessionUpgrading.1.3
                        /* JADX INFO: Access modifiers changed from: protected */
                        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.Learner
                        public void request(Request request) throws IOException {
                            if (!TestQPMainDropSessionUpgrading.this.shouldDrop) {
                                super.request(request);
                                return;
                            }
                            LOG.info("request is {}, cnxn {}", Integer.valueOf(request.type), request.cnxn);
                            if (request.type == -10) {
                                LOG.info("drop createSession request {}", request);
                                return;
                            }
                            if (request.type != 1 || request.cnxn == null) {
                                super.request(request);
                                return;
                            }
                            CreateRequest createRequest = new CreateRequest();
                            request.request.rewind();
                            ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                            request.request.rewind();
                            try {
                                if (CreateMode.fromFlag(createRequest.getFlags()).isEphemeral()) {
                                    request.cnxn.sendCloseSession();
                                }
                            } catch (KeeperException e) {
                            }
                        }
                    };
                }
            };
        }
    }

    @Before
    public void setUp() throws Exception {
        LOG.info("STARTING quorum {}", getClass().getName());
        ClientBase.setupTestEnv();
        this.mt = new QuorumPeerTestBase.MainThread[3];
        this.clientPorts = new int[3];
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 3; i++) {
            this.clientPorts[i] = PortAssignment.unique();
            sb.append("server.").append(i).append("=127.0.0.1:").append(PortAssignment.unique()).append(":").append(PortAssignment.unique()).append("\n");
        }
        sb.append("localSessionsEnabled=true\n");
        sb.append("localSessionsUpgradingEnabled=true\n");
        String sb2 = sb.toString();
        this.qpMain = new TestQPMainDropSessionUpgrading[3];
        for (int i2 = 0; i2 < 3; i2++) {
            final TestQPMainDropSessionUpgrading testQPMainDropSessionUpgrading = new TestQPMainDropSessionUpgrading();
            this.qpMain[i2] = testQPMainDropSessionUpgrading;
            this.mt[i2] = new QuorumPeerTestBase.MainThread(i2, this.clientPorts[i2], sb2, false) { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.SessionUpgradeQuorumTest.1
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread
                public QuorumPeerTestBase.TestQPMain getTestQPMain() {
                    return testQPMainDropSessionUpgrading;
                }
            };
            this.mt[i2].start();
        }
        for (int i3 = 0; i3 < 3; i3++) {
            Assert.assertTrue("waiting for server " + i3 + " being up", ClientBase.waitForServerUp("127.0.0.1:" + this.clientPorts[i3], CONNECTION_TIMEOUT));
        }
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerTestBase
    @After
    public void tearDown() throws Exception {
        LOG.info("STOPPING quorum {}", getClass().getName());
        for (int i = 0; i < 3; i++) {
            this.mt[i].shutdown();
        }
    }

    @Test
    public void testLocalSessionUpgradeSnapshot() throws IOException, InterruptedException {
        int i = -1;
        for (int i2 = 2; i2 >= 0; i2--) {
            if (this.mt[i2].main.quorumPeer.leader == null && i == -1) {
                i = i2;
            }
        }
        LOG.info("follower A is {}", Integer.valueOf(i));
        this.qpMain[i].setDropCreateSession(true);
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:" + this.clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        waitForOne(zooKeeper, ZooKeeper.States.CONNECTED);
        long sessionId = zooKeeper.getSessionId();
        try {
            zooKeeper.create("/node-1", new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            Assert.fail("expect to failed to upgrade session due to the TestQPMainDropSessionUpgrading is being used");
        } catch (KeeperException e) {
            LOG.info("KeeperException when create ephemeral node.", (Throwable) e);
        }
        this.qpMain[i].quorumPeer.follower.zk.takeSnapshot(true);
        Thread.sleep(500L);
        for (int i3 = 0; i3 < 3; i3++) {
            this.mt[i3].shutdown();
        }
        ArrayList<ZooKeeper.States> arrayList = new ArrayList<>();
        arrayList.add(ZooKeeper.States.CONNECTING);
        arrayList.add(ZooKeeper.States.CLOSED);
        waitForOne(zooKeeper, arrayList);
        for (int i4 = 0; i4 < 3; i4++) {
            this.mt[i4].start();
        }
        for (int i5 = 0; i5 < 3; i5++) {
            Assert.assertTrue("waiting for server " + i5 + " being up", ClientBase.waitForServerUp("127.0.0.1:" + this.clientPorts[i5], CONNECTION_TIMEOUT));
        }
        for (int i6 = 0; i6 < 3; i6++) {
            Assert.assertFalse("server " + i6 + " should not have global session " + sessionId, this.mt[i6].main.quorumPeer.getZkDb().getSessionWithTimeOuts().containsKey(Long.valueOf(sessionId)));
        }
        zooKeeper.close();
    }

    @Test
    public void testOnlyUpgradeSessionOnce() throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:" + this.clientPorts[0], ClientBase.CONNECTION_TIMEOUT, this);
        waitForOne(zooKeeper, ZooKeeper.States.CONNECTED);
        long sessionId = zooKeeper.getSessionId();
        QuorumZooKeeperServer quorumZooKeeperServer = (QuorumZooKeeperServer) this.mt[0].main.quorumPeer.getActiveServer();
        Request createEphemeralRequest = createEphemeralRequest("/data-1", sessionId);
        Request createEphemeralRequest2 = createEphemeralRequest("/data-2", sessionId);
        Assert.assertNotNull("failed to upgrade on a ephemeral create", quorumZooKeeperServer.checkUpgradeSession(createEphemeralRequest));
        Assert.assertNull("tried to upgrade again", quorumZooKeeperServer.checkUpgradeSession(createEphemeralRequest2));
        zooKeeper.close();
    }

    @Test
    public void testCloseSessionWhileUpgradeOnLeader() throws IOException, KeeperException, InterruptedException {
        int i = -1;
        for (int i2 = 2; i2 >= 0; i2--) {
            if (this.mt[i2].main.quorumPeer.leader != null) {
                i = i2;
            }
        }
        if (i > 0) {
            makeSureEphemeralIsGone(i);
        }
    }

    @Test
    public void testCloseSessionWhileUpgradeOnLearner() throws IOException, KeeperException, InterruptedException {
        int i = -1;
        for (int i2 = 2; i2 >= 0; i2--) {
            if (this.mt[i2].main.quorumPeer.follower != null) {
                i = i2;
            }
        }
        if (i > 0) {
            makeSureEphemeralIsGone(i);
        }
    }

    private void makeSureEphemeralIsGone(int i) throws IOException, KeeperException, InterruptedException {
        this.qpMain[i].setSubmitDelayMs(200);
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:" + this.clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        waitForOne(zooKeeper, ZooKeeper.States.CONNECTED);
        zooKeeper.create("/node-1", new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.SessionUpgradeQuorumTest.2
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback.StringCallback
            public void processResult(int i2, String str, Object obj, String str2) {
            }
        }, (Object) null);
        zooKeeper.close();
        ZooKeeper zooKeeper2 = new ZooKeeper("127.0.0.1:" + this.clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        waitForOne(zooKeeper2, ZooKeeper.States.CONNECTED);
        Assert.assertNull(zooKeeper2.exists("/node-1", false));
        zooKeeper2.close();
    }

    private void waitForOne(ZooKeeper zooKeeper, ArrayList<ZooKeeper.States> arrayList) throws InterruptedException {
        int i = ClientBase.CONNECTION_TIMEOUT / 500;
        while (!arrayList.contains(zooKeeper.getState())) {
            int i2 = i;
            i--;
            if (i2 == 0) {
                LOG.info("state is {}", zooKeeper.getState());
                throw new RuntimeException("Waiting too long");
            }
            Thread.sleep(500L);
        }
    }

    private Request createEphemeralRequest(String str, long j) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        new CreateRequest(str, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()).serialize(BinaryOutputArchive.getArchive(byteArrayOutputStream), "request");
        return new Request((ServerCnxn) null, j, 1, 15, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()), new ArrayList());
    }
}
