package org.datavec.camel.component;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.datavec.api.conf.Configuration;
import org.datavec.api.formats.input.InputFormat;
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.split.InputSplit;

/* loaded from: input_file:org/datavec/camel/component/DataVecConsumer.class */
public class DataVecConsumer extends ScheduledPollConsumer {
    private final DataVecEndpoint endpoint;
    private Class<? extends InputFormat> inputFormatClazz;
    private Class<? extends DataVecMarshaller> marshallerClazz;
    private InputFormat inputFormat;
    private Configuration configuration;
    private DataVecMarshaller marshaller;

    public DataVecConsumer(DataVecEndpoint dataVecEndpoint, Processor processor) {
        super(dataVecEndpoint, processor);
        this.endpoint = dataVecEndpoint;
        try {
            this.inputFormatClazz = Class.forName(dataVecEndpoint.getInputFormat());
            this.inputFormat = this.inputFormatClazz.newInstance();
            this.marshallerClazz = Class.forName(dataVecEndpoint.getInputMarshaller());
            this.marshaller = this.marshallerClazz.newInstance();
            this.configuration = new Configuration();
            for (String str : dataVecEndpoint.getConsumerProperties().keySet()) {
                this.configuration.set(str, dataVecEndpoint.getConsumerProperties().get(str).toString());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected InputSplit inputFromExchange(Exchange exchange) {
        return this.marshaller.getSplit(exchange);
    }

    protected int poll() throws Exception {
        Exchange createExchange = this.endpoint.createExchange();
        RecordReader createReader = this.inputFormat.createReader(inputFromExchange(createExchange), this.configuration);
        int i = 0;
        while (createReader.hasNext()) {
            while (createReader.hasNext()) {
                createExchange.getIn().setBody(createReader.next());
                try {
                    getProcessor().process(createExchange);
                    i++;
                    if (createExchange.getException() != null) {
                        getExceptionHandler().handleException("Error processing exchange", createExchange, createExchange.getException());
                    }
                } catch (Throwable th) {
                    if (createExchange.getException() != null) {
                        getExceptionHandler().handleException("Error processing exchange", createExchange, createExchange.getException());
                    }
                    throw th;
                }
            }
        }
        return i;
    }
}
