package net.sf.jabb.dstream;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.TimeLimiter;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import net.sf.jabb.dstream.ex.DataStreamInfrastructureException;
import net.sf.jabb.util.bean.DoubleValueBean;
import net.sf.jabb.util.ex.ExceptionUncheckUtility;
import net.sf.jabb.util.jms.JmsUtility;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.jgroups.util.UUID;

/* loaded from: input_file:net/sf/jabb/dstream/JmsConsumerStreamDataSupplier.class */
public abstract class JmsConsumerStreamDataSupplier<M> implements StreamDataSupplier<M> {
    protected static long MAX_RECEIVE_TIMEOUT = 60000;
    protected static TimeLimiter timeLimiter = new SimpleTimeLimiter(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new BasicThreadFactory.Builder().namingPattern(JmsConsumerStreamDataSupplier.class.getSimpleName() + "-time-limiter-%d").build()));
    protected Queue destination;
    protected Map<String, DoubleValueBean<Session, MessageConsumer>> receivingConsumers = new ConcurrentHashMap();

    protected abstract Connection getConnection();

    protected abstract String messageSelector(String str);

    protected abstract String messageSelector(Instant instant);

    protected abstract M convert(Message message);

    protected abstract String position(Message message);

    protected abstract Instant enqueuedTime(Message message);

    protected ReceiveStatus fetch(List<? super M> list, Duration duration, ExceptionUncheckUtility.FunctionThrowsExceptions<java.util.Queue<Message>, Boolean> functionThrowsExceptions) throws DataStreamInfrastructureException, InterruptedException {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        boolean z = false;
        try {
            z = functionThrowsExceptions.apply(concurrentLinkedQueue).booleanValue();
        } catch (InterruptedException e) {
            throw e;
        } catch (TimeoutException | UncheckedTimeoutException e2) {
        } catch (Exception e3) {
            throw Throwables.propagate(e3);
        } catch (JMSException e4) {
            throw new DataStreamInfrastructureException((Throwable) e4);
        }
        int size = concurrentLinkedQueue.size();
        if (size <= 0) {
            return new SimpleReceiveStatus(null, null, z);
        }
        Message message = null;
        while (true) {
            int i = size;
            size--;
            if (i <= 0) {
                return new SimpleReceiveStatus(position(message), enqueuedTime(message), z);
            }
            message = (Message) concurrentLinkedQueue.remove();
            list.add(convert(message));
        }
    }

    protected ReceiveStatus fetch(List<? super M> list, String str, Predicate<Message> predicate, int i, Duration duration) throws DataStreamInfrastructureException, InterruptedException {
        return fetch(list, duration, queue -> {
            long nanoTime = System.nanoTime() + duration.toNanos();
            long nanoTime2 = (nanoTime - System.nanoTime()) / 1000000;
            Session session = null;
            MessageConsumer messageConsumer = null;
            boolean z = false;
            try {
                session = getConnection().createSession(false, 1);
                messageConsumer = session.createConsumer(this.destination, str);
                int i2 = 0;
                while (true) {
                    i2++;
                    if (i2 > i || nanoTime2 <= 0) {
                        break;
                    }
                    Message receive = messageConsumer.receive(nanoTime2);
                    if (receive != null) {
                        if (predicate.test(receive)) {
                            z = true;
                            break;
                        }
                        queue.add(receive);
                    }
                    nanoTime2 = (nanoTime - System.nanoTime()) / 1000000;
                }
                JmsUtility.closeSilently(messageConsumer, session);
                return Boolean.valueOf(z);
            } catch (Throwable th) {
                JmsUtility.closeSilently(messageConsumer, session);
                throw th;
            }
        });
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, String str, String str2, int i, Duration duration) throws DataStreamInfrastructureException, InterruptedException {
        return fetch(list, messageSelector(str), message -> {
            return (str2 == null || isInRange(position(message), str2)) ? false : true;
        }, i, duration);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, Instant instant, Instant instant2, int i, Duration duration) throws DataStreamInfrastructureException, InterruptedException {
        return fetch(list, messageSelector(instant), message -> {
            return (instant2 == null || isInRange(enqueuedTime(message), instant2)) ? false : true;
        }, i, duration);
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus fetch(List<? super M> list, String str, Instant instant, int i, Duration duration) throws DataStreamInfrastructureException, InterruptedException {
        return fetch(list, messageSelector(str), message -> {
            return (instant == null || isInRange(enqueuedTime(message), instant)) ? false : true;
        }, i, duration);
    }

    protected String doStartAsyncReceiving(Consumer<M> consumer, String str) throws DataStreamInfrastructureException {
        String uuid = UUID.randomUUID().toString();
        try {
            Connection connection = getConnection();
            connection.stop();
            try {
                Session createSession = connection.createSession(false, 1);
                MessageConsumer createConsumer = createSession.createConsumer(this.destination, str);
                createConsumer.setMessageListener(message -> {
                    consumer.accept(convert(message));
                });
                this.receivingConsumers.put(uuid, new DoubleValueBean<>(createSession, createConsumer));
                connection.start();
                return uuid;
            } catch (Throwable th) {
                connection.start();
                throw th;
            }
        } catch (JMSException e) {
            throw new DataStreamInfrastructureException((Throwable) e);
        }
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String startAsyncReceiving(Consumer<M> consumer, String str) throws DataStreamInfrastructureException {
        return doStartAsyncReceiving(consumer, messageSelector(str));
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String startAsyncReceiving(Consumer<M> consumer, Instant instant) throws DataStreamInfrastructureException {
        return doStartAsyncReceiving(consumer, messageSelector(instant));
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void stopAsyncReceiving(String str) {
        DoubleValueBean<Session, MessageConsumer> remove = this.receivingConsumers.remove(str);
        if (remove != null) {
            JmsUtility.closeSilently((MessageConsumer) remove.getValue2(), (Session) remove.getValue1());
        }
    }

    protected ReceiveStatus receive(Function<M, Long> function, String str, Function<Message, Integer> function2) throws DataStreamInfrastructureException {
        boolean z = false;
        try {
            try {
                Session createSession = getConnection().createSession(false, 1);
                MessageConsumer createConsumer = createSession.createConsumer(this.destination, str);
                long longValue = function.apply(null).longValue();
                Message message = null;
                while (true) {
                    if (longValue <= 0) {
                        break;
                    }
                    Message receive = createConsumer.receive(longValue < MAX_RECEIVE_TIMEOUT ? longValue : MAX_RECEIVE_TIMEOUT);
                    if (receive != null) {
                        int intValue = function2.apply(receive).intValue();
                        if (intValue > 0) {
                            z = true;
                            break;
                        }
                        longValue = function.apply(convert(receive)).longValue();
                        message = receive;
                        if (intValue == 0) {
                            break;
                        }
                    } else {
                        longValue = function.apply(null).longValue();
                    }
                }
                if (message != null) {
                    SimpleReceiveStatus simpleReceiveStatus = new SimpleReceiveStatus(position(message), enqueuedTime(message), z);
                    JmsUtility.closeSilently(createConsumer, createSession);
                    return simpleReceiveStatus;
                }
                SimpleReceiveStatus simpleReceiveStatus2 = new SimpleReceiveStatus(null, null, z);
                JmsUtility.closeSilently(createConsumer, createSession);
                return simpleReceiveStatus2;
            } catch (JMSException e) {
                throw new DataStreamInfrastructureException((Throwable) e);
            }
        } catch (Throwable th) {
            JmsUtility.closeSilently((MessageConsumer) null, (Session) null);
            throw th;
        }
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, String str, String str2) throws DataStreamInfrastructureException {
        return receive(function, messageSelector(str), message -> {
            return Integer.valueOf(checkInRange(position(message), str2));
        });
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, Instant instant, Instant instant2) throws DataStreamInfrastructureException {
        return receive(function, messageSelector(instant), message -> {
            return Integer.valueOf(checkInRange(enqueuedTime(message), instant2));
        });
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, String str, Instant instant) throws DataStreamInfrastructureException {
        return receive(function, messageSelector(str), message -> {
            return Integer.valueOf(checkInRange(enqueuedTime(message), instant));
        });
    }

    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public ReceiveStatus receive(Function<M, Long> function, Instant instant, String str) throws DataStreamInfrastructureException {
        return receive(function, messageSelector(instant), message -> {
            return Integer.valueOf(checkInRange(position(message), str));
        });
    }
}
