package com.typesafe.netty;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.TypeParameterMatcher;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.kafka.shade.org.tukaani.xz.common.Util;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/netty-reactive-streams-2.0.4.jar:com/typesafe/netty/HandlerPublisher.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.5.jar:META-INF/bundled-dependencies/netty-reactive-streams-2.0.4.jar:com/typesafe/netty/HandlerPublisher.class */
/**
 * A channel handler that publishes the inbound messages it accepts to a single Reactive Streams
 * {@link Subscriber}.
 *
 * <p>The handler is a small state machine (see {@link State}). Messages that arrive while there is
 * no subscriber or no outstanding demand are kept in an internal buffer; messages that arrive
 * while demand is outstanding are handed straight to the subscriber. Subscription signals
 * ({@code request}/{@code cancel}) and the initial {@code subscribe} call are re-dispatched onto
 * the {@link EventExecutor} given to the constructor, and the handler refuses to accept a channel
 * context unless it is being invoked from that executor.
 *
 * <p>Only one subscriber is supported; later subscribers immediately receive an
 * {@link IllegalStateException} through {@code onError}.
 *
 * @param <T> the type of message published to the subscriber
 */
public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publisher<T> {

    /** Sentinel queued behind buffered messages to signal completion once they have been drained. */
    private static final Object COMPLETE = new Object() {
        @Override
        public String toString() {
            return "COMPLETE";
        }
    };

    private final EventExecutor executor;
    private final TypeParameterMatcher matcher;

    /** Messages (and possibly the {@link #COMPLETE} sentinel) waiting for subscriber demand. */
    private final Queue<Object> buffer = new LinkedList<Object>();

    /** Flipped by the first {@link #subscribe} call; used to reject any further subscribers. */
    private final AtomicBoolean hasSubscriber = new AtomicBoolean();

    private volatile Subscriber<? super T> subscriber;
    private ChannelHandlerContext ctx;

    /** Error caught while in {@link State#NO_SUBSCRIBER}, kept for delivery to a later subscriber. */
    private Throwable noSubscriberError;

    private State state = State.NO_SUBSCRIBER_OR_CONTEXT;

    /** Demand not yet satisfied; {@link Long#MAX_VALUE} means demand is treated as unbounded. */
    private long outstandingDemand = 0;

    /**
     * Subscription handed to the subscriber. Both signals are forwarded to the publisher's
     * executor rather than being processed on the calling thread.
     */
    public class ChannelSubscription implements Subscription {
        private ChannelSubscription() {
        }

        @Override // org.reactivestreams.Subscription
        public void request(final long j) {
            HandlerPublisher.this.executor.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    HandlerPublisher.this.receivedDemand(j);
                }
            });
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            HandlerPublisher.this.executor.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    HandlerPublisher.this.receivedCancel();
                }
            });
        }
    }

    /** States of the publisher's state machine. */
    public enum State {
        /** Initial state: neither a subscriber nor a channel context has been provided. */
        NO_SUBSCRIBER_OR_CONTEXT,
        /** A subscriber has been provided but no channel context yet. */
        NO_CONTEXT,
        /** A channel context has been provided but no subscriber yet; accepted messages are buffered. */
        NO_SUBSCRIBER,
        /** An exception was caught while there was no subscriber; it is stored for a later subscriber. */
        NO_SUBSCRIBER_ERROR,
        /** Subscriber and context are present; nothing is buffered and no read has been requested. */
        IDLE,
        /** Messages are buffered and waiting for demand. */
        BUFFERING,
        /** Demand is outstanding; accepted messages are published as they arrive. */
        DEMANDING,
        /** The stream has completed but buffered messages (and the completion marker) remain. */
        DRAINING,
        /** Terminal state; accepted messages are released. */
        DONE
    }

    /**
     * Creates a publisher.
     *
     * @param eventExecutor executor on which subscription signals are processed; the handler must
     *                      be registered from this executor
     * @param cls           class used to decide which inbound messages this handler accepts
     */
    public HandlerPublisher(EventExecutor eventExecutor, Class<? extends T> cls) {
        this.executor = eventExecutor;
        this.matcher = TypeParameterMatcher.get(cls);
    }

    /**
     * Decides whether an inbound message should be published by this handler. Messages that are
     * not accepted are forwarded down the pipeline unchanged.
     *
     * @param obj the inbound message
     * @return {@code true} if the message matches the configured type
     * @throws Exception declared so that overriding implementations may throw
     */
    protected boolean acceptInboundMessage(Object obj) throws Exception {
        return this.matcher.match(obj);
    }

    /** Invoked when the subscriber cancels while the stream is still live; closes the channel. */
    protected void cancelled() {
        this.ctx.close();
    }

    /** Invoked when more messages are wanted from the channel; triggers a read. */
    protected void requestDemand() {
        this.ctx.read();
    }

    /**
     * Registers the subscriber. The first subscriber is handed to the state machine on the
     * executor; any further subscriber receives a no-op subscription followed by an error.
     *
     * @param subscriber the subscriber, must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(final Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("Null subscriber");
        }
        if (!this.hasSubscriber.compareAndSet(false, true)) {
            // A subscription is signalled first so that onError is preceded by onSubscribe.
            subscriber.onSubscribe(new Subscription() {
                @Override // org.reactivestreams.Subscription
                public void request(long j) {
                }

                @Override // org.reactivestreams.Subscription
                public void cancel() {
                }
            });
            subscriber.onError(new IllegalStateException("This publisher only supports one subscriber"));
            return;
        }
        this.executor.execute(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                HandlerPublisher.this.provideSubscriber(subscriber);
            }
        });
    }

    /**
     * Stores the subscriber and advances the state machine. Depending on the current state the
     * subscriber is either parked until a context arrives, given its subscription, or given a
     * subscription followed by the error that was caught before it subscribed.
     *
     * @param subscriber the subscriber accepted by {@link #subscribe}
     */
    public void provideSubscriber(Subscriber<? super T> subscriber) {
        this.subscriber = subscriber;
        switch (this.state) {
            case NO_SUBSCRIBER_OR_CONTEXT:
                // onSubscribe is deferred until the channel context is provided.
                this.state = State.NO_CONTEXT;
                return;
            case NO_SUBSCRIBER:
                this.state = this.buffer.isEmpty() ? State.IDLE : State.BUFFERING;
                subscriber.onSubscribe(new ChannelSubscription());
                return;
            case DRAINING:
                subscriber.onSubscribe(new ChannelSubscription());
                return;
            case NO_SUBSCRIBER_ERROR:
                cleanup();
                this.state = State.DONE;
                subscriber.onSubscribe(new ChannelSubscription());
                subscriber.onError(this.noSubscriberError);
                return;
            default:
                return;
        }
    }

    /** Captures the context immediately when the handler is added to an already registered channel. */
    @Override // io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (channelHandlerContext.channel().isRegistered()) {
            provideChannelContext(channelHandlerContext);
        }
    }

    /** Captures the context on registration and forwards the event. */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        provideChannelContext(channelHandlerContext);
        channelHandlerContext.fireChannelRegistered();
    }

    /**
     * Stores the channel context if one is still awaited. If a subscriber is already waiting it
     * receives its subscription now. In any other state the call is ignored.
     */
    private void provideChannelContext(ChannelHandlerContext channelHandlerContext) {
        switch (this.state) {
            case NO_SUBSCRIBER_OR_CONTEXT:
                verifyRegisteredWithRightExecutor(channelHandlerContext);
                this.ctx = channelHandlerContext;
                this.state = State.NO_SUBSCRIBER;
                return;
            case NO_CONTEXT:
                verifyRegisteredWithRightExecutor(channelHandlerContext);
                this.ctx = channelHandlerContext;
                this.state = State.IDLE;
                this.subscriber.onSubscribe(new ChannelSubscription());
                return;
            default:
                return;
        }
    }

    /**
     * Fails unless the current thread belongs to this publisher's executor.
     *
     * @throws IllegalArgumentException if called from outside the executor's event loop
     */
    private void verifyRegisteredWithRightExecutor(ChannelHandlerContext channelHandlerContext) {
        if (!this.executor.inEventLoop()) {
            throw new IllegalArgumentException("Channel handler MUST be registered with the same EventExecutor that it is created with.");
        }
    }

    /** Issues the pending read if demand was registered before the channel became active. */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.state == State.DEMANDING) {
            requestDemand();
        }
        channelHandlerContext.fireChannelActive();
    }

    /**
     * Handles a {@code request(n)} signal from the subscriber.
     *
     * @param j the number of additional elements requested; non-positive values terminate the
     *          stream with an error
     */
    public void receivedDemand(long j) {
        switch (this.state) {
            case BUFFERING:
            case DRAINING:
                if (addDemand(j)) {
                    flushBuffer();
                }
                return;
            case DEMANDING:
                addDemand(j);
                return;
            case IDLE:
                if (addDemand(j)) {
                    // Nothing is buffered, so ask the channel for more.
                    this.state = State.DEMANDING;
                    requestDemand();
                }
                return;
            default:
                return;
        }
    }

    /**
     * Adds to the outstanding demand, saturating at {@link Long#MAX_VALUE}.
     *
     * @param j the requested amount
     * @return {@code false} if the request was illegal and the stream has been terminated,
     *         {@code true} otherwise
     */
    private boolean addDemand(long j) {
        if (j <= 0) {
            illegalDemand();
            return false;
        }
        if (this.outstandingDemand < Long.MAX_VALUE) {
            this.outstandingDemand += j;
            if (this.outstandingDemand < 0) {
                // The addition overflowed; clamp to the unbounded marker.
                this.outstandingDemand = Long.MAX_VALUE;
            }
        }
        return true;
    }

    /** Terminates the stream after a non-positive {@code request}: releases the buffer, signals the error and closes the channel. */
    private void illegalDemand() {
        cleanup();
        this.subscriber.onError(new IllegalArgumentException("Request for 0 or negative elements in violation of Section 3.9 of the Reactive Streams specification"));
        this.ctx.close();
        this.state = State.DONE;
    }

    /**
     * Publishes buffered messages while demand remains, then either requests more from the
     * channel (demand left over) or goes idle (demand exhausted).
     */
    private void flushBuffer() {
        while (!this.buffer.isEmpty() && this.outstandingDemand > 0) {
            publishMessage(this.buffer.remove());
        }
        if (!this.buffer.isEmpty()) {
            return;
        }
        if (this.outstandingDemand > 0) {
            if (this.state == State.BUFFERING) {
                this.state = State.DEMANDING;
            }
            requestDemand();
        } else if (this.state == State.BUFFERING) {
            this.state = State.IDLE;
        }
    }

    /**
     * Handles a {@code cancel} signal from the subscriber: moves a live stream to
     * {@link State#DONE} (notifying {@link #cancelled()} unless already draining), releases any
     * buffered messages and drops the subscriber reference.
     */
    public void receivedCancel() {
        switch (this.state) {
            case BUFFERING:
            case DEMANDING:
            case IDLE:
                cancelled();
                // fall through
            case DRAINING:
                this.state = State.DONE;
                break;
            default:
                break;
        }
        cleanup();
        this.subscriber = null;
    }

    /**
     * Routes an inbound message: unaccepted messages are forwarded; accepted ones are buffered,
     * published or released depending on the current state.
     *
     * @throws IllegalStateException if an accepted message arrives before a context was provided
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!acceptInboundMessage(obj)) {
            channelHandlerContext.fireChannelRead(obj);
            return;
        }
        switch (this.state) {
            case NO_SUBSCRIBER_OR_CONTEXT:
            case NO_CONTEXT:
                throw new IllegalStateException("Message received before added to the channel context");
            case NO_SUBSCRIBER:
            case BUFFERING:
                this.buffer.add(obj);
                return;
            case IDLE:
                this.buffer.add(obj);
                this.state = State.BUFFERING;
                return;
            case DEMANDING:
                publishMessage(obj);
                return;
            case DRAINING:
            case DONE:
            case NO_SUBSCRIBER_ERROR:
                // The message will never reach a subscriber and is not forwarded, so this handler
                // is the last owner and must release it.
                ReferenceCountUtil.release(obj);
                return;
            default:
                return;
        }
    }

    /**
     * Delivers one buffered or freshly read item to the subscriber. The {@link #COMPLETE}
     * sentinel becomes {@code onComplete}; anything else becomes {@code onNext} and consumes one
     * unit of demand unless demand is unbounded.
     */
    private void publishMessage(Object obj) {
        if (COMPLETE.equals(obj)) {
            this.subscriber.onComplete();
            this.state = State.DONE;
            return;
        }
        // Only messages accepted by acceptInboundMessage get here; the default implementation
        // matches them against the class supplied to the constructor.
        @SuppressWarnings("unchecked")
        T next = (T) obj;
        this.subscriber.onNext(next);
        if (this.outstandingDemand < Long.MAX_VALUE) {
            this.outstandingDemand--;
            if (this.outstandingDemand != 0 || this.state == State.DRAINING) {
                return;
            }
            // Demand is exhausted: stop demanding until the subscriber requests more.
            this.state = this.buffer.isEmpty() ? State.IDLE : State.BUFFERING;
        }
    }

    /** Requests another read after a read batch if demand is still outstanding. */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.state == State.DEMANDING) {
            requestDemand();
        }
    }

    /** Completes the stream when the channel becomes inactive. */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        complete();
    }

    /** Completes the stream when the handler is removed from the pipeline. */
    @Override // io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
        complete();
    }

    /**
     * Signals completion. If messages may still be pending delivery (or no subscriber exists yet)
     * the completion marker is queued behind them; otherwise the subscriber is completed
     * immediately. Ignored in every other state.
     */
    private void complete() {
        switch (this.state) {
            case NO_SUBSCRIBER:
            case BUFFERING:
                this.buffer.add(COMPLETE);
                this.state = State.DRAINING;
                return;
            case DEMANDING:
            case IDLE:
                this.subscriber.onComplete();
                this.state = State.DONE;
                return;
            default:
                return;
        }
    }

    /**
     * Terminates the stream with an error. Without a subscriber the error is stored for later;
     * with one, buffered messages are released and the error is signalled. Ignored in every
     * other state.
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        switch (this.state) {
            case NO_SUBSCRIBER:
                this.noSubscriberError = th;
                this.state = State.NO_SUBSCRIBER_ERROR;
                cleanup();
                return;
            case BUFFERING:
            case DEMANDING:
            case IDLE:
            case DRAINING:
                this.state = State.DONE;
                cleanup();
                this.subscriber.onError(th);
                return;
            default:
                return;
        }
    }

    /** Releases and discards everything still held in the buffer. */
    private void cleanup() {
        while (!this.buffer.isEmpty()) {
            ReferenceCountUtil.release(this.buffer.remove());
        }
    }
}
