package com.hazelcast.jet.impl.processor;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.QuickMath;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;

/**
 * Processor that aggregates incoming items into per-key session windows.
 * <p>
 * Every accepted item is assigned to a window of its key whose end is the
 * item timestamp plus {@code sessionTimeout}; an item that bridges two
 * existing windows causes them to be merged with the aggregate operation's
 * combine function. When a watermark with the configured key arrives, all
 * windows whose end lies before the watermark timestamp are finished, mapped
 * to output items and emitted. Optionally, the current (unfinished) windows
 * are emitted periodically as early results. The window state and the
 * current watermark are written to and restored from snapshots.
 *
 * @param <K>   type of the grouping key produced by the key functions
 * @param <A>   type of the accumulator managed by the aggregate operation
 * @param <R>   type of the result produced from an accumulator
 * @param <OUT> type of the item produced by {@code mapToOutputFn}
 */
public class SessionWindowP<K, A, R, OUT> extends AbstractProcessor {
    /**
     * Sentinel watermark handed to the closed-window flat-mapper by
     * {@link #complete()}; assigned in the static initializer.
     */
    private static final Watermark COMPLETING_WM;

    /** Added to an item's timestamp to obtain the end of the window the item opens or extends. */
    private final long sessionTimeout;

    /** Timestamp extractors, looked up by input ordinal. */
    @Nonnull
    private final List<ToLongFunction<Object>> timestampFns;

    /** Key extractors, looked up by input ordinal. */
    @Nonnull
    private final List<Function<Object, K>> keyFns;

    /** Supplies the create/accumulate/export/finish functions used on the accumulators. */
    @Nonnull
    private final AggregateOperation<A, ? extends R> aggrOp;

    /** Combine function of {@link #aggrOp}, used when two windows are merged. */
    @Nonnull
    private final BiConsumer<? super A, ? super A> combineFn;

    /** Maps (start, end, key, result, isEarly) to the item that is emitted. */
    @Nonnull
    private final KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn;

    /** Flat-mapper that turns a watermark into the results of the windows it closes. */
    @Nonnull
    private final AbstractProcessor.FlatMapper<Watermark, Object> closedWindowFlatmapper;

    /** Processing guarantee read from the context in {@code init}; only consulted by a restore-time assertion. */
    private ProcessingGuarantee processingGuarantee;

    /** Only watermarks carrying this key are processed by {@code tryProcessWatermark}. */
    private final byte windowWatermarkKey;

    /** Minimum number of milliseconds between two rounds of early results; 0 disables early results. */
    private final long earlyResultsPeriod;

    /** {@code System.nanoTime()} converted to milliseconds at the start of the last early-results round. */
    private long lastTimeEarlyResultsEmitted;

    /** Early-results traverser that has not been fully emitted yet, or {@code null}. */
    private Traverser<OUT> earlyWinTraverser;

    /** Snapshot traverser that has not been fully emitted yet, or {@code null}. */
    private Traverser snapshotTraverser;

    /** Set once {@link #complete()} has been called; makes {@code saveToSnapshot} delegate to it. */
    private boolean inComplete;

    /** Decompiled form of the compiler-generated assertion switch; {@code true} when assertions are off. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /** Windows currently tracked for each key. */
    final Map<K, Windows<A>> keyToWindows = new HashMap<>();

    /** For each window end ("deadline"), the keys that own a window ending at that time. */
    final SortedMap<Long, Set<K>> deadlineToKeys = new TreeMap<>();

    /** Timestamp of the last processed watermark; items with a lower timestamp are dropped as late. */
    long currentWatermark = Long.MIN_VALUE;

    /** Number of items dropped because their timestamp was below {@link #currentWatermark}. */
    @Probe(name = "lateEventsDropped")
    private final Counter lateEventsDropped = SwCounter.newSwCounter();

    /** Number of keys tracked in {@link #keyToWindows}. */
    @Probe(name = "totalKeys")
    private final Counter totalKeys = SwCounter.newSwCounter();

