package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.StoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
@Timeout(600)
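/* Integration test: starts a Streams application for each store/topology combination, queries the store until it reports the expected Position, then restarts the application and checks the Position again. */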
public class PositionRestartIntegrationTest {
    // Topic the test records are produced to and every topology reads from.
    private static final String INPUT_TOPIC_NAME = "input-topic";
    // Name given to every store under test; also the store addressed by the position queries.
    private static final String STORE_NAME = "kv-store";
    // Streams instance of the test case currently running; closed and cleaned up in afterTest().
    private KafkaStreams kafkaStreams;
    private static final Logger LOG = LoggerFactory.getLogger(PositionRestartIntegrationTest.class);
    // Random value that data() logs; nothing else in this file reads it.
    private static final long SEED = new Random().nextLong();
    // Window size of the time-windowed topology and inactivity gap of the session topology.
    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
    // Counter bumped by streamsConfiguration() so each configuration gets a distinct "application.server" port.
    private static int port = 0;
    // Position updated in before() from the metadata of the produced records; used both as the
    // query's position bound and as the expected result position.
    private static final Position INPUT_POSITION = Position.emptyPosition();
    // Timestamp attached to every produced record; captured once when the class is initialized.
    private static final long RECORD_TIME = System.currentTimeMillis();
    // RECORD_TIME rounded down to a multiple of WINDOW_SIZE; must be declared after both of them.
    private static final long WINDOW_START = (RECORD_TIME / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis();
    // Number of brokers in the embedded cluster.
    private static final int NUM_BROKERS = 1;
    // Embedded Kafka cluster shared by all test cases; started in before() and stopped in after().
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    /**
     * The store implementations exercised by this test.
     *
     * <p>Each constant creates its own {@link StoreSupplier} (a new one on every call to
     * {@link #supplier()}) and carries two pieces of metadata: which family of store it is
     * (key-value, window or session) and whether the PAPI topologies should wrap it in a
     * timestamped store builder.
     */
    public enum StoresToTest {
        IN_MEMORY_KV(Family.KEY_VALUE, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemoryKeyValueStore(STORE_NAME);
            }
        },
        IN_MEMORY_LRU(Family.KEY_VALUE, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.lruMap(STORE_NAME, 100);
            }
        },
        ROCKS_KV(Family.KEY_VALUE, false) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentKeyValueStore(STORE_NAME);
            }
        },
        TIME_ROCKS_KV(Family.KEY_VALUE, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
            }
        },
        IN_MEMORY_WINDOW(Family.WINDOW, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false);
            }
        },
        ROCKS_WINDOW(Family.WINDOW, false) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false);
            }
        },
        TIME_ROCKS_WINDOW(Family.WINDOW, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false);
            }
        },
        IN_MEMORY_SESSION(Family.SESSION, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1L));
            }
        },
        ROCKS_SESSION(Family.SESSION, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1L));
            }
        };

        /** Family of store a constant belongs to; drives the three family predicates below. */
        private enum Family { KEY_VALUE, WINDOW, SESSION }

        private final Family family;
        private final boolean timestamped;

        StoresToTest(final Family family, final boolean timestamped) {
            this.family = family;
            this.timestamped = timestamped;
        }

        /** @return a freshly created supplier for the store this constant stands for */
        public abstract StoreSupplier<?> supplier();

        /** @return {@code false} only for {@code ROCKS_KV} and {@code ROCKS_WINDOW}, {@code true} otherwise */
        public boolean timestamped() {
            return timestamped;
        }

        /** @return {@code true} when this constant is one of the key-value stores */
        public boolean keyValue() {
            return family == Family.KEY_VALUE;
        }

        /** @return {@code true} when this constant is one of the window stores */
        public boolean isWindowed() {
            return family == Family.WINDOW;
        }

        /** @return {@code true} when this constant is one of the session stores */
        public boolean isSession() {
            return family == Family.SESSION;
        }
    }

    /**
     * Produces the argument sets for {@link #verifyStore}: (caching enabled, logging enabled,
     * store under test, topology kind), where the kind is {@code "DSL"} or {@code "PAPI"}.
     *
     * <p>Combinations with logging disabled are only generated for stores whose supplier
     * reports {@code persistent()}.
     *
     * @return a stream with one {@link Arguments} per generated combination
     */
    public static Stream<Arguments> data() {
        LOG.info("Generating test cases according to random seed: {}", SEED);
        final Stream.Builder<Arguments> cases = Stream.builder();
        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
            for (final boolean logEnabled : Arrays.asList(true, false)) {
                for (final StoresToTest store : StoresToTest.values()) {
                    // Without logging, only persistent stores are tested - presumably because a
                    // non-persistent, unlogged store has nothing to restore from after a restart.
                    if (!logEnabled && !store.supplier().get().persistent()) {
                        continue;
                    }
                    for (final String kind : Arrays.asList("DSL", "PAPI")) {
                        cases.add(Arguments.of(cacheEnabled, logEnabled, store, kind));
                    }
                }
            }
        }
        return cases.build();
    }

    /**
     * Starts the embedded cluster, (re)creates the two-partition input topic and produces four
     * records (keys and values 0..3, alternating between partitions 0 and 1, all stamped with
     * {@code RECORD_TIME}). The offsets reported for those records are folded into
     * {@code INPUT_POSITION}, which must end up at offset 1 for both partitions.
     *
     * @throws InterruptedException if waiting for the cluster or for a send result is interrupted
     * @throws IOException          if the embedded cluster fails to start
     * @throws ExecutionException   if a send fails
     * @throws TimeoutException     if a send result is not available within one minute
     */
    @BeforeAll
    public static void before() throws InterruptedException, IOException, ExecutionException, TimeoutException {
        CLUSTER.start();
        CLUSTER.deleteAllTopicsAndWait(IntegrationTestUtils.DEFAULT_TIMEOUT);
        final int partitions = 2;
        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, NUM_BROKERS);

        final Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);

        final int numRecords = 4;
        final List<Future<RecordMetadata>> pendingSends = new ArrayList<>(numRecords);
        // try-with-resources closes the producer on every exit path, including a failing
        // flush(), a failing/timeout Future.get() or a failed assertion inside the block.
        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < numRecords; i++) {
                pendingSends.add(producer.send(
                    new ProducerRecord<>(INPUT_TOPIC_NAME, i % partitions, RECORD_TIME, i, i, null)));
                Time.SYSTEM.sleep(1L);
            }
            producer.flush();

            for (final Future<RecordMetadata> pending : pendingSends) {
                final RecordMetadata metadata = pending.get(1L, TimeUnit.MINUTES);
                MatcherAssert.assertThat(metadata.hasOffset(), Matchers.is(true));
                // Return value intentionally unused: the assertion after this block relies on
                // INPUT_POSITION itself having been updated by this call.
                INPUT_POSITION.withComponent(metadata.topic(), metadata.partition(), metadata.offset());
            }
        }

        MatcherAssert.assertThat(
            INPUT_POSITION,
            Matchers.equalTo(
                Position.emptyPosition()
                    .withComponent(INPUT_TOPIC_NAME, 0, 1L)
                    .withComponent(INPUT_TOPIC_NAME, 1, 1L)));
    }

    /**
     * Builds the topology for one test case by dispatching on the topology kind and on the
     * concrete type of the supplier produced by {@code storesToTest}.
     *
     * @param z            whether caching is enabled on the store
     * @param z2           whether change-logging is enabled on the store
     * @param storesToTest the store implementation to wire into the topology
     * @param str          topology kind, {@code "DSL"} or {@code "PAPI"}
     * @return a builder holding the configured topology
     * @throws AssertionError if the kind / supplier-type combination is not handled
     */
    public static StreamsBuilder getStreamBuilder(boolean z, boolean z2, StoresToTest storesToTest, String str) {
        // Must be the general supplier type: window and session suppliers are not
        // KeyValueBytesStoreSuppliers, so a narrower declaration cannot hold them.
        final StoreSupplier<?> supplier = storesToTest.supplier();
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final boolean dsl = Objects.equals(str, "DSL");
        final boolean papi = Objects.equals(str, "PAPI");

        if (dsl && supplier instanceof KeyValueBytesStoreSupplier) {
            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, streamsBuilder, z, z2);
        } else if (papi && supplier instanceof KeyValueBytesStoreSupplier) {
            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, streamsBuilder, z, z2, storesToTest);
        } else if (dsl && supplier instanceof WindowBytesStoreSupplier) {
            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, streamsBuilder, z, z2);
        } else if (papi && supplier instanceof WindowBytesStoreSupplier) {
            setUpWindowPAPITopology((WindowBytesStoreSupplier) supplier, streamsBuilder, z, z2, storesToTest);
        } else if (dsl && supplier instanceof SessionBytesStoreSupplier) {
            setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, streamsBuilder, z, z2);
        } else if (papi && supplier instanceof SessionBytesStoreSupplier) {
            setUpSessionPAPITopology((SessionBytesStoreSupplier) supplier, streamsBuilder, z, z2);
        } else {
            throw new AssertionError("Store supplier is an unrecognized type.");
        }
        return streamsBuilder;
    }

    /**
     * Closes the Streams instance left behind by the test case, if any, and then calls
     * {@code cleanUp()} on it.
     */
    @AfterEach
    public void afterTest() {
        final KafkaStreams streams = this.kafkaStreams;
        if (streams == null) {
            // No instance was assigned to the field, so there is nothing to tear down.
            return;
        }
        streams.close();
        streams.cleanUp();
    }

    /**
     * Stops the shared embedded Kafka cluster once all test cases of this class have run.
     */
    @AfterAll
    public static void after() {
        CLUSTER.stop();
    }

    /**
     * Starts the application for the given combination, waits until the store reports
     * {@code INPUT_POSITION}, then closes the application, starts it again without cleaning
     * up first, and checks that the same position is reached once more.
     *
     * @param z            whether caching is enabled on the store
     * @param z2           whether change-logging is enabled on the store
     * @param storesToTest the store implementation under test
     * @param str          topology kind, {@code "DSL"} or {@code "PAPI"}
     */
    @MethodSource({"data"})
    @ParameterizedTest
    public void verifyStore(boolean z, boolean z2, StoresToTest storesToTest, String str) {
        final Properties streamsConfiguration = streamsConfiguration(z, z2, storesToTest.name(), str);
        final StreamsBuilder streamBuilder = getStreamBuilder(z, z2, storesToTest, str);
        this.kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfiguration, streamBuilder, true);

        // The three query types share only the Query interface, so the local must be Query<?>.
        final Query<?> query;
        if (storesToTest.keyValue()) {
            query = RangeQuery.withNoBounds();
        } else if (storesToTest.isWindowed()) {
            query = WindowKeyQuery.withKeyAndWindowStartRange(
                2, Instant.ofEpochMilli(WINDOW_START), Instant.ofEpochMilli(WINDOW_START));
        } else if (storesToTest.isSession()) {
            query = WindowRangeQuery.withKey(2);
        } else {
            throw new AssertionError("Unhandled store type: " + storesToTest);
        }
        shouldReachExpectedPosition(query);

        // Restart: the final "false" argument differs from the first start, which passed "true".
        this.kafkaStreams.close();
        this.kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfiguration, streamBuilder, false);
        shouldReachExpectedPosition(query);
    }

    /**
     * Runs {@code query} against {@code STORE_NAME} on partitions 0 and 1, bounded at
     * {@code INPUT_POSITION}, and asserts that the returned result reports exactly that position.
     *
     * @param query the query to issue; only the position of its result is inspected
     */
    private void shouldReachExpectedPosition(Query<?> query) {
        final StateQueryRequest<?> request = StateQueryRequest.inStore(STORE_NAME)
            .withQuery(query)
            // Partition ids of the two-partition input topic (not related to the broker count).
            .withPartitions(Utils.mkSet(0, 1))
            .withPositionBound(PositionBound.at(INPUT_POSITION));
        final Position reached = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request).getPosition();
        MatcherAssert.assertThat(reached, Matchers.is(INPUT_POSITION));
    }

    /**
     * Adds a DSL topology that groups the input topic by key, session-windows it with an
     * inactivity gap of {@code WINDOW_SIZE} and sums the values into the supplied session store.
     *
     * @param sessionBytesStoreSupplier supplier of the session store to materialize into
     * @param streamsBuilder            builder the topology is added to
     * @param z                         whether caching is enabled on the store
     * @param z2                        whether change-logging is enabled on the store
     */
    private static void setUpSessionDSLTopology(SessionBytesStoreSupplier sessionBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2) {
        final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
            Materialized.as(sessionBytesStoreSupplier);
        if (z) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (z2) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        streamsBuilder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
            .aggregate(
                () -> 0,
                (key, value, aggregate) -> aggregate + value,
                (key, left, right) -> left + right,
                materialized);
    }

    /**
     * Adds a DSL topology that groups the input topic by key, applies time windows of
     * {@code WINDOW_SIZE} and sums the values into the supplied window store.
     *
     * @param windowBytesStoreSupplier supplier of the window store to materialize into
     * @param streamsBuilder           builder the topology is added to
     * @param z                        whether caching is enabled on the store
     * @param z2                       whether change-logging is enabled on the store
     */
    private static void setUpWindowDSLTopology(WindowBytesStoreSupplier windowBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2) {
        final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
            Materialized.as(windowBytesStoreSupplier);
        if (z) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (z2) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        streamsBuilder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
            .aggregate(
                () -> 0,
                (key, value, aggregate) -> aggregate + value,
                materialized);
    }

    /**
     * Adds a DSL topology that reads the input topic as a table materialized into the
     * supplied key-value store.
     *
     * @param keyValueBytesStoreSupplier supplier of the key-value store to materialize into
     * @param streamsBuilder             builder the topology is added to
     * @param z                          whether caching is enabled on the store
     * @param z2                         whether change-logging is enabled on the store
     */
    private static void setUpKeyValueDSLTopology(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2) {
        final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.as(keyValueBytesStoreSupplier);
        if (z) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (z2) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        streamsBuilder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), materialized);
    }

    /**
     * Adds a Processor-API topology whose single processor writes every input record into a
     * key-value store built from the given supplier. A timestamped store builder (values
     * wrapped in {@link ValueAndTimestamp}) is used when {@code storesToTest.timestamped()}
     * is true, a plain one otherwise.
     *
     * @param keyValueBytesStoreSupplier supplier of the underlying key-value store
     * @param streamsBuilder             builder the store and processor are added to
     * @param z                          whether caching is enabled on the store
     * @param z2                         whether change-logging is enabled on the store
     * @param storesToTest               the store under test; decides timestamped vs. plain
     */
    private static void setUpKeyValuePAPITopology(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2, StoresToTest storesToTest) {
        final StoreBuilder<?> keyValueStoreBuilder;
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
        if (storesToTest.timestamped()) {
            keyValueStoreBuilder = Stores.timestampedKeyValueStoreBuilder(keyValueBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    // The typed local tells getStateStore which store interface to return;
                    // without it only the generic state-store type is inferred, which has no put().
                    final TimestampedKeyValueStore<Integer, Integer> store =
                        context().getStateStore(keyValueStoreBuilder.name());
                    store.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
                }
            };
        } else {
            keyValueStoreBuilder = Stores.keyValueStoreBuilder(keyValueBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    final KeyValueStore<Integer, Integer> store =
                        context().getStateStore(keyValueStoreBuilder.name());
                    store.put(record.key(), record.value());
                }
            };
        }
        if (z) {
            keyValueStoreBuilder.withCachingEnabled();
        } else {
            keyValueStoreBuilder.withCachingDisabled();
        }
        if (z2) {
            keyValueStoreBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            keyValueStoreBuilder.withLoggingDisabled();
        }
        streamsBuilder.addStateStore(keyValueStoreBuilder);
        streamsBuilder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .process(processorSupplier, keyValueStoreBuilder.name());
    }

    /**
     * Adds a Processor-API topology whose single processor writes every input record into a
     * window store built from the given supplier, always using {@code WINDOW_START} as the
     * window start timestamp. A timestamped store builder is used when
     * {@code storesToTest.timestamped()} is true, a plain one otherwise.
     *
     * @param windowBytesStoreSupplier supplier of the underlying window store
     * @param streamsBuilder           builder the store and processor are added to
     * @param z                        whether caching is enabled on the store
     * @param z2                       whether change-logging is enabled on the store
     * @param storesToTest             the store under test; decides timestamped vs. plain
     */
    private static void setUpWindowPAPITopology(WindowBytesStoreSupplier windowBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2, StoresToTest storesToTest) {
        final StoreBuilder<?> windowStoreBuilder;
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
        if (storesToTest.timestamped()) {
            windowStoreBuilder = Stores.timestampedWindowStoreBuilder(windowBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    // The typed local tells getStateStore which store interface to return;
                    // without it only the generic state-store type is inferred, which has no put().
                    final TimestampedWindowStore<Integer, Integer> store =
                        context().getStateStore(windowStoreBuilder.name());
                    store.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()), WINDOW_START);
                }
            };
        } else {
            windowStoreBuilder = Stores.windowStoreBuilder(windowBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    final WindowStore<Integer, Integer> store =
                        context().getStateStore(windowStoreBuilder.name());
                    store.put(record.key(), record.value(), WINDOW_START);
                }
            };
        }
        if (z) {
            windowStoreBuilder.withCachingEnabled();
        } else {
            windowStoreBuilder.withCachingDisabled();
        }
        if (z2) {
            windowStoreBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            windowStoreBuilder.withLoggingDisabled();
        }
        streamsBuilder.addStateStore(windowStoreBuilder);
        streamsBuilder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .process(processorSupplier, windowStoreBuilder.name());
    }

    /**
     * Adds a Processor-API topology whose single processor writes every input record into a
     * session store built from the given supplier, under a session window that starts and
     * ends at {@code WINDOW_START}.
     *
     * @param sessionBytesStoreSupplier supplier of the underlying session store
     * @param streamsBuilder            builder the store and processor are added to
     * @param z                         whether caching is enabled on the store
     * @param z2                        whether change-logging is enabled on the store
     */
    private static void setUpSessionPAPITopology(SessionBytesStoreSupplier sessionBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2) {
        final StoreBuilder<SessionStore<Integer, Integer>> sessionStoreBuilder =
            Stores.sessionStoreBuilder(sessionBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier =
            () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    // The typed local tells getStateStore which store interface to return;
                    // without it only the generic state-store type is inferred, which has no put().
                    final SessionStore<Integer, Integer> store =
                        context().getStateStore(sessionStoreBuilder.name());
                    store.put(
                        new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
                        record.value());
                }
            };
        if (z) {
            sessionStoreBuilder.withCachingEnabled();
        } else {
            sessionStoreBuilder.withCachingDisabled();
        }
        if (z2) {
            sessionStoreBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            sessionStoreBuilder.withLoggingDisabled();
        }
        streamsBuilder.addStateStore(sessionStoreBuilder);
        streamsBuilder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .process(processorSupplier, sessionStoreBuilder.name());
    }

    /**
     * Builds the Streams configuration for one test case. The application id is derived from
     * all four arguments, the state directory is a fresh temporary directory, and every call
     * advances the shared {@code port} counter so that "application.server" differs per call.
     *
     * @param z    caching flag of the test case (only used to build the application id)
     * @param z2   logging flag of the test case (only used to build the application id)
     * @param str  name of the store under test (only used to build the application id)
     * @param str2 topology kind (only used to build the application id)
     * @return the populated configuration
     */
    private static Properties streamsConfiguration(boolean z, boolean z2, String str, String str2) {
        final String safeTestName =
            PositionRestartIntegrationTest.class.getName() + "-" + z + "-" + z2 + "-" + str + "-" + str2;
        final Properties config = new Properties();
        config.put("topology.optimization", "all");
        config.put("application.id", "app-" + safeTestName);
        // Plain pre-increment; the step is unrelated to the number of brokers.
        config.put("application.server", "localhost:" + (++port));
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());
        config.put("num.standby.replicas", 1);
        config.put("max.poll.records", 100);
        config.put("heartbeat.interval.ms", 200);
        config.put("session.timeout.ms", 1000);
        config.put("commit.interval.ms", 100L);
        config.put("num.stream.threads", 1);
        config.put("__iq.consistency.offset.vector.enabled__", true);
        return config;
    }
}
