package org.apache.kafka.streams;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/kafka/streams/StreamsConfigTest.class */
public class StreamsConfigTest {
    // Configuration built from props at the end of setUp(); tests that add no extra settings use it directly.
    private StreamsConfig streamsConfig;

    // JUnit rule constructed with a 600-second timeout.
    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    // Shared properties: filled with a baseline in setUp() and extended by individual tests.
    private final Properties props = new Properties();
    // NOTE(review): the three fields below are not referenced in the visible part of this class;
    // the tests pass the same literal values ("example-application", "client", 1) inline.
    private final String groupId = "example-application";
    private final String clientId = "client";
    private final int threadIdx = 1;

    /* loaded from: input_file:org/apache/kafka/streams/StreamsConfigTest$MisconfiguredSerde.class */
    static class MisconfiguredSerde implements Serde<Object> {
        MisconfiguredSerde() {
        }

        public void configure(Map<String, ?> map, boolean z) {
            throw new RuntimeException("boom");
        }

        public Serializer<Object> serializer() {
            return null;
        }

        public Deserializer<Object> deserializer() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/StreamsConfigTest$MockTimestampExtractor.class */
    public static class MockTimestampExtractor implements TimestampExtractor {
        public long extract(ConsumerRecord<Object, Object> consumerRecord, long j) {
            return 0L;
        }
    }

    /**
     * Fills the shared properties with the baseline application id, bootstrap server,
     * String default serdes and deserializer encodings, then builds the default
     * {@link StreamsConfig} from them.
     */
    @Before
    public void setUp() {
        final String stringSerdeName = Serdes.String().getClass().getName();
        props.put("application.id", "streams-config-test");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("default.key.serde", stringSerdeName);
        props.put("default.value.serde", stringSerdeName);
        props.put("key.deserializer.encoding", StandardCharsets.UTF_8.name());
        props.put("value.deserializer.encoding", StandardCharsets.UTF_16.name());
        streamsConfig = new StreamsConfig(props);
    }

    /** An unrecognised {@code metrics.recording.level} must be rejected with a {@link ConfigException}. */
    @Test
    public void testIllegalMetricsRecordingLevel() {
        props.put("metrics.recording.level", "illegalConfig");
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** A value of -1 for both socket buffer sizes must be accepted without an exception. */
    @Test
    public void testOsDefaultSocketBufferSizes() {
        final int osDefault = -1;
        props.put("send.buffer.bytes", osDefault);
        props.put("receive.buffer.bytes", osDefault);
        new StreamsConfig(props);
    }

    /** A send buffer size of -2 must be rejected with a {@link ConfigException}. */
    @Test
    public void testInvalidSocketSendBufferSize() {
        props.put("send.buffer.bytes", -2);
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** A receive buffer size of -2 must be rejected with a {@link ConfigException}. */
    @Test
    public void testInvalidSocketReceiveBufferSize() {
        props.put("receive.buffer.bytes", -2);
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** Without {@code application.id} the config constructor must throw a {@link ConfigException}. */
    @Test
    public void shouldThrowExceptionIfApplicationIdIsNotSet() {
        props.remove("application.id");
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** Without {@code bootstrap.servers} the config constructor must throw a {@link ConfigException}. */
    @Test
    public void shouldThrowExceptionIfBootstrapServersIsNotSet() {
        props.remove("bootstrap.servers");
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /**
     * The producer configs derived from the baseline config must carry the supplied client id
     * and a {@code linger.ms} of {@code "100"}.
     */
    @Test
    public void testGetProducerConfigs() {
        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
        MatcherAssert.assertThat(producerConfigs.get("client.id"), IsEqual.equalTo("client"));
        MatcherAssert.assertThat(producerConfigs.get("linger.ms"), IsEqual.equalTo("100"));
    }

    /**
     * The main consumer configs derived from the baseline config must carry the supplied
     * client id and group id, a {@code max.poll.records} of {@code "1000"}, and no
     * {@code group.instance.id}.
     */
    @Test
    public void testGetConsumerConfigs() {
        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs =
            streamsConfig.getMainConsumerConfigs("example-application", "client", 1);
        MatcherAssert.assertThat(consumerConfigs.get("client.id"), IsEqual.equalTo("client"));
        MatcherAssert.assertThat(consumerConfigs.get("group.id"), IsEqual.equalTo("example-application"));
        MatcherAssert.assertThat(consumerConfigs.get("max.poll.records"), IsEqual.equalTo("1000"));
        Assert.assertNull(consumerConfigs.get("group.instance.id"));
    }

    /**
     * Sets {@code group.instance.id} unprefixed and through the main, restore and global
     * consumer prefix helpers. Expects the main consumer to see {@code "group-instance-id-1-1"}
     * and the restore and global consumers to see no {@code group.instance.id}.
     */
    @Test
    public void testGetGroupInstanceIdConfigs() {
        final String key = "group.instance.id";
        props.put(key, "group-instance-id");
        props.put(StreamsConfig.mainConsumerPrefix(key), "group-instance-id-1");
        props.put(StreamsConfig.restoreConsumerPrefix(key), "group-instance-id-2");
        props.put(StreamsConfig.globalConsumerPrefix(key), "group-instance-id-3");
        final StreamsConfig config = new StreamsConfig(props);

        final Object mainInstanceId = config.getMainConsumerConfigs("example-application", "client", 1).get(key);
        MatcherAssert.assertThat(mainInstanceId, IsEqual.equalTo("group-instance-id-1-1"));
        Assert.assertNull(config.getRestoreConsumerConfigs("client").get(key));
        Assert.assertNull(config.getGlobalConsumerConfigs("client").get(key));
    }

    /**
     * Settings set on the streams config must be visible, with their original values, in the
     * main consumer configs, and the partition assignment strategy must be
     * {@link StreamsPartitionAssignor}.
     */
    @Test
    public void consumerConfigMustContainStreamPartitionAssignorConfig() {
        final String segmentBytesKey = StreamsConfig.topicPrefix("segment.bytes");
        props.put("replication.factor", 42);
        props.put("num.standby.replicas", 1);
        props.put("acceptable.recovery.lag", 99L);
        props.put("max.warmup.replicas", 9);
        props.put("probing.rebalance.interval.ms", 99999L);
        props.put("windowstore.changelog.additional.retention.ms", 7L);
        props.put("application.server", "dummy:host");
        props.put(segmentBytesKey, 100);

        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs =
            new StreamsConfig(props).getMainConsumerConfigs("example-application", "client", 1);

        Assert.assertEquals(42, consumerConfigs.get("replication.factor"));
        Assert.assertEquals(1, consumerConfigs.get("num.standby.replicas"));
        Assert.assertEquals(99L, consumerConfigs.get("acceptable.recovery.lag"));
        Assert.assertEquals(9, consumerConfigs.get("max.warmup.replicas"));
        Assert.assertEquals(99999L, consumerConfigs.get("probing.rebalance.interval.ms"));
        Assert.assertEquals(StreamsPartitionAssignor.class.getName(), consumerConfigs.get("partition.assignment.strategy"));
        Assert.assertEquals(7L, consumerConfigs.get("windowstore.changelog.additional.retention.ms"));
        Assert.assertEquals("dummy:host", consumerConfigs.get("application.server"));
        Assert.assertEquals(100, consumerConfigs.get(segmentBytesKey));
    }

    /**
     * A main-consumer-prefixed {@code max.poll.records} must win over the consumer-prefixed
     * one, while a main-consumer-prefixed {@code group.id} must not replace the group id
     * passed to {@code getMainConsumerConfigs}.
     */
    @Test
    public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
        props.put(StreamsConfig.consumerPrefix("max.poll.records"), "5");
        props.put(StreamsConfig.mainConsumerPrefix("max.poll.records"), "50");
        props.put(StreamsConfig.mainConsumerPrefix("group.id"), "another-id");

        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs =
            new StreamsConfig(props).getMainConsumerConfigs("example-application", "client", 1);

        Assert.assertEquals("example-application", consumerConfigs.get("group.id"));
        Assert.assertEquals("50", consumerConfigs.get("max.poll.records"));
    }

    /**
     * The restore consumer configs must carry the supplied client id and no {@code group.id}.
     */
    @Test
    public void testGetRestoreConsumerConfigs() {
        final Map<String, Object> restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client");
        // Expected value goes first so a failure message reports expected/actual the right way round.
        Assert.assertEquals("client", restoreConfigs.get("client.id"));
        Assert.assertNull(restoreConfigs.get("group.id"));
    }

    /**
     * Serializes a string with a String serializer configured for UTF-8 (key) and UTF-16
     * (value), and checks that the default key and value serdes of the baseline config
     * deserialize it back to the original string.
     */
    @Test
    public void defaultSerdeShouldBeConfigured() {
        final String topic = "my topic";
        final String original = "my string for testing";
        final String failureMessage =
            "Should get the original string after serialization and deserialization with the configured encoding";

        // Serializer encodings mirror the deserializer encodings put into props by setUp().
        final Map<String, Object> serializerConfigs = new HashMap<>();
        serializerConfigs.put("key.serializer.encoding", StandardCharsets.UTF_8.name());
        serializerConfigs.put("value.serializer.encoding", StandardCharsets.UTF_16.name());
        final Serializer<String> serializer = Serdes.String().serializer();

        serializer.configure(serializerConfigs, true);
        Assert.assertEquals(
            failureMessage,
            original,
            streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, original)));

        serializer.configure(serializerConfigs, false);
        Assert.assertEquals(
            failureMessage,
            original,
            streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, original)));
    }

