package org.neo4j.bolt.transport;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.Attribute;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.neo4j.test.rule.OtherThreadRule;
import org.neo4j.time.Clocks;
import org.neo4j.time.FakeClock;

/* loaded from: input_file:org/neo4j/bolt/transport/TransportWriteThrottleTest.class */
public class TransportWriteThrottleTest {

    @Rule
    public OtherThreadRule<Void> otherThread = new OtherThreadRule<>();
    private ChannelHandlerContext context;
    private Channel channel;
    private SocketChannelConfig config;
    private ThrottleLock lock;
    private Attribute lockAttribute;

    /* loaded from: input_file:org/neo4j/bolt/transport/TransportWriteThrottleTest$TestThrottleLock.class */
    private static class TestThrottleLock implements ThrottleLock {
        private AtomicInteger lockCount = new AtomicInteger(0);
        private AtomicInteger unlockCount = new AtomicInteger(0);
        private ThrottleLock actualLock = new DefaultThrottleLock();

        private TestThrottleLock() {
        }

        public void lock(Channel channel, long j) throws InterruptedException {
            try {
                this.actualLock.lock(channel, j);
                this.lockCount.incrementAndGet();
            } catch (Throwable th) {
                this.lockCount.incrementAndGet();
                throw th;
            }
        }

        public void unlock(Channel channel) {
            try {
                this.actualLock.unlock(channel);
            } finally {
                this.unlockCount.incrementAndGet();
            }
        }

        public int lockCallCount() {
            return this.lockCount.get();
        }

        public int unlockCallCount() {
            return this.unlockCount.get();
        }
    }

