/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.dfs;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Random;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.Block;
import org.apache.hadoop.dfs.BlockCommand;
import org.apache.hadoop.dfs.DataStorage;
import org.apache.hadoop.dfs.DatanodeCommand;
import org.apache.hadoop.dfs.DatanodeInfo;
import org.apache.hadoop.dfs.DatanodeProtocol;
import org.apache.hadoop.dfs.DatanodeRegistration;
import org.apache.hadoop.dfs.DisallowedDatanodeException;
import org.apache.hadoop.dfs.FSConstants;
import org.apache.hadoop.dfs.FSDataset;
import org.apache.hadoop.dfs.LocatedBlock;
import org.apache.hadoop.dfs.NamespaceInfo;
import org.apache.hadoop.dfs.Storage;
import org.apache.hadoop.dfs.StreamFile;
import org.apache.hadoop.dfs.UnregisteredDatanodeException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.StatusHttpServer;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class DataNode
implements FSConstants,
Runnable {
    public static final Log LOG = LogFactory.getLog((String)"org.apache.hadoop.dfs.DataNode");
    DatanodeProtocol namenode = null;
    FSDataset data = null;
    DatanodeRegistration dnRegistration = null;
    private String networkLoc;
    volatile boolean shouldRun = true;
    Vector<Block> receivedBlockList = new Vector();
    int xmitsInProgress = 0;
    Daemon dataXceiveServer = null;
    long blockReportInterval;
    long lastBlockReport = 0L;
    long lastHeartbeat = 0L;
    long heartBeatInterval;
    private DataStorage storage = null;
    private StatusHttpServer infoServer = null;
    private DataNodeMetrics myMetrics = new DataNodeMetrics();
    private static InetSocketAddress nameNodeAddr;
    private static DataNode datanodeObject;
    private static Thread dataNodeThread;
    String machineName;
    Count xceiverCount = new Count(0);

    public static InetSocketAddress createSocketAddr(String target) throws IOException {
        int port;
        String hostname;
        int colonIndex = target.indexOf(58);
        if (colonIndex < 0) {
            throw new RuntimeException("Not a host:port pair: " + target);
        }
        if (!target.contains("/")) {
            hostname = target.substring(0, colonIndex);
            port = Integer.parseInt(target.substring(colonIndex + 1));
        } else {
            URI addr = new Path(target).toUri();
            hostname = addr.getHost();
            port = addr.getPort();
        }
        return new InetSocketAddress(hostname, port);
    }

    DataNode(Configuration conf, AbstractList<File> dataDirs) throws IOException {
        try {
            this.startDataNode(conf, dataDirs);
        }
        catch (IOException ie) {
            this.shutdown();
            throw ie;
        }
    }

    void startDataNode(Configuration conf, AbstractList<File> dataDirs) throws IOException {
        this.machineName = DNS.getDefaultHost(conf.get("dfs.datanode.dns.interface", "default"), conf.get("dfs.datanode.dns.nameserver", "default"));
        InetSocketAddress nameNodeAddr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
        this.namenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, 6L, nameNodeAddr, conf);
        NamespaceInfo nsInfo = this.handshake();
        FSConstants.StartupOption startOpt = (FSConstants.StartupOption)((Object)conf.get("dfs.datanode.startup", (Object)FSConstants.StartupOption.REGULAR));
        assert (startOpt != null) : "Startup option must be set.";
        this.storage = new DataStorage();
        this.storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
        this.data = new FSDataset(this.storage, conf);
        ServerSocket ss = null;
        int tmpPort = conf.getInt("dfs.datanode.port", 50010);
        String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
        while (ss == null) {
            try {
                ss = new ServerSocket(tmpPort, 0, InetAddress.getByName(bindAddress));
                LOG.info((Object)("Opened server at " + tmpPort));
            }
            catch (IOException ie) {
                LOG.info((Object)("Could not open server at " + tmpPort + ", trying new port"));
                ++tmpPort;
            }
        }
        this.dnRegistration = new DatanodeRegistration(this.machineName + ":" + tmpPort, -1, this.storage);
        this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
        long blockReportIntervalBasis = conf.getLong("dfs.blockreport.intervalMsec", 3600000L);
        this.blockReportInterval = blockReportIntervalBasis - (long)new Random().nextInt((int)(blockReportIntervalBasis / 10L));
        this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", 3L) * 1000L;
        DataNode.nameNodeAddr = nameNodeAddr;
        int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
        String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
        this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
        this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
        this.infoServer.start();
        this.dnRegistration.infoPort = this.infoServer.getPort();
        this.networkLoc = conf.get("dfs.datanode.rack");
        if (this.networkLoc == null) {
            this.networkLoc = DataNode.getNetworkLoc(conf);
        }
        this.register();
        datanodeObject = this;
    }

    private NamespaceInfo handshake() throws IOException {
        NamespaceInfo nsInfo = new NamespaceInfo();
        while (this.shouldRun) {
            try {
                nsInfo = this.namenode.versionRequest();
                break;
            }
            catch (SocketTimeoutException e) {
                LOG.info((Object)("Problem connecting to server: " + this.getNameNodeAddr()));
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ie) {}
            }
        }
        String errorMsg = null;
        if (!nsInfo.getBuildVersion().equals(Storage.getBuildVersion())) {
            errorMsg = "Incompatible build versions: namenode BV = " + nsInfo.getBuildVersion() + "; datanode BV = " + Storage.getBuildVersion();
            LOG.fatal((Object)errorMsg);
            try {
                this.namenode.errorReport(this.dnRegistration, 0, errorMsg);
            }
            catch (SocketTimeoutException e) {
                LOG.info((Object)("Problem connecting to server: " + this.getNameNodeAddr()));
            }
            throw new IOException(errorMsg);
        }
        assert (-4 == nsInfo.getLayoutVersion()) : "Data-node and name-node layout versions must be the same.";
        return nsInfo;
    }

    public static DataNode getDataNode() {
        return datanodeObject;
    }

    public InetSocketAddress getNameNodeAddr() {
        return nameNodeAddr;
    }

    public String getNamenode() {
        return "<namenode>";
    }

    private void register() throws IOException {
        while (this.shouldRun) {
            try {
                this.dnRegistration.name = this.machineName + ":" + this.dnRegistration.getPort();
                this.dnRegistration = this.namenode.register(this.dnRegistration, this.networkLoc);
                break;
            }
            catch (SocketTimeoutException e) {
                LOG.info((Object)("Problem connecting to server: " + this.getNameNodeAddr()));
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        if (this.storage.getStorageID().equals("")) {
            this.storage.setStorageID(this.dnRegistration.getStorageID());
            this.storage.writeAll();
        }
    }

    public void shutdown() {
        if (this.infoServer != null) {
            try {
                this.infoServer.stop();
            }
            catch (Exception e) {
                // empty catch block
            }
        }
        this.shouldRun = false;
        if (this.dataXceiveServer != null) {
            ((DataXceiveServer)this.dataXceiveServer.getRunnable()).kill();
            this.dataXceiveServer.interrupt();
        }
        if (this.storage != null) {
            try {
                this.storage.unlockAll();
            }
            catch (IOException ie) {
                // empty catch block
            }
        }
        if (dataNodeThread != null) {
            dataNodeThread.interrupt();
            try {
                dataNodeThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private void checkDiskError(IOException e) throws IOException {
        if (e.getMessage().startsWith("No space left on device")) {
            throw new DiskChecker.DiskOutOfSpaceException("No space left on device");
        }
        this.checkDiskError();
    }

    private void checkDiskError() throws IOException {
        try {
            this.data.checkDataDir();
        }
        catch (DiskChecker.DiskErrorException de) {
            this.handleDiskError(de.getMessage());
        }
    }

    private void handleDiskError(String errMsgr) {
        LOG.warn((Object)("DataNode is shutting down.\n" + errMsgr));
        try {
            this.namenode.errorReport(this.dnRegistration, 1, errMsgr);
        }
        catch (IOException iOException) {
            // empty catch block
        }
        this.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void offerService() throws Exception {
        LOG.info((Object)("using BLOCKREPORT_INTERVAL of " + this.blockReportInterval + "msec"));
        while (this.shouldRun) {
            try {
                long now = System.currentTimeMillis();
                if (now - this.lastHeartbeat > this.heartBeatInterval) {
                    DatanodeCommand cmd = this.namenode.sendHeartbeat(this.dnRegistration, this.data.getCapacity(), this.data.getRemaining(), this.xmitsInProgress, this.xceiverCount.getValue());
                    this.lastHeartbeat = now;
                    if (!this.processCommand(cmd)) continue;
                }
                Block[] blockArray = null;
                Vector<Block> vector = this.receivedBlockList;
                synchronized (vector) {
                    if (this.receivedBlockList.size() > 0) {
                        blockArray = this.receivedBlockList.toArray(new Block[this.receivedBlockList.size()]);
                    }
                }
                if (blockArray != null) {
                    this.namenode.blockReceived(this.dnRegistration, blockArray);
                    vector = this.receivedBlockList;
                    synchronized (vector) {
                        for (Block b : blockArray) {
                            this.receivedBlockList.remove(b);
                        }
                    }
                }
                if (now - this.lastBlockReport > this.blockReportInterval) {
                    DatanodeCommand cmd = this.namenode.blockReport(this.dnRegistration, this.data.getBlockReport());
                    this.lastBlockReport = now;
                    this.processCommand(cmd);
                }
                long waitTime = this.heartBeatInterval - (System.currentTimeMillis() - this.lastHeartbeat);
                Vector<Block> vector2 = this.receivedBlockList;
                synchronized (vector2) {
                    if (waitTime > 0L && this.receivedBlockList.size() == 0) {
                        try {
                            this.receivedBlockList.wait(waitTime);
                        }
                        catch (InterruptedException ie) {
                            // empty catch block
                        }
                    }
                }
            }
            catch (RemoteException re) {
                String reClass = re.getClassName();
                if (UnregisteredDatanodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass)) {
                    LOG.warn((Object)("DataNode is shutting down: " + StringUtils.stringifyException(re)));
                    this.shutdown();
                    return;
                }
                LOG.warn((Object)StringUtils.stringifyException(re));
            }
            catch (IOException e) {
                LOG.warn((Object)StringUtils.stringifyException(e));
            }
        }
    }

    private boolean processCommand(DatanodeCommand cmd) throws IOException {
        if (cmd == null) {
            return true;
        }
        switch (cmd.action) {
            case DNA_TRANSFER: {
                BlockCommand bcmd = (BlockCommand)cmd;
                this.transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
                break;
            }
            case DNA_INVALIDATE: {
                Block[] toDelete = ((BlockCommand)cmd).getBlocks();
                try {
                    this.data.invalidate(toDelete);
                }
                catch (IOException e) {
                    this.checkDiskError();
                    throw e;
                }
                this.myMetrics.removedBlocks(toDelete.length);
                break;
            }
            case DNA_SHUTDOWN: {
                this.shutdown();
                return false;
            }
            case DNA_REGISTER: {
                this.register();
                this.lastHeartbeat = 0L;
                this.lastBlockReport = 0L;
                break;
            }
            case DNA_FINALIZE: {
                this.storage.finalizeUpgrade();
                break;
            }
            default: {
                LOG.warn((Object)("Unknown DatanodeCommand action: " + (Object)((Object)cmd.action)));
            }
        }
        return true;
    }

    private void transferBlocks(Block[] blocks, DatanodeInfo[][] xferTargets) throws IOException {
        for (int i = 0; i < blocks.length; ++i) {
            if (!this.data.isValidBlock(blocks[i])) {
                String errStr = "Can't send invalid block " + blocks[i];
                LOG.info((Object)errStr);
                this.namenode.errorReport(this.dnRegistration, 2, errStr);
                break;
            }
            if (xferTargets[i].length <= 0) continue;
            LOG.info((Object)("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]));
            new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
        }
    }

    @Override
    public void run() {
        LOG.info((Object)("Starting DataNode in: " + this.data));
        this.dataXceiveServer.start();
        while (this.shouldRun) {
            try {
                this.offerService();
            }
            catch (Exception ex) {
                LOG.error((Object)("Exception: " + StringUtils.stringifyException(ex)));
                if (!this.shouldRun) continue;
                try {
                    Thread.sleep(5000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        try {
            this.dataXceiveServer.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        LOG.info((Object)("Finishing DataNode in: " + this.data));
    }

    public static DataNode run(Configuration conf) throws IOException {
        String[] dataDirs = conf.getStrings("dfs.data.dir");
        DataNode dn = DataNode.makeInstance(dataDirs, conf);
        if (dn != null) {
            dataNodeThread = new Thread((Runnable)dn, "DataNode: [" + StringUtils.arrayToString(dataDirs) + "]");
            dataNodeThread.setDaemon(true);
            dataNodeThread.start();
        }
        return dn;
    }

    static DataNode createDataNode(String[] args, Configuration conf) throws IOException {
        if (conf == null) {
            conf = new Configuration();
        }
        if (!DataNode.parseArguments(args, conf)) {
            DataNode.printUsage();
            return null;
        }
        return DataNode.run(conf);
    }

    void join() {
        if (dataNodeThread != null) {
            try {
                dataNodeThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    static DataNode makeInstance(String[] dataDirs, Configuration conf) throws IOException {
        ArrayList<File> dirs = new ArrayList<File>();
        for (int i = 0; i < dataDirs.length; ++i) {
            File data = new File(dataDirs[i]);
            try {
                DiskChecker.checkDir(data);
                dirs.add(data);
                continue;
            }
            catch (DiskChecker.DiskErrorException e) {
                LOG.warn((Object)("Invalid directory in dfs.data.dir: " + e.getMessage()));
            }
        }
        if (dirs.size() > 0) {
            return new DataNode(conf, dirs);
        }
        LOG.error((Object)"All directories in dfs.data.dir are invalid.");
        return null;
    }

    public String toString() {
        return "DataNode{data=" + this.data + ", localName='" + this.dnRegistration.getName() + "'" + ", storageID='" + this.dnRegistration.getStorageID() + "'" + ", xmitsInProgress=" + this.xmitsInProgress + "}";
    }

    private static void printUsage() {
        System.err.println("Usage: java DataNode");
        System.err.println("           [-r, --rack <network location>] |");
        System.err.println("           [-rollback]");
    }

    private static boolean parseArguments(String[] args, Configuration conf) {
        int argsLen = args == null ? 0 : args.length;
        FSConstants.StartupOption startOpt = FSConstants.StartupOption.REGULAR;
        String networkLoc = null;
        for (int i = 0; i < argsLen; ++i) {
            String cmd = args[i];
            if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
                if (i == args.length - 1) {
                    return false;
                }
                if (!(networkLoc = args[++i]).startsWith("-")) continue;
                return false;
            }
            if ("-rollback".equalsIgnoreCase(cmd)) {
                startOpt = FSConstants.StartupOption.ROLLBACK;
                continue;
            }
            if ("-regular".equalsIgnoreCase(cmd)) {
                startOpt = FSConstants.StartupOption.REGULAR;
                continue;
            }
            return false;
        }
        if (networkLoc != null) {
            conf.set("dfs.datanode.rack", NodeBase.normalize(networkLoc));
        }
        conf.setObject("dfs.datanode.startup", (Object)startOpt);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static String getNetworkLoc(Configuration conf) throws IOException {
        String locScript = conf.get("dfs.network.script");
        if (locScript == null) {
            return "/default-rack";
        }
        LOG.info((Object)"Starting to run script to get datanode network location");
        Process p = Runtime.getRuntime().exec(locScript);
        StringBuffer networkLoc = new StringBuffer();
        BufferedReader inR = new BufferedReader(new InputStreamReader(p.getInputStream()));
        final BufferedReader errR = new BufferedReader(new InputStreamReader(p.getErrorStream()));
        Thread errThread = new Thread(){

            public void start() {
                try {
                    String errLine = errR.readLine();
                    while (errLine != null) {
                        LOG.warn((Object)("Network script error: " + errLine));
                        errLine = errR.readLine();
                    }
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
        };
        try {
            errThread.start();
            String line = inR.readLine();
            while (line != null) {
                networkLoc.append(line);
                line = inR.readLine();
            }
            try {
                int returnVal = p.waitFor();
                if (returnVal == 0) return networkLoc.toString();
                throw new IOException("Process exits with nonzero status: " + locScript);
            }
            catch (InterruptedException e) {
                throw new IOException(e.getMessage());
            }
            finally {
                try {
                    errThread.join();
                }
                catch (InterruptedException je) {
                    LOG.warn((Object)StringUtils.stringifyException(je));
                }
            }
        }
        finally {
            try {
                inR.close();
            }
            catch (IOException ine) {
                throw ine;
            }
            finally {
                errR.close();
            }
        }
    }

    public static void main(String[] args) {
        try {
            DataNode datanode = DataNode.createDataNode(args, null);
            if (datanode != null) {
                datanode.join();
            }
        }
        catch (Throwable e) {
            LOG.error((Object)StringUtils.stringifyException(e));
            System.exit(-1);
        }
    }

    static {
        datanodeObject = null;
        dataNodeThread = null;
    }

    class DataTransfer
    implements Runnable {
        InetSocketAddress curTarget;
        DatanodeInfo[] targets;
        Block b;
        byte[] buf;

        public DataTransfer(DatanodeInfo[] targets, Block b) throws IOException {
            this.curTarget = DataNode.createSocketAddr(targets[0].getName());
            this.targets = targets;
            this.b = b;
            this.buf = new byte[FSConstants.BUFFER_SIZE];
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            ++DataNode.this.xmitsInProgress;
            try {
                Socket s = new Socket();
                s.connect(this.curTarget, 60000);
                s.setSoTimeout(60000);
                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
                try {
                    long filelen = DataNode.this.data.getLength(this.b);
                    DataInputStream in = new DataInputStream(new BufferedInputStream(DataNode.this.data.getBlockData(this.b)));
                    try {
                        out.write(80);
                        out.writeBoolean(true);
                        this.b.write(out);
                        out.writeInt(this.targets.length);
                        for (int i = 0; i < this.targets.length; ++i) {
                            this.targets[i].write(out);
                        }
                        out.write(0);
                        out.writeLong(filelen);
                        while (filelen > 0L) {
                            int bytesRead = in.read(this.buf, 0, (int)Math.min(filelen, (long)this.buf.length));
                            out.write(this.buf, 0, bytesRead);
                            filelen -= (long)bytesRead;
                        }
                    }
                    finally {
                        in.close();
                    }
                }
                finally {
                    out.close();
                }
                LOG.info((Object)("Transmitted block " + this.b + " to " + this.curTarget));
            }
            catch (IOException ie) {
                LOG.warn((Object)("Failed to transfer " + this.b + " to " + this.curTarget), (Throwable)ie);
            }
            finally {
                --DataNode.this.xmitsInProgress;
            }
        }
    }

    class DataXceiver
    implements Runnable {
        Socket s;

        public DataXceiver(Socket s) {
            this.s = s;
            LOG.debug((Object)("Number of active connections is: " + DataNode.this.xceiverCount));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            block17: {
                try {
                    DataInputStream in = new DataInputStream(new BufferedInputStream(this.s.getInputStream()));
                    try {
                        byte op = (byte)in.read();
                        if (op == 80) {
                            this.writeBlock(in);
                            break block17;
                        }
                        if (op == 81 || op == 82 || op == 83) {
                            this.readBlock(in, op);
                            break block17;
                        }
                        while (op >= 0) {
                            System.out.println("Faulty op: " + op);
                            op = (byte)in.read();
                        }
                        throw new IOException("Unknown opcode for incoming data stream");
                    }
                    finally {
                        in.close();
                    }
                }
                catch (Throwable t) {
                    LOG.error((Object)"DataXCeiver", t);
                }
                finally {
                    try {
                        DataNode.this.xceiverCount.decr();
                        LOG.debug((Object)("Number of active connections is: " + DataNode.this.xceiverCount));
                        this.s.close();
                    }
                    catch (IOException ie2) {}
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        private void readBlock(DataInputStream in, byte op) throws IOException {
            Block b = new Block();
            b.readFields(in);
            long toSkip = 0L;
            long endOffset = -1L;
            if (op == 82) {
                toSkip = in.readLong();
            } else if (op == 83) {
                toSkip = in.readLong();
                endOffset = in.readLong();
            }
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(this.s.getOutputStream()));
            try {
                block32: {
                    DataInputStream in2;
                    if (!DataNode.this.data.isValidBlock(b)) {
                        out.writeLong(-1L);
                    } else {
                        int bytesRead;
                        int toRead;
                        long len = DataNode.this.data.getLength(b);
                        if (endOffset < 0L) {
                            endOffset = len;
                        }
                        in2 = new DataInputStream(DataNode.this.data.getBlockData(b));
                        out.writeLong(len);
                        long amtSkipped = 0L;
                        if (op == 82 || op == 83) {
                            if (toSkip > len) {
                                toSkip = len;
                            }
                            try {
                                amtSkipped = in2.skip(toSkip);
                            }
                            catch (IOException iex) {
                                DataNode.this.shutdown();
                                throw iex;
                            }
                            out.writeLong(amtSkipped);
                        }
                        if (op == 83) {
                            if (endOffset > len) {
                                endOffset = len;
                            }
                            out.writeLong(endOffset);
                        }
                        byte[] buf = new byte[FSConstants.BUFFER_SIZE];
                        try {
                            toRead = (int)(endOffset - amtSkipped + 1L);
                            bytesRead = 0;
                            try {
                                bytesRead = in2.read(buf, 0, Math.min(FSConstants.BUFFER_SIZE, toRead));
                                DataNode.this.myMetrics.readBytes(bytesRead);
                            }
                            catch (IOException iex) {
                                DataNode.this.shutdown();
                                throw iex;
                            }
                        }
                        catch (SocketException se) {
                            try {
                                in2.close();
                                break block32;
                            }
                            catch (IOException iex) {
                                DataNode.this.shutdown();
                                throw iex;
                            }
                        }
                        while (toRead > 0 && bytesRead >= 0) {
                            out.write(buf, 0, bytesRead);
                            if ((toRead -= bytesRead) <= 0) continue;
                            try {
                                bytesRead = in2.read(buf, 0, Math.min(FSConstants.BUFFER_SIZE, toRead));
                                DataNode.this.myMetrics.readBytes(bytesRead);
                            }
                            catch (IOException iex) {
                                DataNode.this.shutdown();
                                throw iex;
                            }
                        }
                    }
                    break block32;
                    finally {
                        try {
                            in2.close();
                        }
                        catch (IOException iex) {
                            DataNode.this.shutdown();
                            throw iex;
                        }
                    }
                }
                DataNode.this.myMetrics.readBlocks(1);
                LOG.info((Object)("Served block " + b + " to " + this.s.getInetAddress()));
                return;
            }
            finally {
                out.close();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void writeBlock(DataInputStream in) throws IOException {
            DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(this.s.getOutputStream()));
            try {
                OutputStream o;
                boolean shouldReportBlock = in.readBoolean();
                Block b = new Block();
                b.readFields(in);
                int numTargets = in.readInt();
                if (numTargets <= 0) {
                    throw new IOException("Mislabelled incoming datastream.");
                }
                DatanodeInfo[] targets = new DatanodeInfo[numTargets];
                for (int i = 0; i < targets.length; ++i) {
                    DatanodeInfo tmp = new DatanodeInfo();
                    tmp.readFields(in);
                    targets[i] = tmp;
                }
                byte encodingType = (byte)in.read();
                long len = in.readLong();
                DatanodeInfo curTarget = targets[0];
                Vector<DatanodeInfo> mirrors = new Vector<DatanodeInfo>();
                try {
                    o = DataNode.this.data.writeToBlock(b);
                }
                catch (IOException e) {
                    DataNode.this.checkDiskError(e);
                    throw e;
                }
                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(o));
                InetSocketAddress mirrorTarget = null;
                String mirrorNode = null;
                try {
                    DataOutputStream out2;
                    DataInputStream in2;
                    block61: {
                        in2 = null;
                        out2 = null;
                        if (targets.length > 1) {
                            mirrorNode = targets[1].getName();
                            mirrorTarget = DataNode.createSocketAddr(mirrorNode);
                            try {
                                Socket s2 = new Socket();
                                s2.connect(mirrorTarget, 60000);
                                s2.setSoTimeout(60000);
                                out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
                                in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
                                out2.write(80);
                                out2.writeBoolean(shouldReportBlock);
                                b.write(out2);
                                out2.writeInt(targets.length - 1);
                                for (int i = 1; i < targets.length; ++i) {
                                    targets[i].write(out2);
                                }
                                out2.write(encodingType);
                                out2.writeLong(len);
                                DataNode.this.myMetrics.replicatedBlocks(1);
                            }
                            catch (IOException ie) {
                                if (out2 == null) break block61;
                                LOG.info((Object)("Exception connecting to mirror " + mirrorNode + "\n" + StringUtils.stringifyException(ie)));
                                try {
                                    out2.close();
                                    in2.close();
                                }
                                catch (IOException out2close) {
                                }
                                finally {
                                    out2 = null;
                                    in2 = null;
                                }
                            }
                        }
                    }
                    boolean anotherChunk = len != 0L;
                    byte[] buf = new byte[FSConstants.BUFFER_SIZE];
                    while (anotherChunk) {
                        while (len > 0L) {
                            int bytesRead = in.read(buf, 0, (int)Math.min((long)buf.length, len));
                            if (bytesRead < 0) {
                                throw new EOFException("EOF reading from " + this.s.toString());
                            }
                            if (bytesRead <= 0) continue;
                            if (out2 != null) {
                                try {
                                    out2.write(buf, 0, bytesRead);
                                }
                                catch (IOException out2e) {
                                    LOG.info((Object)("Exception writing to mirror " + mirrorNode + "\n" + StringUtils.stringifyException(out2e)));
                                    try {
                                        out2.close();
                                        in2.close();
                                    }
                                    catch (IOException out2close) {
                                    }
                                    finally {
                                        out2 = null;
                                        in2 = null;
                                    }
                                }
                            }
                            try {
                                out.write(buf, 0, bytesRead);
                                DataNode.this.myMetrics.wroteBytes(bytesRead);
                            }
                            catch (IOException iex) {
                                DataNode.this.checkDiskError(iex);
                                throw iex;
                            }
                            len -= (long)bytesRead;
                        }
                        if (encodingType == 0) {
                            anotherChunk = false;
                            continue;
                        }
                        if (encodingType != 1) continue;
                        len = in.readLong();
                        if (out2 != null) {
                            try {
                                out2.writeLong(len);
                            }
                            catch (IOException ie) {
                                LOG.info((Object)("Exception writing to mirror " + mirrorNode + "\n" + StringUtils.stringifyException(ie)));
                                try {
                                    out2.close();
                                    in2.close();
                                }
                                catch (IOException ie2) {
                                }
                                finally {
                                    out2 = null;
                                    in2 = null;
                                }
                            }
                        }
                        if (len != 0L) continue;
                        anotherChunk = false;
                    }
                    if (out2 != null) {
                        try {
                            out2.flush();
                            long complete = in2.readLong();
                            if (complete != -889528038L) {
                                LOG.info((Object)("Conflicting value for WRITE_COMPLETE: " + complete));
                            }
                            LocatedBlock newLB = new LocatedBlock();
                            newLB.readFields(in2);
                            in2.close();
                            out2.close();
                            DatanodeInfo[] mirrorsSoFar = newLB.getLocations();
                            for (int k = 0; k < mirrorsSoFar.length; ++k) {
                                mirrors.add(mirrorsSoFar[k]);
                            }
                        }
                        catch (IOException ie) {
                            LOG.info((Object)("Exception writing to mirror " + mirrorNode + "\n" + StringUtils.stringifyException(ie)));
                            try {
                                out2.close();
                                in2.close();
                            }
                            catch (IOException ie2) {
                            }
                            finally {
                                out2 = null;
                                in2 = null;
                            }
                        }
                    }
                    if (out2 == null) {
                        LOG.info((Object)("Received block " + b + " from " + this.s.getInetAddress()));
                    } else {
                        LOG.info((Object)("Received block " + b + " from " + this.s.getInetAddress() + " and mirrored to " + mirrorTarget));
                    }
                }
                finally {
                    try {
                        out.close();
                    }
                    catch (IOException iex) {
                        DataNode.this.checkDiskError(iex);
                        throw iex;
                    }
                }
                DataNode.this.data.finalizeBlock(b);
                DataNode.this.myMetrics.wroteBlocks(1);
                if (shouldReportBlock) {
                    Vector<Block> iex = DataNode.this.receivedBlockList;
                    synchronized (iex) {
                        DataNode.this.receivedBlockList.add(b);
                        DataNode.this.receivedBlockList.notifyAll();
                    }
                }
                reply.writeLong(-889528038L);
                mirrors.add(curTarget);
                LocatedBlock newLB = new LocatedBlock(b, mirrors.toArray(new DatanodeInfo[mirrors.size()]));
                newLB.write(reply);
            }
            finally {
                reply.close();
            }
        }
    }

    class DataXceiveServer
    implements Runnable {
        ServerSocket ss;

        public DataXceiveServer(ServerSocket ss) {
            this.ss = ss;
        }

        public void run() {
            try {
                while (DataNode.this.shouldRun) {
                    Socket s = this.ss.accept();
                    DataNode.this.xceiverCount.incr();
                    new Daemon(new DataXceiver(s)).start();
                }
                this.ss.close();
            }
            catch (IOException ie) {
                LOG.info((Object)("Exiting DataXceiveServer due to " + ie.toString()));
            }
        }

        public void kill() {
            assert (!DataNode.this.shouldRun) : "shoudRun should be set to false before killing";
            try {
                this.ss.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    private static class Count {
        int value = 0;

        Count(int init) {
            this.value = init;
        }

        synchronized void incr() {
            ++this.value;
        }

        synchronized void decr() {
            --this.value;
        }

        public String toString() {
            return Integer.toString(this.value);
        }

        public int getValue() {
            return this.value;
        }
    }

    private class DataNodeMetrics
    implements Updater {
        private final MetricsRecord metricsRecord;
        private int bytesWritten = 0;
        private int bytesRead = 0;
        private int blocksWritten = 0;
        private int blocksRead = 0;
        private int blocksReplicated = 0;
        private int blocksRemoved = 0;

        DataNodeMetrics() {
            MetricsContext context = MetricsUtil.getContext("dfs");
            this.metricsRecord = MetricsUtil.createRecord(context, "datanode");
            context.registerUpdater(this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void doUpdates(MetricsContext unused) {
            DataNodeMetrics dataNodeMetrics = this;
            synchronized (dataNodeMetrics) {
                this.metricsRecord.incrMetric("bytes_read", this.bytesRead);
                this.metricsRecord.incrMetric("bytes_written", this.bytesWritten);
                this.metricsRecord.incrMetric("blocks_read", this.blocksRead);
                this.metricsRecord.incrMetric("blocks_written", this.blocksWritten);
                this.metricsRecord.incrMetric("blocks_replicated", this.blocksReplicated);
                this.metricsRecord.incrMetric("blocks_removed", this.blocksRemoved);
                this.bytesWritten = 0;
                this.bytesRead = 0;
                this.blocksWritten = 0;
                this.blocksRead = 0;
                this.blocksReplicated = 0;
                this.blocksRemoved = 0;
            }
            this.metricsRecord.update();
        }

        synchronized void readBytes(int nbytes) {
            this.bytesRead += nbytes;
        }

        synchronized void wroteBytes(int nbytes) {
            this.bytesWritten += nbytes;
        }

        synchronized void readBlocks(int nblocks) {
            this.blocksRead += nblocks;
        }

        synchronized void wroteBlocks(int nblocks) {
            this.blocksWritten += nblocks;
        }

        synchronized void replicatedBlocks(int nblocks) {
            this.blocksReplicated += nblocks;
        }

        synchronized void removedBlocks(int nblocks) {
            this.blocksRemoved += nblocks;
        }
    }
}

