package io.quarkus.qe.kafka.streams;

import io.quarkus.kafka.client.serialization.JsonbSerde;
import io.quarkus.qe.kafka.model.LoginAggregation;
import io.quarkus.qe.kafka.model.LoginAttempt;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.ws.rs.core.Response;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
/* loaded from: input_file:io/quarkus/qe/kafka/streams/WindowedLoginDeniedStream.class */
public class WindowedLoginDeniedStream {
    public static final String LOGIN_AGGREGATION_STORE = "login-aggregation-store";
    public static final String LOGIN_ATTEMPTS_TOPIC = "login-http-response-values";
    public static final String LOGIN_DENIED_AGGREGATED_TOPIC = "login-denied";
    public static final String LOGIN_ALERTS_TOPIC = "login-alerts";

    @ConfigProperty(name = "login.denied.windows.sec")
    int windowsLoginSec;

    @Produces
    public Topology buildTopology() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(LOGIN_ATTEMPTS_TOPIC, Consumed.with(Serdes.String(), new JsonbSerde(LoginAttempt.class))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(this.windowsLoginSec))).aggregate(LoginAggregation::new, (str, loginAttempt, loginAggregation) -> {
            return loginAggregation.updateFrom(loginAttempt);
        }, Materialized.as(LOGIN_AGGREGATION_STORE).withKeySerde(Serdes.String()).withValueSerde(new JsonbSerde(LoginAggregation.class))).toStream().filter((windowed, loginAggregation2) -> {
            return loginAggregation2.code == Response.Status.UNAUTHORIZED.getStatusCode() || loginAggregation2.code == Response.Status.FORBIDDEN.getStatusCode();
        }).to(LOGIN_DENIED_AGGREGATED_TOPIC);
        return streamsBuilder.build();
    }

    @Outgoing(LOGIN_ALERTS_TOPIC)
    @Broadcast
    @Incoming(LOGIN_DENIED_AGGREGATED_TOPIC)
    public String fanOut(String str) {
        return str;
    }
}
