package net.sf.jabb.txsdp;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import net.sf.jabb.dstream.ReceiveStatus;
import net.sf.jabb.dstream.StreamDataSupplier;
import net.sf.jabb.dstream.StreamDataSupplierWithIdAndPositionRange;
import net.sf.jabb.dstream.StreamDataSupplierWithIdAndRange;
import net.sf.jabb.dstream.ex.DataStreamInfrastructureException;
import net.sf.jabb.seqtx.ReadOnlySequentialTransaction;
import net.sf.jabb.seqtx.SequentialTransaction;
import net.sf.jabb.seqtx.SequentialTransactionsCoordinator;
import net.sf.jabb.seqtx.ex.DuplicatedTransactionIdException;
import net.sf.jabb.seqtx.ex.TransactionStorageInfrastructureException;
import net.sf.jabb.util.parallel.WaitStrategy;
import net.sf.jabb.util.text.DurationFormatter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sf/jabb/txsdp/TransactionalStreamDataBatchProcessing.class */
public class TransactionalStreamDataBatchProcessing<M> {
    static final Logger logger = LoggerFactory.getLogger(TransactionalStreamDataBatchProcessing.class);
    protected String id;
    protected FlexibleBatchProcessor<M> batchProcessor;
    protected List<StreamDataSupplierWithIdAndRange<M, ?>> suppliers;
    protected SequentialTransactionsCoordinator txCoordinator;
    protected Options processorOptions;
    protected Map<String, TransactionalStreamDataBatchProcessing<M>.Processor> processors;

    /* loaded from: input_file:net/sf/jabb/txsdp/TransactionalStreamDataBatchProcessing$Options.class */
    public static class Options {
        private Duration initialTransactionTimeoutDuration;
        private int maxInProgressTransactions;
        private int maxRetringTransactions;
        private Duration transactionAcquisitionDelay;
        private WaitStrategy waitStrategy;

        public Options() {
        }

        public Options(Options options) {
            this.initialTransactionTimeoutDuration = options.initialTransactionTimeoutDuration;
            this.maxInProgressTransactions = options.maxInProgressTransactions;
            this.maxRetringTransactions = options.maxRetringTransactions;
            this.transactionAcquisitionDelay = options.transactionAcquisitionDelay;
            this.waitStrategy = options.waitStrategy;
        }

        public Duration getInitialTransactionTimeoutDuration() {
            return this.initialTransactionTimeoutDuration;
        }

        public void setInitialTransactionTimeoutDuration(Duration duration) {
            this.initialTransactionTimeoutDuration = duration;
        }

        public Options withInitialTransactionTimeoutDuration(Duration duration) {
            this.initialTransactionTimeoutDuration = duration;
            return this;
        }

        public int getMaxInProgressTransactions() {
            return this.maxInProgressTransactions;
        }

        public void setMaxInProgressTransactions(int i) {
            this.maxInProgressTransactions = i;
        }

        public Options withMaxInProgressTransactions(int i) {
            this.maxInProgressTransactions = i;
            return this;
        }

        public int getMaxRetringTransactions() {
            return this.maxRetringTransactions;
        }

        public void setMaxRetringTransactions(int i) {
            this.maxRetringTransactions = i;
        }

        public Options withMaxRetringTransactions(int i) {
            this.maxRetringTransactions = i;
            return this;
        }

        public Duration getTransactionAcquisitionDelay() {
            return this.transactionAcquisitionDelay;
        }

        public void setTransactionAcquisitionDelay(Duration duration) {
            this.transactionAcquisitionDelay = duration;
        }

        public Options withTransactionAcquisitionDelay(Duration duration) {
            this.transactionAcquisitionDelay = duration;
            return this;
        }

        public WaitStrategy getWaitStrategy() {
            return this.waitStrategy;
        }

        public void setWaitStrategy(WaitStrategy waitStrategy) {
            this.waitStrategy = waitStrategy;
        }

