package net.sf.jabb.dstream.eventhub;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.util.Enumeration;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import net.sf.jabb.azure.AzureEventHubUtility;
import net.sf.jabb.azure.EventHubAnnotations;
import net.sf.jabb.dstream.JmsConsumerStreamDataSupplier;
import net.sf.jabb.dstream.WrappedJmsConnection;
import net.sf.jabb.dstream.ex.DataStreamInfrastructureException;
import net.sf.jabb.util.jms.JmsUtility;
import net.sf.jabb.util.parallel.BackoffStrategies;
import net.sf.jabb.util.parallel.BackoffStrategy;
import net.sf.jabb.util.parallel.Sequencer;
import net.sf.jabb.util.parallel.WaitStrategies;
import net.sf.jabb.util.parallel.WaitStrategy;
import net.sf.jabb.util.text.DurationFormatter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/sf/jabb/dstream/eventhub/EventHubQpidStreamDataSupplier.class */
/**
 * Stream data supplier that reads from an Azure Event Hub through the Qpid AMQP 1.0 JMS client.
 *
 * <p>Positions handled by this class are Event Hub offsets rendered as decimal strings; they are
 * embedded into JMS message selectors on the {@code amqp.annotation.x-opt-offset} annotation.
 *
 * @param <M> type that received JMS messages are converted to by the configured message converter
 */
public class EventHubQpidStreamDataSupplier<M> extends JmsConsumerStreamDataSupplier<M> {
    /** Converts a received JMS message into the element type of the stream. */
    protected Function<Message, M> messageConverter;
    /** Connection wrapper through which all sessions of this supplier are created. */
    protected WrappedJmsConnection wrappedConnection;
    /** Human readable description of the connection and destination, used in log messages. */
    protected String identifier;
    /** Strategy used for waiting inside {@link #start()}. */
    protected WaitStrategy waitStrategy;
    private static final Logger logger = LoggerFactory.getLogger(EventHubQpidStreamDataSupplier.class);
    /** Source of the sequence numbers that make generated client ids distinct within this JVM. */
    protected static final Sequencer idSequencer = new Sequencer();

    /** Text preceding the last valid offset in the error message parsed by {@link #lastPositionByTryError()}. */
    private static final String LAST_OFFSET_MARKER = "The last offset in the system is ";
    /** Selector asking for an offset that is expected not to exist, so that consumer creation is rejected. */
    private static final String OUT_OF_RANGE_OFFSET_SELECTOR = "amqp.annotation.x-opt-offset >= '4611686018427387902'";
    /** Longest time {@link #start()} waits for the connection attempt to finish. */
    private static final long START_TIMEOUT_MILLIS = 60000L;
    /** Pause between two connection state checks inside {@link #start()}. */
    private static final long START_POLL_MILLIS = 100L;

    /**
     * Creates a supplier on top of an existing connection factory and queue.
     *
     * @param connectionFactory      factory for the underlying JMS connection, cannot be null
     * @param queue                  destination to consume from, cannot be null
     * @param connectionValidator    validator handed to the wrapped connection, cannot be null
     * @param connectBackoffStrategy backoff strategy handed to the wrapped connection, cannot be null
     * @param waitStrategy           wait strategy for the wrapped connection and for {@link #start()}, cannot be null
     * @param messageConverter       converter applied to every received message, cannot be null
     */
    public EventHubQpidStreamDataSupplier(ConnectionFactory connectionFactory, Queue queue, Predicate<Connection> connectionValidator, BackoffStrategy connectBackoffStrategy, WaitStrategy waitStrategy, Function<Message, M> messageConverter) {
        Validate.notNull(connectionFactory, "connection factory cannot be null");
        Validate.notNull(queue, "destination cannot be null");
        Validate.notNull(connectionValidator, "connection validator cannot be null");
        Validate.notNull(connectBackoffStrategy, "connect backoff strategy cannot be null");
        Validate.notNull(waitStrategy, "wait strategy cannot be null");
        Validate.notNull(messageConverter, "message converter cannot be null");
        this.waitStrategy = waitStrategy;
        this.messageConverter = messageConverter;
        this.destination = queue;
        String queueName;
        try {
            queueName = queue.getQueueName();
        } catch (JMSException e) {
            // The name is only used for logging, so a placeholder is good enough.
            queueName = "UNKNOWN";
        }
        this.identifier = connectionFactory.toString() + "->" + queueName;
        this.wrappedConnection = new WrappedJmsConnection(connectionFactory, connectionValidator, connectBackoffStrategy, waitStrategy, false);
    }