    @Before
    public void setup() throws Exception {
        this.lock = newThrottleLockMock();
        this.config = (SocketChannelConfig) Mockito.mock(SocketChannelConfig.class);
        this.lockAttribute = (Attribute) Mockito.mock(Attribute.class);
        Mockito.when(this.lockAttribute.get()).thenReturn(this.lock);
        Attribute attribute = (Attribute) Mockito.mock(Attribute.class);
        Mockito.when(attribute.get()).thenReturn((Object) null);
        this.channel = (Channel) Mockito.mock(SocketChannel.class, Answers.RETURNS_MOCKS);
        Mockito.when(this.channel.config()).thenReturn(this.config);
        Mockito.when(Boolean.valueOf(this.channel.isOpen())).thenReturn(true);
        Mockito.when(this.channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("localhost", 32000));
        Mockito.when(this.channel.attr(TransportWriteThrottle.LOCK_KEY)).thenReturn(this.lockAttribute);
        Mockito.when(this.channel.attr(TransportWriteThrottle.MAX_DURATION_EXCEEDED_KEY)).thenReturn(attribute);
        Mockito.when(this.channel.pipeline()).thenReturn(this.channel.pipeline());
        this.context = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class, Answers.RETURNS_MOCKS);
        Mockito.when(this.context.channel()).thenReturn(this.channel);
    }

    @Test
    public void shouldSetWriteBufferWatermarkOnChannelConfigWhenInstalled() {
        newThrottle().install(this.channel);
        ((SocketChannelConfig) Mockito.verify(this.config)).setWriteBufferWaterMark((WriteBufferWaterMark) ArgumentCaptor.forClass(WriteBufferWaterMark.class).capture());
        Assert.assertEquals(64L, ((WriteBufferWaterMark) r0.getValue()).low());
        Assert.assertEquals(256L, ((WriteBufferWaterMark) r0.getValue()).high());
    }

    @Test
    public void shouldNotLockWhenWritable() throws Exception {
        TestThrottleLock testThrottleLock = new TestThrottleLock();
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel, testThrottleLock);
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(true);
        Future execute = this.otherThread.execute(r5 -> {
            newThrottleAndInstall.acquire(this.channel);
            return null;
        });
        try {
            execute.get(2000L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            Assert.fail("should not throw");
        }
        Assert.assertTrue(execute.isDone());
        Assertions.assertThat(testThrottleLock.lockCallCount()).isEqualTo(0);
        Assertions.assertThat(testThrottleLock.unlockCallCount()).isEqualTo(0);
    }

    @Test
    public void shouldLockWhenNotWritable() throws Exception {
        TestThrottleLock testThrottleLock = new TestThrottleLock();
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel, testThrottleLock);
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(false);
        Future execute = this.otherThread.execute(r5 -> {
            newThrottleAndInstall.acquire(this.channel);
            return null;
        });
        try {
            execute.get(2000L, TimeUnit.MILLISECONDS);
            Assert.fail("should timeout");
        } catch (TimeoutException e) {
        }
        Assert.assertFalse(execute.isDone());
        Assertions.assertThat(testThrottleLock.lockCallCount()).isGreaterThan(0);
        Assertions.assertThat(testThrottleLock.unlockCallCount()).isEqualTo(0);
        execute.cancel(true);
        try {
            this.otherThread.get().awaitFuture(execute);
            Assert.fail("Exception expected");
        } catch (CancellationException e2) {
        }
    }

    @Test
    public void shouldResumeWhenWritableOnceAgain() throws Exception {
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel);
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(false).thenReturn(true);
        newThrottleAndInstall.acquire(this.channel);
        ((ThrottleLock) Mockito.verify(this.lock, Mockito.atLeast(1))).lock((Channel) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((ThrottleLock) Mockito.verify(this.lock, Mockito.never())).unlock((Channel) ArgumentMatchers.any());
    }

    @Test
    public void shouldResumeWhenWritabilityChanged() throws Exception {
        TestThrottleLock testThrottleLock = new TestThrottleLock();
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel, testThrottleLock);
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(false);
        Future execute = this.otherThread.execute(r5 -> {
            newThrottleAndInstall.acquire(this.channel);
            return null;
        });
        this.otherThread.get().waitUntilWaiting();
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(true);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ChannelInboundHandler.class);
        ((ChannelPipeline) Mockito.verify(this.channel.pipeline())).addLast(new ChannelHandler[]{(ChannelHandler) forClass.capture()});
        ((ChannelInboundHandler) forClass.getValue()).channelWritabilityChanged(this.context);
        this.otherThread.get().awaitFuture(execute);
        Assertions.assertThat(testThrottleLock.lockCallCount()).isGreaterThan(0);
        Assertions.assertThat(testThrottleLock.unlockCallCount()).isEqualTo(1);
    }

    @Test
    public void shouldThrowThrottleExceptionWhenMaxDurationIsReached() throws Exception {
        TestThrottleLock testThrottleLock = new TestThrottleLock();
        FakeClock fakeClock = Clocks.fakeClock(1L, TimeUnit.SECONDS);
        TransportThrottle newThrottleAndInstall = newThrottleAndInstall(this.channel, testThrottleLock, fakeClock, Duration.ofSeconds(5L));
        Mockito.when(Boolean.valueOf(this.channel.isWritable())).thenReturn(false);
        Future execute = this.otherThread.execute(r5 -> {
            newThrottleAndInstall.acquire(this.channel);
            return null;
        });
        this.otherThread.get().waitUntilWaiting();
        fakeClock.forward(6L, TimeUnit.SECONDS);
        try {
            execute.get(1L, TimeUnit.MINUTES);
            Assert.fail("expecting ExecutionException");
        } catch (ExecutionException e) {
            Assertions.assertThat(e.getCause()).isInstanceOf(TransportThrottleException.class);
            Assertions.assertThat(e.getMessage()).contains(new CharSequence[]{"will be closed because the client did not consume outgoing buffers for"});
        }
    }

    private TransportThrottle newThrottle() {
        return newThrottle(null, Clocks.systemClock(), Duration.ZERO);
    }

    private TransportThrottle newThrottle(ThrottleLock throttleLock, Clock clock, Duration duration) {
        if (throttleLock != null) {
            this.lock = throttleLock;
            Mockito.when(this.lockAttribute.get()).thenReturn(throttleLock);
        }
        return new TransportWriteThrottle(64, 256, clock, duration, () -> {
            return this.lock;
        });
    }

    private TransportThrottle newThrottleAndInstall(Channel channel) {
        return newThrottleAndInstall(channel, null);
    }

    private TransportThrottle newThrottleAndInstall(Channel channel, ThrottleLock throttleLock) {
        return newThrottleAndInstall(channel, throttleLock, Clocks.systemClock(), Duration.ZERO);
    }

    private TransportThrottle newThrottleAndInstall(Channel channel, ThrottleLock throttleLock, Clock clock, Duration duration) {
        TransportThrottle newThrottle = newThrottle(throttleLock, clock, duration);
        newThrottle.install(channel);
        return newThrottle;
    }

    private static ThrottleLock newThrottleLockMock() throws InterruptedException {
        ThrottleLock throttleLock = (ThrottleLock) Mockito.mock(ThrottleLock.class);
        ((ThrottleLock) Mockito.doAnswer(invocationOnMock -> {
            Thread.sleep(500L);
            return null;
        }).when(throttleLock)).lock((Channel) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        return throttleLock;
    }
}