    /**
     * A comma-separated {@code bootstrap.servers} value must be returned by
     * {@code getList} as the individual broker addresses, in order.
     */
    @Test
    public void shouldSupportMultipleBootstrapServers() {
        // Typed list instead of a raw List.
        final List<String> expectedServers = Arrays.asList("broker1:9092", "broker2:9092");
        final String serverList = Utils.join(expectedServers, ",");
        final Properties multiBrokerProps = new Properties();
        multiBrokerProps.put("application.id", "irrelevant");
        multiBrokerProps.put("bootstrap.servers", serverList);

        final StreamsConfig config = new StreamsConfig(multiBrokerProps);

        Assert.assertEquals(expectedServers, config.getList("bootstrap.servers"));
    }

    /** Consumer-prefixed settings must appear, unprefixed, in the main consumer configs. */
    @Test
    public void shouldSupportPrefixedConsumerConfigs() {
        props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
        props.put(StreamsConfig.consumerPrefix("metrics.num.samples"), 1);

        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs =
            new StreamsConfig(props).getMainConsumerConfigs("example-application", "client", 1);

        Assert.assertEquals("earliest", consumerConfigs.get("auto.offset.reset"));
        Assert.assertEquals(1, consumerConfigs.get("metrics.num.samples"));
    }

    /** A consumer-prefixed setting must appear, unprefixed, in the restore consumer configs. */
    @Test
    public void shouldSupportPrefixedRestoreConsumerConfigs() {
        props.put(StreamsConfig.consumerPrefix("metrics.num.samples"), 1);
        final StreamsConfig config = new StreamsConfig(props);
        final Object numSamples = config.getRestoreConsumerConfigs("client").get("metrics.num.samples");
        Assert.assertEquals(1, numSamples);
    }

    /** A consumer-prefixed custom property must be forwarded to the main consumer configs. */
    @Test
    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
        props.put(StreamsConfig.consumerPrefix("interceptor.statsd.host"), "host");
        final StreamsConfig config = new StreamsConfig(props);
        final Object statsdHost =
            config.getMainConsumerConfigs("example-application", "client", 1).get("interceptor.statsd.host");
        Assert.assertEquals("host", statsdHost);
    }

