package com.alogic.kafka.consumer;

import com.alogic.kafka.MQCommitter;
import com.alogic.kafka.MQConsumer;
import com.alogic.kafka.MQServer;
import com.alogic.kafka.committer.RightNow;
import com.anysoft.util.Properties;
import com.anysoft.util.PropertiesConstants;
import com.anysoft.util.XmlElementProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;

/* loaded from: input_file:com/alogic/kafka/consumer/Direct.class */
public class Direct implements MQConsumer, Runnable {
    protected static final Logger LOG = LoggerFactory.getLogger(MQConsumer.class);
    protected String groupId;
    protected Thread thread;
    protected String topicList = "default";
    protected MQServer server = null;
    protected MQCommitter committer = null;
    protected volatile boolean toStop = false;
    protected long interval = 100;

    public void configure(Element element, Properties properties) {
        configure(new XmlElementProperties(element, properties));
        try {
            this.committer = (MQCommitter) new MQCommitter.TheFactory().newInstance(element, properties, "committer", RightNow.class.getName());
        } catch (Exception e) {
            LOG.warn("Can not create committer..");
        }
    }

    protected boolean canCommit(long j, long j2) {
        if (this.committer == null) {
            return true;
        }
        return this.committer.canCommit(j, j2);
    }

    public void configure(Properties properties) {
        this.groupId = PropertiesConstants.getString(properties, "groupId", "");
        this.topicList = PropertiesConstants.getString(properties, "topicList", this.topicList);
        this.interval = PropertiesConstants.getLong(properties, "interval", this.interval);
    }

    @Override // java.lang.Runnable
    public void run() {
        Consumer consumer = null;
        try {
            Map<String, Object> consumerProps = this.server.getConsumerProps();
            if (StringUtils.isNotEmpty(this.groupId)) {
                consumerProps.put("group.id", this.groupId);
            }
            consumer = new KafkaConsumer(consumerProps);
            ArrayList arrayList = new ArrayList();
            for (String str : this.topicList.split(",")) {
                arrayList.add(str);
            }
            consumer.subscribe(arrayList);
            LOG.info(String.format("Consumer [%d] started.", Long.valueOf(this.thread.getId())));
            this.toStop = false;
            while (!this.toStop) {
                Iterator it = consumer.poll(Duration.ofMillis(this.interval)).iterator();
                while (it.hasNext()) {
                    dispatch(consumer, (ConsumerRecord) it.next());
                }
            }
            LOG.info(String.format("Consumer [%d] stopped.", Long.valueOf(this.thread.getId())));
            if (consumer != null) {
                consumer.close();
            }
        } catch (Throwable th) {
            if (consumer != null) {
                consumer.close();
            }
            throw th;
        }
    }

    protected void dispatch(Consumer<String, byte[]> consumer, ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            this.server.dispath(consumerRecord.topic(), (byte[]) consumerRecord.value());
            if (canCommit(System.currentTimeMillis(), 1L)) {
                consumer.commitSync();
            }
        } catch (Exception e) {
            LOG.error(ExceptionUtils.getStackTrace(e));
        }
    }

    @Override // com.alogic.kafka.MQConsumer
    public void start(MQServer mQServer) {
        try {
            this.server = mQServer;
            this.thread = new Thread(this);
            this.thread.setDaemon(true);
            LOG.info(String.format("Consumer [%d] is starting..", Long.valueOf(this.thread.getId())));
            this.thread.start();
        } catch (Exception e) {
            LOG.error("Failed to start mq consumer..");
            LOG.error(ExceptionUtils.getStackTrace(e));
        }
    }

    @Override // com.alogic.kafka.MQConsumer
    public void stop(MQServer mQServer) {
        this.toStop = true;
        LOG.info(String.format("Consumer [%d] is stopping..", Long.valueOf(this.thread.getId())));
    }
}
