package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.config.JSONFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.json.JSONStructConverter;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/filter/JSONFilter.class */
/**
 * Record filter that parses the JSON text held in a configured source field.
 *
 * <p>The source field must be of type {@code STRING} or {@code BYTES}; its content must be a
 * JSON object or a JSON array. Depending on the {@code merge} and {@code explode} settings
 * exposed by {@link JSONFilterConfig}, {@link #apply(FilterContext, TypedStruct)} returns:
 * <ul>
 *   <li>the parsed object itself (object + {@code merge});</li>
 *   <li>one record per array element (array + {@code explode});</li>
 *   <li>otherwise a single record holding the parsed value under the target field.</li>
 * </ul>
 *
 * <p>NOTE(review): how the returned records are combined with the input record is decided by
 * {@link AbstractMergeRecordFilter}, which is not visible from this file.
 */
public class JSONFilter extends AbstractMergeRecordFilter<JSONFilter> {
    private final JSONStructConverter converter = JSONStructConverter.createDefault();
    private JSONFilterConfig configs;

    /**
     * Configures the parent filter, then builds the JSON-specific configuration.
     *
     * @param map the raw filter properties
     */
    @Override
    public void configure(Map<String, ?> map) {
        super.configure(map);
        this.configs = new JSONFilterConfig(map);
    }

    /**
     * @return the configuration definition declared by {@link JSONFilterConfig}
     */
    @Override
    public ConfigDef configDef() {
        return JSONFilterConfig.configDef();
    }

    /**
     * Parses the configured source field as JSON and builds the resulting record(s).
     *
     * @param filterContext the filter context (not used by this filter)
     * @param typedStruct   the input record holding the JSON source field
     * @return the records produced from the parsed JSON value
     * @throws FilterException if the source field is null, is neither STRING nor BYTES, is not
     *                         a JSON object/array, requests an unsupported merge, or if parsing
     *                         fails for any other reason
     */
    @Override
    protected RecordsIterable<TypedStruct> apply(FilterContext filterContext, TypedStruct typedStruct) {
        try {
            return parseAndBuildRecords(typedStruct);
        } catch (FilterException e) {
            // Already carries a precise message and stack trace: do not re-wrap it.
            throw e;
        } catch (Exception e) {
            // Keep the original exception as the cause so the real failure stays diagnosable.
            throw new FilterException(e.getLocalizedMessage(), e);
        }
    }

    /** Reads the JSON source field and dispatches on the parsed type and the merge/explode flags. */
    private RecordsIterable<TypedStruct> parseAndBuildRecords(TypedStruct record) {
        final TypedValue source = checkIsNotNull(record.get(this.configs.source()));
        final TypedValue parsed = this.converter.readJson(extractJsonField(source));
        final Type parsedType = parsed.type();

        if (parsedType != Type.ARRAY && parsedType != Type.STRUCT) {
            throw new FilterException("Cannot process JSON value with unsupported type. Expected Array or Object, was " + parsedType);
        }

        final boolean merge = this.configs.merge();
        if (parsedType == Type.STRUCT && merge) {
            return RecordsIterable.of(new TypedStruct[]{parsed.getStruct()});
        }
        if (parsedType == Type.ARRAY) {
            if (this.configs.explode()) {
                return explodeArray(parsed, merge);
            }
            if (merge) {
                throw new FilterException("Unsupported operation. Cannot merge JSON Array into the top level of the input record");
            }
        }
        // Object without merge, or array without explode/merge: store the value under the target field.
        return RecordsIterable.of(new TypedStruct[]{TypedStruct.create().put(targetField(), parsed)});
    }

    /** Turns each element of a parsed JSON array into its own record. */
    private RecordsIterable<TypedStruct> explodeArray(TypedValue parsedArray, boolean merge) {
        final Collection<?> items = parsedArray.getArray();
        final Type elementType = parsedArray.schema().valueSchema().type();

        if (!merge) {
            // Resolved once: the target field name does not depend on the element.
            final String target = targetField();
            final List<TypedStruct> wrapped = items.stream()
                    .map(item -> TypedStruct.create().put(target, TypedValue.of(item, elementType)))
                    .collect(Collectors.toList());
            return new RecordsIterable<>(wrapped);
        }
        if (elementType == Type.STRUCT) {
            final List<TypedStruct> structs = items.stream()
                    .map(item -> TypedValue.any(item).getStruct())
                    .collect(Collectors.toList());
            return new RecordsIterable<>(structs);
        }
        throw new FilterException("Unsupported operation. Cannot merge array value of type '" + elementType + "' into the top level of the input record");
    }

    /**
     * Extracts the JSON text from the source value.
     *
     * @param typedValue the non-null source value
     * @return the string content, decoding BYTES with the configured charset
     * @throws FilterException if the value is neither STRING nor BYTES
     */
    private String extractJsonField(TypedValue typedValue) {
        final Type type = typedValue.type();
        switch (type) {
            case STRING:
                return typedValue.getString();
            case BYTES:
                return new String(typedValue.getBytes(), this.configs.charset());
            default:
                throw new FilterException("Invalid field '" + this.configs.source() + "', cannot parse JSON field of type '" + type + "'");
        }
    }

    /**
     * Ensures the source value can be parsed.
     *
     * @param typedValue the value read from the source field
     * @return the same value when it is not null
     * @throws FilterException if the reference is null or the value reports itself as null
     */
    private TypedValue checkIsNotNull(TypedValue typedValue) {
        if (typedValue == null || typedValue.isNull()) {
            throw new FilterException("Invalid field '" + this.configs.source() + "', cannot convert empty value to JSON");
        }
        return typedValue;
    }

    /** Returns the configured target field, falling back to the source field when none is set. */
    private String targetField() {
        final String target = this.configs.target();
        return target != null ? target : this.configs.source();
    }

    /**
     * Returns the configured overwrite set when a target is set or merge is enabled; otherwise
     * only the source field, since the parsed value is written back under the source name.
     *
     * @return the field names reported to the parent filter as overwritable
     */
    @Override
    protected Set<String> overwrite() {
        if (this.configs.target() != null || this.configs.merge()) {
            return this.configs.overwrite();
        }
        return Collections.singleton(this.configs.source());
    }
}
