package dev.responsive.kafka.internal.db.mongo;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.UpdateResult;
import dev.responsive.kafka.internal.db.MongoWindowFlushManager;
import dev.responsive.kafka.internal.db.RemoteWindowedTable;
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import dev.responsive.kafka.internal.utils.Iterators;
import dev.responsive.kafka.internal.utils.WindowedKey;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/mongo/MongoWindowedTable.class */
/**
 * {@link RemoteWindowedTable} implementation that stores windowed rows in MongoDB.
 *
 * <p>Layout, as established by the code in this class:
 * <ul>
 *   <li>a {@code window_metadata} collection holds one document per Kafka partition
 *       (keyed by {@code _id}) carrying the committed offset, the stream time and the epoch;</li>
 *   <li>window rows live in one collection per segment partition; the collections of each
 *       initialized Kafka partition are tracked in a {@link PartitionSegments} instance.</li>
 * </ul>
 *
 * <p>{@link #init(int)} increments the stored epoch and remembers the result as the local epoch
 * of the partition. Metadata and row writes only match documents whose stored epoch is less than
 * or equal to that local epoch.
 *
 * <p>Every per-partition operation requires {@link #init(int)} to have been called for that
 * partition first and throws {@link IllegalStateException} otherwise.
 */
public class MongoWindowedTable implements RemoteWindowedTable<WriteModel<WindowDoc>> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoWindowedTable.class);
    private static final String METADATA_COLLECTION_NAME = "window_metadata";
    private static final UpdateOptions UPSERT_OPTIONS = new UpdateOptions().upsert(true);

    private final String name;
    private final WindowSegmentPartitioner partitioner;
    private final boolean timestampFirstOrder;
    private final MongoDatabase database;
    private final MongoDatabase adminDatabase;
    private final MongoCollection<WindowMetadataDoc> metadata;
    private final ConcurrentMap<Integer, PartitionSegments> kafkaPartitionToSegments = new ConcurrentHashMap<>();
    private final CollectionCreationOptions collectionCreationOptions;

    /**
     * Tracks the segment collections and the local epoch of a single Kafka partition.
     *
     * <p>The decompiler reported that this class was originally declared {@code private}; it is
     * kept {@code public} so that any existing reference keeps compiling.
     */
    public static class PartitionSegments {
        private final MongoDatabase database;
        private final MongoDatabase adminDatabase;
        private final Segmenter segmenter;
        private final long epoch;
        private final CollectionCreationOptions collectionCreationOptions;
        private final Map<Segmenter.SegmentPartition, MongoCollection<WindowDoc>> segmentWindows = new ConcurrentHashMap<>();

        /**
         * Registers a collection for every segment that {@code segmenter} reports as active at
         * {@code streamTime}.
         *
         * @param database database holding the segment collections
         * @param adminDatabase admin database, passed through when sharded collections are created
         * @param segmenter maps stream time to segment partitions
         * @param kafkaPartition Kafka partition these segments belong to
         * @param streamTime stream time used to determine the initially active segments
         * @param epoch local epoch of this partition
         * @param collectionCreationOptions controls whether collections are created sharded
         */
        public PartitionSegments(
                MongoDatabase database,
                MongoDatabase adminDatabase,
                Segmenter segmenter,
                int kafkaPartition,
                long streamTime,
                long epoch,
                CollectionCreationOptions collectionCreationOptions) {
            this.database = database;
            this.adminDatabase = adminDatabase;
            this.segmenter = segmenter;
            this.epoch = epoch;
            this.collectionCreationOptions = collectionCreationOptions;

            List<Segmenter.SegmentPartition> activeSegments = segmenter.activeSegments(kafkaPartition, streamTime);
            if (activeSegments.isEmpty()) {
                LOG.info("{}[{}] No active segments for initial streamTime {}", database.getName(), kafkaPartition, streamTime);
                return;
            }
            for (Segmenter.SegmentPartition segment : activeSegments) {
                createSegment(segment);
            }
            long firstSegmentStart = activeSegments.get(0).segmentStartTimestamp;
            // Upper bound of the logged range: first start plus one interval per active segment.
            long rangeEnd = firstSegmentStart + (activeSegments.size() * segmenter.segmentIntervalMs());
            LOG.info("{}[{}] Initialized active segments with start timestamps in [{} - {}]", database.getName(), kafkaPartition, firstSegmentStart, rangeEnd);
        }

        /**
         * Builds the collection name {@code "<tablePartition>-<segmentStartTimestamp * segmentIntervalMs>"}.
         *
         * <p>NOTE(review): multiplying a start timestamp by the segment interval looks unusual.
         * It is left untouched because the result names collections that may already exist;
         * confirm the intent before changing it.
         */
        private String collectionNameForSegment(Segmenter.SegmentPartition segmentPartition) {
            long scaledStart = segmentPartition.segmentStartTimestamp * this.segmenter.segmentIntervalMs();
            return String.format("%d-%d", segmentPartition.tablePartition, scaledStart);
        }

        /**
         * Obtains the collection for {@code segmentPartition} (created as a sharded collection
         * when the creation options ask for it) and starts tracking it.
         */
        private void createSegment(Segmenter.SegmentPartition segmentPartition) {
            LOG.info("{}[{}] Creating segment start timestamp {}", this.database.getName(), segmentPartition.tablePartition, segmentPartition.segmentStartTimestamp);
            String collectionName = collectionNameForSegment(segmentPartition);
            MongoCollection<WindowDoc> collection;
            if (this.collectionCreationOptions.sharded()) {
                collection = MongoUtils.createShardedCollection(collectionName, WindowDoc.class, this.database, this.adminDatabase, this.collectionCreationOptions.numChunks());
            } else {
                collection = this.database.getCollection(collectionName, WindowDoc.class);
            }
            this.segmentWindows.put(segmentPartition, collection);
        }

        /**
         * Drops the collection of {@code segmentPartition} and stops tracking it.
         *
         * <p>A segment that is not tracked is logged and skipped, so repeated expiration of the
         * same segment is harmless.
         */
        private void deleteSegment(Segmenter.SegmentPartition segmentPartition) {
            LOG.info("{}[{}] Expiring segment start timestamp {}", this.database.getName(), segmentPartition.tablePartition, segmentPartition.segmentStartTimestamp);
            MongoCollection<WindowDoc> expired = this.segmentWindows.get(segmentPartition);
            if (expired == null) {
                LOG.warn("{}[{}] No collection tracked for segment start timestamp {}, nothing to drop", this.database.getName(), segmentPartition.tablePartition, segmentPartition.segmentStartTimestamp);
                return;
            }
            expired.drop();
            // Forget the segment only after the drop succeeded so that a failed drop can be
            // retried, and so that the map does not grow forever as segments roll.
            this.segmentWindows.remove(segmentPartition);
        }
    }

    /**
     * @param mongoClient client used to obtain the table database and the {@code admin} database
     * @param str table name; also used as the name of the MongoDB database
     * @param windowSegmentPartitioner maps windowed keys to segment partitions
     * @param z whether composite row ids are built timestamp-first
     * @param collectionCreationOptions controls whether segment collections are created sharded
     */
    public MongoWindowedTable(MongoClient mongoClient, String str, WindowSegmentPartitioner windowSegmentPartitioner, boolean z, CollectionCreationOptions collectionCreationOptions) {
        this.name = str;
        this.partitioner = windowSegmentPartitioner;
        this.timestampFirstOrder = z;
        this.collectionCreationOptions = collectionCreationOptions;

        // Default driver codecs plus automatic POJO mapping for the document classes.
        CodecRegistry pojoCodecs = CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build());
        CodecRegistry codecs = CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecs);

        this.database = mongoClient.getDatabase(str).withCodecRegistry(codecs);
        this.adminDatabase = mongoClient.getDatabase("admin");
        this.metadata = this.database.getCollection(METADATA_COLLECTION_NAME, WindowMetadataDoc.class);
    }

    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public String name() {
        return this.name;
    }

    /**
     * Initializes the given Kafka partition: upserts its metadata document (offset and stream
     * time start at {@code -1} on first insert), increments the stored epoch, and registers the
     * segments that are active at the stored stream time.
     *
     * @param kafkaPartition Kafka partition to initialize
     * @return a flush manager bound to this table and partition
     * @throws IllegalStateException if the upsert returns no metadata document
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public MongoWindowFlushManager init(int kafkaPartition) {
        Bson partitionFilter = Filters.eq("_id", kafkaPartition);
        Bson initializeAndBumpEpoch = Updates.combine(
                Updates.setOnInsert("_id", kafkaPartition),
                Updates.setOnInsert("offset", -1L),
                Updates.setOnInsert(WindowMetadataDoc.STREAM_TIME, -1L),
                Updates.inc("epoch", 1));
        FindOneAndUpdateOptions upsertReturningNew = new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER);

        WindowMetadataDoc metadataDoc = this.metadata.findOneAndUpdate(partitionFilter, initializeAndBumpEpoch, upsertReturningNew);
        if (metadataDoc == null) {
            throw new IllegalStateException("Uninitialized metadata for partition " + kafkaPartition);
        }
        LOG.info("{}[{}] Retrieved initial metadata {}", this.name, kafkaPartition, metadataDoc);

        PartitionSegments segments = new PartitionSegments(this.database, this.adminDatabase, this.partitioner.segmenter(), kafkaPartition, metadataDoc.streamTime, metadataDoc.epoch, this.collectionCreationOptions);
        this.kafkaPartitionToSegments.put(kafkaPartition, segments);

        return new MongoWindowFlushManager(this, segmentPartition -> windowsForSegmentPartition(kafkaPartition, segmentPartition), this.partitioner, kafkaPartition, metadataDoc.streamTime);
    }

    /**
     * Returns the tracked segments of {@code kafkaPartition}.
     *
     * @throws IllegalStateException if the partition has not been initialized
     */
    private PartitionSegments segmentsFor(int kafkaPartition) {
        PartitionSegments segments = this.kafkaPartitionToSegments.get(kafkaPartition);
        if (segments == null) {
            throw new IllegalStateException(String.format("%s[%d] Partition has not been initialized, init must be called first", this.name, kafkaPartition));
        }
        return segments;
    }

    /** Returns the metadata document of {@code kafkaPartition}, or {@code null} if none is stored. */
    private WindowMetadataDoc findMetadata(int kafkaPartition) {
        return this.metadata.find(Filters.eq("_id", kafkaPartition)).first();
    }

    /** Returns the tracked collection of the segment, or {@code null} if it is not tracked. */
    private MongoCollection<WindowDoc> windowsForSegmentPartition(int kafkaPartition, Segmenter.SegmentPartition segmentPartition) {
        return segmentsFor(kafkaPartition).segmentWindows.get(segmentPartition);
    }

    /**
     * Creates and tracks the collection of {@code segmentPartition}.
     *
     * @return a successful write result for the segment
     * @throws IllegalStateException if the partition has not been initialized
     */
    public RemoteWriteResult<Segmenter.SegmentPartition> createSegmentForPartition(int kafkaPartition, Segmenter.SegmentPartition segmentPartition) {
        segmentsFor(kafkaPartition).createSegment(segmentPartition);
        return RemoteWriteResult.success(segmentPartition);
    }

    /**
     * Drops the collection of {@code segmentPartition} and stops tracking it.
     *
     * @return a successful write result for the segment
     * @throws IllegalStateException if the partition has not been initialized
     */
    public RemoteWriteResult<Segmenter.SegmentPartition> deleteSegmentForPartition(int kafkaPartition, Segmenter.SegmentPartition segmentPartition) {
        segmentsFor(kafkaPartition).deleteSegment(segmentPartition);
        return RemoteWriteResult.success(segmentPartition);
    }

    /**
     * Returns the epoch recorded locally when the partition was initialized.
     *
     * @throws IllegalStateException if the partition has not been initialized
     */
    public long localEpoch(int kafkaPartition) {
        return segmentsFor(kafkaPartition).epoch;
    }

    /**
     * Reads the epoch currently stored in the metadata collection.
     *
     * @throws IllegalStateException if no metadata document exists for the partition
     */
    public long fetchEpoch(int kafkaPartition) {
        WindowMetadataDoc metadataDoc = findMetadata(kafkaPartition);
        if (metadataDoc == null) {
            LOG.error("{}[{}] Epoch fetch failed due to missing metadata row", this.name, kafkaPartition);
            throw new IllegalStateException("No metadata row found for partition " + kafkaPartition);
        }
        return metadataDoc.epoch;
    }

    /**
     * Reads the stored offset of the partition.
     *
     * @throws IllegalStateException if no metadata document exists or the partition has not been
     *     initialized
     * @throws TaskMigratedException if the stored epoch is greater than the local epoch
     */
    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public long fetchOffset(int kafkaPartition) {
        WindowMetadataDoc metadataDoc = findMetadata(kafkaPartition);
        if (metadataDoc == null) {
            LOG.error("{}[{}] Offset fetch failed due to missing metadata row", this.name, kafkaPartition);
            throw new IllegalStateException("No metadata row found for partition " + kafkaPartition);
        }
        long localEpoch = segmentsFor(kafkaPartition).epoch;
        if (metadataDoc.epoch > localEpoch) {
            LOG.warn("{}[{}] Fenced retrieving start offset due to stored epoch {} being greater than local epoch {} ", this.name, kafkaPartition, metadataDoc.epoch, localEpoch);
            throw new TaskMigratedException("Fenced while fetching offset to start restoration from");
        }
        return metadataDoc.offset;
    }

    /**
     * Writes offset, stream time and the local epoch to the metadata document of the partition.
     * The update only matches a document whose stored epoch is less than or equal to the local
     * epoch.
     *
     * @return the driver's update result; callers can inspect it to detect that nothing matched
     * @throws IllegalStateException if the partition has not been initialized
     */
    public UpdateResult setOffsetAndStreamTime(int kafkaPartition, long offset, long streamTime) {
        long epoch = segmentsFor(kafkaPartition).epoch;
        LOG.info("{}[{}] Updating offset to {} and streamTime to {} with epoch {}", this.name, kafkaPartition, offset, streamTime, epoch);
        Bson notFenced = Filters.and(Filters.eq("_id", kafkaPartition), Filters.lte("epoch", epoch));
        Bson newMetadata = Updates.combine(
                Updates.set("offset", offset),
                Updates.set(WindowMetadataDoc.STREAM_TIME, streamTime),
                Updates.set("epoch", epoch));
        return this.metadata.updateOne(notFenced, newMetadata);
    }

    /**
     * Builds an upsert that stores {@code value} and the local epoch under the composite id of
     * {@code windowedKey}. The filter only matches a row whose stored epoch is less than or equal
     * to the local epoch.
     *
     * <p>NOTE(review): a row with a newer epoch does not match the filter, so the upsert would
     * attempt an insert with an existing id; presumably the resulting error is what fences a
     * stale writer. Confirm against the code that executes the write.
     *
     * @param epochMillis not used by this implementation
     * @throws IllegalStateException if the partition has not been initialized
     */
    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public WriteModel<WindowDoc> insert(int kafkaPartition, WindowedKey windowedKey, byte[] value, long epochMillis) {
        long epoch = segmentsFor(kafkaPartition).epoch;
        Bson notFenced = Filters.and(Filters.eq("_id", compositeKey(windowedKey)), Filters.lte("epoch", epoch));
        Bson newRow = Updates.combine(Updates.set("value", value), Updates.set("epoch", epoch));
        return new UpdateOneModel<>(notFenced, newRow, UPSERT_OPTIONS);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteTable
    public WriteModel<WindowDoc> delete(int kafkaPartition, WindowedKey windowedKey) {
        throw new UnsupportedOperationException("Deletes not yet supported for MongoDB window stores");
    }

    /**
     * Looks up the value stored for {@code key} in the window starting at {@code windowStart}.
     *
     * @return the stored value, or {@code null} if the segment is not tracked or has no such row
     * @throws IllegalStateException if the partition has not been initialized
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public byte[] fetch(int kafkaPartition, Bytes key, long windowStart) {
        WindowedKey windowedKey = new WindowedKey(key, windowStart);
        Segmenter.SegmentPartition segment = this.partitioner.tablePartition(kafkaPartition, windowedKey);
        MongoCollection<WindowDoc> windows = windowsForSegmentPartition(kafkaPartition, segment);
        if (windows == null) {
            return null;
        }
        WindowDoc row = windows.find(Filters.eq("_id", compositeKey(windowedKey))).first();
        return row == null ? null : row.getValue();
    }

    /**
     * Returns the rows of {@code key} whose composite id lies between the ids built for
     * {@code timeFrom} and {@code timeTo} (both inclusive), reading every tracked segment that the
     * segmenter reports for that range. Segments that are not tracked are skipped.
     *
     * @throws IllegalStateException if the partition has not been initialized
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> fetch(int kafkaPartition, Bytes key, long timeFrom, long timeTo) {
        PartitionSegments segments = segmentsFor(kafkaPartition);
        // The id range is the same for every segment, so build the filter once.
        Bson idRange = Filters.and(Filters.gte("_id", compositeKey(key, timeFrom)), Filters.lte("_id", compositeKey(key, timeTo)));

        List<KeyValueIterator<WindowedKey, byte[]>> segmentIterators = new LinkedList<>();
        for (Segmenter.SegmentPartition segment : this.partitioner.segmenter().range(kafkaPartition, timeFrom, timeTo)) {
            MongoCollection<WindowDoc> windows = segments.segmentWindows.get(segment);
            if (windows != null) {
                segmentIterators.add(Iterators.kv(windows.find(idRange).iterator(), MongoWindowedTable::windowFromDoc));
            }
        }
        return Iterators.wrapped(segmentIterators);
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> backFetch(int kafkaPartition, Bytes key, long timeFrom, long timeTo) {
        throw new UnsupportedOperationException("backFetch not yet supported for MongoDB backends");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> fetchRange(int kafkaPartition, Bytes fromKey, Bytes toKey, long timeFrom, long timeTo) {
        throw new UnsupportedOperationException("fetchRange not yet supported for Mongo backends");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> backFetchRange(int kafkaPartition, Bytes fromKey, Bytes toKey, long timeFrom, long timeTo) {
        throw new UnsupportedOperationException("backFetchRange not yet supported for Mongo backends");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> fetchAll(int kafkaPartition, long timeFrom, long timeTo) {
        throw new UnsupportedOperationException("fetchAll not yet supported for Mongo backends");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWindowedTable
    public KeyValueIterator<WindowedKey, byte[]> backFetchAll(int kafkaPartition, long timeFrom, long timeTo) {
        throw new UnsupportedOperationException("backFetchAll not yet supported for MongoDB backends");
    }

    /** Builds the composite row id for {@code windowedKey} using this table's key ordering. */
    public BasicDBObject compositeKey(WindowedKey windowedKey) {
        return compositeKey(windowedKey.key, windowedKey.windowStartMs);
    }

    /** Builds the composite row id for the given key bytes and window start using this table's key ordering. */
    public BasicDBObject compositeKey(Bytes key, long windowStartMs) {
        return WindowDoc.compositeKey(key.get(), windowStartMs, this.timestampFirstOrder);
    }

    /** Converts a stored row into the key/value pair handed to iterators. */
    private static KeyValue<WindowedKey, byte[]> windowFromDoc(WindowDoc windowDoc) {
        return new KeyValue<>(WindowDoc.windowedKey(windowDoc.id), windowDoc.value);
    }
}