    /** A consumer-prefixed custom property must be forwarded to the restore consumer configs. */
    @Test
    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
        props.put(StreamsConfig.consumerPrefix("interceptor.statsd.host"), "host");
        final StreamsConfig config = new StreamsConfig(props);
        final Object statsdHost = config.getRestoreConsumerConfigs("client").get("interceptor.statsd.host");
        Assert.assertEquals("host", statsdHost);
    }

    /** A producer-prefixed custom property must be forwarded to the producer configs. */
    @Test
    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
        props.put(StreamsConfig.producerPrefix("interceptor.statsd.host"), "host");
        final StreamsConfig config = new StreamsConfig(props);
        final Object statsdHost = config.getProducerConfigs("client").get("interceptor.statsd.host");
        Assert.assertEquals("host", statsdHost);
    }

    /** Producer-prefixed settings must appear, unprefixed, in the producer configs. */
    @Test
    public void shouldSupportPrefixedProducerConfigs() {
        props.put(StreamsConfig.producerPrefix("buffer.memory"), 10);
        props.put(StreamsConfig.producerPrefix("metrics.num.samples"), 1);

        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> producerConfigs = new StreamsConfig(props).getProducerConfigs("client");

        Assert.assertEquals(10, producerConfigs.get("buffer.memory"));
        Assert.assertEquals(1, producerConfigs.get("metrics.num.samples"));
    }

    /** Unprefixed consumer settings must appear in the main consumer configs. */
    @Test
    public void shouldBeSupportNonPrefixedConsumerConfigs() {
        props.put("auto.offset.reset", "earliest");
        props.put("metrics.num.samples", 1);

        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs =
            new StreamsConfig(props).getMainConsumerConfigs("example-application", "client", 1);

        Assert.assertEquals("earliest", consumerConfigs.get("auto.offset.reset"));
        Assert.assertEquals(1, consumerConfigs.get("metrics.num.samples"));
    }

    /** An unprefixed client setting must appear in the restore consumer configs. */
    @Test
    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
        props.put("metrics.num.samples", 1);
        final StreamsConfig config = new StreamsConfig(props);
        final Object numSamples = config.getRestoreConsumerConfigs("example-application").get("metrics.num.samples");
        Assert.assertEquals(1, numSamples);
    }

    /** Unprefixed producer settings must appear in the producer configs. */
    @Test
    public void shouldSupportNonPrefixedProducerConfigs() {
        props.put("buffer.memory", 10);
        props.put("metrics.num.samples", 1);

        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> producerConfigs = new StreamsConfig(props).getProducerConfigs("client");

        Assert.assertEquals(10, producerConfigs.get("buffer.memory"));
        Assert.assertEquals(1, producerConfigs.get("metrics.num.samples"));
    }

    /**
     * An unprefixed custom property must be forwarded to the main consumer, restore consumer,
     * producer and admin client configs.
     */
    @Test
    public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
        final String customKey = "custom.property.host";
        props.put(customKey, "host");
        final StreamsConfig config = new StreamsConfig(props);

        // Typed maps instead of raw Maps so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs("example-application", "client", 1);
        final Map<String, Object> restoreConfigs = config.getRestoreConsumerConfigs("client");
        final Map<String, Object> producerConfigs = config.getProducerConfigs("client");
        final Map<String, Object> adminConfigs = config.getAdminConfigs("client");

        Assert.assertEquals("host", consumerConfigs.get(customKey));
        Assert.assertEquals("host", restoreConfigs.get(customKey));
        Assert.assertEquals("host", producerConfigs.get(customKey));
        Assert.assertEquals("host", adminConfigs.get(customKey));
    }

    /**
     * When a custom property is set both unprefixed and with the consumer, producer and admin
     * client prefixes, each client's configs must carry the value set through its own prefix.
     */
    @Test
    public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
        final String customKey = "custom.property.host";
        props.put(customKey, "host0");
        props.put(StreamsConfig.consumerPrefix(customKey), "host1");
        props.put(StreamsConfig.producerPrefix(customKey), "host2");
        props.put(StreamsConfig.adminClientPrefix(customKey), "host3");
        final StreamsConfig config = new StreamsConfig(props);

        // Typed maps instead of raw Maps so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs("example-application", "client", 1);
        final Map<String, Object> restoreConfigs = config.getRestoreConsumerConfigs("client");
        final Map<String, Object> producerConfigs = config.getProducerConfigs("client");
        final Map<String, Object> adminConfigs = config.getAdminConfigs("client");

        Assert.assertEquals("host1", consumerConfigs.get(customKey));
        Assert.assertEquals("host1", restoreConfigs.get(customKey));
        Assert.assertEquals("host2", producerConfigs.get(customKey));
        Assert.assertEquals("host3", adminConfigs.get(customKey));
    }

    /** An unprefixed {@code default.api.timeout.ms} must appear in the admin client configs. */
    @Test
    public void shouldSupportNonPrefixedAdminConfigs() {
        props.put("default.api.timeout.ms", 10);
        final StreamsConfig config = new StreamsConfig(props);
        final Object apiTimeout = config.getAdminConfigs("client").get("default.api.timeout.ms");
        Assert.assertEquals(10, apiTimeout);
    }

    /**
     * When the default key serde fails in {@code configure}, {@code defaultKeySerde()} must
     * report it as a {@link StreamsException}.
     */
    @Test
    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
        props.put("default.key.serde", MisconfiguredSerde.class);
        final StreamsConfig config = new StreamsConfig(props);
        // The no-op getClass() call that preceded the assertion had no effect and was dropped.
        Assert.assertThrows(StreamsException.class, config::defaultKeySerde);
    }

    /**
     * When the default value serde fails in {@code configure}, {@code defaultValueSerde()} must
     * report it as a {@link StreamsException}.
     */
    @Test
    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
        props.put("default.value.serde", MisconfiguredSerde.class);
        final StreamsConfig config = new StreamsConfig(props);
        // The no-op getClass() call that preceded the assertion had no effect and was dropped.
        Assert.assertThrows(StreamsException.class, config::defaultValueSerde);
    }

    /**
     * Consumer-prefixed {@code auto.offset.reset} and {@code max.poll.records} must be the
     * values reported in the main consumer configs.
     */
    @Test
    public void shouldOverrideStreamsDefaultConsumerConfigs() {
        props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "latest");
        props.put(StreamsConfig.consumerPrefix("max.poll.records"), "10");

        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs =
            new StreamsConfig(props).getMainConsumerConfigs("example-application", "client", 1);

        Assert.assertEquals("latest", consumerConfigs.get("auto.offset.reset"));
        Assert.assertEquals("10", consumerConfigs.get("max.poll.records"));
    }

    /**
     * Producer-prefixed {@code linger.ms} and {@code transaction.timeout.ms} must be the
     * values reported in the producer configs.
     */
    @Test
    public void shouldOverrideStreamsDefaultProducerConfigs() {
        props.put(StreamsConfig.producerPrefix("linger.ms"), "10000");
        props.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), "30000");

        // Typed map instead of a raw Map so the compiler checks key/value usage.
        final Map<String, Object> producerConfigs = new StreamsConfig(props).getProducerConfigs("client");

        Assert.assertEquals("10000", producerConfigs.get("linger.ms"));
        Assert.assertEquals("30000", producerConfigs.get("transaction.timeout.ms"));
    }

    /**
     * With {@code exactly_once}, a transaction timeout below the commit interval must be
     * rejected with an {@link IllegalArgumentException}.
     */
    @Test
    public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSAlpha() {
        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> testTransactionTimeoutSmallerThanCommitInterval("exactly_once"));
    }

    /**
     * With {@code exactly_once_beta}, a transaction timeout below the commit interval must be
     * rejected with an {@link IllegalArgumentException}.
     */
    @Test
    public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSBeta() {
        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> testTransactionTimeoutSmallerThanCommitInterval("exactly_once_beta"));
    }

    /**
     * With {@code at_least_once}, a transaction timeout below the commit interval must be
     * accepted; the test passes when no exception is thrown.
     */
    @Test
    public void shouldNotThrowIfTransactionTimeoutSmallerThanCommitIntervalForAtLeastOnce() {
        final String guarantee = "at_least_once";
        testTransactionTimeoutSmallerThanCommitInterval(guarantee);
    }

    /**
     * Builds a {@link StreamsConfig} whose commit interval (10000 ms) exceeds the
     * producer-prefixed transaction timeout (3000 ms).
     *
     * @param str the {@code processing.guarantee} value to configure
     */
    private void testTransactionTimeoutSmallerThanCommitInterval(String str) {
        final long commitIntervalMs = 10000L;
        final int transactionTimeoutMs = 3000;
        props.put("processing.guarantee", str);
        props.put("commit.interval.ms", commitIntervalMs);
        props.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), transactionTimeoutMs);
        new StreamsConfig(props);
    }

    /** A consumer-prefixed {@code max.poll.records} must be the value seen by the restore consumer. */
    @Test
    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
        props.put(StreamsConfig.consumerPrefix("max.poll.records"), "10");
        final StreamsConfig config = new StreamsConfig(props);
        final Object maxPollRecords = config.getRestoreConsumerConfigs("client").get("max.poll.records");
        Assert.assertEquals("10", maxPollRecords);
    }

    /**
     * A consumer-prefixed {@code enable.auto.commit=true} must not survive: the main consumer
     * configs are expected to report {@code "false"}.
     */
    @Test
    public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
        props.put(StreamsConfig.consumerPrefix("enable.auto.commit"), "true");
        final StreamsConfig config = new StreamsConfig(props);
        final Object autoCommit = config.getMainConsumerConfigs("a", "b", 1).get("enable.auto.commit");
        Assert.assertEquals("false", autoCommit);
    }

    /**
     * A consumer-prefixed {@code enable.auto.commit=true} must not survive: the restore
     * consumer configs are expected to report {@code "false"}.
     */
    @Test
    public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() {
        props.put(StreamsConfig.consumerPrefix("enable.auto.commit"), "true");
        final StreamsConfig config = new StreamsConfig(props);
        final Object autoCommit = config.getRestoreConsumerConfigs("client").get("enable.auto.commit");
        Assert.assertEquals("false", autoCommit);
    }

    /**
     * A restore-consumer-prefixed {@code max.poll.records} must win over the consumer-prefixed
     * value in the restore consumer configs.
     */
    @Test
    public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
        final String key = "max.poll.records";
        props.put(StreamsConfig.consumerPrefix(key), "5");
        props.put(StreamsConfig.restoreConsumerPrefix(key), "50");
        final StreamsConfig config = new StreamsConfig(props);
        Assert.assertEquals("50", config.getRestoreConsumerConfigs("client").get(key));
    }

    /**
     * The global consumer configs must carry the client id {@code "client-global-consumer"}
     * for the supplied id {@code "client"}, and no {@code group.id}.
     */
    @Test
    public void testGetGlobalConsumerConfigs() {
        final Map<String, Object> globalConfigs = streamsConfig.getGlobalConsumerConfigs("client");
        // Expected value goes first so a failure message reports expected/actual the right way round.
        Assert.assertEquals("client-global-consumer", globalConfigs.get("client.id"));
        Assert.assertNull(globalConfigs.get("group.id"));
    }

    /** A consumer-prefixed setting must appear, unprefixed, in the global consumer configs. */
    @Test
    public void shouldSupportPrefixedGlobalConsumerConfigs() {
        props.put(StreamsConfig.consumerPrefix("metrics.num.samples"), 1);
        final StreamsConfig config = new StreamsConfig(props);
        final Object numSamples = config.getGlobalConsumerConfigs("client").get("metrics.num.samples");
        Assert.assertEquals(1, numSamples);
    }

    /** A consumer-prefixed custom property must be forwarded to the global consumer configs. */
    @Test
    public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
        props.put(StreamsConfig.consumerPrefix("interceptor.statsd.host"), "host");
        final StreamsConfig config = new StreamsConfig(props);
        final Object statsdHost = config.getGlobalConsumerConfigs("client").get("interceptor.statsd.host");
        Assert.assertEquals("host", statsdHost);
    }

    /** An unprefixed client setting must appear in the global consumer configs. */
    @Test
    public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
        props.put("metrics.num.samples", 1);
        final StreamsConfig config = new StreamsConfig(props);
        final Object numSamples = config.getGlobalConsumerConfigs("example-application").get("metrics.num.samples");
        Assert.assertEquals(1, numSamples);
    }

    /**
     * A consumer-prefixed {@code enable.auto.commit=true} must not survive: the global
     * consumer configs are expected to report {@code "false"}.
     */
    @Test
    public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
        props.put(StreamsConfig.consumerPrefix("enable.auto.commit"), "true");
        final StreamsConfig config = new StreamsConfig(props);
        final Object autoCommit = config.getGlobalConsumerConfigs("client").get("enable.auto.commit");
        Assert.assertEquals("false", autoCommit);
    }

    /**
     * A global-consumer-prefixed {@code max.poll.records} must win over the consumer-prefixed
     * value in the global consumer configs.
     */
    @Test
    public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
        final String key = "max.poll.records";
        props.put(StreamsConfig.consumerPrefix(key), "5");
        props.put(StreamsConfig.globalConsumerPrefix(key), "50");
        final StreamsConfig config = new StreamsConfig(props);
        Assert.assertEquals("50", config.getGlobalConsumerConfigs("client").get(key));
    }

    /** The main consumer configs must carry {@code internal.leave.group.on.close = false}. */
    @Test
    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
        final StreamsConfig config = new StreamsConfig(props);
        final Object leaveGroupOnClose =
            config.getMainConsumerConfigs("example-application", "client", 1).get("internal.leave.group.on.close");
        MatcherAssert.assertThat(leaveGroupOnClose, CoreMatchers.is(false));
    }

    /**
     * With the baseline config, the main consumer configs must not contain
     * {@code internal.throw.on.fetch.stable.offset.unsupported}.
     */
    @Test
    public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosDisabled() {
        final Object throwOnUnsupported = streamsConfig
            .getMainConsumerConfigs("example-application", "client", 1)
            .get("internal.throw.on.fetch.stable.offset.unsupported");
        MatcherAssert.assertThat(throwOnUnsupported, CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * With {@code exactly_once}, the main consumer configs must not contain
     * {@code internal.throw.on.fetch.stable.offset.unsupported}.
     */
    @Test
    public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosAlpha() {
        props.put("processing.guarantee", "exactly_once");
        final Object throwOnUnsupported = new StreamsConfig(props)
            .getMainConsumerConfigs("example-application", "client", 1)
            .get("internal.throw.on.fetch.stable.offset.unsupported");
        MatcherAssert.assertThat(throwOnUnsupported, CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * With {@code exactly_once_beta}, the main consumer configs must carry
     * {@code internal.throw.on.fetch.stable.offset.unsupported = true}.
     */
    @Test
    public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosBeta() {
        props.put("processing.guarantee", "exactly_once_beta");
        final Object throwOnUnsupported = new StreamsConfig(props)
            .getMainConsumerConfigs("example-application", "client", 1)
            .get("internal.throw.on.fetch.stable.offset.unsupported");
        MatcherAssert.assertThat(throwOnUnsupported, CoreMatchers.is(true));
    }

    /**
     * With {@code exactly_once_v2}, the main consumer configs must carry
     * {@code internal.throw.on.fetch.stable.offset.unsupported = true}.
     */
    @Test
    public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosV2() {
        props.put("processing.guarantee", "exactly_once_v2");
        final Object throwOnUnsupported = new StreamsConfig(props)
            .getMainConsumerConfigs("example-application", "client", 1)
            .get("internal.throw.on.fetch.stable.offset.unsupported");
        MatcherAssert.assertThat(throwOnUnsupported, CoreMatchers.is(true));
    }

    /** With the baseline config, the producer configs must not contain {@code internal.auto.downgrade.txn.commit}. */
    @Test
    public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosDisabled() {
        final Object autoDowngrade = streamsConfig.getProducerConfigs("client").get("internal.auto.downgrade.txn.commit");
        MatcherAssert.assertThat(autoDowngrade, CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /** With {@code exactly_once}, the producer configs must carry {@code internal.auto.downgrade.txn.commit = true}. */
    @Test
    public void shouldSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosAlpha() {
        props.put("processing.guarantee", "exactly_once");
        final Object autoDowngrade =
            new StreamsConfig(props).getProducerConfigs("client").get("internal.auto.downgrade.txn.commit");
        MatcherAssert.assertThat(autoDowngrade, CoreMatchers.is(true));
    }

    /** With {@code exactly_once_beta}, the producer configs must not contain {@code internal.auto.downgrade.txn.commit}. */
    @Test
    public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosBeta() {
        props.put("processing.guarantee", "exactly_once_beta");
        final Object autoDowngrade =
            new StreamsConfig(props).getProducerConfigs("client").get("internal.auto.downgrade.txn.commit");
        MatcherAssert.assertThat(autoDowngrade, CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /** With {@code exactly_once_v2}, the producer configs must not contain {@code internal.auto.downgrade.txn.commit}. */
    @Test
    public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosV2() {
        props.put("processing.guarantee", "exactly_once_v2");
        final Object autoDowngrade =
            new StreamsConfig(props).getProducerConfigs("client").get("internal.auto.downgrade.txn.commit");
        MatcherAssert.assertThat(autoDowngrade, CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /** {@code at_least_once} must be accepted as a processing guarantee; passes when nothing is thrown. */
    @Test
    public void shouldAcceptAtLeastOnce() {
        final String guarantee = "at_least_once";
        props.put("processing.guarantee", guarantee);
        new StreamsConfig(props);
    }

    /** {@code exactly_once} must be accepted as a processing guarantee; passes when nothing is thrown. */
    @Test
    public void shouldAcceptExactlyOnce() {
        final String guarantee = "exactly_once";
        props.put("processing.guarantee", guarantee);
        new StreamsConfig(props);
    }

    /** {@code exactly_once_beta} must be accepted as a processing guarantee; passes when nothing is thrown. */
    @Test
    public void shouldAcceptExactlyOnceBeta() {
        final String guarantee = "exactly_once_beta";
        props.put("processing.guarantee", guarantee);
        new StreamsConfig(props);
    }

    /** An unknown processing guarantee must be rejected with a {@link ConfigException}. */
    @Test
    public void shouldThrowExceptionIfNotAtLeastOnceOrExactlyOnce() {
        props.put("processing.guarantee", "bad_value");
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** {@code latest} must be accepted as the built-in metrics version; passes when nothing is thrown. */
    @Test
    public void shouldAcceptBuiltInMetricsLatestVersion() {
        final String version = "latest";
        props.put("built.in.metrics.version", version);
        new StreamsConfig(props);
    }

    /** When no built-in metrics version is configured, the config must report {@code "latest"}. */
    @Test
    public void shouldSetDefaultBuiltInMetricsVersionIfNoneIsSpecified() {
        final StreamsConfig config = new StreamsConfig(props);
        final String metricsVersion = config.getString("built.in.metrics.version");
        MatcherAssert.assertThat(metricsVersion, CoreMatchers.is("latest"));
    }

    /**
     * An invalid built-in metrics version must be rejected with a {@link ConfigException}
     * whose message names the offending value and configuration key.
     */
    @Test
    public void shouldThrowIfBuiltInMetricsVersionInvalid() {
        props.put("built.in.metrics.version", "0.0.1");
        final Exception exception = Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
        MatcherAssert.assertThat(
            exception.getMessage(),
            CoreMatchers.containsString("Invalid value 0.0.1 for configuration built.in.metrics.version"));
    }

    /** Runs the isolation-level reset check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosAlphaEnabled() {
        final String guarantee = "exactly_once";
        props.put("processing.guarantee", guarantee);
        shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled();
    }

    /** Runs the isolation-level reset check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosBetaEnabled() {
        final String guarantee = "exactly_once_beta";
        props.put("processing.guarantee", guarantee);
        shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled();
    }

    /** Runs the isolation-level reset check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosV2Enabled() {
        final String guarantee = "exactly_once_v2";
        props.put("processing.guarantee", guarantee);
        shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled();
    }

    /**
     * Sets an arbitrary {@code isolation.level} and expects the main consumer configs to
     * report the lower-cased name of {@link IsolationLevel#READ_COMMITTED} instead. Callers
     * set the processing guarantee beforehand.
     */
    private void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
        props.put("isolation.level", "anyValue");
        final String readCommitted = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT);
        final Object isolationLevel =
            new StreamsConfig(props).getMainConsumerConfigs("example-application", "client", 1).get("isolation.level");
        MatcherAssert.assertThat(isolationLevel, IsEqual.equalTo(readCommitted));
    }

    /**
     * Without a processing guarantee override, a user-supplied {@code isolation.level} of
     * read-uncommitted must be kept in the main consumer configs.
     */
    @Test
    public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
        final String readUncommitted = IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT);
        props.put("isolation.level", readUncommitted);
        final Object isolationLevel =
            new StreamsConfig(props).getMainConsumerConfigs("example-application", "client", 1).get("isolation.level");
        MatcherAssert.assertThat(isolationLevel, IsEqual.equalTo(IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
    }

    /** Runs the idempotence reset check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosAlphaEnabled() {
        final String guarantee = "exactly_once";
        props.put("processing.guarantee", guarantee);
        shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled();
    }

    /** Runs the idempotence reset check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosBetaEnabled() {
        final String guarantee = "exactly_once_beta";
        props.put("processing.guarantee", guarantee);
        shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled();
    }

    /** Runs the idempotence reset check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosV2Enabled() {
        final String guarantee = "exactly_once_v2";
        props.put("processing.guarantee", guarantee);
        shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled();
    }

    /**
     * Sets an arbitrary {@code enable.idempotence} and expects the producer configs to report
     * {@code true} instead. Callers set the processing guarantee beforehand.
     */
    private void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() {
        props.put("enable.idempotence", "anyValue");
        final StreamsConfig config = new StreamsConfig(props);
        final Boolean idempotenceEnabled = (Boolean) config.getProducerConfigs("client").get("enable.idempotence");
        Assert.assertTrue(idempotenceEnabled.booleanValue());
    }

    /**
     * Without a processing guarantee override, a user-supplied {@code enable.idempotence=false}
     * must be kept in the producer configs.
     */
    @Test
    public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
        props.put("enable.idempotence", false);
        final StreamsConfig config = new StreamsConfig(props);
        final Object idempotenceEnabled = config.getProducerConfigs("client").get("enable.idempotence");
        MatcherAssert.assertThat(idempotenceEnabled, IsEqual.equalTo(false));
    }

    /** Runs the EOS-defaults check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldSetDifferentDefaultsIfEosAlphaEnabled() {
        final String guarantee = "exactly_once";
        props.put("processing.guarantee", guarantee);
        shouldSetDifferentDefaultsIfEosEnabled();
    }

    /** Runs the EOS-defaults check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldSetDifferentDefaultsIfEosBetaEnabled() {
        final String guarantee = "exactly_once_beta";
        props.put("processing.guarantee", guarantee);
        shouldSetDifferentDefaultsIfEosEnabled();
    }

    /** Runs the EOS-defaults check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldSetDifferentDefaultsIfEosV2Enabled() {
        final String guarantee = "exactly_once_v2";
        props.put("processing.guarantee", guarantee);
        shouldSetDifferentDefaultsIfEosEnabled();
    }

    /**
     * Builds a config from the current properties and checks the values expected under an
     * exactly-once guarantee: read-committed isolation on the main consumer; idempotence
     * enabled, {@code delivery.timeout.ms = Integer.MAX_VALUE} and
     * {@code transaction.timeout.ms = 10000} on the producer; and a commit interval of 100 ms.
     * Callers set the processing guarantee beforehand.
     */
    private void shouldSetDifferentDefaultsIfEosEnabled() {
        final StreamsConfig config = new StreamsConfig(props);

        // Typed maps instead of raw Maps so the compiler checks key/value usage.
        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs("example-application", "client", 1);
        final Map<String, Object> producerConfigs = config.getProducerConfigs("client");

        MatcherAssert.assertThat(
            consumerConfigs.get("isolation.level"),
            IsEqual.equalTo(IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
        Assert.assertTrue((Boolean) producerConfigs.get("enable.idempotence"));
        MatcherAssert.assertThat(producerConfigs.get("delivery.timeout.ms"), IsEqual.equalTo(Integer.MAX_VALUE));
        MatcherAssert.assertThat(producerConfigs.get("transaction.timeout.ms"), IsEqual.equalTo(10000));
        MatcherAssert.assertThat(config.getLong("commit.interval.ms"), IsEqual.equalTo(100L));
    }

    /** Runs the transactional-id override check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldOverrideUserConfigTransactionalIdIfEosAlphaEnabled() {
        final String guarantee = "exactly_once";
        props.put("processing.guarantee", guarantee);
        shouldOverrideUserConfigTransactionalIdIfEosEnable();
    }

    /** Runs the transactional-id override check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldOverrideUserConfigTransactionalIdIfEosBetaEnabled() {
        final String guarantee = "exactly_once_beta";
        props.put("processing.guarantee", guarantee);
        shouldOverrideUserConfigTransactionalIdIfEosEnable();
    }

    /** Runs the transactional-id override check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldOverrideUserConfigTransactionalIdIfEosV2Enabled() {
        final String guarantee = "exactly_once_v2";
        props.put("processing.guarantee", guarantee);
        shouldOverrideUserConfigTransactionalIdIfEosEnable();
    }

    /**
     * Sets a user {@code transactional.id} and expects the producer configs to contain no
     * {@code transactional.id}. Callers set the processing guarantee beforehand.
     */
    private void shouldOverrideUserConfigTransactionalIdIfEosEnable() {
        props.put("transactional.id", "user-TxId");
        final StreamsConfig config = new StreamsConfig(props);
        final Object transactionalId = config.getProducerConfigs("client").get("transactional.id");
        MatcherAssert.assertThat(transactionalId, CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /** Runs the user-retries check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldNotOverrideUserConfigRetriesIfExactlyAlphaOnceEnabled() {
        props.put("processing.guarantee", "exactly_once");
        shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
    }

    /** Runs the user-retries check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldNotOverrideUserConfigRetriesIfExactlyBetaOnceEnabled() {
        props.put("processing.guarantee", "exactly_once_beta");
        shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
    }

    /** Runs the user-retries check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldNotOverrideUserConfigRetriesIfExactlyV2OnceEnabled() {
        props.put("processing.guarantee", "exactly_once_v2");
        shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
    }

    /**
     * Shared check: puts {@code retries=42} into {@code props} and asserts that the producer
     * configs built for client {@code "client"} still report 42.
     */
    private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
        props.put("retries", 42);
        final StreamsConfig config = new StreamsConfig(props);
        final Object retries = config.getProducerConfigs("client").get("retries");
        MatcherAssert.assertThat(retries, IsEqual.equalTo(42));
    }

    /** Runs the user commit-interval check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceAlphaEnabled() {
        props.put("processing.guarantee", "exactly_once");
        shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled();
    }

    /** Runs the user commit-interval check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceBetaEnabled() {
        props.put("processing.guarantee", "exactly_once_beta");
        shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled();
    }

    /** Runs the user commit-interval check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceV2Enabled() {
        props.put("processing.guarantee", "exactly_once_v2");
        shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled();
    }

    /**
     * Shared check: puts {@code commit.interval.ms=73} into {@code props} and asserts that the
     * resulting {@link StreamsConfig} reports the same value.
     */
    private void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() {
        props.put("commit.interval.ms", 73L);
        final Long commitIntervalMs = new StreamsConfig(props).getLong("commit.interval.ms");
        MatcherAssert.assertThat(commitIntervalMs, IsEqual.equalTo(73L));
    }

    /**
     * A negative {@code commit.interval.ms} must make the {@link StreamsConfig} constructor throw
     * a {@link ConfigException} carrying the expected validation message.
     */
    @Test
    public void shouldThrowExceptionIfCommitIntervalMsIsNegative() {
        props.put("commit.interval.ms", -1L);
        // assertThrows matches the other exception tests in this class and keeps the failure message.
        final ConfigException thrown = Assert.assertThrows(
            "Should throw ConfigException when commitIntervalMs is set to a negative value",
            ConfigException.class,
            () -> new StreamsConfig(props));
        Assert.assertEquals("Invalid value -1 for configuration commit.interval.ms: Value must be at least 0", thrown.getMessage());
    }

    /**
     * Configures Long serdes and {@code MockTimestampExtractor} as defaults and asserts that the
     * {@link StreamsConfig} accessors return instances of those types.
     */
    @Test
    public void shouldUseNewConfigsWhenPresent() {
        final Properties overrides = StreamsTestUtils.getStreamsConfig();
        overrides.put("default.key.serde", Serdes.Long().getClass());
        overrides.put("default.value.serde", Serdes.Long().getClass());
        overrides.put("default.timestamp.extractor", MockTimestampExtractor.class);

        final StreamsConfig config = new StreamsConfig(overrides);

        Assert.assertTrue(config.defaultKeySerde() instanceof Serdes.LongSerde);
        Assert.assertTrue(config.defaultValueSerde() instanceof Serdes.LongSerde);
        Assert.assertTrue(config.defaultTimestampExtractor() instanceof MockTimestampExtractor);
    }

    /**
     * With only the base test properties, the default timestamp extractor must be a
     * {@link FailOnInvalidTimestamp}, and both default-serde accessors must throw
     * {@link ConfigException}.
     */
    @Test
    public void shouldUseCorrectDefaultsWhenNoneSpecified() {
        final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
        Assert.assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
        // The method references need no separate null check on the freshly constructed config.
        Assert.assertThrows(ConfigException.class, config::defaultKeySerde);
        Assert.assertThrows(ConfigException.class, config::defaultValueSerde);
    }

    /**
     * Configures {@code MisconfiguredSerde} as the default key serde and asserts that
     * {@code defaultKeySerde()} throws a {@link StreamsException} whose message names that class.
     */
    @Test
    public void shouldSpecifyCorrectKeySerdeClassOnError() {
        final Properties overrides = StreamsTestUtils.getStreamsConfig();
        overrides.put("default.key.serde", MisconfiguredSerde.class);
        // assertThrows matches the other exception tests in this class and keeps the failure message.
        final StreamsException thrown = Assert.assertThrows(
            "Test should throw a StreamsException",
            StreamsException.class,
            () -> new StreamsConfig(overrides).defaultKeySerde());
        Assert.assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", thrown.getMessage());
    }

    /**
     * Configures {@code MisconfiguredSerde} as the default value serde and asserts that
     * {@code defaultValueSerde()} throws a {@link StreamsException} whose message names that class.
     */
    @Test
    public void shouldSpecifyCorrectValueSerdeClassOnError() {
        final Properties overrides = StreamsTestUtils.getStreamsConfig();
        overrides.put("default.value.serde", MisconfiguredSerde.class);
        // assertThrows matches the other exception tests in this class and keeps the failure message.
        final StreamsException thrown = Assert.assertThrows(
            "Test should throw a StreamsException",
            StreamsException.class,
            () -> new StreamsConfig(overrides).defaultValueSerde());
        Assert.assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", thrown.getMessage());
    }

    /** Runs the max-in-flight upper-bound check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosAlphaEnabled() {
        props.put("processing.guarantee", "exactly_once");
        shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled();
    }

    /** Runs the max-in-flight upper-bound check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosBetaEnabled() {
        props.put("processing.guarantee", "exactly_once_beta");
        shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled();
    }

    /** Runs the max-in-flight upper-bound check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosV2Enabled() {
        props.put("processing.guarantee", "exactly_once_v2");
        shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled();
    }

    /**
     * Shared check: puts {@code max.in.flight.requests.per.connection=7} into {@code props} and
     * asserts that building the producer configs throws a {@link ConfigException} with the
     * expected message. The caller selects the processing guarantee beforehand.
     */
    private void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled() {
        props.put("max.in.flight.requests.per.connection", 7);
        final ConfigException thrown = Assert.assertThrows(
            "Should throw ConfigException when EOS is enabled and maxInFlight requests exceeds 5",
            ConfigException.class,
            () -> new StreamsConfig(props).getProducerConfigs("client"));
        Assert.assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", thrown.getMessage());
    }

    /** Runs the string-valued max-in-flight check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosAlphaEnabled() {
        props.put("processing.guarantee", "exactly_once");
        shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled();
    }

    /** Runs the string-valued max-in-flight check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosBetaEnabled() {
        props.put("processing.guarantee", "exactly_once_beta");
        shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled();
    }

    /** Runs the string-valued max-in-flight check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosV2Enabled() {
        props.put("processing.guarantee", "exactly_once_v2");
        shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled();
    }

    /**
     * Shared check: supplies {@code max.in.flight.requests.per.connection} as the string
     * {@code "3"} and builds the producer configs.
     */
    private void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled() {
        props.put("max.in.flight.requests.per.connection", "3");
        final StreamsConfig config = new StreamsConfig(props);
        // No explicit assertion: the check passes as long as this call does not throw.
        config.getProducerConfigs("client");
    }

    /** Runs the invalid-string max-in-flight check with {@code processing.guarantee=exactly_once}. */
    @Test
    public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosAlphaEnabled() {
        props.put("processing.guarantee", "exactly_once");
        shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled();
    }

    /** Runs the invalid-string max-in-flight check with {@code processing.guarantee=exactly_once_beta}. */
    @Test
    public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosBetaEnabled() {
        props.put("processing.guarantee", "exactly_once_beta");
        shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled();
    }

    /** Runs the invalid-string max-in-flight check with {@code processing.guarantee=exactly_once_v2}. */
    @Test
    public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosV2Enabled() {
        props.put("processing.guarantee", "exactly_once_v2");
        shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled();
    }

    /**
     * Shared check: supplies a non-numeric {@code max.in.flight.requests.per.connection} and
     * asserts that building the producer configs throws a {@link ConfigException} with the
     * expected parse-failure message. The caller selects the processing guarantee beforehand.
     */
    private void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled() {
        props.put("max.in.flight.requests.per.connection", "not-a-number");
        final ConfigException thrown = Assert.assertThrows(
            "Should throw ConfigException when EOS is enabled and maxInFlight cannot be parsed into an integer",
            ConfigException.class,
            () -> new StreamsConfig(props).getProducerConfigs("client"));
        Assert.assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer", thrown.getMessage());
    }

    /**
     * Asserts that {@code state.dir} of the shared {@code streamsConfig} field begins with the
     * {@code java.io.tmpdir} system property followed by the file separator.
     */
    @Test
    public void shouldStateDirStartsWithJavaIOTmpDir() {
        final String stateDir = streamsConfig.getString("state.dir");
        final String expectedPrefix = System.getProperty("java.io.tmpdir") + File.separator;
        Assert.assertTrue(stateDir.startsWith(expectedPrefix));
    }

    /** Asserts that the shared {@code streamsConfig} field reports {@code topology.optimization=none}. */
    @Test
    public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
        final String optimization = streamsConfig.getString("topology.optimization");
        Assert.assertEquals("Optimization should be \"none\"", "none", optimization);
    }

    /** Sets {@code topology.optimization=all} and asserts the new config reports that value. */
    @Test
    public void shouldSpecifyOptimizationWhenExplicitlyAddedToConfigs() {
        props.put("topology.optimization", "all");
        final String optimization = new StreamsConfig(props).getString("topology.optimization");
        Assert.assertEquals("Optimization should be \"all\"", "all", optimization);
    }

    /** Sets {@code topology.optimization=maybe} and expects construction to throw {@link ConfigException}. */
    @Test
    public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
        props.put("topology.optimization", "maybe");
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** Asserts that the shared {@code streamsConfig} field reports {@code default.dsl.store=rocksDB}. */
    @Test
    public void shouldSpecifyRocksdbWhenNotExplicitlyAddedToConfigs() {
        final String storeType = streamsConfig.getString("default.dsl.store");
        Assert.assertEquals("default.dsl.store should be \"rocksDB\"", "rocksDB", storeType);
    }

    /** Sets {@code default.dsl.store=in_memory} and asserts the new config reports that value. */
    @Test
    public void shouldSpecifyInMemoryWhenExplicitlyAddedToConfigs() {
        props.put("default.dsl.store", "in_memory");
        final String storeType = new StreamsConfig(props).getString("default.dsl.store");
        Assert.assertEquals("default.dsl.store should be \"in_memory\"", "in_memory", storeType);
    }

    /** Sets {@code default.dsl.store=bad_config} and expects construction to throw {@link ConfigException}. */
    @Test
    public void shouldThrowConfigExceptionWhenStoreTypeConfigNotValueInRange() {
        props.put("default.dsl.store", "bad_config");
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /**
     * Constructs a {@link StreamsConfig} with {@code processing.guarantee=exactly_once} and
     * asserts that the captured {@code StreamsConfig} log messages include the deprecation
     * warning for {@code exactly_once}.
     */
    @Test
    public void shouldLogWarningWhenEosAlphaIsUsed() {
        props.put("processing.guarantee", "exactly_once");
        LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
        // try-with-resources closes the appender on both the success and the failure path.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
            new StreamsConfig(props);
            MatcherAssert.assertThat(appender.getMessages(), CoreMatchers.hasItem("Configuration parameter `exactly_once` is deprecated and will be removed in the 4.0.0 release. Please use `exactly_once_v2` instead. Note that this requires broker version 2.5+ so you should prepare to upgrade your brokers if necessary."));
        }
    }

    /**
     * Constructs a {@link StreamsConfig} with {@code processing.guarantee=exactly_once_beta} and
     * asserts that the captured {@code StreamsConfig} log messages include the deprecation
     * warning for {@code exactly_once_beta}.
     */
    @Test
    public void shouldLogWarningWhenEosBetaIsUsed() {
        props.put("processing.guarantee", "exactly_once_beta");
        LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
        // try-with-resources closes the appender on both the success and the failure path.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
            new StreamsConfig(props);
            MatcherAssert.assertThat(appender.getMessages(), CoreMatchers.hasItem("Configuration parameter `exactly_once_beta` is deprecated and will be removed in the 4.0.0 release. Please use `exactly_once_v2` instead."));
        }
    }

    /**
     * Constructs a {@link StreamsConfig} with {@code retries=0} and asserts that the captured
     * {@code StreamsConfig} log messages include the deprecation warning for {@code retries}.
     */
    @Test
    public void shouldLogWarningWhenRetriesIsUsed() {
        props.put("retries", 0);
        LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
        // try-with-resources closes the appender on both the success and the failure path.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
            new StreamsConfig(props);
            MatcherAssert.assertThat(appender.getMessages(), CoreMatchers.hasItem("Configuration parameter `retries` is deprecated and will be removed in the 4.0.0 release."));
        }
    }

    /** Asserts that {@code acceptable.recovery.lag} is 10000 for a config built from {@code props}. */
    @Test
    public void shouldSetDefaultAcceptableRecoveryLag() {
        final StreamsConfig config = new StreamsConfig(props);
        MatcherAssert.assertThat(config.getLong("acceptable.recovery.lag"), CoreMatchers.is(10000L));
    }

    /** Sets {@code acceptable.recovery.lag=-1} and expects construction to throw {@link ConfigException}. */
    @Test
    public void shouldThrowConfigExceptionIfAcceptableRecoveryLagIsOutsideBounds() {
        props.put("acceptable.recovery.lag", -1L);
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** Asserts that {@code num.standby.replicas} is 0 for a config built from {@code props}. */
    @Test
    public void shouldSetDefaultNumStandbyReplicas() {
        final StreamsConfig config = new StreamsConfig(props);
        MatcherAssert.assertThat(config.getInt("num.standby.replicas"), CoreMatchers.is(0));
    }

    /**
     * Sets {@code num.standby.replicas=-1} and expects construction to throw
     * {@link ConfigException}.
     */
    @Test
    public void shouldThrowConfigExceptionIfNumStandbyReplicasIsOutsideBounds() {
        // num.standby.replicas is an int config (shouldSetDefaultNumStandbyReplicas reads it with
        // getInt). Use an Integer so the exception comes from the range check; a Long would be
        // rejected by ConfigDef for its type alone, letting the test pass without testing the bound.
        props.put("num.standby.replicas", -1);
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** Asserts that {@code max.warmup.replicas} is 2 for a config built from {@code props}. */
    @Test
    public void shouldSetDefaultMaxWarmupReplicas() {
        final StreamsConfig config = new StreamsConfig(props);
        MatcherAssert.assertThat(config.getInt("max.warmup.replicas"), CoreMatchers.is(2));
    }

    /**
     * Sets {@code max.warmup.replicas=0} and expects construction to throw
     * {@link ConfigException}.
     */
    @Test
    public void shouldThrowConfigExceptionIfMaxWarmupReplicasIsOutsideBounds() {
        // max.warmup.replicas is an int config (shouldSetDefaultMaxWarmupReplicas reads it with
        // getInt). Use an Integer so the exception comes from the range check; a Long would be
        // rejected by ConfigDef for its type alone, letting the test pass without testing the bound.
        props.put("max.warmup.replicas", 0);
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** Asserts that {@code probing.rebalance.interval.ms} is 600000 for a config built from {@code props}. */
    @Test
    public void shouldSetDefaultProbingRebalanceInterval() {
        final StreamsConfig config = new StreamsConfig(props);
        MatcherAssert.assertThat(config.getLong("probing.rebalance.interval.ms"), CoreMatchers.is(600000L));
    }

    /** Sets {@code probing.rebalance.interval.ms=59999} and expects construction to throw {@link ConfigException}. */
    @Test
    public void shouldThrowConfigExceptionIfProbingRebalanceIntervalIsOutsideBounds() {
        props.put("probing.rebalance.interval.ms", 59999L);
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /** Asserts that {@code rack.aware.assignment.tags} is an empty list for a config built from {@code props}. */
    @Test
    public void shouldDefaultToEmptyListIfRackAwareAssignmentTagsIsNotSet() {
        final StreamsConfig config = new StreamsConfig(props);
        Assert.assertTrue(config.getList("rack.aware.assignment.tags").isEmpty());
    }

    /**
     * Registers six client tags ({@code k0..k5}) and asserts that construction throws a
     * {@link ConfigException} whose message states that at most 5 tags are allowed.
     */
    @Test
    public void shouldThrowExceptionWhenClientTagsExceedTheLimit() {
        for (int tag = 0; tag < 6; tag++) {
            props.put(StreamsConfig.clientTagPrefix("k" + tag), "v" + tag);
        }
        final String expectedMessage = String.format("At most %s client tags can be specified using %s prefix.", 5, "client.tag.");
        final ConfigException thrown = Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
        Assert.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Registers six client tags, lists all six keys in {@code rack.aware.assignment.tags}, and
     * asserts that construction throws a {@link ConfigException} whose message reports the
     * maximum list size of 5.
     */
    @Test
    public void shouldThrowExceptionWhenRackAwareAssignmentTagsExceedsMaxListSize() {
        final List<String> tagKeys = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            final String key = "k" + i;
            tagKeys.add(key);
            props.put(StreamsConfig.clientTagPrefix(key), "v" + i);
        }
        props.put("rack.aware.assignment.tags", String.join(",", tagKeys));

        // The expected message embeds the list's toString() form, e.g. [k0, k1, ...].
        final String expectedMessage = String.format("Invalid value %s for configuration %s: exceeds maximum list size of [%s].", tagKeys, "rack.aware.assignment.tags", 5);
        final ConfigException thrown = Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
        Assert.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Registers {@code cluster} and {@code zone} client tags, names both in
     * {@code rack.aware.assignment.tags}, and asserts the config returns exactly those two keys
     * (order-insensitive).
     */
    @Test
    public void shouldSetRackAwareAssignmentTags() {
        props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1");
        props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
        props.put("rack.aware.assignment.tags", "cluster,zone");

        final StreamsConfig config = new StreamsConfig(props);
        // Expected value goes first so a failure message labels the two sides correctly.
        Assert.assertEquals(Utils.mkSet("cluster", "zone"), new HashSet<>(config.getList("rack.aware.assignment.tags")));
    }

    /** Asserts that {@code getClientTags()} is empty for a config built from {@code props}. */
    @Test
    public void shouldGetEmptyMapIfClientTagsAreNotSet() {
        final StreamsConfig config = new StreamsConfig(props);
        Assert.assertTrue(config.getClientTags().isEmpty());
    }

    /**
     * Registers {@code zone} and {@code cluster} client tags and asserts that
     * {@code getClientTags()} returns exactly those two entries with their configured values.
     */
    @Test
    public void shouldGetClientTagsMapWhenSet() {
        props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
        props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1");

        final Map<String, String> clientTags = new StreamsConfig(props).getClientTags();

        // Expected value goes first so a failure message labels the two sides correctly.
        Assert.assertEquals(2, clientTags.size());
        Assert.assertEquals("eu-central-1a", clientTags.get("zone"));
        Assert.assertEquals("cluster-1", clientTags.get("cluster"));
    }

    /**
     * Names {@code cluster} in {@code rack.aware.assignment.tags} without registering a matching
     * client tag, and expects construction to throw {@link ConfigException}.
     */
    @Test
    public void shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithUnknownTags() {
        props.put("rack.aware.assignment.tags", "cluster");
        Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
    }

    /**
     * Registers a client tag whose key is 21 characters long and asserts that construction
     * throws a {@link ConfigException} whose message reports a maximum key length of 20.
     */
    @Test
    public void shouldThrowExceptionWhenClientTagKeyExceedMaxLimit() {
        final String oversizedKey = String.join("", Collections.nCopies(21, "k"));
        props.put(StreamsConfig.clientTagPrefix(oversizedKey), "eu-central-1a");
        final String expectedMessage = String.format("Invalid value %s for configuration %s: Tag key exceeds maximum length of %s.", oversizedKey, "client.tag.", 20);
        final ConfigException thrown = Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
        Assert.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Registers a client tag whose value is 31 characters long and asserts that construction
     * throws a {@link ConfigException} whose message reports a maximum value length of 30.
     */
    @Test
    public void shouldThrowExceptionWhenClientTagValueExceedMaxLimit() {
        final String oversizedValue = String.join("", Collections.nCopies(31, "v"));
        props.put(StreamsConfig.clientTagPrefix("x"), oversizedValue);
        final String expectedMessage = String.format("Invalid value %s for configuration %s: Tag value exceeds maximum length of %s.", oversizedValue, "client.tag.", 30);
        final ConfigException thrown = Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
        Assert.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Sets {@code security.protocol=abc} and asserts that construction throws a
     * {@link ConfigException} whose message mentions {@code security.protocol}.
     */
    @Test
    public void testInvalidSecurityProtocol() {
        props.put("security.protocol", "abc");
        final ConfigException thrown = Assert.assertThrows(ConfigException.class, () -> new StreamsConfig(props));
        Assert.assertTrue(thrown.getMessage().contains("security.protocol"));
    }
}
