package net.sf.jabb.dstream.mock;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import net.sf.jabb.dstream.ReceiveStatus;
import net.sf.jabb.dstream.SimpleReceiveStatus;
import net.sf.jabb.dstream.StreamDataSupplier;
import net.sf.jabb.dstream.ex.DataStreamInfrastructureException;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sf/jabb/dstream/mock/MockedStreamDataSupplier.class */
public class MockedStreamDataSupplier implements StreamDataSupplier<String> {
    private static final Logger logger = LoggerFactory.getLogger(MockedStreamDataSupplier.class);
    private static DateTimeFormatter utcIsoDateTimeFormatter = DateTimeFormatter.BASIC_ISO_DATE.withZone(ZoneId.of("UTC"));
    protected int intervalMillis;
    protected Instant firstEventTime;
    protected Instant lastEventTime;

    public MockedStreamDataSupplier(int i, Instant instant, Instant instant2) {
        Validate.isTrue(i >= 1 && i <= 1000, "number of events per second must be between 1 and 1000", new Object[0]);
        Validate.isTrue(1000 % i == 0, "1000 must be dividable by number of events per second", new Object[0]);
        Validate.notNull(instant);
        this.intervalMillis = 1000 / i;
        this.firstEventTime = Instant.ofEpochMilli(((instant.toEpochMilli() / this.intervalMillis) + 1) * this.intervalMillis);
        if (instant2 != null) {
            this.lastEventTime = Instant.ofEpochMilli((instant2.toEpochMilli() / this.intervalMillis) * this.intervalMillis);
        }
    }