        public Options withWaitStrategy(WaitStrategy waitStrategy) {
            this.waitStrategy = waitStrategy;
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/sf/jabb/txsdp/TransactionalStreamDataBatchProcessing$Processor.class */
    public class Processor implements Runnable {
        protected AtomicReference<State> state = new AtomicReference<>(State.READY);
        private String processorId;

        Processor(String str) {
            this.processorId = str;
        }

        private void await() {
            WaitStrategy waitStrategy = TransactionalStreamDataBatchProcessing.this.processorOptions.getWaitStrategy();
            try {
                waitStrategy.await(TransactionalStreamDataBatchProcessing.this.processorOptions.getTransactionAcquisitionDelay().toMillis());
            } catch (InterruptedException e) {
                waitStrategy.handleInterruptedException(e);
            }
        }

        private boolean allProcessed(boolean[] zArr) {
            for (boolean z : zArr) {
                if (!z) {
                    return false;
                }
            }
            return true;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean[] zArr = new boolean[TransactionalStreamDataBatchProcessing.this.suppliers.size()];
            int nextInt = new Random(System.currentTimeMillis()).nextInt(TransactionalStreamDataBatchProcessing.this.suppliers.size()) - 1;
            ProcessingContextImpl processingContextImpl = new ProcessingContextImpl(TransactionalStreamDataBatchProcessing.this.txCoordinator);
            while (this.state.get() != State.STOPPED) {
                while (true) {
                    if (this.state.get() != State.RUNNING) {
                        break;
                    }
                    if (allProcessed(zArr)) {
                        this.state.set(State.FINISHED);
                        break;
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    int i = 0;
                    SequentialTransaction sequentialTransaction = null;
                    String str = null;
                    StreamDataSupplierWithIdAndRange<M, ?> streamDataSupplierWithIdAndRange = null;
                    StreamDataSupplier<M> streamDataSupplier = null;
                    do {
                        try {
                            nextInt = (nextInt + 1) % TransactionalStreamDataBatchProcessing.this.suppliers.size();
                            if (!zArr[nextInt]) {
                                streamDataSupplierWithIdAndRange = TransactionalStreamDataBatchProcessing.this.suppliers.get(nextInt);
                                streamDataSupplier = streamDataSupplierWithIdAndRange.getSupplier();
                                i++;
                                str = TransactionalStreamDataBatchProcessing.this.seriesId(streamDataSupplierWithIdAndRange);
                                try {
                                    sequentialTransaction = TransactionalStreamDataBatchProcessing.this.txCoordinator.startTransaction(str, this.processorId, TransactionalStreamDataBatchProcessing.this.processorOptions.getInitialTransactionTimeoutDuration(), TransactionalStreamDataBatchProcessing.this.processorOptions.getMaxInProgressTransactions(), TransactionalStreamDataBatchProcessing.this.processorOptions.getMaxRetringTransactions());
                                } catch (Exception e) {
                                    TransactionalStreamDataBatchProcessing.logger.warn("[{}] startTransaction(...) failed", str, e);
                                    await();
                                }
                                if (sequentialTransaction != null) {
                                    break;
                                }
                            } else {
                                break;
                            }
                        } catch (DuplicatedTransactionIdException e2) {
                            TransactionalStreamDataBatchProcessing.logger.warn("[{}] Transaction ID is duplicated: " + sequentialTransaction.getTransactionId(), str, e2);
                        } catch (TransactionStorageInfrastructureException e3) {
                            TransactionalStreamDataBatchProcessing.logger.debug("[{}] In transaction storage infrastructure error happened", str, e3);
                        } catch (Exception e4) {
                            TransactionalStreamDataBatchProcessing.logger.error("[{}] Error happened", str, e4);
                        }
                    } while (this.state.get() == State.RUNNING);
                    while (sequentialTransaction != null && !sequentialTransaction.hasStarted() && this.state.get() == State.RUNNING) {
                        String transactionId = sequentialTransaction.getTransactionId();
                        String startPosition = sequentialTransaction.getStartPosition();
                        sequentialTransaction.setTransactionId(null);
                        sequentialTransaction.setStartPosition(startPosition == null ? "" : streamDataSupplier.nextStartPosition(startPosition));
                        sequentialTransaction.setEndPositionNull();
                        sequentialTransaction.setTimeout(TransactionalStreamDataBatchProcessing.this.processorOptions.getInitialTransactionTimeoutDuration());
                        i++;
                        sequentialTransaction = TransactionalStreamDataBatchProcessing.this.txCoordinator.startTransaction(str, transactionId, startPosition, sequentialTransaction, TransactionalStreamDataBatchProcessing.this.processorOptions.getMaxInProgressTransactions(), TransactionalStreamDataBatchProcessing.this.processorOptions.getMaxRetringTransactions());
                    }
                    if (sequentialTransaction != null && sequentialTransaction.hasStarted() && this.state.get() == State.RUNNING) {
                        Logger logger = TransactionalStreamDataBatchProcessing.logger;
                        Object[] objArr = new Object[6];
                        objArr[0] = sequentialTransaction.getAttempts() == 1 ? "new" : "failed";
                        objArr[1] = sequentialTransaction.getTransactionId();
                        objArr[2] = sequentialTransaction.getStartPosition();
                        objArr[3] = sequentialTransaction.getEndPosition();
                        objArr[4] = Integer.valueOf(i);
                        objArr[5] = DurationFormatter.formatSince(currentTimeMillis);
                        logger.debug("Got a {} transaction {} [{}-{}] after {} attempts: {}", objArr);
                        if (doTransaction(processingContextImpl.withSeriesId(str).withTransaction(sequentialTransaction), streamDataSupplierWithIdAndRange)) {
                            try {
                                String finishedPosition = SequentialTransactionsCoordinator.getFinishedPosition(TransactionalStreamDataBatchProcessing.this.txCoordinator.getRecentTransactions(str));
                                if (finishedPosition != null && (finishedPosition.equals(sequentialTransaction.getStartPosition()) || finishedPosition.equals(sequentialTransaction.getEndPosition()))) {
                                    zArr[nextInt] = true;
                                }
                            } catch (Exception e5) {
                                TransactionalStreamDataBatchProcessing.logger.warn("[{}] Failed to get recent transactions", str, e5);
                            }
                        }
                    } else if (this.state.get() == State.RUNNING && !zArr[nextInt]) {
                        await();
                    }
                }
                await();
            }
        }

        protected boolean doTransaction(ProcessingContextImpl processingContextImpl, StreamDataSupplierWithIdAndRange<M, ?> streamDataSupplierWithIdAndRange) {
            String str = processingContextImpl.seriesId;
            SequentialTransaction sequentialTransaction = processingContextImpl.transaction;
            boolean z = false;
            String str2 = null;
            ReceiveStatus receiveStatus = null;
            try {
            } catch (Exception e) {
                if (TransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                    logDebugInTransaction("Processing is not successful", processingContextImpl, str2, e);
                }
            }
            if (!TransactionalStreamDataBatchProcessing.this.batchProcessor.initialize(processingContextImpl)) {
                throw new Exception("Unable to initilize processor");
            }
            long receive = TransactionalStreamDataBatchProcessing.this.batchProcessor.receive(processingContextImpl, null);
            receiveStatus = streamDataSupplierWithIdAndRange.receiveInRange(obj -> {
                return Long.valueOf(TransactionalStreamDataBatchProcessing.this.batchProcessor.receive(processingContextImpl, obj));
            }, sequentialTransaction.getStartPosition());
            str2 = receiveStatus.getLastPosition();
            if (str2 != null) {
                if (sequentialTransaction.getEndPosition() == null) {
                    try {
                        TransactionalStreamDataBatchProcessing.this.txCoordinator.updateTransactionEndPosition(str, this.processorId, sequentialTransaction.getTransactionId(), str2);
                        sequentialTransaction.setEndPosition(str2);
                    } catch (Exception e2) {
                        throw new Exception("Unable to update end position in open range transaction", e2);
                    }
                } else if (!str2.equals(sequentialTransaction.getEndPosition())) {
                    throw new Exception("Unable to fetch all the data in range within duration " + DurationFormatter.format(receive));
                }
                z = TransactionalStreamDataBatchProcessing.this.batchProcessor.finish(processingContextImpl);
            } else if (TransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                logDebugInTransaction("Fetched nothing within " + DurationFormatter.format(receive), processingContextImpl, str2);
            }
            if (z) {
                try {
                    TransactionalStreamDataBatchProcessing.this.txCoordinator.finishTransaction(str, this.processorId, sequentialTransaction.getTransactionId(), str2);
                } catch (Exception e3) {
                    if (TransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                        logDebugInTransaction("Unable to finish transaction", processingContextImpl, str2, e3);
                    }
                }
            } else {
                try {
                    TransactionalStreamDataBatchProcessing.this.txCoordinator.abortTransaction(str, this.processorId, sequentialTransaction.getTransactionId());
                    if (TransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                        logDebugInTransaction("Aborted transaction", processingContextImpl, str2);
                    }
                } catch (Exception e4) {
                    if (TransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                        logDebugInTransaction("Unable to abort transaction", processingContextImpl, str2, e4);
                    }
                }
            }
            if (receiveStatus == null) {
                return false;
            }
            return receiveStatus.isOutOfRangeReached();
        }

        protected void logDebugInTransaction(String str, ProcessingContextImpl processingContextImpl, String str2, Exception exc) {
            SequentialTransaction sequentialTransaction = processingContextImpl.transaction;
            TransactionalStreamDataBatchProcessing.logger.debug("[{} - {}] " + str + ": transactionId={}, startPosition={}, endPosition={}, fetchedLastPosition={}. Exception: {}", new Object[]{processingContextImpl.seriesId, this.processorId, sequentialTransaction.getTransactionId(), sequentialTransaction.getStartPosition(), sequentialTransaction.getEndPosition(), str2, TransactionalStreamDataBatchProcessing.exceptionSummary(exc)});
        }

        protected void logDebugInTransaction(String str, ProcessingContextImpl processingContextImpl, String str2) {
            SequentialTransaction sequentialTransaction = processingContextImpl.transaction;
            TransactionalStreamDataBatchProcessing.logger.debug("[{} - {}] " + str + ": transactionId={}, startPosition={}, endPosition={}, fetchedLastPosition={}", new Object[]{processingContextImpl.seriesId, this.processorId, sequentialTransaction.getTransactionId(), sequentialTransaction.getStartPosition(), sequentialTransaction.getEndPosition(), str2});
        }
    }

    /* loaded from: input_file:net/sf/jabb/txsdp/TransactionalStreamDataBatchProcessing$ProcessorStatus.class */
    public static class ProcessorStatus {
        private State state;

        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
        }

        public State getState() {
            return this.state;
        }
    }

    /* loaded from: input_file:net/sf/jabb/txsdp/TransactionalStreamDataBatchProcessing$State.class */
    public enum State {
        READY,
        STOPPED,
        PAUSED,
        RUNNING,
        FINISHED
    }

    /* loaded from: input_file:net/sf/jabb/txsdp/TransactionalStreamDataBatchProcessing$Status.class */
    public static class Status {
        private Map<String, ProcessorStatus> processorStatus;
        private LinkedHashMap<String, StreamStatus> streamStatus;

        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
        }

        public Map<String, ProcessorStatus> getProcessorStatus() {
            return this.processorStatus;
        }

        public LinkedHashMap<String, StreamStatus> getStreamStatus() {
            return this.streamStatus;
        }
    }

