In this post I'll demonstrate how to push code to data stored on a MapR (or Hadoop) cluster and achieve an order-of-magnitude speedup with simple Bash and C++, without any of the MapReduce and Java stuff.

I just had a MapR cluster set up, and in this exercise I'm going to test the data-processing speedup I can gain by pushing the code to run where the data is. The goal is to get a feel for the overhead of the various system components (e.g. the hadoop command line) so I can design an in-house platform for distributed data processing.

Setup

The data I have are mostly files of roughly fixed size, about 30MB each. I need to store a large number of such files in the MapR filesystem and frequently run some processing command on each file; the output generated from each file is much smaller than the input and is negligible in size. In this experiment I'll use 100 such files as input and "md5sum" as the operation. The files are already stored on the MapR filesystem, and the text file "list" contains the paths to the 100 files.
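
For reference, staging an input set like this needs nothing beyond the hadoop command line. The snippet below is only a sketch of how it could be done, not the commands I actually used; in particular, filling the files from /dev/urandom is just a placeholder for real data.

# hypothetical staging script -- not the actual commands used for this experiment
hadoop fs -mkdir test                        # target directory on the MapR FS
rm -f list
for i in $(seq 1 100)
do
    dd if=/dev/urandom of=/tmp/data$i bs=1M count=30 2>/dev/null   # ~30MB of filler
    hadoop fs -put /tmp/data$i test/data$i   # copy it into the cluster
    echo test/data$i >> list                 # record the path in "list"
    rm -f /tmp/data$i
done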

The hardware setup is as follows: there are 8 nodes in total, 7 of which form the MapR cluster. All commands and operations are run from the remaining node, whose hostname is "washtenaw".

Pulling Data From MapR FS: The Naive Approach

The simplest approach is to run all the processing locally on washtenaw, pulling the data it needs from the MapR FS. Below is the script "run-local.sh":

#!/bin/bash
# run-local.sh: pull every file out of the MapR FS and checksum it locally.
cat list | while read name
do
    printf '%s\t' "$name"              # path first, checksum follows on the same line
    hadoop fs -cat "$name" | md5sum    # stream the file from the cluster
done

And the performance is

wdong@washtenaw $ time ./run-local.sh > md5.hadoop

real    1m14.910s
user    1m10.304s
sys 0m19.573s

That is 74.9s for 100 files, or roughly 0.75s per file.

Pulling Data From MapR FS without Java

We already know from the previous post that MapR FS can sustain a throughput of 80MB/s, so it should take only 30/80 = 0.375s to retrieve each file from the cluster (without the name-node lookup and all the other latencies). That is only about half of the 0.75s per file we measured in the first setting, so there is clearly room for improvement. An obvious source of overhead is the cost of starting the hadoop command line for every file. So in this setting, I'll fetch the data directly using the C API. No Java code is involved, but the data is still loaded remotely from the cluster.

Here's the C++ source code:

// run-c++.cpp
// For each path read from stdin, stream the file out of the MapR FS via the
// libhdfs-compatible C API and pipe its contents into md5sum.
#include "hdfs.h"
#include <fcntl.h>
#include <cstdio>
#include <string>
#include <iostream>

using namespace std;

int main(int argc, char **argv) {
    static const size_t BUFFER_SIZE = 64 * 1024 * 1024;
    string buffer(BUFFER_SIZE, '\0');

    hdfsFS fs = hdfsConnect("default", 0);
    string path;
    while (cin >> path) {
        cout << path << ' ';
        cout.flush();   // make sure the path appears before md5sum's output
        hdfsFile h = hdfsOpenFile(fs, path.c_str(), O_RDONLY, BUFFER_SIZE, 0, 0);
        FILE *cmd = popen("md5sum", "w");
        for (;;) {
            // hdfsRead may return fewer bytes than requested;
            // 0 means end of file and a negative value means error.
            tSize sz = hdfsRead(fs, h, &buffer[0], buffer.size());
            if (sz <= 0) break;
            fwrite(&buffer[0], 1, sz, cmd);
        }
        pclose(cmd);    // waits for md5sum, which prints the checksum to stdout
        hdfsCloseFile(fs, h);
    }
    hdfsDisconnect(fs);
    return 0;
}

And here's the performance:

wdong@washtenaw$ make run-c++
g++ -std=c++11 -Wall -O3 -I/opt/mapr/hadoop/hadoop-0.20.2/src/c++/libhdfs   -L/opt/mapr/lib -Wl,-allow-shlib-undefined  run-c++.cpp  -lMapRClient -o run-c++
wdong@washtenaw$ time ./run-c++  < list > md5.c++

real    0m42.258s
user    0m6.880s
sys 0m16.077s

We've reduced per-file processing time from roughly 0.75s to about 0.42s, which puts the overhead of running the hadoop command line for each file at roughly 0.3s -- that is pretty big.
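
A quick way to sanity-check that the gap is mostly client startup is to time a hadoop invocation that touches almost no data. I won't quote a number here, but something like the following isolates the startup cost from any real I/O:

# illustrative only: a near-no-op invocation, so JVM/client startup dominates the runtime
time hadoop fs -test -e test/data1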

Pushing Code to Data

Here comes the real stuff. I'll detect which node each file is stored on and push the code to where the data is. The following C++ program queries the location of a file; it assumes that each file fits in a single filesystem block and is not replicated, which is the case in my setup.

// hdfs-lookup.cpp
// For each path read from stdin, print the host that stores its first block.
#include "hdfs.h"
#include <string>
#include <iostream>
#include <boost/assert.hpp>

using namespace std;

int main(int argc, char **argv) {
    hdfsFS fs = hdfsConnect("default", 0);
    string path;
    while (cin >> path) {
        // Ask for the hosts of the block covering the first byte; with one
        // block per file and no replication this is the only copy of the data.
        char ***hosts = hdfsGetHosts(fs, path.c_str(), 0, 1);
        BOOST_VERIFY(hosts && hosts[0] && hosts[0][0]);
        cout << path << ' ' << hosts[0][0] << endl;
        hdfsFreeHosts(hosts);
    }
    hdfsDisconnect(fs);
    return 0;
}

Running the program produces output like the following, with each line containing a path followed by the hostname where its data lives:

wdong@washtenaw$ make hdfs-lookup
g++ -std=c++11 -Wall -O3 -I/opt/mapr/hadoop/hadoop-0.20.2/src/c++/libhdfs   -L/opt/mapr/lib -Wl,-allow-shlib-undefined  hdfs-lookup.cpp  -lMapRClient -o hdfs-lookup
wdong@washtenaw$ ./hdfs-lookup < list
test/data1 fuller
test/data2 ford
test/data3 huron
test/data4 huron
test/data5 plymouth
...
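
As an aside, on a stock HDFS cluster the same placement information is available from the command line via fsck. I haven't checked how the MapR distribution handles this, so treat it as a pointer rather than a tested recipe; the absolute path below is hypothetical.

# stock-HDFS alternative (untested on MapR): list the blocks and the hosts storing them
hadoop fsck /user/wdong/test/data1 -files -blocks -locations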

I then use the following script to drive the computation. Note that it relies on passwordless ssh to each node and on the working directory being shared across the nodes at the same path (otherwise input/, output/ and run-c++ would not be visible to them):

#!/bin/bash
# run-remote.sh: group the input files by the node that stores them,
# then run the checksum job on each node over ssh, one node at a time.

mkdir -p input; rm -f input/*
mkdir -p output; rm -f output/*

# Build one list per host, containing the paths stored on that host.
cat list | ./hdfs-lookup | while read path host
do
    echo $path >> input/$host
done

# Process each host's list on that host itself.
for host in `ls input`
do
    ssh $host "cd $PWD; cat input/$host | ./run-c++ > output/$host"
done

cat output/*

And here's the performance:

wdong@washtenaw$ time ./run-remote.sh > md5.remote

real    0m13.529s
user    0m0.116s
sys 0m0.040s

Substantial speedup! The time we spent on each file is 0.135s.

Pushing Code to Data with Parallelization

Finally, I parallelize the above driver script by running the per-node jobs in the background:

#!/bin/bash
# run-parallel.sh: same as run-remote.sh, except that all nodes run concurrently.

mkdir -p input; rm -f input/*
mkdir -p output; rm -f output/*

cat list | ./hdfs-lookup | while read path host
do
    echo $path >> input/$host
done

for host in `ls input`
do
    ssh $host "cd $PWD; cat input/$host | ./run-c++ > output/$host" &   # run in the background
done

wait    # block until every node has finished

cat output/*

And here's the performance:

wdong@washtenaw$ wc -l input/*    # just to show the data distribution among the nodes.
  12 input/ford
  10 input/fuller
  21 input/geddes
  19 input/huron
   9 input/maple
  10 input/plymouth
  19 input/wagner
 100 total
wdong@washtenaw$ time ./run-parallel.sh > md5.parallel

real    0m2.987s
user    0m0.120s
sys 0m0.068s

wdong@washtenaw$ for i in md5.* ; do sort $i | md5sum; done  # just to check that all outputs are the same
a3889305ff721ec32ced48a3066ea059  -
a3889305ff721ec32ced48a3066ea059  -
a3889305ff721ec32ced48a3066ea059  -
a3889305ff721ec32ced48a3066ea059  -

That is a speedup of 25x over the initial naive approach, but at this point there are really no surprises. More speedup could be achieved by parallelizing run-c++.cpp itself, but that would not mean much in this setting: for a much larger dataset the bottleneck will be the disks, and I only have one disk attached to each node.
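
For completeness, one way to add that per-node parallelism would be to have each node split its own file list into a few chunks and feed each chunk to a separate run-c++ process. The sketch below is untested; it relies on GNU split's -n option, and the chunk count of 4 is an arbitrary guess at what a single disk can still keep busy.

# hypothetical per-node command, replacing "cat input/$host | ./run-c++ > output/$host"
rm -f chunk.$host.*
split -n l/4 input/$host chunk.$host.     # 4 roughly equal line-based chunks (GNU split)
for c in chunk.$host.??                   # the chunk files: .aa .ab .ac .ad
do
    ./run-c++ < $c > $c.md5 &             # one run-c++ process per chunk
done
wait
cat chunk.$host.*.md5 > output/$host
rm -f chunk.$host.*

In practice this would live in a small per-node script that the ssh line in run-parallel.sh invokes in place of run-c++.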

A note for Hadoop users: when building the C++ programs, link against libhdfs.so and libjvm.so instead of libMapRClient.so; you'll also have to set up the Java environment (JAVA_HOME, CLASSPATH, library paths) properly.
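
For what it's worth, a rough sketch of such a build on a stock Apache Hadoop install might look like the following; JAVA_HOME, HADOOP_HOME and the exact library paths are assumptions that differ across distributions and versions.

# hypothetical build against stock libhdfs -- adjust all paths to your installation
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64      # assumed location
export HADOOP_HOME=/opt/hadoop                          # assumed location
export CLASSPATH=$(hadoop classpath)                    # libhdfs needs the Hadoop jars at runtime
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server

g++ -std=c++11 -Wall -O3 -I$HADOOP_HOME/include run-c++.cpp \
    -L$HADOOP_HOME/lib/native -lhdfs \
    -L$JAVA_HOME/jre/lib/amd64/server -ljvm \
    -o run-c++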


© 2017 Wei Dong