package org.apache.pulsar.io.hdfs2.sink.seq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.class */
public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV> extends HdfsAbstractSink<K, V> implements Sink<V> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HdfsAbstractSequenceFileSink.class);
    protected AtomicLong counter;
    protected FSDataOutputStream hdfsStream;
    protected SequenceFile.Writer writer = null;

    public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> keyValue);

    @Override // org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink, java.lang.AutoCloseable
    public void close() throws Exception {
        this.writer.close();
        super.close();
    }

    @Override // org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink
    protected void createWriter() throws IOException {
        this.writer = getWriter();
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<V> record) {
        try {
            KeyValue<HdfsK, HdfsV> convert = convert(extractKeyValue(record));
            this.writer.append(convert.getKey(), convert.getValue());
            this.unackedRecords.put(record);
        } catch (IOException | InterruptedException e) {
            LOG.error("Unable to write to file " + String.valueOf(getPath()), e);
            record.fail();
        }
    }

    protected SequenceFile.Writer getWriter() throws IOException {
        this.counter = new AtomicLong(0L);
        List<SequenceFile.Writer.Option> options = getOptions();
        return SequenceFile.createWriter(getConfiguration(), (SequenceFile.Writer.Option[]) options.toArray(new SequenceFile.Writer.Option[options.size()]));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<SequenceFile.Writer.Option> getOptions() throws IllegalArgumentException, IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(SequenceFile.Writer.stream(getHdfsStream()));
        if (getCompressionCodec() != null) {
            arrayList.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec()));
        }
        return arrayList;
    }
}
