package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.class */
public class DefaultRecordFilterPipeline implements RecordFilterPipeline<FileRecord<TypedStruct>> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultRecordFilterPipeline.class);
    private final FilterNode rootNode;
    private FileObjectContext fileObjectObject;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline$FilterNode.class */
    public class FilterNode {
        private final RecordFilter filter;
        private final FilterNode onSuccess;

        private FilterNode(RecordFilter recordFilter, FilterNode filterNode) {
            this.filter = recordFilter;
            this.onSuccess = filterNode;
        }

        public List<FileRecord<TypedStruct>> apply(FilterContext filterContext, TypedStruct typedStruct, boolean z) {
            LinkedList linkedList = new LinkedList();
            if (!this.filter.accept(filterContext, typedStruct)) {
                if (this.onSuccess != null) {
                    linkedList.addAll(this.onSuccess.apply(filterContext, typedStruct, z));
                } else {
                    if (!z) {
                        linkedList.addAll(flush(filterContext));
                    }
                    linkedList.add(newRecordFor(filterContext, typedStruct));
                }
                return linkedList;
            }
            try {
                linkedList.addAll((List) this.filter.apply(filterContext, typedStruct, z).stream().map(typedStruct2 -> {
                    return newRecordFor(filterContext, typedStruct2);
                }).collect(Collectors.toList()));
                return this.onSuccess == null ? linkedList : (List) linkedList.stream().flatMap(fileRecord -> {
                    return this.onSuccess.apply(FilterContextBuilder.newBuilder(filterContext).build(), (TypedStruct) fileRecord.value(), z).stream();
                }).collect(Collectors.toList());
            } catch (Exception e) {
                RecordFilterPipeline<FileRecord<TypedStruct>> onFailure = this.filter.onFailure();
                if (onFailure == null && !this.filter.ignoreFailure()) {
                    DefaultRecordFilterPipeline.LOG.error("Failed to execute filter '{}' on record at offset '{}' from object-file {}. Error: {}", new Object[]{this.filter.label(), filterContext.offset(), filterContext.metadata(), e.getLocalizedMessage()});
                    throw e;
                }
                DefaultRecordFilterPipeline.LOG.debug("Failed to execute filter '{}' on record at offset '{}' from object-file {}. Error: {} Exception will be either ignored or handled by a dedicated filter chain.", new Object[]{this.filter.label(), filterContext.offset(), filterContext.metadata(), e.getLocalizedMessage()});
                linkedList.addAll(flush(filterContext));
                if (onFailure != null) {
                    linkedList.addAll(onFailure.apply(FilterContextBuilder.newBuilder(filterContext).withError(FilterError.of(e, this.filter.label())).build(), typedStruct, z));
                } else if (this.onSuccess != null) {
                    linkedList.addAll(this.onSuccess.apply(filterContext, typedStruct, z));
                } else {
                    linkedList.add(newRecordFor(filterContext, typedStruct));
                }
                return linkedList;
            }
        }

        private TypedFileRecord newRecordFor(FilterContext filterContext, TypedStruct typedStruct) {
            return new TypedFileRecord(filterContext.offset(), typedStruct).withTopic(filterContext.topic()).withPartition(filterContext.partition()).withTimestamp(filterContext.timestamp()).withHeaders(filterContext.headers()).withKey(TypedValue.string(filterContext.key()));
        }

        List<FileRecord<TypedStruct>> flush(FilterContext filterContext) {
            LinkedList linkedList = new LinkedList();
            RecordsIterable<FileRecord<TypedStruct>> flush = this.filter.flush();
            if (this.onSuccess != null) {
                Iterator<FileRecord<TypedStruct>> it = flush.iterator();
                while (it.hasNext()) {
                    FileRecord<TypedStruct> next = it.next();
                    linkedList.addAll(this.onSuccess.apply(DefaultRecordFilterPipeline.this.newContextFor(next.offset(), filterContext.metadata()), next.value(), it.hasNext()));
                }
            } else {
                linkedList.addAll(flush.collect());
            }
            return linkedList;
        }
    }

    public DefaultRecordFilterPipeline(List<RecordFilter> list) {
        Objects.requireNonNull(list, "filters can't be null");
        ListIterator<RecordFilter> listIterator = list.listIterator(list.size());
        FilterNode filterNode = null;
        while (true) {
            FilterNode filterNode2 = filterNode;
            if (!listIterator.hasPrevious()) {
                this.rootNode = filterNode2;
                return;
            }
            filterNode = new FilterNode(listIterator.previous(), filterNode2);
        }
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline
    public void init(FileObjectContext fileObjectContext) {
        this.fileObjectObject = fileObjectContext;
        FilterNode filterNode = this.rootNode;
        while (true) {
            FilterNode filterNode2 = filterNode;
            if (filterNode2 == null) {
                return;
            }
            RecordFilterPipeline<FileRecord<TypedStruct>> onFailure = filterNode2.filter.onFailure();
            if (onFailure != null) {
                onFailure.init(fileObjectContext);
            }
            filterNode2.filter.clear();
            filterNode = filterNode2.onSuccess;
        }
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline
    public RecordsIterable<FileRecord<TypedStruct>> apply(RecordsIterable<FileRecord<TypedStruct>> recordsIterable, boolean z) throws FilterException {
        checkState();
        if (this.rootNode == null) {
            return recordsIterable;
        }
        LinkedList linkedList = new LinkedList();
        Iterator<FileRecord<TypedStruct>> it = recordsIterable.iterator();
        while (it.hasNext()) {
            FileRecord<TypedStruct> next = it.next();
            linkedList.addAll(apply(newContextFor(next.offset(), this.fileObjectObject.metadata()), next.value(), z || it.hasNext()));
        }
        return new RecordsIterable<>(linkedList);
    }

    private FilterContext newContextFor(FileRecordOffset fileRecordOffset, FileObjectMeta fileObjectMeta) {
        return FilterContextBuilder.newBuilder().withMetadata(fileObjectMeta).withOffset(fileRecordOffset).build();
    }

    private void checkState() {
        if (this.fileObjectObject == null) {
            throw new IllegalStateException("Cannot apply this pipeline, no context initialized");
        }
    }

    @Override // io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline
    public List<FileRecord<TypedStruct>> apply(FilterContext filterContext, TypedStruct typedStruct, boolean z) {
        return this.rootNode == null ? Collections.singletonList(new TypedFileRecord(filterContext.offset(), typedStruct)) : this.rootNode.apply(filterContext, typedStruct, z);
    }
}