    /**
     * Creates a supplier for one partition using a Fibonacci connect backoff and a thread-sleep wait strategy.
     *
     * @param server           Event Hub server host, cannot be blank
     * @param eventHubName     name of the Event Hub, cannot be blank
     * @param policyName       access policy name, cannot be blank
     * @param policyKey        access policy key, cannot be blank
     * @param consumerGroup    consumer group, or null for the default consumer group
     * @param partition        partition identifier
     * @param messageConverter converter applied to every received message, cannot be null
     */
    public EventHubQpidStreamDataSupplier(String server, String eventHubName, String policyName, String policyKey, String consumerGroup, String partition, Function<Message, M> messageConverter) {
        this(server, eventHubName, policyName, policyKey, consumerGroup, partition, BackoffStrategies.fibonacciBackoff(1000L, 20L, TimeUnit.SECONDS), WaitStrategies.threadSleepStrategy(), messageConverter);
    }

    /**
     * Same as the String-partition variant, with the partition given as a number.
     *
     * @param server           Event Hub server host, cannot be blank
     * @param eventHubName     name of the Event Hub, cannot be blank
     * @param policyName       access policy name, cannot be blank
     * @param policyKey        access policy key, cannot be blank
     * @param consumerGroup    consumer group, or null for the default consumer group
     * @param partition        partition number
     * @param messageConverter converter applied to every received message, cannot be null
     */
    public EventHubQpidStreamDataSupplier(String server, String eventHubName, String policyName, String policyKey, String consumerGroup, int partition, Function<Message, M> messageConverter) {
        this(server, eventHubName, policyName, policyKey, consumerGroup, String.valueOf(partition), BackoffStrategies.fibonacciBackoff(1000L, 20L, TimeUnit.SECONDS), WaitStrategies.threadSleepStrategy(), messageConverter);
    }

    /**
     * Creates a supplier for one numbered partition with explicit backoff and wait strategies.
     *
     * @param server                 Event Hub server host, cannot be blank
     * @param eventHubName           name of the Event Hub, cannot be blank
     * @param policyName             access policy name, cannot be blank
     * @param policyKey              access policy key, cannot be blank
     * @param consumerGroup          consumer group, or null for the default consumer group
     * @param partition              partition number
     * @param connectBackoffStrategy backoff strategy handed to the wrapped connection, cannot be null
     * @param waitStrategy           wait strategy for the wrapped connection and for {@link #start()}, cannot be null
     * @param messageConverter       converter applied to every received message, cannot be null
     */
    public EventHubQpidStreamDataSupplier(String server, String eventHubName, String policyName, String policyKey, String consumerGroup, int partition, BackoffStrategy connectBackoffStrategy, WaitStrategy waitStrategy, Function<Message, M> messageConverter) {
        this(server, eventHubName, policyName, policyKey, consumerGroup, String.valueOf(partition), connectBackoffStrategy, waitStrategy, messageConverter);
    }

    /**
     * Creates a supplier for one partition with explicit backoff and wait strategies.
     *
     * @param server                 Event Hub server host, cannot be blank
     * @param eventHubName           name of the Event Hub, cannot be blank
     * @param policyName             access policy name, cannot be blank
     * @param policyKey              access policy key, cannot be blank
     * @param consumerGroup          consumer group, or null for the default consumer group
     * @param partition              partition identifier
     * @param connectBackoffStrategy backoff strategy handed to the wrapped connection, cannot be null
     * @param waitStrategy           wait strategy for the wrapped connection and for {@link #start()}, cannot be null
     * @param messageConverter       converter applied to every received message, cannot be null
     */
    public EventHubQpidStreamDataSupplier(String server, String eventHubName, String policyName, String policyKey, String consumerGroup, String partition, BackoffStrategy connectBackoffStrategy, WaitStrategy waitStrategy, Function<Message, M> messageConverter) {
        Validate.notBlank(server, "server cannot be blank");
        Validate.notBlank(eventHubName, "Event Hub name cannot be blank");
        Validate.notBlank(policyName, "access policy name cannot be blank");
        Validate.notBlank(policyKey, "access policy key cannot be blank");
        Validate.notNull(connectBackoffStrategy, "connect backoff strategy cannot be null");
        Validate.notNull(waitStrategy, "connect wait strategy cannot be null");
        Validate.notNull(messageConverter, "message converter cannot be null");
        String clientId = makeClientId();
        // start() waits through this field; leaving it unset caused a NullPointerException there.
        this.waitStrategy = waitStrategy;
        this.messageConverter = messageConverter;
        this.destination = createQueue(eventHubName, consumerGroup, partition);
        this.identifier = clientId + "->" + policyName + ":" + server + "/" + eventHubName + "/" + consumerGroup + "/" + partition;
        this.wrappedConnection = createConnectionForReceiving(server, policyName, policyKey, this.destination, connectBackoffStrategy, waitStrategy);
    }