    /* loaded from: input_file:net/sf/jabb/txsdp/TransactionalStreamDataBatchProcessing$StreamStatus.class */
    public static class StreamStatus {
        private String finishedPosition;
        private Instant finishedEnqueuedTime;
        private String lastUnfinishedStartPosition;
        private Instant lastUnfinishedStartEnqueuedTime;
        private String lastUnfinishedEndPosition;
        private Instant lastUnfinishedEndEnqueuedTime;
        private SequentialTransactionsCoordinator.TransactionCounts transactionCounts;

        StreamStatus() {
        }

        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
        }

        public String getFinishedPosition() {
            return this.finishedPosition;
        }

        public Instant getFinishedEnqueuedTime() {
            return this.finishedEnqueuedTime;
        }

        public String getLastUnfinishedStartPosition() {
            return this.lastUnfinishedStartPosition;
        }

        public Instant getLastUnfinishedStartEnqueuedTime() {
            return this.lastUnfinishedStartEnqueuedTime;
        }

        public String getLastUnfinishedEndPosition() {
            return this.lastUnfinishedEndPosition;
        }

        public Instant getLastUnfinishedEndEnqueuedTime() {
            return this.lastUnfinishedEndEnqueuedTime;
        }

        public SequentialTransactionsCoordinator.TransactionCounts getTransactionCounts() {
            return this.transactionCounts;
        }
    }

