package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.message.DcpBufferAckRequest;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSetVbucketStateMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.Channel;

/* loaded from: input_file:com/couchbase/client/dcp/transport/netty/ChannelFlowControllerImpl.class */
public class ChannelFlowControllerImpl implements ChannelFlowController {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) ChannelFlowControllerImpl.class);
    private final Channel channel;
    private final boolean needsBufferAck;
    private final int bufferAckWatermark;
    private int bufferAckCounter;

    public ChannelFlowControllerImpl(Channel channel, ClientEnvironment clientEnvironment) {
        this.channel = channel;
        this.needsBufferAck = clientEnvironment.dcpControl().bufferAckEnabled();
        if (this.needsBufferAck) {
            this.bufferAckWatermark = (int) Math.round((Integer.parseInt(clientEnvironment.dcpControl().get(DcpControl.Names.CONNECTION_BUFFER_SIZE)) / 100.0d) * clientEnvironment.bufferAckWatermark());
            LOGGER.debug("BufferAckWatermark absolute is {}", Integer.valueOf(this.bufferAckWatermark));
        } else {
            this.bufferAckWatermark = 0;
        }
        this.bufferAckCounter = 0;
    }

    @Override // com.couchbase.client.dcp.transport.netty.ChannelFlowController
    public void ack(ByteBuf byteBuf) {
        if (this.needsBufferAck) {
            if (DcpSetVbucketStateMessage.is(byteBuf) || DcpSnapshotMarkerRequest.is(byteBuf) || DcpStreamEndMessage.is(byteBuf) || DcpMutationMessage.is(byteBuf) || DcpDeletionMessage.is(byteBuf) || DcpExpirationMessage.is(byteBuf)) {
                ack(byteBuf.readableBytes());
            }
        }
    }

    @Override // com.couchbase.client.dcp.transport.netty.ChannelFlowController
    public void ack(int i) {
        if (this.needsBufferAck) {
            synchronized (this) {
                this.bufferAckCounter += i;
                LOGGER.trace("BufferAckCounter is now {}", Integer.valueOf(this.bufferAckCounter));
                if (this.bufferAckCounter >= this.bufferAckWatermark) {
                    LOGGER.trace("BufferAckWatermark reached on {}, acking now against the server.", this.channel.remoteAddress());
                    ByteBuf buffer = this.channel.alloc().buffer();
                    DcpBufferAckRequest.init(buffer);
                    DcpBufferAckRequest.ackBytes(buffer, this.bufferAckCounter);
                    this.channel.writeAndFlush(buffer);
                    this.bufferAckCounter = 0;
                }
                LOGGER.trace("Acknowledging {} bytes against connection {}.", Integer.valueOf(i), this.channel.remoteAddress());
            }
        }
    }
}