    /**
     * Builds a client id from the simple class name, the next sequence number and the local host name.
     *
     * @return the generated client id
     */
    protected static String makeClientId() {
        return EventHubQpidStreamDataSupplier.class.getSimpleName() + "-" + idSequencer.next() + "@" + getHostName();
    }

    /**
     * Looks up the lower-cased name of the local host.
     *
     * @return the host name, or {@code "<HOSTNAME UNKNOWN>"} if it cannot be resolved
     */
    protected static String getHostName() {
        try {
            // Locale.ROOT keeps the result independent of the JVM's default locale.
            return InetAddress.getLocalHost().getHostName().toLowerCase(Locale.ROOT);
        } catch (UnknownHostException e) {
            return "<HOSTNAME UNKNOWN>";
        }
    }

    /**
     * Creates a Qpid connection factory for {@code amqps} on port 5671 with synchronous publishing disabled.
     *
     * @param server     server host; it is passed to the Qpid factory twice
     *                   (presumably as host and as remote host - verify against the Qpid API)
     * @param policyName access policy name (presumably used as the user name - verify against the Qpid API)
     * @param policyKey  access policy key (presumably used as the password - verify against the Qpid API)
     * @return the configured connection factory
     */
    public static ConnectionFactory createConnectionFactory(String server, String policyName, String policyKey) {
        ConnectionFactoryImpl factory = new ConnectionFactoryImpl("amqps", server, 5671, policyName, policyKey, makeClientId(), server, true, 0);
        factory.setSyncPublish(false);
        return factory;
    }

    /**
     * Creates a wrapped connection on top of a freshly created connection factory.
     *
     * @param server                 server host
     * @param policyName             access policy name
     * @param policyKey              access policy key
     * @param connectBackoffStrategy backoff strategy handed to the wrapped connection
     * @param waitStrategy           wait strategy handed to the wrapped connection
     * @param connectionValidator    validator handed to the wrapped connection
     * @return the wrapped connection
     */
    public static WrappedJmsConnection createConnection(String server, String policyName, String policyKey, BackoffStrategy connectBackoffStrategy, WaitStrategy waitStrategy, Predicate<Connection> connectionValidator) {
        ConnectionFactory factory = createConnectionFactory(server, policyName, policyKey);
        return new WrappedJmsConnection(factory, connectionValidator, connectBackoffStrategy, waitStrategy, false);
    }

    /**
     * Creates a wrapped connection that is validated by creating a consumer on the destination.
     *
     * @param server                 server host
     * @param policyName             access policy name
     * @param policyKey              access policy key
     * @param destination            destination used for validation
     * @param connectBackoffStrategy backoff strategy handed to the wrapped connection
     * @param waitStrategy           wait strategy handed to the wrapped connection
     * @return the wrapped connection
     */
    public static WrappedJmsConnection createConnectionForReceiving(String server, String policyName, String policyKey, Destination destination, BackoffStrategy connectBackoffStrategy, WaitStrategy waitStrategy) {
        return createConnection(server, policyName, policyKey, connectBackoffStrategy, waitStrategy,
                connection -> WrappedJmsConnection.validateConnectionByCreatingConsumer(connection, destination));
    }

    /**
     * Creates a wrapped connection that is validated by creating a producer on the destination.
     *
     * @param server                 server host
     * @param policyName             access policy name
     * @param policyKey              access policy key
     * @param destination            destination used for validation
     * @param connectBackoffStrategy backoff strategy handed to the wrapped connection
     * @param waitStrategy           wait strategy handed to the wrapped connection
     * @param unused                 not used by this method; kept so existing callers keep compiling
     * @return the wrapped connection
     */
    public static WrappedJmsConnection createConnectionForSending(String server, String policyName, String policyKey, Destination destination, BackoffStrategy connectBackoffStrategy, WaitStrategy waitStrategy, String unused) {
        return createConnection(server, policyName, policyKey, connectBackoffStrategy, waitStrategy,
                connection -> WrappedJmsConnection.validateConnectionByCreatingProducer(connection, destination));
    }

