package org.apache.pulsar.io.hdfs3.sink.text;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.hdfs3.sink.HdfsAbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/hdfs3/sink/text/HdfsAbstractTextFileSink.class */
/**
 * Base class for HDFS sinks that write each record's value as text.
 *
 * <p>Values are written through an {@link OutputStreamWriter} layered over a
 * {@link BufferedOutputStream} on top of the HDFS stream. The HDFS stream is
 * wrapped by the configured compression codec when the sink configuration
 * reports a compression setting.
 *
 * @param <K> key type handled by the enclosing {@link HdfsAbstractSink}
 * @param <V> value type of the records written by this sink
 */
public abstract class HdfsAbstractTextFileSink<K, V> extends HdfsAbstractSink<K, V> implements Sink<V> {
    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractTextFileSink.class);

    /** Text writer over the (optionally compressed) HDFS stream; assigned by {@link #createWriter()}. */
    protected OutputStreamWriter writer;

    /**
     * Creates the text writer on top of a buffered, optionally compressed, HDFS stream,
     * using the encoding returned by {@code getEncoding()}.
     *
     * @throws IOException if the underlying stream or the writer cannot be created
     */
    @Override // org.apache.pulsar.io.hdfs3.sink.HdfsAbstractSink
    protected void createWriter() throws IOException {
        this.writer = new OutputStreamWriter(new BufferedOutputStream(openHdfsStream()), getEncoding());
    }

    /**
     * Closes the text writer and then delegates to the superclass.
     *
     * <p>The superclass is always closed, even when the writer was never created or
     * closing the writer fails, so that its resources are not leaked.
     *
     * @throws Exception if closing the writer or the superclass fails
     */
    @Override // org.apache.pulsar.io.hdfs3.sink.HdfsAbstractSink, java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            // The writer is only assigned in createWriter(); it may still be null here.
            if (this.writer != null) {
                this.writer.close();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Writes the string form of the record's value, followed by the configured
     * separator when it is non-zero, and then adds the record to {@code unackedRecords}.
     *
     * <p>On an I/O failure or an interruption the error is logged and the record is
     * failed. When interrupted, the thread's interrupt status is restored so that
     * callers can still observe the interruption.
     *
     * @param record the record whose value is written
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<V> record) {
        try {
            this.writer.write(extractKeyValue(record).getValue().toString());
            // A separator of 0 means "no separator configured".
            if (this.hdfsSinkConfig.getSeparator() != 0) {
                this.writer.write(this.hdfsSinkConfig.getSeparator());
            }
            this.unackedRecords.put(record);
        } catch (IOException e) {
            LOG.error("Unable to write to file " + String.valueOf(getPath()), e);
            record.fail();
        } catch (InterruptedException e) {
            // Re-assert the interrupt flag; catching InterruptedException clears it.
            Thread.currentThread().interrupt();
            LOG.error("Unable to write to file " + String.valueOf(getPath()), e);
            record.fail();
        }
    }

    /**
     * Opens the HDFS output stream, wrapped by the compression codec when the sink
     * configuration has a compression setting.
     *
     * @return the raw or codec-wrapped HDFS output stream
     * @throws IOException if the stream cannot be opened
     */
    private OutputStream openHdfsStream() throws IOException {
        if (this.hdfsSinkConfig.getCompression() != null) {
            return getCompressionCodec().createOutputStream(getHdfsStream());
        }
        return getHdfsStream();
    }
}
