package org.apache.camel.kafkaconnector.transforms;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.avro.AvroFactory;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import io.apicurio.registry.utils.converter.avro.AvroData;
import io.apicurio.registry.utils.converter.avro.AvroDataConfig;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/camel-kafka-connector-0.11.0.jar:org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.class */
public class SinkPojoToSchemaAndStructTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    private String pojoClass;
    private ObjectReader objectReader;
    private AvroData avroData;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SinkPojoToSchemaAndStructTransform.class);
    private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
    private static final Object CAMEL_TRANSFORMER_SINK_POJO_CLASS_DEFAULT = ConfigDef.NO_DEFAULT_VALUE;
    public static final String CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY = "camel.transformer.sink.pojo.class";
    private static final String CAMEL_TRANSFORMER_SINK_POJO_CLASS_DOC = "Full qualified class name of the pojo you want your record value converted to";
    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, ConfigDef.Type.STRING, CAMEL_TRANSFORMER_SINK_POJO_CLASS_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_TRANSFORMER_SINK_POJO_CLASS_DOC);

    @Override // org.apache.kafka.connect.transforms.Transformation
    public R apply(R r) {
        LOG.debug("Incoming record: {}", r);
        if (r.value() == null || r.valueSchema() == null || !Schema.Type.STRUCT.equals(r.valueSchema().type())) {
            LOG.debug("Incoming record with a null value or a value schema != Schema.Type.STRUCT, nothing to be done.");
            return r;
        }
        GenericRecord genericRecord = (GenericRecord) this.avroData.fromConnectData(r.valueSchema(), r.value());
        if (genericRecord == null) {
            LOG.warn("No GenericRecord was converted as part of this transformation");
            return r;
        }
        LOG.debug("GenericRecord created: {} \nwith schema: {}", genericRecord, genericRecord.getClass().getName());
        GenericDatumWriter genericDatumWriter = new GenericDatumWriter(genericRecord.getSchema());
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
                genericDatumWriter.write(genericRecord, binaryEncoder);
                binaryEncoder.flush();
                Object readValue = this.objectReader.with(new AvroSchema(genericRecord.getSchema())).readValue(byteArrayOutputStream.toByteArray());
                LOG.debug("Pojo of class {} created: {}", readValue.getClass(), readValue);
                byteArrayOutputStream.close();
                LOG.debug("Generate pojo: {}", readValue);
                return (R) r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), null, readValue, r.timestamp());
            } finally {
            }
        } catch (IOException e) {
            throw new ConnectException("Error in generating POJO from Struct.", e);
        }
    }

    @Override // org.apache.kafka.connect.transforms.Transformation
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override // org.apache.kafka.connect.transforms.Transformation, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.pojoClass = new SimpleConfig(CONFIG_DEF, map).getString(CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY);
        this.avroData = new AvroData(new AvroDataConfig(map));
        try {
            this.objectReader = MAPPER.readerFor(Class.forName(this.pojoClass));
        } catch (ClassNotFoundException e) {
            throw new ConnectException("Unable to initialize SinkPojoToSchemaAndStructTransform ", e);
        }
    }
}
