package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaTopicProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-kafka-core-4.1.1.jar:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.class */
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
    private static final Log logger = LogFactory.getLog(KafkaTopicProvisioner.class);
    private static final int DEFAULT_OPERATION_TIMEOUT = 30;
    private final KafkaBinderConfigurationProperties configurationProperties;
    private final int operationTimeout = 30;
    private final Map<String, Object> adminClientProperties;
    private RetryOperations metadataRetryOperations;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-kafka-core-4.1.1.jar:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner$KafkaConsumerDestination.class */
    public static final class KafkaConsumerDestination implements ConsumerDestination {
        private final String consumerDestinationName;
        private final int partitions;
        private final String dlqName;

        KafkaConsumerDestination(String str) {
            this(str, 0, null);
        }

        KafkaConsumerDestination(String str, int i) {
            this(str, Integer.valueOf(i), null);
        }

        KafkaConsumerDestination(String str, Integer num, String str2) {
            this.consumerDestinationName = str;
            this.partitions = num.intValue();
            this.dlqName = str2;
        }

        @Override // org.springframework.cloud.stream.provisioning.ConsumerDestination
        public String getName() {
            return this.consumerDestinationName;
        }

        public String toString() {
            return "KafkaConsumerDestination{consumerDestinationName='" + this.consumerDestinationName + "', partitions=" + this.partitions + ", dlqName='" + this.dlqName + "'}";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-kafka-core-4.1.1.jar:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner$KafkaProducerDestination.class */
    public static final class KafkaProducerDestination implements ProducerDestination {
        private final String producerDestinationName;
        private final int partitions;

        KafkaProducerDestination(String str, Integer num) {
            this.producerDestinationName = str;
            this.partitions = num.intValue();
        }

        @Override // org.springframework.cloud.stream.provisioning.ProducerDestination
        public String getName() {
            return this.producerDestinationName;
        }

        @Override // org.springframework.cloud.stream.provisioning.ProducerDestination
        public String getNameForPartition(int i) {
            return this.producerDestinationName;
        }

        public String toString() {
            return "KafkaProducerDestination{producerDestinationName='" + this.producerDestinationName + "', partitions=" + this.partitions + "}";
        }
    }

    public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties, AdminClientConfigCustomizer adminClientConfigCustomizer) {
        this(kafkaBinderConfigurationProperties, kafkaProperties, (List<AdminClientConfigCustomizer>) (adminClientConfigCustomizer != null ? Arrays.asList(adminClientConfigCustomizer) : new ArrayList()));
    }

    public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties, List<AdminClientConfigCustomizer> list) {
        this.operationTimeout = 30;
        Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
        this.configurationProperties = kafkaBinderConfigurationProperties;
        this.adminClientProperties = kafkaProperties.buildAdminProperties();
        normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
        list.forEach(adminClientConfigCustomizer -> {
            adminClientConfigCustomizer.configure(this.adminClientProperties);
        });
    }

    public Map<String, Object> getAdminClientProperties() {
        return Collections.unmodifiableMap(this.adminClientProperties);
    }

    public void setMetadataRetryOperations(RetryOperations retryOperations) {
        this.metadataRetryOperations = retryOperations;
    }

    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() {
        if (this.metadataRetryOperations == null) {
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(10);
            retryTemplate.setRetryPolicy(simpleRetryPolicy);
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(100L);
            exponentialBackOffPolicy.setMultiplier(2.0d);
            exponentialBackOffPolicy.setMaxInterval(1000L);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
            this.metadataRetryOperations = retryTemplate;
        }
    }

    @Override // org.springframework.cloud.stream.provisioning.ProvisioningProvider
    public ProducerDestination provisionProducerDestination(String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        if (logger.isInfoEnabled()) {
            logger.info("Using kafka topic for outbound: " + str);
        }
        if (!this.configurationProperties.isAutoCreateTopics()) {
            return new KafkaProducerDestination(str, 0);
        }
        KafkaTopicUtils.validateTopicName(str);
        AdminClient createAdminClient = createAdminClient();
        try {
            createTopic(createAdminClient, str, extendedProducerProperties.getPartitionCount(), false, extendedProducerProperties.getExtension().getTopic());
            KafkaProducerDestination kafkaProducerDestination = new KafkaProducerDestination(str, Integer.valueOf(getPartitionsForTopic(str, createAdminClient)));
            if (createAdminClient != null) {
                createAdminClient.close();
            }
            return kafkaProducerDestination;
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.springframework.cloud.stream.provisioning.ProvisioningProvider
    public ConsumerDestination provisionConsumerDestination(String str, String str2, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        if (!extendedConsumerProperties.isMultiplex()) {
            return doProvisionConsumerDestination(str, str2, extendedConsumerProperties);
        }
        for (String str3 : StringUtils.commaDelimitedListToStringArray(str)) {
            doProvisionConsumerDestination(str3.trim(), str2, extendedConsumerProperties);
        }
        return new KafkaConsumerDestination(str);
    }

    private ConsumerDestination doProvisionConsumerDestination(String str, String str2, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerDestination kafkaConsumerDestination = new KafkaConsumerDestination(str);
        if (extendedConsumerProperties.getExtension().isDestinationIsPattern()) {
            Assert.isTrue(!extendedConsumerProperties.getExtension().isEnableDlq(), "enableDLQ is not allowed when listening to topic patterns");
            if (logger.isDebugEnabled()) {
                logger.debug("Listening to a topic pattern - " + str + " - no provisioning performed");
            }
            return kafkaConsumerDestination;
        }
        if (!this.configurationProperties.isAutoCreateTopics()) {
            return kafkaConsumerDestination;
        }
        KafkaTopicUtils.validateTopicName(str);
        boolean z = !StringUtils.hasText(str2);
        Assert.isTrue((z && extendedConsumerProperties.getExtension().isEnableDlq()) ? false : true, "DLQ support is not available for anonymous subscriptions");
        if (extendedConsumerProperties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        int instanceCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
        AdminClient createAdminClient = createAdminClient();
        try {
            createTopic(createAdminClient, str, instanceCount, extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(), extendedConsumerProperties.getExtension().getTopic());
            int partitionsForTopic = getPartitionsForTopic(str, createAdminClient);
            ConsumerDestination createDlqIfNeedBe = createDlqIfNeedBe(createAdminClient, str, str2, extendedConsumerProperties, z, partitionsForTopic);
            if (createDlqIfNeedBe == null) {
                createDlqIfNeedBe = new KafkaConsumerDestination(str, partitionsForTopic);
            }
            ConsumerDestination consumerDestination = createDlqIfNeedBe;
            if (createAdminClient != null) {
                createAdminClient.close();
            }
            return consumerDestination;
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private int getPartitionsForTopic(String str, AdminClient adminClient) {
        int i = 0;
        TopicDescription topicDescription = retrieveTopicDescriptions(str, adminClient).get(str);
        if (topicDescription != null) {
            i = topicDescription.partitions().size();
        }
        return i;
    }

    private Map<String, TopicDescription> retrieveTopicDescriptions(String str, AdminClient adminClient) {
        return (Map) this.metadataRetryOperations.execute(retryContext -> {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Attempting to retrieve the description for the topic: " + str);
                }
                KafkaFuture<Map<String, TopicDescription>> allTopicNames = adminClient.describeTopics(Collections.singletonList(str)).allTopicNames();
                Objects.requireNonNull(this);
                return allTopicNames.get(30L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new ProvisioningException("Problems encountered with partitions finding for: " + str, e);
            }
        });
    }

    AdminClient createAdminClient() {
        return AdminClient.create(this.adminClientProperties);
    }

    public static void normalalizeBootPropsWithBinder(Map<String, Object> map, KafkaProperties kafkaProperties, KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        String kafkaConnectionString = kafkaBinderConfigurationProperties.getKafkaConnectionString();
        if (ObjectUtils.isEmpty(map.get("bootstrap.servers")) || !kafkaConnectionString.equals(kafkaBinderConfigurationProperties.getDefaultKafkaConnectionString())) {
            map.put("bootstrap.servers", kafkaConnectionString);
        }
        Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
        Set<String> configNames = AdminClientConfig.configNames();
        configuration.forEach((str, str2) -> {
            Object put;
            if (str.equals("bootstrap.servers")) {
                throw new IllegalStateException("Set binder bootstrap servers via the 'brokers' property, not 'configuration'");
            }
            if (configNames.contains(str) && (put = map.put(str, str2)) != null && logger.isDebugEnabled()) {
                logger.debug("Overrode boot property: [" + str + "], from: [" + put + "] to: [" + str2 + "]");
            }
        });
    }

    private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String str, String str2, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, boolean z, int i) {
        if (!extendedConsumerProperties.getExtension().isEnableDlq() || z) {
            return null;
        }
        String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName()) ? extendedConsumerProperties.getExtension().getDlqName() : "error." + str + "." + str2;
        try {
            createTopicAndPartitions(adminClient, dlqName, extendedConsumerProperties.getExtension().getDlqPartitions() == null ? i : extendedConsumerProperties.getExtension().getDlqPartitions().intValue(), extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(), extendedConsumerProperties.getExtension().getDlqProducerProperties().getTopic());
            return new KafkaConsumerDestination(str, Integer.valueOf(i), dlqName);
        } catch (Throwable th) {
            if (th instanceof Error) {
                throw ((Error) th);
            }
            throw new ProvisioningException("Provisioning exception encountered for " + str, th);
        }
    }

    private void createTopic(AdminClient adminClient, String str, int i, boolean z, KafkaTopicProperties kafkaTopicProperties) {
        try {
            createTopicIfNecessary(adminClient, str, i, z, kafkaTopicProperties);
        } catch (Throwable th) {
            if (!(th instanceof Error)) {
                throw new ProvisioningException("Provisioning exception encountered for " + str, th);
            }
            throw ((Error) th);
        }
    }

    private void createTopicIfNecessary(AdminClient adminClient, String str, int i, boolean z, KafkaTopicProperties kafkaTopicProperties) throws Throwable {
        if (this.configurationProperties.isAutoCreateTopics()) {
            createTopicAndPartitions(adminClient, str, i, z, kafkaTopicProperties);
        } else {
            logger.info("Auto creation of topics is disabled.");
        }
    }

    private void createTopicAndPartitions(AdminClient adminClient, String str, int i, boolean z, KafkaTopicProperties kafkaTopicProperties) throws Throwable {
        KafkaFuture<Set<String>> names = adminClient.listTopics().names();
        Objects.requireNonNull(this);
        if (!names.get(30L, TimeUnit.SECONDS).contains(str)) {
            int max = Math.max(this.configurationProperties.getMinPartitionCount(), i);
            this.metadataRetryOperations.execute(retryContext -> {
                NewTopic newTopic;
                if (CollectionUtils.isEmpty(kafkaTopicProperties.getReplicasAssignments())) {
                    newTopic = new NewTopic(str, max, kafkaTopicProperties.getReplicationFactor() != null ? kafkaTopicProperties.getReplicationFactor().shortValue() : this.configurationProperties.getReplicationFactor());
                } else {
                    newTopic = new NewTopic(str, kafkaTopicProperties.getReplicasAssignments());
                }
                if (!kafkaTopicProperties.getProperties().isEmpty()) {
                    newTopic.configs(kafkaTopicProperties.getProperties());
                }
                try {
                    KafkaFuture<Void> all = adminClient.createTopics(Collections.singletonList(newTopic)).all();
                    Objects.requireNonNull(this);
                    all.get(30L, TimeUnit.SECONDS);
                    return null;
                } catch (Exception e) {
                    if (!(e instanceof ExecutionException)) {
                        logger.error("Failed to create topics", e.getCause());
                        throw e.getCause();
                    }
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        logger.error("Failed to create topics", e.getCause());
                        throw e.getCause();
                    }
                    if (!logger.isWarnEnabled()) {
                        return null;
                    }
                    logger.warn("Attempt to create topic: " + str + ". Topic already exists.");
                    return null;
                }
            });
            return;
        }
        if (this.configurationProperties.isAutoAlterTopics()) {
            alterTopicConfigsIfNecessary(adminClient, str, kafkaTopicProperties);
        }
        int max2 = this.configurationProperties.isAutoAddPartitions() ? Math.max(this.configurationProperties.getMinPartitionCount(), i) : i;
        KafkaFuture<Map<String, TopicDescription>> all = adminClient.describeTopics(Collections.singletonList(str)).all();
        Objects.requireNonNull(this);
        int size = all.get(30L, TimeUnit.SECONDS).get(str).partitions().size();
        if (size < max2) {
            if (this.configurationProperties.isAutoAddPartitions()) {
                KafkaFuture<Void> all2 = adminClient.createPartitions(Collections.singletonMap(str, NewPartitions.increaseTo(max2))).all();
                Objects.requireNonNull(this);
                all2.get(30L, TimeUnit.SECONDS);
            } else {
                if (!z) {
                    throw new ProvisioningException("The number of expected partitions for topic " + str + " was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead. Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
                }
                logger.warn("The number of expected partitions for topic " + str + " was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead. There will be " + (max2 - size) + " idle consumers");
            }
        }
    }

    private void alterTopicConfigsIfNecessary(AdminClient adminClient, String str, KafkaTopicProperties kafkaTopicProperties) throws InterruptedException, ExecutionException, TimeoutException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        KafkaFuture<Map<ConfigResource, Config>> all = adminClient.describeConfigs(Collections.singletonList(configResource)).all();
        Objects.requireNonNull(this);
        Config config = all.get(30L, TimeUnit.SECONDS).get(configResource);
        List list = (List) kafkaTopicProperties.getProperties().entrySet().stream().filter(entry -> {
            return config.get((String) entry.getKey()) == null || !config.get((String) entry.getKey()).value().equals(entry.getValue());
        }).map(entry2 -> {
            return new ConfigEntry((String) entry2.getKey(), (String) entry2.getValue());
        }).map(configEntry -> {
            return new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Attempting to alter configs " + list + " for the topic:" + str);
        }
        Map<ConfigResource, Collection<AlterConfigOp>> hashMap = new HashMap<>();
        hashMap.put(configResource, list);
        KafkaFuture<Void> all2 = adminClient.incrementalAlterConfigs(hashMap).all();
        Objects.requireNonNull(this);
        all2.get(30L, TimeUnit.SECONDS);
    }

    public Collection<PartitionInfo> getListenedPartitions(String str, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, ConsumerFactory<?, ?> consumerFactory, int i, boolean z, boolean z2, String str2, Map<String, TopicInformation> map) {
        Collection<PartitionInfo> collection;
        Collection<PartitionInfo> emptyList = z ? Collections.emptyList() : getPartitionInfoForConsumer(str2, extendedConsumerProperties, consumerFactory, i);
        if (z2 || extendedConsumerProperties.getInstanceCount() == 1) {
            collection = emptyList;
        } else {
            collection = new ArrayList();
            for (PartitionInfo partitionInfo : emptyList) {
                if (partitionInfo.partition() % extendedConsumerProperties.getInstanceCount() == extendedConsumerProperties.getInstanceIndex()) {
                    collection.add(partitionInfo);
                }
            }
        }
        map.put(str2, new TopicInformation(str, collection, z));
        return collection;
    }

    public Collection<PartitionInfo> getPartitionInfoForConsumer(String str, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, ConsumerFactory<?, ?> consumerFactory, int i) {
        return getPartitionsForTopic(i, extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(), () -> {
            Consumer createConsumer = consumerFactory.createConsumer();
            try {
                List<PartitionInfo> partitionsFor = createConsumer.partitionsFor(str);
                if (createConsumer != null) {
                    createConsumer.close();
                }
                return partitionsFor;
            } catch (Throwable th) {
                if (createConsumer != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }, str);
    }

    public Collection<PartitionInfo> getPartitionInfoForProducer(String str, ProducerFactory<byte[], byte[]> producerFactory, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        return getPartitionsForTopic(extendedProducerProperties.getPartitionCount(), false, () -> {
            Producer createProducer = producerFactory.createProducer();
            List<PartitionInfo> partitionsFor = createProducer.partitionsFor(str);
            createProducer.close();
            return partitionsFor;
        }, str);
    }

    public Collection<PartitionInfo> getPartitionsForTopic(int i, boolean z, Callable<Collection<PartitionInfo>> callable, String str) {
        try {
            return (Collection) this.metadataRetryOperations.execute(retryContext -> {
                List emptyList = Collections.emptyList();
                try {
                    emptyList = (Collection) callable.call();
                } catch (Exception e) {
                    if (e instanceof UnknownTopicOrPartitionException) {
                        throw e;
                    }
                }
                if (!CollectionUtils.isEmpty(emptyList)) {
                    int size = emptyList.size();
                    if (size < i) {
                        if (!z) {
                            throw new IllegalStateException("The number of expected partitions for topic " + str + " was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead");
                        }
                        logger.warn("The number of expected partitions for topic " + str + " was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead. There will be " + (i - size) + " idle consumers");
                    }
                    return emptyList;
                }
                try {
                    AdminClient createAdminClient = createAdminClient();
                    try {
                        createAdminClient.describeTopics(Collections.singletonList(str)).allTopicNames().get();
                        if (createAdminClient != null) {
                            createAdminClient.close();
                        }
                    } finally {
                    }
                } catch (ExecutionException e2) {
                    Throwable cause = e2.getCause();
                    if (cause instanceof UnknownTopicOrPartitionException) {
                        throw ((UnknownTopicOrPartitionException) cause);
                    }
                    logger.warn("No partitions have been retrieved for the topic (" + str + "). This will affect the health check.");
                }
                throw new RuntimeException("Failed to obtain partition information for the topic " + str);
            });
        } catch (Exception e) {
            logger.error("Cannot initialize Binder checking the topic (" + str + ").", e);
            throw new BinderException("Cannot initialize binder checking the topic (" + str + "):", e);
        }
    }
}