    /**
     * Creates the queue addressing one partition of a consumer group, in the form
     * {@code <eventHubName>/ConsumerGroups/<consumerGroup>/Partitions/<partition>}.
     *
     * @param eventHubName  name of the Event Hub
     * @param consumerGroup consumer group, or null for {@link AzureEventHubUtility#DEFAULT_CONSUMER_GROUP}
     * @param partition     partition identifier
     * @return the queue
     */
    public static Queue createQueue(String eventHubName, String consumerGroup, String partition) {
        String group = consumerGroup == null ? AzureEventHubUtility.DEFAULT_CONSUMER_GROUP : consumerGroup;
        return new QueueImpl(eventHubName + "/ConsumerGroups/" + group + "/Partitions/" + partition);
    }

    /**
     * Creates the queue addressing one numbered partition of a consumer group.
     *
     * @param eventHubName  name of the Event Hub
     * @param consumerGroup consumer group, or null for the default consumer group
     * @param partition     partition number
     * @return the queue
     */
    public static Queue createQueue(String eventHubName, String consumerGroup, int partition) {
        return createQueue(eventHubName, consumerGroup, String.valueOf(partition));
    }

    /**
     * Creates a queue for an address that is used as given.
     *
     * @param address the queue address
     * @return the queue
     */
    public static Queue createQueue(String address) {
        return new QueueImpl(address);
    }

    /**
     * Returns the position to start from when everything in the stream should be read.
     *
     * @return always {@code "-1"}
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String firstPosition() {
        return String.valueOf(-1);
    }

    /**
     * Reads the enqueued time of the first message whose offset is at or after the given position.
     * The receive is done without a time limit.
     *
     * @param position offset to look up
     * @return the enqueued time, or null if no message was received
     * @throws DataStreamInfrastructureException if JMS fails or the calling thread is interrupted;
     *                                           in the latter case the interrupt flag is set again
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public Instant enqueuedTime(String position) throws DataStreamInfrastructureException {
        try {
            Message first = firstMessageByReceive("amqp.annotation.x-opt-offset >= '" + position + "'", 0L);
            if (first == null) {
                logger.warn("Null message received for position {}", position);
                return null;
            }
            return AzureEventHubUtility.getEventHubAnnotations(first).getEnqueuedTime();
        } catch (InterruptedException e) {
            // The signature cannot propagate the interruption, so keep it visible through the flag.
            Thread.currentThread().interrupt();
            throw new DataStreamInfrastructureException(e);
        } catch (JMSException e) {
            throw new DataStreamInfrastructureException(e);
        }
    }

    /**
     * Finds the position just before the first message enqueued after the given time.
     *
     * @param enqueuedAfter only messages enqueued after this instant are considered
     * @param waitForArrival longest time to wait for such a message
     * @return the offset of that message minus one, or null if no message was received in time
     * @throws DataStreamInfrastructureException if JMS fails
     * @throws InterruptedException              if the underlying receive is interrupted
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String firstPosition(Instant enqueuedAfter, Duration waitForArrival) throws DataStreamInfrastructureException, InterruptedException {
        long startedAt = System.currentTimeMillis();
        try {
            Message first = firstMessageByReceive("amqp.annotation.x-opt-enqueued-time > '" + enqueuedAfter.toEpochMilli() + "'", waitForArrival.toMillis());
            EventHubAnnotations annotations = null;
            String position = null;
            if (first != null) {
                annotations = AzureEventHubUtility.getEventHubAnnotations(first);
                // The returned position is exclusive, so step back by one to include this message.
                position = String.valueOf(annotations.getOffset() - 1);
            }
            logger.debug("First position in {} after {} identified within {}: {}", new Object[] {this.identifier, enqueuedAfter, DurationFormatter.formatSince(startedAt), annotations});
            return position;
        } catch (JMSException e) {
            throw new DataStreamInfrastructureException(e);
        }
    }

    /**
     * Fetches the first message matching the selector through a message listener.
     *
     * @param messageSelector JMS message selector
     * @param timeoutMillis   longest time to wait for the message, in milliseconds
     * @return the first delivered message, or null if none arrived in time
     * @throws JMSException         if JMS fails
     * @throws InterruptedException if the wait is interrupted
     */
    protected Message firstMessageByListener(String messageSelector, long timeoutMillis) throws JMSException, InterruptedException {
        AtomicReference<Message> firstDelivered = new AtomicReference<>(null);
        AtomicBoolean alreadyDelivered = new AtomicBoolean(false);
        CountDownLatch delivered = new CountDownLatch(1);
        Session session = null;
        MessageConsumer consumer = null;
        try {
            Connection connection = getConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Delivery is paused while the listener is installed and resumed in every case afterwards.
            connection.stop();
            try {
                consumer = session.createConsumer(this.destination, messageSelector);
                consumer.setMessageListener(message -> {
                    // Only the first delivery counts; later ones are ignored.
                    if (alreadyDelivered.compareAndSet(false, true)) {
                        firstDelivered.set(message);
                        delivered.countDown();
                    }
                });
            } finally {
                connection.start();
            }
            delivered.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return firstDelivered.get();
        } finally {
            JmsUtility.closeSilently(consumer, session);
        }
    }

