package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.redis.client.RedisClientName;
import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;
import io.smallrye.reactive.messaging.kafka.commit.VertxJsonProcessingStateCodec;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Default;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/quarkus/smallrye/reactivemessaging/kafka/RedisStateStore.class */
public class RedisStateStore implements CheckpointStateStore {
    public static final String REDIS_STATE_STORE = "quarkus-redis";
    private final ReactiveRedisDataSource redis;
    private final String consumerGroupId;
    private final ProcessingStateCodec stateCodec;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @ApplicationScoped
    @Identifier(RedisStateStore.REDIS_STATE_STORE)
    /* loaded from: input_file:io/quarkus/smallrye/reactivemessaging/kafka/RedisStateStore$Factory.class */
    public static class Factory implements CheckpointStateStore.Factory {

        @Inject
        @Any
        Instance<ReactiveRedisDataSource> redisDataSource;

        @Inject
        Instance<ProcessingStateCodec.Factory> stateCodecFactory;

        public CheckpointStateStore create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, Class<?> cls) {
            String str = (String) kafkaConsumer.configuration().get("group.id");
            String str2 = (String) kafkaConnectorIncomingConfiguration.config().getOptionalValue("checkpoint.quarkus-redis.client-name", String.class).orElse(null);
            return new RedisStateStore(str2 != null ? (ReactiveRedisDataSource) this.redisDataSource.select(new Annotation[]{RedisClientName.Literal.of(str2)}).get() : (ReactiveRedisDataSource) this.redisDataSource.select(new Annotation[]{Default.Literal.INSTANCE}).get(), str, ((ProcessingStateCodec.Factory) CDIUtils.getInstanceById(this.stateCodecFactory, kafkaConnectorIncomingConfiguration.getChannel(), () -> {
                return this.stateCodecFactory.isUnsatisfied() ? VertxJsonProcessingStateCodec.FACTORY : (ProcessingStateCodec.Factory) this.stateCodecFactory.get();
            })).create(cls));
        }
    }

    public RedisStateStore(ReactiveRedisDataSource reactiveRedisDataSource, String str, ProcessingStateCodec processingStateCodec) {
        this.redis = reactiveRedisDataSource;
        this.consumerGroupId = str;
        this.stateCodec = processingStateCodec;
    }

    public void close() {
        this.closed.set(true);
    }

    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collection<TopicPartition> collection) {
        if (collection.isEmpty() || this.closed.get()) {
            return Uni.createFrom().item(Collections.emptyMap());
        }
        return this.redis.value(byte[].class).mget((String[]) ((List) collection.stream().map(topicPartition -> {
            return Tuple2.of(topicPartition, getKey(topicPartition));
        }).collect(Collectors.toList())).stream().map((v0) -> {
            return v0.getItem2();
        }).toArray(i -> {
            return new String[i];
        })).map(map -> {
            return (Map) map.entrySet().stream().filter(entry -> {
                return entry.getValue() != null;
            }).collect(Collectors.toMap(entry2 -> {
                return getTpFromKey((String) entry2.getKey());
            }, entry3 -> {
                return ProcessingState.getOrEmpty(this.stateCodec.decode((byte[]) entry3.getValue()));
            }));
        });
    }

    private String getKey(TopicPartition topicPartition) {
        return this.consumerGroupId + ":" + topicPartition.topic() + ":" + topicPartition.partition();
    }

    private TopicPartition getTpFromKey(String str) {
        String[] split = str.split(":");
        return new TopicPartition(split[1], Integer.parseInt(split[2]));
    }

    public Uni<Void> persistProcessingState(Map<TopicPartition, ProcessingState<?>> map) {
        if (map.isEmpty() || this.closed.get()) {
            return Uni.createFrom().voidItem();
        }
        String[] strArr = (String[]) map.keySet().stream().map(this::getKey).toArray(i -> {
            return new String[i];
        });
        return this.redis.withTransaction(reactiveRedisDataSource -> {
            return reactiveRedisDataSource.value(byte[].class).mget(strArr);
        }, (map2, reactiveTransactionalRedisDataSource) -> {
            Map map2 = (Map) map.entrySet().stream().filter(entry -> {
                String key = getKey((TopicPartition) entry.getKey());
                ProcessingState processingState = (ProcessingState) entry.getValue();
                if (!map2.containsKey(key)) {
                    return true;
                }
                ProcessingState decode = this.stateCodec.decode((byte[]) map2.get(key));
                return ProcessingState.isEmptyOrNull(decode) || (!ProcessingState.isEmptyOrNull(processingState) && processingState.getOffset().longValue() >= decode.getOffset().longValue());
            }).collect(Collectors.toMap(entry2 -> {
                return getKey((TopicPartition) entry2.getKey());
            }, entry3 -> {
                return this.stateCodec.encode((ProcessingState) entry3.getValue());
            }));
            return map2.isEmpty() ? Uni.createFrom().voidItem() : reactiveTransactionalRedisDataSource.value(byte[].class).mset(map2);
        }, strArr).replaceWithVoid();
    }
}