    public TransactionalStreamDataBatchProcessing(String str, Options options, SequentialTransactionsCoordinator sequentialTransactionsCoordinator, FlexibleBatchProcessor<M> flexibleBatchProcessor, List<StreamDataSupplierWithIdAndRange<M, ?>> list) {
        this.processors = new HashMap();
        this.id = str;
        this.processorOptions = new Options(options);
        this.txCoordinator = sequentialTransactionsCoordinator;
        this.batchProcessor = flexibleBatchProcessor;
        this.suppliers = new ArrayList();
        this.suppliers.addAll(list);
    }

    @SafeVarargs
    public TransactionalStreamDataBatchProcessing(String str, Options options, SequentialTransactionsCoordinator sequentialTransactionsCoordinator, FlexibleBatchProcessor<M> flexibleBatchProcessor, StreamDataSupplierWithIdAndRange<M, ?>... streamDataSupplierWithIdAndRangeArr) {
        this(str, options, sequentialTransactionsCoordinator, flexibleBatchProcessor, Arrays.asList(streamDataSupplierWithIdAndRangeArr));
    }

    public TransactionalStreamDataBatchProcessing(String str, Options options, SequentialTransactionsCoordinator sequentialTransactionsCoordinator, SimpleBatchProcessor<M> simpleBatchProcessor, int i, Duration duration, Duration duration2, List<StreamDataSupplierWithIdAndRange<M, ?>> list) {
        this(str, options, sequentialTransactionsCoordinator, new SimpleFlexibleBatchProcessor(simpleBatchProcessor, i, duration, duration2), list);
    }

