package io.floodplain.streams.serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.time.Date;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Time;
import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.debezium.DebeziumParseException;
import io.floodplain.streams.debezium.JSONToReplicationMessage;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.transforms.TimestampConverter;
import org.apache.kafka.connect.transforms.ValueToKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka {@link Serde} for {@link ReplicationMessage}s that writes JSON documents in the
 * Kafka Connect "schema + payload" envelope and reads Debezium-style JSON bodies.
 *
 * <p>Serialization wraps the message values in {@code {"schema": ..., "payload": ...}} when
 * schemas are enabled (the default); otherwise only the plain value map is written.
 * Deserialization is delegated to {@link JSONToReplicationMessage}.
 *
 * <p>NOTE(review): the constants borrowed from unrelated libraries
 * ({@code ConnectTableChangeSerializer.OPTIONAL_KEY}, {@code TimestampConverter.FIELD_CONFIG},
 * {@code ValueToKey.FIELDS_CONFIG}, {@code ElasticsearchSinkConnectorConstants.*}) look like
 * decompiler substitutions for plain string literals such as "optional", "field", "fields",
 * "string" and "boolean" - confirm and consider replacing them with literals.
 */
public class ConnectReplicationMessageSerdeWithSchema implements Serde<ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ConnectReplicationMessageSerdeWithSchema.class);
    private static final ConnectKeySerde keySerde = new ConnectKeySerde();
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /** Serializer configuration key that switches the schema envelope on or off. */
    private static final String SCHEMA_ENABLE_CONFIG = "schemaEnable";
    private static final long MICROS_PER_SECOND = 1_000_000L;
    private static final int NANOS_PER_MICRO = 1_000;
    private static final int NANOS_PER_MILLI = 1_000_000;
    private static final int MILLIS_PER_SECOND = 1_000;

    /** No resources are held by this serde, so closing is a no-op. */
    @Override
    public void close() {
    }

    /** The serde itself has no configuration; see the serializer for {@code schemaEnable}. */
    @Override
    public void configure(Map<String, ?> map, boolean z) {
    }

    /**
     * Parses a JSON encoded Connect key and returns its combined key string.
     *
     * @param bArr raw JSON bytes of the key; expected to be a JSON object
     * @return the {@code combinedKey} computed by {@link JSONToReplicationMessage#processDebeziumKey}
     * @throws IOException if the bytes can not be parsed as JSON
     */
    private static String parseConnectKey(byte[] bArr) throws IOException {
        ObjectNode keyNode = (ObjectNode) objectMapper.readTree(bArr);
        return JSONToReplicationMessage.processDebeziumKey(keyNode).combinedKey;
    }

    /**
     * @return the key serializer provided by the shared {@link ConnectKeySerde}
     */
    public static Serializer<String> keySerialize() {
        return keySerde.serializer();
    }

    /**
     * Creates a key deserializer that turns a JSON encoded Connect key into its combined key.
     *
     * @return a new deserializer; it returns {@code null} for {@code null} input and throws a
     *     {@link RuntimeException} (with the {@link IOException} as cause) on unparsable input
     */
    public static Deserializer<String> keyDeserialize() {
        return new Deserializer<String>() {
            @Override
            public void close() {
            }

            @Override
            public void configure(Map<String, ?> map, boolean z) {
                logger.info("Configuring key deserializer: {}", map);
            }

            @Override
            public String deserialize(String str, byte[] bArr) {
                if (bArr == null) {
                    // Kafka may hand a null key to the deserializer; there is nothing to parse.
                    return null;
                }
                try {
                    return parseConnectKey(bArr);
                } catch (IOException e) {
                    throw new RuntimeException("Error deserializing key: " + new String(bArr, StandardCharsets.UTF_8), e);
                }
            }
        };
    }

    /**
     * Creates a value deserializer that delegates to
     * {@link JSONToReplicationMessage#processDebeziumBody}, passing the topic name along.
     *
     * @return a new deserializer; parse failures surface as {@link RuntimeException}
     */
    @Override
    public Deserializer<ReplicationMessage> deserializer() {
        return new Deserializer<ReplicationMessage>() {
            @Override
            public void close() {
            }

            @Override
            public void configure(Map<String, ?> map, boolean z) {
                logger.info("Configuring deserializer: {}", map);
            }

            @Override
            public ReplicationMessage deserialize(String str, byte[] bArr) {
                try {
                    // ofNullable: a missing topic name must not fail before parsing even starts.
                    return JSONToReplicationMessage.processDebeziumBody(bArr, Optional.ofNullable(str));
                } catch (DebeziumParseException e) {
                    throw new RuntimeException("Error parsing replmessage", e);
                }
            }
        };
    }

    /**
     * Converts a single field value into the representation used in the JSON payload.
     * Date/time typed values are converted to numbers; everything else is passed through.
     *
     * @param str field name (currently unused)
     * @param valueType declared type of the field
     * @param obj the field value
     * @return the payload representation of {@code obj}
     */
    private Object processValue(String str, ImmutableMessage.ValueType valueType, Object obj) {
        switch (valueType) {
            case DATE:
            case CLOCKTIME:
            case STOPWATCHTIME:
            case TIMESTAMP:
                return processDateType(valueType, obj);
            default:
                return obj;
        }
    }

    /**
     * Builds the schema description of one field.
     *
     * <p>For date/time and decimal fields the description is derived from the actual value. When
     * that value is {@code null} the returned map has no {@code "type"} entry, which
     * {@link #buildSchema} uses as the signal to leave the field out of the schema.
     *
     * @param str field name
     * @param valueType declared type of the field
     * @param obj current value of the field, may be {@code null}
     * @return a mutable map describing the field
     * @throws IllegalArgumentException for IMMUTABLE, IMMUTABLELIST and UNKNOWN fields, and for
     *     date/time fields holding an unsupported Java type
     */
    private Map<String, Object> describeType(String str, ImmutableMessage.ValueType valueType, Object obj) {
        Map<String, Object> field = new HashMap<>();
        field.put(ConnectTableChangeSerializer.OPTIONAL_KEY, true);
        field.put(TimestampConverter.FIELD_CONFIG, str);
        switch (valueType) {
            case DATE:
            case CLOCKTIME:
            case STOPWATCHTIME:
            case TIMESTAMP:
                processDateType(valueType, obj, field);
                break;
            case STRING:
            case BINARY_DIGEST:
            case ENUM:
            case STRINGLIST:
                field.put("type", ElasticsearchSinkConnectorConstants.STRING_TYPE);
                break;
            case INTEGER:
                field.put("type", "int32");
                break;
            case LONG:
                field.put("type", "int64");
                break;
            case DOUBLE:
                field.put("type", "float64");
                break;
            case DECIMAL:
                if (obj == null) {
                    // Scale and precision can only be read from an actual value.
                    logger.warn("Null value for decimal field: {}. Ignoring", str);
                    break;
                }
                BigDecimal decimal = (BigDecimal) obj;
                field.put("type", "bytes");
                field.put("name", Decimal.LOGICAL_NAME);
                field.put("version", 1);
                // Connect schema parameters are string valued, so the numbers are written as text.
                field.put("parameters", Map.of(
                        "scale", String.valueOf(decimal.scale()),
                        "connect.decimal.precision", String.valueOf(decimal.precision())));
                break;
            case FLOAT:
                field.put("type", "float32");
                break;
            case BOOLEAN:
                field.put("type", ElasticsearchSinkConnectorConstants.BOOLEAN_TYPE);
                break;
            case BINARY:
                field.put("type", "bytes");
                break;
            case IMMUTABLE:
            case UNKNOWN:
            case IMMUTABLELIST:
                throw new IllegalArgumentException("Can not serialize type: " + valueType);
        }
        return field;
    }

    /**
     * Adds the schema attributes ({@code type}, {@code version}, {@code name}) of a date/time
     * field to {@code map}, chosen by the Java type of the value.
     *
     * @param valueType declared type of the field (currently unused; the value decides)
     * @param obj the field value; when {@code null} nothing is added
     * @param map field description to extend
     * @throws IllegalArgumentException if {@code obj} is not a LocalDateTime, LocalDate or LocalTime
     */
    private void processDateType(ImmutableMessage.ValueType valueType, Object obj, Map<String, Object> map) {
        if (obj == null) {
            logger.warn("Null value for procesDateType. Ignoring");
            return;
        }
        final String physicalType;
        final String logicalName;
        if (obj instanceof LocalDateTime) {
            physicalType = "int64";
            logicalName = MicroTimestamp.SCHEMA_NAME;
        } else if (obj instanceof LocalDate) {
            physicalType = "int32";
            logicalName = Date.SCHEMA_NAME;
        } else if (obj instanceof LocalTime) {
            physicalType = "int32";
            logicalName = Time.SCHEMA_NAME;
        } else {
            throw new IllegalArgumentException("Unsupported type: " + obj.getClass().getName() + " for processDateType");
        }
        map.put("type", physicalType);
        map.put("version", 1);
        map.put("name", logicalName);
    }

    /**
     * Converts a date/time value to the number matching the logical type announced in the schema:
     * microseconds since the epoch (UTC) for LocalDateTime, days since the epoch for LocalDate and
     * milliseconds past midnight for LocalTime.
     *
     * @param valueType declared type of the field (currently unused; the value decides)
     * @param obj the field value, may be {@code null}
     * @return the numeric representation, or {@code null} when {@code obj} is {@code null}
     * @throws IllegalArgumentException if {@code obj} is not a LocalDateTime, LocalDate or LocalTime
     * @throws ArithmeticException if a LocalDateTime is too far from the epoch to be expressed in
     *     microseconds as a long
     */
    private Object processDateType(ImmutableMessage.ValueType valueType, Object obj) {
        if (obj == null) {
            logger.warn("Null value for procesDateType. Ignoring");
            return null;
        }
        if (obj instanceof LocalDateTime) {
            LocalDateTime dateTime = (LocalDateTime) obj;
            // toEpochSecond floors and getNano is never negative, so the sum is also correct
            // for instants before 1970.
            long wholeSeconds = Math.multiplyExact(dateTime.toEpochSecond(ZoneOffset.UTC), MICROS_PER_SECOND);
            return Long.valueOf(Math.addExact(wholeSeconds, (long) (dateTime.getNano() / NANOS_PER_MICRO)));
        }
        if (obj instanceof LocalDate) {
            return Long.valueOf(((LocalDate) obj).toEpochDay());
        }
        if (obj instanceof LocalTime) {
            LocalTime time = (LocalTime) obj;
            return Integer.valueOf(time.toSecondOfDay() * MILLIS_PER_SECOND + time.getNano() / NANOS_PER_MILLI);
        }
        throw new IllegalArgumentException("Unsupported type: " + obj.getClass().getName() + " for processDateType");
    }

    /**
     * Builds the struct schema for a message from its declared field types.
     *
     * <p>Fields for which no type could be determined (date/time or decimal fields without a
     * value) are left out; all fields are optional and such fields are absent from the payload
     * as well.
     *
     * @param str unused
     * @param immutableMessage message whose fields are described
     * @return the schema as a map ready for JSON serialization
     */
    private Map<String, ?> buildSchema(String str, ImmutableMessage immutableMessage) {
        List<Map<String, Object>> fields = immutableMessage.types().entrySet().stream()
                .map(entry -> describeType(
                        entry.getKey(),
                        entry.getValue(),
                        immutableMessage.value(entry.getKey()).orElse(null)))
                .filter(field -> field.containsKey("type"))
                .collect(Collectors.toList());
        return Map.of("type", "struct", ConnectTableChangeSerializer.OPTIONAL_KEY, true, ValueToKey.FIELDS_CONFIG, fields);
    }

    /**
     * Builds the payload part of the envelope: every non-null value that has a known column
     * type, converted with {@link #processValue}.
     *
     * @param message message supplying the column types
     * @param values raw field values keyed by field name
     * @return a new map containing only non-null converted values
     */
    private Map<String, Object> buildPayload(ImmutableMessage message, Map<String, Object> values) {
        Map<String, Object> payload = new HashMap<>();
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            String name = entry.getKey();
            Object raw = entry.getValue();
            if (name == null || raw == null) {
                continue;
            }
            ImmutableMessage.ValueType type = message.columnType(name);
            if (type == null) {
                continue;
            }
            Object converted = processValue(name, type, raw);
            logger.debug("Field: {} value: {} processed: {} type: {}", name, raw, converted, type);
            if (converted != null) {
                payload.put(name, converted);
            }
        }
        return payload;
    }

    /**
     * Creates the value serializer.
     *
     * <p>The serializer honours the {@code schemaEnable} setting (Boolean or its string form,
     * default {@code true}). {@code null} messages and DELETE operations serialize to
     * {@code null}.
     *
     * @return a new serializer; JSON failures surface as {@link RuntimeException}
     */
    @Override
    public Serializer<ReplicationMessage> serializer() {
        return new Serializer<ReplicationMessage>() {
            private boolean schemaEnable = true;

            @Override
            public void close() {
            }

            @Override
            public void configure(Map<String, ?> map, boolean z) {
                Object configured = map.get(SCHEMA_ENABLE_CONFIG);
                if (configured instanceof Boolean) {
                    this.schemaEnable = (Boolean) configured;
                } else if (configured != null) {
                    // Settings loaded from properties arrive as strings rather than Booleans.
                    this.schemaEnable = Boolean.parseBoolean(configured.toString());
                }
                logger.info("Configuring: {}", map);
            }

            @Override
            public byte[] serialize(String str, ReplicationMessage replicationMessage) {
                if (replicationMessage == null || replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
                    return null;
                }
                ImmutableMessage message = replicationMessage.message();
                Map<String, Object> document = message.valueMap(true, Collections.emptySet(), Collections.emptyList());
                if (this.schemaEnable) {
                    document = Map.of(
                            "schema", buildSchema(null, message),
                            "payload", buildPayload(message, document));
                }
                try {
                    return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(document);
                } catch (JsonProcessingException e) {
                    throw new RuntimeException("Json issue", e);
                }
            }
        };
    }
}
