package org.apache.pulsar.functions.instance;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.9.0-rc-202110042205.jar:org/apache/pulsar/functions/instance/JavaInstance.class */
public class JavaInstance implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) JavaInstance.class);
    private final ContextImpl context;
    private Function function;
    private java.util.function.Function javaUtilFunction;
    private final InstanceConfig instanceConfig;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final LinkedBlockingQueue<AsyncFuncRequest> pendingAsyncRequests;

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.9.0-rc-202110042205.jar:org/apache/pulsar/functions/instance/JavaInstance$AsyncFuncRequest.class */
    public static class AsyncFuncRequest {
        private final Record record;
        private final CompletableFuture processResult;

        public AsyncFuncRequest(Record record, CompletableFuture completableFuture) {
            this.record = record;
            this.processResult = completableFuture;
        }

        public Record getRecord() {
            return this.record;
        }

        public CompletableFuture getProcessResult() {
            return this.processResult;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof AsyncFuncRequest)) {
                return false;
            }
            AsyncFuncRequest asyncFuncRequest = (AsyncFuncRequest) obj;
            if (!asyncFuncRequest.canEqual(this)) {
                return false;
            }
            Record record = getRecord();
            Record record2 = asyncFuncRequest.getRecord();
            if (record == null) {
                if (record2 != null) {
                    return false;
                }
            } else if (!record.equals(record2)) {
                return false;
            }
            CompletableFuture processResult = getProcessResult();
            CompletableFuture processResult2 = asyncFuncRequest.getProcessResult();
            return processResult == null ? processResult2 == null : processResult.equals(processResult2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof AsyncFuncRequest;
        }

        public int hashCode() {
            Record record = getRecord();
            int hashCode = (1 * 59) + (record == null ? 43 : record.hashCode());
            CompletableFuture processResult = getProcessResult();
            return (hashCode * 59) + (processResult == null ? 43 : processResult.hashCode());
        }

        public String toString() {
            return "JavaInstance.AsyncFuncRequest(record=" + getRecord() + ", processResult=" + getProcessResult() + DefaultExpressionEngine.DEFAULT_INDEX_END;
        }
    }

    public JavaInstance(ContextImpl contextImpl, Object obj, InstanceConfig instanceConfig) {
        this.context = contextImpl;
        this.instanceConfig = instanceConfig;
        this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
        if (obj instanceof Function) {
            this.function = (Function) obj;
        } else {
            this.javaUtilFunction = (java.util.function.Function) obj;
        }
    }

    @VisibleForTesting
    public JavaExecutionResult handleMessage(Record<?> record, Object obj) {
        return handleMessage(record, obj, (record2, javaExecutionResult) -> {
        }, th -> {
        });
    }

    public JavaExecutionResult handleMessage(Record<?> record, Object obj, BiConsumer<Record, JavaExecutionResult> biConsumer, Consumer<Throwable> consumer) {
        if (this.context != null) {
            this.context.setCurrentMessageContext(record);
        }
        JavaExecutionResult javaExecutionResult = new JavaExecutionResult();
        try {
            Object process = this.function != null ? this.function.process(obj, this.context) : this.javaUtilFunction.apply(obj);
            if (!(process instanceof CompletableFuture)) {
                if (log.isDebugEnabled()) {
                    log.debug("Got result: object: {}", process);
                }
                javaExecutionResult.setResult(process);
                return javaExecutionResult;
            }
            try {
                this.pendingAsyncRequests.put(new AsyncFuncRequest(record, (CompletableFuture) process));
                ((CompletableFuture) process).whenCompleteAsync((obj2, obj3) -> {
                    try {
                        processAsyncResults(biConsumer);
                    } catch (Throwable th) {
                        consumer.accept(th);
                    }
                }, (Executor) this.executor);
                return null;
            } catch (InterruptedException e) {
                log.warn("Exception while put Async requests", (Throwable) e);
                javaExecutionResult.setUserException(e);
                return javaExecutionResult;
            }
        } catch (Exception e2) {
            javaExecutionResult.setUserException(e2);
            return javaExecutionResult;
        }
    }

    private void processAsyncResults(BiConsumer<Record, JavaExecutionResult> biConsumer) throws InterruptedException {
        AsyncFuncRequest peek = this.pendingAsyncRequests.peek();
        while (true) {
            AsyncFuncRequest asyncFuncRequest = peek;
            if (asyncFuncRequest == null || !asyncFuncRequest.getProcessResult().isDone()) {
                return;
            }
            this.pendingAsyncRequests.remove(asyncFuncRequest);
            JavaExecutionResult javaExecutionResult = new JavaExecutionResult();
            try {
                javaExecutionResult.setResult(asyncFuncRequest.getProcessResult().get());
            } catch (ExecutionException e) {
                if (e.getCause() instanceof Exception) {
                    javaExecutionResult.setUserException((Exception) e.getCause());
                } else {
                    javaExecutionResult.setUserException(new Exception(e.getCause()));
                }
            }
            biConsumer.accept(asyncFuncRequest.getRecord(), javaExecutionResult);
            peek = this.pendingAsyncRequests.peek();
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.context.close();
        this.executor.shutdown();
    }

    public Map<String, Double> getAndResetMetrics() {
        return this.context.getAndResetMetrics();
    }

    public void resetMetrics() {
        this.context.resetMetrics();
    }

    public Map<String, Double> getMetrics() {
        return this.context.getMetrics();
    }

    ContextImpl getContext() {
        return this.context;
    }

    public LinkedBlockingQueue<AsyncFuncRequest> getPendingAsyncRequests() {
        return this.pendingAsyncRequests;
    }
}
