package cascading.tap.hadoop;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.DecoratorTap;
import cascading.tap.MultiSourceTap;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/tap/hadoop/BaseDistCacheTap.class */
/**
 * Decorator around a source tap that tries to serve reads from locally cached copies of
 * the source files instead of going through the Hadoop file system.
 *
 * <p>During {@link #sourceConfInit} the source path (or every file matched by a simple
 * {@code *} glob) is handed to {@link #addLocalCacheFiles}. During {@link #openForRead}
 * the files reported by {@link #getLocalCacheFiles} are matched against the source paths
 * by file name and, when any match, read through {@link Lfs} taps. In every other case
 * the call is delegated to the decorated tap.
 *
 * <p>The decorated tap is cast to {@link Hfs} whenever the source path is needed, so a
 * wrapped tap of any other type fails with a {@link ClassCastException} at that point.
 */
public abstract class BaseDistCacheTap extends DecoratorTap<Void, Configuration, RecordReader, OutputCollector> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseDistCacheTap.class);

    /**
     * Wraps the given tap.
     *
     * @param tap the tap to decorate; accessed as an {@link Hfs} by this class
     */
    public BaseDistCacheTap(Tap<Configuration, RecordReader, OutputCollector> tap) {
        super(tap);
    }

    /**
     * Initializes the source side of the configuration.
     *
     * <p>Delegates to the decorated tap when the configuration is local, or when this
     * tap's id equals the value of {@code cascading.node.source} or
     * {@code cascading.step.source}. Otherwise the source files are registered through
     * {@link #addLocalCacheFiles}.
     *
     * @param flowProcess   the current flow process
     * @param configuration the configuration to initialize
     * @throws TapException wrapping any {@link IOException} raised while registering files
     */
    public void sourceConfInit(FlowProcess<? extends Configuration> flowProcess, Configuration configuration) {
        String tapId = Tap.id(this);
        boolean readDirectly = HadoopUtil.isLocal(configuration)
                || tapId.equals(configuration.get("cascading.node.source"))
                || tapId.equals(configuration.get("cascading.step.source"));
        if (readDirectly) {
            LOG.info("can't use distributed cache. reading '{}' from hdfs", super.getIdentifier());
            super.sourceConfInit(flowProcess, configuration);
            return;
        }
        try {
            registerHfs(flowProcess, configuration, getHfs());
        } catch (IOException e) {
            throw new TapException(e);
        }
    }

    /**
     * Opens the source for reading, preferring locally cached files.
     *
     * <p>The call is delegated to the decorated tap when the configuration is local, when
     * a {@code recordReader} is supplied, when no local cache files are reported, or when
     * none of the reported cache files matches a source path by name.
     *
     * @param flowProcess  the current flow process
     * @param recordReader an existing reader, or {@code null}
     * @return an iterator over the source tuples
     * @throws IOException if the source paths cannot be resolved or the source cannot be opened
     */
    public TupleEntryIterator openForRead(FlowProcess<? extends Configuration> flowProcess, RecordReader recordReader) throws IOException {
        if (HadoopUtil.isLocal((Configuration) flowProcess.getConfig()) || recordReader != null) {
            LOG.info("delegating to parent");
            return super.openForRead(flowProcess, recordReader);
        }

        Path[] localCacheFiles = getLocalCacheFiles(flowProcess);
        if (localCacheFiles == null || localCacheFiles.length == 0) {
            // Typed null so the call binds to the (FlowProcess, RecordReader) signature.
            return super.openForRead(flowProcess, (RecordReader) null);
        }

        List<Tap> cachedTaps = new ArrayList<>();
        for (Path wanted : resolveSourcePaths(flowProcess)) {
            for (Path cached : localCacheFiles) {
                // Cached copies are matched on the trailing file name only.
                if (cached.toString().endsWith(wanted.getName())) {
                    LOG.info("found {} in distributed cache", cached);
                    cachedTaps.add(new Lfs(getScheme(), cached.toString()));
                }
            }
        }

        // Fall back based on the taps actually built from cache hits. Checking the wanted
        // paths instead would open a MultiSourceTap over zero taps whenever nothing matched.
        if (cachedTaps.isEmpty()) {
            LOG.info("could not find files in local resource path. delegating to parent: {}", super.getIdentifier());
            return super.openForRead(flowProcess, recordReader);
        }
        return new MultiSourceTap(cachedTaps.toArray(new Tap[cachedTaps.size()])).openForRead(flowProcess, recordReader);
    }

    /**
     * Resolves the source paths to look for among the local cache files: every glob match
     * when the identifier is a simple glob, otherwise the single source path.
     */
    private List<Path> resolveSourcePaths(FlowProcess<? extends Configuration> flowProcess) throws IOException {
        List<Path> paths = new ArrayList<>();
        if (!isSimpleGlob()) {
            paths.add(getHfs().getPath());
            return paths;
        }
        FileStatus[] matches = FileSystem.get((Configuration) flowProcess.getConfig()).globStatus(getHfs().getPath());
        // A null result is treated as "no matches", mirroring the null check in registerHfs.
        if (matches != null) {
            for (FileStatus match : matches) {
                paths.add(match.getPath());
            }
        }
        return paths;
    }

    /**
     * Registers the files behind {@code hfs} via {@link #addLocalCacheFiles} and then
     * calls {@code hfs.sourceConfInitComplete}.
     *
     * @throws TapException if the identifier is a glob that matches no files
     * @throws IOException  if the file system cannot be queried
     */
    private void registerHfs(FlowProcess<? extends Configuration> flowProcess, Configuration configuration, Hfs hfs) throws IOException {
        if (isSimpleGlob()) {
            FileStatus[] globStatus = FileSystem.get(configuration).globStatus(hfs.getPath());
            if (globStatus == null || globStatus.length == 0) {
                throw new TapException(String.format("glob expression %s does not match any files on the filesystem", hfs.getPath()));
            }
            for (FileStatus fileStatus : globStatus) {
                registerURI(configuration, fileStatus.getPath());
            }
        } else {
            registerURI(configuration, hfs.getPath());
        }
        hfs.sourceConfInitComplete(flowProcess, configuration);
    }

    /** Logs and registers a single path, converted to a URI, as a local cache file. */
    private void registerURI(Configuration configuration, Path path) {
        URI uri = path.toUri();
        LOG.info("adding {} to local resource configuration ", uri);
        addLocalCacheFiles(configuration, uri);
    }

    /** Returns the decorated tap cast to {@link Hfs}. */
    private Hfs getHfs() {
        return (Hfs) getOriginal();
    }

    /**
     * Returns {@code true} when the decorated tap's identifier is non-empty and contains
     * a {@code *}; no other glob syntax is recognized.
     */
    private boolean isSimpleGlob() {
        String identifier = getHfs().getIdentifier();
        return !Util.isEmpty(identifier) && identifier.contains("*");
    }

    /**
     * Returns the locally available cache files for the given flow process.
     *
     * @param flowProcess the current flow process
     * @return the local cache files; {@code null} or an empty array makes
     *         {@link #openForRead} delegate to the decorated tap
     * @throws IOException if the files cannot be determined
     */
    protected abstract Path[] getLocalCacheFiles(FlowProcess<? extends Configuration> flowProcess) throws IOException;

    /**
     * Registers the given URI as a local cache file in the configuration.
     *
     * @param configuration the configuration to update
     * @param uri           the URI of the file to register
     */
    protected abstract void addLocalCacheFiles(Configuration configuration, URI uri);

    // The two methods below are marked bridge/synthetic in the original and are kept so the
    // visible signatures stay unchanged; they only forward to the typed overloads.

    public /* bridge */ /* synthetic */ TupleEntryIterator openForRead(FlowProcess flowProcess, Object obj) throws IOException {
        return openForRead((FlowProcess<? extends Configuration>) flowProcess, (RecordReader) obj);
    }

    public /* bridge */ /* synthetic */ void sourceConfInit(FlowProcess flowProcess, Object obj) {
        sourceConfInit((FlowProcess<? extends Configuration>) flowProcess, (Configuration) obj);
    }
}