    /** Number of (deadline, key) pairs tracked in {@link #deadlineToKeys}. */
    @Probe(name = "totalWindows")
    private final Counter totalWindows = SwCounter.newSwCounter();

    /** Smallest watermark value seen while restoring a snapshot; {@code Long.MAX_VALUE} until one is restored. */
    private long minRestoredCurrentWatermark = Long.MAX_VALUE;

    /** Creates the {@link Windows} container for a key seen for the first time and counts the new key. */
    private final Function<K, Windows<A>> newWindowsFunction = obj -> {
        this.totalKeys.inc();
        return new Windows<>();
    };

    /** Keys of the broadcast entries this processor writes to, and accepts from, a snapshot. */
    enum Keys {
        /** Key of the snapshot entry whose value is the processor's {@code currentWatermark}. */
        CURRENT_WATERMARK
    }

    /**
     * The windows of a single key, stored in three parallel arrays: slot
     * {@code i} of {@code starts}, {@code ends} and {@code accs} together
     * describe one window. Only the first {@code size} slots are in use; the
     * arrays are doubled when they become full.
     * <p>
     * Slots vacated by {@link #removeWindow(int)} and {@link #removeHead(int)}
     * have their accumulator reference cleared so that a removed accumulator
     * is not kept reachable through the unused tail of {@code accs}.
     *
     * @param <A> accumulator type
     */
    public static class Windows<A> implements IdentifiedDataSerializable {
        /** Number of slots in use. */
        private int size;
        private long[] starts = new long[2];
        private long[] ends = new long[2];
        @SuppressWarnings("unchecked")
        private A[] accs = (A[]) new Object[2];

        /**
         * Removes the window in slot {@code i}, shifting the following
         * windows one slot towards the front.
         *
         * @param i index of the window to remove
         */
        public void removeWindow(int i) {
            this.size--;
            copy(i + 1, i, this.size - i);
            // the former last slot is now unused: drop its stale reference
            this.accs[this.size] = null;
        }

        /**
         * Removes the first {@code i} windows, shifting the remaining ones
         * to the front.
         *
         * @param i number of leading windows to remove
         */
        public void removeHead(int i) {
            int remaining = this.size - i;
            copy(i, 0, remaining);
            // slots [remaining, size) are now unused: drop their stale references
            Arrays.fill(this.accs, remaining, this.size, null);
            this.size = remaining;
        }

        /**
         * Moves {@code i3} consecutive slots of all three arrays from
         * position {@code i} to position {@code i2}.
         *
         * @param i  source position
         * @param i2 destination position
         * @param i3 number of slots to move
         */
        public void copy(int i, int i2, int i3) {
            System.arraycopy(this.starts, i, this.starts, i2, i3);
            System.arraycopy(this.ends, i, this.ends, i2, i3);
            System.arraycopy(this.accs, i, this.accs, i2, i3);
        }

