package org.apache.pulsar.functions.source;

import com.google.common.base.Preconditions;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.11.0-rc-202204222205.jar:org/apache/pulsar/functions/source/MultiConsumerPulsarSource.class */
/**
 * A push-style Pulsar source that creates one {@link Consumer} per configured input topic and
 * registers itself as the {@link MessageListener} of each of them.
 *
 * <p>Every message delivered to {@link #received(Consumer, Message)} is converted with
 * {@code buildRecord} and handed to {@code consume}, both inherited from the parent class.
 *
 * @param <T> the payload type of the consumed messages
 */
public class MultiConsumerPulsarSource<T> extends PushPulsarSource<T> implements MessageListener<T> {
    private static final Logger log = LoggerFactory.getLogger(MultiConsumerPulsarSource.class);

    private final MultiConsumerPulsarSourceConfig pulsarSourceConfig;
    private final ClassLoader functionClassLoader;
    /** Consumers created by {@link #open(Map, SourceContext)}, in subscription order. */
    private final List<Consumer<T>> inputConsumers = new LinkedList<>();

    /**
     * Creates the source; no consumer is subscribed until {@link #open(Map, SourceContext)} is called.
     *
     * @param pulsarClient client forwarded to the parent class
     * @param multiConsumerPulsarSourceConfig source configuration holding the per-topic consumer settings
     * @param map properties forwarded to the parent class
     * @param classLoader class loader used to resolve the configured input type class
     */
    public MultiConsumerPulsarSource(PulsarClient pulsarClient,
                                     MultiConsumerPulsarSourceConfig multiConsumerPulsarSourceConfig,
                                     Map<String, String> map,
                                     ClassLoader classLoader) {
        super(pulsarClient, multiConsumerPulsarSourceConfig, map, classLoader);
        this.pulsarSourceConfig = multiConsumerPulsarSourceConfig;
        this.functionClassLoader = classLoader;
    }

    /**
     * Subscribes one consumer per configured topic and records it in the input consumer list.
     *
     * <p>Each subscription is awaited with {@code join()} before the next topic is processed, so a
     * failed subscription aborts the loop with the exception raised by {@code join()}.
     *
     * @param map source configuration (not read by this implementation)
     * @param sourceContext source context (not read by this implementation)
     * @throws Exception if the input type class cannot be loaded or a subscription fails
     */
    @Override // org.apache.pulsar.functions.source.PushPulsarSource, org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        log.info("Opening pulsar source with config: {}", this.pulsarSourceConfig);
        Map<String, PulsarSourceConsumerConfig<T>> consumerConfigs = setupConsumerConfigs();
        for (Map.Entry<String, PulsarSourceConsumerConfig<T>> entry : consumerConfigs.entrySet()) {
            String topic = entry.getKey();
            PulsarSourceConsumerConfig<T> consumerConfig = entry.getValue();
            log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}",
                    topic, consumerConfig.getSchema(), consumerConfig.getSchema().getSchemaInfo());

            ConsumerBuilder<T> builder = createConsumeBuilder(topic, consumerConfig);
            // Register this source as the listener so messages are pushed to received(...).
            builder.messageListener(this);

            Consumer<T> consumer = builder.subscribeAsync().join();
            this.inputConsumers.add(consumer);
        }
    }

    /**
     * Listener callback: wraps the incoming message in a record and pushes it downstream.
     *
     * @param consumer the consumer that received the message
     * @param message the received message
     */
    @Override // org.apache.pulsar.client.api.MessageListener
    public void received(Consumer<T> consumer, Message<T> message) {
        consume(buildRecord(consumer, message));
    }

    /**
     * Closes every consumer created by {@link #open(Map, SourceContext)}.
     *
     * <p>Closing is best-effort: a failure on one consumer is logged and does not prevent the
     * remaining consumers from being closed, nor is it propagated to the caller.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        for (Consumer<T> consumer : this.inputConsumers) {
            try {
                consumer.close();
            } catch (PulsarClientException e) {
                // Keep going so one failing consumer does not leave the others open.
                log.warn("Failed to close consumer for topic {}", consumer.getTopic(), e);
            }
        }
    }

    /**
     * Builds the per-topic consumer configuration from the source config's topic schema map.
     *
     * @return consumer configurations keyed by topic; a {@link TreeMap}, so iteration follows the
     *     natural ordering of the topic names
     * @throws ClassNotFoundException if the configured input type class cannot be loaded
     * @throws IllegalArgumentException if the configured input type is {@link Void}
     */
    private Map<String, PulsarSourceConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundException {
        Map<String, PulsarSourceConsumerConfig<T>> configs = new TreeMap<>();
        Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(), this.functionClassLoader);
        Preconditions.checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");
        this.pulsarSourceConfig.getTopicSchema().forEach((topic, consumerConfig) ->
                configs.put(topic, buildPulsarSourceConsumerConfig(topic, consumerConfig, typeArg)));
        return configs;
    }

    /**
     * Returns the live list of consumers created by {@link #open(Map, SourceContext)}.
     *
     * @return the internal consumer list (not a copy)
     */
    @Override // org.apache.pulsar.functions.source.PulsarSource
    public List<Consumer<T>> getInputConsumers() {
        return this.inputConsumers;
    }
}
