package org.apache.pulsar.io.hdfs2.sink;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.hdfs2.AbstractHdfsConnector;
import org.apache.pulsar.io.hdfs2.HdfsResources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.class */
/**
 * Base class for HDFS sinks.
 *
 * <p>On {@link #open(Map, SinkContext)} it loads and validates the sink configuration, connects to
 * HDFS, asks the subclass to create its writer, and starts an {@link HdfsSyncThread} that is handed
 * the output stream, the queue of pending records and the configured sync interval.
 *
 * <p>The output file path and the output stream are both created lazily on first use and then
 * cached for the lifetime of the instance. The lazy initialisation is not synchronised.
 *
 * @param <K> key type produced by {@link #extractKeyValue(Record)}
 * @param <V> value type of the records consumed by this sink
 */
public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {
    private static final Logger log = LoggerFactory.getLogger(HdfsAbstractSink.class);

    /** Sink configuration; assigned in {@link #open(Map, SinkContext)}. */
    protected HdfsSinkConfig hdfsSinkConfig;

    /**
     * Bounded queue shared with the sync thread; its capacity is the configured maximum number of
     * pending records. Presumably holds records that have been written but not yet acknowledged.
     */
    protected BlockingQueue<Record<V>> unackedRecords;

    /** Sync thread started by {@link #launchSyncThread()}; {@code null} until then. */
    protected HdfsSyncThread<V> syncThread;

    /** Lazily computed target file; see {@link #getPath()}. */
    private Path path;

    /** Lazily opened output stream; see {@link #getHdfsStream()}. */
    private FSDataOutputStream hdfsStream;

    /** Formatter for the optional time-based subdirectory; {@code null} when no pattern is configured. */
    private DateTimeFormatter subdirectoryFormatter;

    /**
     * Extracts the key/value pair to be written for the given record.
     *
     * @param record the incoming record
     * @return the key/value pair derived from {@code record}
     */
    public abstract KeyValue<K, V> extractKeyValue(Record<V> record);

    /**
     * Creates the subclass-specific writer. Called from {@link #open(Map, SinkContext)} after the
     * HDFS connection has been set up and before the sync thread is launched.
     *
     * @throws IOException if the writer cannot be created
     */
    protected abstract void createWriter() throws IOException;

    /**
     * Initialises the sink: loads and validates the configuration, allocates the pending-record
     * queue, prepares the optional subdirectory formatter, connects to HDFS, creates the writer and
     * starts the sync thread, in that order.
     *
     * @param map raw connector configuration
     * @param sinkContext sink context supplied by the runtime (not used by this method)
     * @throws Exception if configuration loading/validation, the HDFS connection, writer creation
     *     or stream creation fails
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.hdfsSinkConfig = HdfsSinkConfig.load(map);
        this.hdfsSinkConfig.validate();
        this.connectorConfig = this.hdfsSinkConfig;
        // Diamond operator instead of a raw type so the queue is properly typed as Record<V>.
        this.unackedRecords = new LinkedBlockingQueue<>(this.hdfsSinkConfig.getMaxPendingRecords());

        final String subdirectoryPattern = this.hdfsSinkConfig.getSubdirectoryPattern();
        if (subdirectoryPattern != null) {
            this.subdirectoryFormatter = DateTimeFormatter.ofPattern(subdirectoryPattern);
        }

        connectToHdfs();
        createWriter();
        launchSyncThread();
    }

    /**
     * Halts the sync thread and waits for it to finish.
     *
     * <p>Does nothing when the sync thread was never started, for example when
     * {@link #open(Map, SinkContext)} failed before reaching {@link #launchSyncThread()}; this
     * avoids masking the original failure with a {@link NullPointerException}.
     *
     * @throws Exception if halting or joining the sync thread fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        final HdfsSyncThread<V> thread = this.syncThread;
        if (thread == null) {
            return;
        }
        thread.halt();
        // Timeout 0 is passed through unchanged; for java.lang.Thread#join(long) that means
        // "wait without a time limit" (assumes HdfsSyncThread is a Thread - TODO confirm).
        thread.join(0L);
    }

    /**
     * Ensures HDFS resources are available. When the current resources carry no configuration they
     * are rebuilt from the sink configuration via {@code resetHDFSResources}.
     *
     * @throws IOException if the resources cannot be built; in that case the held resources are
     *     replaced by an empty {@link HdfsResources} (all members {@code null}) before rethrowing
     */
    protected final void connectToHdfs() throws IOException {
        try {
            if (this.hdfsResources.get().getConfiguration() == null) {
                this.hdfsResources.set(resetHDFSResources(this.hdfsSinkConfig));
            }
        } catch (IOException e) {
            // Leave a well-defined "not connected" state behind rather than stale resources.
            this.hdfsResources.set(new HdfsResources(null, null, null));
            throw e;
        }
    }

    /**
     * Returns the output stream for {@link #getPath()}, opening it on first call. An existing file
     * is opened for append; otherwise the file is created. The stream is cached afterwards.
     *
     * <p>JADX INFO: Access modifiers changed from: protected.
     *
     * @return the cached output stream
     * @throws IllegalArgumentException declared by the original signature; propagated from the
     *     underlying calls
     * @throws IOException if the file system cannot be obtained or the file cannot be opened
     */
    public FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException {
        if (this.hdfsStream == null) {
            final Path target = getPath();
            final FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation());
            if (fs.exists(target)) {
                this.hdfsStream = fs.append(target);
            } else {
                this.hdfsStream = fs.create(target);
            }
        }
        return this.hdfsStream;
    }

    /**
     * Returns the target file path, computing it on first call and caching it afterwards.
     *
     * <p>The path is {@code <directory>[/<formatted-now>]/<filenamePrefix>-<currentTimeMillis><ext>},
     * where the subdirectory is present only when a subdirectory pattern was configured and
     * {@code ext} is chosen by {@link #resolveFileExtension()}.
     *
     * <p>JADX INFO: Access modifiers changed from: protected.
     *
     * @return the cached target path
     */
    public final Path getPath() {
        if (this.path == null) {
            final String extension = resolveFileExtension();
            final String directory = resolveDirectory();
            final String fileName =
                    this.hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + extension;
            this.path = new Path(FilenameUtils.concat(directory, fileName));
            log.info("Create path: {}", this.path);
        }
        return this.path;
    }

    /**
     * Creates the sync thread over the output stream, the pending-record queue and the configured
     * sync interval, then starts it.
     *
     * @throws IOException if the output stream cannot be opened
     */
    protected final void launchSyncThread() throws IOException {
        this.syncThread = new HdfsSyncThread<>(
                getHdfsStream(), this.unackedRecords, this.hdfsSinkConfig.getSyncInterval());
        this.syncThread.start();
    }

    /**
     * Picks the file extension: the configured extension when it is not blank, otherwise the
     * compression codec's default extension when a codec is set, otherwise the empty string.
     */
    private String resolveFileExtension() {
        if (StringUtils.isNotBlank(this.hdfsSinkConfig.getFileExtension())) {
            return this.hdfsSinkConfig.getFileExtension();
        }
        if (getCompressionCodec() != null) {
            return getCompressionCodec().getDefaultExtension();
        }
        return "";
    }

    /**
     * Returns the configured directory, extended by the current local date-time formatted with the
     * subdirectory formatter when one is configured.
     */
    private String resolveDirectory() {
        final String baseDirectory = this.hdfsSinkConfig.getDirectory();
        if (this.subdirectoryFormatter == null) {
            return baseDirectory;
        }
        return FilenameUtils.concat(baseDirectory, LocalDateTime.now().format(this.subdirectoryFormatter));
    }
}
