package net.sf.jabb.dstream.kinesis;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import java.math.BigInteger;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import net.sf.jabb.dstream.ReceiveStatus;
import net.sf.jabb.dstream.SimpleReceiveStatus;
import net.sf.jabb.dstream.StreamDataSupplier;
import net.sf.jabb.dstream.StreamDataSupplierWithId;
import net.sf.jabb.dstream.ex.DataStreamInfrastructureException;
import net.sf.jabb.util.attempt.AttemptStrategy;
import net.sf.jabb.util.attempt.StopStrategies;
import net.sf.jabb.util.ex.ExceptionUncheckUtility;
import net.sf.jabb.util.parallel.BackoffStrategies;
import net.sf.jabb.util.parallel.WaitStrategies;
import net.sf.jabb.util.parallel.WaitStrategy;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sf/jabb/dstream/kinesis/KinesisStreamDataSupplier.class */
public class KinesisStreamDataSupplier<M> implements StreamDataSupplier<M> {
    /** Logger shared by all instances of this class. */
    private static final Logger logger = LoggerFactory.getLogger(KinesisStreamDataSupplier.class);
    /** Upper bound the constructor enforces on both fetchBatchSize and receiveBatchSize. */
    private static final int MAX_GET_RECORDS_LIMIT = 1000;
    /** Milliseconds to wait before retrying after a ProvisionedThroughputExceededException in fetch/receive. */
    private static final long RETRY_INTERVAL_AFTER_THRESHOLD_EXCEEDED = 2000;
    /** First argument given to the Fibonacci backoff of the default attempt strategy. */
    private static final long DEFAULT_RETRY_INTERVAL_BASE = 1000;
    /** Poll budget (3) associated with the no-argument lastPosition(). */
    private static final int LAST_POSITION_POLL_SECONDS = 3;
    /** Converts a de-aggregated Kinesis user record into a message of type M. */
    protected Function<UserRecord, M> messageConverter;
    /** Kinesis client used for every GetShardIterator / GetRecords call. */
    protected AmazonKinesisClient client;
    /** Name of the stream this supplier reads from. */
    protected String streamName;
    /** Id of the single shard this supplier reads from. */
    protected String shardId;
    /** Duration handed to waitStrategy.await() when GetRecords returns no records. */
    protected long pollInterval;
    /** Largest GetRecords limit used by fetch(). */
    protected int fetchBatchSize;
    /** GetRecords limit used by receive(). */
    protected int receiveBatchSize;
    /** How this supplier waits between polls and retries; must be declared before attemptStrategy, whose initializer reads it. */
    protected WaitStrategy waitStrategy = WaitStrategies.threadSleepStrategy();
    /**
     * Template attempt strategy copied by getOneRecord(): Fibonacci backoff and a stop condition of
     * 20000 ms total duration. NOTE(review): the second backoff argument (5000) is presumably the
     * maximum interval - confirm against BackoffStrategies.
     */
    protected AttemptStrategy attemptStrategy = new AttemptStrategy().withWaitStrategy(this.waitStrategy).withBackoffStrategy(BackoffStrategies.fibonacciBackoff(DEFAULT_RETRY_INTERVAL_BASE, 5000)).withStopStrategy(StopStrategies.stopAfterTotalDuration(Duration.ofMillis(20000)));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/sf/jabb/dstream/kinesis/KinesisStreamDataSupplier$Position.class */
    /**
     * A parsed stream position.
     *
     * <p>The textual form is {@code <sequenceNumber>/<subSequenceNumber>}, optionally followed by a
     * trailing {@code /} which marks the user record as the last one inside its Kinesis record.
     * {@code null}, the empty string and {@code "-1"} all denote the position before the very
     * first record; such a position has a {@code null} sequence number.
     */
    public static class Position {
        private static final char SEPARATOR = '/';
        private static final String BEFORE_THE_VERY_FIRST = "-1";

        private final String sequenceNumber;
        private final long subSequenceNumber;
        private final boolean isLastUserRecord;

