package org.flowable.eventregistry.spring.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Field;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.eventregistry.api.ChannelModelProcessor;
import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.EventRepositoryService;
import org.flowable.eventregistry.model.ChannelModel;
import org.flowable.eventregistry.model.InboundChannelModel;
import org.flowable.eventregistry.model.KafkaInboundChannelModel;
import org.flowable.eventregistry.model.KafkaOutboundChannelModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.EmbeddedValueResolver;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaAdminOperations;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.ExceptionClassifier;
import org.springframework.kafka.listener.FailedRecordProcessor;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor;
import org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.DestinationTopicProcessor;
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
import org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.Suffixer;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.backoff.SleepingBackOffPolicy;
import org.springframework.retry.backoff.UniformRandomBackOffPolicy;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/* loaded from: input_file:WEB-INF/lib/flowable-event-registry-spring-6.8.0.jar:org/flowable/eventregistry/spring/kafka/KafkaChannelDefinitionProcessor.class */
public class KafkaChannelDefinitionProcessor implements BeanFactoryAware, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, ChannelModelProcessor {
    /** String prefix constant; presumably prepended to generated listener endpoint ids (usage not visible here - TODO confirm). */
    public static final String CHANNEL_ID_PREFIX = "org.flowable.eventregistry.kafka.ChannelKafkaListenerEndpointContainer#";
    /** Partition number intended as the fallback for manually assigned partitions. */
    protected static final int DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT = 0;
    /** Handed to the outbound event channel adapter created in processOutboundDefinition. */
    protected KafkaOperations<Object, Object> kafkaOperations;
    /** Used by createNewTopics; getTopicCreationFunction fails when this is null and topic auto creation is requested. */
    protected KafkaAdminOperations kafkaAdminOperations;
    /** Passed to the PartitionPausingBackOffManagerFactory when building the retry configurer. */
    protected KafkaListenerEndpointRegistry endpointRegistry;
    /** Base container factory that decorateFactory wraps for retry-enabled endpoints. */
    protected KafkaListenerContainerFactory<?> containerFactory;
    protected BeanFactory beanFactory;
    /** Used when constructing the destination topic resolver and the back off manager factory. */
    protected ApplicationContext applicationContext;
    // NOTE(review): presumably flipped once a ContextRefreshedEvent is received; the handler is not visible here.
    protected boolean contextRefreshed;
    /** Supplied through the constructor. */
    protected ObjectMapper objectMapper;
    // NOTE(review): the next two fields and 'resolver' look like the expression resolution machinery behind
    // resolveExpression(...) / resolve(...); those methods are outside this view - confirm.
    protected StringValueResolver embeddedValueResolver;
    protected BeanExpressionContext expressionContext;
    /** Instance logger named after the runtime class. */
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    protected String containerFactoryBeanName = "kafkaListenerContainerFactory";
    protected BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
    /** Maps the id of a main endpoint to the ids of all endpoints registered for it (filled in processAndRegisterEndpoints). */
    protected Map<String, Collection<String>> retryEndpointsByMainEndpointId = new HashMap();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/flowable-event-registry-spring-6.8.0.jar:org/flowable/eventregistry/spring/kafka/KafkaChannelDefinitionProcessor$Configuration.class */
    /**
     * Immutable pair of a listener endpoint and the container factory that should be used
     * when that endpoint is registered.
     */
    public static class Configuration {
        protected final KafkaListenerEndpoint endpoint;
        protected final KafkaListenerContainerFactory<?> factory;

        /**
         * @param endpoint the endpoint to register
         * @param factory the factory used to create the listener container for the endpoint
         */
        protected Configuration(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
            this.endpoint = endpoint;
            this.factory = factory;
        }

        /** @return the endpoint given at construction time */
        public KafkaListenerEndpoint getEndpoint() {
            return endpoint;
        }

