package io.github.hylexus.jt.jt1078.support.codec.impl.collector;

import io.github.hylexus.jt.jt1078.spec.Jt1078Request;
import io.github.hylexus.jt.jt1078.spec.Jt1078Subscription;
import io.github.hylexus.jt.jt1078.support.codec.impl.collector.InternalSubscriber;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/hylexus/jt/jt1078/support/codec/impl/collector/AbstractAsyncChannelCollector.class */
/**
 * Base class for channel collectors that decouple the caller of {@link #collect(Jt1078Request)}
 * from the actual collecting work.
 *
 * <p>{@code collect} puts a copy of each request on a bounded in-memory queue (capacity 1024); a single
 * consumer thread, created from the supplied {@link ThreadFactory}, takes requests off that queue,
 * passes each one to {@code doCollect} and then releases it.
 *
 * <p>Lifecycle: the consumer thread is started from the constructor and runs until {@link #close()}
 * is called. {@code close()} interrupts the consumer so that it does not stay parked on an empty
 * queue, and releases every request that is still queued.
 *
 * <p>NOTE(review): the constructor starts the consumer thread (and calls the overridable
 * {@link #startConsumer()}) before subclass constructors have finished. A request collected that early
 * could reach {@code doCollect} on a partially initialised subclass — confirm subclasses tolerate this.
 *
 * @param <PT>  subscription type
 * @param <IST> internal subscriber type
 */
public abstract class AbstractAsyncChannelCollector<PT extends Jt1078Subscription, IST extends InternalSubscriber<PT>> extends AbstractChannelCollector<PT, IST> {
    private final ThreadFactory threadFactory;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private volatile boolean running = true;
    private final BlockingQueue<Jt1078Request> queue = new LinkedBlockingQueue<>(1024);
    /** The thread started by {@link #startConsumer()}; stays {@code null} if a subclass replaces that method. */
    private volatile Thread consumerThread;

    /**
     * Creates the collector and immediately starts its consumer thread.
     *
     * @param threadFactory factory used to create the consumer thread
     */
    public AbstractAsyncChannelCollector(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        startConsumer();
    }

    /**
     * Enqueues a copy of the given request for asynchronous processing by the consumer thread.
     *
     * <p>Requests submitted after {@link #close()} are dropped without being copied.
     *
     * @param jt1078Request the incoming request; only its copy is queued
     * @throws IllegalStateException if the queue is full (the copy is released before the exception propagates)
     */
    @Override // io.github.hylexus.jt.jt1078.support.codec.impl.collector.AbstractChannelCollector, io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector
    public void collect(Jt1078Request jt1078Request) {
        if (!this.running) {
            // Nobody will consume the queue any more; queuing a copy would only leak it.
            return;
        }
        final Jt1078Request copied = jt1078Request.copy();
        boolean enqueued = false;
        try {
            // add() throws IllegalStateException when the bounded queue is at capacity.
            this.queue.add(copied);
            enqueued = true;
        } finally {
            if (!enqueued) {
                // The copy never reached the consumer, so it must be released here.
                copied.release();
            }
        }
        if (!this.running) {
            // close() raced with the add above; make sure the request just queued is not stranded.
            releasePending();
        }
    }

    /**
     * Stops the consumer thread and releases every request that is still waiting in the queue.
     */
    @Override // io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector
    public void close() {
        this.running = false;
        final Thread worker = this.consumerThread;
        if (worker != null) {
            // Wake the consumer if it is blocked in queue.take(); otherwise it would never observe running == false.
            worker.interrupt();
        }
        releasePending();
    }

    /**
     * Creates and starts the consumer thread. Invoked once from the constructor.
     */
    protected void startConsumer() {
        final Thread worker = this.threadFactory.newThread(this::consumeLoop);
        this.consumerThread = worker;
        worker.start();
    }

    /**
     * Consumer loop: takes requests from the queue, hands them to {@code doCollect} and releases them,
     * until the collector is closed.
     */
    private void consumeLoop() {
        try {
            while (this.running) {
                Jt1078Request request = null;
                try {
                    request = this.queue.take();
                    doCollect(request);
                } catch (InterruptedException e) {
                    if (!this.running) {
                        // Interrupted by close(): restore the flag and stop consuming.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Interrupted while still running: log and keep consuming.
                    this.log.error(e.getMessage(), e);
                } catch (Throwable th) {
                    // A failing request must not kill the consumer thread.
                    this.log.error(th.getMessage(), th);
                } finally {
                    if (request != null) {
                        request.release();
                    }
                }
            }
        } finally {
            // Whatever is left in the queue will never be processed; release it.
            releasePending();
        }
    }

    /**
     * Removes all requests currently in the queue and releases each of them.
     */
    private void releasePending() {
        Jt1078Request pending;
        while ((pending = this.queue.poll()) != null) {
            try {
                pending.release();
            } catch (Throwable th) {
                // Keep draining even if one release fails.
                this.log.error(th.getMessage(), th);
            }
        }
    }
}