    /**
     * Fetches the first message matching the selector through blocking receive calls.
     *
     * @param messageSelector JMS message selector
     * @param timeoutMillis   longest total time to wait in milliseconds; zero or negative means
     *                        the wait is effectively unbounded
     * @return the first received message, or null if none arrived in time
     * @throws JMSException         if JMS fails
     * @throws InterruptedException declared for callers; not thrown by the calls visible in this method
     */
    protected Message firstMessageByReceive(String messageSelector, long timeoutMillis) throws JMSException, InterruptedException {
        Session session = null;
        MessageConsumer consumer = null;
        try {
            session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
            consumer = session.createConsumer(this.destination, messageSelector);
            long budget = timeoutMillis > 0 ? timeoutMillis : Long.MAX_VALUE;
            long startedAt = System.currentTimeMillis();
            long remaining = budget;
            Message received = null;
            while (received == null && remaining > 0) {
                received = consumer.receive(remaining);
                // Recompute from the fixed start time so elapsed time is never counted twice;
                // a clock that moved backwards is treated as no time elapsed.
                long elapsed = Math.max(0L, System.currentTimeMillis() - startedAt);
                remaining = budget - elapsed;
            }
            return received;
        } finally {
            JmsUtility.closeSilently(consumer, session);
        }
    }

