package net.sf.jabb.dstream.kafka;

import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.jabb.dstream.ReceiveStatus;
import net.sf.jabb.dstream.SimpleReceiveStatus;
import net.sf.jabb.dstream.StreamDataSupplier;
import net.sf.jabb.dstream.ex.DataStreamInfrastructureException;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:net/sf/jabb/dstream/kafka/KafkaStreamDataSupplier.class */
public class KafkaStreamDataSupplier<M> implements StreamDataSupplier<M> {
    private static final Logger logger = Logger.getLogger("KafkaStreamDataSupplier");
    private Consumer<Void, M> consumer;
    private TopicPartition subscribedPartition;

    KafkaStreamDataSupplier(Properties properties, List<TopicPartition> list) {
        Validate.isTrue(list.size() == 1);
        this.subscribedPartition = list.get(0);
        this.consumer = new KafkaConsumer(properties);
        this.consumer.assign(list);
    }

    KafkaStreamDataSupplier(Consumer<Void, M> consumer, List<TopicPartition> list) {
        Validate.isTrue(list.size() == 1);
        this.subscribedPartition = list.get(0);
        this.consumer = consumer;
        consumer.assign(list);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String firstPosition() {
        long position = this.consumer.position(this.subscribedPartition);
        this.consumer.seekToBeginning(new TopicPartition[]{this.subscribedPartition});
        Long valueOf = Long.valueOf(this.consumer.position(this.subscribedPartition));
        this.consumer.seek(this.subscribedPartition, position);
        return valueOf.toString();
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String firstPosition(Instant instant, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support enqueue timestamp");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String lastPosition() throws DataStreamInfrastructureException {
        long position = this.consumer.position(this.subscribedPartition);
        this.consumer.seekToEnd(new TopicPartition[]{this.subscribedPartition});
        long position2 = this.consumer.position(this.subscribedPartition) - 1;
        this.consumer.seek(this.subscribedPartition, position);
        return String.valueOf(position2);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public Instant enqueuedTime(String str) throws DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support enqueue timestamp");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String nextStartPosition(String str) {
        return Long.valueOf(Long.parseLong(str) + 1).toString();
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public boolean isInRange(String str, String str2) {
        Validate.isTrue(str != null, "position cannot be null", new Object[0]);
        return str2 == null || Long.parseLong(str) <= Long.parseLong(str2);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public boolean isInRange(Instant instant, Instant instant2) {
        return false;
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, String str, String str2, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        Long valueOf = Long.valueOf(Long.parseLong(str));
        Long valueOf2 = Long.valueOf(Long.parseLong(str2));
        this.consumer.seek(this.subscribedPartition, valueOf.longValue());
        this.consumer.position(this.subscribedPartition);
        long currentTimeMillis = System.currentTimeMillis() + duration.toMillis();
        logger.log(Level.INFO, "maxTime=" + String.valueOf(currentTimeMillis) + ",now=" + String.valueOf(System.currentTimeMillis()));
        long j = 0;
        int i2 = 0;
        do {
            ConsumerRecords poll = this.consumer.poll(currentTimeMillis - System.currentTimeMillis());
            logger.log(Level.INFO, "poll fetched " + poll.count() + " messages");
            Iterator it = poll.iterator();
            while (it.hasNext() && i2 < i) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                if (consumerRecord.offset() > valueOf2.longValue()) {
                    return new SimpleReceiveStatus(String.valueOf(j), null, true);
                }
                i2++;
                list.add((Object) consumerRecord.value());
                j = consumerRecord.offset();
                logger.log(Level.INFO, "lastPos=" + String.valueOf(j));
            }
        } while (System.currentTimeMillis() < currentTimeMillis);
        logger.log(Level.INFO, "timeout, lastPos=" + String.valueOf(j));
        return new SimpleReceiveStatus(String.valueOf(j), null, false);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, Instant instant, Instant instant2, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support enqueue timestamp");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, String str, Instant instant, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support enqueue timestamp");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String startAsyncReceiving(java.util.function.Consumer<M> consumer, String str) throws DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support startAsyncReceiving");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String startAsyncReceiving(java.util.function.Consumer<M> consumer, Instant instant) throws DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support startAsyncReceiving");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void stopAsyncReceiving(String str) {
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, String str, String str2) throws DataStreamInfrastructureException {
        long parseLong = Long.parseLong(str);
        long parseLong2 = Long.parseLong(str2);
        this.consumer.seek(this.subscribedPartition, parseLong);
        this.consumer.position(this.subscribedPartition);
        long longValue = ((Long) function.apply(null)).longValue();
        long currentTimeMillis = System.currentTimeMillis() + longValue;
        long j = -1;
        logger.log(Level.INFO, "receive called with startPos " + String.valueOf(parseLong) + " endPos " + String.valueOf(parseLong2) + " timeout:" + longValue);
        while (true) {
            ConsumerRecords poll = this.consumer.poll(longValue);
            Iterator it = poll.iterator();
            logger.log(Level.INFO, "poll got " + poll.count() + " records");
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                if (consumerRecord.offset() > parseLong2) {
                    return new SimpleReceiveStatus(String.valueOf(j), null, true);
                }
                long longValue2 = ((Long) function.apply(consumerRecord.value())).longValue();
                j = consumerRecord.offset();
                if (longValue2 < 0) {
                    return new SimpleReceiveStatus(String.valueOf(j), null, false);
                }
            }
            if (System.currentTimeMillis() >= currentTimeMillis) {
                return new SimpleReceiveStatus(String.valueOf(j), null, false);
            }
            longValue = currentTimeMillis - System.currentTimeMillis();
        }
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, Instant instant, Instant instant2) throws DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support Receive with start/end enqueue time");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, String str, Instant instant) throws DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support receive with end enqueue time");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, Instant instant, String str) throws DataStreamInfrastructureException {
        throw new DataStreamInfrastructureException("Kafka do not support Receive with start/end enqueue time");
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void start() throws Exception {
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void stop() throws Exception {
    }
}