    @SafeVarargs
    public TransactionalStreamDataBatchProcessing(String str, Options options, SequentialTransactionsCoordinator sequentialTransactionsCoordinator, SimpleBatchProcessor<M> simpleBatchProcessor, int i, Duration duration, Duration duration2, StreamDataSupplierWithIdAndPositionRange<M>... streamDataSupplierWithIdAndPositionRangeArr) {
        this(str, options, sequentialTransactionsCoordinator, simpleBatchProcessor, i, duration, duration2, Arrays.asList(streamDataSupplierWithIdAndPositionRangeArr));
    }

    public Runnable createProcessor(String str) {
        Validate.notNull(str, "Processor id cannot be null", new Object[0]);
        if (this.processors.containsKey(str)) {
            throw new IllegalArgumentException("Another runnable with the same processor ID already exists: " + str);
        }
        TransactionalStreamDataBatchProcessing<M>.Processor processor = new Processor(str);
        this.processors.put(str, processor);
        return processor;
    }

    protected void start(TransactionalStreamDataBatchProcessing<M>.Processor processor) {
        if (!processor.state.compareAndSet(State.READY, State.RUNNING) && !processor.state.compareAndSet(State.PAUSED, State.RUNNING)) {
            throw new IllegalStateException("Cannot start when in " + processor.state.get() + " state: " + ((Processor) processor).processorId);
        }
    }

    protected void pause(TransactionalStreamDataBatchProcessing<M>.Processor processor) {
        if (!processor.state.compareAndSet(State.RUNNING, State.PAUSED)) {
            throw new IllegalStateException("Cannot pause when not in running state: " + ((Processor) processor).processorId);
        }
    }

    protected void stop(TransactionalStreamDataBatchProcessing<M>.Processor processor) {
        processor.state.set(State.STOPPED);
    }

    public void start(String str) {
        TransactionalStreamDataBatchProcessing<M>.Processor processor = this.processors.get(str);
        Validate.notNull(processor, "There is no processor with the id: " + str, new Object[0]);
        start(processor);
    }

    public void pause(String str) {
        TransactionalStreamDataBatchProcessing<M>.Processor processor = this.processors.get(str);
        Validate.notNull(processor, "There is no processor with the id: " + str, new Object[0]);
        pause(processor);
    }

    public void stop(String str) {
        TransactionalStreamDataBatchProcessing<M>.Processor processor = this.processors.get(str);
        Validate.notNull(processor, "There is no processor with the id: " + str, new Object[0]);
        stop(processor);
    }

    public void startAll() {
        Iterator<TransactionalStreamDataBatchProcessing<M>.Processor> it = this.processors.values().iterator();
        while (it.hasNext()) {
            start(it.next());
        }
    }

