package net.sf.jabb.txsdp;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import net.sf.jabb.dstream.ReceiveStatus;
import net.sf.jabb.dstream.StreamDataSupplier;
import net.sf.jabb.dstream.StreamDataSupplierWithIdAndPositionRange;
import net.sf.jabb.dstream.StreamDataSupplierWithIdAndRange;
import net.sf.jabb.dstream.ex.DataStreamInfrastructureException;
import net.sf.jabb.seqtx.ReadOnlySequentialTransaction;
import net.sf.jabb.seqtx.SequentialTransaction;
import net.sf.jabb.seqtx.SequentialTransactionsCoordinator;
import net.sf.jabb.seqtx.ex.DuplicatedTransactionIdException;
import net.sf.jabb.seqtx.ex.TransactionStorageInfrastructureException;
import net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing;
import net.sf.jabb.util.parallel.WaitStrategy;
import net.sf.jabb.util.text.DurationFormatter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sf/jabb/txsdp/DefaultTransactionalStreamDataBatchProcessing.class */
public class DefaultTransactionalStreamDataBatchProcessing<M> implements TransactionalStreamDataBatchProcessing<M> {
    static final Logger logger = LoggerFactory.getLogger(DefaultTransactionalStreamDataBatchProcessing.class);
    protected String id;
    protected FlexibleBatchProcessor<M> batchProcessor;
    protected List<StreamDataSupplierWithIdAndRange<M, ?>> suppliers;
    protected SequentialTransactionsCoordinator txCoordinator;
    protected Options processorOptions;
    protected Map<String, DefaultTransactionalStreamDataBatchProcessing<M>.Processor> processors;

    /* loaded from: input_file:net/sf/jabb/txsdp/DefaultTransactionalStreamDataBatchProcessing$Options.class */
    public static class Options {
        public static final int STICKY_NEVER = 0;
        public static final int STICKY_WHEN_OPEN_RANGE_SUCCEEDED = 1;
        public static final int STICKY_WHEN_OPEN_RANGE_SUCCEEDED_OR_NO_DATA = 2;
        private Duration initialTransactionTimeoutDuration;
        private int maxInProgressTransactions;
        private int maxRetringTransactions;
        private Duration transactionAcquisitionDelay;
        private WaitStrategy waitStrategy;
        private int stickyMode;

        public Options() {
            this.stickyMode = 0;
        }

        public Options(Options options) {
            this.stickyMode = 0;
            this.initialTransactionTimeoutDuration = options.initialTransactionTimeoutDuration;
            this.maxInProgressTransactions = options.maxInProgressTransactions;
            this.maxRetringTransactions = options.maxRetringTransactions;
            this.transactionAcquisitionDelay = options.transactionAcquisitionDelay;
            this.waitStrategy = options.waitStrategy;
            this.stickyMode = options.stickyMode;
        }

        public Duration getInitialTransactionTimeoutDuration() {
            return this.initialTransactionTimeoutDuration;
        }

        public void setInitialTransactionTimeoutDuration(Duration duration) {
            this.initialTransactionTimeoutDuration = duration;
        }

        public Options withInitialTransactionTimeoutDuration(Duration duration) {
            this.initialTransactionTimeoutDuration = duration;
            return this;
        }

        public int getMaxInProgressTransactions() {
            return this.maxInProgressTransactions;
        }

        public void setMaxInProgressTransactions(int i) {
            this.maxInProgressTransactions = i;
        }

        public Options withMaxInProgressTransactions(int i) {
            this.maxInProgressTransactions = i;
            return this;
        }

        public int getMaxRetringTransactions() {
            return this.maxRetringTransactions;
        }

        public void setMaxRetringTransactions(int i) {
            this.maxRetringTransactions = i;
        }

        public Options withMaxRetringTransactions(int i) {
            this.maxRetringTransactions = i;
            return this;
        }

        public Duration getTransactionAcquisitionDelay() {
            return this.transactionAcquisitionDelay;
        }

        public void setTransactionAcquisitionDelay(Duration duration) {
            this.transactionAcquisitionDelay = duration;
        }

        public Options withTransactionAcquisitionDelay(Duration duration) {
            this.transactionAcquisitionDelay = duration;
            return this;
        }