        /** @return the container factory given at construction time */
        public KafkaListenerContainerFactory<?> getFactory() {
            return factory;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/flowable-event-registry-spring-6.8.0.jar:org/flowable/eventregistry/spring/kafka/KafkaChannelDefinitionProcessor$ResolvedRetryConfiguration.class */
    /**
     * Mutable holder for the retry settings of an inbound channel after expression resolution.
     */
    public static class ResolvedRetryConfiguration {
        /** Number of delivery attempts; when set, the processor builds a FixedBackOff with {@code attempts - 1} from it. */
        protected Integer attempts;
        protected String dltTopicSuffix;
        /** Suffix of the retry topic; {@code null} means no retry topic is configured. */
        protected String retryTopicSuffix;
        protected FixedDelayStrategy fixedDelayTopicStrategy;
        protected TopicSuffixingStrategy topicSuffixingStrategy;
        protected SleepingBackOffPolicy<?> nonBlockingBackOff;
        /** Whether the topics should be created through the Kafka admin operations. */
        protected boolean autoCreateTopics;
        /** Partition count used when topics are auto created. */
        protected int numPartitions;
        /** Replication factor used when topics are auto created. */
        protected short replicationFactor;

        protected ResolvedRetryConfiguration() {
        }

        /** @return {@code true} when a retry topic suffix has been set */
        protected boolean hasRetryTopic() {
            return null != retryTopicSuffix;
        }

        /** @return the negation of {@link #hasRetryTopic()} */
        protected boolean hasNoRetryTopic() {
            boolean retryTopicPresent = hasRetryTopic();
            return !retryTopicPresent;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/flowable-event-registry-spring-6.8.0.jar:org/flowable/eventregistry/spring/kafka/KafkaChannelDefinitionProcessor$RetryTopicContainerFactoryDecorator.class */
    /**
     * Container factory wrapper that delegates container creation to another factory and then
     * installs a {@link CommonErrorHandler} on every container it hands out.
     */
    public static class RetryTopicContainerFactoryDecorator implements KafkaListenerContainerFactory<MessageListenerContainer> {
        private final KafkaListenerContainerFactory<?> delegate;
        private final Supplier<CommonErrorHandler> errorHandlerProvider;

        /**
         * @param kafkaListenerContainerFactory the factory that actually creates the containers
         * @param supplier provides the error handler; invoked once per decorated container
         */
        private RetryTopicContainerFactoryDecorator(KafkaListenerContainerFactory<?> kafkaListenerContainerFactory, Supplier<CommonErrorHandler> supplier) {
            this.delegate = kafkaListenerContainerFactory;
            this.errorHandlerProvider = supplier;
        }

        /** Creates the container for the endpoint through the delegate and decorates it. */
        public MessageListenerContainer createListenerContainer(KafkaListenerEndpoint kafkaListenerEndpoint) {
            return decorate(this.delegate.createListenerContainer(kafkaListenerEndpoint));
        }

        /** Creates a container for explicit topic partitions through the delegate and decorates it. */
        public MessageListenerContainer createContainer(TopicPartitionOffset... topicPartitionOffsetArr) {
            return decorate(this.delegate.createContainer(topicPartitionOffsetArr));
        }

        /** Creates a container for the given topics through the delegate and decorates it. */
        public MessageListenerContainer createContainer(String... strArr) {
            return decorate(this.delegate.createContainer(strArr));
        }

        /** Creates a container for the given topic pattern through the delegate and decorates it. */
        public MessageListenerContainer createContainer(Pattern pattern) {
            return decorate(this.delegate.createContainer(pattern));
        }

        /**
         * Installs the error handler on the container when the container type supports it.
         *
         * @param messageListenerContainer the container produced by the delegate
         * @return the same container instance
         */
        protected MessageListenerContainer decorate(MessageListenerContainer messageListenerContainer) {
            if (messageListenerContainer instanceof AbstractMessageListenerContainer) {
                // Cast to the type that was actually checked: casting to ConcurrentMessageListenerContainer
                // would throw a ClassCastException for any other AbstractMessageListenerContainer subclass.
                ((AbstractMessageListenerContainer<?, ?>) messageListenerContainer).setCommonErrorHandler(this.errorHandlerProvider.get());
            }
            return messageListenerContainer;
        }
    }

    /**
     * Creates the processor.
     *
     * @param objectMapper stored in the {@code objectMapper} field for later use
     */
    public KafkaChannelDefinitionProcessor(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Tells whether this processor handles the given channel model.
     *
     * @param channelModel the model to inspect
     * @return {@code true} for Kafka inbound and Kafka outbound channel models
     */
    @Override // org.flowable.eventregistry.api.ChannelModelProcessor
    public boolean canProcess(ChannelModel channelModel) {
        if (channelModel instanceof KafkaInboundChannelModel) {
            return true;
        }
        return channelModel instanceof KafkaOutboundChannelModel;
    }

    /**
     * Tells whether an already registered channel model should be processed again.
     *
     * @param channelModel the model to inspect
     * @return {@code true} only for Kafka outbound channel models
     */
    @Override // org.flowable.eventregistry.api.ChannelModelProcessor
    public boolean canProcessIfChannelModelAlreadyRegistered(ChannelModel channelModel) {
        return channelModel instanceof KafkaOutboundChannelModel;
    }

    /**
     * Registers a Kafka channel model: inbound models get their listener endpoints created and
     * registered, outbound models get an outbound adapter. Other model types are ignored.
     *
     * @param channelModel the model to register
     * @param str the tenant id (used for endpoint creation and logging)
     * @param eventRegistry the registry handed to the created message listeners
     * @param eventRepositoryService not used by this implementation
     * @param z not used by this implementation
     */
    @Override // org.flowable.eventregistry.api.ChannelModelProcessor
    public void registerChannelModel(ChannelModel channelModel, String str, EventRegistry eventRegistry, EventRepositoryService eventRepositoryService, boolean z) {
        if (channelModel instanceof KafkaInboundChannelModel) {
            KafkaInboundChannelModel inboundModel = (KafkaInboundChannelModel) channelModel;
            logger.info("Starting to register inbound channel {} in tenant {}", channelModel.getKey(), str);
            processAndRegisterEndpoints(inboundModel, str, eventRegistry);
            logger.info("Finished registering inbound channel {} in tenant {}", channelModel.getKey(), str);
            return;
        }
        if (channelModel instanceof KafkaOutboundChannelModel) {
            KafkaOutboundChannelModel outboundModel = (KafkaOutboundChannelModel) channelModel;
            logger.info("Starting to register outbound channel {} in tenant {}", channelModel.getKey(), str);
            processOutboundDefinition(outboundModel);
            logger.info("Finished registering outbound channel {} in tenant {}", channelModel.getKey(), str);
        }
    }

    /**
     * Builds a listener endpoint for an inbound channel model, resolving every configurable
     * attribute (topics, pattern, partitions, client id prefix, concurrency, consumer properties).
     *
     * @param kafkaInboundChannelModel the inbound channel model
     * @param str the tenant id used to derive the endpoint id
     * @param eventRegistry the registry passed to the message listener
     * @return the populated endpoint
     */
    protected KafkaListenerEndpoint createKafkaListenerEndpoint(KafkaInboundChannelModel kafkaInboundChannelModel, String str, EventRegistry eventRegistry) {
        SimpleKafkaListenerEndpoint endpoint = new SimpleKafkaListenerEndpoint();
        endpoint.setId(getEndpointId(kafkaInboundChannelModel, str));

        // The group id is derived from the id that was just stored on the endpoint.
        String groupId = getEndpointGroupId(kafkaInboundChannelModel, endpoint.getId());
        endpoint.setGroupId(groupId);

        endpoint.setTopics(resolveTopics(kafkaInboundChannelModel));
        endpoint.setTopicPattern(resolvePattern(kafkaInboundChannelModel));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaInboundChannelModel));

        String clientIdPrefix = resolveExpressionAsString(kafkaInboundChannelModel.getClientIdPrefix(), "clientIdPrefix");
        endpoint.setClientIdPrefix(clientIdPrefix);
        Integer concurrency = resolveExpressionAsInteger(kafkaInboundChannelModel.getConcurrency(), "concurrency");
        endpoint.setConcurrency(concurrency);

        endpoint.setConsumerProperties(resolveProperties(kafkaInboundChannelModel.getCustomProperties()));
        endpoint.setMessageListener(createMessageListener(eventRegistry, kafkaInboundChannelModel));
        return endpoint;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Creates the main endpoint for an inbound channel, derives all endpoint configurations for it,
     * remembers their ids under the main endpoint id and registers each of them.
     *
     * @param kafkaInboundChannelModel the inbound channel model
     * @param str the tenant id
     * @param eventRegistry the registry passed to the message listeners
     */
    protected void processAndRegisterEndpoints(KafkaInboundChannelModel kafkaInboundChannelModel, String str, EventRegistry eventRegistry) {
        KafkaListenerEndpoint mainEndpoint = createKafkaListenerEndpoint(kafkaInboundChannelModel, str, eventRegistry);
        KafkaListenerContainerFactory<?> mainFactory = resolveContainerFactory(mainEndpoint, null);
        Collection<Configuration> configurations = createEndpointConfigurations(kafkaInboundChannelModel, str, eventRegistry, mainEndpoint, mainFactory);

        this.retryEndpointsByMainEndpointId.put(mainEndpoint.getId(), configurations.stream()
                .map(Configuration::getEndpoint)
                .map(KafkaListenerEndpoint::getId)
                .collect(Collectors.toList()));

        for (Configuration each : configurations) {
            registerEndpoint(each.getEndpoint(), each.getFactory());
        }
    }

    /**
     * Builds the endpoint/factory pairs that have to be registered for an inbound channel.
     * <p>
     * Without a retry topic configuration a single pair for the main endpoint is returned, wrapped
     * in a {@link RetryTopicContainerFactoryDecorator} when a number of attempts is configured.
     * With a retry topic configuration the destination topics are registered with a
     * {@link DefaultDestinationTopicProcessor} and a pair is produced for the main endpoint and,
     * when a retry topic suffix is configured, for each non-DLT destination.
     *
     * @param kafkaInboundChannelModel the inbound channel model
     * @param str the tenant id
     * @param eventRegistry the registry passed to newly created endpoints
     * @param kafkaListenerEndpoint the main endpoint; its id, group id, topics and client id prefix are
     *        passed through the suffixer of the main destination
     * @param kafkaListenerContainerFactory the factory used when no retry topic configuration exists
     * @return the configurations to register
     * @throws FlowableException when a retry topic configuration exists but no topics could be determined
     */
    protected Collection<Configuration> createEndpointConfigurations(KafkaInboundChannelModel kafkaInboundChannelModel, String str, EventRegistry eventRegistry, KafkaListenerEndpoint kafkaListenerEndpoint, KafkaListenerContainerFactory<?> kafkaListenerContainerFactory) {
        Collection topics;
        ResolvedRetryConfiguration resolveRetryConfiguration = resolveRetryConfiguration(kafkaInboundChannelModel);
        // Blocking back off: no delay, attempts - 1 as the FixedBackOff max attempts
        // (presumably because the first delivery is not counted as a retry - TODO confirm).
        FixedBackOff fixedBackOff = (resolveRetryConfiguration == null || resolveRetryConfiguration.attempts == null) ? null : new FixedBackOff(0L, resolveRetryConfiguration.attempts.intValue() - 1);
        RetryTopicConfiguration createRetryTopicConfiguration = createRetryTopicConfiguration(resolveRetryConfiguration);
        if (createRetryTopicConfiguration == null) {
            // No retry topic configuration: only the main endpoint is registered.
            if (fixedBackOff == null) {
                return Collections.singleton(new Configuration(kafkaListenerEndpoint, kafkaListenerContainerFactory));
            }
            FixedBackOff fixedBackOff2 = fixedBackOff;
            return Collections.singleton(new Configuration(kafkaListenerEndpoint, new RetryTopicContainerFactoryDecorator(kafkaListenerContainerFactory, () -> {
                return new DefaultErrorHandler(fixedBackOff2);
            })));
        }
        // Determine the topic names: explicit topics win, otherwise the distinct topics of the assigned partitions.
        if (kafkaListenerEndpoint.getTopics().isEmpty()) {
            TopicPartitionOffset[] topicPartitionsToAssign = kafkaListenerEndpoint.getTopicPartitionsToAssign();
            topics = (topicPartitionsToAssign == null || topicPartitionsToAssign.length <= 0) ? Collections.emptyList() : (Collection) Arrays.stream(topicPartitionsToAssign).map((v0) -> {
                return v0.getTopic();
            }).collect(Collectors.toCollection(LinkedHashSet::new));
        } else {
            topics = kafkaListenerEndpoint.getTopics();
        }
        if (topics.isEmpty()) {
            throw new FlowableException("Channel model " + kafkaInboundChannelModel.getKey() + " in tenant " + str + " has retry configuration but no topics have been provided for it");
        }
        ArrayList arrayList = new ArrayList(createRetryTopicConfiguration.getDestinationTopicProperties().size());
        DefaultDestinationTopicResolver defaultDestinationTopicResolver = new DefaultDestinationTopicResolver(Clock.systemUTC(), this.applicationContext);
        DefaultDestinationTopicProcessor defaultDestinationTopicProcessor = new DefaultDestinationTopicProcessor(defaultDestinationTopicResolver);
        ListenerContainerFactoryConfigurer createListenerContainerFactoryConfigurer = createListenerContainerFactoryConfigurer(resolveRetryConfiguration, fixedBackOff, defaultDestinationTopicResolver);
        DestinationTopicProcessor.Context context = new DestinationTopicProcessor.Context(createRetryTopicConfiguration.getDestinationTopicProperties());
        Collection collection = topics;
        defaultDestinationTopicProcessor.processDestinationTopicProperties(properties -> {
            Suffixer suffixer = new Suffixer(properties.suffix());
            // Main and DLT destinations are always registered; other destinations only when a retry topic suffix is configured.
            if (properties.isMainEndpoint() || properties.isDltTopic() || resolveRetryConfiguration.hasRetryTopic()) {
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    String str2 = (String) it.next();
                    defaultDestinationTopicProcessor.registerDestinationTopic(str2, suffixer.maybeAddTo(str2), properties, context);
                }
            }
            // Endpoint selection: the main endpoint is reused, the DLT gets no listener, retry destinations get a fresh endpoint.
            KafkaListenerEndpoint createKafkaListenerEndpoint = properties.isMainEndpoint() ? kafkaListenerEndpoint : (properties.isDltTopic() || !resolveRetryConfiguration.hasRetryTopic()) ? null : createKafkaListenerEndpoint(kafkaInboundChannelModel, str, eventRegistry);
            // Also skips the null case, since instanceof is false for null.
            if (createKafkaListenerEndpoint instanceof SimpleKafkaListenerEndpoint) {
                SimpleKafkaListenerEndpoint simpleKafkaListenerEndpoint = (SimpleKafkaListenerEndpoint) createKafkaListenerEndpoint;
                simpleKafkaListenerEndpoint.setId(suffixer.maybeAddTo(simpleKafkaListenerEndpoint.getId()));
                simpleKafkaListenerEndpoint.setGroupId(suffixer.maybeAddTo(simpleKafkaListenerEndpoint.getGroupId()));
                TopicPartitionOffset[] topicPartitionsToAssign2 = createKafkaListenerEndpoint.getTopicPartitionsToAssign();
                // Suffix either the topic names or, when only partitions were assigned, the topic of every partition.
                if (!createKafkaListenerEndpoint.getTopics().isEmpty() || topicPartitionsToAssign2 == null) {
                    simpleKafkaListenerEndpoint.setTopics(suffixer.maybeAddTo(simpleKafkaListenerEndpoint.getTopics()));
                } else {
                    simpleKafkaListenerEndpoint.setTopicPartitions(getTopicPartitions(properties, suffixer, createKafkaListenerEndpoint.getTopicPartitionsToAssign()));
                }
                simpleKafkaListenerEndpoint.setClientIdPrefix(suffixer.maybeAddTo(simpleKafkaListenerEndpoint.getClientIdPrefix()));
                arrayList.add(new Configuration(simpleKafkaListenerEndpoint, decorateFactory(properties, createListenerContainerFactoryConfigurer, createRetryTopicConfiguration)));
            }
        }, context);
        // Hands the registered destination topic names to the topic creation function (a no-op unless auto creation is enabled).
        defaultDestinationTopicProcessor.processRegisteredDestinations(getTopicCreationFunction(resolveRetryConfiguration), context);
        // The resolver is created here rather than as a bean, so the refresh event is delivered to it manually.
        defaultDestinationTopicResolver.onApplicationEvent(new ContextRefreshedEvent(this.applicationContext));
        return arrayList;
    }

    /**
     * Maps every assigned topic partition to its counterpart for the given destination.
     *
     * @param properties the destination topic properties deciding between main and retry handling
     * @param suffixer adds the destination suffix to the topic names
     * @param topicPartitionOffsetArr the partitions assigned to the endpoint; must not be {@code null}
     * @return a list with one converted entry per input partition, in input order
     */
    protected static Collection<TopicPartitionOffset> getTopicPartitions(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset[] topicPartitionOffsetArr) {
        // Stream the array with its real element type; an (Object[]) cast would turn this into a
        // Stream<Object> and the lambda below could no longer call the TopicPartitionOffset helpers.
        return Stream.of(topicPartitionOffsetArr)
                .map(topicPartitionOffset -> properties.isMainEndpoint()
                        ? getTPOForMainTopic(suffixer, topicPartitionOffset)
                        : getTPOForRetryTopics(properties, suffixer, topicPartitionOffset))
                .collect(Collectors.toList());
    }

    /**
     * Creates the topic partition for a retry destination: the topic name is suffixed and the
     * partition is kept when it does not exceed the destination's partition count, otherwise 0 is used.
     *
     * @param properties the destination topic properties providing the partition count
     * @param suffixer adds the destination suffix to the topic name
     * @param topicPartitionOffset the partition assigned on the main topic
     * @return the topic partition for the retry destination
     */
    protected static TopicPartitionOffset getTPOForRetryTopics(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset topicPartitionOffset) {
        String suffixedTopic = suffixer.maybeAddTo(topicPartitionOffset.getTopic());
        // NOTE(review): partitions are normally zero based, so '<=' may let partition == numPartitions through - confirm.
        boolean keepPartition = topicPartitionOffset.getPartition() <= properties.numPartitions();
        if (keepPartition) {
            return new TopicPartitionOffset(suffixedTopic, topicPartitionOffset.getPartition());
        }
        return new TopicPartitionOffset(suffixedTopic, 0);
    }

    /**
     * Copies a topic partition for the main destination, keeping partition, offset, position and
     * the relative-to-current flag while passing the topic name through the suffixer.
     *
     * @param suffixer applied to the topic name
     * @param topicPartitionOffset the source topic partition
     * @return a new instance carrying the copied values
     */
    protected static TopicPartitionOffset getTPOForMainTopic(Suffixer suffixer, TopicPartitionOffset topicPartitionOffset) {
        TopicPartitionOffset copy = new TopicPartitionOffset(
                suffixer.maybeAddTo(topicPartitionOffset.getTopic()),
                topicPartitionOffset.getPartition(),
                topicPartitionOffset.getOffset(),
                topicPartitionOffset.getPosition());
        copy.setRelativeToCurrent(topicPartitionOffset.isRelativeToCurrent());
        return copy;
    }

    /**
     * Provides the callback that receives the topic names to create.
     *
     * @param resolvedRetryConfiguration the retry configuration; must not be {@code null}
     * @return a no-op consumer when auto creation is disabled, otherwise a consumer that creates the
     *         topics with the configured partition count and replication factor
     * @throws FlowableException when auto creation is enabled but no Kafka admin operations are available
     */
    protected Consumer<Collection<String>> getTopicCreationFunction(ResolvedRetryConfiguration resolvedRetryConfiguration) {
        if (resolvedRetryConfiguration.autoCreateTopics) {
            if (this.kafkaAdminOperations == null) {
                throw new FlowableException("It is not possible to auto create new topics when no kafka admin operations have been configured");
            }
            return topicNames -> createNewTopics(topicNames, resolvedRetryConfiguration.numPartitions, resolvedRetryConfiguration.replicationFactor);
        }
        // Auto creation disabled: nothing to do with the topic names.
        return ignoredTopicNames -> {
        };
    }

    /**
     * Creates or modifies the given topics through the Kafka admin operations.
     *
     * @param collection the topic names
     * @param i the number of partitions for every topic
     * @param s the replication factor for every topic
     */
    protected void createNewTopics(Collection<String> collection, int i, short s) {
        NewTopic[] newTopics = collection.stream()
                .map(topicName -> new NewTopic(topicName, i, s))
                .toArray(NewTopic[]::new);
        this.kafkaAdminOperations.createOrModifyTopics(newTopics);
    }

    /**
     * Creates the configurer used to decorate container factories for retry-enabled endpoints.
     * When no retry topic is configured, an error handler customizer is installed (marking a fixed
     * set of exception types as not retryable and disabling commit-recovered) and the blocking
     * back off is applied when one is given.
     *
     * @param resolvedRetryConfiguration the retry configuration; must not be {@code null}
     * @param backOff the blocking back off, may be {@code null}
     * @param defaultDestinationTopicResolver the resolver handed to the dead letter recoverer factory
     * @return the configurer
     */
    protected ListenerContainerFactoryConfigurer createListenerContainerFactoryConfigurer(ResolvedRetryConfiguration resolvedRetryConfiguration, BackOff backOff, DefaultDestinationTopicResolver defaultDestinationTopicResolver) {
        DeadLetterPublishingRecovererFactory recovererFactory = new DeadLetterPublishingRecovererFactory(defaultDestinationTopicResolver);
        PartitionPausingBackOffManagerFactory backOffManagerFactory = new PartitionPausingBackOffManagerFactory(this.endpointRegistry);
        backOffManagerFactory.setApplicationContext(this.applicationContext);
        ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(backOffManagerFactory.create(), recovererFactory, Clock.systemUTC());

        if (!resolvedRetryConfiguration.hasNoRetryTopic()) {
            return configurer;
        }

        configurer.setErrorHandlerCustomizer(errorHandler -> {
            if (errorHandler instanceof ExceptionClassifier) {
                // Exception types that are classified as 'false'; everything else defaults to 'true'.
                Map<Class<? extends Throwable>, Boolean> classifications = new HashMap<>();
                classifications.put(DeserializationException.class, Boolean.FALSE);
                classifications.put(MessageConversionException.class, Boolean.FALSE);
                classifications.put(ConversionException.class, Boolean.FALSE);
                classifications.put(MethodArgumentResolutionException.class, Boolean.FALSE);
                classifications.put(NoSuchMethodException.class, Boolean.FALSE);
                classifications.put(ClassCastException.class, Boolean.FALSE);
                ((ExceptionClassifier) errorHandler).setClassifications(classifications, true);
            }
            if (errorHandler instanceof FailedRecordProcessor) {
                ((FailedRecordProcessor) errorHandler).setCommitRecovered(false);
            }
        });
        if (backOff != null) {
            configurer.setBlockingRetriesBackOff(backOff);
        }
        return configurer;
    }

    /**
     * Decorates the processor's container factory for a destination. The main endpoint is decorated
     * without container properties being set; every other destination uses the regular decoration.
     *
     * @param properties the destination topic properties
     * @param listenerContainerFactoryConfigurer the configurer performing the decoration
     * @param retryTopicConfiguration supplies the configurer specific configuration
     * @return the decorated factory
     */
    protected KafkaListenerContainerFactory<?> decorateFactory(DestinationTopic.Properties properties, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, RetryTopicConfiguration retryTopicConfiguration) {
        if (properties.isMainEndpoint()) {
            return listenerContainerFactoryConfigurer.decorateFactoryWithoutSettingContainerProperties(this.containerFactory, retryTopicConfiguration.forContainerFactoryConfigurer());
        }
        return listenerContainerFactoryConfigurer.decorateFactory(this.containerFactory, retryTopicConfiguration.forContainerFactoryConfigurer());
    }

    /**
     * Installs a Kafka outbound adapter on the channel model when it has none yet and a topic with
     * text is configured. The topic is resolved before the adapter is created.
     *
     * @param kafkaOutboundChannelModel the outbound channel model to complete
     */
    protected void processOutboundDefinition(KafkaOutboundChannelModel kafkaOutboundChannelModel) {
        String configuredTopic = kafkaOutboundChannelModel.getTopic();
        boolean adapterAlreadySet = kafkaOutboundChannelModel.getOutboundEventChannelAdapter() != null;
        if (adapterAlreadySet || !StringUtils.hasText(configuredTopic)) {
            return;
        }
        String resolvedTopic = resolve(configuredTopic);
        kafkaOutboundChannelModel.setOutboundEventChannelAdapter(new KafkaOperationsOutboundEventChannelAdapter(
                this.kafkaOperations,
                resolveKafkaPartitionProvider(kafkaOutboundChannelModel),
                resolvedTopic,
                kafkaOutboundChannelModel.getRecordKey()));
    }

    /**
     * Resolves an expression to an Integer, using {@code null} as the default value.
     *
     * @param str the expression to resolve
     * @param str2 the attribute name used in error messages
     * @return the resolved value, or {@code null} when the expression resolves to nothing
     */
    protected Integer resolveExpressionAsInteger(String str, String str2) {
        return resolveExpressionAsInteger(str, str2, null);
    }

    /**
     * Resolves an expression to an Integer.
     *
     * @param str the expression to resolve
     * @param str2 the attribute name used in the error message
     * @param num the value returned when the expression resolves to {@code null}
     * @return the parsed String, the int value of a Number, or {@code num}
     * @throws IllegalStateException when the resolved value is neither a String nor a Number
     * @throws NumberFormatException when a resolved String is not a valid integer
     */
    protected Integer resolveExpressionAsInteger(String str, String str2, Integer num) {
        Object resolved = resolveExpression(str);
        if (resolved == null) {
            return num;
        }
        if (resolved instanceof String) {
            return Integer.valueOf((String) resolved);
        }
        if (resolved instanceof Number) {
            return ((Number) resolved).intValue();
        }
        throw new IllegalStateException("The [" + str2 + "] must resolve to an Number or a String that can be parsed as an Integer. Resolved to [" + resolved.getClass() + "] for [" + str + "]");
    }

    /**
     * Resolves an expression to a Long.
     *
     * @param str the expression to resolve
     * @param str2 the attribute name used in the error message
     * @return the parsed String, the long value of a Number, or {@code null} when nothing is resolved
     * @throws IllegalStateException when the resolved value is neither a String nor a Number
     * @throws NumberFormatException when a resolved String is not a valid long
     */
    protected Long resolveExpressionAsLong(String str, String str2) {
        Object resolved = resolveExpression(str);
        if (resolved == null) {
            return null;
        }
        if (resolved instanceof String) {
            return Long.valueOf((String) resolved);
        }
        if (resolved instanceof Number) {
            return ((Number) resolved).longValue();
        }
        throw new IllegalStateException("The [" + str2 + "] must resolve to an Number or a String that can be parsed as a Long. Resolved to [" + resolved.getClass() + "] for [" + str + "]");
    }

    /**
     * Resolves an expression to a Double.
     *
     * @param str the expression to resolve
     * @param str2 the attribute name used in the error message
     * @return the parsed String, the double value of a Number, or {@code null} when nothing is resolved
     * @throws IllegalStateException when the resolved value is neither a String nor a Number
     * @throws NumberFormatException when a resolved String is not a valid double
     */
    protected Double resolveExpressionAsDouble(String str, String str2) {
        Object resolved = resolveExpression(str);
        if (resolved == null) {
            return null;
        }
        if (resolved instanceof String) {
            return Double.valueOf((String) resolved);
        }
        if (resolved instanceof Number) {
            return ((Number) resolved).doubleValue();
        }
        throw new IllegalStateException("The [" + str2 + "] must resolve to an Number or a String that can be parsed as a Double. Resolved to [" + resolved.getClass() + "] for [" + str + "]");
    }

    /**
     * Resolves an expression to a Boolean, using {@code null} as the default value.
     *
     * @param str the expression to resolve
     * @param str2 the attribute name used in error messages
     * @return the resolved value, or {@code null} when the expression resolves to nothing
     */
    protected Boolean resolveExpressionAsBoolean(String str, String str2) {
        return resolveExpressionAsBoolean(str, str2, null);
    }

    /**
     * Resolves an expression to a Boolean.
     *
     * @param str the expression to resolve
     * @param str2 the attribute name used in the error message
     * @param bool the value returned when the expression resolves to {@code null}
     * @return the parsed String, the resolved Boolean, or {@code bool}
     * @throws IllegalStateException when the resolved value is neither a String nor a Boolean
     */
    protected Boolean resolveExpressionAsBoolean(String str, String str2, Boolean bool) {
        Object resolved = resolveExpression(str);
        if (resolved == null) {
            return bool;
        }
        if (resolved instanceof String) {
            return Boolean.valueOf((String) resolved);
        }
        if (resolved instanceof Boolean) {
            return (Boolean) resolved;
        }
        throw new IllegalStateException("The [" + str2 + "] must resolve to a Boolean or a String that can be parsed as a Boolean. Resolved to [" + resolved.getClass() + "] for [" + str + "]");
    }

    /**
     * Resolves an expression to a String.
     *
     * @param str the expression to resolve; {@code null} or empty yields {@code null}
     * @param str2 the attribute name used in the error message
     * @return the resolved String, or {@code null} when there is no expression or it resolves to {@code null}
     * @throws IllegalStateException when the resolved value is not a String
     */
    protected String resolveExpressionAsString(String str, String str2) {
        if (!StringUtils.hasLength(str)) {
            return null;
        }
        Object resolved = resolveExpression(str);
        if (resolved == null) {
            // Same treatment as in the other resolveExpressionAs* methods: nothing resolved means no value.
            // Without this guard, building the error message below would fail with a NullPointerException.
            return null;
        }
        if (resolved instanceof String) {
            return (String) resolved;
        }
        throw new IllegalStateException("The [" + str2 + "] must resolve to a String. Resolved to [" + resolved.getClass() + "] for [" + str + "]");
    }

    /**
     * Resolves the topic expressions of an inbound channel model into topic names.
     *
     * @param kafkaInboundChannelModel the inbound channel model
     * @return the resolved topic names in declaration order; an empty list when no topics are configured
     */
    protected Collection<String> resolveTopics(KafkaInboundChannelModel kafkaInboundChannelModel) {
        Collection<String> configuredTopics = kafkaInboundChannelModel.getTopics();
        boolean nothingConfigured = configuredTopics == null || configuredTopics.isEmpty();
        if (nothingConfigured) {
            return Collections.emptyList();
        }
        List<String> resolvedTopics = new ArrayList<>();
        for (String topicExpression : configuredTopics) {
            // A single expression may resolve to several topics, hence the accumulating overload.
            resolveTopics(resolveExpression(topicExpression), resolvedTopics, kafkaInboundChannelModel);
        }
        return resolvedTopics;
    }

    /**
     * Appends the topic names contained in a resolved value to the result list. Accepts a String,
     * a String array, or an Iterable whose elements are resolved recursively.
     *
     * @param obj the resolved value
     * @param list receives the topic names
     * @param kafkaInboundChannelModel the channel model, used in the error message
     * @throws IllegalArgumentException when the value has an unsupported type (including {@code null})
     */
    protected void resolveTopics(Object obj, List<String> list, KafkaInboundChannelModel kafkaInboundChannelModel) {
        if (obj instanceof String[]) {
            String[] topicArray = (String[]) obj;
            for (int index = 0; index < topicArray.length; index++) {
                resolveTopics(topicArray[index], list, kafkaInboundChannelModel);
            }
        } else if (obj instanceof String) {
            list.add((String) obj);
        } else if (obj instanceof Iterable) {
            for (Object element : (Iterable<?>) obj) {
                resolveTopics(element, list, kafkaInboundChannelModel);
            }
        } else {
            throw new IllegalArgumentException("Channel definition " + kafkaInboundChannelModel + " cannot resolve " + obj + " as a String[] or a String");
        }
    }

    /**
     * Resolves the topic pattern of an inbound channel model.
     *
     * @param kafkaInboundChannelModel the inbound channel model
     * @return the compiled or resolved pattern, or {@code null} when no pattern text is configured or
     *         the expression resolves to {@code null}
     * @throws IllegalStateException when the expression resolves to something other than a String or Pattern
     */
    protected Pattern resolvePattern(KafkaInboundChannelModel kafkaInboundChannelModel) {
        String patternExpression = kafkaInboundChannelModel.getTopicPattern();
        if (!StringUtils.hasText(patternExpression)) {
            return null;
        }
        Object resolved = resolveExpression(patternExpression);
        if (resolved == null) {
            return null;
        }
        if (resolved instanceof String) {
            return Pattern.compile((String) resolved);
        }
        if (resolved instanceof Pattern) {
            return (Pattern) resolved;
        }
        throw new IllegalStateException("topicPattern in channel model [ " + kafkaInboundChannelModel + " ] must resolve to a Pattern or String, not " + resolved.getClass());
    }

    /**
     * Resolves the explicit topic partition assignments of an inbound channel model.
     *
     * @param kafkaInboundChannelModel the inbound channel model
     * @return one entry per resolved topic/partition combination; an empty list when none are configured
     * @throws FlowableIllegalArgumentException when a topic resolves to no text or has no partitions
     */
    protected Collection<TopicPartitionOffset> resolveTopicPartitions(KafkaInboundChannelModel kafkaInboundChannelModel) {
        Collection<KafkaInboundChannelModel.TopicPartition> configured = kafkaInboundChannelModel.getTopicPartitions();
        if (configured == null || configured.isEmpty()) {
            return Collections.emptyList();
        }
        List<TopicPartitionOffset> resolved = new ArrayList<>();
        for (KafkaInboundChannelModel.TopicPartition entry : configured) {
            String topic = resolveExpressionAsString(entry.getTopic(), "topicPartitions[].topic");
            if (!StringUtils.hasText(topic)) {
                throw new FlowableIllegalArgumentException("topic in topic partition in channel model [ " + kafkaInboundChannelModel.getKey() + " ] must resolve to a non empty string");
            }
            Collection<String> partitionExpressions = entry.getPartitions();
            if (partitionExpressions == null || partitionExpressions.isEmpty()) {
                throw new FlowableIllegalArgumentException("partitions in topic partition in channel model [ " + kafkaInboundChannelModel.getKey() + " ] must not be empty");
            }
            for (String partitionExpression : partitionExpressions) {
                resolvePartitionAsInteger(topic, resolveExpression(partitionExpression), resolved);
            }
        }
        return resolved;
    }

    /**
     * Converts an already evaluated partition value into {@link TopicPartitionOffset}s for the given
     * topic and appends them to {@code list}.
     * <p>
     * Supported values are a {@code String} (expanded by {@link #parsePartitions(String)}), an
     * {@code Integer}, arrays of either, and any {@link Iterable} whose elements are themselves
     * supported values (handled recursively).
     *
     * @param str the topic the partitions belong to
     * @param obj the evaluated partition value
     * @param list the list that receives the resulting topic partition offsets
     * @throws IllegalStateException if a String value contains no text
     * @throws IllegalArgumentException if the value, or a nested element, has an unsupported type
     *         (this includes {@code null})
     */
    protected void resolvePartitionAsInteger(String str, Object obj, List<TopicPartitionOffset> list) {
        if (obj instanceof String[]) {
            for (String element : (String[]) obj) {
                resolvePartitionAsInteger(str, element, list);
            }
        } else if (obj instanceof String) {
            String text = (String) obj;
            Assert.state(StringUtils.hasText(text), () -> "partition in TopicPartition for topic '" + str + "' cannot be empty");
            List<TopicPartitionOffset> parsed = parsePartitions(text)
                    .map(partition -> new TopicPartitionOffset(str, partition.intValue()))
                    .collect(Collectors.toList());
            list.addAll(parsed);
        } else if (obj instanceof Integer[]) {
            for (Integer partition : (Integer[]) obj) {
                list.add(new TopicPartitionOffset(str, partition.intValue()));
            }
        } else if (obj instanceof Integer) {
            list.add(new TopicPartitionOffset(str, ((Integer) obj).intValue()));
        } else if (obj instanceof Iterable) {
            // Each element may itself be any of the supported shapes, so recurse per element.
            for (Object element : (Iterable<?>) obj) {
                resolvePartitionAsInteger(str, element, list);
            }
        } else {
            throw new IllegalArgumentException("partition in TopicPartition for topic '" + str + "' can't resolve '" + obj + "' as an Integer or String");
        }
    }

    /**
     * Parses a partition specification into the partition numbers it denotes.
     * <p>
     * The specification is a comma separated list whose items are either a single number or an
     * inclusive range written as {@code first-last}. Items are trimmed before being parsed. A
     * specification consisting of one plain number yields exactly that number; otherwise the
     * result is sorted and free of duplicates.
     *
     * @param str the partition specification, for example {@code "0-2,5"}
     * @return a stream of the partition numbers
     * @throws IllegalStateException if a range item does not split into exactly two parts around
     *         the hyphen, or its upper bound is lower than its lower bound
     * @throws NumberFormatException if an item is not a valid integer
     */
    protected Stream<Integer> parsePartitions(String str) {
        String[] tokens = str.split(",");
        boolean singlePlainNumber = tokens.length == 1 && !tokens[0].contains("-");
        if (singlePlainNumber) {
            return Stream.of(Integer.valueOf(Integer.parseInt(tokens[0].trim())));
        }

        List<Integer> collected = new ArrayList<>();
        for (String token : tokens) {
            if (!token.contains("-")) {
                // A plain item: delegate to the single-number path above.
                parsePartitions(token).forEach(collected::add);
                continue;
            }
            String[] bounds = token.split("-");
            Assert.state(bounds.length == 2, "Only one hyphen allowed for a range of partitions: " + token);
            int first = Integer.parseInt(bounds[0].trim());
            int last = Integer.parseInt(bounds[1].trim());
            Assert.state(last >= first, "Invalid range: " + token);
            for (int partition = first; partition <= last; partition++) {
                collected.add(Integer.valueOf(partition));
            }
        }
        return collected.stream().sorted().distinct();
    }

    /**
     * Determines the partition provider for the given outbound channel model.
     * <p>
     * The partition configuration is inspected in this order:
     * <ol>
     * <li>{@code eventField}: an {@link EventPayloadKafkaPartitionProvider} for that field</li>
     * <li>{@code delegateExpression}: the expression must evaluate to a {@link KafkaPartitionProvider}</li>
     * <li>{@code roundRobin}: the expression is evaluated and expanded into partition numbers that
     * back a {@link RoundRobinKafkaPartitionProvider}</li>
     * </ol>
     *
     * @param kafkaOutboundChannelModel the outbound channel model
     * @return the partition provider, or {@code null} when the model has no partition configuration
     * @throws FlowableException if a partition configuration is present but none of the supported
     *         options has text, or the delegate expression does not evaluate to a
     *         {@link KafkaPartitionProvider}
     */
    protected KafkaPartitionProvider resolveKafkaPartitionProvider(KafkaOutboundChannelModel kafkaOutboundChannelModel) {
        KafkaOutboundChannelModel.KafkaPartition partition = kafkaOutboundChannelModel.getPartition();
        if (partition == null) {
            return null;
        }
        if (StringUtils.hasText(partition.getEventField())) {
            return new EventPayloadKafkaPartitionProvider(partition.getEventField());
        }
        if (StringUtils.hasText(partition.getDelegateExpression())) {
            return resolveExpression(partition.getDelegateExpression(), KafkaPartitionProvider.class);
        }
        if (!StringUtils.hasText(partition.getRoundRobin())) {
            // roundRobin is the third accepted option, so the message has to name it as well.
            throw new FlowableException("The kafka partition value was not found for the channel model with key " + kafkaOutboundChannelModel.getKey() + ". One of eventField, delegateExpression, roundRobin should be set.");
        }

        // Reuse the inbound partition parsing; the topic only serves as a carrier here, just the
        // partition numbers are extracted from the resulting offsets.
        List<TopicPartitionOffset> offsets = new ArrayList<>();
        resolvePartitionAsInteger(kafkaOutboundChannelModel.getTopic(), resolveExpression(partition.getRoundRobin()), offsets);

        List<Integer> partitions = new ArrayList<>(offsets.size());
        for (TopicPartitionOffset offset : offsets) {
            partitions.add(Integer.valueOf(offset.getPartition()));
        }
        return new RoundRobinKafkaPartitionProvider(partitions);
    }

    /**
     * Evaluates the given expression and requires the result to be an instance of {@code cls}.
     *
     * @param str the expression to evaluate
     * @param cls the type the evaluated value must have
     * @param <T> the expected result type
     * @return the evaluated value cast to {@code cls}
     * @throws FlowableException if the evaluated value is not an instance of {@code cls}
     *         (a {@code null} value is never an instance)
     */
    protected <T> T resolveExpression(String str, Class<T> cls) {
        Object value = resolveExpression(str);
        if (!cls.isInstance(value)) {
            throw new FlowableException("expected expression " + str + " to resolve to " + cls + " but it did not. Resolved value is " + value);
        }
        return cls.cast(value);
    }

    /**
     * Evaluates the given expression text.
     * <p>
     * The text is first passed through {@link #resolve(String)} and the outcome is then evaluated
     * by the bean expression resolver against the stored expression context.
     *
     * @param str the expression text
     * @return the evaluated value
     */
    protected Object resolveExpression(String str) {
        String resolvedText = resolve(str);
        return this.resolver.evaluate(resolvedText, this.expressionContext);
    }

    /**
     * Creates the message listener used for the given inbound channel.
     *
     * @param eventRegistry the event registry handed to the listener adapter
     * @param inboundChannelModel the inbound channel model handed to the listener adapter
     * @return a new {@link KafkaChannelMessageListenerAdapter} for the given registry and channel
     */
    protected GenericMessageListener<ConsumerRecord<Object, Object>> createMessageListener(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) {
        return new KafkaChannelMessageListenerAdapter(eventRegistry, inboundChannelModel);
    }

    /**
     * Unregisters every endpoint that belongs to the given channel in the given tenant.
     * <p>
     * The endpoints are the ids recorded in {@code retryEndpointsByMainEndpointId} for the channel's
     * main endpoint id, or only the main endpoint id when nothing is recorded for it.
     *
     * @param channelModel the channel to unregister
     * @param str the tenant id
     * @param eventRepositoryService not used by this implementation
     */
    @Override // org.flowable.eventregistry.api.ChannelModelProcessor
    public void unregisterChannelModel(ChannelModel channelModel, String str, EventRepositoryService eventRepositoryService) {
        this.logger.info("Starting to unregister channel {} in tenant {}", channelModel.getKey(), str);

        String mainEndpointId = getEndpointId(channelModel, str);
        Iterator<String> endpointIds = this.retryEndpointsByMainEndpointId
                .getOrDefault(mainEndpointId, Collections.singleton(mainEndpointId))
                .iterator();
        endpointIds.forEachRemaining(endpointId -> unregisterEndpoint(endpointId, channelModel, str));

        this.logger.info("Finished unregistering channel {} in tenant {}", channelModel.getKey(), str);
    }

    /**
     * Stops and destroys the listener container registered under the given endpoint id and removes
     * it from the endpoint registry.
     * <p>
     * The removal is done reflectively on the registry's {@code listenerContainers} field.
     * NOTE(review): presumably the registry offers no public removal API in the targeted
     * spring-kafka version - confirm before replacing the reflection.
     *
     * @param str the endpoint id
     * @param channelModel the channel the endpoint belongs to (used for logging)
     * @param str2 the tenant id (used for logging)
     * @throws RuntimeException if destroying the listener container fails
     * @throws IllegalStateException if the registry has no {@code listenerContainers} field
     */
    protected void unregisterEndpoint(String str, ChannelModel channelModel, String str2) {
        this.logger.info("Unregistering endpoint {}", str);

        MessageListenerContainer container = this.endpointRegistry.getListenerContainer(str);
        if (container != null) {
            this.logger.debug("Stopping message listener {} for channel {} in tenant {}", container, channelModel.getKey(), str2);
            container.stop();

            if (container instanceof DisposableBean) {
                try {
                    this.logger.debug("Destroying message listener {} for channel {} in tenant {}", container, channelModel.getKey(), str2);
                    container.destroy();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to destroy listener container", e);
                }
            }
        }

        Field containersField = ReflectionUtils.findField(this.endpointRegistry.getClass(), "listenerContainers");
        if (containersField == null) {
            throw new IllegalStateException("Endpoint registry " + this.endpointRegistry + " does not have listenerContainers field");
        }
        containersField.setAccessible(true);
        Map<?, ?> registeredContainers = (Map<?, ?>) ReflectionUtils.getField(containersField, this.endpointRegistry);
        if (registeredContainers != null) {
            registeredContainers.remove(str);
        }

        this.logger.info("Finished unregistering endpoint {}", str);
    }

    /**
     * Registers a listener container for the given endpoint with the endpoint registry.
     * <p>
     * The boolean passed as the registry's third argument is {@code true} when this processor has
     * seen its application context being refreshed or the registry is already running.
     *
     * @param kafkaListenerEndpoint the endpoint to register; must not be {@code null} and must have an id
     * @param kafkaListenerContainerFactory the factory to use, or {@code null} to fall back to the
     *        default resolved by {@link #resolveContainerFactory}
     * @throws IllegalArgumentException if the endpoint is {@code null} or has no id
     * @throws IllegalStateException if no endpoint registry is set or no container factory can be resolved
     */
    protected void registerEndpoint(KafkaListenerEndpoint kafkaListenerEndpoint, KafkaListenerContainerFactory<?> kafkaListenerContainerFactory) {
        Assert.notNull(kafkaListenerEndpoint, "Endpoint must not be null");
        Assert.hasText(kafkaListenerEndpoint.getId(), "Endpoint id must be set");
        Assert.state(this.endpointRegistry != null, "No KafkaListenerEndpointRegistry set");

        boolean startImmediately = this.contextRefreshed || this.endpointRegistry.isRunning();

        this.logger.info("Registering endpoint {}", kafkaListenerEndpoint);
        KafkaListenerContainerFactory<?> factory = resolveContainerFactory(kafkaListenerEndpoint, kafkaListenerContainerFactory);
        this.endpointRegistry.registerListenerContainer(kafkaListenerEndpoint, factory, startImmediately);
        this.logger.info("Finished registering endpoint {}", kafkaListenerEndpoint);
    }

    /**
     * Picks the container factory to use for the given endpoint.
     * <p>
     * An explicitly supplied factory wins. Otherwise the factory configured on this processor is
     * used; when none is configured yet it is looked up in the bean factory by
     * {@code containerFactoryBeanName} and stored for subsequent calls.
     *
     * @param kafkaListenerEndpoint the endpoint the factory is needed for (only used in the error message)
     * @param kafkaListenerContainerFactory an explicit factory, may be {@code null}
     * @return the factory to use, never {@code null} unless the bean lookup itself yields {@code null}
     * @throws IllegalStateException if no factory was given, none is configured and no bean name is
     *         set, or the bean factory required for the lookup is missing
     */
    protected KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpoint kafkaListenerEndpoint, KafkaListenerContainerFactory<?> kafkaListenerContainerFactory) {
        if (kafkaListenerContainerFactory != null) {
            return kafkaListenerContainerFactory;
        }

        if (this.containerFactory == null) {
            if (this.containerFactoryBeanName == null) {
                throw new IllegalStateException("Could not resolve the " + KafkaListenerContainerFactory.class.getSimpleName() + " to use for [" + kafkaListenerEndpoint + "] no factory was given and no default is set.");
            }
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            // Remember the looked-up factory so later calls skip the bean lookup.
            this.containerFactory = this.beanFactory.getBean(this.containerFactoryBeanName, KafkaListenerContainerFactory.class);
        }
        return this.containerFactory;
    }

    /**
     * Builds the endpoint id for a channel.
     * <p>
     * The id is {@code CHANNEL_ID_PREFIX + key} when the tenant id has no text, and
     * {@code CHANNEL_ID_PREFIX + tenantId + "#" + key} otherwise.
     *
     * @param channelModel the channel model providing the key
     * @param str the tenant id, may be {@code null} or blank
     * @return the endpoint id
     */
    protected String getEndpointId(ChannelModel channelModel, String str) {
        String channelKey = channelModel.getKey();
        if (StringUtils.hasText(str)) {
            return CHANNEL_ID_PREFIX + str + "#" + channelKey;
        }
        return CHANNEL_ID_PREFIX + channelKey;
    }

    /**
     * Determines the consumer group id for an endpoint.
     *
     * @param kafkaInboundChannelModel the channel model whose {@code groupId} is resolved
     * @param str the fallback used when the channel's group id resolves to {@code null}
     * @return the resolved group id, or {@code str} when it resolved to {@code null}
     */
    protected String getEndpointGroupId(KafkaInboundChannelModel kafkaInboundChannelModel, String str) {
        String configuredGroupId = resolveExpressionAsString(kafkaInboundChannelModel.getGroupId(), "groupId");
        return configuredGroupId != null ? configuredGroupId : str;
    }

    /**
     * Passes the given value through the embedded value resolver, when one is available.
     *
     * @param str the value to resolve, may be {@code null}
     * @return {@code str} unchanged when it is {@code null} or no embedded value resolver is set,
     *         otherwise the result of the resolver's {@code resolveStringValue}
     */
    protected String resolve(String str) {
        if (str == null || this.embeddedValueResolver == null) {
            return str;
        }
        return this.embeddedValueResolver.resolveStringValue(str);
    }

    /**
     * Turns the custom properties of a channel model into {@link Properties}, resolving every value
     * to a string (the property name is passed along as the second argument of
     * {@code resolveExpressionAsString}).
     * <p>
     * NOTE(review): {@link Properties#put(Object, Object)} rejects {@code null} keys and values with
     * a {@link NullPointerException}; confirm that a custom property can never resolve to
     * {@code null} here.
     *
     * @param list the custom properties, may be {@code null}
     * @return the resolved properties, or {@code null} when {@code list} is {@code null} or empty
     */
    protected Properties resolveProperties(List<KafkaInboundChannelModel.CustomProperty> list) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        Properties resolved = new Properties();
        list.forEach(property -> resolved.put(property.getName(), resolveExpressionAsString(property.getValue(), property.getName())));
        return resolved;
    }