        /**
         * Parses a position string.
         *
         * @param str the textual position; {@code null}, {@code ""} and {@code "-1"} mean
         *            "before the very first record"
         * @throws IllegalArgumentException if the string is not a before-the-very-first marker and
         *                                  is not of the form described on this class (this
         *                                  includes {@link NumberFormatException} for a
         *                                  non-numeric sub-sequence number)
         */
        Position(String str) {
            String parsedSequenceNumber = null;
            long parsedSubSequenceNumber = 0L;
            boolean parsedIsLast = false;
            if (!isBeforeTheVeryFirst(str)) {
                int separatorIndex = str.indexOf(SEPARATOR);
                if (separatorIndex < 0) {
                    throw malformed(str);
                }
                parsedIsLast = str.charAt(str.length() - 1) == SEPARATOR;
                int subSequenceEnd = parsedIsLast ? str.length() - 1 : str.length();
                // "123/" has a single separator that is also the trailing marker: no sub-sequence part.
                if (subSequenceEnd <= separatorIndex) {
                    throw malformed(str);
                }
                parsedSequenceNumber = str.substring(0, separatorIndex);
                parsedSubSequenceNumber = Long.parseLong(str.substring(separatorIndex + 1, subSequenceEnd));
            }
            this.sequenceNumber = parsedSequenceNumber;
            this.subSequenceNumber = parsedSubSequenceNumber;
            this.isLastUserRecord = parsedIsLast;
        }

        private static IllegalArgumentException malformed(String str) {
            return new IllegalArgumentException(
                    "Malformed position, expected <sequenceNumber>/<subSequenceNumber>[/]: " + str);
        }

        /** Static factory equivalent to {@code new Position(str)}. */
        static Position of(String str) {
            return new Position(str);
        }

        /**
         * Returns the textual form of this position. A before-the-very-first position is rendered
         * as {@code "-1"} so that the result can be parsed back by {@link #of(String)}.
         */
        @Override
        public String toString() {
            return toString(this);
        }

        /** Formats the given components; a trailing {@code /} is appended when {@code z} is true. */
        static String toString(String str, long j, boolean z) {
            return z ? str + "/" + j + "/" : str + "/" + j;
        }

        /** Formats a parsed position, rendering a before-the-very-first position as {@code "-1"}. */
        static String toString(Position position) {
            if (position.isBeforeTheVeryFirst()) {
                return BEFORE_THE_VERY_FIRST;
            }
            return toString(position.sequenceNumber, position.subSequenceNumber, position.isLastUserRecord);
        }

        /**
         * Extracts the sequence number part (everything before the first {@code /}).
         *
         * @throws IllegalArgumentException if the string is null or contains no {@code /}
         */
        static String getSequenceNumber(String str) {
            int separatorIndex = str == null ? -1 : str.indexOf(SEPARATOR);
            if (separatorIndex < 0) {
                throw new IllegalArgumentException("Position has no sequence number: " + str);
            }
            return str.substring(0, separatorIndex);
        }

        /** Tells whether the string is one of the before-the-very-first markers. */
        static boolean isBeforeTheVeryFirst(String str) {
            return str == null || str.isEmpty() || BEFORE_THE_VERY_FIRST.equals(str);
        }

        /** Tells whether this position was parsed from a before-the-very-first marker. */
        public boolean isBeforeTheVeryFirst() {
            return this.sequenceNumber == null;
        }

        /**
         * Returns the sequence number as a number, for ordering comparisons.
         *
         * @throws IllegalStateException if this is a before-the-very-first position
         */
        public BigInteger getSequenceNumberAsBigInteger() {
            if (this.sequenceNumber == null) {
                throw new IllegalStateException("A before-the-very-first position has no sequence number");
            }
            return new BigInteger(this.sequenceNumber);
        }

        /** Returns the sequence number, or {@code null} for a before-the-very-first position. */
        public String getSequenceNumber() {
            return this.sequenceNumber;
        }

        /** Returns the sub-sequence number (0 for a before-the-very-first position). */
        public long getSubSequenceNumber() {
            return this.subSequenceNumber;
        }

        /** Tells whether the textual form carried the trailing {@code /} marker. */
        public boolean isLastUserRecord() {
            return this.isLastUserRecord;
        }
    }

