package org.apache.pulsar.functions.source.batch;

import com.google.gson.Gson;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.BatchSourceTriggerer;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.9.0-rc-202109072205.jar:org/apache/pulsar/functions/source/batch/BatchSourceExecutor.class */
public class BatchSourceExecutor<T> implements Source<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BatchSourceExecutor.class);
    private Map<String, Object> config;
    private SourceContext sourceContext;
    private BatchSourceTriggerer discoveryTriggerer;
    private Consumer<byte[]> intermediateTopicConsumer;
    private Message<byte[]> currentTask;
    private BatchSourceConfig batchSourceConfig;
    private String batchSourceClassName;
    private BatchSource<T> batchSource;
    private String intermediateTopicName;
    private ExecutorService discoveryThread;
    private volatile Exception currentError = null;
    private volatile boolean isRunning = false;
    volatile boolean discoverInProgress = false;

    @Override // org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        this.config = map;
        this.sourceContext = sourceContext;
        this.intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(sourceContext.getTenant(), sourceContext.getNamespace(), sourceContext.getSourceName()).toString();
        this.discoveryThread = Executors.newSingleThreadExecutor(new DefaultThreadFactory(String.format("%s-batch-source-discovery", FunctionCommon.getFullyQualifiedName(sourceContext.getTenant(), sourceContext.getNamespace(), sourceContext.getSourceName()))));
        getBatchSourceConfigs(map);
        initializeBatchSource();
        start();
    }

    @Override // org.apache.pulsar.io.core.Source
    public Record<T> read() throws Exception {
        while (this.currentError == null) {
            if (this.currentTask == null) {
                this.currentTask = retrieveNextTask();
                prepareInternal(this.currentTask);
            }
            Record<T> readNext = this.batchSource.readNext();
            if (readNext != null) {
                return readNext;
            }
            this.intermediateTopicConsumer.acknowledgeAsync(this.currentTask.getMessageId()).exceptionally(th -> {
                log.error("Encountered error when acknowledging completed task with id {}", this.currentTask.getMessageId(), th);
                setCurrentError(th);
                return null;
            });
            this.currentTask = null;
        }
        throw this.currentError;
    }

    private void getBatchSourceConfigs(Map<String, Object> map) {
        if (!map.containsKey(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY) || !map.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) {
            throw new IllegalArgumentException("Batch Configs cannot be found");
        }
        this.batchSourceConfig = (BatchSourceConfig) new Gson().fromJson((String) map.get(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY), (Class) BatchSourceConfig.class);
        this.batchSourceClassName = (String) map.get(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY);
    }

    private void initializeBatchSource() {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Object createInstance = Reflections.createInstance(this.batchSourceClassName, contextClassLoader);
        if (!(createInstance instanceof BatchSource)) {
            throw new IllegalArgumentException("BatchSource does not implement the correct interface");
        }
        this.batchSource = (BatchSource) createInstance;
        Object createInstance2 = Reflections.createInstance(this.batchSourceConfig.getDiscoveryTriggererClassName(), contextClassLoader);
        if (!(createInstance2 instanceof BatchSourceTriggerer)) {
            throw new IllegalArgumentException("BatchSourceTriggerer does not implement the correct interface");
        }
        this.discoveryTriggerer = (BatchSourceTriggerer) createInstance2;
    }

    private void start() throws Exception {
        this.isRunning = true;
        createIntermediateTopicConsumer();
        this.batchSource.open(this.config, this.sourceContext);
        if (this.sourceContext.getInstanceId() == 0) {
            this.discoveryTriggerer.init(this.batchSourceConfig.getDiscoveryTriggererConfig(), this.sourceContext);
            this.discoveryTriggerer.start(this::triggerDiscover);
        }
    }

    private synchronized void triggerDiscover(String str) {
        if (this.discoverInProgress) {
            log.info("Discovery is already in progress");
        } else {
            this.discoverInProgress = true;
            this.discoveryThread.submit(() -> {
                try {
                    this.batchSource.discover(bArr -> {
                        taskEater(str, bArr);
                    });
                } catch (Exception e) {
                    if (this.isRunning || !(e instanceof InterruptedException)) {
                        log.error("Encountered error during task discovery", (Throwable) e);
                        setCurrentError(e);
                    }
                } finally {
                    this.discoverInProgress = false;
                }
            });
        }
    }

    private void taskEater(String str, byte[] bArr) {
        try {
            HashMap hashMap = new HashMap();
            hashMap.put("discoveredEvent", str);
            hashMap.put("produceTime", String.valueOf(System.currentTimeMillis()));
            TypedMessageBuilder newOutputMessage = this.sourceContext.newOutputMessage(this.intermediateTopicName, Schema.BYTES);
            newOutputMessage.value(bArr).properties(hashMap);
            newOutputMessage.send();
        } catch (Exception e) {
            log.error("error writing discovered task to intermediate topic", (Throwable) e);
            throw new RuntimeException("error writing discovered task to intermediate topic");
        }
    }

    private void prepareInternal(Message<byte[]> message) {
        try {
            this.batchSource.prepare(message.getValue());
        } catch (Exception e) {
            log.error("Error on prepare", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stop();
    }

    private void stop() throws Exception {
        this.isRunning = false;
        Exception exc = null;
        if (this.discoveryTriggerer != null) {
            try {
                this.discoveryTriggerer.stop();
            } catch (Exception e) {
                log.error("Encountered exception when closing Batch Source Triggerer", (Throwable) e);
                exc = e;
            }
            this.discoveryTriggerer = null;
        }
        this.discoveryThread.shutdownNow();
        try {
            this.discoveryThread.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            log.warn("Shutdown of discovery thread was interrupted");
            Thread.currentThread().interrupt();
        }
        if (this.intermediateTopicConsumer != null) {
            try {
                this.intermediateTopicConsumer.close();
            } catch (Exception e3) {
                log.error("Encountered exception when closing intermediate topic of Batch Source", (Throwable) e3);
                if (exc != null) {
                    exc = e3;
                }
            }
            this.intermediateTopicConsumer = null;
        }
        if (this.batchSource != null) {
            try {
                this.batchSource.close();
            } catch (Exception e4) {
                log.error("Encountered exception when closing Batch Source", (Throwable) e4);
                if (exc != null) {
                    exc = e4;
                }
            }
        }
        if (exc != null) {
            throw exc;
        }
    }

    private void createIntermediateTopicConsumer() {
        String computeBatchSourceInstanceSubscriptionName = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(this.sourceContext.getTenant(), this.sourceContext.getNamespace(), this.sourceContext.getSourceName());
        String fullyQualifiedName = FunctionCommon.getFullyQualifiedName(this.sourceContext.getTenant(), this.sourceContext.getNamespace(), this.sourceContext.getSourceName());
        try {
            Actions.newBuilder().addAction(Actions.Action.builder().actionName(String.format("Setting up instance consumer for BatchSource intermediate topic for function %s", fullyQualifiedName)).numRetries(10).sleepBetweenInvocationsMs(1000L).supplier(() -> {
                try {
                    this.intermediateTopicConsumer = this.sourceContext.newConsumerBuilder(Schema.BYTES).subscriptionName(computeBatchSourceInstanceSubscriptionName).subscriptionType(SubscriptionType.Shared).topic(this.intermediateTopicName).properties(InstanceUtils.getProperties(Function.FunctionDetails.ComponentType.SOURCE, fullyQualifiedName, this.sourceContext.getInstanceId())).subscribeAsync().join();
                    return Actions.ActionResult.builder().success(true).build();
                } catch (Exception e) {
                    return Actions.ActionResult.builder().success(false).errorMsg(e.getMessage()).build();
                }
            }).build()).run();
        } catch (InterruptedException e) {
            log.error("Error setting up instance subscription for intermediate topic", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    private Message<byte[]> retrieveNextTask() throws Exception {
        while (this.currentError == null) {
            Message<byte[]> receive = this.intermediateTopicConsumer.receive(5, TimeUnit.SECONDS);
            if (receive != null) {
                return receive;
            }
        }
        throw this.currentError;
    }

    private void setCurrentError(Throwable th) {
        if (th instanceof Exception) {
            this.currentError = (Exception) th;
        } else {
            this.currentError = new RuntimeException(th.getCause());
        }
    }
}