    public void pauseAll() {
        Iterator<TransactionalStreamDataBatchProcessing<M>.Processor> it = this.processors.values().iterator();
        while (it.hasNext()) {
            pause(it.next());
        }
    }

    public void stopAll() {
        Iterator<TransactionalStreamDataBatchProcessing<M>.Processor> it = this.processors.values().iterator();
        while (it.hasNext()) {
            stop(it.next());
        }
    }

    protected String seriesId(StreamDataSupplierWithIdAndRange<M, ?> streamDataSupplierWithIdAndRange) {
        return this.id + "-" + streamDataSupplierWithIdAndRange.getId();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String exceptionSummary(Throwable th) {
        Throwable th2 = th;
        StringBuilder sb = new StringBuilder();
        sb.append('[');
        sb.append(th2.getClass().getName());
        sb.append(":").append(th2.getMessage());
        sb.append(']');
        for (int i = 0; i < 5 && th2.getCause() != null && th2.getCause() != th2; i++) {
            th2 = th2.getCause();
            sb.append(" caused by [");
            sb.append(th2.getClass().getName());
            sb.append(":").append(th2.getMessage());
            sb.append(']');
        }
        return sb.toString();
    }

    public Status getStatus() throws TransactionStorageInfrastructureException, DataStreamInfrastructureException {
        Status status = new Status();
        status.processorStatus = getProcessorStatus();
        status.streamStatus = getStreamStatus();
        return status;
    }

    public Map<String, ProcessorStatus> getProcessorStatus() {
        TreeMap treeMap = new TreeMap();
        for (TransactionalStreamDataBatchProcessing<M>.Processor processor : this.processors.values()) {
            ProcessorStatus processorStatus = new ProcessorStatus();
            processorStatus.state = processor.state.get();
            treeMap.put(((Processor) processor).processorId, processorStatus);
        }
        return treeMap;
    }

    public LinkedHashMap<String, StreamStatus> getStreamStatus() throws TransactionStorageInfrastructureException, DataStreamInfrastructureException {
        LinkedHashMap<String, StreamStatus> linkedHashMap = new LinkedHashMap<>(this.suppliers.size());
        for (StreamDataSupplierWithIdAndRange<M, ?> streamDataSupplierWithIdAndRange : this.suppliers) {
            List<? extends ReadOnlySequentialTransaction> recentTransactions = this.txCoordinator.getRecentTransactions(seriesId(streamDataSupplierWithIdAndRange));
            SequentialTransactionsCoordinator.TransactionCounts transactionCounts = SequentialTransactionsCoordinator.getTransactionCounts(recentTransactions);
            String finishedPosition = SequentialTransactionsCoordinator.getFinishedPosition(recentTransactions);
            String str = null;
            String str2 = null;
            Instant instant = null;
            Instant instant2 = null;
            if (recentTransactions != null && recentTransactions.size() > 0) {
                ReadOnlySequentialTransaction readOnlySequentialTransaction = recentTransactions.get(recentTransactions.size() - 1);
                if (readOnlySequentialTransaction.isInProgress()) {
                    str = readOnlySequentialTransaction.getStartPosition();
                    str2 = readOnlySequentialTransaction.getEndPosition();
                    r15 = StringUtils.isNotBlank(str) ? streamDataSupplierWithIdAndRange.getSupplier().enqueuedTime(str) : null;
                    if (StringUtils.isNotBlank(str2)) {
                        instant2 = streamDataSupplierWithIdAndRange.getSupplier().enqueuedTime(str2);
                    }
                }
            }
            if (finishedPosition != null) {
                instant = streamDataSupplierWithIdAndRange.getSupplier().enqueuedTime(finishedPosition);
            }
            StreamStatus streamStatus = new StreamStatus();
            streamStatus.transactionCounts = transactionCounts;
            streamStatus.finishedPosition = finishedPosition;
            streamStatus.lastUnfinishedStartPosition = str;
            streamStatus.lastUnfinishedEndPosition = str2;
            streamStatus.finishedEnqueuedTime = instant;
            streamStatus.lastUnfinishedStartEnqueuedTime = r15;
            streamStatus.lastUnfinishedEndEnqueuedTime = instant2;
            linkedHashMap.put(streamDataSupplierWithIdAndRange.getId(), streamStatus);
        }
        return linkedHashMap;
    }
}