    /**
     * Fetches the first message matching the selector through a queue browser.
     *
     * @param messageSelector JMS message selector
     * @param timeoutMillis   time limit for browsing, in milliseconds
     * @return the first browsed message, or null if there is none or the time limit was hit
     * @throws JMSException         if JMS fails
     * @throws InterruptedException if browsing is interrupted
     */
    protected Message firstMessageByBrowser(String messageSelector, long timeoutMillis) throws JMSException, InterruptedException {
        Session session = null;
        QueueBrowser browser = null;
        try {
            session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
            browser = session.createBrowser(this.destination, messageSelector);
            Enumeration<?> messages = browser.getEnumeration();
            Message first = null;
            try {
                first = (Message) timeLimiter.callWithTimeout(() -> {
                    if (messages.hasMoreElements()) {
                        return (Message) messages.nextElement();
                    }
                    return null;
                }, timeoutMillis, TimeUnit.MILLISECONDS, true);
            } catch (UncheckedTimeoutException ignored) {
                // Hitting the time limit simply means there is no message to report.
            } catch (JMSException | InterruptedException e) {
                // Both are part of this method's contract, so they must not be wrapped.
                throw e;
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
            return first;
        } finally {
            JmsUtility.closeSilently(browser, session);
        }
    }

    /**
     * Determines the last position currently available in the stream.
     *
     * @return the last offset as a string, or null if it could not be determined
     * @throws DataStreamInfrastructureException if JMS fails
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String lastPosition() throws DataStreamInfrastructureException {
        long startedAt = System.currentTimeMillis();
        try {
            Long lastOffset = lastPositionByTryError();
            logger.debug("Last position in {} identified within {}: {}", new Object[] {this.identifier, DurationFormatter.formatSince(startedAt), lastOffset});
            return lastOffset == null ? null : lastOffset.toString();
        } catch (JMSException e) {
            throw new DataStreamInfrastructureException(e);
        }
    }

    /**
     * Determines the last offset by requesting an out-of-range offset and reading the real last
     * offset out of the resulting error message.
     *
     * @return the last offset, or null if creating the consumer unexpectedly succeeded
     * @throws JMSException if JMS fails in a way that does not reveal the last offset
     */
    protected Long lastPositionByTryError() throws JMSException {
        Long lastOffset = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                consumer = session.createConsumer(this.destination, OUT_OF_RANGE_OFFSET_SELECTOR);
            } catch (JMSException e) {
                lastOffset = parseLastOffset(e.getCause());
                if (lastOffset == null) {
                    // The failure is not the expected "out of range" answer, so report it as it is.
                    throw e;
                }
            }
            return lastOffset;
        } finally {
            JmsUtility.closeSilently(consumer, session);
        }
    }

    /** Extracts the number following {@link #LAST_OFFSET_MARKER} from the cause's message, or returns null. */
    private static Long parseLastOffset(Throwable cause) {
        if (cause == null || cause.getMessage() == null) {
            return null;
        }
        String text = cause.getMessage();
        int markerAt = text.indexOf(LAST_OFFSET_MARKER);
        if (markerAt < 0) {
            return null;
        }
        int from = markerAt + LAST_OFFSET_MARKER.length();
        int to = text.indexOf(' ', from + 1);
        if (to < 0) {
            // The offset is the last token of the message.
            to = text.length();
        }
        String token = StringUtils.removeEnd(StringUtils.removeStart(StringUtils.trimToNull(text.substring(from, to)), "'"), "'");
        if (token == null) {
            return null;
        }
        try {
            return Long.valueOf(token);
        } catch (NumberFormatException ignored) {
            // Text that is not a number is treated as "offset not found" so the caller rethrows the JMS error.
            return null;
        }
    }

    /**
     * Returns the position to resume from after the given position; for this supplier it is the same value.
     *
     * @param position last handled position
     * @return the given position, unchanged
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public String nextStartPosition(String position) {
        return position;
    }

    /**
     * Returns the connection used for creating sessions.
     *
     * @return the wrapped connection of this supplier
     */
    @Override // net.sf.jabb.dstream.JmsConsumerStreamDataSupplier
    protected Connection getConnection() {
        return this.wrappedConnection;
    }

    /**
     * Builds the selector matching all messages with an offset greater than the given position.
     *
     * @param position exclusive start offset
     * @return the JMS message selector
     */
    @Override // net.sf.jabb.dstream.JmsConsumerStreamDataSupplier
    protected String messageSelector(String position) {
        return "amqp.annotation.x-opt-offset > '" + position + "'";
    }

    /**
     * Checks whether a position is not beyond the given end position, comparing both as long numbers.
     *
     * @param position    position to check, cannot be null
     * @param endPosition inclusive end position, or null for no upper bound
     * @return true if there is no end position or the position is less than or equal to it
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public boolean isInRange(String position, String endPosition) {
        Validate.isTrue(position != null, "position cannot be null");
        if (endPosition == null) {
            return true;
        }
        return Long.parseLong(position) <= Long.parseLong(endPosition);
    }

    /**
     * Converts a received message with the configured message converter.
     *
     * @param message the received message
     * @return the converted message
     */
    @Override // net.sf.jabb.dstream.JmsConsumerStreamDataSupplier
    protected M convert(Message message) {
        return this.messageConverter.apply(message);
    }

    /**
     * Returns the position of a message, which is the offset found in its Event Hub annotations.
     *
     * @param message the received message
     * @return the offset as a string
     */
    @Override // net.sf.jabb.dstream.JmsConsumerStreamDataSupplier
    protected String position(Message message) {
        return String.valueOf(AzureEventHubUtility.getEventHubAnnotations(message).getOffset());
    }

    /**
     * Starts establishing the connection and waits up to 60 seconds for the attempt to finish.
     * A warning is logged if the connection is still being established after that time.
     *
     * @throws Exception if establishing the connection or handling an interruption fails
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void start() throws Exception {
        long startedAt = System.currentTimeMillis();
        this.wrappedConnection.establishConnection();
        while (System.currentTimeMillis() < startedAt + START_TIMEOUT_MILLIS) {
            if (!this.wrappedConnection.isConnecting()) {
                return;
            }
            try {
                this.waitStrategy.await(START_POLL_MILLIS);
            } catch (InterruptedException e) {
                this.waitStrategy.handleInterruptedException(e);
            }
        }
        logger.warn("Connection is still not established after start: {}", DurationFormatter.formatSince(startedAt));
    }

    /**
     * Closes the wrapped connection.
     *
     * @throws Exception if closing fails
     */
    @Override // net.sf.jabb.dstream.StreamDataSupplier
    public void stop() throws Exception {
        this.wrappedConnection.close();
    }
}