    /**
     * Creates a supplier that reads one shard of one stream.
     *
     * @param amazonKinesisClient client used for all Kinesis calls
     * @param str                 stream name
     * @param str2                shard id
     * @param function            converter from a de-aggregated user record to a message
     * @param j                   poll interval used when a GetRecords call returns nothing
     * @param i                   fetch batch size, at most {@code MAX_GET_RECORDS_LIMIT}
     * @param i2                  receive batch size, at most {@code MAX_GET_RECORDS_LIMIT}
     * @throws IllegalArgumentException if either batch size exceeds {@code MAX_GET_RECORDS_LIMIT}
     */
    public KinesisStreamDataSupplier(AmazonKinesisClient amazonKinesisClient, String str, String str2, Function<UserRecord, M> function, long j, int i, int i2) {
        // Validate in the original order (fetch size first) so the same violation is reported first.
        Validate.isTrue(i <= MAX_GET_RECORDS_LIMIT, "fetchBatchSize should not be greater than %d: %d", MAX_GET_RECORDS_LIMIT, i);
        Validate.isTrue(i2 <= MAX_GET_RECORDS_LIMIT, "receiveBatchSize should not be greater than %d: %d", MAX_GET_RECORDS_LIMIT, i2);

        this.client = amazonKinesisClient;
        this.streamName = str;
        this.shardId = str2;
        this.messageConverter = function;
        this.pollInterval = j;
        this.fetchBatchSize = i;
        this.receiveBatchSize = i2;
    }

    /**
     * Creates one supplier per shard of a stream without explicit credentials; delegates to the
     * eight-argument overload with a {@code null} access key and secret key.
     *
     * @param str      service endpoint
     * @param str2     stream name
     * @param function converter from a de-aggregated user record to a message
     * @param j        poll interval
     * @param i        fetch batch size
     * @param i2       receive batch size
     * @return one supplier per shard, each identified by its shard id
     */
    public static <M> List<StreamDataSupplierWithId<M>> create(String str, String str2, Function<UserRecord, M> function, long j, int i, int i2) {
        final String accessKey = null;
        final String secretKey = null;
        return create(accessKey, secretKey, str, str2, function, j, i, i2);
    }

    /**
     * Creates one supplier per shard of a stream.
     *
     * <p>All shards are listed by paging through DescribeStream: while the service reports more
     * shards, the id of the last shard seen so far is used as the exclusive start of the next page.
     * Every returned supplier shares the same client.
     *
     * @param str      access key, or {@code null} to use the client's no-argument constructor
     * @param str2     secret key, or {@code null} to use the client's no-argument constructor
     * @param str3     service endpoint set on the client
     * @param str4     stream name
     * @param function converter from a de-aggregated user record to a message
     * @param j        poll interval
     * @param i        fetch batch size
     * @param i2       receive batch size
     * @return one supplier per shard, each identified by its shard id
     */
    public static <M> List<StreamDataSupplierWithId<M>> create(String str, String str2, String str3, String str4, Function<UserRecord, M> function, long j, int i, int i2) {
        final AmazonKinesisClient kinesis = (str == null || str2 == null)
                ? new AmazonKinesisClient()
                : new AmazonKinesisClient(new BasicAWSCredentials(str, str2));
        kinesis.setEndpoint(str3);

        DescribeStreamRequest describeRequest = new DescribeStreamRequest();
        describeRequest.setStreamName(str4);

        // Typed list: with a raw list the lambda below would see Object and could not call getShardId().
        List<Shard> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        do {
            describeRequest.setExclusiveStartShardId(exclusiveStartShardId);
            DescribeStreamResult page = kinesis.describeStream(describeRequest);
            shards.addAll(page.getStreamDescription().getShards());
            boolean morePages = page.getStreamDescription().getHasMoreShards().booleanValue() && !shards.isEmpty();
            exclusiveStartShardId = morePages ? shards.get(shards.size() - 1).getShardId() : null;
        } while (exclusiveStartShardId != null);

        return shards.stream().map(shard -> {
            String shardId = shard.getShardId();
            return new KinesisStreamDataSupplier<M>(kinesis, str4, shardId, function, j, i, i2).withId(shardId);
        }).collect(Collectors.toList());
    }

    /**
     * Builds the {@code <streamName>/<shardId>} label used in error messages.
     *
     * @return the stream name and shard id joined by a slash
     */
    protected String streamNameAndShardId() {
        StringBuilder label = new StringBuilder();
        label.append(this.streamName);
        label.append("/");
        label.append(this.shardId);
        return label.toString();
    }