        /** Doubles the capacity of all three arrays when every slot is in use. */
        public void expandIfNeeded() {
            if (this.size != this.starts.length) {
                return;
            }
            int doubled = 2 * this.starts.length;
            this.starts = Arrays.copyOf(this.starts, doubled);
            this.ends = Arrays.copyOf(this.ends, doubled);
            this.accs = Arrays.copyOf(this.accs, doubled);
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getClassId() {
            // NOTE(review): presumably the id registered for this class in
            // JetInitDataSerializerHook - confirm against that hook.
            return 12;
        }

        /**
         * Writes {@code size} followed by the (start, end, accumulator)
         * triple of every window in use.
         */
        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void writeData(ObjectDataOutput objectDataOutput) throws IOException {
            objectDataOutput.writeInt(this.size);
            for (int i = 0; i < this.size; i++) {
                objectDataOutput.writeLong(this.starts[i]);
                objectDataOutput.writeLong(this.ends[i]);
                objectDataOutput.writeObject(this.accs[i]);
            }
        }

        /**
         * Reads the format produced by {@link #writeData}. The arrays are
         * reallocated only when the stored size exceeds the current
         * capacity; the new capacity is
         * {@code QuickMath.nextPowerOfTwo(size)}.
         */
        @Override // com.hazelcast.nio.serialization.DataSerializable
        @SuppressWarnings("unchecked")
        public void readData(ObjectDataInput objectDataInput) throws IOException {
            this.size = objectDataInput.readInt();
            if (this.size > this.starts.length) {
                int capacity = QuickMath.nextPowerOfTwo(this.size);
                this.starts = new long[capacity];
                this.ends = new long[capacity];
                this.accs = (A[]) new Object[capacity];
            }
            for (int i = 0; i < this.size; i++) {
                this.starts[i] = objectDataInput.readLong();
                this.ends[i] = objectDataInput.readLong();
                this.accs[i] = objectDataInput.readObject();
            }
        }

        /** Lists the windows in use as {@code [s=<start>, e=<end>, a=<accumulator>]}. */
        @Override
        public String toString() {
            StringJoiner stringJoiner = new StringJoiner(", ", getClass().getSimpleName() + '{', "}");
            for (int i = 0; i < this.size; i++) {
                stringJoiner.add("[s=" + Util.toLocalDateTime(this.starts[i]).toLocalTime() + ", e=" + Util.toLocalDateTime(this.ends[i]).toLocalTime() + ", a=" + this.accs[i] + ']');
            }
            return stringJoiner.toString();
        }

        /**
         * Decompiled synthetic accessor: post-increments {@code size} of the
         * given instance and returns the previous value. Kept because the
         * enclosing class calls it.
         */
        static /* synthetic */ int access$108(Windows windows) {
            int i = windows.size;
            windows.size = i + 1;
            return i;
        }
    }

