package dev.responsive.kafka.bootstrap;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.ResponsiveMode;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/bootstrap/ChangelogMigrationTool.class */
public class ChangelogMigrationTool {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMigrationTool.class);
    private final Consumer<Record<byte[], byte[]>> processor;
    private final String changelogTopic;
    private final Properties properties;
    private final ResponsiveKeyValueParams params;

    public ChangelogMigrationTool(Properties properties, ResponsiveKeyValueParams responsiveKeyValueParams, String str) {
        this(properties, responsiveKeyValueParams, str, record -> {
        });
    }

    ChangelogMigrationTool(Properties properties, ResponsiveKeyValueParams responsiveKeyValueParams, String str, Consumer<Record<byte[], byte[]>> consumer) {
        this.processor = consumer;
        properties.put("topology.optimization", "reuse.ktable.source.topics");
        properties.put("auto.offset.reset", "earliest");
        properties.put(ResponsiveConfig.RESPONSIVE_MODE, ResponsiveMode.MIGRATE.name());
        properties.put(ResponsiveConfig.WRITE_CONSISTENCY_LEVEL_CONFIG, ConsistencyLevel.ALL.name());
        properties.putIfAbsent(ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 10000);
        this.properties = properties;
        this.params = responsiveKeyValueParams;
        this.changelogTopic = str;
    }

    public ResponsiveKafkaStreams buildStreams() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table(this.changelogTopic, Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()), Materialized.as(ResponsiveStores.keyValueStore(this.params)).withValueSerde(Serdes.ByteArray()).withKeySerde(Serdes.ByteArray())).toStream().process(() -> {
            return new Processor<byte[], byte[], byte[], byte[]>() { // from class: dev.responsive.kafka.bootstrap.ChangelogMigrationTool.1
                private final AtomicLong lastLogged = new AtomicLong();
                private ProcessorContext context;

                public void init(ProcessorContext<byte[], byte[]> processorContext) {
                    this.context = processorContext;
                }

                public void process(Record<byte[], byte[]> record) {
                    ChangelogMigrationTool.this.processor.accept(record);
                    if (this.context.currentSystemTimeMs() - this.lastLogged.get() > TimeUnit.SECONDS.toMillis(30L)) {
                        this.lastLogged.set(this.context.currentSystemTimeMs());
                        ChangelogMigrationTool.LOG.info("Migration has restored task {} up until stream time {}", this.context.taskId(), Instant.ofEpochMilli(this.context.currentStreamTimeMs()));
                    }
                }
            };
        }, new String[0]);
        return new ResponsiveKafkaStreams(streamsBuilder.build(this.properties), this.properties);
    }
}
