package io.hyperfoil.tools.horreum.svc;

import io.hyperfoil.tools.horreum.api.alerting.Change;
import io.hyperfoil.tools.horreum.bus.BlockingTaskDispatcher;
import io.hyperfoil.tools.horreum.entity.data.DatasetDAO;
import io.hyperfoil.tools.horreum.events.DatasetChanges;
import io.hyperfoil.tools.horreum.server.WithRoles;
import io.quarkus.runtime.Startup;
import io.vertx.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import java.util.HashMap;
import java.util.Map;

@Startup
@ApplicationScoped
/* loaded from: input_file:io/hyperfoil/tools/horreum/svc/EventAggregator.class */
public class EventAggregator {

    @Inject
    Vertx vertx;

    @Inject
    BlockingTaskDispatcher messageBus;

    @Inject
    ServiceMediator mediator;
    private final Map<Integer, DatasetChanges> datasetChanges = new HashMap();
    private long timerId = -1;

    @Transactional
    @WithRoles(extras = {Roles.HORREUM_SYSTEM})
    public synchronized void onNewChange(Change.Event event) {
        this.datasetChanges.computeIfAbsent(Integer.valueOf(event.dataset.id), num -> {
            return new DatasetChanges(event.dataset, ((DatasetDAO) DatasetDAO.getEntityManager().getReference(DatasetDAO.class, Integer.valueOf(event.dataset.id))).getFingerprint(), event.testName, event.notify);
        }).addChange(event);
        handleDatasetChanges();
    }

    @Transactional
    void handleDatasetChanges() {
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            DatasetChanges orElse = this.datasetChanges.values().stream().reduce((datasetChanges, datasetChanges2) -> {
                return datasetChanges.emitTimestamp() < datasetChanges2.emitTimestamp() ? datasetChanges : datasetChanges2;
            }).orElse(null);
            if (orElse == null) {
                return;
            }
            if (orElse.emitTimestamp() > currentTimeMillis) {
                if (this.timerId >= 0) {
                    this.vertx.cancelTimer(this.timerId);
                }
                this.timerId = this.vertx.setTimer(orElse.emitTimestamp() - currentTimeMillis, l -> {
                    this.messageBus.executeForTest(orElse.dataset.testId, this::handleDatasetChanges);
                });
                return;
            }
            this.mediator.executeBlocking(() -> {
                this.mediator.newDatasetChanges(orElse);
            });
            this.datasetChanges.remove(Integer.valueOf(orElse.dataset.id));
        }
    }
}