    /**
     * Creates the processor.
     *
     * @param j                         session timeout; added to an item's timestamp to obtain
     *                                  the end of the window the item opens or extends
     * @param j2                        early-results period in milliseconds; 0 disables early results
     * @param list                      timestamp extractors, one per input ordinal
     * @param list2                     key extractors, one per input ordinal; their number must
     *                                  equal the arity of {@code aggregateOperation}, otherwise
     *                                  the arguments are rejected by {@code Preconditions.checkTrue}
     * @param aggregateOperation        aggregate operation applied to each window
     * @param keyedWindowResultFunction maps a window and its result to the emitted item
     * @param b                         key of the watermarks this processor reacts to
     * @throws NullPointerException if {@code aggregateOperation.combineFn()} is {@code null}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public SessionWindowP(long j, long j2, @Nonnull List<? extends ToLongFunction<?>> list, @Nonnull List<? extends Function<?, ? extends K>> list2, @Nonnull AggregateOperation<A, ? extends R> aggregateOperation, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> keyedWindowResultFunction, byte b) {
        Preconditions.checkTrue(list2.size() == aggregateOperation.arity(), list2.size() + " key functions provided for " + aggregateOperation.arity() + "-arity aggregate operation");
        // The extractors are declared with wildcards for the caller's convenience but are
        // always invoked with plain Objects here, hence the unchecked narrowing casts.
        this.timestampFns = (List<ToLongFunction<Object>>) list;
        this.keyFns = (List<Function<Object, K>>) list2;
        this.earlyResultsPeriod = j2;
        this.aggrOp = aggregateOperation;
        // windows are merged with the combine function, so it is mandatory
        this.combineFn = (BiConsumer) Objects.requireNonNull(aggregateOperation.combineFn(),
                "aggregateOperation.combineFn() must not be null");
        this.mapToOutputFn = keyedWindowResultFunction;
        this.sessionTimeout = j;
        this.closedWindowFlatmapper = flatMapper(this::traverseClosedWindows);
        this.windowWatermarkKey = b;
    }

    /**
     * Remembers the job's processing guarantee and starts the early-results
     * clock at the current {@code System.nanoTime()} (in milliseconds).
     *
     * @param context processor context supplying the processing guarantee
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) {
        processingGuarantee = context.processingGuarantee();
        long nowNanos = System.nanoTime();
        lastTimeEarlyResultsEmitted = TimeUnit.NANOSECONDS.toMillis(nowNanos);
    }

    /**
     * Emits early results: once at least {@code earlyResultsPeriod}
     * milliseconds have passed since the previous round, every window
     * currently held is mapped to an output item and emitted. A round that
     * could not be fully emitted is continued on the next call.
     *
     * @return {@code true} when there is nothing (more) to emit in this call
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean tryProcess() {
        if (earlyResultsPeriod == 0) {
            // early results are disabled
            return true;
        }
        if (earlyWinTraverser == null) {
            long nowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
            long nextRoundDue = lastTimeEarlyResultsEmitted + earlyResultsPeriod;
            if (nowMillis < nextRoundDue) {
                return true;
            }
            lastTimeEarlyResultsEmitted = nowMillis;
            // the traverser clears the field itself once it is exhausted
            earlyWinTraverser = Traversers.traverseIterable(keyToWindows.entrySet())
                    .flatMap(keyAndWindows -> earlyWindows(keyAndWindows.getKey(), keyAndWindows.getValue()))
                    .onFirstNull(() -> earlyWinTraverser = null);
        }
        return emitFromTraverser(earlyWinTraverser);
    }

    /**
     * Accumulates one item into the session window of its key, or drops it
     * (logging it and counting it in {@code lateEventsDropped}) when its
     * timestamp is below the current watermark.
     *
     * @param i   input ordinal; selects the timestamp, key and accumulate functions
     * @param obj the item
     * @return always {@code true}
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected boolean tryProcess(int i, @Nonnull Object obj) {
        long eventTimestamp = timestampFns.get(i).applyAsLong(obj);
        if (eventTimestamp >= currentWatermark) {
            K key = keyFns.get(i).apply(obj);
            Windows<A> windowsOfKey = keyToWindows.computeIfAbsent(key, newWindowsFunction);
            addItem(i, windowsOfKey, key, eventTimestamp, obj);
        } else {
            // late item: report and drop
            Util.logLateEvent(getLogger(), (byte) 0, currentWatermark, obj);
            lateEventsDropped.inc();
        }
        return true;
    }

    /**
     * Handles a watermark: watermarks with a key other than
     * {@code windowWatermarkKey} are ignored; otherwise the watermark becomes
     * the current one and the windows it closes are emitted through the
     * closed-window flat-mapper.
     *
     * @param watermark the received watermark
     * @return {@code true} for an ignored watermark, otherwise the result of
     *         the flat-mapper
     * @throws AssertionError when assertions are enabled and the
     *         {@code totalWindows} counter disagrees with {@code deadlineToKeys}
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor, com.hazelcast.jet.core.Processor
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        if (watermark.key() != windowWatermarkKey) {
            return true;
        }
        currentWatermark = watermark.timestamp();
        if (!$assertionsDisabled) {
            // consistency check between the counter and the deadline index
            long actual = totalWindows.get();
            int expected = deadlineToKeys.values().stream().mapToInt(Set::size).sum();
            if (actual != expected) {
                throw new AssertionError("unexpected totalWindows. Expected=" + expected + ", actual=" + actual);
            }
        }
        return closedWindowFlatmapper.tryProcess(watermark);
    }

    /**
     * Marks the processor as completing and feeds the sentinel
     * {@code COMPLETING_WM} to the closed-window flat-mapper.
     *
     * @return the result of the flat-mapper
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        inComplete = true;
        boolean done = closedWindowFlatmapper.tryProcess(COMPLETING_WM);
        return done;
    }

    /**
     * Builds the traverser over everything a watermark produces: the results
     * of all windows whose deadline is below the watermark timestamp,
     * followed by the watermark itself unless it is the completion sentinel.
     * Once the window results are exhausted, the closed deadlines are removed
     * from {@code deadlineToKeys} and subtracted from {@code totalWindows}.
     *
     * @param watermark the watermark being processed
     * @return traverser over the closed-window results and, if applicable, the watermark
     */
    private Traverser<Object> traverseClosedWindows(Watermark watermark) {
        // live view: clearing it later removes the entries from deadlineToKeys
        SortedMap<Long, Set<K>> expired = deadlineToKeys.headMap(watermark.timestamp());
        java.util.stream.Stream<Object> closedResults = expired.values().stream()
                .flatMap(Set::stream)
                .map(key -> closeWindows(keyToWindows.get(key), key, watermark.timestamp()))
                .flatMap(List::stream);
        Traverser<Object> output = Traversers.traverseStream(closedResults).onFirstNull(() -> {
            int closedCount = expired.values().stream().mapToInt(Set::size).sum();
            totalWindows.inc(-closedCount);
            expired.clear();
        });
        return watermark == COMPLETING_WM ? output : output.append(watermark);
    }

