package io.gridgo.connector.impl;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.support.Message;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import org.joo.promise4j.Deferred;
import org.joo.promise4j.Promise;
import org.joo.promise4j.impl.AsyncDeferredObject;

/* loaded from: input_file:io/gridgo/connector/impl/SingleThreadSendingProducer.class */
public abstract class SingleThreadSendingProducer extends AbstractProducer {
    private final Disruptor<ProducerEvent> sendWorker;
    private final boolean batchingEnabled;
    private final int maxBatchSize;
    private final EventHandler<ProducerEvent> sender;

    protected SingleThreadSendingProducer(ConnectorContext connectorContext, int i, boolean z, int i2) {
        this(connectorContext, i, Thread::new, z, i2);
    }

    protected SingleThreadSendingProducer(ConnectorContext connectorContext, int i, ThreadFactory threadFactory, boolean z, int i2) {
        super(connectorContext);
        this.sender = new EventHandler<ProducerEvent>() { // from class: io.gridgo.connector.impl.SingleThreadSendingProducer.1
            private final List<Message> batch = new LinkedList();
            private final List<Deferred<Message, Exception>> deferreds = new LinkedList();

            public void onEvent(ProducerEvent producerEvent, long j, boolean z2) throws Exception {
                Message message = null;
                if (SingleThreadSendingProducer.this.isBatchingEnabled()) {
                    this.batch.add(producerEvent.getMessage());
                    this.deferreds.add(producerEvent.getDeferred());
                    if (z2 || this.batch.size() >= SingleThreadSendingProducer.this.maxBatchSize) {
                        message = SingleThreadSendingProducer.this.accumulateBatch(this.batch);
                        this.batch.clear();
                    }
                } else {
                    message = producerEvent.getMessage();
                }
                if (message != null) {
                    try {
                        SingleThreadSendingProducer.this.executeSendOnSingleThread(message);
                        if (!SingleThreadSendingProducer.this.isBatchingEnabled()) {
                            SingleThreadSendingProducer.this.ack(producerEvent.getDeferred(), (Exception) null);
                            return;
                        }
                        Iterator<Deferred<Message, Exception>> it = this.deferreds.iterator();
                        while (it.hasNext()) {
                            SingleThreadSendingProducer.this.ack(it.next());
                        }
                        this.deferreds.clear();
                    } catch (Exception e) {
                        if (!SingleThreadSendingProducer.this.isBatchingEnabled()) {
                            SingleThreadSendingProducer.this.ack(producerEvent.getDeferred(), e);
                            return;
                        }
                        Iterator<Deferred<Message, Exception>> it2 = this.deferreds.iterator();
                        while (it2.hasNext()) {
                            SingleThreadSendingProducer.this.ack(it2.next());
                        }
                        this.deferreds.clear();
                    } catch (Throwable th) {
                        if (SingleThreadSendingProducer.this.isBatchingEnabled()) {
                            Iterator<Deferred<Message, Exception>> it3 = this.deferreds.iterator();
                            while (it3.hasNext()) {
                                SingleThreadSendingProducer.this.ack(it3.next());
                            }
                            this.deferreds.clear();
                        } else {
                            SingleThreadSendingProducer.this.ack(producerEvent.getDeferred(), (Exception) null);
                        }
                        throw th;
                    }
                }
            }
        };
        this.sendWorker = new Disruptor<>(ProducerEvent::new, i, threadFactory);
        this.sendWorker.handleEventsWith(new EventHandler[]{this.sender});
        this.batchingEnabled = z;
        this.maxBatchSize = i2;
    }

    protected Message accumulateBatch(Collection<Message> collection) {
        throw new UnsupportedOperationException("Method must be overrided by sub class");
    }

    protected Deferred<Message, Exception> createDeferred() {
        return new AsyncDeferredObject();
    }

    protected abstract void executeSendOnSingleThread(Message message) throws Exception;

    protected void onStart() {
        this.sendWorker.start();
    }

    protected void onStop() {
        this.sendWorker.shutdown();
    }

    private void produceEvent(Message message, Deferred<Message, Exception> deferred) {
        if (isStarted()) {
            this.sendWorker.publishEvent((producerEvent, j) -> {
                producerEvent.clear();
                producerEvent.setDeferred(deferred);
                producerEvent.setMessage(message);
            });
        }
    }

    @Override // io.gridgo.connector.support.MessageProducer
    public final void send(Message message) {
        produceEvent(message, null);
    }

    @Override // io.gridgo.connector.support.MessageProducer
    public final Promise<Message, Exception> sendWithAck(Message message) {
        Deferred<Message, Exception> createDeferred = createDeferred();
        produceEvent(message, createDeferred);
        return createDeferred.promise();
    }

    public boolean isBatchingEnabled() {
        return this.batchingEnabled;
    }
}