        public WaitStrategy getWaitStrategy() {
            return this.waitStrategy;
        }

        public void setWaitStrategy(WaitStrategy waitStrategy) {
            this.waitStrategy = waitStrategy;
        }

        public Options withWaitStrategy(WaitStrategy waitStrategy) {
            this.waitStrategy = waitStrategy;
            return this;
        }

        public Options withStickyMode(int i) {
            this.stickyMode = i;
            return this;
        }

        public Options withNoSticky() {
            this.stickyMode = 0;
            return this;
        }

        public Options withStickyWhenOpenRangeSucceeded() {
            this.stickyMode = 1;
            return this;
        }

        public Options withStickyWhenOpenRangeSucceededOrNoData() {
            this.stickyMode = 2;
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/sf/jabb/txsdp/DefaultTransactionalStreamDataBatchProcessing$Processor.class */
    public class Processor implements Runnable {
        protected AtomicReference<TransactionalStreamDataBatchProcessing.State> state = new AtomicReference<>(TransactionalStreamDataBatchProcessing.State.READY);
        private String processorId;

        Processor(String str) {
            this.processorId = str;
        }

        private void await() {
            long millis = DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.getTransactionAcquisitionDelay().toMillis();
            if (millis > 0) {
                WaitStrategy waitStrategy = DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.getWaitStrategy();
                try {
                    waitStrategy.await(millis);
                } catch (InterruptedException e) {
                    waitStrategy.handleInterruptedException(e);
                }
            }
        }

        private boolean allProcessed(boolean[] zArr) {
            for (boolean z : zArr) {
                if (!z) {
                    return false;
                }
            }
            return true;
        }

        @Override // java.lang.Runnable
        public void run() {
            String transactionId;
            String endPosition;
            String nextStartPosition;
            DefaultTransactionalStreamDataBatchProcessing.logger.debug("[{}] Start running: {}", this.processorId, this.state);
            ArrayList arrayList = new ArrayList(DefaultTransactionalStreamDataBatchProcessing.this.suppliers.size());
            arrayList.addAll(DefaultTransactionalStreamDataBatchProcessing.this.suppliers);
            boolean[] zArr = new boolean[arrayList.size()];
            int nextInt = new Random().nextInt(zArr.length);
            ProcessingContextImpl processingContextImpl = new ProcessingContextImpl(DefaultTransactionalStreamDataBatchProcessing.this.txCoordinator);
            boolean z = false;
            while (true) {
                if (this.state.compareAndSet(TransactionalStreamDataBatchProcessing.State.STOPPING, TransactionalStreamDataBatchProcessing.State.STOPPED)) {
                    break;
                }
                if (!arrayList.equals(DefaultTransactionalStreamDataBatchProcessing.this.suppliers)) {
                    arrayList.clear();
                    arrayList.addAll(DefaultTransactionalStreamDataBatchProcessing.this.suppliers);
                    zArr = new boolean[arrayList.size()];
                    nextInt %= zArr.length;
                }
                while (true) {
                    if (this.state.get() != TransactionalStreamDataBatchProcessing.State.RUNNING) {
                        break;
                    }
                    if (allProcessed(zArr)) {
                        this.state.set(TransactionalStreamDataBatchProcessing.State.FINISHED);
                        break;
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    int i = 0;
                    SequentialTransaction sequentialTransaction = null;
                    String str = null;
                    StreamDataSupplierWithIdAndRange<M, ?> streamDataSupplierWithIdAndRange = null;
                    StreamDataSupplier<M> streamDataSupplier = null;
                    boolean z2 = (DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.stickyMode == 1 && processingContextImpl.isOpenRangeSuccessfullyClosed) || (DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.stickyMode == 2 && (processingContextImpl.isOpenRangeSuccessfullyClosed || processingContextImpl.isOpenRangeAbortedBecauseNothingReceived));
                    if (z != z2) {
                        z = z2;
                        Logger logger = DefaultTransactionalStreamDataBatchProcessing.logger;
                        Object[] objArr = new Object[3];
                        objArr[0] = this.processorId;
                        objArr[1] = z ? "" : "no longer ";
                        objArr[2] = DefaultTransactionalStreamDataBatchProcessing.this.seriesId((StreamDataSupplierWithIdAndRange) arrayList.get(nextInt));
                        logger.debug("Processor '{}' {}stick on '{}'", objArr);
                    }
                    if (!z) {
                        nextInt = (nextInt + 1) % zArr.length;
                    }
                    while (true) {
                        try {
                            try {
                                try {
                                    if (this.state.get() != TransactionalStreamDataBatchProcessing.State.RUNNING || zArr[nextInt]) {
                                        break;
                                    }
                                    streamDataSupplierWithIdAndRange = (StreamDataSupplierWithIdAndRange) arrayList.get(nextInt);
                                    streamDataSupplier = streamDataSupplierWithIdAndRange.getSupplier();
                                    str = DefaultTransactionalStreamDataBatchProcessing.this.seriesId(streamDataSupplierWithIdAndRange);
                                    if (z) {
                                        sequentialTransaction = processingContextImpl.transaction;
                                        break;
                                    }
                                    i++;
                                    try {
                                        sequentialTransaction = DefaultTransactionalStreamDataBatchProcessing.this.txCoordinator.startTransaction(str, this.processorId, DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.getInitialTransactionTimeoutDuration(), DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.getMaxInProgressTransactions(), DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.getMaxRetringTransactions());
                                    } catch (Exception e) {
                                        DefaultTransactionalStreamDataBatchProcessing.logger.warn("[{}] Processor {} startTransaction(...) failed", new Object[]{str, this.processorId, e});
                                    }
                                    if (sequentialTransaction != null) {
                                        break;
                                    }
                                    await();
                                    nextInt = (nextInt + 1) % zArr.length;
                                } catch (Throwable th) {
                                    processingContextImpl.isOpenRangeAbortedBecauseNothingReceived = false;
                                    processingContextImpl.isOpenRangeSuccessfullyClosed = false;
                                    processingContextImpl.isOutOfRangeMessageReached = false;
                                    throw th;
                                }
                            } catch (Exception e2) {
                                DefaultTransactionalStreamDataBatchProcessing.logger.error("[{}] Error happened", str, e2);
                                sequentialTransaction = null;
                                processingContextImpl.isOpenRangeAbortedBecauseNothingReceived = false;
                                processingContextImpl.isOpenRangeSuccessfullyClosed = false;
                                processingContextImpl.isOutOfRangeMessageReached = false;
                            }
                        } catch (DuplicatedTransactionIdException e3) {
                            DefaultTransactionalStreamDataBatchProcessing.logger.warn("[{}] Transaction ID is duplicated: " + sequentialTransaction.getTransactionId(), str, e3);
                            sequentialTransaction = null;
                            processingContextImpl.isOpenRangeAbortedBecauseNothingReceived = false;
                            processingContextImpl.isOpenRangeSuccessfullyClosed = false;
                            processingContextImpl.isOutOfRangeMessageReached = false;
                        } catch (TransactionStorageInfrastructureException e4) {
                            DefaultTransactionalStreamDataBatchProcessing.logger.debug("[{}] In transaction storage infrastructure error happened", str, e4);
                            sequentialTransaction = null;
                            processingContextImpl.isOpenRangeAbortedBecauseNothingReceived = false;
                            processingContextImpl.isOpenRangeSuccessfullyClosed = false;
                            processingContextImpl.isOutOfRangeMessageReached = false;
                        }
                    }
                    while (sequentialTransaction != null && ((!sequentialTransaction.hasStarted() || (z && sequentialTransaction == processingContextImpl.transaction)) && this.state.get() == TransactionalStreamDataBatchProcessing.State.RUNNING)) {
                        if (z && sequentialTransaction == processingContextImpl.transaction && processingContextImpl.isOpenRangeAbortedBecauseNothingReceived) {
                            transactionId = processingContextImpl.previousTransactionPreviousTransactionId;
                            endPosition = processingContextImpl.previousTransactionEndPosition;
                            nextStartPosition = sequentialTransaction.getStartPosition();
                        } else {
                            transactionId = sequentialTransaction.getTransactionId();
                            endPosition = (z && sequentialTransaction == processingContextImpl.transaction && processingContextImpl.isOpenRangeSuccessfullyClosed) ? sequentialTransaction.getEndPosition() : sequentialTransaction.getStartPosition();
                            nextStartPosition = endPosition == null ? "" : streamDataSupplier.nextStartPosition(endPosition);
                        }
                        sequentialTransaction.setTransactionId(null);
                        sequentialTransaction.setStartPosition(nextStartPosition);
                        sequentialTransaction.setEndPositionNull();
                        sequentialTransaction.setTimeout(DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.getInitialTransactionTimeoutDuration());
                        i++;
                        processingContextImpl.previousTransactionPreviousTransactionId = transactionId;
                        processingContextImpl.previousTransactionEndPosition = endPosition;
                        sequentialTransaction = DefaultTransactionalStreamDataBatchProcessing.this.txCoordinator.startTransaction(str, transactionId, endPosition, sequentialTransaction, DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.getMaxInProgressTransactions(), DefaultTransactionalStreamDataBatchProcessing.this.processorOptions.getMaxRetringTransactions());
                        if (DefaultTransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                            Logger logger2 = DefaultTransactionalStreamDataBatchProcessing.logger;
                            Object[] objArr2 = new Object[8];
                            objArr2[0] = str;
                            objArr2[1] = this.processorId;
                            objArr2[2] = Boolean.valueOf(z);
                            objArr2[3] = transactionId;
                            objArr2[4] = endPosition;
                            objArr2[5] = nextStartPosition;
                            objArr2[6] = sequentialTransaction == null ? null : sequentialTransaction.getTransactionId();
                            objArr2[7] = sequentialTransaction == null ? null : Boolean.valueOf(sequentialTransaction.hasStarted());
                            logger2.debug("[{}] Processor {} tried to start transaction: sticky={}, previousTransactionId={}, previousEndPosition={}, startPosition={}, returnedTransactionId={}, returnedTransactionHasStarted={}", objArr2);
                        }
                    }
                    processingContextImpl.isOpenRangeAbortedBecauseNothingReceived = false;
                    processingContextImpl.isOpenRangeSuccessfullyClosed = false;
                    processingContextImpl.isOutOfRangeMessageReached = false;
                    if (sequentialTransaction != null && sequentialTransaction.hasStarted() && this.state.get() == TransactionalStreamDataBatchProcessing.State.RUNNING) {
                        if (DefaultTransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                            Logger logger3 = DefaultTransactionalStreamDataBatchProcessing.logger;
                            Object[] objArr3 = new Object[8];
                            objArr3[0] = str;
                            objArr3[1] = this.processorId;
                            objArr3[2] = sequentialTransaction.getAttempts() == 1 ? "new" : "failed";
                            objArr3[3] = sequentialTransaction.getTransactionId();
                            objArr3[4] = sequentialTransaction.getStartPosition();
                            objArr3[5] = sequentialTransaction.getEndPosition();
                            objArr3[6] = Integer.valueOf(i);
                            objArr3[7] = DurationFormatter.formatSince(currentTimeMillis);
                            logger3.debug("[{}] Processor {} got a {} transaction {} ({}-{}] after {} attempts: {}", objArr3);
                        }
                        doTransaction(processingContextImpl.withSeriesId(str).withTransaction(sequentialTransaction), streamDataSupplierWithIdAndRange);
                        if (processingContextImpl.isOutOfRangeMessageReached) {
                            try {
                                String finishedPosition = SequentialTransactionsCoordinator.getFinishedPosition(DefaultTransactionalStreamDataBatchProcessing.this.txCoordinator.getRecentTransactions(str));
                                if (finishedPosition != null && (finishedPosition.equals(sequentialTransaction.getStartPosition()) || finishedPosition.equals(sequentialTransaction.getEndPosition()))) {
                                    zArr[nextInt] = true;
                                }
                            } catch (Exception e5) {
                                DefaultTransactionalStreamDataBatchProcessing.logger.warn("[{}] Processor {} failed to get recent transactions", new Object[]{str, this.processorId, e5});
                            }
                        }
                    } else if (this.state.get() == TransactionalStreamDataBatchProcessing.State.RUNNING && !zArr[nextInt]) {
                        await();
                    }
                }
                this.state.compareAndSet(TransactionalStreamDataBatchProcessing.State.PAUSING, TransactionalStreamDataBatchProcessing.State.PAUSED);
                if (allProcessed(zArr)) {
                    this.state.set(TransactionalStreamDataBatchProcessing.State.FINISHED);
                    break;
                }
            }
            DefaultTransactionalStreamDataBatchProcessing.logger.debug("[{}] Finish running: {}", this.processorId, this.state);
        }

        protected void doTransaction(ProcessingContextImpl processingContextImpl, StreamDataSupplierWithIdAndRange<M, ?> streamDataSupplierWithIdAndRange) {
            String str = processingContextImpl.seriesId;
            SequentialTransaction sequentialTransaction = processingContextImpl.transaction;
            Boolean bool = false;
            String str2 = null;
            ReceiveStatus receiveStatus = null;
            boolean z = sequentialTransaction.getEndPosition() == null;
            boolean z2 = false;
            boolean z3 = false;
            try {
            } catch (Exception e) {
                if (DefaultTransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                    logDebugInTransaction("Processing is not successful", processingContextImpl, str2, e);
                }
            }
            if (!DefaultTransactionalStreamDataBatchProcessing.this.batchProcessor.initialize(processingContextImpl)) {
                throw new Exception("Unable to initilize processor");
            }
            long receive = DefaultTransactionalStreamDataBatchProcessing.this.batchProcessor.receive(processingContextImpl, null);
            receiveStatus = streamDataSupplierWithIdAndRange.receiveInRange(obj -> {
                return Long.valueOf((this.state.get() != TransactionalStreamDataBatchProcessing.State.RUNNING || processingContextImpl.getTransactionTimeout().toEpochMilli() <= System.currentTimeMillis()) ? 0L : DefaultTransactionalStreamDataBatchProcessing.this.batchProcessor.receive(processingContextImpl, obj));
            }, sequentialTransaction.getStartPosition(), sequentialTransaction.getEndPosition());
            str2 = receiveStatus.getLastPosition();
            if (str2 != null) {
                if (z) {
                    try {
                        DefaultTransactionalStreamDataBatchProcessing.this.txCoordinator.updateTransactionEndPosition(str, this.processorId, sequentialTransaction.getTransactionId(), str2);
                        sequentialTransaction.setEndPosition(str2);
                        z2 = true;
                    } catch (Exception e2) {
                        throw new Exception("Unable to update end position in open range transaction", e2);
                    }
                } else if (!str2.equals(sequentialTransaction.getEndPosition())) {
                    throw new Exception("Unable to fetch all the data in range within duration " + DurationFormatter.format(receive) + ", endPosition=" + sequentialTransaction.getEndPosition() + ", fetchedLastPosition=" + str2);
                }
                bool = DefaultTransactionalStreamDataBatchProcessing.this.batchProcessor.finish(processingContextImpl);
            } else {
                if (DefaultTransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                    logDebugInTransaction("Fetched nothing within " + DurationFormatter.format(receive), processingContextImpl, str2, receiveStatus.isOutOfRangeReached());
                }
                if (receiveStatus.isOutOfRangeReached()) {
                    bool = DefaultTransactionalStreamDataBatchProcessing.this.batchProcessor.finish(processingContextImpl);
                }
            }
            if (bool != null) {
                if (bool.booleanValue()) {
                    try {
                        DefaultTransactionalStreamDataBatchProcessing.this.txCoordinator.finishTransaction(str, this.processorId, sequentialTransaction.getTransactionId());
                    } catch (Exception e3) {
                        if (DefaultTransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                            logDebugInTransaction("Unable to finish transaction", processingContextImpl, str2, e3);
                        }
                    }
                } else {
                    z3 = true;
                    try {
                        DefaultTransactionalStreamDataBatchProcessing.this.txCoordinator.abortTransaction(str, this.processorId, sequentialTransaction.getTransactionId());
                        if (DefaultTransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                            logDebugInTransaction("Aborted transaction", processingContextImpl, str2);
                        }
                    } catch (Exception e4) {
                        if (DefaultTransactionalStreamDataBatchProcessing.logger.isDebugEnabled()) {
                            logDebugInTransaction("Unable to abort transaction", processingContextImpl, str2, e4);
                        }
                    }
                }
            }
            processingContextImpl.isOutOfRangeMessageReached = receiveStatus == null ? false : receiveStatus.isOutOfRangeReached();
            processingContextImpl.isOpenRangeSuccessfullyClosed = z && z2 && !z3;
            processingContextImpl.isOpenRangeAbortedBecauseNothingReceived = z && str2 == null;
        }

        protected void logDebugInTransaction(String str, ProcessingContextImpl processingContextImpl, String str2, Exception exc) {
            SequentialTransaction sequentialTransaction = processingContextImpl.transaction;
            DefaultTransactionalStreamDataBatchProcessing.logger.debug("[{} - {}] " + str + ": transactionId={}, startPosition={}, endPosition={}, fetchedLastPosition={}. Exception: {}", new Object[]{processingContextImpl.seriesId, this.processorId, sequentialTransaction.getTransactionId(), sequentialTransaction.getStartPosition(), sequentialTransaction.getEndPosition(), str2, DefaultTransactionalStreamDataBatchProcessing.exceptionSummary(exc)});
        }

        protected void logDebugInTransaction(String str, ProcessingContextImpl processingContextImpl, String str2) {
            SequentialTransaction sequentialTransaction = processingContextImpl.transaction;
            DefaultTransactionalStreamDataBatchProcessing.logger.debug("[{} - {}] " + str + ": transactionId={}, startPosition={}, endPosition={}, fetchedLastPosition={}", new Object[]{processingContextImpl.seriesId, this.processorId, sequentialTransaction.getTransactionId(), sequentialTransaction.getStartPosition(), sequentialTransaction.getEndPosition(), str2});
        }

        protected void logDebugInTransaction(String str, ProcessingContextImpl processingContextImpl, String str2, boolean z) {
            SequentialTransaction sequentialTransaction = processingContextImpl.transaction;
            DefaultTransactionalStreamDataBatchProcessing.logger.debug("[{} - {}] " + str + ": transactionId={}, startPosition={}, endPosition={}, fetchedLastPosition={}, isOutOfRangeReached={}", new Object[]{processingContextImpl.seriesId, this.processorId, sequentialTransaction.getTransactionId(), sequentialTransaction.getStartPosition(), sequentialTransaction.getEndPosition(), str2, Boolean.valueOf(z)});
        }
    }

    public List<StreamDataSupplierWithIdAndRange<M, ?>> getSuppliers() {
        return this.suppliers;
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void setSuppliers(List<StreamDataSupplierWithIdAndRange<M, ?>> list) {
        this.suppliers = list;
    }

    public SequentialTransactionsCoordinator getTransactionCoordinator() {
        return this.txCoordinator;
    }

    public DefaultTransactionalStreamDataBatchProcessing(String str, Options options, SequentialTransactionsCoordinator sequentialTransactionsCoordinator, FlexibleBatchProcessor<M> flexibleBatchProcessor, List<StreamDataSupplierWithIdAndRange<M, ?>> list) {
        this.processors = new ConcurrentHashMap();
        this.id = str;
        this.processorOptions = new Options(options);
        this.txCoordinator = sequentialTransactionsCoordinator;
        this.batchProcessor = flexibleBatchProcessor;
        this.suppliers = new ArrayList();
        this.suppliers.addAll(list);
    }

    @SafeVarargs
    public DefaultTransactionalStreamDataBatchProcessing(String str, Options options, SequentialTransactionsCoordinator sequentialTransactionsCoordinator, FlexibleBatchProcessor<M> flexibleBatchProcessor, StreamDataSupplierWithIdAndRange<M, ?>... streamDataSupplierWithIdAndRangeArr) {
        this(str, options, sequentialTransactionsCoordinator, flexibleBatchProcessor, Arrays.asList(streamDataSupplierWithIdAndRangeArr));
    }

    public DefaultTransactionalStreamDataBatchProcessing(String str, Options options, SequentialTransactionsCoordinator sequentialTransactionsCoordinator, SimpleBatchProcessor<M> simpleBatchProcessor, int i, Duration duration, Duration duration2, List<StreamDataSupplierWithIdAndRange<M, ?>> list) {
        this(str, options, sequentialTransactionsCoordinator, new SimpleFlexibleBatchProcessor(simpleBatchProcessor, i, duration, duration2), list);
    }

    @SafeVarargs
    public DefaultTransactionalStreamDataBatchProcessing(String str, Options options, SequentialTransactionsCoordinator sequentialTransactionsCoordinator, SimpleBatchProcessor<M> simpleBatchProcessor, int i, Duration duration, Duration duration2, StreamDataSupplierWithIdAndPositionRange<M>... streamDataSupplierWithIdAndPositionRangeArr) {
        this(str, options, sequentialTransactionsCoordinator, simpleBatchProcessor, i, duration, duration2, Arrays.asList(streamDataSupplierWithIdAndPositionRangeArr));
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public Runnable createProcessor(String str) {
        Validate.notNull(str, "Processor id cannot be null", new Object[0]);
        if (this.processors.containsKey(str)) {
            throw new IllegalArgumentException("Another runnable with the same processor ID already exists: " + str);
        }
        DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor = new Processor(str);
        this.processors.put(str, processor);
        return processor;
    }

    protected void start(DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor) {
        if (!processor.state.compareAndSet(TransactionalStreamDataBatchProcessing.State.READY, TransactionalStreamDataBatchProcessing.State.RUNNING) && !processor.state.compareAndSet(TransactionalStreamDataBatchProcessing.State.PAUSED, TransactionalStreamDataBatchProcessing.State.RUNNING)) {
            throw new IllegalStateException("Cannot start when in " + processor.state.get() + " state: " + ((Processor) processor).processorId);
        }
    }

    protected void pause(DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor) {
        if (!processor.state.compareAndSet(TransactionalStreamDataBatchProcessing.State.RUNNING, TransactionalStreamDataBatchProcessing.State.PAUSING)) {
            throw new IllegalStateException("Cannot pause when not in running state: " + ((Processor) processor).processorId);
        }
    }

    protected void stop(DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor) {
        if (processor.state.compareAndSet(TransactionalStreamDataBatchProcessing.State.RUNNING, TransactionalStreamDataBatchProcessing.State.STOPPING) || processor.state.compareAndSet(TransactionalStreamDataBatchProcessing.State.PAUSED, TransactionalStreamDataBatchProcessing.State.STOPPING) || !processor.state.compareAndSet(TransactionalStreamDataBatchProcessing.State.PAUSING, TransactionalStreamDataBatchProcessing.State.STOPPING)) {
        }
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void start(String str) {
        DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor = this.processors.get(str);
        Validate.notNull(processor, "There is no processor with the id: " + str, new Object[0]);
        start(processor);
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void pause(String str) {
        DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor = this.processors.get(str);
        Validate.notNull(processor, "There is no processor with the id: " + str, new Object[0]);
        pause(processor);
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void stop(String str) {
        DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor = this.processors.get(str);
        Validate.notNull(processor, "There is no processor with the id: " + str, new Object[0]);
        stop(processor);
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void remove(String str) {
        this.processors.remove(str);
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void removeUnused() {
        this.processors.keySet().removeAll((List) getProcessorStatus().entrySet().stream().filter(entry -> {
            return ((TransactionalStreamDataBatchProcessing.ProcessorStatus) entry.getValue()).getState().isUnused();
        }).map(entry2 -> {
            return (String) entry2.getKey();
        }).collect(Collectors.toList()));
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void startAll() {
        Iterator<DefaultTransactionalStreamDataBatchProcessing<M>.Processor> it = this.processors.values().iterator();
        while (it.hasNext()) {
            start(it.next());
        }
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void pauseAll() {
        Iterator<DefaultTransactionalStreamDataBatchProcessing<M>.Processor> it = this.processors.values().iterator();
        while (it.hasNext()) {
            pause(it.next());
        }
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public void stopAll() {
        Iterator<DefaultTransactionalStreamDataBatchProcessing<M>.Processor> it = this.processors.values().iterator();
        while (it.hasNext()) {
            stop(it.next());
        }
    }

    protected String seriesId(StreamDataSupplierWithIdAndRange<M, ?> streamDataSupplierWithIdAndRange) {
        return (this.id == null || this.id.length() == 0) ? streamDataSupplierWithIdAndRange.getId().replace('/', '_') : this.id + "_" + streamDataSupplierWithIdAndRange.getId().replace('/', '_');
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String exceptionSummary(Throwable th) {
        Throwable th2 = th;
        StringBuilder sb = new StringBuilder();
        sb.append('[');
        sb.append(th2.getClass().getName());
        sb.append(":").append(th2.getMessage());
        sb.append(']');
        for (int i = 0; i < 5 && th2.getCause() != null && th2.getCause() != th2; i++) {
            th2 = th2.getCause();
            sb.append(" caused by [");
            sb.append(th2.getClass().getName());
            sb.append(":").append(th2.getMessage());
            sb.append(']');
        }
        return sb.toString();
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public TransactionalStreamDataBatchProcessing.ProcessorStatus getProcessorStatus(String str) {
        DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor = this.processors.get(str);
        if (processor == null) {
            return null;
        }
        return new TransactionalStreamDataBatchProcessing.ProcessorStatus(processor.state.get());
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public SortedMap<String, TransactionalStreamDataBatchProcessing.ProcessorStatus> getProcessorStatus() {
        TreeMap treeMap = new TreeMap();
        for (DefaultTransactionalStreamDataBatchProcessing<M>.Processor processor : this.processors.values()) {
            treeMap.put(((Processor) processor).processorId, new TransactionalStreamDataBatchProcessing.ProcessorStatus(processor.state.get()));
        }
        return treeMap;
    }

    @Override // net.sf.jabb.txsdp.TransactionalStreamDataBatchProcessing
    public LinkedHashMap<String, TransactionalStreamDataBatchProcessing.StreamStatus> getStreamStatus() throws TransactionStorageInfrastructureException, DataStreamInfrastructureException {
        ArrayList<StreamDataSupplierWithIdAndRange<M, ?>> arrayList = new ArrayList(this.suppliers.size());
        arrayList.addAll(this.suppliers);
        LinkedHashMap<String, TransactionalStreamDataBatchProcessing.StreamStatus> linkedHashMap = new LinkedHashMap<>(arrayList.size());
        for (StreamDataSupplierWithIdAndRange<M, ?> streamDataSupplierWithIdAndRange : arrayList) {
            List<? extends ReadOnlySequentialTransaction> recentTransactions = this.txCoordinator.getRecentTransactions(seriesId(streamDataSupplierWithIdAndRange));
            SequentialTransactionsCoordinator.TransactionCounts transactionCounts = SequentialTransactionsCoordinator.getTransactionCounts(recentTransactions);
            String finishedPosition = SequentialTransactionsCoordinator.getFinishedPosition(recentTransactions);
            String str = null;
            String str2 = null;
            Instant instant = null;
            Instant instant2 = null;
            if (recentTransactions != null && recentTransactions.size() > 0) {
                ReadOnlySequentialTransaction readOnlySequentialTransaction = recentTransactions.get(recentTransactions.size() - 1);
                if (readOnlySequentialTransaction.isInProgress()) {
                    str = readOnlySequentialTransaction.getStartPosition();
                    str2 = readOnlySequentialTransaction.getEndPosition();
                    r16 = StringUtils.isNotBlank(str) ? streamDataSupplierWithIdAndRange.getSupplier().enqueuedTime(str) : null;
                    if (StringUtils.isNotBlank(str2)) {
                        instant2 = streamDataSupplierWithIdAndRange.getSupplier().enqueuedTime(str2);
                    }
                }
            }
            if (finishedPosition != null) {
                instant = streamDataSupplierWithIdAndRange.getSupplier().enqueuedTime(finishedPosition);
            }
            TransactionalStreamDataBatchProcessing.StreamStatus streamStatus = new TransactionalStreamDataBatchProcessing.StreamStatus();
            streamStatus.transactionCounts = transactionCounts;
            streamStatus.finishedPosition = finishedPosition;
            streamStatus.lastUnfinishedStartPosition = str;
            streamStatus.lastUnfinishedEndPosition = str2;
            streamStatus.finishedEnqueuedTime = instant;
            streamStatus.lastUnfinishedStartEnqueuedTime = r16;
            streamStatus.lastUnfinishedEndEnqueuedTime = instant2;
            linkedHashMap.put(streamDataSupplierWithIdAndRange.getId(), streamStatus);
        }
        return linkedHashMap;
    }
}
