package org.neo4j.bolt.transport;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/neo4j/bolt/transport/TransportWriteThrottleTest.class */
public class TransportWriteThrottleTest {
    private ChannelHandlerContext context;
    private Channel channel;
    private SocketChannelConfig config;
    private TestThrottleLock lock;

    /* loaded from: input_file:org/neo4j/bolt/transport/TransportWriteThrottleTest$TestThrottleLock.class */
    private static class TestThrottleLock implements ThrottleLock {
        private AtomicInteger lockCount;
        private AtomicInteger unlockCount;
        private Semaphore semaphore;
        private volatile CountDownLatch lockWaiter;

        private TestThrottleLock() {
            this.lockCount = new AtomicInteger(0);
            this.unlockCount = new AtomicInteger(0);
            this.semaphore = new Semaphore(1);
            this.lockWaiter = new CountDownLatch(1);
        }

        public void lock(Channel channel, long j) throws InterruptedException {
            this.semaphore.acquire();
            this.lockCount.incrementAndGet();
            this.lockWaiter.countDown();
        }

        public void unlock(Channel channel) {
            this.semaphore.release();
            this.unlockCount.incrementAndGet();
            this.lockWaiter = new CountDownLatch(1);
        }

        public boolean waitLocked(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.lockWaiter.await(j, timeUnit);
        }

        public int lockCallCount() {
            return this.lockCount.get();
        }

        public int unlockCallCount() {
            return this.unlockCount.get();
        }
    }

    @Before
    public void setup() throws Exception {
        this.lock = new TestThrottleLock();
        this.config = (SocketChannelConfig) Mockito.mock(SocketChannelConfig.class);
        Attribute attribute = (Attribute) Mockito.mock(Attribute.class);
        Mockito.when(attribute.get()).thenReturn(this.lock);
        this.channel = (Channel) Mockito.mock(SocketChannel.class, Answers.RETURNS_MOCKS);
        Mockito.when(this.channel.config()).thenReturn(this.config);
        Mockito.when(Boolean.valueOf(this.channel.isOpen())).thenReturn(true);
        Mockito.when(this.channel.attr((AttributeKey) ArgumentMatchers.any())).thenReturn(attribute);
        Mockito.when(this.channel.pipeline()).thenReturn(this.channel.pipeline());
        this.context = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class, Answers.RETURNS_MOCKS);
        Mockito.when(this.context.channel()).thenReturn(this.channel);
    }

    @Test
    public void shouldSetWriteBufferWatermarkOnChannelConfigWhenInstalled() {
        newThrottle().install(this.channel);
        ((SocketChannelConfig) Mockito.verify(this.config, Mockito.times(1))).setWriteBufferWaterMark((WriteBufferWaterMark) ArgumentCaptor.forClass(WriteBufferWaterMark.class).capture());
        Assert.assertEquals(64L, ((WriteBufferWaterMark) r0.getValue()).low());
        Assert.assertEquals(256L, ((WriteBufferWaterMark) r0.getValue()).high());
    }

    @Test
    public void shouldNotLockWhenWritable() throws Exception {
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel);
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(true);
        Future<?> submit = Executors.newSingleThreadExecutor().submit(() -> {
            newThrottleAndInstall.acquire(this.channel);
        });
        try {
            submit.get(2000L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            Assert.fail("should not throw");
        }
        Assert.assertTrue(submit.isDone());
        Assert.assertThat(Integer.valueOf(this.lock.lockCallCount()), Matchers.is(0));
        Assert.assertThat(Integer.valueOf(this.lock.unlockCallCount()), Matchers.is(0));
    }

    @Test
    public void shouldLockWhenNotWritable() throws Exception {
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel);
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(false);
        Future<?> submit = Executors.newSingleThreadExecutor().submit(() -> {
            newThrottleAndInstall.acquire(this.channel);
        });
        try {
            submit.get(2000L, TimeUnit.MILLISECONDS);
            Assert.fail("should timeout");
        } catch (TimeoutException e) {
        } catch (Throwable th) {
            Assert.fail("should timeout");
        }
        Assert.assertFalse(submit.isDone());
        Assert.assertThat(Integer.valueOf(this.lock.lockCallCount()), Matchers.greaterThan(0));
        Assert.assertThat(Integer.valueOf(this.lock.unlockCallCount()), Matchers.is(0));
    }

    @Test
    public void shouldResumeWhenWritableOnceAgain() throws Exception {
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel);
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(false).thenReturn(true);
        newThrottleAndInstall.acquire(this.channel);
        Assert.assertThat(Integer.valueOf(this.lock.lockCallCount()), Matchers.greaterThan(0));
        Assert.assertThat(Integer.valueOf(this.lock.unlockCallCount()), Matchers.is(0));
    }

    @Test
    public void shouldResumeWhenWritabilityChanged() throws Exception {
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel);
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(false);
        Future<?> submit = Executors.newSingleThreadExecutor().submit(() -> {
            newThrottleAndInstall.acquire(this.channel);
        });
        if (!this.lock.waitLocked(1L, TimeUnit.MINUTES)) {
            Assert.fail("lock should be acquired");
        }
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(true);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ChannelInboundHandler.class);
        ((ChannelPipeline) Mockito.verify(this.channel.pipeline())).addLast(new ChannelHandler[]{(ChannelHandler) forClass.capture()});
        ((ChannelInboundHandler) forClass.getValue()).channelWritabilityChanged(this.context);
        try {
            submit.get(1L, TimeUnit.MINUTES);
        } catch (Throwable th) {
            Assert.fail("should not throw");
        }
        Assert.assertThat(Integer.valueOf(this.lock.lockCallCount()), Matchers.greaterThan(0));
        Assert.assertThat(Integer.valueOf(this.lock.unlockCallCount()), Matchers.is(1));
    }

    private TransportThrottle newThrottle() {
        return new TransportWriteThrottle(64, 256, () -> {
            return this.lock;
        });
    }

    private TransportThrottle newThrottleAndInstall(Channel channel) {
        TransportThrottle newThrottle = newThrottle();
        newThrottle.install(channel);
        return newThrottle;
    }
}
