Tuesday, July 20, 2010

Graph Processing in Map Reduce

In my previous post about Google's Pregel model, I described a general pattern of parallel graph processing: multiple iterations of processing until a termination condition is reached. Within each iteration, the same processing happens at a set of nodes (i.e., context nodes).

Each context node performs a sequence of steps independently (hence achieving parallelism):
  1. Aggregate all incoming messages received from its direct inward arcs during the last iteration
  2. With this aggregated message, perform some local computation (i.e., update the local state of the node and its direct outward arcs)
  3. Pass the result of local computation along all outward arcs to its direct neighbors
This processing pattern can be implemented using the Map/Reduce model, with one MR job per iteration. The sequence is a little different from the above. Typically a mapper performs (2) and (3), emitting each message with the neighbor's node id as the key. The reducer is responsible for (1).

Issue of using Map/Reduce

However, due to the functional programming nature of Map() and Reduce(), M/R does not automatically retain "state" between jobs. To retain the graph across iterations, the mapper needs to explicitly pass the corresponding portion of the graph along to the reducer, in addition to the messages themselves. Similarly, the reducer needs to handle two different types of data in its input.

map(id, node) {
  emit(id, node)
  partial_result = local_compute(node)
  for each neighbor in node.outE.inV {
      emit(neighbor.id, partial_result)
  }
}

reduce(id, list_of_msg) {
  node = null
  result = 0

  for each msg in list_of_msg {
      if type_of(msg) == Node
          node = msg
      else
          result = aggregate(result, msg)
      end
  }

  node.value = result
  emit(id, node)
}

The downside of this approach is that a substantial amount of I/O and bandwidth is consumed just passing the graph itself around.
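To make the cost concrete, here is a minimal single-process sketch of one such iteration in Python. The in-memory "shuffle", the dictionary layout of a node, and the choice of local computation (splitting a node's value evenly across its out-arcs) are all assumptions made for illustration; they are not part of any Map/Reduce API.

```python
from collections import defaultdict

def map_fn(node_id, node):
    """Emit the node itself (to carry the graph forward) plus one message per out-neighbor."""
    yield node_id, ("node", node)
    # Assumed local computation: split this node's value evenly across its out-arcs.
    if node["out"]:
        share = node["value"] / len(node["out"])
        for neighbor_id in node["out"]:
            yield neighbor_id, ("msg", share)

def reduce_fn(node_id, values):
    """Separate the node record from the messages, then aggregate the messages."""
    node, total = None, 0.0
    for kind, payload in values:
        if kind == "node":
            node = payload
        else:
            total += payload
    # Assumes every message targets a node that exists in the graph.
    return node_id, dict(node, value=total)

def run_iteration(graph):
    """Stand-in for shuffle-and-sort: group mapper output by key, then reduce each group."""
    grouped = defaultdict(list)
    for node_id, node in graph.items():
        for key, value in map_fn(node_id, node):
            grouped[key].append(value)
    return dict(reduce_fn(key, grouped[key]) for key in sorted(grouped))

graph = {
    1: {"value": 1.0, "out": [2, 3]},
    2: {"value": 1.0, "out": [3]},
    3: {"value": 1.0, "out": [1]},
}
next_graph = run_iteration(graph)
```

Note that every node record travels through the shuffle alongside the messages, which is exactly the overhead described above.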

Google's Pregel model provides an alternative message distribution model so that state can be retained at the processing node across iterations.

The Schimmy Trick

In a recent research paper, Jimmy Lin and Michael Schatz use a clever partition() algorithm in Map/Reduce that achieves "stickiness" of graph distribution while maintaining a sorted order of node ids on disk.

The whole graph is broken down into multiple files and stored in HDFS. Each file contains multiple records, and each record describes a node and its corresponding adjacency list.

id -> [nodeProps, [[arcProps, toNodeId], [arcProps, toNodeId], ...]]

In addition, the records are physically sorted within the file by their node id.

There are as many reducers as there are files, so each reducer task is assigned one of these files. The partition() function, in turn, ensures that messages for all nodes within a file land on its associated reducer.
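As a rough illustration of this layout, the sketch below splits a graph into one sorted "file" per reducer using the same function that routes messages. The modulo partitioner and the in-memory lists standing in for HDFS files are assumptions for the example, not details taken from the paper.

```python
def partition(node_id: int, num_reducers: int) -> int:
    """Assumed partitioner: the same function decides both file and reducer."""
    return node_id % num_reducers

def split_into_files(graph: dict, num_reducers: int) -> list:
    """Build one list of (node_id, node) records per reducer, sorted by node id."""
    files = [[] for _ in range(num_reducers)]
    for node_id in sorted(graph):
        files[partition(node_id, num_reducers)].append((node_id, graph[node_id]))
    return files

graph = {nid: {"value": 0, "out": []} for nid in [7, 2, 9, 4, 1, 6]}
files = split_into_files(graph, 3)
```

Because a message keyed by a node id is routed with the same partition() call, it arrives at the reducer whose file already holds that node.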

The mapper does the same thing as before, except that the first line in the method is removed, as it no longer needs to emit the graph.

The reducer receives all the messages emitted from the mappers, sorted by the Map/Reduce framework by key (which happens to be the node id). The reducer can also open the corresponding file in HDFS, which likewise maintains a list of nodes sorted by id. The reducer can then read the HDFS file sequentially on each reduce() call, confident that all preceding nodes in the file have already received their corresponding messages.

reduce(id, list_of_msg) {
   nodeInFile = readFromFile()

   # Emit preceding nodes that received no message
   while(nodeInFile.id < id)
       emit(nodeInFile.id, nodeInFile)
       nodeInFile = readFromFile()
   end

   result = 0

   for each msg in list_of_msg {
       result = aggregate(result, msg)
   }

   nodeInFile.value = result
   emit(id, nodeInFile)
}
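The same merge can be sketched as a small runnable Python function. The list-based "file", the (node_id, messages) input shape, and the final flush of trailing nodes that received no message are assumptions added for illustration; in a real job the flush would have to happen when the reducer task closes.

```python
def schimmy_reduce(sorted_file, grouped_messages, aggregate=sum):
    """Merge a sorted partition file with message groups sorted by node id."""
    records = iter(sorted_file)
    output = []
    current = next(records, None)
    for node_id, messages in grouped_messages:
        # Emit preceding nodes that received no message, unchanged.
        while current is not None and current[0] < node_id:
            output.append(current)
            current = next(records, None)
        if current is None or current[0] != node_id:
            raise KeyError(f"message for node {node_id} not in this partition file")
        output.append((node_id, dict(current[1], value=aggregate(messages))))
        current = next(records, None)
    # Flush trailing nodes that received no message.
    while current is not None:
        output.append(current)
        current = next(records, None)
    return output

partition_file = [
    (1, {"value": 5, "out": [4]}),
    (4, {"value": 2, "out": [7]}),
    (7, {"value": 9, "out": []}),
]
messages = [(4, [5, 1])]
result = schimmy_reduce(partition_file, messages)
```

Both inputs are consumed strictly in order, so the file is read sequentially exactly once per iteration.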

Although the Schimmy trick improves on the classical way of using Map/Reduce, it only eliminates shuffling the graph structure between the mapper and the reducer. At each iteration, the mapper still needs to read the whole graph from HDFS, and the reducer still needs to write the whole graph back to HDFS, which maintains 3-way replication for each file.

Hadoop provides a co-location mechanism for the mapper and tries to assign it files that reside on the same machine. However, this co-location mechanism is not available for the reducer, so the reducer still needs to write the graph back over the network.

Pregel Advantage

Since the Pregel model retains worker state across iterations (the same worker is responsible for the same set of nodes), the graph can be loaded into memory once and reused across iterations. This reduces I/O overhead, as there is no need to read from and write to disk at each iteration. For fault resilience, there is a periodic checkpoint where every worker writes its in-memory state to disk.

Also, because Pregel is stateful, it sends only locally computed results (not the graph structure) over the network, which keeps bandwidth consumption low.

Of course, Pregel is very new and relatively immature compared to Map/Reduce.

Monday, July 12, 2010

Google Pregel Graph Processing

A lot of real-life problems can be expressed in terms of entities related to each other and are best captured using graph models. Well-defined graph theory can then be applied to process the graph and return interesting results. The general processing patterns can be categorized into the following:
  1. Capture (e.g. When John is connected to Peter in a social network, a link is created between two Person nodes)
  2. Query (e.g. Find all of John's friends of friends who are younger than 30 and married)
  3. Mining (e.g. Find out the most influential person in Silicon Valley)

Distributed and Parallel Graph Processing

Although using a graph to represent a relationship network is not new, the size of networks has increased so dramatically in the past decade that storing the whole graph in one place is often impossible. Therefore, the graph needs to be broken down into multiple partitions and stored in different places. Traditional graph algorithms that assume the whole graph resides in memory no longer apply. We need to redesign the algorithms so that they can work in a distributed environment. On the other hand, by breaking the graph into different partitions, we can manipulate the graph in parallel to speed up the processing.

Property Graph Model

The paper “Constructions from Dots and Lines” by Marko A. Rodriguez and Peter Neubauer illustrates the idea very well. Basically, a graph contains nodes and arcs.

A node has a "type" which defines a set of properties (name/value pairs) that the node can be associated with.

An arc defines a directed relationship between nodes, and hence contains the fromNode, toNode as well as a set of properties defined by the "type" of the arc.
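A minimal Python sketch of this model might look as follows. The class layout, the connect() helper, and the outE/inE/outV/inV field names (borrowed from the traversal notation used later in this post) are illustrative assumptions, not an API defined by the paper.

```python
from dataclasses import dataclass, field

@dataclass(eq=False)
class Node:
    id: int
    type: str
    props: dict = field(default_factory=dict)
    outE: list = field(default_factory=list)  # arcs leaving this node
    inE: list = field(default_factory=list)   # arcs arriving at this node

@dataclass(eq=False)
class Arc:
    type: str
    outV: Node  # the node the arc leaves (fromNode)
    inV: Node   # the node the arc points to (toNode)
    props: dict = field(default_factory=dict)

def connect(from_node, to_node, arc_type, **props):
    """Hypothetical helper: create a directed arc and register it on both endpoints."""
    arc = Arc(arc_type, from_node, to_node, props)
    from_node.outE.append(arc)
    to_node.inE.append(arc)
    return arc

john = Node(1, "Person", {"name": "John"})
peter = Node(2, "Person", {"name": "Peter"})
friendship = connect(john, peter, "friend", since=2010)
```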



General Parallel Graph Processing

Most graph processing algorithms can be expressed as a combination of "traversal" and "transformation".

Parallel Graph Traversal

A "traversal" can be expressed as a path that contains a sequence of segments. Each segment contains a traversal from a node to an arc, followed by a traversal from an arc to a node. In Marko and Peter's model, a Node (Vertex) contains a collection of "inE" and another collection of "outE". An Arc (Edge), on the other hand, contains one "inV" and one "outV". So to express a "friend-of-a-friend" relationship over a social network, we can use the following

./outE[@type='friend']/inV/outE[@type='friend']/inV

Loops can also be expressed in the path. To express all persons that are reachable from this person, we can use the following

.(/outE[@type='friend']/inV)*[@cycle='infinite']

On the implementation side, a traversal can be processed in the following way
  1. Start with a set of "context nodes", which can be defined by a list of node ids or by search criteria (in this case, the search result determines the starting context nodes)
  2. Repeat until all segments in the path are exhausted. Perform a walk from all context nodes in parallel. Evaluate all outward arcs (i.e., outE) against the conditions (e.g., @type='friend'). The nodes that the matching arcs point to (i.e., inV) become the context nodes of the next round
  3. Return the final context nodes
Such a traversal path can also be used to express inferred (or derived) relationships, which don't have a physical arc stored in the graph model.
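The traversal steps above can be sketched in a few lines of Python. The adjacency-dictionary layout and the traverse() function are assumptions for illustration; the loop over context nodes is sequential here, whereas a real implementation would expand them in parallel.

```python
def traverse(out_arcs, start_ids, path):
    """Walk one path segment per round; `path` is the list of arc types to follow."""
    context = set(start_ids)
    for arc_type in path:
        # Follow every matching outward arc; the targets form the next context set.
        context = {
            to_id
            for node_id in context
            for kind, to_id in out_arcs.get(node_id, [])
            if kind == arc_type
        }
    return context

out_arcs = {
    "john": [("friend", "peter"), ("colleague", "mary")],
    "peter": [("friend", "sue"), ("friend", "john")],
    "mary": [("friend", "tom")],
}
friends_of_friends = traverse(out_arcs, ["john"], ["friend", "friend"])
```

Note that John shows up among his own friends of friends because Peter links back to him; a query that wants to exclude the starting node has to filter it out explicitly.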

Parallel Graph Transformation

The main goal of graph transformation is to modify the graph. This includes modifying the properties of existing nodes and arcs, creating new arcs and nodes, and removing existing arcs and nodes. The modification logic is provided by a user-defined function, which is applied to all active nodes.

The graph transformation process can be implemented in the following steps
  1. Start with a set of "active nodes", which can be defined by a list of node ids or by search criteria (in this case, the search result determines the starting active nodes)
  2. Repeat until there are no more active nodes. Execute the user-defined transformation, which modifies the properties of the active nodes and their outward arcs. It can also remove outward arcs or create new arcs that point to existing or new nodes (in other words, the graph connectivity can be modified). It can also send messages to other nodes (the messages will be picked up in the next round) as well as receive messages sent from other nodes in the previous round.
  3. Return the transformed graph, or perform a traversal to return a subset of the transformed graph.
Google's Pregel

Pregel can be thought of as a generalized parallel graph transformation framework. In this model, the most basic (atomic) unit is a "node" that contains its properties, its outward arcs (and their properties), and the ids (just the ids) of the nodes that the outward arcs point to. The node also has a logical inbox to receive all messages sent to it.


The whole graph is broken down into multiple "partitions", each containing a large number of nodes. A partition is a unit of execution and typically has an execution thread associated with it. A "worker" machine can host multiple "partitions".


The execution model is based on the BSP (Bulk Synchronous Parallel) model. In this model, multiple processing units proceed in parallel through a sequence of "supersteps". Within each "superstep", each processing unit first receives all messages delivered to it from the preceding "superstep", then manipulates its local data and may queue up messages that it intends to send to other processing units. This happens asynchronously and simultaneously among all processing units. The queued messages are delivered to the destination processing units but won't be seen until the next "superstep". When all the processing units have finished the message delivery (hence the synchronization point), the next superstep can start, and the cycle repeats until the termination condition has been reached.
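To illustrate the superstep cycle, here is a single-process Python sketch that propagates the maximum value through a graph. The function name, the dictionary-based inbox and outbox, and the rule that a node stays active only while it has incoming messages are simplifying assumptions; this is not Pregel's actual API.

```python
def run_supersteps(values, out_arcs, max_supersteps=50):
    """Propagate the maximum value through the graph, one superstep at a time."""
    values = dict(values)
    inbox = {node_id: [] for node_id in values}
    active = set(values)
    superstep = 0
    while active and superstep < max_supersteps:
        outbox = {node_id: [] for node_id in values}
        for node_id in active:
            # Read the messages from the previous superstep and update local state.
            best = max([values[node_id]] + inbox[node_id])
            changed = best > values[node_id]
            values[node_id] = best
            # Queue messages; they stay invisible until the next superstep.
            if superstep == 0 or changed:
                for neighbor in out_arcs.get(node_id, []):
                    outbox[neighbor].append(best)
        # Synchronization point: queued messages become the next inbox.
        inbox = outbox
        active = {node_id for node_id, msgs in inbox.items() if msgs}
        superstep += 1
    return values, superstep

values = {1: 3, 2: 6, 3: 2, 4: 1}
out_arcs = {1: [2], 2: [1, 4], 3: [2, 4], 4: [3]}
final_values, steps = run_supersteps(values, out_arcs)
```

The loop ends once no messages are in flight, which plays the role of the termination condition in this sketch.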

Note that, depending on the graph algorithm, the assignment of nodes to partitions may affect overall performance. Pregel provides a default assignment where partition = nodeId % N, but users can override this assignment algorithm if they want. In general, it is a good idea to put closely neighboring nodes into the same partition so that messages between these nodes don't need to cross the network, which reduces communication overhead. Of course, this also means that traversing neighboring nodes all happens within the same machine, which hinders parallelism. This is usually not a problem when the context nodes are very diverse. In my experience with parallel graph processing, coarse-grained parallelism is preferred over fine-grained parallelism as it reduces communication overhead.
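A small Python sketch can show why the assignment matters. It counts the arcs that cross partition boundaries under the default modulo assignment and under a hand-picked, locality-aware assignment. The toy graph and both helper functions are assumptions made for illustration.

```python
def default_partition(node_id: int, num_partitions: int) -> int:
    """Default assignment described above: partition = nodeId % N."""
    return node_id % num_partitions

def cross_partition_arcs(out_arcs, assign):
    """Count arcs whose endpoints live in different partitions."""
    return sum(
        1
        for src, targets in out_arcs.items()
        for dst in targets
        if assign(src) != assign(dst)
    )

# Two tightly connected clusters, {0, 1, 2} and {3, 4, 5}, joined by a single arc.
out_arcs = {0: [1, 2], 1: [2], 2: [0, 3], 3: [4], 4: [5], 5: [3]}

modulo_cost = cross_partition_arcs(out_arcs, lambda n: default_partition(n, 2))
cluster_cost = cross_partition_arcs(out_arcs, lambda n: 0 if n < 3 else 1)
```

On this toy graph the modulo assignment sends five of the eight arcs across partitions, while grouping each cluster into its own partition leaves only one.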

The complete picture of execution can be implemented as follows:


The basic processing unit is a "thread" associated with each partition, running inside a worker. Each worker receives messages from the previous "superstep" in its "inQ" and dispatches each message to the partition where the destination node resides. After that, a user-defined "compute()" function is invoked on each node of the partition. Notice that there is a single thread per partition, so nodes within a partition are executed sequentially, and the order of execution is nondeterministic.

The "master" plays a central role in coordinating the execution of supersteps in sequence. It signals the beginning of a new superstep to all workers after learning that all of them have completed the previous one. It also pings each worker to learn its processing status and periodically issues a "checkpoint" command to all workers, which then save their partitions to a persistent graph store. Pregel doesn't define or mandate the graph storage model, so any persistent mechanism should work. There is a "load" phase at the beginning where each partition starts empty and reads a slice of the graph storage. For each node read from the storage, a "partition()" function is invoked; the node is loaded into the current partition if the function returns the current partition, otherwise the node is queued to the partition it is assigned to.

Fault resilience is achieved through the checkpoint mechanism, where each worker is instructed to save its in-memory graph partition to the graph storage periodically (at the beginning of a superstep). If a worker is detected to be dead (not responding to the "ping" message from the master), the master instructs the surviving workers to take over the partitions of the failed worker. The whole computation is rolled back to the previous checkpoint and proceeds again from there (even the healthy workers need to redo the previous processing). The Pregel paper mentions a potential optimization that re-executes only the failed partitions from the previous checkpoint by replaying the previously received messages. Of course, this requires keeping a log of all messages exchanged between nodes at every superstep since the previous checkpoint. This optimization, however, relies on the algorithm being deterministic (in other words, the same input executed at a later time produces the same output).

A further optimization is available in Pregel to reduce network bandwidth usage. Messages destined for the same node can be combined using a user-defined "combine()" function, which is required to be associative and commutative. This is similar to the combine() method in Google's Map/Reduce model.
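As a rough sketch of the idea, the Python function below collapses outgoing messages per destination before they would leave the worker. The function name and the (destination, value) message shape are assumptions; max is used as the combine function because it is both associative and commutative.

```python
def combine_outgoing(messages, combine):
    """Collapse messages bound for the same node into one message per destination."""
    combined = {}
    for dest_id, value in messages:
        if dest_id in combined:
            combined[dest_id] = combine(combined[dest_id], value)
        else:
            combined[dest_id] = value
    return combined

outgoing = [(7, 3), (9, 1), (7, 5), (7, 2), (9, 4)]
combined = combine_outgoing(outgoing, max)
```

Here five messages shrink to two, one per destination node, without changing what the receiving compute() would conclude when it only needs the maximum.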

In addition, each node can also emit an "aggregate value" at the end of "compute()". The worker invokes a user-defined "aggregate()" function that aggregates all nodes' aggregate values into a partition-level aggregate value, and so on all the way up to the master. The final aggregated value is made available to all nodes in the next superstep. The aggregate value can be used to calculate summary statistics across nodes as well as to coordinate the progress of the processing units.
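The two-level fold can be sketched in Python as below. The nested-list input (one inner list of emitted values per partition) and the function name are assumptions for illustration, and the sketch expects at least one partition to have emitted a value.

```python
from functools import reduce

def aggregate_superstep(partitions, aggregate):
    """Fold node values per partition, then fold the partition results at the master."""
    partition_level = [
        reduce(aggregate, node_values) for node_values in partitions if node_values
    ]
    return reduce(aggregate, partition_level)

# Each inner list holds the aggregate values emitted by the nodes of one partition.
emitted = [[4, 1], [2], [7, 3, 5]]
total = aggregate_superstep(emitted, lambda a, b: a + b)
largest = aggregate_superstep(emitted, max)
```

Only one value per partition travels to the master, which is what keeps this cheap compared with shipping every node's value.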

I think the Pregel model is general enough for a large portion of classical graph algorithms. I'll cover how to map these traditional algorithms onto Pregel in subsequent posts.

Reference

http://www.slideshare.net/slidarko/graph-windycitydb2010