package org.apache.camel.kafkaconnector.transforms;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.avro.AvroFactory;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
import io.apicurio.registry.utils.converter.avro.AvroData;
import io.apicurio.registry.utils.converter.avro.AvroDataConfig;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/camel-kafka-connector-0.11.0.jar:org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.class */
public class SourcePojoToSchemaAndStructTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SourcePojoToSchemaAndStructTransform.class);
    private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
    private AvroData avroData;
    private ConcurrentMap<String, SourcePojoToSchemaAndStructTransform<R>.CacheEntry> avroSchemaWrapperCache;

    /* loaded from: input_file:BOOT-INF/lib/camel-kafka-connector-0.11.0.jar:org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform$CacheEntry.class */
    public class CacheEntry {
        private AvroSchema avroSchemaWrapper;
        private ObjectWriter objectWriter;

        public CacheEntry(AvroSchema avroSchema, ObjectWriter objectWriter) {
            this.avroSchemaWrapper = avroSchema;
            this.objectWriter = objectWriter;
        }

        public AvroSchema getAvroSchemaWrapper() {
            return this.avroSchemaWrapper;
        }

        public ObjectWriter getObjectWriter() {
            return this.objectWriter;
        }
    }

    @Override // org.apache.kafka.connect.transforms.Transformation
    public R apply(final R r) {
        LOG.debug("Incoming record: {}", r);
        if (r.value() == null) {
            LOG.debug("Incoming record with a null value, nothing to be done.");
            return r;
        }
        SourcePojoToSchemaAndStructTransform<R>.CacheEntry computeIfAbsent = this.avroSchemaWrapperCache.computeIfAbsent(r.value().getClass().getName(), new Function<String, SourcePojoToSchemaAndStructTransform<R>.CacheEntry>() { // from class: org.apache.camel.kafkaconnector.transforms.SourcePojoToSchemaAndStructTransform.1
            @Override // java.util.function.Function
            public SourcePojoToSchemaAndStructTransform<R>.CacheEntry apply(String str) {
                AvroSchemaGenerator avroSchemaGenerator = new AvroSchemaGenerator();
                try {
                    SourcePojoToSchemaAndStructTransform.MAPPER.acceptJsonFormatVisitor(r.value().getClass(), avroSchemaGenerator);
                    AvroSchema generatedSchema = avroSchemaGenerator.getGeneratedSchema();
                    SourcePojoToSchemaAndStructTransform.LOG.debug("Generated and cached avro schema: {}", generatedSchema.getAvroSchema().toString(true));
                    return new CacheEntry(generatedSchema, SourcePojoToSchemaAndStructTransform.MAPPER.writer(generatedSchema));
                } catch (JsonMappingException e) {
                    throw new ConnectException("Error in generating POJO schema.", e);
                }
            }
        });
        try {
            BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(computeIfAbsent.getObjectWriter().writeValueAsBytes(r.value()), (BinaryDecoder) null);
            Schema avroSchema = computeIfAbsent.getAvroSchemaWrapper().getAvroSchema();
            SchemaAndValue connectData = this.avroData.toConnectData(avroSchema, (GenericRecord) new GenericDatumReader(avroSchema).read(null, binaryDecoder));
            LOG.debug("Generate kafka connect schema: {}", connectData.schema());
            LOG.debug("Generate kafka connect value (as Struct): {}", connectData.value());
            return (R) r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), connectData.schema(), connectData.value(), r.timestamp());
        } catch (IOException e) {
            throw new ConnectException("Error in generating POJO Struct.", e);
        }
    }

    @Override // org.apache.kafka.connect.transforms.Transformation
    public ConfigDef config() {
        return AvroDataConfig.baseConfigDef();
    }

    @Override // org.apache.kafka.connect.transforms.Transformation, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.avroSchemaWrapperCache = new ConcurrentHashMap();
        this.avroData = new AvroData(new AvroDataConfig(map));
    }

    public Map<String, SourcePojoToSchemaAndStructTransform<R>.CacheEntry> getCache() {
        return this.avroSchemaWrapperCache;
    }
}