    /**
     * Registers key {@code k} under deadline {@code j} and increments
     * {@code totalWindows} if the pair was not registered yet.
     *
     * @param k key owning the window
     * @param j end of the window
     */
    private void addToDeadlines(K k, long j) {
        Set<K> keysAtDeadline = deadlineToKeys.computeIfAbsent(j, unused -> new HashSet<>());
        boolean newlyAdded = keysAtDeadline.add(k);
        if (newlyAdded) {
            totalWindows.inc();
        }
    }

    /**
     * Unregisters key {@code k} from deadline {@code j}, decrements
     * {@code totalWindows} and drops the deadline entry once no key is left
     * under it. The deadline is expected to be present in
     * {@code deadlineToKeys}.
     *
     * @param k key owning the window
     * @param j end of the window
     */
    private void removeFromDeadlines(K k, long j) {
        Long deadline = j;
        Set<K> keysAtDeadline = deadlineToKeys.get(deadline);
        keysAtDeadline.remove(k);
        totalWindows.inc(-1L);
        if (!keysAtDeadline.isEmpty()) {
            return;
        }
        deadlineToKeys.remove(deadline);
    }

    /**
     * Writes all {@code keyToWindows} entries to the snapshot, followed by
     * the current watermark under the {@code Keys.CURRENT_WATERMARK}
     * broadcast key. Once {@link #complete()} has been called, this method
     * delegates to it instead.
     *
     * @return {@code true} when everything has been emitted
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean saveToSnapshot() {
        if (!inComplete) {
            if (snapshotTraverser == null) {
                // the traverser clears the field itself once it is exhausted
                snapshotTraverser = Traversers.traverseIterable(keyToWindows.entrySet())
                        .append(com.hazelcast.jet.Util.entry(BroadcastKey.broadcastKey(Keys.CURRENT_WATERMARK), Long.valueOf(currentWatermark)))
                        .onFirstNull(() -> snapshotTraverser = null);
            }
            return emitFromTraverserToSnapshot(snapshotTraverser);
        }
        return complete();
    }

    /**
     * Restores one snapshot entry. An entry with a {@link BroadcastKey} must
     * carry the saved watermark; the smallest such value is remembered in
     * {@code minRestoredCurrentWatermark}. Any other entry is a key with its
     * {@link Windows} and is put into {@code keyToWindows}.
     *
     * @param obj  snapshot key
     * @param obj2 snapshot value
     * @throws JetException   if a window key occurs twice or the broadcast key is unknown
     * @throws AssertionError when assertions are enabled, the guarantee is
     *                        exactly-once and two different watermark values are restored
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    @SuppressWarnings("unchecked")
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        if (!(obj instanceof BroadcastKey)) {
            // The snapshot is untyped; entries without a broadcast key were written
            // from keyToWindows by saveToSnapshot(), hence the unchecked casts.
            if (this.keyToWindows.put((K) obj, (Windows<A>) obj2) != null) {
                throw new JetException("Duplicate key in snapshot: " + obj);
            }
            return;
        }
        BroadcastKey broadcastKey = (BroadcastKey) obj;
        if (!Keys.CURRENT_WATERMARK.equals(broadcastKey.key())) {
            throw new JetException("Unexpected broadcast key: " + broadcastKey.key());
        }
        long longValue = ((Long) obj2).longValue();
        if (!$assertionsDisabled && this.processingGuarantee == ProcessingGuarantee.EXACTLY_ONCE && this.minRestoredCurrentWatermark != Long.MAX_VALUE && this.minRestoredCurrentWatermark != longValue) {
            throw new AssertionError("different values for currentWatermark restored, before=" + this.minRestoredCurrentWatermark + ", new=" + longValue);
        }
        this.minRestoredCurrentWatermark = Math.min(longValue, this.minRestoredCurrentWatermark);
    }

    /**
     * Finishes a snapshot restore: rebuilds {@code deadlineToKeys} (and with
     * it {@code totalWindows}) from the restored windows, adopts the smallest
     * restored watermark as the current one and resets {@code totalKeys}.
     *
     * @return always {@code true}
     * @throws AssertionError when assertions are enabled and
     *                        {@code deadlineToKeys} is not empty on entry
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean finishSnapshotRestore() {
        if (!$assertionsDisabled && !this.deadlineToKeys.isEmpty()) {
            throw new AssertionError();
        }
        for (Map.Entry<K, Windows<A>> entry : this.keyToWindows.entrySet()) {
            Windows<A> windows = entry.getValue();
            // Only the first `size` slots hold windows; the rest of `ends` is unused
            // capacity and must not be registered as deadlines.
            for (int i = 0; i < windows.size; i++) {
                addToDeadlines(entry.getKey(), windows.ends[i]);
            }
        }
        this.currentWatermark = this.minRestoredCurrentWatermark;
        this.totalKeys.set(this.keyToWindows.size());
        LoggingUtil.logFine(getLogger(), "Restored currentWatermark from snapshot to: %s", Long.valueOf(this.currentWatermark));
        return true;
    }

    /**
     * Accumulates an item into the window of key {@code k} that covers
     * timestamp {@code j}, using the accumulate function of ordinal {@code i}.
     *
     * @param i       input ordinal
     * @param windows windows of the key
     * @param k       the key
     * @param j       item timestamp
     * @param obj     the item
     */
    private void addItem(int i, Windows<A> windows, K k, long j, Object obj) {
        aggrOp
                .accumulateFn(i)
                .accept(resolveAcc(windows, k, j), obj);
    }

