package com.github.cosycode.common.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.NonNull;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/cosycode/common/thread/AsynchronousProcessor.class */
/**
 * Queue-backed processor: items handed to {@link #add(Object)} are stored in a
 * {@link BlockingQueue} and handled one at a time by {@link #loop()}.
 *
 * <p>Each item is first passed to {@code thenFun}. When {@code thenFun} returns {@code false}
 * and a {@code catchFun} was supplied, the same item is passed to {@code catchFun} as a
 * fallback. Note that {@code catchFun} is driven by the predicate's return value only;
 * exceptions thrown by {@code thenFun} are not caught in this class.
 *
 * <p>NOTE(review): {@link #loop()} is presumably invoked repeatedly on a worker thread by
 * {@code CtrlLoopThreadComp}; that class is not visible here, so confirm against it.
 *
 * @param <T> type of the items being processed
 */
public class AsynchronousProcessor<T> extends CtrlLoopThreadComp {
    private static final Logger log = LoggerFactory.getLogger(AsynchronousProcessor.class);

    /** Primary handler; its boolean result decides whether {@link #catchFun} runs. Never null. */
    private final Predicate<T> thenFun;

    /** Optional fallback handler for items rejected by {@link #thenFun}; may be null. */
    private final Consumer<T> catchFun;

    /** Queue holding the items that are waiting to be processed. Never null. */
    private final BlockingQueue<T> blockingQueue;

    /**
     * Creates a processor on top of the given queue.
     *
     * @param blockingQueue queue that buffers pending items; must not be null
     * @param predicate     primary handler ({@code thenFun}); must not be null
     * @param consumer      fallback handler ({@code catchFun}) called when {@code predicate}
     *                      returns {@code false}; may be null to disable the fallback
     * @param i             millisecond value forwarded to the superclass constructor; must be
     *                      &gt;= 0 (presumably the loop interval; confirm against
     *                      {@code CtrlLoopThreadComp})
     * @throws NullPointerException     if {@code blockingQueue} or {@code predicate} is null
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public AsynchronousProcessor(@NonNull BlockingQueue<T> blockingQueue, @NonNull Predicate<T> predicate, Consumer<T> consumer, int i) {
        // The meaning of the first two superclass arguments is defined by CtrlLoopThreadComp.
        super(null, true, i);
        if (blockingQueue == null) {
            throw new NullPointerException("blockingQueue is marked non-null but is null");
        }
        if (predicate == null) {
            throw new NullPointerException("thenFun is marked non-null but is null");
        }
        this.blockingQueue = blockingQueue;
        this.thenFun = predicate;
        this.catchFun = consumer;
        Validate.isTrue(i >= 0, "millisecond:%s 不能小于0", i);
    }

    /**
     * Creates a processor that feeds every item to {@code consumer}.
     *
     * <p>The processor uses a new {@link LinkedBlockingQueue}, has no fallback handler, and
     * passes {@code 0} as the millisecond value.
     *
     * @param consumer handler invoked for every item; must not be null
     * @param <T>      type of the items being processed
     * @return a new processor wrapping {@code consumer}
     * @throws NullPointerException if {@code consumer} is null
     */
    public static <T> AsynchronousProcessor<T> ofConsumer(@NonNull Consumer<T> consumer) {
        if (consumer == null) {
            throw new NullPointerException("thenFun is marked non-null but is null");
        }
        // Adapt the consumer to a predicate that always reports success.
        Predicate<T> alwaysHandled = item -> {
            consumer.accept(item);
            return true;
        };
        return new AsynchronousProcessor<>(new LinkedBlockingQueue<T>(), alwaysHandled, null, 0);
    }

    /**
     * Creates a processor that feeds every item to {@code predicate}.
     *
     * <p>The processor uses a new {@link LinkedBlockingQueue}, has no fallback handler, and
     * passes {@code 0} as the millisecond value.
     *
     * @param predicate handler invoked for every item; must not be null
     * @param <T>       type of the items being processed
     * @return a new processor wrapping {@code predicate}
     * @throws NullPointerException if {@code predicate} is null
     */
    public static <T> AsynchronousProcessor<T> ofPredicate(@NonNull Predicate<T> predicate) {
        if (predicate == null) {
            throw new NullPointerException("thenFun is marked non-null but is null");
        }
        return new AsynchronousProcessor<>(new LinkedBlockingQueue<T>(), predicate, null, 0);
    }

    /**
     * Takes one item from the queue, blocking until one is available, and processes it.
     *
     * @return always {@code true} (presumably tells the superclass to keep looping; confirm
     *         against {@code CtrlLoopThreadComp})
     * @throws RuntimeException if the thread is interrupted while waiting on the queue; the
     *                          interrupt flag is restored before the exception is thrown
     */
    @Override
    protected boolean loop() {
        final T item;
        try {
            item = this.blockingQueue.take();
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("AsynchronousProcessor.take() 阻塞时, 线程中断", e);
        }
        // thenFun is guaranteed non-null by the constructor; the fallback runs only when it
        // reports failure and a fallback was actually configured.
        if (!this.thenFun.test(item) && this.catchFun != null) {
            this.catchFun.accept(item);
        }
        return true;
    }

    /**
     * Queues an item for processing. A {@code null} item is silently ignored.
     *
     * <p>Delegates to {@link BlockingQueue#add(Object)}, so a capacity-bounded queue that is
     * full makes this method throw {@link IllegalStateException}.
     *
     * @param t item to queue; ignored when null
     */
    public void add(T t) {
        if (t != null) {
            this.blockingQueue.add(t);
        }
    }

    /**
     * @return the primary handler applied to every item
     */
    public Predicate<T> getThenFun() {
        return this.thenFun;
    }

    /**
     * @return the fallback handler, or {@code null} when none was configured
     */
    public Consumer<T> getCatchFun() {
        return this.catchFun;
    }

    /**
     * @return the underlying queue itself (not a copy)
     */
    public BlockingQueue<T> getBlockingQueue() {
        return this.blockingQueue;
    }
}
