package org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Queue;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.AsyncResult;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Context;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Future;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Handler;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Promise;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.Vertx;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.Message;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.Arguments;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.ContextInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.future.PromiseInternal;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.logging.Logger;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.logging.LoggerFactory;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream;
import org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.StreamBase;

/* loaded from: input_file:META-INF/bundled-dependencies/jetcd-core-shaded-3.3.2.2-shaded.jar:org/apache/pulsar/jetcd/shaded/io/vertx/core/eventbus/impl/MessageConsumerImpl.class */
public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements MessageConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MessageConsumerImpl.class);
    private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
    private final Vertx vertx;
    private final ContextInternal context;
    private final EventBusImpl eventBus;
    private final String address;
    private final boolean localOnly;
    private Handler<Message<T>> handler;
    private Handler<AsyncResult<Void>> completionHandler;
    private Handler<Void> endHandler;
    private Handler<Message<T>> discardHandler;
    private int maxBufferedMessages;
    private Queue<Message<T>> pending;
    private long demand;
    private Promise<Void> result;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageConsumerImpl(Vertx vertx, ContextInternal contextInternal, EventBusImpl eventBusImpl, String str, boolean z) {
        super(contextInternal, eventBusImpl, str, false);
        this.maxBufferedMessages = 1000;
        this.pending = new ArrayDeque(8);
        this.demand = Long.MAX_VALUE;
        this.vertx = vertx;
        this.context = contextInternal;
        this.eventBus = eventBusImpl;
        this.address = str;
        this.localOnly = z;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer
    public MessageConsumer<T> setMaxBufferedMessages(int i) {
        Arguments.require(i >= 0, "Max buffered messages cannot be negative");
        synchronized (this) {
            this.maxBufferedMessages = i;
            int size = this.pending.size() - i;
            if (size <= 0) {
                return this;
            }
            if (this.pending.isEmpty()) {
                return this;
            }
            Handler<Message<T>> handler = this.discardHandler;
            ArrayList<Message<T>> arrayList = new ArrayList(size);
            while (this.pending.size() > i) {
                arrayList.add(this.pending.poll());
            }
            for (Message<T> message : arrayList) {
                if (handler != null) {
                    handler.handle(message);
                }
                discard(message);
            }
            return this;
        }
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer
    public synchronized int getMaxBufferedMessages() {
        return this.maxBufferedMessages;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer
    public String address() {
        return this.address;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer
    public synchronized void completionHandler(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        if (this.result != null) {
            this.result.future().onComplete2(handler);
        } else {
            this.completionHandler = handler;
        }
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.HandlerRegistration, org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer
    public synchronized Future<Void> unregister() {
        this.handler = null;
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
        if (this.pending.size() > 0) {
            Queue<Message<T>> queue = this.pending;
            Handler<Message<T>> handler = this.discardHandler;
            this.pending = new ArrayDeque();
            for (Message<T> message : queue) {
                discard(message);
                if (handler != null) {
                    this.context.emit(message, handler);
                }
            }
        }
        this.discardHandler = null;
        Future<Void> unregister = super.unregister();
        Promise<Void> promise = this.result;
        if (promise != null) {
            unregister.onComplete2(asyncResult -> {
                promise.tryFail("Consumer unregistered before registration completed");
            });
            this.result = null;
        }
        return unregister;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.HandlerRegistration
    protected boolean doReceive(Message<T> message) {
        synchronized (this) {
            if (this.handler == null) {
                return false;
            }
            if (this.demand == 0) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(message);
                    return true;
                }
                discard(message);
                if (this.discardHandler != null) {
                    this.discardHandler.handle(message);
                } else {
                    log.warn("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer. address: " + this.address);
                }
                return true;
            }
            if (this.pending.size() > 0) {
                this.pending.add(message);
                message = this.pending.poll();
            }
            if (this.demand != Long.MAX_VALUE) {
                this.demand--;
            }
            deliver(this.handler, message);
            return true;
        }
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.impl.HandlerRegistration
    protected void dispatch(Message<T> message, ContextInternal contextInternal, Handler<Message<T>> handler) {
        if (handler == null) {
            throw new NullPointerException();
        }
        contextInternal.dispatch(message, handler);
    }

    private void deliver(Handler<Message<T>> handler, Message<T> message) {
        dispatch(handler, message, this.context.duplicate());
        checkNextTick();
    }

    private synchronized void checkNextTick() {
        if (this.pending.isEmpty() || this.demand <= 0) {
            return;
        }
        this.context.nettyEventLoop().execute(() -> {
            Message<T> poll;
            synchronized (this) {
                if (this.demand == 0 || (poll = this.pending.poll()) == null) {
                    return;
                }
                if (this.demand != Long.MAX_VALUE) {
                    this.demand--;
                }
                deliver(this.handler, poll);
            }
        });
    }

    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream
    /* renamed from: handler */
    public synchronized MessageConsumer<T> handler2(Handler<Message<T>> handler) {
        if (handler != null) {
            synchronized (this) {
                this.handler = handler;
                if (this.result == null) {
                    PromiseInternal<T> promise = this.context.promise();
                    if (this.completionHandler != null) {
                        promise.future().onComplete2(this.completionHandler);
                    }
                    this.result = promise;
                    PromiseInternal<T> promise2 = this.context.promise();
                    register(null, this.localOnly, promise2);
                    promise2.future().onComplete2(asyncResult -> {
                        if (asyncResult.succeeded()) {
                            promise.tryComplete();
                        } else {
                            promise.tryFail(asyncResult.cause());
                        }
                    });
                }
            }
        } else {
            unregister();
        }
        return this;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer
    public ReadStream<T> bodyStream() {
        return new BodyReadStream(this);
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream
    /* renamed from: pause */
    public synchronized MessageConsumer<T> pause2() {
        this.demand = 0L;
        return this;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream
    /* renamed from: resume */
    public MessageConsumer<T> resume2() {
        return fetch2(Long.MAX_VALUE);
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream
    /* renamed from: fetch */
    public synchronized MessageConsumer<T> fetch2(long j) {
        if (j < 0) {
            throw new IllegalArgumentException();
        }
        this.demand += j;
        if (this.demand < 0) {
            this.demand = Long.MAX_VALUE;
        }
        if (this.demand > 0) {
            checkNextTick();
        }
        return this;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream
    public synchronized MessageConsumer<T> endHandler(Handler<Void> handler) {
        if (handler != null) {
            Context orCreateContext = this.vertx.getOrCreateContext();
            this.endHandler = r5 -> {
                orCreateContext.runOnContext(r4 -> {
                    handler.handle(null);
                });
            };
        } else {
            this.endHandler = null;
        }
        return this;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.StreamBase
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    public synchronized Handler<Message<T>> getHandler() {
        return this.handler;
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream
    public /* bridge */ /* synthetic */ ReadStream endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ ReadStream exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // org.apache.pulsar.jetcd.shaded.io.vertx.core.eventbus.MessageConsumer, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.ReadStream, org.apache.pulsar.jetcd.shaded.io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ StreamBase exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
