package org.cg.eventbus.stream;

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.log4j.Logger;
import org.cg.eventbus.consumer.ConsumerConfigrator;
import org.cg.eventbus.hook.FetcherHook;

/* loaded from: input_file:org/cg/eventbus/stream/Fetcher.class */
/**
 * Thin wrapper around a Kafka high-level consumer connector that exposes a
 * single message stream as a pull-style iterator.
 *
 * <p>The instance is fully initialised by the constructor: it builds the
 * connector from the supplied configuration, opens one stream for the topic
 * filter found under the {@code consumer.topic} key, and registers a
 * {@link FetcherHook} as a JVM shutdown hook for the connector.
 *
 * <p>NOTE(review): nothing in this class synchronises access to the iterator,
 * so it is presumably meant to be used from a single thread - confirm against
 * callers.
 *
 * @param <K> type of the message key produced by the configured key decoder
 * @param <V> type of the message value produced by the configured value decoder
 */
public class Fetcher<K, V> {
    private static final Logger LOG = Logger.getLogger(Fetcher.class);

    /** Configuration key holding the topic whitelist expression. */
    private static final String TOPIC_KEY = "consumer.topic";

    /** Number of streams requested from the connector; only the first one is used. */
    private static final int STREAM_NUMBER = 1;

    private final Configuration config;
    private ConsumerConnector connector;
    private KafkaStream<K, V> stream;
    private ConsumerIterator<K, V> consumerIter;

    /**
     * Creates the fetcher and connects it to Kafka.
     *
     * @param configuration consumer settings; must contain {@code consumer.topic}
     *                      plus whatever {@link ConsumerConfigrator#validate} requires
     * @throws Exception if the configuration is rejected or the consumer cannot
     *                   be set up; in the latter case the connector is shut down
     *                   again before the exception propagates
     */
    public Fetcher(Configuration configuration) throws Exception {
        this.config = configuration;
        init();
    }

    /**
     * Validates the configuration, builds the connector, opens the stream and
     * registers the shutdown hook. If any step after the connector has been
     * created fails, the connector is shut down so it is not leaked.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void init() throws Exception {
        Properties properties = ConfigurationConverter.getProperties(this.config);
        ConsumerConfigrator.validate(properties);

        // Fail fast with a readable message before any connector is opened.
        String topic = this.config.getString(TOPIC_KEY);
        if (topic == null) {
            throw new IllegalArgumentException("Missing required configuration key: " + TOPIC_KEY);
        }

        LOG.info("Consumer initialization started.");
        this.connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        LOG.debug("Kafka consumer connector has been built.");

        boolean initialised = false;
        try {
            // The raw cast is kept deliberately: the static types returned by the
            // decoder helpers are not tied to K and V, so the assignment is unchecked.
            this.stream = (KafkaStream) this.connector.createMessageStreamsByFilter(
                    new Whitelist(topic),
                    STREAM_NUMBER,
                    ConsumerConfigrator.configKeyDecoder(properties),
                    ConsumerConfigrator.configValueDecoder(properties)).get(0);
            this.consumerIter = this.stream.iterator();
            Runtime.getRuntime().addShutdownHook(new FetcherHook(this.connector));
            initialised = true;
        } finally {
            if (!initialised) {
                shutdownQuietly();
            }
        }
    }

    /**
     * Shuts the connector down after a failed initialisation without letting a
     * secondary failure mask the original exception.
     */
    private void shutdownQuietly() {
        try {
            this.connector.shutdown();
        } catch (RuntimeException e) {
            LOG.warn("Failed to shut down consumer connector after unsuccessful initialization.", e);
        }
    }

    /**
     * Returns the underlying stream opened during construction.
     *
     * @return the single stream this fetcher reads from
     */
    public KafkaStream<K, V> getStream() {
        return this.stream;
    }

    /**
     * Delegates to the stream iterator's {@code hasNext()}.
     *
     * <p>NOTE(review): with the Kafka high-level consumer this call may block
     * until a message arrives or the consumer timeout elapses - confirm against
     * the Kafka version in use.
     *
     * @return whatever the underlying iterator reports
     */
    public boolean hasNext() {
        return this.consumerIter.hasNext();
    }

    /**
     * Commits the consumed offsets through the connector, passing {@code true}
     * to {@code commitOffsets} (presumably "retry on failure" - verify against
     * the Kafka version in use).
     */
    public void commit() {
        this.connector.commitOffsets(true);
    }

    /** Shuts down the underlying consumer connector. */
    public void close() {
        this.connector.shutdown();
    }

    /**
     * Returns the next message from the stream iterator.
     *
     * @return the next message together with its metadata
     */
    public MessageAndMetadata<K, V> nextMessage() {
        return this.consumerIter.next();
    }
}
