package org.gradoop.flink.io.impl.csv.indexed.functions;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Public
/* loaded from: input_file:org/gradoop/flink/io/impl/csv/indexed/functions/MultipleFileOutputFormat.class */
public abstract class MultipleFileOutputFormat<IT> extends RichOutputFormat<IT> implements InitializeOnMaster, CleanupWhenUnsuccessful {
    public static final String FILE_PARAMETER_KEY = "flink.output.file";
    private static FileSystem.WriteMode DEFAULT_WRITE_MODE;
    private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
    private static String DIRECTORY_SEPARATOR = CSVConstants.DIRECTORY_SEPARATOR;
    private static final Logger LOG = LoggerFactory.getLogger(MultipleFileOutputFormat.class);
    protected Path outputFilePath;
    protected HashMap<String, FSDataOutputStream> streams = new HashMap<>();
    protected transient FileSystem fs;
    private transient Path actualFilePath;
    private transient boolean fileCreated;
    private FileSystem.WriteMode writeMode;
    private OutputDirectoryMode outputDirectoryMode;

    /* loaded from: input_file:org/gradoop/flink/io/impl/csv/indexed/functions/MultipleFileOutputFormat$OutputDirectoryMode.class */
    public enum OutputDirectoryMode {
        ALWAYS,
        PARONLY
    }

    public MultipleFileOutputFormat(Path path) {
        this.outputFilePath = path;
    }

    private static void initDefaultsFromConfiguration(Configuration configuration) {
        DEFAULT_WRITE_MODE = configuration.getBoolean("fs.overwrite-files", false) ? FileSystem.WriteMode.OVERWRITE : FileSystem.WriteMode.NO_OVERWRITE;
        DEFAULT_OUTPUT_DIRECTORY_MODE = configuration.getBoolean("fs.output.always-create-directory", false) ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
    }

    public void setOutputFilePath(Path path) {
        if (path == null) {
            throw new IllegalArgumentException("Output file path may not be null.");
        }
        this.outputFilePath = path;
    }

    public Path getOutputFilePath() {
        return this.outputFilePath;
    }

    public void setWriteMode(FileSystem.WriteMode writeMode) {
        if (writeMode == null) {
            throw new NullPointerException();
        }
        this.writeMode = writeMode;
    }

    public FileSystem.WriteMode getWriteMode() {
        return this.writeMode;
    }

    public void setOutputDirectoryMode(OutputDirectoryMode outputDirectoryMode) {
        if (outputDirectoryMode == null) {
            throw new NullPointerException();
        }
        this.outputDirectoryMode = outputDirectoryMode;
    }

    public OutputDirectoryMode getOutputDirectoryMode() {
        return this.outputDirectoryMode;
    }

    public void configure(Configuration configuration) {
        if (this.outputFilePath == null) {
            String string = configuration.getString("flink.output.file", (String) null);
            if (string == null) {
                throw new IllegalArgumentException("The output path has been specified neither via constructor/setters, nor via the Configuration.");
            }
            try {
                this.outputFilePath = new Path(string);
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Could not create a valid URI from the given file path name: " + e.getMessage());
            }
        }
        if (this.writeMode == null) {
            this.writeMode = DEFAULT_WRITE_MODE;
        }
        if (this.outputDirectoryMode == null) {
            this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
        }
    }

    public void open(int i, int i2) throws IOException {
        if (i < 0 || i2 < 1) {
            throw new IllegalArgumentException("TaskNumber: " + i + ", numTasks: " + i2);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Opening stream for output (" + (i + 1) + "/" + i2 + "). WriteMode=" + this.writeMode + ", OutputDirectoryMode=" + this.outputDirectoryMode);
        }
        Path path = this.outputFilePath;
        if (path == null) {
            throw new IOException("The file path is null.");
        }
        this.fs = path.getFileSystem();
        if (this.fs.isDistributedFS()) {
            return;
        }
        if (i2 == 1 && this.outputDirectoryMode == OutputDirectoryMode.PARONLY) {
            if (!this.fs.initOutPathLocalFS(path, this.writeMode, false)) {
                throw new IOException("Output path '" + path.toString() + "' could not be initialized. Canceling task...");
            }
        } else if (!this.fs.initOutPathLocalFS(path, this.writeMode, true)) {
            throw new IOException("Output directory '" + path.toString() + "' could not be created. Canceling task...");
        }
    }

    public FSDataOutputStream getAndCreateFileStream(String str) throws IOException {
        if (!this.streams.containsKey(str)) {
            this.actualFilePath = new Path(this.outputFilePath, DIRECTORY_SEPARATOR + str + DIRECTORY_SEPARATOR + CSVConstants.SIMPLE_FILE);
            this.streams.put(str, this.fs.create(this.actualFilePath, this.writeMode));
        }
        return this.streams.get(str);
    }

    public void close() throws IOException {
        for (Map.Entry<String, FSDataOutputStream> entry : this.streams.entrySet()) {
            FSDataOutputStream value = entry.getValue();
            if (value != null) {
                entry.setValue(null);
                value.close();
            }
        }
    }

    public void initializeGlobal(int i) throws IOException {
        Path outputFilePath = getOutputFilePath();
        FileSystem fileSystem = outputFilePath.getFileSystem();
        if (fileSystem.isDistributedFS()) {
            FileSystem.WriteMode writeMode = getWriteMode();
            OutputDirectoryMode outputDirectoryMode = getOutputDirectoryMode();
            if (i == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) {
                if (!fileSystem.initOutPathDistFS(outputFilePath, writeMode, false)) {
                    throw new IOException("Output path could not be initialized.");
                }
            } else if (!fileSystem.initOutPathDistFS(outputFilePath, writeMode, true)) {
                throw new IOException("Output directory could not be created.");
            }
        }
    }

    public void tryCleanupOnError() {
        if (this.fileCreated) {
            this.fileCreated = false;
            try {
                close();
            } catch (IOException e) {
                LOG.error("Could not properly close FileOutputFormat.", e);
            }
            try {
                FileSystem.get(this.actualFilePath.toUri()).delete(this.actualFilePath, false);
            } catch (FileNotFoundException e2) {
            } catch (IOException e3) {
                LOG.error("Could not remove the incomplete file " + this.actualFilePath + '.', e3);
            }
        }
    }

    static {
        initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
    }
}