    /**
     * Returns the marker for the position before the very first record of the shard.
     *
     * @return the constant {@code "-1"}, which {@code Position.isBeforeTheVeryFirst(String)} recognises
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String firstPosition() {
        return "-1";
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String firstPosition(Instant instant, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        throw new UnsupportedOperationException("Seeking by enqueuedAfter is not supported by Kinesis");
    }

    /**
     * Reads a single Kinesis record starting from the given shard iterator.
     *
     * <p>GetRecords is called with a limit of 1. When a call returns no record, the next shard
     * iterator is followed; at most {@code i} such follow-up calls are made after the first one.
     * The whole polling sequence runs under a copy of {@code attemptStrategy} that retries on
     * {@link ProvisionedThroughputExceededException}.
     *
     * @param str shard iterator to start from
     * @param i   number of additional polls allowed after the first empty result
     * @return the first record found, or {@code null} if the shard iterator chain ends or the
     *         poll budget is used up
     */
    protected Record getOneRecord(String str, int i) throws DataStreamInfrastructureException {
        final GetRecordsRequest request = new GetRecordsRequest();
        request.setShardIterator(str);
        request.setLimit(1);
        return (Record) ExceptionUncheckUtility.getThrowingUnchecked(() -> {
            return (Record) new AttemptStrategy(this.attemptStrategy).retryIfException(ProvisionedThroughputExceededException.class).callThrowingSuppressed(() -> {
                // The request object is shared across retries, so a retry resumes from the latest iterator.
                for (int pollsLeft = i; ; pollsLeft--) {
                    GetRecordsResult result = this.client.getRecords(request);
                    List<Record> found = result.getRecords();
                    if (found != null && !found.isEmpty()) {
                        return found.get(0);
                    }
                    String following = result.getNextShardIterator();
                    if (following == null) {
                        return null;
                    }
                    request.setShardIterator(following);
                    if (pollsLeft <= 0) {
                        return null;
                    }
                }
            });
        });
    }

    /**
     * Finds the user record with the given sub-sequence number inside a Kinesis record.
     *
     * <p>The record is de-aggregated; the element at index {@code j} is tried first because the
     * sub-sequence number normally equals the index, then all elements are scanned.
     *
     * @param record the (possibly aggregated) Kinesis record
     * @param j      the sub-sequence number to look for
     * @return the matching user record, or {@code null} if there is none (including when
     *         {@code j} is negative or beyond the number of user records)
     */
    protected UserRecord getUserRecord(Record record, long j) {
        List<UserRecord> userRecords = UserRecord.deaggregate(Collections.singletonList(record));
        // Range-check on the long value: casting first would let negative or overflowing values
        // slip past the size check and make get() throw.
        if (j >= 0 && j < userRecords.size()) {
            UserRecord candidate = userRecords.get((int) j);
            if (candidate.getSubSequenceNumber() == j) {
                return candidate;
            }
        }
        for (UserRecord userRecord : userRecords) {
            if (userRecord.getSubSequenceNumber() == j) {
                return userRecord;
            }
        }
        return null;
    }

    /**
     * Returns the position of the last record using the default poll budget.
     *
     * @return the result of {@code lastPosition(int)} with {@code LAST_POSITION_POLL_SECONDS} (3)
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String lastPosition() throws DataStreamInfrastructureException {
        final int pollBudget = LAST_POSITION_POLL_SECONDS;
        return lastPosition(pollBudget);
    }

    /**
     * Determines the position of the newest record by reading from a LATEST shard iterator.
     *
     * @param i poll budget handed to {@code getOneRecord}; it is also reported as the number of
     *          seconds in the error message
     * @return the position of the last user record of the record that was read, marked as last
     *         user record; or {@code null} if no record was read and the shard has no first
     *         record either
     * @throws DataStreamInfrastructureException if no shard iterator could be obtained, or if no
     *                                           record was read although the shard is not empty
     */
    public String lastPosition(int i) throws DataStreamInfrastructureException {
        String latestIterator = this.client.getShardIterator(this.streamName, this.shardId, ShardIteratorType.LATEST.name()).getShardIterator();
        if (latestIterator == null) {
            throw new DataStreamInfrastructureException("Failed to get shard iterator for " + streamNameAndShardId() + " at the end of the stream");
        }
        Record newest = getOneRecord(latestIterator, i);
        if (newest != null) {
            List<UserRecord> userRecords = UserRecord.deaggregate(Collections.singletonList(newest));
            UserRecord lastUserRecord = userRecords.get(userRecords.size() - 1);
            return Position.toString(newest.getSequenceNumber(), lastUserRecord.getSubSequenceNumber(), true);
        }
        String actualFirstPosition = actualFirstPosition();
        if (actualFirstPosition == null) {
            return null;
        }
        // Report the budget actually used rather than a fixed "3"; identical text for the default call.
        throw new DataStreamInfrastructureException("No record had been received in the last " + i + " seconds, the first position is: " + actualFirstPosition);
    }

