Tuesday, January 31, 2012

Hadoop 1.0

There is finally a 1.0 version of Hadoop. One of my biggest complaints about using Hadoop since version 0.10 is that something breaks with every release, and the usual retort was that compatibility would come with 1.0, which seemed to be just around the corner for years. I haven't been using Hadoop as much for the last six months, so I didn't notice that there was finally a 1.0 release until today, even though it was announced towards the end of last year. As a user, this news inspires some hope that there might be a Hadoop upgrade that just works. We'll have to see as new versions come out.

That said, it is a little disappointing to see some of the cruft that is still around in the APIs. In particular, the org.apache.hadoop.mapred and org.apache.hadoop.mapreduce packages are both still around. So if I want to write a MapReduce job, which API should I use? Even worse, the javadocs still don't provide much clarity on which of these APIs should be preferred, and neither appears to be deprecated. Maybe they have good reasons, but this adds a lot of confusion for users and, in my selfish opinion, should have been cleaned up before a 1.0 release.

The other big problem I've had with Hadoop is trying to keep a stack of Hadoop, Pig, Oozie, HBase, etc., all working together flawlessly, and being able to update individual components without too much worry about whether the rest of the stack will play nice. This is much easier to do if Hadoop provides clean, simple, and well-documented APIs that these other tools can build on. At first glance, the 1.0 apidocs look like they just slapped a 1.0 label on the current pile of crud and did not remove any of the old garbage and deprecated APIs.

If they actually maintain compatibility between 1.x releases it will still be a win for users, but hopefully for 2.0 they will focus on a clean, simple API and get rid of a bunch of the old cruft. It would also be nice if we don't have to wait six years for 2.0.

Tuesday, November 29, 2011

Damn auto-completion

I really wish people would be more careful when using the auto-completion features of IDEs. Though auto-completion does provide some time savings, it is also a frequent contributor to sloppy coding. After updating some packages, our system broke because someone had incorrectly imported com.google.inject.internal.Sets instead of com.google.common.collect.Sets. Is it too much to ask for people to at least look at the code that is getting generated?

Saturday, November 12, 2011

Top-K Selection

Ok, so I need to select the top-K values from a list of size N where K is much smaller than N. Two approaches that immediately come to mind are:

  1. Sort the list and select the first K elements. Running time O(N*lg(N)).
  2. Insert the elements of the list into a heap and pop the top K elements. Running time O(N + K*lg(N)).

As a variant on option 2, a colleague proposed using a small heap that would hold at most K elements at any given time. When the heap is full, an element is popped off before a new element can be added. So if I want the top-K smallest values, I use a max-heap, and if the next value from the input is smaller than the largest value on the heap, I pop off the largest value and insert the smaller one. The running time for this approach is O(N*lg(K)).

For my use case both N and K are fairly small: N will be approximately 10,000 elements, and K will typically be 10 but can be set anywhere from 1 to 100. A C++ test program that implements all three approaches and times them for various values of K is provided at the bottom of this post. The table below shows the results for K equal to 10, 50, and 100. You can see that for each approach the running time changes very little as K grows, so the choice of K has little impact on the actual running time.

Time (ms)    K=10        K=50        K=100
sort         0.468977    0.466918    0.467177
fullheap     0.099325    0.102839    0.106735
smallheap    0.018063    0.028948    0.040435

Here is the chart for all values of K:

[chart: average and 99th-percentile running times for each method, K from 1 to 100]

So clearly the third approach with the small heap is the winner. With a small K it is essentially linear time and an order of magnitude faster than the naive sort. The graph shows both the average time and the 99th-percentile, so you can see that the variation in times is fairly small. This first test covers the sizes for my use case, but out of curiosity I also tested the more interesting case of fixing K and varying the size of N. The graph for N from 100,000 to 1,000,000 in increments of 100,000 tells the whole story:

[chart: running time for each method with K fixed and N from 100,000 to 1,000,000]

Source code:

#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <numeric>
#include <string>
#include <vector>

using namespace std;

typedef void (*topk_func)(vector<int> &, vector<int> &, int);

bool greater_than(const int &v1, const int &v2) {
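    // Inverts the default ordering so make_heap/pop_heap below build a min-heap (smallest value at the front).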
    return v1 > v2;
}

void topk_sort(vector<int> &data, vector<int> &top, int k) {
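    // Approach 1: sort the entire list and take the first k elements. O(N*lg(N)).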
    sort(data.begin(), data.end());
    vector<int>::iterator i = data.begin();
    int j = 0;
    for (; j < k; ++i, ++j) {
        top.push_back(*i);
    }
}

void topk_fullheap(vector<int> &data, vector<int> &top, int k) {
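    // Approach 2: heapify all N elements into a min-heap, then pop the k smallest. O(N + K*lg(N)).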
    make_heap(data.begin(), data.end(), greater_than);
    for (int j = 0; j < k; ++j) {
        top.push_back(*data.begin());
        pop_heap(data.begin(), data.end(), greater_than);
        data.pop_back();
    }
}

void topk_smallheap(vector<int> &data, vector<int> &top, int k) {
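    // Approach 3: keep a max-heap of at most k elements, replacing the largest element
    // whenever a smaller value comes along. O(N*lg(K)).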
    for (vector<int>::iterator i = data.begin(); i != data.end(); ++i) {
        if (static_cast<int>(top.size()) < k) {
            top.push_back(*i);
            push_heap(top.begin(), top.end());
        } else if (*i < *top.begin()) {
            pop_heap(top.begin(), top.end());
            top.pop_back();
            top.push_back(*i);
            push_heap(top.begin(), top.end());
        }
    }
    sort_heap(top.begin(), top.end());
}

void run_test(const string &name, topk_func f, int trials, int n, int k) {
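    // Runs f over 'trials' fresh random inputs of size n and prints the average,
    // 90th-percentile, and 99th-percentile times in milliseconds.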
    vector<double> times;

    for (int t = 0; t < trials; ++t) {
        vector<int> data;
        for (int i = 0; i < n; ++i) {
            data.push_back(rand());
        }

        clock_t start = clock();
        vector<int> top;
        f(data, top, k);
        clock_t end = clock();
        times.push_back(1000.0 * (end - start) / CLOCKS_PER_SEC);
    }

    sort(times.begin(), times.end());
    double sum = accumulate(times.begin(), times.end(), 0.0);
    double avg = sum / times.size();
    double pct90 = times[static_cast<int>(trials * 0.90)];
    double pct99 = times[static_cast<int>(trials * 0.99)];
    cout << name << " "
         << k << " "
         << avg << " "
         << pct90 << " "
         << pct99 << endl;
}

int main(int argc, char **argv) {

    cout << "method k avg 90-%tile 99-%tile" << endl;

    int trials = 1000;
    int n = 10000;
    for (int k = 1; k <= 100; ++k) {
        run_test("sort", topk_sort, trials, n, k);
        run_test("fullheap", topk_fullheap, trials, n, k);
        run_test("smallheap", topk_smallheap, trials, n, k);
    }

    return 0;
}

Friday, November 11, 2011

Broken pipe

If you are using Python to write a script, please properly handle the broken pipe exception. We have a set of command-line tools at work that are extremely annoying to use because if you pipe the output through other standard tools, e.g., head, they spit out a worthless exception about a broken pipe. Consider the following test script:

#!/usr/bin/env python
import sys
buffer = ""
for i in range(100000):
    buffer += "%d\n" % i
sys.stdout.write(buffer)

Pipe it into head and look at the output:

$ ./test.py | head -n1
0
ERROR:root:damn
Traceback (most recent call last):
  File "./test.py", line 6, in <module>
    sys.stdout.write(buffer)
IOError: [Errno 32] Broken pipe

I know there was a broken pipe and I don't care. Just swallow the worthless exception so I can see the meaningful output. This is probably the number one reason I often mutter "damn python crap" when using some of these tools. So if you are writing scripts in python, please be considerate and handle the broken pipe exception. Here is an example for quick reference:

#!/usr/bin/env python
import errno
import sys
try:
    buffer = ""
    for i in range(100000):
        buffer += "%d\n" % i
    sys.stdout.write(buffer)
except IOError as e:
    # EPIPE just means the reader (e.g., head) closed the pipe early; re-raise anything else.
    if e.errno != errno.EPIPE:
        raise

Scala, regex, and null

One of the perks of using Scala is that I hardly ever see a NullPointerException unless I'm working with Java libraries. The primary reason is that most Scala libraries tend to use Option rather than null. However, while using a regex with pattern matching, I was surprised by a NullPointerException when trying to look at the result of an optional capture group. Consider the following example:

scala> val Pattern = "(a)(b)?".r
Pattern: scala.util.matching.Regex = (a)(b)?

scala> "a" match { case Pattern(a, b) => printf("[%s][%s]%n", a, b) }
[a][null]

scala> "ab" match { case Pattern(a, b) => printf("[%s][%s]%n", a, b) }
[a][b]

I just assumed that b would be of type Option[String]. There is probably a good reason for this travesty; my guess would be something about making it work with the type system. But after using Scala for a while, it just seems wrong to be getting a null value.
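
If all you want is an Option for the optional group, one small workaround is to wrap the extracted value with Option(...), which turns the null into None. A quick sketch using the same Pattern as above:

scala> "a" match { case Pattern(a, b) => printf("[%s][%s]%n", a, Option(b)) }
[a][None]

scala> "ab" match { case Pattern(a, b) => printf("[%s][%s]%n", a, Option(b)) }
[a][Some(b)]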

Monday, October 31, 2011

Crumbly Cable

It is Halloween, so I figured I should post some scary pictures. For some reason the plastic coating on the mini-USB cable that came with my Kindle DX started crumbling, as shown in the pictures below. On the bright side, it is a generic cable and I have plenty of spares, but you would think Amazon would provide better cables.

[photos: crumbling plastic coating on the mini-USB cable]

Monday, September 26, 2011

Man in the middle

You typically hear the expression man in the middle in the context of an attack, where someone is actively eavesdropping on communications that are intended to be private. However, it is also an invaluable technique for debugging network programs. One of my favorite tools is netcat, and it makes it trivial to implement a simple eavesdropping script. This can also be done with tools such as tcpdump, but I find netcat a bit simpler for most tasks, and it is more likely to be available on the machine.

The script I typically use is shown below. Essentially it runs an endless loop that listens on a port, tees the data that comes in to a request log file, sends the input to a remote server, tees the response from the remote server to a response log, and then writes the response back to a named pipe that is connected to stdin of the netcat process that was listening.

#!/bin/bash

function logFile {
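   # Timestamped log file name, e.g. 2011-09-26-12-00-00.request.log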
   echo "$(date +%Y-%m-%d-%H-%M-%S).${1}.log"
}

function serveRequests {
   port=$1
   remoteHost=$2
   remotePort=$3
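   # One connection per loop iteration: log the request, forward it to the remote
   # host, log the response, and feed the response back to the client via the named pipe.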
   while true; do
       rm -f backpipe
       mkfifo backpipe
       cat backpipe |
           nc -l $port |
           tee -a $(logFile request) |
           nc $remoteHost $remotePort |
           tee -a $(logFile response) >backpipe
   done
}

port=${1:-12345}
remoteHost=${2:-localhost}
remotePort=${3:-80}
serveRequests $port $remoteHost $remotePort