package org.apache.hadoop.mapreduce.v2.app.commit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

/* loaded from: input_file:hadoop-client-2.1.0-beta/share/hadoop/client/lib/hadoop-mapreduce-client-app-2.1.0-beta.jar:org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.class */
/**
 * Service that queues {@link CommitterEvent}s and executes them on a worker pool.
 *
 * <p>Events passed to {@link #handle(CommitterEvent)} are placed on an internal queue. A
 * dedicated thread started in {@link #serviceStart()} drains that queue and submits one
 * {@link EventProcessor} per event to a fixed pool of five threads. Each processor invokes the
 * matching {@link OutputCommitter} operation and reports the outcome through the event handler
 * obtained from the {@link AppContext}.
 *
 * <p>The thread currently running a job commit is tracked in {@code jobCommitThread}; access to
 * it goes through the {@code synchronized} methods {@link #jobCommitStarted()},
 * {@link #jobCommitEnded()} and {@link #cancelJobCommit()}.
 */
public class CommitterEventHandler extends AbstractService implements EventHandler<CommitterEvent> {
    private static final Log LOG = LogFactory.getLog(CommitterEventHandler.class);
    private final AppContext context;
    private final OutputCommitter committer;
    private final RMHeartbeatHandler rmHeartbeatHandler;
    // Worker pool that runs the EventProcessor instances; created in serviceStart().
    private ThreadPoolExecutor launcherPool;
    // Thread that moves events from eventQueue to launcherPool; created in serviceStart().
    private Thread eventHandlingThread;
    private final BlockingQueue<CommitterEvent> eventQueue;
    private final AtomicBoolean stopped;
    // Thread currently executing a job commit, or null when no commit is in progress.
    private Thread jobCommitThread;
    // Upper bound (ms) that cancelJobCommit() waits for an interrupted commit thread to finish.
    private int commitThreadCancelTimeoutMs;
    // Maximum age (ms) of the last RM heartbeat for a commit to be allowed to proceed.
    private long commitWindowMs;
    private FileSystem fs;
    // Marker files created around a job commit (start / success / failure).
    private Path startCommitFile;
    private Path endCommitSuccessFile;
    private Path endCommitFailureFile;

    /* loaded from: input_file:hadoop-client-2.1.0-beta/share/hadoop/client/lib/hadoop-mapreduce-client-app-2.1.0-beta.jar:org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler$EventProcessor.class */
    /**
     * Runs a single {@link CommitterEvent} on a pool thread, dispatching on the event type.
     */
    private class EventProcessor implements Runnable {
        private final CommitterEvent event;

        /**
         * @param committerEvent the event this processor will execute
         */
        EventProcessor(CommitterEvent committerEvent) {
            this.event = committerEvent;
        }

        /**
         * Dispatches the event to the handler matching its type.
         *
         * @throws YarnRuntimeException if the event type is not one of the four handled types
         */
        @Override // java.lang.Runnable
        public void run() {
            LOG.info("Processing the event " + this.event.toString());
            switch (this.event.getType()) {
                case JOB_SETUP:
                    handleJobSetup((CommitterJobSetupEvent) this.event);
                    return;
                case JOB_COMMIT:
                    handleJobCommit((CommitterJobCommitEvent) this.event);
                    return;
                case JOB_ABORT:
                    handleJobAbort((CommitterJobAbortEvent) this.event);
                    return;
                case TASK_ABORT:
                    handleTaskAbort((CommitterTaskAbortEvent) this.event);
                    return;
                default:
                    throw new YarnRuntimeException("Unexpected committer event " + this.event.toString());
            }
        }

        /**
         * Calls {@code setupJob} on the committer and emits a {@link JobSetupCompletedEvent} on
         * success or a {@link JobSetupFailedEvent} carrying the stringified exception on failure.
         *
         * @param committerJobSetupEvent the setup request
         */
        protected void handleJobSetup(CommitterJobSetupEvent committerJobSetupEvent) {
            try {
                committer.setupJob(committerJobSetupEvent.getJobContext());
                context.getEventHandler().handle(new JobSetupCompletedEvent(committerJobSetupEvent.getJobID()));
            } catch (Exception e) {
                LOG.warn("Job setup failed", e);
                context.getEventHandler().handle(
                        new JobSetupFailedEvent(committerJobSetupEvent.getJobID(), StringUtils.stringifyException(e)));
            }
        }

        /**
         * Creates an empty file at {@code path} without overwriting an existing one.
         *
         * @param path the file to create
         * @throws IOException if the file cannot be created or closed
         */
        private void touchz(Path path) throws IOException {
            // overwrite=false: creation is expected to fail if the file already exists.
            fs.create(path, false).close();
        }

