package io.mantisrx.sourcejob.kafka;

import io.mantisrx.connector.kafka.KafkaAckable;
import io.mantisrx.connector.kafka.source.serde.ParseException;
import io.mantisrx.connector.kafka.source.serde.Parser;
import io.mantisrx.connector.kafka.source.serde.ParserType;
import io.mantisrx.runtime.Context;
import java.util.Collections;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/sourcejob/kafka/AutoAckTaggingStage.class */
public class AutoAckTaggingStage extends AbstractAckableTaggingStage {
    private static final Logger LOG = LoggerFactory.getLogger(AutoAckTaggingStage.class);
    private static final Logger logger = LoggerFactory.getLogger(AutoAckTaggingStage.class);

    @Override // io.mantisrx.sourcejob.kafka.AbstractAckableTaggingStage
    protected Map<String, Object> processAndAck(Context context, KafkaAckable kafkaAckable) {
        try {
            try {
                Boolean bool = (Boolean) context.getParameters().get("parseMessageInKafkaConsumerThread", true);
                String str = (String) context.getParameters().get("messageParserType", ParserType.SIMPLE_JSON.getPropName());
                if (bool.booleanValue()) {
                    Map<String, Object> map = (Map) kafkaAckable.getKafkaData().getParsedEvent().orElse(Collections.emptyMap());
                    kafkaAckable.ack();
                    return map;
                }
                Parser parser = ParserType.parser(str).getParser();
                if (!parser.canParse(kafkaAckable.getKafkaData().getRawBytes())) {
                    LOG.warn("cannot parse message {}", kafkaAckable.getKafkaData().getRawBytes().toString());
                    throw new ParseException("cannot parse message");
                }
                Map<String, Object> parseMessage = parser.parseMessage(kafkaAckable.getKafkaData().getRawBytes());
                kafkaAckable.ack();
                return parseMessage;
            } catch (Throwable th) {
                if (th instanceof ParseException) {
                    logger.warn("failed to parse message", th);
                } else {
                    logger.error("caught unexpected exception", th);
                }
                kafkaAckable.ack();
                return Collections.emptyMap();
            }
        } catch (Throwable th2) {
            kafkaAckable.ack();
            throw th2;
        }
    }

    @Override // io.mantisrx.sourcejob.kafka.AbstractAckableTaggingStage
    protected Map<String, Object> preProcess(Map<String, Object> map) {
        return map;
    }
}