    /**
     * Stores the bean factory and, when it is a {@link ConfigurableListableBeanFactory}, derives the
     * embedded value resolver, the bean expression resolver and the expression context from it.
     * For any other kind of bean factory those three members are left untouched.
     *
     * @param beanFactory the owning bean factory
     * @throws BeansException declared by the callback contract
     */
    @Override // org.springframework.beans.factory.BeanFactoryAware
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        if (!(beanFactory instanceof ConfigurableListableBeanFactory)) {
            return;
        }
        ConfigurableListableBeanFactory configurableFactory = (ConfigurableListableBeanFactory) beanFactory;
        this.embeddedValueResolver = new EmbeddedValueResolver(configurableFactory);
        this.resolver = configurableFactory.getBeanExpressionResolver();
        this.expressionContext = new BeanExpressionContext(configurableFactory, null);
    }

    /**
     * Stores the application context this processor runs in. The stored context is compared
     * against the source of refresh events in {@link #onApplicationEvent}.
     *
     * @param applicationContext the owning application context
     * @throws BeansException declared by the callback contract
     */
    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Marks the context as refreshed once the refresh event of this processor's own application
     * context arrives. Refresh events of any other context are ignored.
     *
     * @param contextRefreshedEvent the refresh event
     */
    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (contextRefreshedEvent.getApplicationContext() != this.applicationContext) {
            return;
        }
        this.contextRefreshed = true;
    }

    /**
     * Returns the Kafka operations configured on this processor.
     *
     * @return the current value of the {@code kafkaOperations} field
     */
    public KafkaOperations<Object, Object> getKafkaOperations() {
        return this.kafkaOperations;
    }

    /**
     * Sets the Kafka operations used by this processor (for example when creating the retry topic
     * configuration).
     *
     * @param kafkaOperations the Kafka operations to use
     */
    public void setKafkaOperations(KafkaOperations<Object, Object> kafkaOperations) {
        this.kafkaOperations = kafkaOperations;
    }

    /**
     * Returns the Kafka admin operations configured on this processor.
     *
     * @return the current value of the {@code kafkaAdminOperations} field
     */
    public KafkaAdminOperations getKafkaAdminOperations() {
        return this.kafkaAdminOperations;
    }

    /**
     * Sets the Kafka admin operations for this processor.
     *
     * @param kafkaAdminOperations the Kafka admin operations to use
     */
    public void setKafkaAdminOperations(KafkaAdminOperations kafkaAdminOperations) {
        this.kafkaAdminOperations = kafkaAdminOperations;
    }

    /**
     * Returns the endpoint registry in which listener containers are registered and looked up.
     *
     * @return the current value of the {@code endpointRegistry} field
     */
    public KafkaListenerEndpointRegistry getEndpointRegistry() {
        return this.endpointRegistry;
    }

    /**
     * Sets the endpoint registry in which listener containers are registered and looked up.
     *
     * @param kafkaListenerEndpointRegistry the endpoint registry to use
     */
    public void setEndpointRegistry(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
        this.endpointRegistry = kafkaListenerEndpointRegistry;
    }

    /**
     * Returns the bean name used to look up the default container factory.
     *
     * @return the current value of the {@code containerFactoryBeanName} field
     */
    public String getContainerFactoryBeanName() {
        return this.containerFactoryBeanName;
    }

    /**
     * Sets the bean name used to look up the default container factory when neither an explicit
     * factory nor a configured one is available.
     *
     * @param str the container factory bean name
     */
    public void setContainerFactoryBeanName(String str) {
        this.containerFactoryBeanName = str;
    }

    /**
     * Returns the default container factory held by this processor.
     *
     * @return the current value of the {@code containerFactory} field; may be {@code null} when it
     *         has neither been set nor looked up yet
     */
    public KafkaListenerContainerFactory<?> getContainerFactory() {
        return this.containerFactory;
    }

    /**
     * Sets the default container factory used when an endpoint is registered without an explicit one.
     *
     * @param kafkaListenerContainerFactory the default container factory
     */
    public void setContainerFactory(KafkaListenerContainerFactory<?> kafkaListenerContainerFactory) {
        this.containerFactory = kafkaListenerContainerFactory;
    }

    /**
     * Builds the retry topic configuration from an already resolved retry configuration.
     * <p>
     * The dead letter topic is left unconfigured when no DLT suffix is present. The configured
     * non-blocking back-off is applied only when the resolved configuration reports a retry topic;
     * otherwise no back-off is used. The maximum number of attempts is set only when present.
     *
     * @param resolvedRetryConfiguration the resolved retry configuration, may be {@code null}
     * @return the retry topic configuration, or {@code null} when there is no retry configuration
     *         or neither a DLT suffix nor a retry topic suffix is set
     */
    protected RetryTopicConfiguration createRetryTopicConfiguration(ResolvedRetryConfiguration resolvedRetryConfiguration) {
        if (resolvedRetryConfiguration == null) {
            return null;
        }
        String dltSuffix = resolvedRetryConfiguration.dltTopicSuffix;
        String retrySuffix = resolvedRetryConfiguration.retryTopicSuffix;
        boolean dltConfigured = dltSuffix != null;
        if (!dltConfigured && retrySuffix == null) {
            return null;
        }

        RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
                .autoStartDltHandler(false)
                .autoCreateTopics(resolvedRetryConfiguration.autoCreateTopics, resolvedRetryConfiguration.numPartitions, resolvedRetryConfiguration.replicationFactor)
                .dltSuffix(dltSuffix)
                .retryTopicSuffix(retrySuffix)
                .useSingleTopicForFixedDelays(resolvedRetryConfiguration.fixedDelayTopicStrategy)
                .setTopicSuffixingStrategy(resolvedRetryConfiguration.topicSuffixingStrategy);

        if (!dltConfigured) {
            builder.doNotConfigureDlt();
        }

        if (resolvedRetryConfiguration.hasRetryTopic()) {
            builder.customBackoff(resolvedRetryConfiguration.nonBlockingBackOff);
        } else {
            builder.noBackoff();
        }

        Integer maxAttempts = resolvedRetryConfiguration.attempts;
        if (maxAttempts != null) {
            builder.maxAttempts(maxAttempts.intValue());
        }
        return builder.create(this.kafkaOperations);
    }

    /**
     * Resolves all expressions of the channel model's retry configuration into concrete values.
     * <p>
     * Defaults applied when a value resolves to {@code null}: fixed delay strategy
     * {@code SINGLE_TOPIC}, topic suffixing strategy {@code SUFFIX_WITH_INDEX_VALUE}. The values
     * {@code true}, {@code 1} and {@code 1} are passed as the third argument when resolving
     * {@code autoCreateTopics}, {@code numPartitions} and {@code replicationFactor} respectively.
     *
     * @param kafkaInboundChannelModel the channel model whose retry configuration is resolved
     * @return the resolved configuration, or {@code null} when the model has no retry configuration
     * @throws IllegalArgumentException if a strategy name does not match a constant of its enum
     */
    protected ResolvedRetryConfiguration resolveRetryConfiguration(KafkaInboundChannelModel kafkaInboundChannelModel) {
        KafkaInboundChannelModel.RetryConfiguration retry = kafkaInboundChannelModel.getRetry();
        if (retry == null) {
            return null;
        }

        ResolvedRetryConfiguration resolved = new ResolvedRetryConfiguration();
        resolved.attempts = resolveExpressionAsInteger(retry.getAttempts(), "retry.attempts");
        resolved.dltTopicSuffix = resolveExpressionAsString(retry.getDltTopicSuffix(), "retry.dltTopicSuffix");
        resolved.retryTopicSuffix = resolveExpressionAsString(retry.getRetryTopicSuffix(), "retry.retryTopicSuffix");

        String fixedDelayStrategyName = resolveExpressionAsString(retry.getFixedDelayTopicStrategy(), "retry.fixedDelayTopicStrategy");
        if (fixedDelayStrategyName != null) {
            resolved.fixedDelayTopicStrategy = FixedDelayStrategy.valueOf(fixedDelayStrategyName);
        } else {
            resolved.fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC;
        }

        String suffixingStrategyName = resolveExpressionAsString(retry.getTopicSuffixingStrategy(), "retry.topicSuffixingStrategy");
        if (suffixingStrategyName != null) {
            resolved.topicSuffixingStrategy = TopicSuffixingStrategy.valueOf(suffixingStrategyName);
        } else {
            resolved.topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE;
        }

        resolved.nonBlockingBackOff = createNonBlockingBackOffPolicy(retry.getNonBlockingBackOff());
        resolved.autoCreateTopics = resolveExpressionAsBoolean(retry.getAutoCreateTopics(), "retry.autoCreateTopics", true).booleanValue();
        resolved.numPartitions = resolveExpressionAsInteger(retry.getNumPartitions(), "retry.numPartitions", 1).intValue();
        resolved.replicationFactor = resolveExpressionAsInteger(retry.getReplicationFactor(), "retry.replicationFactor", 1).shortValue();
        return resolved;
    }

    /**
     * Creates the back-off policy used between non-blocking retries.
     * <p>
     * The policy is chosen from the resolved {@code delay}, {@code maxDelay}, {@code multiplier}
     * and {@code random} settings:
     * <ul>
     * <li>no configuration: a default {@link FixedBackOffPolicy}</li>
     * <li>multiplier greater than zero: an {@link ExponentialBackOffPolicy}, or an
     * {@link ExponentialRandomBackOffPolicy} when {@code random} resolves to {@code true}. The
     * delay, when present, becomes the initial interval; the max delay is applied only when it is
     * greater than the initial interval</li>
     * <li>delay and max delay both present with max delay greater than delay: a
     * {@link UniformRandomBackOffPolicy} between the two</li>
     * <li>otherwise: a {@link FixedBackOffPolicy}, using the delay as period when present</li>
     * </ul>
     *
     * @param nonBlockingRetryBackOff the back-off configuration from the channel model, may be {@code null}
     * @return the back-off policy, never {@code null}
     */
    protected SleepingBackOffPolicy<?> createNonBlockingBackOffPolicy(KafkaInboundChannelModel.NonBlockingRetryBackOff nonBlockingRetryBackOff) {
        if (nonBlockingRetryBackOff == null) {
            return new FixedBackOffPolicy();
        }
        Long delay = resolveExpressionAsLong(nonBlockingRetryBackOff.getDelay(), "retry.nonBlockingBackOff.delay");
        Long maxDelay = resolveExpressionAsLong(nonBlockingRetryBackOff.getMaxDelay(), "retry.nonBlockingBackOff.maxDelay");
        Double multiplier = resolveExpressionAsDouble(nonBlockingRetryBackOff.getMultiplier(), "retry.nonBlockingBackOff.multiplier");

        if (multiplier != null && multiplier.doubleValue() > 0.0d) {
            // Declared as the base type: ExponentialRandomBackOffPolicy extends
            // ExponentialBackOffPolicy, so the base type is the only one both branches fit.
            ExponentialBackOffPolicy exponentialPolicy;
            if (Boolean.TRUE.equals(resolveExpressionAsBoolean(nonBlockingRetryBackOff.getRandom(), "retry.nonBlockingBackOff.random"))) {
                exponentialPolicy = new ExponentialRandomBackOffPolicy();
            } else {
                exponentialPolicy = new ExponentialBackOffPolicy();
            }
            if (delay != null) {
                exponentialPolicy.setInitialInterval(delay.longValue());
            }
            exponentialPolicy.setMultiplier(multiplier.doubleValue());
            // Compared against the effective initial interval, which is the policy default when
            // no delay was configured.
            if (maxDelay != null && maxDelay.longValue() > exponentialPolicy.getInitialInterval()) {
                exponentialPolicy.setMaxInterval(maxDelay.longValue());
            }
            return exponentialPolicy;
        }

        if (maxDelay == null || delay == null || maxDelay.longValue() <= delay.longValue()) {
            FixedBackOffPolicy fixedPolicy = new FixedBackOffPolicy();
            if (delay != null) {
                fixedPolicy.setBackOffPeriod(delay.longValue());
            }
            return fixedPolicy;
        }

        UniformRandomBackOffPolicy uniformPolicy = new UniformRandomBackOffPolicy();
        uniformPolicy.setMinBackOffPeriod(delay.longValue());
        uniformPolicy.setMaxBackOffPeriod(maxDelay.longValue());
        return uniformPolicy;
    }
}
