package org.apache.camel.kafkaconnector;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.camel.Endpoint;
import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
import org.apache.camel.kafkaconnector.utils.TaskHelper;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/camel-kafka-connector-0.10.1.jar:org/apache/camel/kafkaconnector/CamelSinkTask.class */
public class CamelSinkTask extends SinkTask {
    public static final String KAFKA_RECORD_KEY_HEADER = "camel.kafka.connector.record.key";
    public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
    public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
    private static final String CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX = "camel.sink.endpoint.";
    private static final String CAMEL_SINK_PATH_PROPERTIES_PREFIX = "camel.sink.path.";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CamelSinkTask.class);
    private static final String LOCAL_URL = "direct:start";
    private ErrantRecordReporter reporter;
    private CamelKafkaConnectMain cms;
    private ProducerTemplate producer;
    private Endpoint localEndpoint;
    private LoggingLevel loggingLevel = LoggingLevel.OFF;
    private boolean mapProperties;
    private boolean mapHeaders;

    @Override // org.apache.kafka.connect.connector.Task
    public String version() {
        return VersionUtil.getVersion();
    }

    @Override // org.apache.kafka.connect.sink.SinkTask, org.apache.kafka.connect.connector.Task
    public void start(Map<String, String> map) {
        try {
            LOG.info("Starting CamelSinkTask connector task");
            Map<String, String> combineDefaultAndLoadedProperties = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), map);
            CamelSinkConnectorConfig camelSinkConnectorConfig = getCamelSinkConnectorConfig(combineDefaultAndLoadedProperties);
            if (this.context != null) {
                try {
                    this.reporter = this.context.errantRecordReporter();
                } catch (NoClassDefFoundError | NoSuchMethodError e) {
                    LOG.warn("Unable to instantiate ErrantRecordReporter.  Method 'SinkTaskContext.errantRecordReporter' does not exist.");
                    this.reporter = null;
                }
            }
            String string = camelSinkConnectorConfig.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
            try {
                this.loggingLevel = LoggingLevel.valueOf(string.toUpperCase());
            } catch (Exception e2) {
                LOG.debug("Invalid value {} for {} property", string.toUpperCase(), CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
            }
            String string2 = camelSinkConnectorConfig.getString("camel.sink.url");
            String string3 = camelSinkConnectorConfig.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
            String string4 = camelSinkConnectorConfig.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
            int intValue = camelSinkConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF).intValue();
            long longValue = camelSinkConnectorConfig.getLong(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF).longValue();
            int intValue2 = camelSinkConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF).intValue();
            long longValue2 = camelSinkConnectorConfig.getLong(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF).longValue();
            String string5 = camelSinkConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
            Boolean bool = camelSinkConnectorConfig.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
            String string6 = camelSinkConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
            String string7 = camelSinkConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
            int intValue3 = camelSinkConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF).intValue();
            String string8 = camelSinkConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
            String string9 = camelSinkConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
            String string10 = camelSinkConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
            int intValue4 = camelSinkConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF).intValue();
            int intValue5 = camelSinkConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF).intValue();
            String string11 = camelSinkConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
            this.mapProperties = camelSinkConnectorConfig.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF).booleanValue();
            this.mapHeaders = camelSinkConnectorConfig.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF).booleanValue();
            DefaultCamelContext defaultCamelContext = new DefaultCamelContext();
            if (string2 == null) {
                string2 = TaskHelper.buildUrl(defaultCamelContext, combineDefaultAndLoadedProperties, camelSinkConnectorConfig.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF), CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SINK_PATH_PROPERTIES_PREFIX);
            }
            this.cms = CamelKafkaConnectMain.builder(LOCAL_URL, string2).withProperties(combineDefaultAndLoadedProperties).withUnmarshallDataFormat(string4).withMarshallDataFormat(string3).withAggregationSize(intValue).withAggregationTimeout(longValue).withErrorHandler(string5).withMaxRedeliveries(intValue2).withRedeliveryDelay(longValue2).withIdempotencyEnabled(bool.booleanValue()).withExpressionType(string6).withExpressionHeader(string7).withMemoryDimension(intValue3).withIdempotentRepositoryType(string8).withIdempotentRepositoryTopicName(string9).withIdempotentRepositoryKafkaServers(string10).withIdempotentRepositoryKafkaMaxCacheSize(intValue4).withIdempotentRepositoryKafkaPollDuration(intValue5).withHeadersExcludePattern(string11).build(defaultCamelContext);
            this.cms.start();
            this.producer = this.cms.getProducerTemplate();
            this.localEndpoint = this.cms.getCamelContext().getEndpoint(LOCAL_URL);
            LOG.info("CamelSinkTask connector task started");
        } catch (Exception e3) {
            throw new ConnectException("Failed to create and start Camel context", e3);
        }
    }

    protected CamelSinkConnectorConfig getCamelSinkConnectorConfig(Map<String, String> map) {
        return new CamelSinkConnectorConfig(map);
    }

    protected Map<String, String> getDefaultConfig() {
        return Collections.emptyMap();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String getCamelSinkEndpointConfigPrefix() {
        return CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX;
    }

    protected static String getCamelSinkPathConfigPrefix() {
        return CAMEL_SINK_PATH_PROPERTIES_PREFIX;
    }

    @Override // org.apache.kafka.connect.sink.SinkTask
    public void put(Collection<SinkRecord> collection) {
        for (SinkRecord sinkRecord : collection) {
            TaskHelper.logRecordContent(LOG, this.loggingLevel, sinkRecord);
            DefaultExchange defaultExchange = new DefaultExchange(this.producer.getCamelContext());
            defaultExchange.getMessage().setBody(sinkRecord.value());
            defaultExchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, sinkRecord.key());
            for (Header header : sinkRecord.headers()) {
                if (header.key().startsWith("CamelHeader.")) {
                    if (this.mapHeaders) {
                        mapHeader(header, "CamelHeader.", defaultExchange.getMessage().getHeaders());
                    }
                } else if (header.key().startsWith("CamelProperty.") && this.mapProperties) {
                    mapHeader(header, "CamelProperty.", defaultExchange.getProperties());
                }
            }
            LOG.debug("Sending exchange {} to {}", defaultExchange.getExchangeId(), LOCAL_URL);
            this.producer.send(this.localEndpoint, defaultExchange);
            if (defaultExchange.isFailed()) {
                if (this.reporter == null) {
                    LOG.warn("A delivery has failed and the error reporting is NOT enabled. Records may be lost or ignored");
                    throw new ConnectException("Exchange delivery has failed!", defaultExchange.getException());
                }
                LOG.warn("A delivery has failed and the error reporting is enabled. Sending record to the DLQ");
                this.reporter.report(sinkRecord, defaultExchange.getException());
            }
        }
    }

    @Override // org.apache.kafka.connect.sink.SinkTask, org.apache.kafka.connect.connector.Task
    public void stop() {
        LOG.info("Stopping CamelSinkTask connector task");
        try {
            try {
                if (this.cms != null) {
                    this.cms.stop();
                } else {
                    LOG.warn("A fatal exception may have occurred and the Camel main was not created");
                }
                LOG.info("CamelSinkTask connector task stopped");
            } catch (Exception e) {
                throw new ConnectException("Failed to stop Camel context", e);
            }
        } catch (Throwable th) {
            LOG.info("CamelSinkTask connector task stopped");
            throw th;
        }
    }

    private static void mapHeader(Header header, String str, Map<String, Object> map) {
        String after = StringHelper.after(header.key(), str, header.key());
        Schema schema = header.schema();
        if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
            map.put(after, Decimal.toLogical(schema, (byte[]) header.value()));
        } else {
            map.put(after, header.value());
        }
    }

    CamelKafkaConnectMain getCms() {
        return this.cms;
    }

    public LoggingLevel getLoggingLevel() {
        return this.loggingLevel;
    }

    public void setLoggingLevel(LoggingLevel loggingLevel) {
        this.loggingLevel = loggingLevel;
    }
}
