Basic usage of GraphX for clique detection

Unfortunately, the Bron-Kerbosch clique detection algorithm is still running on the driver

GraphX is a powerful component in Spark that allows graph-based programming and manipulation at very large scale. The only drawback is that it runs on RDDs rather than the DataFrames I’m used to in Spark 2.2. Although a Spark package called GraphFrames is available to extend GraphX’s power to DataFrames, I reckon it’s still easier to learn to construct a graph using GraphX and RDDs than to import the package into Spark.

GraphX requires me to manually assign a VertexId (essentially a Scala/Java Long) to each vertex, which can be done with Spark DataFrame’s monotonically_increasing_id() function. This function produces unique ids but doesn’t guarantee that they are consecutive (unlike a SQL database’s auto-increment primary key). Moreover, since the id depends on the partition number, it is NOT stable: recomputing the DataFrame can assign different ids. The solution, though, is easy: caching the DataFrame with the ids (or saving it to disk) keeps them persistent across operations (e.g. join).
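To see why the ids are unique but neither consecutive nor stable, it helps to look at how they are laid out. The Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Scala sketch below only mimics that arithmetic; `MonotonicIdSketch` and `sketchId` are hypothetical names for illustration, not Spark APIs.

```scala
object MonotonicIdSketch {
  // Mimics the documented layout: partition ID in the upper 31 bits,
  // record number within the partition in the lower 33 bits.
  // Hypothetical helper for illustration; Spark computes this internally.
  def sketchId(partitionId: Int, rowInPartition: Long): Long =
    (partitionId.toLong << 33) | rowInPartition

  def main(args: Array[String]): Unit = {
    // Two partitions holding 3 and 2 rows respectively.
    val ids = Seq(0 -> 3, 1 -> 2).flatMap { case (p, n) =>
      (0L until n.toLong).map(sketchId(p, _))
    }
    println(ids.mkString(", ")) // prints "0, 1, 2, 8589934592, 8589934593"
  }
}
```

If the same rows land in different partitions on a recomputation, the same object gets a different id, which is why the ids need to be pinned down before they are joined back onto the edges.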

The following piece of code converts a graph, expressed as a DataFrame of edges (source vertex object, destination vertex object and edge weight), into a GraphX Graph representation. Storing the vertex objects redundantly on every edge is a small price to pay for letting Spark generate it all in parallel.

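import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

case class Obj(id: String)

// graph DataFrame schema -- src: Struct, dst: Struct, weight: DoubleType
// Collect every distinct vertex object and give it a unique Long id
val idx = graph.select($"src".as("obj"))
    .union(graph.select($"dst".as("obj")))
    .distinct
    .withColumn("id", monotonically_increasing_id())
    .select("id", "obj")
    .as[(VertexId, Obj)]
    .cache // keeps the generated ids fixed for the joins below

val vertices: RDD[(VertexId, Obj)] = idx.rdd

// Replace each edge's src/dst object id with the generated VertexId
val edges: RDD[Edge[Double]] = graph
    .select($"src.id".as("src"), $"dst.id".as("dst"), $"weight".as("attr"))
    .join(idx.select($"id".as("srcId"), $"obj.id".as("src")), "src")
    .join(idx.select($"id".as("dstId"), $"obj.id".as("dst")), "dst")
    .select("srcId", "dstId", "attr") // GraphX's required fields for an Edge object
    .as[Edge[Double]].rdd.cache // caching edges is optional

// Named gxGraph so it does not clash with the `graph` DataFrame above
val gxGraph = Graph(vertices, edges)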

The case class Obj requires a simple unique id because joining on whole case classes is problematic. The id doesn’t have to be a number (if it is, it can be used as the VertexId directly). The idx DataFrame is cached for persistence and then assigned to the edges through join operations. It can also be referred to later to convert a VertexId back to its corresponding object.

Clique Detection Problem

Although GraphX supports popular graph algorithms such as PageRank, Connected Components and Strongly Connected Components, Clique Detection is missing from its features. It’s probably due to the non-parallel nature of the original Bron-Kerbosch algorithm.

Since it’s the most suitable algorithm for my task at hand, and I needed working code rather than research papers, I found a simple Scala-based implementation, cleaned it up and optimized it for my use. By offloading the heavy lifting to Spark as much as possible, this tiny recursive function (running on a single core) is surprisingly fast. A run on a graph with around 120,000 edges took 45 seconds. That graph contains only the important edges (filtered by a weight I defined) generated across the entirety of Singapore, so for me it is probably fast enough.
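For readers who have not seen the algorithm, here is a minimal plain-Scala sketch of Bron-Kerbosch with pivoting. It is not the implementation I used, just a generic textbook version; the names `BronKerboschSketch`, `adjacency` and `maximalCliques` are made up for illustration. It assumes the edges have already been brought to the driver as `(src, dst)` pairs of vertex ids, for instance by collecting the `srcId` and `dstId` of each GraphX edge.

```scala
object BronKerboschSketch {
  type Adjacency = Map[Long, Set[Long]]

  // Build an undirected adjacency map from (src, dst) pairs, dropping self-loops.
  def adjacency(edges: Seq[(Long, Long)]): Adjacency =
    edges
      .filter { case (a, b) => a != b }
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  // Bron-Kerbosch with pivoting.
  // r = clique built so far, p = candidates that can extend r,
  // x = vertices already processed (used to reject non-maximal cliques).
  def maximalCliques(adj: Adjacency): List[Set[Long]] = {
    def neighbours(v: Long): Set[Long] = adj.getOrElse(v, Set.empty[Long])

    def expand(r: Set[Long], p: Set[Long], x: Set[Long]): List[Set[Long]] =
      if (p.isEmpty && x.isEmpty) List(r) // r cannot be extended: maximal
      else if (p.isEmpty) Nil             // r is contained in a clique found earlier
      else {
        // Pick the pivot with the most neighbours among the candidates,
        // so that fewer branches need to be explored.
        val pivot = (p ++ x).maxBy(v => (neighbours(v) & p).size)
        val (_, _, found) =
          (p -- neighbours(pivot)).foldLeft((p, x, List.empty[Set[Long]])) {
            case ((pCur, xCur, acc), v) =>
              val sub = expand(r + v, pCur & neighbours(v), xCur & neighbours(v))
              (pCur - v, xCur + v, sub ::: acc) // move v from candidates to processed
          }
        found
      }

    expand(Set.empty, adj.keySet, Set.empty).filter(_.nonEmpty)
  }
}
```

As a usage example, `maximalCliques(adjacency(Seq((1L, 2L), (2L, 3L), (1L, 3L), (3L, 4L))))` yields the triangle `{1, 2, 3}` and the pair `{3, 4}`. The recursion is sequential by nature, which is why it stays on the driver.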

The function may not perform well in a densely-connected graph, but you should try it before calling it unusable.

Larger graphs, on the other hand, could be handled by first partitioning the graph based on certain criteria and then running multiple instances of this function on different partitions.
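One possible criterion, and this is my assumption rather than something I have benchmarked, is to split by connected component: every clique lies entirely within one component, so each component can be searched independently. The plain-Scala sketch below groups edges by component with a simple union-find; `ComponentSplitSketch` and `splitByComponent` are hypothetical names. On a real cluster, GraphX's own `connectedComponents()` could supply the component ids instead.

```scala
object ComponentSplitSketch {
  // Group (src, dst) edges by the connected component they belong to.
  def splitByComponent(edges: Seq[(Long, Long)]): Seq[Seq[(Long, Long)]] = {
    val parent = scala.collection.mutable.Map.empty[Long, Long]

    // Find the representative of v, compressing the path along the way.
    def find(v: Long): Long = {
      val p = parent.getOrElseUpdate(v, v)
      if (p == v) v
      else {
        val root = find(p)
        parent(v) = root
        root
      }
    }

    // Merge the components of the two endpoints of every edge.
    edges.foreach { case (a, b) =>
      val (ra, rb) = (find(a), find(b))
      if (ra != rb) parent(ra) = rb
    }

    // Both endpoints share a representative, so keying on the source is enough.
    edges.groupBy { case (a, _) => find(a) }.values.toSeq
  }
}
```

Each resulting group of edges could then be handed to a separate instance of the clique function, for example one per Spark task.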

Licensed under CC BY-NC-SA 4.0