package org.apache.camel.kafkaconnector;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.PollingConsumer;
import org.apache.camel.StreamCache;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
import org.apache.camel.kafkaconnector.utils.SchemaHelper;
import org.apache.camel.kafkaconnector.utils.TaskHelper;
import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/camel-kafka-connector-0.10.1.jar:org/apache/camel/kafkaconnector/CamelSourceTask.class */
public class CamelSourceTask extends SourceTask {
    public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
    public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CamelSourceTask.class);
    private static final String CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX = "camel.source.endpoint.";
    private static final String CAMEL_SOURCE_PATH_PROPERTIES_PREFIX = "camel.source.path.";
    private static final String LOCAL_URL = "seda:end";
    private CamelKafkaConnectMain cms;
    private PollingConsumer consumer;
    private String[] topics;
    private Long maxBatchPollSize;
    private Long maxPollDuration;
    private Integer maxNotCommittedRecords;
    private String camelMessageHeaderKey;
    private LoggingLevel loggingLevel = LoggingLevel.OFF;
    private Exchange[] exchangesWaitingForAck;
    private SpscArrayQueue<Integer> freeSlots;
    private boolean mapProperties;
    private boolean mapHeaders;

    @Override // org.apache.kafka.connect.connector.Task
    public String version() {
        return VersionUtil.getVersion();
    }

    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public void start(Map<String, String> map) {
        try {
            LOG.info("Starting CamelSourceTask connector task");
            Map<String, String> combineDefaultAndLoadedProperties = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), map);
            CamelSourceConnectorConfig camelSourceConnectorConfig = getCamelSourceConnectorConfig(combineDefaultAndLoadedProperties);
            String string = camelSourceConnectorConfig.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
            try {
                this.loggingLevel = LoggingLevel.valueOf(string.toUpperCase());
            } catch (Exception e) {
                LOG.error("Invalid value {} for {} property", string.toUpperCase(), CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
            }
            this.maxBatchPollSize = camelSourceConnectorConfig.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
            this.maxPollDuration = camelSourceConnectorConfig.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
            this.maxNotCommittedRecords = camelSourceConnectorConfig.getInt(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_NOT_COMMITTED_RECORDS_CONF);
            this.camelMessageHeaderKey = camelSourceConnectorConfig.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF);
            String string2 = camelSourceConnectorConfig.getString("camel.source.url");
            String string3 = camelSourceConnectorConfig.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
            String string4 = camelSourceConnectorConfig.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
            int intValue = camelSourceConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF).intValue();
            long longValue = camelSourceConnectorConfig.getLong(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF).longValue();
            int intValue2 = camelSourceConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF).intValue();
            long longValue2 = camelSourceConnectorConfig.getLong(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF).longValue();
            String string5 = camelSourceConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
            Boolean bool = camelSourceConnectorConfig.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
            String string6 = camelSourceConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
            String string7 = camelSourceConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
            int intValue3 = camelSourceConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF).intValue();
            String string8 = camelSourceConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
            String string9 = camelSourceConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
            String string10 = camelSourceConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
            int intValue4 = camelSourceConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF).intValue();
            int intValue5 = camelSourceConnectorConfig.getInt(CamelConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF).intValue();
            String string11 = camelSourceConnectorConfig.getString(CamelConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
            this.mapProperties = camelSourceConnectorConfig.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF).booleanValue();
            this.mapHeaders = camelSourceConnectorConfig.getBoolean(CamelConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF).booleanValue();
            this.topics = camelSourceConnectorConfig.getString("topics").split(",");
            String localUrlWithPollingOptions = getLocalUrlWithPollingOptions(camelSourceConnectorConfig.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF).longValue(), camelSourceConnectorConfig.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF).longValue(), camelSourceConnectorConfig.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF).booleanValue());
            this.freeSlots = new SpscArrayQueue<>(this.maxNotCommittedRecords.intValue());
            this.freeSlots.fill(new MessagePassingQueue.Supplier<Integer>() { // from class: org.apache.camel.kafkaconnector.CamelSourceTask.1
                int i;

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.jctools.queues.MessagePassingQueue.Supplier
                public Integer get() {
                    int i = this.i;
                    this.i = i + 1;
                    return Integer.valueOf(i);
                }
            });
            this.exchangesWaitingForAck = new Exchange[this.freeSlots.capacity()];
            DefaultCamelContext defaultCamelContext = new DefaultCamelContext();
            if (string2 == null) {
                string2 = TaskHelper.buildUrl(defaultCamelContext, combineDefaultAndLoadedProperties, camelSourceConnectorConfig.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
            }
            this.cms = CamelKafkaConnectMain.builder(string2, localUrlWithPollingOptions).withProperties(combineDefaultAndLoadedProperties).withUnmarshallDataFormat(string3).withMarshallDataFormat(string4).withAggregationSize(intValue).withAggregationTimeout(longValue).withErrorHandler(string5).withMaxRedeliveries(intValue2).withRedeliveryDelay(longValue2).withIdempotencyEnabled(bool.booleanValue()).withExpressionType(string6).withExpressionHeader(string7).withMemoryDimension(intValue3).withIdempotentRepositoryType(string8).withIdempotentRepositoryTopicName(string9).withIdempotentRepositoryKafkaServers(string10).withIdempotentRepositoryKafkaMaxCacheSize(intValue4).withIdempotentRepositoryKafkaPollDuration(intValue5).withHeadersExcludePattern(string11).build(defaultCamelContext);
            this.consumer = this.cms.getCamelContext().getEndpoint(localUrlWithPollingOptions).createPollingConsumer();
            this.consumer.start();
            this.cms.start();
            LOG.info("CamelSourceTask connector task started");
        } catch (Exception e2) {
            throw new ConnectException("Failed to create and start Camel context", e2);
        }
    }

    private long remaining(long j, long j2) {
        return j2 - (Instant.now().toEpochMilli() - j);
    }

    @Override // org.apache.kafka.connect.source.SourceTask
    public synchronized List<SourceRecord> poll() {
        Exchange receive;
        LOG.debug("Number of records waiting an ack: {}", Integer.valueOf(this.freeSlots.capacity() - this.freeSlots.size()));
        long epochMilli = Instant.now().toEpochMilli();
        long remaining = remaining(epochMilli, this.maxPollDuration.longValue());
        long j = 0;
        ArrayList arrayList = new ArrayList();
        while (j < this.maxBatchPollSize.longValue() && this.freeSlots.size() >= this.topics.length && remaining > 0 && (receive = this.consumer.receive(remaining)) != null) {
            LOG.debug("Received Exchange {} with Message {} from Endpoint {}", receive.getExchangeId(), receive.getMessage().getMessageId(), receive.getFromEndpoint());
            Map singletonMap = Collections.singletonMap("filename", receive.getFromEndpoint().toString());
            Map singletonMap2 = Collections.singletonMap("position", receive.getExchangeId());
            Object header = this.camelMessageHeaderKey != null ? receive.getMessage().getHeader(this.camelMessageHeaderKey) : null;
            Object body = receive.getMessage().getBody();
            SchemaBuilder buildSchemaBuilderForType = header != null ? SchemaHelper.buildSchemaBuilderForType(header) : null;
            SchemaBuilder buildSchemaBuilderForType2 = body != null ? SchemaHelper.buildSchemaBuilderForType(body) : null;
            long calculateTimestamp = calculateTimestamp(receive);
            if (body instanceof StreamCache) {
                ((StreamCache) body).reset();
            }
            for (String str : this.topics) {
                CamelSourceRecord camelSourceRecord = new CamelSourceRecord(singletonMap, singletonMap2, str, null, buildSchemaBuilderForType, header, buildSchemaBuilderForType2, body, Long.valueOf(calculateTimestamp));
                if (this.mapHeaders && receive.getMessage().hasHeaders()) {
                    setAdditionalHeaders(camelSourceRecord, receive.getMessage().getHeaders(), "CamelHeader.");
                }
                if (this.mapProperties && receive.hasProperties()) {
                    setAdditionalHeaders(camelSourceRecord, receive.getProperties(), "CamelProperty.");
                }
                TaskHelper.logRecordContent(LOG, this.loggingLevel, camelSourceRecord);
                Integer num = (Integer) this.freeSlots.remove();
                camelSourceRecord.setClaimCheck(num);
                this.exchangesWaitingForAck[num.intValue()] = receive;
                LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}", camelSourceRecord, receive, num);
                arrayList.add(camelSourceRecord);
            }
            j++;
            remaining = remaining(epochMilli, this.maxPollDuration.longValue());
        }
        if (arrayList.isEmpty()) {
            return null;
        }
        return arrayList;
    }

    @Override // org.apache.kafka.connect.source.SourceTask
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        RuntimeException runtimeException;
        LOG.debug("Committing record: {} with metadata: {}", sourceRecord, recordMetadata);
        Integer claimCheck = ((CamelSourceRecord) sourceRecord).getClaimCheck();
        LOG.debug("Committing record with claim check number: {}", claimCheck);
        Exchange exchange = this.exchangesWaitingForAck[claimCheck.intValue()];
        try {
            try {
                UnitOfWorkHelper.doneSynchronizations(exchange, ((ExtendedExchange) exchange.adapt(ExtendedExchange.class)).handoverCompletions(), LOG);
                LOG.debug("Record with claim check number: {} committed.", claimCheck);
                this.exchangesWaitingForAck[claimCheck.intValue()] = null;
                this.freeSlots.add(claimCheck);
                LOG.debug("Claim check number: {} freed.", claimCheck);
            } finally {
            }
        } catch (Throwable th) {
            this.exchangesWaitingForAck[claimCheck.intValue()] = null;
            this.freeSlots.add(claimCheck);
            LOG.debug("Claim check number: {} freed.", claimCheck);
            throw th;
        }
    }

    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public void stop() {
        LOG.info("Stopping CamelSourceTask connector task");
        try {
            if (this.consumer != null) {
                this.consumer.stop();
            } else {
                LOG.warn("A critical error may have occurred and there is no consumer to stop");
            }
        } catch (Exception e) {
            LOG.error("Error stopping camel consumer: {}", e.getMessage());
        }
        try {
            try {
                if (this.cms != null) {
                    this.cms.stop();
                } else {
                    LOG.warn("A fatal exception may have occurred and the Camel main was not created");
                }
                LOG.info("CamelSourceTask connector task stopped");
            } catch (Exception e2) {
                throw new ConnectException("Failed to stop Camel context", e2);
            }
        } catch (Throwable th) {
            LOG.info("CamelSourceTask connector task stopped");
            throw th;
        }
    }

    protected CamelSourceConnectorConfig getCamelSourceConnectorConfig(Map<String, String> map) {
        return new CamelSourceConnectorConfig(map);
    }

    protected Map<String, String> getDefaultConfig() {
        return Collections.emptyMap();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String getCamelSourceEndpointConfigPrefix() {
        return CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX;
    }

    protected static String getCamelSourcePathConfigPrefix() {
        return CAMEL_SOURCE_PATH_PROPERTIES_PREFIX;
    }

    protected long calculateTimestamp(Exchange exchange) {
        return System.currentTimeMillis();
    }

    private void setAdditionalHeaders(SourceRecord sourceRecord, Map<String, Object> map, String str) {
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            String str2 = str + key;
            if (value instanceof String) {
                sourceRecord.headers().addString(str2, (String) value);
            } else if (value instanceof Boolean) {
                sourceRecord.headers().addBoolean(str2, ((Boolean) value).booleanValue());
            } else if (value instanceof Byte) {
                sourceRecord.headers().addByte(str2, ((Byte) value).byteValue());
            } else if (value instanceof Byte[]) {
                Byte[] bArr = (Byte[]) value;
                byte[] bArr2 = new byte[bArr.length];
                for (int i = 0; i < bArr.length; i++) {
                    bArr2[i] = bArr[i].byteValue();
                }
                sourceRecord.headers().addBytes(str2, bArr2);
            } else if (value instanceof Date) {
                sourceRecord.headers().addTimestamp(str2, (Date) value);
            } else if (value instanceof BigDecimal) {
                Schema schema = Decimal.schema(((BigDecimal) value).scale());
                sourceRecord.headers().add(str2, Decimal.fromLogical(schema, (BigDecimal) value), schema);
            } else if (value instanceof Double) {
                sourceRecord.headers().addDouble(str2, ((Double) value).doubleValue());
            } else if (value instanceof Float) {
                sourceRecord.headers().addFloat(str2, ((Float) value).floatValue());
            } else if (value instanceof Integer) {
                sourceRecord.headers().addInt(str2, ((Integer) value).intValue());
            } else if (value instanceof Long) {
                sourceRecord.headers().addLong(str2, ((Long) value).longValue());
            } else if (value instanceof Short) {
                sourceRecord.headers().addShort(str2, ((Short) value).shortValue());
            }
        }
    }

    private String getLocalUrlWithPollingOptions(long j, long j2, boolean z) {
        return "seda:end?pollingConsumerQueueSize=" + j + "&pollingConsumerBlockTimeout=" + j2 + "&pollingConsumerBlockWhenFull=" + z;
    }

    CamelKafkaConnectMain getCms() {
        return this.cms;
    }

    public LoggingLevel getLoggingLevel() {
        return this.loggingLevel;
    }

    public void setLoggingLevel(LoggingLevel loggingLevel) {
        this.loggingLevel = loggingLevel;
    }
}