    /**
     * Reads the oldest available record via a TRIM_HORIZON shard iterator and returns its position.
     *
     * @return the position of the first user record (sub-sequence number 0) of the oldest record,
     *         marked as last user record when the record holds exactly one user record; or
     *         {@code null} if no record could be read
     * @throws DataStreamInfrastructureException if no shard iterator could be obtained
     */
    public String actualFirstPosition() throws DataStreamInfrastructureException {
        GetShardIteratorResult iteratorResult = this.client.getShardIterator(this.streamName, this.shardId, ShardIteratorType.TRIM_HORIZON.name());
        String oldestIterator = iteratorResult.getShardIterator();
        if (oldestIterator == null) {
            throw new DataStreamInfrastructureException("Failed to get shard iterator for " + streamNameAndShardId() + " at the start of the stream");
        }
        Record oldest = getOneRecord(oldestIterator, 0);
        if (oldest != null) {
            String sequenceNumber = oldest.getSequenceNumber();
            boolean singleUserRecord = UserRecord.deaggregate(Collections.singletonList(oldest)).size() == 1;
            return Position.toString(sequenceNumber, 0L, singleUserRecord);
        }
        return null;
    }

    /**
     * Looks up the approximate arrival time of the record at the given position.
     *
     * @param str a position whose sequence number part identifies the record
     * @return the record's approximate arrival timestamp, or {@code null} if no record was read
     * @throws DataStreamInfrastructureException if no shard iterator could be obtained
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public Instant enqueuedTime(String str) throws DataStreamInfrastructureException {
        GetShardIteratorResult iteratorResult = this.client.getShardIterator(this.streamName, this.shardId, ShardIteratorType.AT_SEQUENCE_NUMBER.name(), Position.getSequenceNumber(str));
        String iteratorAtPosition = iteratorResult.getShardIterator();
        if (iteratorAtPosition == null) {
            throw new DataStreamInfrastructureException("Failed to get shard iterator for " + streamNameAndShardId() + " starting at " + str);
        }
        Record found = getOneRecord(iteratorAtPosition, 0);
        return found == null ? null : found.getApproximateArrivalTimestamp().toInstant();
    }

    /**
     * Returns the start position to use after the given position.
     *
     * @param str the position of the last item already consumed
     * @return {@code str} itself, unchanged
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String nextStartPosition(String str) {
        return str;
    }

    /**
     * Tells whether a position is at or before an (inclusive) end position.
     *
     * <p>Positions are ordered by sequence number (compared numerically) and then by
     * sub-sequence number.
     *
     * @param str  the position to test; must not be {@code null}
     * @param str2 the inclusive end position; {@code null} means unbounded
     * @return {@code true} if there is no end position, if {@code str} is a before-the-very-first
     *         marker, or if {@code str} is not after {@code str2}; {@code false} otherwise,
     *         including when {@code str2} is a before-the-very-first marker and {@code str} is not
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public boolean isInRange(String str, String str2) {
        Validate.isTrue(str != null, "position cannot be null");
        if (str2 == null || Position.isBeforeTheVeryFirst(str)) {
            return true;
        }
        // A real position is always after "before the very first"; parsing such an end position
        // would yield a null sequence number and fail in the numeric comparison below.
        if (Position.isBeforeTheVeryFirst(str2)) {
            return false;
        }
        Position position = Position.of(str);
        Position endPosition = Position.of(str2);
        int bySequenceNumber = position.getSequenceNumberAsBigInteger().compareTo(endPosition.getSequenceNumberAsBigInteger());
        if (bySequenceNumber != 0) {
            return bySequenceNumber < 0;
        }
        return position.getSubSequenceNumber() <= endPosition.getSubSequenceNumber();
    }

    /**
     * Tells whether an enqueued time is at or before an (inclusive) end time.
     *
     * @param instant  the time to test; must not be {@code null}
     * @param instant2 the inclusive end time; {@code null} means unbounded
     * @return {@code true} if there is no end time or {@code instant} is not after it
     * @throws IllegalArgumentException if {@code instant} is {@code null}
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public boolean isInRange(Instant instant, Instant instant2) {
        Validate.isTrue(instant != null, "enqueuedTime cannot be null");
        if (instant2 == null) {
            return true;
        }
        return instant.compareTo(instant2) <= 0;
    }

    /**
     * Obtains a shard iterator for reading after the given position.
     *
     * <p>A before-the-very-first position maps to TRIM_HORIZON. Otherwise the iterator starts
     * AFTER the position's sequence number when the position is the last user record of its
     * Kinesis record, and AT that sequence number when it is not.
     *
     * @param position the parsed start position
     * @return the shard iterator string reported by the service
     * @throws DataStreamInfrastructureException wrapping any exception raised while obtaining it
     */
    protected String getShardIterator(Position position) throws DataStreamInfrastructureException {
        try {
            if (position.isBeforeTheVeryFirst()) {
                return this.client.getShardIterator(this.streamName, this.shardId, ShardIteratorType.TRIM_HORIZON.name()).getShardIterator();
            }
            ShardIteratorType iteratorType = position.isLastUserRecord()
                    ? ShardIteratorType.AFTER_SEQUENCE_NUMBER
                    : ShardIteratorType.AT_SEQUENCE_NUMBER;
            GetShardIteratorResult iteratorResult = this.client.getShardIterator(this.streamName, this.shardId, iteratorType.name(), position.getSequenceNumber());
            return iteratorResult.getShardIterator();
        } catch (Exception cause) {
            throw new DataStreamInfrastructureException("Failed to get shard iterator for " + streamNameAndShardId() + " starting from " + position, cause);
        }
    }