        /**
         * Commits the job: writes the start marker, registers this thread as the commit thread,
         * waits for a recent RM heartbeat, runs {@code commitJob}, writes the success marker and
         * emits a {@link JobCommitCompletedEvent}. Any exception along the way results in a
         * best-effort failure marker and a {@link JobCommitFailedEvent}.
         *
         * @param committerJobCommitEvent the commit request
         */
        protected void handleJobCommit(CommitterJobCommitEvent committerJobCommitEvent) {
            try {
                touchz(startCommitFile);
                jobCommitStarted();
                waitForValidCommitWindow();
                committer.commitJob(committerJobCommitEvent.getJobContext());
                touchz(endCommitSuccessFile);
                context.getEventHandler().handle(new JobCommitCompletedEvent(committerJobCommitEvent.getJobID()));
            } catch (Exception e) {
                try {
                    touchz(endCommitFailureFile);
                } catch (Exception e2) {
                    // Best effort: the commit failure below is still reported.
                    LOG.error("could not create failure file.", e2);
                }
                LOG.error("Could not commit job", e);
                context.getEventHandler().handle(
                        new JobCommitFailedEvent(committerJobCommitEvent.getJobID(), StringUtils.stringifyException(e)));
            } finally {
                // Always release the commit-thread slot and wake any waiter in cancelJobCommit().
                jobCommitEnded();
            }
        }

        /**
         * Cancels any in-progress commit, calls {@code abortJob} on the committer and emits a
         * {@link JobAbortCompletedEvent}. The completion event is sent even if the abort throws.
         *
         * @param committerJobAbortEvent the abort request
         */
        protected void handleJobAbort(CommitterJobAbortEvent committerJobAbortEvent) {
            cancelJobCommit();
            try {
                committer.abortJob(committerJobAbortEvent.getJobContext(), committerJobAbortEvent.getFinalState());
            } catch (Exception e) {
                LOG.warn("Could not abort job", e);
            }
            context.getEventHandler().handle(
                    new JobAbortCompletedEvent(committerJobAbortEvent.getJobID(), committerJobAbortEvent.getFinalState()));
        }

        /**
         * Calls {@code abortTask} on the committer and emits {@code TA_CLEANUP_DONE} for the
         * attempt. The event is sent even if the abort throws.
         *
         * @param committerTaskAbortEvent the task abort request
         */
        protected void handleTaskAbort(CommitterTaskAbortEvent committerTaskAbortEvent) {
            try {
                committer.abortTask(committerTaskAbortEvent.getAttemptContext());
            } catch (Exception e) {
                LOG.warn("Task cleanup failed for attempt " + committerTaskAbortEvent.getAttemptID(), e);
            }
            context.getEventHandler().handle(
                    new TaskAttemptEvent(committerTaskAbortEvent.getAttemptID(), TaskAttemptEventType.TA_CLEANUP_DONE));
        }

