Perhaps of all the creations of man, language is the most astonishing - Giles Lytton Strachey, Words and Poetry
Introduction
Hulu viewers generate a tremendous amount of data: our users watch over 400 million videos and 2 billion advertisements a month. Processing and analyzing that data is critical to our business, whether it is for deciding what content to invest in in the future or for convincing external partners of the superiority of Hulu as an advertising platform. In order to ingest, process and understand that data, we rely on a framework called Hadoop (http://hadoop.apache.org/), an open-source project that allows for the storage and distributed processing of large data-sets.
The two major components of Hadoop are a distributed file system (HDFS) and MapReduce, a framework for distributed computing over the cluster. In order to understand the rest of this blog post, it is important to have a basic understanding of how MapReduce works.
When authoring a MapReduce job, programmers specify two functions: a map function that processes each chunk of data to produce a set of key-value pairs, and a reduce function that merges all values associated with the same key (1). A surprising number of conventional algorithms can be expressed in this manner and thereby be converted into programs that can run in parallel. This is best understood with an example.

Let us imagine that we are trying to compute the total number of minutes watched for a given set of videos. Each video has a unique identifier, and what we would like at the end of this computation is a table of the following form:

Video identifier    Minutes watched
1                   25
2                   34
3                   126
4                   5
In a distributed file system such as HDFS, the raw data is stored on several separate computers, or nodes. A naive way to perform this computation would be for the program to serially request the raw data from each of the nodes and then do the work itself. Such an approach works fine for a small data-set like this example; however, we want an approach that scales to very large data-sets. For any typical hour, for example, we have about 50 GB of raw data on our Hulu cluster. Clearly, processing all of this data sequentially would take a very long time and consume a lot of network resources.
Using MapReduce, the mapper process runs on each node in parallel, generating key-value pairs consisting of the video identifier (the key) and the minutes watched (the value). Illustration 1 shows what the output of the mapping phase would look like if the data were distributed across three nodes.

 

[Illustration 1: output of the mapping phase with the data distributed across three nodes]

 

From here, we go into the reduce phase, where all outputs with the same key are guaranteed to be sent to the same reducer. The reducer then computes some function over the intermediate output to produce a final output. In this case the function is summation, as all we want is the total number of minutes per video. Illustration 2 shows what this would look like if this processing ran on two reducers.

[Illustration 2: output of the reduce phase running on two reducers]
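To make the whole flow concrete, below is a minimal sketch of the minutes-watched job written against the Hadoop Java MapReduce API. It assumes, purely for illustration, that each input record is a tab-separated line of the form videoId, then minutes watched; the class names are hypothetical, and our real jobs consume the richer beacon format described in the next section.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinutesWatched {

    // Map phase: emit (videoId, minutesWatched) for every input record.
    public static class MinutesMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // Assumed record layout: videoId <TAB> minutesWatched
            String[] fields = line.toString().split("\t");
            context.write(new Text(fields[0]), new LongWritable(Long.parseLong(fields[1])));
        }
    }

    // Reduce phase: all values for the same videoId arrive at the same reducer; sum them.
    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text videoId, Iterable<LongWritable> minutes, Context context)
                throws IOException, InterruptedException {
            long total = 0;
            for (LongWritable m : minutes) {
                total += m.get();
            }
            context.write(videoId, new LongWritable(total));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "minutes-watched");
        job.setJarByClass(MinutesWatched.class);
        job.setMapperClass(MinutesMapper.class);
        job.setCombinerClass(SumReducer.class); // summation is associative, so the reducer doubles as a combiner
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Registering the reducer as a combiner is optional, but because summation is associative it lets each node pre-aggregate its own output and cuts down the amount of data shuffled across the network.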

 

Beacons
Event data from our various Hulu players is encoded in the form of beacons. A beacon is just a URL-encoded description of the event, as shown in the example below:
80    2013-04-01    00:00:00    /v3/playback/start?bitrate=650&cdn=Akamai&channel=Anime&client=Explorer&computerguid=EA8FA1000232B8F6986C3E0BE55E9333&contentid=5003673…
The Hulu players on all our devices send a constant stream of beacons to our servers; those beacons are subsequently stored on HDFS, where they can be processed by our MapReduce jobs.
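As a rough illustration of that first processing step, the sketch below parses a single beacon line into its event path and a map of its query parameters. The field layout (an identifier, a date, a time, and a URL-encoded path plus query string) is inferred from the example above and simplified, and the BeaconParser class name is hypothetical.

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

public class BeaconParser {

    // Parse one beacon line into its event path plus decoded key-value parameters.
    public static Map<String, String> parse(String beaconLine) throws UnsupportedEncodingException {
        // Assumed layout: <id> <date> <time> <path>?<query>
        String[] fields = beaconLine.trim().split("\\s+");
        String[] pathAndQuery = fields[3].split("\\?", 2);

        Map<String, String> beacon = new HashMap<>();
        beacon.put("eventPath", pathAndQuery[0]);        // e.g. /v3/playback/start
        if (pathAndQuery.length > 1) {
            for (String pair : pathAndQuery[1].split("&")) {
                String[] kv = pair.split("=", 2);
                String key = URLDecoder.decode(kv[0], "UTF-8");
                String value = kv.length > 1 ? URLDecoder.decode(kv[1], "UTF-8") : "";
                beacon.put(key, value);                  // e.g. bitrate -> 650, cdn -> Akamai
            }
        }
        return beacon;
    }
}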
BeaconSpec
So far, we’ve seen how we can transform a conventional single-threaded computation into a MapReduce computation by specifying a mapper and a reducer function. We’ve also seen how we encode events as beacons and collect them on our distributed file system, HDFS. However, in our experience, writing MapReduce jobs by hand is both tedious and error-prone, although like most skills, you get better at it with practice. Additionally, the resultant code contains a significant amount of boilerplate, making the logic hard to see at first glance. The latter is in practice the most significant impediment to overall system understandability and debuggability. Since we run many (on the order of 150-175) different types of MapReduce jobs every hour, we wanted a solution that would allow us to encode the logic in a straightforward way that was easier to maintain than hand-written Java code.

We started by looking at our code and realizing that the majority of our MapReduce jobs perform very similar functions: selecting a set of dimensions that we care about (for example, the video identifier or the zip code), performing some lookups against metadata tables in our key-value store (for example, deriving the zip code dimension from the IP address of the request), and aggregating over a corresponding fact (for example, the total minutes watched). We realized that we could embed the basic knowledge of how to do these things in a language, and then simply use programs written in this language to generate our MapReduce code for us. Since internally we refer to the events we receive from our players as beacons, and the purpose of this language was to process raw beacon data, we called the language BeaconSpec.

An example of BeaconSpec code is shown below:

basefact playback_start from playback/start { 
    dimension harpyhour.id as hourid;
    required dimension video.id as video_id;
    required dimension contentPartner.id as content_partner_id;
    required dimension distributionPartner.id as distribution_partner_id;
    required dimension distributionPlatform.id as distro_platform_id;
    dimension distributionPlatform.isonhulu as is_on_hulu;
    dimension package.id as package_id;
    dimension package.isplusbypackage as is_plus_by_package;
    dimension plan.id as plan_id;
    dimension plan.isplusbyplan as is_plus_by_plan;
    dimension plusCategory.pluslowercategoryid as plus_lower_category_id;
    dimension plusCategory.plusuppercategoryid as plus_higher_category_id;
    dimension client.out as client;
    fact sum(count.count) as total_count;
    dimension packageAvailability.chosen as package_availability;
    dimension siteSessionId.chosen as site_session_id;
    dimension facebook.isfacebookconnected as is_facebook_connected;
}
There are a few key points to note. First, there is no specification of how to compute the final result, only a declaration of what we would like to compute; it is the role of the BeaconSpec compiler to take this declarative specification and convert it into imperative MapReduce code that can run on the cluster. Second, a number of keywords carry special meaning for us: basefact denotes a metric that we want to measure; from tells us which source beacons need to be processed in order to compute that metric; dimension denotes a particular field of the source beacon that we care about and that should form part of the intermediate key emitted by the Mapper phase; and fact denotes a field of the source beacon that we want to aggregate over. In this particular example we are performing a summation, since the fact keyword is immediately followed by the sum specifier; we could just as easily calculate an average (avg) or take the maximum value (max).

From this specification, our in-house compiler produces runnable Java MapReduce code, a snippet of which is shown below:

[Screenshot: a snippet of the Java MapReduce code generated by the BeaconSpec compiler]
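To give a flavour of what the generated code does, here is a rough sketch of the kind of mapper the compiler might emit for the playback_start basefact above. The class name, the reuse of the hypothetical BeaconParser from earlier, and the mapping from dimension names to beacon parameters are illustrative simplifications rather than the actual generated output.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PlaybackStartMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable offset, Text beaconLine, Context context)
            throws IOException, InterruptedException {
        // Parse the raw beacon into a key-value map (see the BeaconParser sketch above).
        java.util.Map<String, String> beacon = BeaconParser.parse(beaconLine.toString());

        // The `from playback/start` clause: only playback-start beacons feed this basefact.
        if (!"/v3/playback/start".equals(beacon.get("eventPath"))) {
            return;
        }

        // Each `dimension` declaration contributes one component of the composite intermediate key.
        String compositeKey = String.join("\t",
                beacon.getOrDefault("video.id", ""),                 // required dimension video.id as video_id
                beacon.getOrDefault("contentPartner.id", ""),        // required dimension contentPartner.id
                beacon.getOrDefault("distributionPlatform.id", "")); // required dimension distributionPlatform.id
        // ... the remaining dimensions are omitted here for brevity.

        // The `fact sum(count.count)` declaration is the value the reducers will aggregate.
        long count = Long.parseLong(beacon.getOrDefault("count.count", "1"));
        context.write(new Text(compositeKey), new LongWritable(count));
    }
}

The matching reducer would simply sum the values for each composite key, much like the SumReducer sketch earlier.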

We use a variety of open-source technologies to build our compiler, in particular JFlex for lexical analysis and CUP for parser generation. These are the Java cousins of the old C programs you probably used if you've ever taken a compilers class: lex and yacc.

A major advantage of a formal declarative specification of our process is that it allows us to extend functionality far beyond what we initially planned. For example, we are currently building out a program to validate the beacons sent by implementations of the Hulu player on a range of devices. It takes BeaconSpec as an input, examines incoming beacons, compares them to the specification, and sends us reports about whether they match or deviate from it. As another example, as we move towards real-time processing of our incoming data, we are examining the possibility of creating a secondary code generator for the BeaconSpec compiler that outputs code to run on Apache Storm instead of MapReduce.
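As a sketch of the beacon-validation idea: read the required dimensions out of the BeaconSpec declaration and flag any incoming beacon that lacks one of them. The class name, the hard-coded dimension set, and the parameter names below are hypothetical; a real implementation would derive the set from the parsed BeaconSpec itself.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BeaconValidator {

    // Dimensions marked `required` in the playback_start BeaconSpec above
    // (hard-coded here; a real validator would read them from the parsed spec).
    private static final Set<String> REQUIRED = new HashSet<>(Arrays.asList(
            "video.id", "contentPartner.id", "distributionPartner.id", "distributionPlatform.id"));

    // Return the required dimensions that are missing or empty in an incoming beacon.
    public static List<String> missingDimensions(Map<String, String> beacon) {
        List<String> missing = new ArrayList<>();
        for (String dimension : REQUIRED) {
            String value = beacon.get(dimension);
            if (value == null || value.isEmpty()) {
                missing.add(dimension);
            }
        }
        return missing;
    }
}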
Related work
Apache Pig is a project that has similar goals to BeaconSpec. Pig programmers write their scripts in a language called Pig Latin, which is subsequently compiled into MapReduce code. However, unlike BeaconSpec, Pig is both an imperative and a general-purpose language. We feel that for this particular use case, the advantages conferred by a declarative domain-specific language are too great to abandon for a general-purpose language: an imperative general-purpose language cannot avoid introducing boilerplate and insignificant details, which make the final program significantly less clear than what we can achieve with a declarative domain-specific language.
Summingbird is a project at Twitter that can generate code targeting MapReduce, Scalding, or Storm. It offers a powerful set of abstractions for performing aggregations. An example of the canonical word-count program in Summingbird is shown below:
def wordCount(source: Iterable[String], store: MutableMap[String, Long]) =
  source.flatMap { sentence =>
    toWords(sentence).map(_ -> 1L)
  }.foreach { case (k, v) => store.update(k, store.get(k) + v) }

For an example of the equivalent code written directly in Java MapReduce, see this link. Summingbird was written to solve problems similar to the ones that led us to create BeaconSpec, and we believe it does so in a way that is significantly more expressive than Pig. However, it is written in a highly idiomatic style, and learning to write Summingbird programs involves a steeper learning curve than BeaconSpec does.

A few other languages have emerged around the Hadoop ecosystem, such as Kiji, which offers a table abstraction and several insertion/retrieval operators over HBase, and Hive, whose query language HiveQL offers a subset of relational database operators that are compiled to MapReduce. We have not fully explored Kiji; however, we make heavy use of Hive at Hulu.
Sawzall (developed at Google) can arguably be acknowledged as the progenitor of all these languages: it is another data-processing language designed to compile to MapReduce code on Google's proprietary distributed data-processing platform (4).
Conclusion
The key takeaway is that we don’t want a general-purpose language; we want a language that expresses exactly what we care about and suppresses details that are not central to the task (2). Whenever you are working on a DSL, adding general-purpose features to the language is a serious temptation, but one that must be avoided if you don’t want your project timeline to rival that of Duke Nukem Forever. This sentiment is best captured by the pioneering computer scientist Alan Perlis in the following quote:

Beware of the Turing tar-pit, in which everything is possible, but nothing of interest is ever easy.

- Alan Perlis, Epigrams on Programming (3)

 

References
  1. MapReduce: Simplified Data Processing on Large Clusters (Jeffrey Dean and Sanjay Ghemawat, Google Inc., 2004) http://static.googleusercontent.com/media/research.google.com/en/us/archive/mapreduce-osdi04.pdf
  2. Structure and Interpretation of Computer Programs (Hal Abelson and Gerald Jay Sussman, MIT, 1984) http://mitpress.mit.edu/sicp/
  3. Epigrams on Programming (Alan Perlis) http://www.cs.yale.edu/homes/perlis-alan/quotes.html
  4. Interpreting the Data: Parallel Analysis with Sawzall (Pike et al., Google Inc.) http://research.google.com/archive/sawzall.html