    /**
     * Fetches messages after a start position into a list until an item limit, a timeout, the end
     * of the shard iterator chain, or an out-of-range record is reached.
     *
     * <p>The start position is exclusive: user records of the start record whose sub-sequence
     * number is not greater than the start's are skipped. When GetRecords returns nothing the
     * method waits {@code pollInterval}; when the service reports
     * {@link ProvisionedThroughputExceededException} it waits
     * {@code RETRY_INTERVAL_AFTER_THRESHOLD_EXCEEDED} and retries with the same shard iterator.
     *
     * @param list      destination for converted messages
     * @param str       start position (exclusive)
     * @param predicate range test applied to each user record; the first record failing it stops
     *                  the fetch and sets the out-of-range flag
     * @param i         maximum number of messages to add
     * @param j         timeout in milliseconds
     * @return status carrying the last position / enqueued time added and the out-of-range flag
     * @throws InterruptedException              if interrupted while waiting
     * @throws DataStreamInfrastructureException wrapping any other failure inside the read loop
     */
    protected SimpleReceiveStatus fetch(List<? super M> list, String str, Predicate<Record> predicate, int i, long j) throws InterruptedException, DataStreamInfrastructureException {
        Position start = Position.of(str);
        SimpleReceiveStatus status = new SimpleReceiveStatus();
        String shardIterator = getShardIterator(start);
        int remaining = i;
        GetRecordsRequest request = new GetRecordsRequest();
        long deadline = System.currentTimeMillis() + j;
        // With these start kinds the iterator already begins after the start, so nothing is skipped.
        boolean acceptAll = start.isBeforeTheVeryFirst() || start.isLastUserRecord();
        while (shardIterator != null && remaining > 0 && System.currentTimeMillis() < deadline) {
            request.setShardIterator(shardIterator);
            request.setLimit(Math.min(remaining, this.fetchBatchSize));
            try {
                GetRecordsResult result = this.client.getRecords(request);
                List<Record> records = result.getRecords();
                if (records == null || records.isEmpty()) {
                    this.waitStrategy.await(this.pollInterval);
                } else {
                    for (Record record : records) {
                        boolean isStartRecord = record.getSequenceNumber().equals(start.getSequenceNumber());
                        List<UserRecord> userRecords = UserRecord.deaggregate(Collections.singletonList(record));
                        for (int index = 0; index < userRecords.size(); index++) {
                            UserRecord userRecord = userRecords.get(index);
                            if (!acceptAll && isStartRecord && userRecord.getSubSequenceNumber() <= start.getSubSequenceNumber()) {
                                continue; // at or before the exclusive start position
                            }
                            if (!predicate.test(userRecord)) {
                                status.setOutOfRangeReached(true);
                                return status;
                            }
                            list.add(this.messageConverter.apply(userRecord));
                            status.setLastPosition(Position.toString(userRecord.getSequenceNumber(), userRecord.getSubSequenceNumber(), index == userRecords.size() - 1));
                            status.setLastEnqueuedTime(userRecord.getApproximateArrivalTimestamp().toInstant());
                            remaining--;
                            if (remaining <= 0) {
                                return status;
                            }
                        }
                    }
                }
                shardIterator = result.getNextShardIterator();
            } catch (ProvisionedThroughputExceededException throttled) {
                // Must precede the generic handler, otherwise throttling is never retried.
                logger.debug("ProvisionedThroughputExceeded, will retry after 2000ms");
                this.waitStrategy.await(RETRY_INTERVAL_AFTER_THRESHOLD_EXCEEDED);
            } catch (InterruptedException interrupted) {
                // Declared by this method: let cancellation reach the caller unwrapped.
                throw interrupted;
            } catch (Exception cause) {
                throw new DataStreamInfrastructureException("Failed to get records", cause);
            }
        }
        return status;
    }

