package com.couchbase.client.dcp.buffer;

import com.couchbase.client.core.event.EventBus;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.util.MathUtils;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:com/couchbase/client/dcp/buffer/StreamEventBuffer.class */
/**
 * Holds back DCP stream events on a per-partition basis until
 * {@link #onSeqnoPersisted(short, long)} reports a sequence number that covers them.
 *
 * <p>Mutations, deletions, expirations and snapshot markers are queued in arrival order in
 * the queue of the partition (vbucket) they belong to. Rollback messages prune the affected
 * queue and are passed straight through to the control handler, as is every other message
 * this class does not recognise.
 *
 * <p>Each partition queue is guarded by its own monitor; all reads and writes of a queue in
 * this class happen inside {@code synchronized (queue)}.
 */
public class StreamEventBuffer implements DataEventHandler, ControlEventHandler {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(StreamEventBuffer.class);

    /** Number of per-partition queues allocated up front; partition ids index into that list. */
    private static final int MAX_PARTITIONS = 1024;

    private final EventBus eventBus;
    private volatile DataEventHandler dataEventHandler;
    private volatile ControlEventHandler controlEventHandler;

    /** One queue per partition, indexed by partition id. The list itself is unmodifiable. */
    private final List<Deque<BufferedEvent>> partitionQueues;

    /**
     * A queued event together with everything needed to dispatch it later.
     */
    public static class BufferedEvent {
        /** Sequence number used to decide when the event may be released (compared as unsigned). */
        private final long seqno;
        /** The raw message; {@code null} for {@link Type#STREAM_END_OK} entries. */
        private final ByteBuf event;
        /** Flow controller handed to the handler with the message; {@code null} for stream-end entries. */
        private final ChannelFlowController flowController;
        /** Selects which downstream target receives the event when it is released. */
        private final Type type;

        /**
         * Dispatch target of a buffered event.
         */
        public enum Type {
            /** Forwarded to the data event handler. */
            DATA,
            /** Forwarded to the control event handler. */
            CONTROL,
            /** Published on the event bus as a stream end with reason {@code OK}. */
            STREAM_END_OK
        }

        BufferedEvent(long seqno, ByteBuf event, ChannelFlowController flowController, Type type) {
            this.seqno = seqno;
            this.event = event;
            this.flowController = flowController;
            this.type = type;
        }

        /**
         * Creates a placeholder for a deferred stream end.
         *
         * @param seqno sequence number at which the stream end becomes eligible for release
         * @return an entry of type {@link Type#STREAM_END_OK} carrying no message or flow controller
         */
        static BufferedEvent streamEnd(long seqno) {
            return new BufferedEvent(seqno, null, null, Type.STREAM_END_OK);
        }
    }

    /**
     * Creates a buffer with an empty queue for each of the {@code MAX_PARTITIONS} partitions.
     *
     * @param eventBus bus on which stream end events are published
     */
    public StreamEventBuffer(EventBus eventBus) {
        this.eventBus = eventBus;
        List<Deque<BufferedEvent>> queues = new ArrayList<Deque<BufferedEvent>>(MAX_PARTITIONS);
        for (int i = 0; i < MAX_PARTITIONS; i++) {
            queues.add(new ArrayDeque<BufferedEvent>());
        }
        this.partitionQueues = Collections.unmodifiableList(queues);
    }

    /**
     * Sets the handler that receives released {@link BufferedEvent.Type#DATA} events.
     *
     * @param dataEventHandler the downstream data handler
     */
    public void setDataEventHandler(DataEventHandler dataEventHandler) {
        this.dataEventHandler = dataEventHandler;
    }

    /**
     * Sets the handler that receives released {@link BufferedEvent.Type#CONTROL} events as well
     * as rollback and unrecognised messages, which bypass the buffer.
     *
     * @param controlEventHandler the downstream control handler
     */
    public void setControlEventHandler(ControlEventHandler controlEventHandler) {
        this.controlEventHandler = controlEventHandler;
    }

