package io.datarouter.util.concurrent;

import io.datarouter.scanner.Scanner;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Function;

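/**
 * Streams items from calling threads to a single background worker through a bounded deque.
 *
 * <p>Items passed to {@code submit} are queued; the worker applies the configured function to a
 * {@code Scanner} over those items, and {@code complete} marks the end of input and returns the
 * function's result.
 *
 * <p>Decompiled from {@code io/datarouter/util/concurrent/TransferThread.class}.
 *
 * @param <T> type of the submitted items
 * @param <R> type of the result produced by the worker function
 */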
public class TransferThread<T, R> {
    /** Receives the nanoseconds that elapsed while {@code nextMessage()} waited for a message. */
    private final Consumer<Long> inputStallNanosCallback;
    /** Receives the nanoseconds that elapsed while {@code submit(T)} put its item on the deque. */
    private final Consumer<Long> outputStallNanosCallback;
    /** Hand-off buffer between submitters and the worker; created with the configured capacity. */
    private final BlockingDeque<TransferThreadMessage<T>> deque;
    /** Single-threaded executor on which the thread function runs. */
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    /** Pending result of applying the thread function to the scanner of submitted items. */
    private final Future<R> resultFuture;

    /**
     * Fluent builder for {@link TransferThread}.
     *
     * <p>The buffer size and thread function are mandatory; both stall callbacks default to no-ops.
     * Null arguments are rejected immediately, because a null callback or function would otherwise
     * only fail later, when the transfer is already running.
     *
     * @param <T> type of the submitted items
     * @param <R> type of the result produced by the thread function
     */
    public static class TransferThreadBuilder<T, R> {
        private final int bufferSize;
        private final Function<Scanner<T>, R> threadFunction;
        private Consumer<Long> inputStallNanosCallback = nanos -> {
        };
        private Consumer<Long> outputStallNanosCallback = nanos -> {
        };

        /**
         * @param bufferSize capacity of the deque that buffers submitted items
         * @param threadFunction function the worker applies to the scanner of submitted items
         * @throws NullPointerException if {@code threadFunction} is null
         */
        public TransferThreadBuilder(int bufferSize, Function<Scanner<T>, R> threadFunction) {
            this.bufferSize = bufferSize;
            this.threadFunction = Objects.requireNonNull(threadFunction, "threadFunction");
        }

        /**
         * Sets the callback that receives the nanoseconds the worker spent waiting for each message.
         *
         * @param inputStallNanosCallback consumer of the elapsed nanoseconds
         * @return this builder
         * @throws NullPointerException if {@code inputStallNanosCallback} is null
         */
        public TransferThreadBuilder<T, R> withInputStallNanosCallback(Consumer<Long> inputStallNanosCallback) {
            this.inputStallNanosCallback = Objects.requireNonNull(inputStallNanosCallback, "inputStallNanosCallback");
            return this;
        }

        /**
         * Sets the callback that receives the nanoseconds each submit call spent queueing its item.
         *
         * @param outputStallNanosCallback consumer of the elapsed nanoseconds
         * @return this builder
         * @throws NullPointerException if {@code outputStallNanosCallback} is null
         */
        public TransferThreadBuilder<T, R> withOutputStallNanosCallback(Consumer<Long> outputStallNanosCallback) {
            this.outputStallNanosCallback = Objects.requireNonNull(outputStallNanosCallback, "outputStallNanosCallback");
            return this;
        }

        /**
         * Creates the {@link TransferThread} from the current settings.
         *
         * @return a new transfer thread
         */
        public TransferThread<T, R> build() {
            return new TransferThread<>(inputStallNanosCallback, outputStallNanosCallback, bufferSize, threadFunction);
        }
    }

    /**
     * Envelope placed on the hand-off deque: either one data item or the end-of-input marker.
     *
     * <p>Not part of the public API; {@code TransferThread} creates these through the static
     * factories below. {@code equals}, {@code hashCode} and {@code toString} are the standard
     * record implementations over both components.
     *
     * @param item the payload of a data message; {@code null} in a termination message
     * @param shouldTerminate {@code true} only for the message that marks the end of input
     * @param <T> type of the transferred items
     */
    record TransferThreadMessage<T>(T item, boolean shouldTerminate) {

        /** Wraps {@code t} as a regular data message. */
        static <T> TransferThreadMessage<T> makeDataMessage(T t) {
            return new TransferThreadMessage<>(t, false);
        }

        /** Creates the marker message that carries no item and requests termination. */
        static <T> TransferThreadMessage<T> makeTerminationMessage() {
            return new TransferThreadMessage<>(null, true);
        }
    }

    /**
     * Creates the hand-off deque and immediately submits the worker task to the executor.
     *
     * @param inputStallNanosCallback invoked with the nanoseconds elapsed while waiting for each message
     * @param outputStallNanosCallback invoked with the nanoseconds elapsed while queueing each item
     * @param bufferSize capacity passed to the {@link LinkedBlockingDeque}
     * @param threadFunction applied on the executor thread to the scanner returned by {@code scan()}
     */
    private TransferThread(
            Consumer<Long> inputStallNanosCallback,
            Consumer<Long> outputStallNanosCallback,
            int bufferSize,
            Function<Scanner<T>, R> threadFunction) {
        this.inputStallNanosCallback = inputStallNanosCallback;
        this.outputStallNanosCallback = outputStallNanosCallback;
        this.deque = new LinkedBlockingDeque<>(bufferSize);
        // Submitted last so that every field the worker task reads is already assigned.
        this.resultFuture = exec.submit(() -> scan().apply(threadFunction));
    }

    /**
     * Queues one item for the worker and reports how long the hand-off took.
     *
     * <p>The elapsed time around the {@code BlockingDequeTool.put} call, in nanoseconds, is passed
     * to the output stall callback.
     *
     * @param t the item to hand to the worker
     */
    public void submit(T t) {
        final long startNanos = System.nanoTime();
        TransferThreadMessage<T> message = TransferThreadMessage.makeDataMessage(t);
        BlockingDequeTool.put(deque, message);
        final long elapsedNanos = System.nanoTime() - startNanos;
        outputStallNanosCallback.accept(elapsedNanos);
    }

    /**
     * Builds the scanner the worker consumes: messages are pulled with {@code nextMessage()},
     * the scan advances until a message reports {@code shouldTerminate()}, and each message is
     * mapped to its item.
     *
     * @return a scanner over the submitted items
     */
    private Scanner<T> scan() {
        Scanner<TransferThreadMessage<T>> messages = Scanner.generate(this::nextMessage);
        return messages
                .advanceUntil(TransferThreadMessage::shouldTerminate)
                .map(TransferThreadMessage::item);
    }

    /**
     * Takes the next message from the deque via {@code BlockingDequeTool.pollForever} and reports
     * the elapsed time around that call, in nanoseconds, to the input stall callback.
     *
     * @return the message that was taken from the deque
     */
    private TransferThreadMessage<T> nextMessage() {
        long startNanos = System.nanoTime();
        TransferThreadMessage<T> message = BlockingDequeTool.pollForever(deque);
        inputStallNanosCallback.accept(System.nanoTime() - startNanos);
        return message;
    }

    /**
     * Signals the end of input and waits for the worker's result.
     *
     * <p>A termination message is put on the deque, then the call waits on the result future via
     * {@code FutureTool.get}. The executor is shut down whether or not that succeeds, so a failure
     * while queueing or waiting does not leave the executor thread behind.
     *
     * @return the value produced by the thread function
     */
    public R complete() {
        try {
            BlockingDequeTool.put(deque, TransferThreadMessage.makeTerminationMessage());
            return FutureTool.get(resultFuture);
        } finally {
            // Always release the executor, including when the put or the wait throws.
            exec.shutdownNow();
        }
    }
}
