package io.floodplain.sink.sheet;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/sink/sheet/SheetSinkTask.class */
public class SheetSinkTask extends SinkTask {
    public static final String SPREADSHEETID = "spreadsheetId";
    public static final String COLUMNS = "columns";
    public static final String TOPICS = "topics";
    public static final String STARTROW = "startRow";
    public static final String STARTCOLUMN = "startColumn";
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) SheetSinkTask.class);
    public String[] columns;
    private SheetSink sheetSink;
    private String spreadsheetId;
    private int startRow;
    private String startColumn;
    private final AtomicLong totalInTransaction = new AtomicLong(0);

    @Override // org.apache.kafka.connect.connector.Task
    public String version() {
        return "0.1";
    }

    @Override // org.apache.kafka.connect.sink.SinkTask, org.apache.kafka.connect.connector.Task
    public void start(Map<String, String> map) {
        logger.info("Starting Sheet connector: {}", map);
        this.spreadsheetId = map.get(SPREADSHEETID);
        this.columns = map.get("columns").split(",");
        this.startRow = ((Integer) Optional.of(map.get(STARTROW)).map(Integer::parseInt).orElse(1)).intValue();
        this.startColumn = (String) Optional.of(map.get(STARTCOLUMN)).orElse("A");
        try {
            this.sheetSink = new SheetSink();
        } catch (IOException | GeneralSecurityException e) {
            throw new RuntimeException("Problem starting sheet sink connector task", e);
        }
    }

    public SheetSink getSheetSink() {
        return this.sheetSink;
    }

    @Override // org.apache.kafka.connect.sink.SinkTask
    public void put(Collection<SinkRecord> collection) {
        List<UpdateTuple> extractTuples = extractTuples(collection);
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.sheetSink.updateRangeWithBatch(this.spreadsheetId, extractTuples);
        } catch (IOException e) {
            logger.error("Error: ", (Throwable) e);
        }
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        logger.info("Update took: {} total: {}", Long.valueOf(currentTimeMillis2), Long.valueOf(this.totalInTransaction.addAndGet(currentTimeMillis2)));
    }

    private List<UpdateTuple> extractTuples(Collection<SinkRecord> collection) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        logger.info("Inserting {} records", Integer.valueOf(collection.size()));
        for (SinkRecord sinkRecord : collection) {
            Map<String, Object> map = (Map) sinkRecord.value();
            if (map == null) {
                logger.info("Ignoring delete of key: {}", sinkRecord.key());
            } else {
                Integer num = (Integer) map.get("_row");
                if (num == null) {
                    throw new IllegalArgumentException("Invalid message for Google Sheets: Every message should have an int or long field named: '_row', marking the row where it should be inserted ");
                }
                List<List<Object>> extractRow = this.sheetSink.extractRow(map, this.columns);
                int intValue = num.intValue() + this.startRow;
                linkedHashMap.put(Integer.valueOf(intValue), new UpdateTuple(this.startColumn + intValue, extractRow));
            }
        }
        return new ArrayList(linkedHashMap.values());
    }

    @Override // org.apache.kafka.connect.sink.SinkTask, org.apache.kafka.connect.connector.Task
    public void stop() {
    }
}