        /**
         * Blocks until the time since the last RM heartbeat is at most {@code commitWindowMs}.
         *
         * <p>While the heartbeat is too old, a callback is registered to run on the next
         * heartbeat; the callback notifies this processor, which then re-reads the heartbeat
         * time and the clock.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private synchronized void waitForValidCommitWindow() throws InterruptedException {
            long lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
            long now = context.getClock().getTime();
            while (now - lastHeartbeatTime > commitWindowMs) {
                rmHeartbeatHandler.runOnNextHeartbeat(new Runnable() { // from class: org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler.EventProcessor.1
                    @Override // java.lang.Runnable
                    public void run() {
                        synchronized (EventProcessor.this) {
                            EventProcessor.this.notify();
                        }
                    }
                });
                // Releases this processor's monitor until the heartbeat callback notifies it.
                wait();
                lastHeartbeatTime = rmHeartbeatHandler.getLastHeartbeatTime();
                now = context.getClock().getTime();
            }
        }
    }

    /**
     * Creates the handler; no threads are started until {@link #serviceStart()}.
     *
     * @param appContext source of the application id, the clock and the downstream event handler
     * @param outputCommitter committer whose setup/commit/abort operations are executed
     * @param rMHeartbeatHandler used to check the last RM heartbeat time before committing
     */
    public CommitterEventHandler(AppContext appContext, OutputCommitter outputCommitter, RMHeartbeatHandler rMHeartbeatHandler) {
        super("CommitterEventHandler");
        this.eventQueue = new LinkedBlockingQueue<CommitterEvent>();
        this.jobCommitThread = null;
        this.context = appContext;
        this.committer = outputCommitter;
        this.rmHeartbeatHandler = rMHeartbeatHandler;
        this.stopped = new AtomicBoolean(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the cancel timeout (default 60000 ms) and commit window (default 10000 ms) from the
     * configuration, obtains the file system and resolves the three commit marker paths.
     *
     * @param configuration the service configuration
     * @throws YarnRuntimeException wrapping any {@link IOException} raised while obtaining the
     *     file system, the current user or the marker paths
     */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceInit(Configuration configuration) throws Exception {
        super.serviceInit(configuration);
        this.commitThreadCancelTimeoutMs = configuration.getInt(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, 60000);
        this.commitWindowMs = configuration.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, 10000L);
        try {
            this.fs = FileSystem.get(configuration);
            JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(this.context.getApplicationID()));
            String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
            this.startCommitFile = MRApps.getStartJobCommitFile(configuration, shortUserName, jobId);
            this.endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(configuration, shortUserName, jobId);
            this.endCommitFailureFile = MRApps.getEndJobCommitFailureFile(configuration, shortUserName, jobId);
        } catch (IOException e) {
            throw new YarnRuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the five-thread worker pool and starts the thread that takes events off the queue
     * and submits them to the pool.
     */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStart() throws Exception {
        this.launcherPool = new ThreadPoolExecutor(5, 5, 1L, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(),
                new ThreadFactoryBuilder().setNameFormat("CommitterEvent Processor #%d").build());
        this.eventHandlingThread = new Thread(new Runnable() { // from class: org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler.1
            @Override // java.lang.Runnable
            public void run() {
                while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                    try {
                        launcherPool.execute(new EventProcessor(eventQueue.take()));
                    } catch (InterruptedException e) {
                        // An interrupt during shutdown is expected; otherwise log before exiting.
                        if (!stopped.get()) {
                            LOG.error("Returning, interrupted : " + e);
                        }
                        return;
                    }
                }
            }
        });
        this.eventHandlingThread.setName("CommitterEvent Handler");
        this.eventHandlingThread.start();
        super.serviceStart();
    }

    /**
     * Enqueues an event for asynchronous processing.
     *
     * @param committerEvent the event to enqueue
     * @throws YarnRuntimeException if the calling thread is interrupted while enqueueing; the
     *     thread's interrupt status is restored before the exception is thrown
     */
    @Override // org.apache.hadoop.yarn.event.EventHandler
    public void handle(CommitterEvent committerEvent) {
        try {
            this.eventQueue.put(committerEvent);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; re-assert it so callers can observe it.
            Thread.currentThread().interrupt();
            throw new YarnRuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the service once: interrupts the queue-draining thread and shuts down the worker
     * pool. Subsequent calls return immediately.
     */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStop() throws Exception {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        if (this.launcherPool != null) {
            this.launcherPool.shutdown();
        }
        super.serviceStop();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the calling thread as the active job commit thread.
     *
     * @throws IOException if another commit thread is already registered
     */
    public synchronized void jobCommitStarted() throws IOException {
        if (this.jobCommitThread != null) {
            throw new IOException("Commit while another commit thread active: " + this.jobCommitThread.toString());
        }
        this.jobCommitThread = Thread.currentThread();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Clears the active commit thread if it is the calling thread and wakes all threads waiting
     * on this handler's monitor. Does nothing when called from any other thread.
     */
    public synchronized void jobCommitEnded() {
        if (this.jobCommitThread == Thread.currentThread()) {
            this.jobCommitThread = null;
            notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Interrupts the active commit thread, if one is alive, and waits up to
     * {@code commitThreadCancelTimeoutMs} for it to call {@link #jobCommitEnded()}.
     *
     * <p>Returns early if the calling thread is interrupted while waiting.
     */
    public synchronized void cancelJobCommit() {
        Thread committing = this.jobCommitThread;
        if (committing == null || !committing.isAlive()) {
            return;
        }
        LOG.info("Canceling commit");
        committing.interrupt();
        long now = this.context.getClock().getTime();
        long deadline = now + this.commitThreadCancelTimeoutMs;
        // Loop while the same commit is still registered and the deadline lies in the future.
        // (deadline - now) is strictly positive here, so wait() is never called with 0, which
        // would mean "wait indefinitely".
        while (this.jobCommitThread == committing && now < deadline) {
            try {
                wait(deadline - now);
                now = this.context.getClock().getTime();
            } catch (InterruptedException ignored) {
                // Deliberately stop waiting and let the caller proceed with the abort.
                return;
            }
        }
    }
}
