package com.amazonaws.athena.connectors.cloudwatch;

import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
import com.amazonaws.athena.connector.lambda.ThrottlingInvoker;
import com.amazonaws.athena.connector.lambda.data.BlockSpiller;
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints;
import com.amazonaws.athena.connector.lambda.domain.predicate.Range;
import com.amazonaws.athena.connector.lambda.domain.predicate.SortedRangeSet;
import com.amazonaws.athena.connector.lambda.domain.predicate.ValueSet;
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.cloudwatch.qpt.CloudwatchQueryPassthrough;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.athena.AmazonAthenaClientBuilder;
import com.amazonaws.services.logs.AWSLogs;
import com.amazonaws.services.logs.AWSLogsClientBuilder;
import com.amazonaws.services.logs.model.GetLogEventsRequest;
import com.amazonaws.services.logs.model.GetLogEventsResult;
import com.amazonaws.services.logs.model.OutputLogEvent;
import com.amazonaws.services.logs.model.ResultField;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.amazonaws.services.secretsmanager.AWSSecretsManagerClientBuilder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.logging.log4j.core.jackson.JsonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandler.class */
public class CloudwatchRecordHandler extends RecordHandler {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CloudwatchRecordHandler.class);
    private static final String SOURCE_TYPE = "cloudwatch";
    private final ThrottlingInvoker invoker;
    private final AtomicLong count;
    private final AWSLogs awsLogs;
    private final CloudwatchQueryPassthrough queryPassthrough;

    public CloudwatchRecordHandler(Map<String, String> map) {
        this(AmazonS3ClientBuilder.defaultClient(), AWSSecretsManagerClientBuilder.defaultClient(), AmazonAthenaClientBuilder.defaultClient(), AWSLogsClientBuilder.defaultClient(), map);
    }

    @VisibleForTesting
    protected CloudwatchRecordHandler(AmazonS3 amazonS3, AWSSecretsManager aWSSecretsManager, AmazonAthena amazonAthena, AWSLogs aWSLogs, Map<String, String> map) {
        super(amazonS3, aWSSecretsManager, amazonAthena, SOURCE_TYPE, map);
        this.count = new AtomicLong(0L);
        this.queryPassthrough = new CloudwatchQueryPassthrough();
        this.awsLogs = aWSLogs;
        this.invoker = ThrottlingInvoker.newDefaultBuilder(CloudwatchExceptionFilter.EXCEPTION_FILTER, map).build();
    }

    @Override // com.amazonaws.athena.connector.lambda.handlers.RecordHandler
    protected void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest readRecordsRequest, QueryStatusChecker queryStatusChecker) throws TimeoutException, InterruptedException {
        if (readRecordsRequest.getConstraints().isQueryPassThrough()) {
            getQueryPassthreoughResults(blockSpiller, readRecordsRequest);
            return;
        }
        String str = null;
        TableName tableName = readRecordsRequest.getTableName();
        Split split = readRecordsRequest.getSplit();
        this.invoker.setBlockSpiller(blockSpiller);
        do {
            String str2 = str;
            GetLogEventsResult getLogEventsResult = (GetLogEventsResult) this.invoker.invoke(() -> {
                return this.awsLogs.getLogEvents(pushDownConstraints(readRecordsRequest.getConstraints(), new GetLogEventsRequest().withLogGroupName(split.getProperty("log_group")).withLogStreamName(split.getProperty("log_stream")).withNextToken(str2).withStartFromHead(true)));
            });
            str = (str == null || !str.equals(getLogEventsResult.getNextForwardToken())) ? getLogEventsResult.getNextForwardToken() : null;
            for (OutputLogEvent outputLogEvent : getLogEventsResult.getEvents()) {
                blockSpiller.writeRows((block, i) -> {
                    return ((true & block.offerValue("log_stream", i, split.getProperty("log_stream"))) & block.offerValue("time", i, outputLogEvent.getTimestamp())) & block.offerValue(JsonConstants.ELT_MESSAGE, i, outputLogEvent.getMessage()) ? 1 : 0;
                });
            }
            logger.info("readWithConstraint: LogGroup[{}] LogStream[{}] Continuation[{}] rows[{}]", tableName.getSchemaName(), tableName.getTableName(), str, Integer.valueOf(getLogEventsResult.getEvents().size()));
            if (str == null) {
                return;
            }
        } while (queryStatusChecker.isQueryRunning());
    }

    private void getQueryPassthreoughResults(BlockSpiller blockSpiller, ReadRecordsRequest readRecordsRequest) throws TimeoutException, InterruptedException {
        Map<String, String> queryPassthroughArguments = readRecordsRequest.getConstraints().getQueryPassthroughArguments();
        this.queryPassthrough.verify(queryPassthroughArguments);
        for (List<ResultField> list : CloudwatchUtils.getResult(this.invoker, this.awsLogs, queryPassthroughArguments, Integer.parseInt(queryPassthroughArguments.get(CloudwatchQueryPassthrough.LIMIT))).getResults()) {
            blockSpiller.writeRows((block, i) -> {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ResultField resultField = (ResultField) it.next();
                    if (!(true & block.offerValue(resultField.getField(), i, resultField.getValue()))) {
                        return 0;
                    }
                }
                return 1;
            });
        }
    }

    private GetLogEventsRequest pushDownConstraints(Constraints constraints, GetLogEventsRequest getLogEventsRequest) {
        ValueSet valueSet = constraints.getSummary().get("time");
        if ((valueSet instanceof SortedRangeSet) && !valueSet.isNullAllowed()) {
            Range span = ((SortedRangeSet) valueSet).getSpan();
            if (!span.getLow().isNullValue()) {
                getLogEventsRequest.setStartTime((Long) span.getLow().getValue());
            }
            if (!span.getHigh().isNullValue()) {
                getLogEventsRequest.setEndTime((Long) span.getHigh().getValue());
            }
        }
        return getLogEventsRequest;
    }
}