    protected String eventAt(Instant instant) {
        LocalDateTime ofInstant = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
        StringBuilder sb = new StringBuilder();
        sb.append("{\"timeZone\": \"UTC\"");
        sb.append(", \"timestamp\": ").append(instant.toEpochMilli());
        sb.append(", \"timestampString\": \"").append(instant.toString()).append("\"");
        sb.append(", \"s1\": ").append(ofInstant.getSecond());
        sb.append(", \"s5\": ").append((ofInstant.getSecond() / 5) * 5);
        sb.append(", \"s10\": ").append((ofInstant.getSecond() / 10) * 10);
        sb.append(", \"s30\": ").append((ofInstant.getSecond() / 30) * 30);
        sb.append(", \"m1\": ").append(ofInstant.getMinute());
        sb.append(", \"m5\": ").append((ofInstant.getMinute() / 5) * 5);
        sb.append(", \"m10\": ").append((ofInstant.getMinute() / 10) * 10);
        sb.append(", \"m30\": ").append((ofInstant.getMinute() / 30) * 30);
        sb.append(", \"h1\": ").append(ofInstant.getHour());
        sb.append(", \"h4\": ").append((ofInstant.getHour() / 4) * 4);
        sb.append(", \"h6\": ").append((ofInstant.getHour() / 6) * 6);
        sb.append(", \"h8\": ").append((ofInstant.getHour() / 8) * 8);
        sb.append(", \"h12\": ").append((ofInstant.getHour() / 12) * 12);
        sb.append(", \"date\": \"").append(ofInstant.format(utcIsoDateTimeFormatter)).append("\"");
        sb.append("}");
        return sb.toString();
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String firstPosition() {
        return String.valueOf(-1);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String firstPosition(Instant instant, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        Instant ofEpochMilli;
        if (instant.isBefore(this.firstEventTime)) {
            ofEpochMilli = this.firstEventTime;
        } else {
            if (this.lastEventTime != null && this.lastEventTime.isBefore(instant)) {
                Thread.sleep(duration.toMillis());
                return null;
            }
            ofEpochMilli = Instant.ofEpochMilli(((instant.toEpochMilli() / this.intervalMillis) + 1) * this.intervalMillis);
        }
        return String.valueOf(ofEpochMilli.toEpochMilli());
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String lastPosition() throws DataStreamInfrastructureException {
        try {
            String firstPosition = firstPosition(Instant.now(), Duration.ofMillis(1L));
            return firstPosition == null ? String.valueOf(this.lastEventTime.toEpochMilli()) : String.valueOf(Long.valueOf(firstPosition).longValue() - this.intervalMillis);
        } catch (InterruptedException e) {
            throw new DataStreamInfrastructureException(e);
        }
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public Instant enqueuedTime(String str) throws DataStreamInfrastructureException {
        return Instant.ofEpochMilli(Long.parseLong(str));
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String nextStartPosition(String str) {
        return str;
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public boolean isInRange(String str, String str2) {
        Validate.isTrue(str != null, "position cannot be null", new Object[0]);
        return str2 == null || Long.parseLong(str) <= Long.parseLong(str2);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public boolean isInRange(Instant instant, Instant instant2) {
        Validate.isTrue(instant != null, "enqueuedTime cannot be null", new Object[0]);
        return instant2 == null || !instant.isAfter(instant2);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super String> list, String str, String str2, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        Instant plus = Instant.now().plus((TemporalAmount) duration);
        Long valueOf = str2 == null ? null : Long.valueOf(Long.parseLong(str2));
        long parseLong = Long.parseLong(str);
        long epochMilli = parseLong < this.firstEventTime.toEpochMilli() ? this.firstEventTime.toEpochMilli() : ((parseLong / this.intervalMillis) + 1) * this.intervalMillis;
        int i2 = 0;
        Long l = null;
        while (true) {
            if (valueOf != null && epochMilli > valueOf.longValue()) {
                break;
            }
            int i3 = i2;
            i2++;
            if (i3 > i || !Instant.now().isBefore(plus)) {
                break;
            }
            if (epochMilli < System.currentTimeMillis()) {
                l = Long.valueOf(epochMilli);
                list.add(eventAt(Instant.ofEpochMilli(epochMilli)));
            }
            epochMilli += this.intervalMillis;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Fetched for ({}-{}],{},{}: ? - {}", new Object[]{str, str2, Integer.valueOf(i), duration, l});
        }
        if (l != null) {
            return new SimpleReceiveStatus(String.valueOf(l), Instant.ofEpochMilli(l.longValue()), valueOf != null && epochMilli > valueOf.longValue());
        }
        return new SimpleReceiveStatus(null, null, valueOf != null && epochMilli > valueOf.longValue());
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super String> list, Instant instant, Instant instant2, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        return fetch(list, String.valueOf(instant.toEpochMilli()), instant2 == null ? null : String.valueOf(instant2.toEpochMilli()), i, duration);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super String> list, String str, Instant instant, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        return fetch(list, str, instant == null ? null : String.valueOf(instant.toEpochMilli()), i, duration);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String startAsyncReceiving(Consumer<String> consumer, String str) throws DataStreamInfrastructureException {
        throw new UnsupportedOperationException();
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String startAsyncReceiving(Consumer<String> consumer, Instant instant) throws DataStreamInfrastructureException {
        throw new UnsupportedOperationException();
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void stopAsyncReceiving(String str) {
        throw new UnsupportedOperationException();
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<String, Long> function, String str, String str2) throws DataStreamInfrastructureException {
        Long valueOf = str2 == null ? null : Long.valueOf(Long.parseLong(str2));
        long parseLong = ((Long.parseLong(str) / this.intervalMillis) + 1) * this.intervalMillis;
        Long l = null;
        while (true) {
            if ((valueOf != null && parseLong > valueOf.longValue()) || function.apply(null).longValue() <= 0) {
                break;
            }
            if (parseLong < System.currentTimeMillis()) {
                boolean z = function.apply(eventAt(Instant.ofEpochMilli(parseLong))).longValue() <= 0;
                l = Long.valueOf(parseLong);
                if (z) {
                    break;
                }
                parseLong += this.intervalMillis;
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Received for ({}-{}]: ? - {}", new Object[]{str, str2, l});
        }
        if (l != null) {
            return new SimpleReceiveStatus(String.valueOf(l), Instant.ofEpochMilli(l.longValue()), valueOf != null && parseLong > valueOf.longValue());
        }
        return new SimpleReceiveStatus(null, null, valueOf != null && parseLong > valueOf.longValue());
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<String, Long> function, Instant instant, Instant instant2) throws DataStreamInfrastructureException {
        return receive(function, String.valueOf(instant.toEpochMilli()), String.valueOf(instant2.toEpochMilli()));
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<String, Long> function, String str, Instant instant) throws DataStreamInfrastructureException {
        return receive(function, str, String.valueOf(instant.toEpochMilli()));
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<String, Long> function, Instant instant, String str) throws DataStreamInfrastructureException {
        return receive(function, String.valueOf(instant.toEpochMilli()), str);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void start() throws Exception {
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void stop() throws Exception {
    }
}