    /**
     * Returns a traverser over the early results of one key: each window in
     * use is exported with the aggregate operation's export function and
     * mapped with {@code mapToOutputFn} (early flag set); windows mapped to
     * {@code null} are skipped.
     *
     * @param k       the key
     * @param windows windows of the key
     * @return traverser over the non-null early results
     */
    private Traverser<OUT> earlyWindows(final K k, final Windows<A> windows) {
        return new Traverser<OUT>() { // from class: com.hazelcast.jet.impl.processor.SessionWindowP.1
            /** Index of the next window to export. */
            private int cursor;

            @Override // com.hazelcast.jet.Traverser
            public OUT next() {
                for (; cursor < windows.size; cursor++) {
                    OUT mapped = (OUT) SessionWindowP.this.mapToOutputFn.apply(
                            windows.starts[cursor],
                            windows.ends[cursor],
                            k,
                            SessionWindowP.this.aggrOp.exportFn().apply(windows.accs[cursor]),
                            true);
                    if (mapped != null) {
                        // step past the emitted window before handing the item out
                        cursor++;
                        return mapped;
                    }
                }
                return null;
            }
        };
    }

    /**
     * Finishes the leading windows of a key whose end is below {@code j} and
     * returns their non-null output items. The closed windows are removed;
     * when no window remains, the key is removed from {@code keyToWindows}
     * and {@code totalKeys} is refreshed.
     *
     * @param windows windows of the key, or {@code null} if the key is no longer tracked
     * @param k       the key
     * @param j       timestamp below which a window end counts as closed
     * @return output items of the closed windows, possibly empty
     */
    private List<OUT> closeWindows(Windows<A> windows, K k, long j) {
        if (windows == null) {
            return Collections.emptyList();
        }
        List<OUT> results = new ArrayList<>();
        int closed = 0;
        for (; closed < windows.size && windows.ends[closed] < j; closed++) {
            OUT mapped = mapToOutputFn.apply(
                    windows.starts[closed],
                    windows.ends[closed],
                    k,
                    aggrOp.finishFn().apply(windows.accs[closed]),
                    false);
            if (mapped != null) {
                results.add(mapped);
            }
        }
        if (closed == windows.size) {
            // every window of the key was closed: forget the key
            keyToWindows.remove(k);
            totalKeys.set(keyToWindows.size());
        } else {
            windows.removeHead(closed);
        }
        return results;
    }

