/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

public class MultithreadedMapRunner
implements MapRunnable {
    private static final Log LOG = LogFactory.getLog((String)MultithreadedMapRunner.class.getName());
    private JobConf job;
    private Mapper mapper;
    private ExecutorService executorService;
    private volatile IOException ioException;

    public void configure(JobConf job) {
        int numberOfThreads = job.getInt("mapred.map.multithreadedrunner.threads", 10);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Configuring job " + job.getJobName() + " to use " + numberOfThreads + " threads"));
        }
        this.job = job;
        this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(), job);
        this.executorService = Executors.newFixedThreadPool(numberOfThreads);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException {
        try {
            WritableComparable key = input.createKey();
            Writable value = input.createValue();
            while (input.next(key, value)) {
                this.executorService.execute(new MapperInvokeRunable(key, value, output, reporter));
                if (this.ioException != null) {
                    throw this.ioException;
                }
                key = input.createKey();
                value = input.createValue();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Finished dispatching all Mappper.map calls, job " + this.job.getJobName()));
            }
            this.executorService.shutdown();
            try {
                while (!this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Awaiting all running Mappper.map calls to finish, job " + this.job.getJobName()));
                    }
                    if (this.ioException == null) continue;
                    throw this.ioException;
                }
                if (this.ioException != null) {
                    throw this.ioException;
                }
            }
            catch (IOException ioEx) {
                this.executorService.shutdownNow();
                throw ioEx;
            }
            catch (InterruptedException iEx) {
                throw new IOException(iEx.getMessage());
            }
        }
        finally {
            this.mapper.close();
        }
    }

    private class MapperInvokeRunable
    implements Runnable {
        private WritableComparable key;
        private Writable value;
        private OutputCollector output;
        private Reporter reporter;

        public MapperInvokeRunable(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) {
            this.key = key;
            this.value = value;
            this.output = output;
            this.reporter = reporter;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            try {
                MultithreadedMapRunner.this.mapper.map(this.key, this.value, this.output, this.reporter);
            }
            catch (IOException ex) {
                MultithreadedMapRunner multithreadedMapRunner = MultithreadedMapRunner.this;
                synchronized (multithreadedMapRunner) {
                    if (MultithreadedMapRunner.this.ioException == null) {
                        MultithreadedMapRunner.this.ioException = ex;
                    }
                }
            }
        }
    }
}