    /**
     * Classifies an incoming message and either queues it or passes it on immediately.
     *
     * <ul>
     *   <li>Mutation, deletion, expiration: queued as {@code DATA} under the message's seqno.</li>
     *   <li>Snapshot marker: queued as {@code CONTROL} under the snapshot's start seqno.</li>
     *   <li>Rollback: queued events beyond the rollback seqno are dropped, then the message is
     *       handed to the control handler.</li>
     *   <li>Anything else: handed to the control handler unchanged.</li>
     * </ul>
     *
     * @param flowController flow controller associated with the message
     * @param event          the raw message
     */
    @Override // com.couchbase.client.dcp.DataEventHandler
    public void onEvent(ChannelFlowController flowController, ByteBuf event) {
        if (DcpMutationMessage.is(event) || DcpDeletionMessage.is(event) || DcpExpirationMessage.is(event)) {
            // NOTE(review): the mutation accessor is used for all three message kinds; presumably
            // they share the seqno layout -- confirm against the message classes.
            long seqno = DcpMutationMessage.bySeqno(event);
            enqueue(MessageUtil.getVbucket(event),
                    new BufferedEvent(seqno, event, flowController, BufferedEvent.Type.DATA));
        } else if (DcpSnapshotMarkerRequest.is(event)) {
            long startSeqno = DcpSnapshotMarkerRequest.startSeqno(event);
            enqueue(MessageUtil.getVbucket(event),
                    new BufferedEvent(startSeqno, event, flowController, BufferedEvent.Type.CONTROL));
        } else if (RollbackMessage.is(event)) {
            // Prune the queue first so nothing past the rollback point can be released afterwards.
            rollback(RollbackMessage.vbucket(event), RollbackMessage.seqno(event));
            this.controlEventHandler.onEvent(flowController, event);
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Propagating unhandled control event: {}", MessageUtil.humanize(event));
            }
            this.controlEventHandler.onEvent(flowController, event);
        }
    }

    /**
     * Handles the end of a partition's stream.
     *
     * <p>If the reason is anything other than {@link StreamEndReason#OK}, the partition's queue
     * is cleared and the event is published at once. If the reason is {@code OK} and nothing is
     * queued, the event is published at once. Otherwise a stream-end placeholder is appended
     * with the seqno of the last queued event, so it is released only after everything ahead
     * of it in the queue.
     *
     * @param streamEndEvent the stream end notification
     */
    public void onStreamEnd(StreamEndEvent streamEndEvent) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(streamEndEvent.partition());
        synchronized (queue) {
            if (streamEndEvent.reason() != StreamEndReason.OK) {
                queue.clear();
                this.eventBus.publish(streamEndEvent);
            } else if (queue.isEmpty()) {
                this.eventBus.publish(streamEndEvent);
            } else {
                queue.add(BufferedEvent.streamEnd(queue.peekLast().seqno));
            }
        }
    }

    /** Appends {@code bufferedEvent} to the tail of the given partition's queue. */
    private void enqueue(short vbucket, BufferedEvent bufferedEvent) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        synchronized (queue) {
            queue.add(bufferedEvent);
        }
    }

    /**
     * Removes every queued event of the partition whose seqno is greater (unsigned comparison)
     * than {@code toSeqno}.
     */
    private void rollback(short vbucket, long toSeqno) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        synchronized (queue) {
            Iterator<BufferedEvent> it = queue.iterator();
            while (it.hasNext()) {
                if (MathUtils.compareUnsignedLong(it.next().seqno, toSeqno) > 0) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Reports whether anything is currently queued for the partition.
     *
     * @param vbucket partition id
     * @return {@code true} if the partition's queue is non-empty
     */
    public boolean hasBufferedEvents(short vbucket) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        synchronized (queue) {
            return !queue.isEmpty();
        }
    }

    /**
     * Releases, in queue order, every event at the head of the partition's queue whose seqno is
     * less than or equal to {@code seqno} (unsigned comparison).
     *
     * <p>Each released event is dispatched according to its type. Anything thrown while
     * dispatching a single event is logged and does not stop the remaining eligible events
     * from being released. Dispatch happens while the queue's monitor is held.
     *
     * @param vbucket partition id
     * @param seqno   highest sequence number that may now be released
     */
    public void onSeqnoPersisted(short vbucket, long seqno) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        synchronized (queue) {
            while (!queue.isEmpty() && MathUtils.compareUnsignedLong(queue.peek().seqno, seqno) <= 0) {
                BufferedEvent released = queue.poll();
                try {
                    switch (released.type) {
                        case DATA:
                            this.dataEventHandler.onEvent(released.flowController, released.event);
                            break;
                        case CONTROL:
                            this.controlEventHandler.onEvent(released.flowController, released.event);
                            break;
                        case STREAM_END_OK:
                            this.eventBus.publish(new StreamEndEvent(vbucket, StreamEndReason.OK));
                            break;
                        default:
                            throw new RuntimeException("Unexpected event type: " + released.type);
                    }
                } catch (Throwable t) {
                    // One failing handler call must not block the rest of the queue.
                    LOGGER.error("Event handler threw exception", t);
                }
            }
        }
    }
}