    /**
     * Finds, extends, merges or creates the window of key {@code k} that an
     * event with timestamp {@code j} belongs to, keeps
     * {@code deadlineToKeys} in step with any change of a window end, and
     * returns that window's accumulator.
     * <p>
     * The event occupies the interval from {@code j} to
     * {@code j + sessionTimeout}.
     * NOTE(review): the scan appears to rely on the windows being ordered by
     * start and not overlapping each other - confirm.
     *
     * @param windows windows of the key
     * @param k       the key
     * @param j       event timestamp
     * @return accumulator of the window the event was assigned to
     */
    private A resolveAcc(Windows<A> windows, K k, long j) {
        // end of the interval occupied by the event
        long j2 = j + this.sessionTimeout;
        int i = 0;
        // scan the windows that start before the event interval ends
        while (i < ((Windows) windows).size && ((Windows) windows).starts[i] < j2) {
            // window i also ends after the event timestamp, i.e. it overlaps the event interval
            if (((Windows) windows).ends[i] > j) {
                if (((Windows) windows).starts[i] <= j && ((Windows) windows).ends[i] >= j2) {
                    // window i already covers the whole event interval: nothing to adjust
                    return (A) ((Windows) windows).accs[i];
                }
                if (i + 1 != ((Windows) windows).size && ((Windows) windows).starts[i + 1] < j2) {
                    // the next window starts before the event interval ends as well:
                    // merge window i + 1 into window i
                    removeFromDeadlines(k, ((Windows) windows).ends[i]);
                    // the merged window takes over the end (and the registered deadline) of window i + 1
                    ((Windows) windows).ends[i] = ((Windows) windows).ends[i + 1];
                    this.combineFn.accept(((Windows) windows).accs[i], ((Windows) windows).accs[i + 1]);
                    windows.removeWindow(i + 1);
                    return (A) ((Windows) windows).accs[i];
                }
                // only window i overlaps: stretch it to cover the event interval
                ((Windows) windows).starts[i] = Math.min(((Windows) windows).starts[i], j);
                if (((Windows) windows).ends[i] < j2) {
                    // the end moves, so re-register the key under the new deadline
                    removeFromDeadlines(k, ((Windows) windows).ends[i]);
                    ((Windows) windows).ends[i] = j2;
                    addToDeadlines(k, ((Windows) windows).ends[i]);
                }
                return (A) ((Windows) windows).accs[i];
            }
            i++;
        }
        // no existing window overlaps the event interval: open a new one at position i
        addToDeadlines(k, j2);
        return insertWindow(windows, i, j, j2);
    }

    /**
     * Inserts a new window with a freshly created accumulator at slot
     * {@code i}, shifting the windows from that slot onwards one position
     * back (growing the arrays first if they are full).
     *
     * @param windows windows of the key
     * @param i       slot at which to insert
     * @param j       start of the new window
     * @param j2      end of the new window
     * @return the accumulator of the new window
     */
    private A insertWindow(Windows<A> windows, int i, long j, long j2) {
        windows.expandIfNeeded();
        int shifted = windows.size - i;
        windows.copy(i, i + 1, shifted);
        windows.size++;
        windows.starts[i] = j;
        windows.ends[i] = j2;
        A freshAcc = aggrOp.createFn().get();
        windows.accs[i] = freshAcc;
        return freshAcc;
    }

    static {
        // Decompiled form of the compiler-generated assertion switch: true when
        // assertions are disabled for this class.
        $assertionsDisabled = !SessionWindowP.class.desiredAssertionStatus();
        // Sentinel passed to the closed-window flat-mapper by complete(); it is compared
        // by identity in traverseClosedWindows() so that it is never appended to the output.
        COMPLETING_WM = new Watermark(Long.MAX_VALUE);
    }
}
