package org.apache.pulsar.io.hdfs3.sink;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.hdfs3.AbstractHdfsConnector;
import org.apache.pulsar.io.hdfs3.HdfsResources;

/* loaded from: input_file:org/apache/pulsar/io/hdfs3/sink/HdfsAbstractSink.class */
public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {
    protected HdfsSinkConfig hdfsSinkConfig;
    protected BlockingQueue<Record<V>> unackedRecords;
    protected HdfsSyncThread<V> syncThread;
    private Path path;
    private FSDataOutputStream hdfsStream;

    public abstract KeyValue<K, V> extractKeyValue(Record<V> record);

    protected abstract void createWriter() throws IOException;

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.hdfsSinkConfig = HdfsSinkConfig.load(map);
        this.hdfsSinkConfig.validate();
        this.connectorConfig = this.hdfsSinkConfig;
        this.unackedRecords = new LinkedBlockingQueue(this.hdfsSinkConfig.getMaxPendingRecords());
        connectToHdfs();
        createWriter();
        launchSyncThread();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.syncThread.halt();
        this.syncThread.join(0L);
    }

    protected final void connectToHdfs() throws IOException {
        try {
            if (this.hdfsResources.get().getConfiguration() == null) {
                this.hdfsResources.set(resetHDFSResources(this.hdfsSinkConfig));
            }
        } catch (IOException e) {
            this.hdfsResources.set(new HdfsResources(null, null, null));
            throw e;
        }
    }

    protected final FSDataOutputStreamBuilder getOutputStreamBuilder() throws IOException {
        Path path = getPath();
        FileSystem fileSystemAsUser = getFileSystemAsUser(getConfiguration(), getUserGroupInformation());
        return (fileSystemAsUser.exists(path) ? fileSystemAsUser.appendFile(path) : fileSystemAsUser.createFile(path)).recursive().permission(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException {
        if (this.hdfsStream == null) {
            this.hdfsStream = getOutputStreamBuilder().build();
        }
        return this.hdfsStream;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Path getPath() {
        if (this.path == null) {
            String str = "";
            if (StringUtils.isNotBlank(this.hdfsSinkConfig.getFileExtension())) {
                str = this.hdfsSinkConfig.getFileExtension();
            } else if (getCompressionCodec() != null) {
                str = getCompressionCodec().getDefaultExtension();
            }
            this.path = new Path(FilenameUtils.concat(this.hdfsSinkConfig.getDirectory(), this.hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + this));
        }
        return this.path;
    }

    protected final void launchSyncThread() throws IOException {
        this.syncThread = new HdfsSyncThread<>(getHdfsStream(), this.unackedRecords, this.hdfsSinkConfig.getSyncInterval());
        this.syncThread.start();
    }
}