    /**
     * Fetches messages after a start position up to an inclusive end position.
     *
     * @param list     destination for converted messages
     * @param str      start position (exclusive)
     * @param str2     inclusive end position, or {@code null} for no end
     * @param i        maximum number of messages to add
     * @param duration timeout
     * @return status of the fetch
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, String str, String str2, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        return fetch(list, str, record -> {
            // isInRange parses full positions, so a bare sequence number is not enough: combine it
            // with the user record's sub-sequence number (0 when the record is not a UserRecord).
            long subSequenceNumber = record instanceof UserRecord ? ((UserRecord) record).getSubSequenceNumber() : 0L;
            return isInRange(Position.toString(record.getSequenceNumber(), subSequenceNumber, false), str2);
        }, i, duration.toMillis());
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, Instant instant, Instant instant2, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        throw new UnsupportedOperationException("Fetching by startEnqueuedTime is not supported by Kinesis");
    }

    /**
     * Fetches messages after a start position whose approximate arrival time is not after an
     * inclusive end time.
     *
     * @param list     destination for converted messages
     * @param str      start position (exclusive)
     * @param instant  inclusive end time, or {@code null} for no end
     * @param i        maximum number of messages to add
     * @param duration timeout
     * @return status of the fetch
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, String str, Instant instant, int i, Duration duration) throws InterruptedException, DataStreamInfrastructureException {
        final Predicate<Record> notAfterEndTime =
                record -> isInRange(record.getApproximateArrivalTimestamp().toInstant(), instant);
        final long timeoutMillis = duration.toMillis();
        return fetch(list, str, notAfterEndTime, i, timeoutMillis);
    }

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String startAsyncReceiving(Consumer<M> consumer, String str) throws DataStreamInfrastructureException {
        throw new UnsupportedOperationException("Not implemented yet");
    }

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String startAsyncReceiving(Consumer<M> consumer, Instant instant) throws DataStreamInfrastructureException {
        throw new UnsupportedOperationException("Not implemented yet");
    }

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void stopAsyncReceiving(String str) {
        throw new UnsupportedOperationException("Not implemented yet");
    }

    /**
     * Pushes messages after a start position to a handler until the handler asks to stop, the
     * time budget runs out, the shard iterator chain ends, or an out-of-range record is reached.
     *
     * <p>The handler is first called with {@code null} to obtain the initial time budget in
     * milliseconds. After each message its return value becomes the new budget measured from
     * that moment; a value of zero or less stops receiving. The start position is exclusive.
     * Empty GetRecords results wait {@code pollInterval}; throttling waits
     * {@code RETRY_INTERVAL_AFTER_THRESHOLD_EXCEEDED} and retries with the same shard iterator.
     *
     * @param function  message handler returning the remaining time budget in milliseconds
     * @param str       start position (exclusive)
     * @param predicate range test applied to each user record; the first record failing it stops
     *                  receiving and sets the out-of-range flag
     * @return status carrying the last position / enqueued time handled and the out-of-range flag
     * @throws DataStreamInfrastructureException on any failure, including interruption (the
     *                                           thread's interrupt flag is restored in that case)
     */
    protected SimpleReceiveStatus receive(Function<M, Long> function, String str, Predicate<Record> predicate) throws DataStreamInfrastructureException {
        Position start = Position.of(str);
        SimpleReceiveStatus status = new SimpleReceiveStatus();
        String shardIterator = getShardIterator(start);
        // With these start kinds the iterator already begins after the start, so nothing is skipped.
        boolean acceptAll = start.isBeforeTheVeryFirst() || start.isLastUserRecord();
        try {
            GetRecordsRequest request = new GetRecordsRequest();
            long deadline = System.currentTimeMillis() + function.apply(null).longValue();
            while (shardIterator != null && System.currentTimeMillis() < deadline) {
                request.setShardIterator(shardIterator);
                request.setLimit(Integer.valueOf(this.receiveBatchSize));
                try {
                    GetRecordsResult result = this.client.getRecords(request);
                    List<Record> records = result.getRecords();
                    if (records == null || records.isEmpty()) {
                        this.waitStrategy.await(this.pollInterval);
                    } else {
                        for (Record record : records) {
                            boolean isStartRecord = record.getSequenceNumber().equals(start.getSequenceNumber());
                            List<UserRecord> userRecords = UserRecord.deaggregate(Collections.singletonList(record));
                            for (int index = 0; index < userRecords.size(); index++) {
                                UserRecord userRecord = userRecords.get(index);
                                if (!acceptAll && isStartRecord && userRecord.getSubSequenceNumber() <= start.getSubSequenceNumber()) {
                                    continue; // at or before the exclusive start position
                                }
                                if (!predicate.test(userRecord)) {
                                    status.setOutOfRangeReached(true);
                                    return status;
                                }
                                long newBudgetMillis = function.apply(this.messageConverter.apply(userRecord)).longValue();
                                status.setLastPosition(Position.toString(userRecord.getSequenceNumber(), userRecord.getSubSequenceNumber(), index == userRecords.size() - 1));
                                status.setLastEnqueuedTime(userRecord.getApproximateArrivalTimestamp().toInstant());
                                if (newBudgetMillis <= 0) {
                                    return status;
                                }
                                deadline = System.currentTimeMillis() + newBudgetMillis;
                            }
                        }
                    }
                    shardIterator = result.getNextShardIterator();
                } catch (ProvisionedThroughputExceededException throttled) {
                    // Must precede the generic handler, otherwise throttling is never retried.
                    logger.debug("ProvisionedThroughputExceeded, will retry after 2000ms");
                    this.waitStrategy.await(RETRY_INTERVAL_AFTER_THRESHOLD_EXCEEDED);
                } catch (InterruptedException interrupted) {
                    // Hand over to the outer handler so interruption is reported as such.
                    throw interrupted;
                } catch (Exception cause) {
                    throw new DataStreamInfrastructureException("Failed to get records from " + streamNameAndShardId() + " starting from " + str, cause);
                }
            }
            return status;
        } catch (InterruptedException interrupted) {
            // The checked exception is converted, so keep the interrupt visible to callers.
            Thread.currentThread().interrupt();
            throw new DataStreamInfrastructureException("Interrupted while receiving from " + streamNameAndShardId() + " starting from " + str, interrupted);
        } catch (DataStreamInfrastructureException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (Exception cause) {
            throw new DataStreamInfrastructureException("Failed to receive from " + streamNameAndShardId() + " starting from " + str, cause);
        }
    }

    /**
     * Pushes messages after a start position, up to an inclusive end position, to a handler.
     *
     * @param function message handler returning the remaining time budget in milliseconds
     * @param str      start position (exclusive)
     * @param str2     inclusive end position, or {@code null} for no end
     * @return status of the receive operation
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, String str, String str2) throws DataStreamInfrastructureException {
        return receive(function, str, record -> {
            // isInRange parses full positions, so a bare sequence number is not enough: combine it
            // with the user record's sub-sequence number (0 when the record is not a UserRecord).
            long subSequenceNumber = record instanceof UserRecord ? ((UserRecord) record).getSubSequenceNumber() : 0L;
            return isInRange(Position.toString(record.getSequenceNumber(), subSequenceNumber, false), str2);
        });
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, Instant instant, Instant instant2) throws DataStreamInfrastructureException {
        throw new UnsupportedOperationException("Receiving by startEnqueuedTime is not supported by Kinesis");
    }

    /**
     * Pushes messages after a start position whose approximate arrival time is not after an
     * inclusive end time to a handler.
     *
     * @param function message handler returning the remaining time budget in milliseconds
     * @param str      start position (exclusive)
     * @param instant  inclusive end time, or {@code null} for no end
     * @return status of the receive operation
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, String str, Instant instant) throws DataStreamInfrastructureException {
        final Predicate<Record> notAfterEndTime =
                record -> isInRange(record.getApproximateArrivalTimestamp().toInstant(), instant);
        return receive(function, str, notAfterEndTime);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, Instant instant, String str) throws DataStreamInfrastructureException {
        throw new UnsupportedOperationException("Receiving by startEnqueuedTime is not supported by Kinesis");
    }

    /** Does nothing; the body is empty. */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void start() throws Exception {
    }

    /** Does nothing; the body is empty. */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void stop() throws Exception {
    }
}
