/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InMemoryFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MRConstants;
import org.apache.hadoop.mapred.MapOutputLocation;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.ReduceTaskRunner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskRunner;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

class ReduceTask
extends Task {
    private static final Log LOG;
    private int numMaps;
    AtomicBoolean sortComplete = new AtomicBoolean(false);
    private ReduceCopier reduceCopier;
    private Progress copyPhase;
    private Progress sortPhase;
    private Progress reducePhase;
    private static final NumberFormat NUMBER_FORMAT;

    public ReduceTask() {
        this.getProgress().setStatus("reduce");
        this.setPhase(TaskStatus.Phase.SHUFFLE);
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
    }

    public ReduceTask(String jobId, String jobFile, String tipId, String taskId, int partition, int numMaps) {
        super(jobId, jobFile, tipId, taskId, partition);
        this.getProgress().setStatus("reduce");
        this.setPhase(TaskStatus.Phase.SHUFFLE);
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
        this.numMaps = numMaps;
    }

    public TaskRunner createRunner(TaskTracker tracker) throws IOException {
        return new ReduceTaskRunner(this, tracker, this.conf);
    }

    public boolean isMapTask() {
        return false;
    }

    public int getNumMaps() {
        return this.numMaps;
    }

    public void localizeConfiguration(JobConf conf) throws IOException {
        super.localizeConfiguration(conf);
        conf.setNumMapTasks(this.numMaps);
    }

    public void write(DataOutput out) throws IOException {
        super.write(out);
        out.writeInt(this.numMaps);
    }

    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        this.numMaps = in.readInt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException {
        SequenceFile.Sorter.RawKeyValueIterator rIter;
        Class<? extends Writable> valueClass = job.getMapOutputValueClass();
        Reducer reducer = (Reducer)ReflectionUtils.newInstance(job.getReducerClass(), job);
        LocalFileSystem lfs = FileSystem.getLocal(job);
        if (!job.get("mapred.job.tracker", "local").equals("local")) {
            this.reduceCopier = new ReduceCopier(umbilical, job);
            if (!this.reduceCopier.fetchOutputs()) {
                throw new IOException(this.getTaskId() + "The reduce copier failed");
            }
        }
        this.copyPhase.complete();
        ArrayList<Path> mapFilesList = new ArrayList<Path>();
        for (int i = 0; i < this.numMaps; ++i) {
            Path f;
            try {
                f = this.mapOutputFile.getInputFile(i, this.getTaskId());
            }
            catch (DiskChecker.DiskErrorException d) {
                continue;
            }
            if (!((FileSystem)lfs).exists(f)) continue;
            mapFilesList.add(f);
        }
        Path[] mapFiles = new Path[mapFilesList.size()];
        mapFiles = mapFilesList.toArray(mapFiles);
        Thread sortProgress = new Thread(){

            public void run() {
                while (!ReduceTask.this.sortComplete.get()) {
                    try {
                        ReduceTask.this.reportProgress(umbilical);
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e) {
                        return;
                    }
                    catch (Throwable e) {
                        System.out.println("Thread Exception in reporting sort progress\n" + StringUtils.stringifyException(e));
                    }
                }
            }
        };
        sortProgress.setDaemon(true);
        sortProgress.setName("Sort progress reporter for task " + this.getTaskId());
        Path tempDir = new Path(this.getTaskId());
        WritableComparator comparator = job.getOutputValueGroupingComparator();
        try {
            this.setPhase(TaskStatus.Phase.SORT);
            sortProgress.start();
            SequenceFile.Sorter sorter = new SequenceFile.Sorter((FileSystem)lfs, comparator, valueClass, (Configuration)job);
            rIter = sorter.merge(mapFiles, tempDir, !this.conf.getKeepFailedTaskFiles());
        }
        finally {
            this.sortComplete.set(true);
        }
        this.sortPhase.complete();
        this.setPhase(TaskStatus.Phase.REDUCE);
        final Reporter reporter = this.getReporter(umbilical);
        String finalName = ReduceTask.getOutputName(this.getPartition());
        FileSystem fs = FileSystem.get(job);
        final RecordWriter out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
        OutputCollector collector = new OutputCollector(){

            public void collect(WritableComparable key, Writable value) throws IOException {
                out.write(key, value);
                reporter.incrCounter(Task.Counter.REDUCE_OUTPUT_RECORDS, 1L);
                ReduceTask.this.reportProgress(umbilical);
            }
        };
        try {
            Class<? extends WritableComparable> keyClass = job.getMapOutputKeyClass();
            Class<? extends Writable> valClass = job.getMapOutputValueClass();
            ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator, keyClass, valClass, job, reporter);
            values.informReduceProgress();
            while (values.more()) {
                reporter.incrCounter(Task.Counter.REDUCE_INPUT_GROUPS, 1L);
                reducer.reduce(values.getKey(), values, collector, reporter);
                values.nextKey();
                values.informReduceProgress();
            }
            reducer.close();
            out.close(reporter);
        }
        catch (IOException ioe) {
            try {
                reducer.close();
            }
            catch (IOException ignored) {
                // empty catch block
            }
            try {
                out.close(reporter);
            }
            catch (IOException ignored) {
                // empty catch block
            }
            throw ioe;
        }
        this.done(umbilical);
    }

    static synchronized String getOutputName(int partition) {
        return "part-" + NUMBER_FORMAT.format(partition);
    }

    static {
        WritableFactories.setFactory(ReduceTask.class, new WritableFactory(){

            public Writable newInstance() {
                return new ReduceTask();
            }
        });
        LOG = LogFactory.getLog((String)ReduceTask.class.getName());
        NUMBER_FORMAT = NumberFormat.getInstance();
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class ReduceCopier
    implements MRConstants {
        private TaskUmbilicalProtocol umbilical;
        private static final int STALLED_COPY_TIMEOUT = 180000;
        private ReduceTask reduceTask;
        private List<MapOutputLocation> scheduledCopies;
        private List<CopyResult> copyResults;
        private int numCopiers;
        private int maxBackoff;
        private Map<String, Long> penaltyBox;
        private Set<String> uniqueHosts;
        private long lastPollTime;
        private InMemoryFileSystem inMemFileSys;
        private FileSystem localFileSys;
        private SequenceFile.Sorter sorter;
        private volatile Throwable mergeThrowable;
        private volatile boolean mergeInProgress = false;
        private int mergeThreshold = 500;
        private MapOutputCopier[] copiers = null;
        private MetricsRecord shuffleMetrics = null;
        private static final long MIN_POLL_INTERVAL = 1000L;
        private int probe_sample_size = 100;
        private List<MapOutputLocation> retryFetches = new ArrayList<MapOutputLocation>();
        private Set<Integer> neededOutputs = Collections.synchronizedSet(new TreeSet());
        private Random random = null;
        private long ramfsMergeOutputSize;
        private int nextMapOutputCopierId = 0;
        private final PathFilter MAP_OUTPUT_FILTER = new PathFilter(){

            public boolean accept(Path file) {
                return file.toString().endsWith(".out");
            }
        };

        private int extractMapIdFromPathName(Path pathname) {
            String firstPathName = pathname.getName();
            int beginIndex = firstPathName.lastIndexOf("map_");
            int endIndex = firstPathName.lastIndexOf(".out");
            return Integer.parseInt(firstPathName.substring(beginIndex + "map_".length(), endIndex));
        }

        private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
            Thread copyProgress = new Thread(){

                public void run() {
                    LOG.debug((Object)("Started thread: " + this.getName()));
                    while (true) {
                        try {
                            while (true) {
                                ReduceTask.this.reportProgress(umbilical);
                                Thread.sleep(1000L);
                            }
                        }
                        catch (InterruptedException e) {
                            return;
                        }
                        catch (Throwable e) {
                            LOG.info((Object)("Thread Exception in reporting copy progress\n" + StringUtils.stringifyException(e)));
                            continue;
                        }
                        break;
                    }
                }
            };
            copyProgress.setName("Copy progress reporter for task " + ReduceTask.this.getTaskId());
            copyProgress.setDaemon(true);
            return copyProgress;
        }

        private void configureClasspath(JobConf conf) throws IOException {
            ReduceTask task = ReduceTask.this;
            ClassLoader parent = conf.getClassLoader();
            File workDir = new File(task.getJobFile()).getParentFile();
            File jobCacheDir = new File(workDir.getParent(), "work");
            ArrayList<URL> urllist = new ArrayList<URL>();
            String jar = conf.getJar();
            if (jar != null) {
                File[] libs = new File(jobCacheDir, "lib").listFiles();
                if (libs != null) {
                    for (int i = 0; i < libs.length; ++i) {
                        urllist.add(libs[i].toURL());
                    }
                }
                urllist.add(new File(jobCacheDir, "classes").toURL());
                urllist.add(jobCacheDir.toURL());
            }
            urllist.add(workDir.toURL());
            URL[] urls = urllist.toArray(new URL[urllist.size()]);
            URLClassLoader loader = new URLClassLoader(urls, parent);
            conf.setClassLoader(loader);
        }

        public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf) throws IOException {
            this.configureClasspath(conf);
            this.umbilical = umbilical;
            this.reduceTask = ReduceTask.this;
            this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
            this.copyResults = new ArrayList<CopyResult>(100);
            this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
            this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
            this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
            URI uri = URI.create("ramfs://mapoutput" + this.reduceTask.hashCode());
            this.inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
            LOG.info((Object)(this.reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: " + uri));
            this.ramfsMergeOutputSize = (long)(0.5f * (float)this.inMemFileSys.getFSSize());
            this.localFileSys = FileSystem.getLocal(conf);
            this.sorter = new SequenceFile.Sorter((FileSystem)this.inMemFileSys, conf.getOutputKeyComparator(), conf.getMapOutputValueClass(), (Configuration)conf);
            this.penaltyBox = new Hashtable<String, Long>();
            this.uniqueHosts = new HashSet<String>();
            this.lastPollTime = 0L;
            MetricsContext metricsContext = MetricsUtil.getContext("mapred");
            this.shuffleMetrics = MetricsUtil.createRecord(metricsContext, "shuffleInput");
            this.shuffleMetrics.setTag("user", conf.getUser());
            long randomSeed = System.nanoTime() + (long)Math.pow(this.reduceTask.getPartition(), this.reduceTask.getPartition() % 10);
            this.random = new Random(randomSeed);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public boolean fetchOutputs() throws IOException {
            long startTime;
            int i;
            int numOutputs = this.reduceTask.getNumMaps();
            ArrayList<MapOutputLocation> knownOutputs = new ArrayList<MapOutputLocation>(this.numCopiers);
            int numInFlight = 0;
            int numCopied = 0;
            int lowThreshold = this.numCopiers * 2;
            long bytesTransferred = 0L;
            DecimalFormat mbpsFormat = new DecimalFormat("0.00");
            Random backoff = new Random();
            Progress copyPhase = this.reduceTask.getProgress().phase();
            this.probe_sample_size = Math.max(this.numCopiers * 5, 50);
            for (i = 0; i < numOutputs; ++i) {
                this.neededOutputs.add(new Integer(i));
                copyPhase.addPhase();
            }
            this.copiers = new MapOutputCopier[this.numCopiers];
            for (i = 0; i < this.copiers.length; ++i) {
                this.copiers[i] = new MapOutputCopier();
                this.copiers[i].start();
            }
            long currentTime = startTime = System.currentTimeMillis();
            IntWritable fromEventId = new IntWritable(0);
            Thread copyProgress = this.createProgressThread(this.umbilical);
            copyProgress.start();
            try {
                block24: while (!this.neededOutputs.isEmpty() && this.mergeThrowable == null) {
                    LOG.info((Object)(this.reduceTask.getTaskId() + " Need " + this.neededOutputs.size() + " map output(s)"));
                    try {
                        knownOutputs.addAll(this.retryFetches);
                        List<MapOutputLocation> locs = this.getMapCompletionEvents(fromEventId);
                        for (int i2 = 0; i2 < locs.size(); ++i2) {
                            knownOutputs.add(locs.get(i2));
                        }
                        LOG.info((Object)(this.reduceTask.getTaskId() + " Got " + locs.size() + " new map outputs from tasktracker and " + this.retryFetches.size() + " map outputs from previous failures"));
                        this.retryFetches.clear();
                    }
                    catch (IOException ie) {
                        LOG.warn((Object)(this.reduceTask.getTaskId() + " Problem locating map outputs: " + StringUtils.stringifyException(ie)));
                    }
                    int numKnown = knownOutputs.size();
                    int numScheduled2 = 0;
                    int numSlow = 0;
                    int numDups = 0;
                    LOG.info((Object)(this.reduceTask.getTaskId() + " Got " + numKnown + " known map output location(s); scheduling..."));
                    List<MapOutputLocation> list = this.scheduledCopies;
                    synchronized (list) {
                        Collections.shuffle(knownOutputs, this.random);
                        Iterator locIt = knownOutputs.iterator();
                        currentTime = System.currentTimeMillis();
                        while (locIt.hasNext()) {
                            MapOutputLocation loc = (MapOutputLocation)locIt.next();
                            Long penaltyEnd = this.penaltyBox.get(loc.getHost());
                            boolean penalized = false;
                            boolean duplicate = false;
                            if (penaltyEnd != null && currentTime < penaltyEnd) {
                                penalized = true;
                                ++numSlow;
                            }
                            if (this.uniqueHosts.contains(loc.getHost())) {
                                duplicate = true;
                                ++numDups;
                            }
                            if (penalized || duplicate) continue;
                            this.uniqueHosts.add(loc.getHost());
                            this.scheduledCopies.add(loc);
                            locIt.remove();
                            ++numInFlight;
                            ++numScheduled2;
                        }
                        this.scheduledCopies.notifyAll();
                    }
                    LOG.info((Object)(this.reduceTask.getTaskId() + " Scheduled " + numScheduled2 + " of " + numKnown + " known outputs (" + numSlow + " slow hosts and " + numDups + " dup hosts)"));
                    try {
                        if (numInFlight == 0 && numScheduled2 == 0) {
                            Thread.sleep(5000L);
                        }
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                    }
                    while (numInFlight > 0 && this.mergeThrowable == null) {
                        LOG.debug((Object)(this.reduceTask.getTaskId() + " numInFlight = " + numInFlight));
                        CopyResult cr = this.getCopyResult();
                        if (cr != null) {
                            if (cr.getSuccess()) {
                                long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000L + 1L;
                                float mbs = (float)(bytesTransferred += cr.getSize()) / 1048576.0f;
                                float transferRate = mbs / (float)secsSinceStart;
                                copyPhase.startNextPhase();
                                copyPhase.setStatus("copy (" + ++numCopied + " of " + numOutputs + " at " + mbpsFormat.format(transferRate) + " MB/s)");
                            } else if (cr.isObsolete()) {
                                LOG.info((Object)(this.reduceTask.getTaskId() + " Ignoring obsolete copy result for Map Task: " + cr.getLocation().getMapTaskId() + " from host: " + cr.getHost()));
                            } else {
                                this.retryFetches.add(cr.getLocation());
                                currentTime = System.currentTimeMillis();
                                long nextContact = currentTime + 60000L + (long)backoff.nextInt(this.maxBackoff * 1000);
                                this.penaltyBox.put(cr.getHost(), new Long(nextContact));
                                LOG.warn((Object)(this.reduceTask.getTaskId() + " adding host " + cr.getHost() + " to penalty box, next contact in " + (nextContact - currentTime) / 1000L + " seconds"));
                                Iterator locIt = knownOutputs.iterator();
                                while (locIt.hasNext()) {
                                    MapOutputLocation loc = (MapOutputLocation)locIt.next();
                                    if (!cr.getHost().equals(loc.getHost())) continue;
                                    this.retryFetches.add(loc);
                                    locIt.remove();
                                }
                            }
                            this.uniqueHosts.remove(cr.getHost());
                            --numInFlight;
                        }
                        boolean busy = true;
                        if (numInFlight < lowThreshold && numOutputs - numCopied > this.probe_sample_size) {
                            busy = false;
                        }
                        List<CopyResult> list2 = this.copyResults;
                        synchronized (list2) {
                            if (this.copyResults.size() == 0 && !busy) {
                                continue block24;
                            }
                        }
                    }
                }
                MapOutputCopier[] numKnown = this.copiers;
                synchronized (this.copiers) {
                    List<MapOutputLocation> numScheduled2 = this.scheduledCopies;
                    synchronized (numScheduled2) {
                        for (int i3 = 0; i3 < this.copiers.length; ++i3) {
                            this.copiers[i3].interrupt();
                            this.copiers[i3] = null;
                        }
                    }
                    // ** MonitorExit[numKnown] (shouldn't be in output)
                    if (this.mergeThrowable == null) {
                        Path[] inMemClosedFiles;
                        try {
                            while (this.mergeInProgress) {
                                Thread.sleep(200L);
                            }
                            LOG.info((Object)(this.reduceTask.getTaskId() + " Copying of all map outputs complete. " + "Initiating the last merge on the remaining files in " + this.inMemFileSys.getUri()));
                            if (this.mergeThrowable != null) {
                                throw this.mergeThrowable;
                            }
                            inMemClosedFiles = this.inMemFileSys.getFiles(this.MAP_OUTPUT_FILTER);
                            if (inMemClosedFiles.length == 0) {
                                LOG.info((Object)(this.reduceTask.getTaskId() + "Nothing to merge from " + this.inMemFileSys.getUri()));
                                boolean numScheduled2 = this.neededOutputs.isEmpty();
                                return numScheduled2;
                            }
                        }
                        catch (Throwable t) {
                            LOG.warn((Object)(this.reduceTask.getTaskId() + " Final merge of the inmemory files threw an exception: " + StringUtils.stringifyException(t)));
                            boolean bl = false;
                            this.inMemFileSys.close();
                            copyProgress.interrupt();
                            return bl;
                        }
                        {
                            int mapId = this.extractMapIdFromPathName(inMemClosedFiles[0]);
                            Path outputPath = ReduceTask.this.mapOutputFile.getInputFileForWrite(mapId, this.reduceTask.getTaskId(), this.ramfsMergeOutputSize);
                            SequenceFile.Writer writer = this.sorter.cloneFileAttributes(this.inMemFileSys.makeQualified(inMemClosedFiles[0]), this.localFileSys.makeQualified(outputPath), null);
                            SequenceFile.Sorter.RawKeyValueIterator rIter = null;
                            try {
                                rIter = this.sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, new Path(this.reduceTask.getTaskId()));
                            }
                            catch (Exception e) {
                                writer.close();
                                this.localFileSys.delete(inMemClosedFiles[0]);
                                throw new IOException(StringUtils.stringifyException(e));
                            }
                            this.sorter.writeFile(rIter, writer);
                            writer.close();
                            LOG.info((Object)(this.reduceTask.getTaskId() + " Merge of the " + inMemClosedFiles.length + " files in InMemoryFileSystem complete." + " Local file is " + outputPath));
                        }
                    }
                    boolean bl = this.mergeThrowable == null && this.neededOutputs.isEmpty();
                    return bl;
                }
            }
            finally {
                this.inMemFileSys.close();
                copyProgress.interrupt();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private CopyResult getCopyResult() {
            List<CopyResult> list = this.copyResults;
            synchronized (list) {
                while (this.copyResults.isEmpty()) {
                    try {
                        this.copyResults.wait();
                    }
                    catch (InterruptedException interruptedException) {}
                }
                if (this.copyResults.isEmpty()) {
                    return null;
                }
                return this.copyResults.remove(0);
            }
        }

        private List<MapOutputLocation> getMapCompletionEvents(IntWritable fromEventId) throws IOException {
            long currentTime = System.currentTimeMillis();
            long pollTime = this.lastPollTime + 1000L;
            while (currentTime < pollTime) {
                try {
                    Thread.sleep(pollTime - currentTime);
                }
                catch (InterruptedException ie) {
                    // empty catch block
                }
                currentTime = System.currentTimeMillis();
            }
            this.lastPollTime = currentTime;
            TaskCompletionEvent[] t = this.umbilical.getMapCompletionEvents(this.reduceTask.getJobId().toString(), fromEventId.get(), this.probe_sample_size);
            ArrayList<MapOutputLocation> mapOutputsList = new ArrayList<MapOutputLocation>();
            for (TaskCompletionEvent event : t) {
                if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
                    URI u = URI.create(event.getTaskTrackerHttp());
                    String host = u.getHost();
                    int port = u.getPort();
                    String taskId = event.getTaskId();
                    int mId = event.idWithinJob();
                    mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
                    continue;
                }
                if (event.getTaskStatus() != TaskCompletionEvent.Status.TIPFAILED) continue;
                this.neededOutputs.remove(event.idWithinJob());
                LOG.info((Object)("Ignoring output of failed map: '" + event.getTaskId() + "'"));
            }
            fromEventId.set(fromEventId.get() + t.length);
            return mapOutputsList;
        }

        static /* synthetic */ int access$308(ReduceCopier x0) {
            return x0.nextMapOutputCopierId++;
        }

        static /* synthetic */ List access$600(ReduceCopier x0) {
            return x0.scheduledCopies;
        }

        private class InMemFSMergeThread
        extends Thread {
            private InMemoryFileSystem inMemFileSys;
            private LocalFileSystem localFileSys;
            private SequenceFile.Sorter sorter;

            public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
                this.inMemFileSys = inMemFileSys;
                this.localFileSys = localFileSys;
                this.sorter = sorter;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                block8: {
                    LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskId() + " Thread started: " + this.getName()));
                    try {
                        Path[] inMemClosedFiles = this.inMemFileSys.getFiles(ReduceCopier.this.MAP_OUTPUT_FILTER);
                        if (inMemClosedFiles.length >= 2) {
                            SequenceFile.Sorter.RawKeyValueIterator rIter;
                            int mapId = ReduceCopier.this.extractMapIdFromPathName(inMemClosedFiles[0]);
                            Path outputPath = ReduceTask.this.mapOutputFile.getInputFileForWrite(mapId, ReduceCopier.this.reduceTask.getTaskId(), ReduceCopier.this.ramfsMergeOutputSize);
                            SequenceFile.Writer writer = this.sorter.cloneFileAttributes(this.inMemFileSys.makeQualified(inMemClosedFiles[0]), this.localFileSys.makeQualified(outputPath), null);
                            try {
                                rIter = this.sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, new Path(ReduceCopier.this.reduceTask.getTaskId()));
                            }
                            catch (Exception e) {
                                writer.close();
                                this.localFileSys.delete(outputPath);
                                throw new IOException(StringUtils.stringifyException(e));
                            }
                            this.sorter.writeFile(rIter, writer);
                            writer.close();
                            LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskId() + " Merge of the " + inMemClosedFiles.length + " files in InMemoryFileSystem complete." + " Local file is " + outputPath));
                            break block8;
                        }
                        LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskId() + " Nothing to merge from " + this.inMemFileSys.getUri()));
                    }
                    catch (Throwable t) {
                        LOG.warn((Object)(ReduceCopier.this.reduceTask.getTaskId() + " Intermediate Merge of the inmemory files threw an exception: " + StringUtils.stringifyException(t)));
                        ReduceCopier.this.mergeThrowable = t;
                    }
                    finally {
                        ReduceCopier.this.mergeInProgress = false;
                    }
                }
            }
        }

        private class MapOutputCopier
        extends Thread {
            private MapOutputLocation currentLocation = null;
            private int id = ReduceCopier.access$308(ReduceCopier.this);

            public MapOutputCopier() {
                this.setName("MapOutputCopier " + ReduceCopier.this.reduceTask.getTaskId() + "." + this.id);
                LOG.debug((Object)(this.getName() + " created"));
            }

            public synchronized boolean fail() {
                if (this.currentLocation != null) {
                    this.finish(-1L);
                    return true;
                }
                return false;
            }

            public synchronized MapOutputLocation getLocation() {
                return this.currentLocation;
            }

            private synchronized void start(MapOutputLocation loc) {
                this.currentLocation = loc;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private synchronized void finish(long size) {
                if (this.currentLocation != null) {
                    LOG.debug((Object)(this.getName() + " finishing " + this.currentLocation + " =" + size));
                    List list = ReduceCopier.this.copyResults;
                    synchronized (list) {
                        ReduceCopier.this.copyResults.add(new CopyResult(this.currentLocation, size));
                        ReduceCopier.this.copyResults.notify();
                    }
                    this.currentLocation = null;
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Unable to fully structure code
             */
            public void run() {
                while (true) {
                    try {
                        while (true) lbl-1000:
                        // 4 sources

                        {
                            loc = null;
                            size = -1L;
                            var4_5 = ReduceCopier.access$600(ReduceCopier.this);
                            synchronized (var4_5) {
                                while (ReduceCopier.access$600(ReduceCopier.this).isEmpty()) {
                                    ReduceCopier.access$600(ReduceCopier.this).wait();
                                }
                                loc = (MapOutputLocation)ReduceCopier.access$600(ReduceCopier.this).remove(0);
                            }
                            try {
                                this.start(loc);
                                size = this.copyOutput(loc);
                            }
                            catch (IOException e) {
                                ReduceTask.access$000().warn((Object)(ReduceCopier.access$400(ReduceCopier.this).getTaskId() + " copy failed: " + loc.getMapTaskId() + " from " + loc.getHost()));
                                ReduceTask.access$000().warn((Object)StringUtils.stringifyException(e));
                            }
                            finally {
                                this.finish(size);
                                continue;
                            }
                            break;
                        }
                    }
                    catch (InterruptedException e) {
                        return;
                    }
                    catch (Throwable th) {
                        ReduceTask.access$000().error((Object)("Map output copy failure: " + StringUtils.stringifyException(th)));
                        continue;
                    }
                    ** GOTO lbl-1000
                    break;
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private long copyOutput(MapOutputLocation loc) throws IOException, InterruptedException {
                if (!ReduceCopier.this.neededOutputs.contains(loc.getMapId())) {
                    return -2L;
                }
                String reduceId = ReduceCopier.this.reduceTask.getTaskId();
                LOG.info((Object)(reduceId + " Copying " + loc.getMapTaskId() + " output from " + loc.getHost() + "."));
                Path filename = new Path("/" + reduceId + "/map_" + loc.getMapId() + ".out");
                Path tmpFilename = new Path(filename + "-" + this.id);
                tmpFilename = loc.getFile(ReduceCopier.this.inMemFileSys, ReduceCopier.this.localFileSys, ReduceCopier.this.shuffleMetrics, tmpFilename, ReduceTask.this.lDirAlloc, ReduceTask.this.conf, ReduceCopier.this.reduceTask.getPartition(), 180000);
                if (!ReduceCopier.this.neededOutputs.contains(loc.getMapId())) {
                    if (tmpFilename != null) {
                        FileSystem fs = tmpFilename.getFileSystem(ReduceTask.this.conf);
                        fs.delete(tmpFilename);
                    }
                    return -2L;
                }
                if (tmpFilename == null) {
                    throw new IOException("File " + filename + "-" + this.id + " not created");
                }
                long bytes = -1L;
                ReduceTask reduceTask = ReduceTask.this;
                synchronized (reduceTask) {
                    FileSystem fs = tmpFilename.getFileSystem(ReduceTask.this.conf);
                    if (!ReduceCopier.this.neededOutputs.contains(loc.getMapId())) {
                        fs.delete(tmpFilename);
                        return -2L;
                    }
                    bytes = fs.getLength(tmpFilename);
                    filename = new Path(tmpFilename.getParent(), filename.getName());
                    if (!fs.rename(tmpFilename, filename)) {
                        fs.delete(tmpFilename);
                        bytes = -1L;
                        throw new IOException("failure to rename map output " + tmpFilename);
                    }
                    LOG.info((Object)(reduceId + " done copying " + loc.getMapTaskId() + " output from " + loc.getHost() + "."));
                    if (!ReduceCopier.this.mergeInProgress && (ReduceCopier.this.inMemFileSys.getPercentUsed() >= 0.5f || ReduceCopier.this.mergeThreshold > 0 && ReduceCopier.this.inMemFileSys.getNumFiles(ReduceCopier.this.MAP_OUTPUT_FILTER) >= ReduceCopier.this.mergeThreshold) && ReduceCopier.this.mergeThrowable == null) {
                        LOG.info((Object)(reduceId + " InMemoryFileSystem " + ReduceCopier.this.inMemFileSys.getUri().toString() + " is " + ReduceCopier.this.inMemFileSys.getPercentUsed() + " full. Triggering merge"));
                        InMemFSMergeThread m = new InMemFSMergeThread(ReduceCopier.this.inMemFileSys, (LocalFileSystem)ReduceCopier.this.localFileSys, ReduceCopier.this.sorter);
                        m.setName("Thread for merging in memory files");
                        m.setDaemon(true);
                        ReduceCopier.this.mergeInProgress = true;
                        m.start();
                    }
                    ReduceCopier.this.neededOutputs.remove(loc.getMapId());
                }
                return bytes;
            }
        }

        private class CopyResult {
            private final MapOutputLocation loc;
            private final long size;
            private static final int OBSOLETE = -2;

            CopyResult(MapOutputLocation loc, long size) {
                this.loc = loc;
                this.size = size;
            }

            public int getMapId() {
                return this.loc.getMapId();
            }

            public boolean getSuccess() {
                return this.size >= 0L;
            }

            public boolean isObsolete() {
                return this.size == -2L;
            }

            public long getSize() {
                return this.size;
            }

            public String getHost() {
                return this.loc.getHost();
            }

            public MapOutputLocation getLocation() {
                return this.loc;
            }
        }
    }

    private class ReduceValuesIterator
    extends ValuesIterator {
        public ReduceValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, WritableComparator comparator, Class keyClass, Class valClass, Configuration conf, Reporter reporter) throws IOException {
            super(in, comparator, keyClass, valClass, conf, reporter);
        }

        public void informReduceProgress() {
            ReduceTask.this.reducePhase.set(((ValuesIterator)this).in.getProgress().get());
            try {
                this.reporter.progress();
            }
            catch (IOException ie) {
                LOG.debug((Object)"Exception caught from progress", (Throwable)ie);
            }
        }

        public Object next() {
            this.reporter.incrCounter(Task.Counter.REDUCE_INPUT_RECORDS, 1L);
            return super.next();
        }
    }

    static class ValuesIterator
    implements Iterator {
        private SequenceFile.Sorter.RawKeyValueIterator in;
        private WritableComparable key;
        private Writable value;
        private boolean hasNext;
        private boolean more;
        private WritableComparator comparator;
        private Class keyClass;
        private Class valClass;
        private Configuration conf;
        private DataOutputBuffer valOut = new DataOutputBuffer();
        private DataInputBuffer valIn = new DataInputBuffer();
        private DataInputBuffer keyIn = new DataInputBuffer();
        protected Reporter reporter;

        public ValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, WritableComparator comparator, Class keyClass, Class valClass, Configuration conf, Reporter reporter) throws IOException {
            this.in = in;
            this.conf = conf;
            this.comparator = comparator;
            this.keyClass = keyClass;
            this.valClass = valClass;
            this.reporter = reporter;
            this.getNext();
        }

        public boolean hasNext() {
            return this.hasNext;
        }

        public Object next() {
            Writable result = this.value;
            try {
                this.getNext();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            try {
                this.reporter.progress();
            }
            catch (IOException ie) {
                LOG.debug((Object)"caught exception from progress", (Throwable)ie);
            }
            return result;
        }

        public void remove() {
            throw new RuntimeException("not implemented");
        }

        public void nextKey() {
            while (this.hasNext) {
                this.next();
            }
            this.hasNext = this.more;
        }

        public boolean more() {
            return this.more;
        }

        public WritableComparable getKey() {
            return this.key;
        }

        private void getNext() throws IOException {
            WritableComparable lastKey = this.key;
            try {
                this.key = (WritableComparable)ReflectionUtils.newInstance(this.keyClass, this.conf);
                this.value = (Writable)ReflectionUtils.newInstance(this.valClass, this.conf);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            this.more = this.in.next();
            if (this.more) {
                this.keyIn.reset(this.in.getKey().getData(), this.in.getKey().getLength());
                this.key.readFields(this.keyIn);
                this.valOut.reset();
                this.in.getValue().writeUncompressedBytes(this.valOut);
                this.valIn.reset(this.valOut.getData(), this.valOut.getLength());
                this.value.readFields(this.valIn);
                this.hasNext = lastKey == null ? true : this.comparator.compare((Object)this.key, (Object)lastKey) == 0;
            } else {
                this.hasNext = false;
            }
        }
    }
}

