Advent of 2021, Day 21 – Spark GraphX operators

Series of Apache Spark posts:

Property graphs have collection of operators, that can take user-defined function and produce new graphs with transformed properties and structure. Core operators are defined in Graph and compositions of core operators are defined as GraphOps, and are automatically available as members of Graph. Each graph representation must provide implementations of the core operations and reuse many of the useful operations that are defined in GraphOps.

The list of all operators:

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

There are (i) Property Operators, (ii), Structural Operators, (iii) Join Operators, and (iv) Neighbourhood Operators.

The property operators yields a new graph with the vertex or edge properties modified by the user defined map function. These operators are:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

Structural operators are reverse, mask and subgraph operators. Reverse operator returns a new graph with all the edge directions reversed. Subgraph operator takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate. And mask operator constructs a subgraph by returning a graph that contains the vertices and edges that are also found in the input graph.

Example of mask operator:

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

Join operators are joinVertices and OuterJoinVertices operators. Join Vertices operator joins the vertices with the input RDD and returns a new graph with the vertex properties obtained by applying the user defined map function to the result of the joined vertices. And OuterJoinVertices works similar to joinVertices except that the user defined map function is applied to all vertices and can change the vertex property type

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

The Neighbourhood operator are Aggregate Messages, Compute Degree OInformation, andCollection neighbours.

Aggregate messages applies a user defined sendMsg function to each edge triplet in the graph and then uses the mergeMsg function to aggregate those messages at their destination vertex.

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

Computing degree information is a common aggregation task that computes the degree of each vertex: the number of edges adjacent to each vertex. In the context of directed graphs it is often necessary to know the in-degree, out-degree, and the total degree of each vertex. 

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

Tomorrow we will look into Spark in Databricks.

Compete set of code, documents, notebooks, and all of the materials will be available at the Github repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021! 🙂

Tagged with: , , , ,
Posted in Spark, Uncategorized
4 comments on “Advent of 2021, Day 21 – Spark GraphX operators

Leave a comment

Follow TomazTsql on WordPress.com
Programs I Use: SQL Search
Programs I Use: R Studio
Programs I Use: Plan Explorer
Rdeči Noski – Charity

Rdeči noski

100% of donations made here go to charity, no deductions, no fees. For CLOWNDOCTORS - encouraging more joy and happiness to children staying in hospitals (http://www.rednoses.eu/red-noses-organisations/slovenia/)

€2.00

Top SQL Server Bloggers 2018
TomazTsql

Tomaz doing BI and DEV with SQL Server and R, Python, Power BI, Azure and beyond

Discover WordPress

A daily selection of the best content published on WordPress, collected for you by humans who love to read.

Revolutions

Tomaz doing BI and DEV with SQL Server and R, Python, Power BI, Azure and beyond

tenbulls.co.uk

tenbulls.co.uk - attaining enlightenment with the Microsoft Data and Cloud Platforms with a sprinkling of Open Source and supporting technologies!

SQL DBA with A Beard

He's a SQL DBA and he has a beard

Reeves Smith's SQL & BI Blog

A blog about SQL Server and the Microsoft Business Intelligence stack with some random Non-Microsoft tools thrown in for good measure.

SQL Server

for Application Developers

Business Analytics 3.0

Data Driven Business Models

SQL Database Engine Blog

Tomaz doing BI and DEV with SQL Server and R, Python, Power BI, Azure and beyond

Search Msdn

Tomaz doing BI and DEV with SQL Server and R, Python, Power BI, Azure and beyond

R-bloggers

Tomaz doing BI and DEV with SQL Server and R, Python, Power BI, Azure and beyond

R-bloggers

R news and tutorials contributed by hundreds of R bloggers

Data Until I Die!

Data for Life :)

Paul Turley's SQL Server BI Blog

sharing my experiences with the Microsoft data platform, SQL Server BI, Data Modeling, SSAS Design, Power Pivot, Power BI, SSRS Advanced Design, Power BI, Dashboards & Visualization since 2009

Grant Fritchey

Intimidating Databases and Code

Madhivanan's SQL blog

A modern business theme

Alessandro Alpi's Blog

DevOps could be the disease you die with, but don’t die of.

Paul te Braak

Business Intelligence Blog

Sql Insane Asylum (A Blog by Pat Wright)

Information about SQL (PostgreSQL & SQL Server) from the Asylum.

Gareth's Blog

A blog about Life, SQL & Everything ...