Little useless-useful R functions – Mastermind board game for R

Playing a simple guessing game with R. It’s called Mastermind! The game was originally created for two players, but this R version is single-player, for when an R developer or data scientist needs a break.

The gameplay is simple and so are the rules. The board contains 10 rows (or more), each with four colour slots, plus key pegs (white or black). The R engine stores a secret colour combination and the player selects a combination as a guess.

Based on the selection, the R engine returns black or white key pegs. A black peg means a colour is in the right place; a white peg means the colour appears in the secret combination, but in a different position. No pegs means that none of the selected colours appear in the secret colour combination.
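To make the scoring rule concrete, here is a minimal sketch of how the key pegs can be counted for a guess against the secret combination. It is only an illustration, not the game's own check (the game below implements its own logic in add_key_pegs), and count_pegs is a hypothetical helper that assumes colours are encoded as integers, as in the game code:

# Minimal illustration of the key-peg rule (not part of the game code below):
# black = right colour in the right position,
# white = right colour, wrong position.
count_pegs <- function(secret, guess) {
  black <- sum(secret == guess)   # exact matches
  # colour matches regardless of position, counted per colour (1..6 as on the board)
  colour_matches <- sum(pmin(table(factor(secret, levels = 1:6)),
                             table(factor(guess,  levels = 1:6))))
  white <- colour_matches - black
  c(black = black, white = white)
}

count_pegs(c(1, 2, 3, 4), c(1, 3, 3, 2))
# black = 2 (colours 1 and 3 in place), white = 1 (colour 2 present, wrong spot)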

Using the x11() graphics device, this game becomes much more fun with R. The board looks like this:

The game code consists of a board function, a function for adding colours to the board, a function for checking the key pegs, and the gameplay function.


##########################################
# 
# Mastermind board game for R Language
# 
# Game for single-player R developers/Data scientists for
# killing time, playing game while waiting for the ML
# model to finish training or just to play.
#
# Series:
# Little Useless-useful R functions #31
# Created: January 06, 2022
# Author: Tomaz Kastrun
# Blog: tomaztsql.wordpress.com
# V.1.0

# Changelog: 
###########################################

numberOfColors <- 4
numberOfTries <- 10

get_board <- function(nof_col, nof_try=10){
  plot.new()
  op <- par(bg = "white")
  grid(nx = 6, ny = 12, col = "gray", lty = "dotted",lwd = par("lwd"), equilogs = TRUE)
  
  # adding borders 
  par(new = TRUE)
  plot(c(100,500), c(100,500), xlab = "", ylab = "", xaxt = 'n', yaxt = 'n',main = "Mastermind board game")
  nof_tries <- nof_try
  for (i in 1:nof_tries) { #rows
    for (j in 1:nof_col) { #columns
      col <- 50*(1:nof_col-1)
      rect(100+col[j], 500-(i*30), 150+col[j], 475-(i*30), col = 'white')
    }
  }
  colours = c('Red','Green','Blue','Yellow','Brown','Orange')
  for (z in 1:nof_col) {
    rect(100+z*50, 100, 150+z*50, 150, col = colours[z])
  } 
}

add_rect <- function(colour,try,nof_try=10) {
  par(new = TRUE)
  max_tries <- numberOfColors*nof_try #10 rows
  if (try %% numberOfColors == 0) { #number of tries = number of colours
    nof_try <- try/numberOfColors
    add_key_pegs(input_colours,store_secret, nof_try)
  } 
  
  if (try > numberOfColors){
    row <- ceiling((try/numberOfColors))
    rect_order <- abs(try-((row-1)*numberOfColors))
  } else {
    row <- 1
    rect_order <- try
  }
  rect(100+(rect_order*50)-50, 500-(row*30),150+((rect_order*50))-50, 475-(row*30), col = colour)
}


add_key_pegs <- function(input_colours, store_secret,nof_try){
 
  ss <- store_secret
  ic <- input_colours 
  ss1 <- as.vector(strsplit(as.character(ss), "")[[1]])
  ic1 <- as.vector(strsplit(as.character(ic), "")[[1]])
  
  white <- ""
  black <- ""
  for (i in 1:length(ss1)){
    for (j in 1:length(ic1)){
      if (i==j && ss1[i] == ic1[j]) {black <- as.integer(paste( c(black, ic1[j]),collapse=""))}
      if (ss1[i] == ic1[j]) { white <- as.integer(paste( c(white, ic1[j]),collapse="")) }
    }
  }
  
  black1 <- as.vector(strsplit(as.character(black), "")[[1]])
  white1 <- as.vector(strsplit(as.character(white), "")[[1]])
  
  black <- nchar(black)
  white <- length(unique(setdiff(white1, black1)))
  nof_tokes <- black + white
  tok <- replicate(black, "black")
  en <- replicate(white, "gray")
  token <- c(tok, en)
  name1 <- token[1]
  name2 <- token[2]
  name3 <- token[3]
  name4 <- token[4]
  
  if (nof_tokes == 1) {
    par(new = TRUE)
    plot(450,500-(nof_try*30), col = name1, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
  }
  if (nof_tokes == 2) {
    par(new = TRUE)
    plot(450,500-(nof_try*30), col = name1, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
    par(new = TRUE)
    plot(450,490-(nof_try*30), col = name2, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
  }
  if (nof_tokes == 3) {
    par(new = TRUE)
    plot(450,500-(nof_try*30), col = name1, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
    par(new = TRUE)
    plot(450,490-(nof_try*30), col = name2, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
    par(new = TRUE)
    plot(470,500-(nof_try*30), col = name3, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
  }
  if (nof_tokes == 4) {
    par(new = TRUE)
    plot(450,500-(nof_try*30), col = name1, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
    par(new = TRUE)
    plot(450,490-(nof_try*30), col = name2, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
    par(new = TRUE)
    plot(470,500-(nof_try*30), col = name3, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
    par(new = TRUE)
    plot(470,490-(nof_try*30), col = name4, lwd = 2, xaxt = 'n', yaxt = 'n', xlab = "",ylab = "",xlim=range(100:500), ylim=range(100:500))
  }
}

get_secret <- function(nof_col, colours_repeat=FALSE) {
  colours <- c(1:4)
  s <- sample(colours,nof_col, replace=colours_repeat) 
  s <- paste(s, collapse="")
  return(as.integer(s))
}


game <- function(numberOfColors=4, numberOfTries=10){
   rm(list = ls())
   x11()
   end_game <- 1
   count <<- 0
   get_board(nof_col = numberOfColors, nof_try = numberOfTries)
   store_secret <<- get_secret(nof_col=numberOfColors, colours_repeat =TRUE)
   input_colours <<- 0L
   #input_colours <- c("Blue", "Yellow","Green","Yellow") #, "Brown", "Green")
   nof_selection <- numberOfColors
   max_tries <- nof_selection*numberOfTries 
 
   while (end_game <= max_tries && store_secret != input_colours) {
     mouse.at <- locator(n = 1, type = "o") 
     x.at <- mouse.at$x
     y.at <- mouse.at$y
     
     
     if (x.at >= 150 & x.at < 200 & y.at >= 100 & y.at <=150) {
       input_colours <<- as.integer(paste( c(input_colours, 1),collapse=""))
       add_rect('Red',end_game) 
     }
     if (x.at >= 200 & x.at < 250 & y.at >= 100 & y.at <=150) {
       input_colours <<- as.integer(paste( c(input_colours, 2),collapse=""))
       add_rect('Green',end_game) 
     }
     if (x.at >= 250 & x.at < 300 & y.at >= 100 & y.at <=150) {
       input_colours <<- as.integer(paste( c(input_colours, 3),collapse=""))
       add_rect('Blue',end_game) 
     }
     if (x.at >= 300 & x.at < 350 & y.at >= 100 & y.at <=150) {
       input_colours <<- as.integer(paste( c(input_colours, 4),collapse=""))
       add_rect('Yellow',end_game) 
     } 

     #end game if needed 
     if (store_secret == input_colours) {
       par(new = TRUE)
       mtext("GAME WON", side=1)
       break
     }

      #end game if needed 
     if (end_game == max_tries) {
       par(new = TRUE)
       mtext("END GAME", side=1)
     }
     
    if (end_game %% numberOfColors == 0)  {
           add_key_pegs(input_colours, store_secret, count)
           input_colours <<- 0L 
           count <<- count + 1
           print(count)
    
      } 
     # increment next level
     end_game = end_game + 1      
   }
    
}
  
######################
### Start The game ###
######################

game()
  

And a simple gameplay:

As always, the code is available on GitHub in the same Useless_R_function repository, and the R file for the game is in the same repository, here. Check GitHub for future updates.

Happy R-coding and stay healthy!


RStudio with a great new feature – multiple code panes

With the October 2021 version of RStudio (2021.09.1 Preview), a great and – in my personal opinion – long-awaited feature is now available: multiple windows (panes) for viewing R code.

Two code/source windows (panes) side by side

On the RStudio home page, make sure to download version 2021.09 Preview (as of writing this blog post, it is still in preview) and install it on your client machine (Windows, macOS and Linux are supported).

Once installation is complete, head to the global options (Tools -> Global Options) and select Pane Layout. You will have a new set of buttons available (Add Column; Remove Column). With Add Column, an additional source pane is added to the layout.

Adding code panes

You can add as many code (source) panes as you prefer. Apply the changes and you will have multiple code panes (windows) available.

Each source pane can have multiple files open. The feature is at its best when RStudio is opened on a bigger screen: you can code faster, without switching between code tabs and losing track of your code.

The console, environment, history, files and plots panes remain the same and (at this point) cannot be multiplied. I personally haven’t found it a problem that the console or plot panes cannot be multiplied, but if someone finds this useful, feel free to propose it on the RStudio community site.

Stay healthy and happy Rrrr-ing! 🙂


Little useless-useful R functions – Creating tiny Fireworks with R

New Year’s Eve is almost here, and what better way to celebrate than with fireworks. Snap, pop, crack, boom. These are the most peaceful, animal-friendly, harmless, eco-friendly, child-friendly, no-fire-needed, educational and nifty fireworks.

To get the fireworks, fire up the following R function.

##########################################
# 
# Tiny  fireworks  with R for New Year's 2022
#
# Series:
# Little Useless-useful R functions #31
# Created: December 29, 2021
# Author: Tomaz Kastrun
# Blog: tomaztsql.wordpress.com
# V.1.0

# Changelog: 
#        - add clean rings
###########################################

library(animation)
library(ggplot2)

set.seed(2908)

Fireworks <- function(nof_rockets=10) {
  if(!is.null(dev.list())) dev.off()
  if(!interactive()) return()

    draw.fireworks <- function(x,y,ring) {
      plot(x, y, xaxt='n', ann=FALSE, yaxt='n', frame.plot=FALSE, xlim=c(0,50),ylim=c(0,500))
      title(main = "Happy New Year 2022", col.main= "white")
      for (i in 1:ring) {   
          ani.options(interval = 0.25) 
          color <- sample(rainbow(ring), ring, replace=TRUE) # one colour per ring
          symbols(x,y, circles=0.16+i*1.2,add=T, inches=F, fg=color[i])
          ani.pause()
      }
      par(new=TRUE)
    }
    
    clear.fireworks <- function(x,y,ring){
      plot(x, y, xaxt='n', ann=FALSE, yaxt='n', frame.plot=FALSE, xlim=c(0,50),ylim=c(0,500))
      for (i in 1:ring) {   
        ani.options(interval = 0.15) 
        symbols(x,y, circles=0.16+i*1.2,add=T, inches=F, fg="black")
        ani.pause()
      }
      par(new=TRUE)
   }

  NewYear.fireworks <- function(){  
      bgcolor <- par("bg")
      if (bgcolor == "transparent" | bgcolor == "white") bgcolor <- "black"
      par(bg=bgcolor)
    
     # nof_rockets <- 10
      xx <-sample(1:50,nof_rockets)
      yy <-sample(1:500,nof_rockets)
      ringy <- sample(7:13,nof_rockets, replace = TRUE)
      
      for (i in 1:nof_rockets){
    
        x <- xx[i]
        y <- yy[i]
        ring <- ringy[i]
        draw.fireworks(x,y,ring)
        # if you don't want rings disappearing, comment this IF statement
        if (i > 1)  {
          x1 <- xx[i-1]
          y1 <- yy[i-1]
          ring1 <- ringy[i-1]
          clear.fireworks(x1, y1, ring1)
          }
      }
      # if you don't want the last rings disappearing, comment out this call
      clear.fireworks(tail(xx,1), tail(yy,1), tail(ringy,1))
  }
  NewYear.fireworks()
}

##################
# Run the function
##################
Fireworks(15)

And have your own little private useless R fireworks.

Fireworks with disappearing rings
Fireworks with colourful rings

Enjoy the silence. Observe the colours. Drink some champagne.

As always, the code is available on GitHub in the same Useless_R_function repository.

Happy R-coding and Happy New Year 2022!

And stay healthy!


Tower of Hanoi game with T-SQL

T-SQL code for the popular game “Tower of Hanoi”, which can be played in Microsoft SQL Server, Azure Data Studio or any other T-SQL editor that supports query execution.

About the game

Tower of Hanoi is a puzzle game consisting of three rods and a number of rings (disks) of different sizes (diameters). Rings can slide onto any rod. The game begins with all rings stacked on one rod, ordered by size – from the smallest on top to the biggest at the bottom.

The purpose of the game is to move the entire stack of rings from the first rod to the last, keeping the same ordering by size.

Three simple rules apply:
1. only one ring can be moved at a time
2. a bigger ring cannot be stacked on a smaller ring
3. each move consists of taking the uppermost ring from one rod and placing it on another rod (on top of the rings already stacked there, or on an empty rod).
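For reference, a stack of n rings needs at least 2^n - 1 moves, so the 4-ring game used in the examples below can be solved in 2^4 - 1 = 15 moves.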

T-SQL Procedures for the game

The game has two simple procedures. The table that stores the game state is initialized with the dbo.INIT_Hanoi procedure.

CREATE OR ALTER PROCEDURE dbo.INIT_Hanoi

/**************************************************************
Procedure:          dbo.INIT_Hanoi
Create Date:        2021-12-25
Author:             Tomaz Kastrun
Description:        Creates a table that stores the number of
					rings used in the game with three rods.
					Table name is dbo.Hanoi and is used
					to store the moves.
 
Procedure output:	[dbo].[Hanoi]
Parameter(s):       @rings - Number of rings; e.g.: 5 = 5 rings 
					on 3 rods; Type: TINYINT (max 255 rings!)
Usage:              EXEC dbo.INIT_Hanoi
                        @rings = 5
ChangeLog:

ToDO:
					Align Drawing!
************************************************************* */

	@rings TINYINT

AS
BEGIN
	SET NOCOUNT ON;
	DECLARE @j INT = 1
	DROP TABLE IF EXISTS dbo.Hanoi;

	DECLARE @TableCreate NVARCHAR(1000) = 
	'DROP TABLE IF EXISTS dbo.Hanoi; 
	CREATE TABLE dbo.Hanoi (
	 ID TINYINT IDENTITY(1,1) NOT NULL
	,T1 TINYINT NOT NULL
	,T2 TINYINT NOT NULL
	,T3 TINYINT NOT NULL
	)
	'
	WHILE (@rings >= @j)
	BEGIN
		SET @TableCreate = @TableCreate + ' 
			INSERT INTO dbo.Hanoi(T1, T2, T3) VALUES ('+CAST(@j AS varchar(10))+',0,0)
			'
		SET @j = @j+1
	END

	EXEC sp_executesql @tableCreate

		DECLARE @max INT = @rings*4
		SELECT 
			 REPLICATE(' ',(@max - T1*2)/2) + REPLICATE('#', T1*2) + REPLICATE(' ',(@max - T1*2)/2)  AS T1
			,REPLICATE(' ',(@max - T2*2)/2) + REPLICATE('#', T2*2) + REPLICATE(' ',(@max - T2*2)/2)  AS T2
			,REPLICATE(' ',(@max - T3*2)/2) + REPLICATE('#', T3*2) + REPLICATE(' ',(@max - T3*2)/2)  AS T3 
		FROM hanoi
		ORDER BY ID ASC
END;
GO

And the play procedure, for moving and stacking the rings between the rods, is dbo.PLAY_Hanoi:

CREATE OR ALTER  PROCEDURE [dbo].[PLAY_Hanoi]

/**************************************************************
Procedure:          dbo.PLAY_Hanoi
Create Date:        2021-12-25
Author:             Tomaz Kastrun
Description:        Moves the top ring from one rod to another,
					validates the move and redraws the board.
					The moves are stored in the dbo.Hanoi table
					created by dbo.INIT_Hanoi.
 
Procedure output:	[dbo].[Hanoi]
Parameter(s):       @from - rod number taking the first ring 
					@to - rod number putting the same ring

Usage:
		EXEC dbo.PLAY_Hanoi
			 @from = 1
			,@to = 2

ToDO:
					- Align Drawing of tower!
					- Stop / finish the game
                    - Rings different ASCII Char!
************************************************************* */

	 @from INT
	,@to INT
AS
BEGIN

		SET NOCOUNT ON;

		-- internal values
		DECLARE @from_variable VARCHAR(10) = (select column_name from information_Schema.columns where  table_name = 'hanoi' and table_Schema = 'dbo' and ordinal_position = (@from + 1))
		print @from_variable
		DECLARE @to_variable VARCHAR(10) = (select column_name from information_Schema.columns where  table_name = 'hanoi' and table_Schema = 'dbo' and ordinal_position = (@to + 1))
		print @to_variable

		-- FROM position
		DECLARE @from_position NVARCHAR(1000)
		SET @from_position =  'SELECT top 1 ID FROM dbo.hanoi where '+@from_Variable+' <> 0 order by id asc'

		DROP TABLE IF EXISTS #from_pos
		CREATE table #from_pos  (val int)
		INSERT INTO #from_pos
		EXEC sp_executesql @from_position

		-- FROM value
		DECLARE @from_value NVARCHAR(1000)
		SET @from_value =  'SELECT top 1 '+@from_variable+' FROM dbo.hanoi where '+@from_Variable+' <> 0 order by id asc'

		DROP TABLE IF EXISTS #from_val
		CREATE table #from_val  (val int)
		INSERT INTO #from_val
		EXEC sp_executesql @from_value
		IF (SELECT COUNT(*) FROM #from_val) = 0
		BEGIN 
			INSERT INTO #from_val VALUES (0)
		END

		-- TO position
		DECLARE @to_position NVARCHAR(1000)
		SET @to_position =  'SELECT top 1 ID FROM dbo.hanoi where '+@to_variable+' = 0 order by id desc'

		DROP TABLE IF EXISTS #to_pos
		CREATE table #to_pos  (val int)
		INSERT INTO #to_pos
		EXEC sp_executesql @to_position

		-- TO value
		DECLARE @to_value NVARCHAR(1000)
		SET @to_value =  'SELECT top 1 '+@to_variable+' FROM dbo.hanoi where '+@to_variable+' = 0 order by id desc'

		DROP TABLE IF EXISTS #to_val
		CREATE table #to_val  (val int)
		INSERT INTO #to_val
		EXEC sp_executesql @to_value

		-- TO Prev Value
		DECLARE @prev_to_val NVARCHAR(1000)
		SET @prev_to_val = 'select top 1 '+@to_variable+' from hanoi where  '+@to_variable +' <> 0 order by id asc'

		DROP TABLE IF EXISTS #to_prev_val
		CREATE table #to_prev_val  (val int)
		INSERT INTO #to_prev_val
		EXEC sp_executesql @prev_to_val

        -- number of rings!
		declare @rings int = (select COUNT(*) from dbo.hanoi)
		declare @max int = @rings*4

			--- internal update
			-- add rules for update!!!!
			IF ((SELECT ISNULL(val,0) FROM #to_prev_val) < (SELECT val FROM #from_val))
			BEGIN
				SELECT 'Wrong Move'
			END
            ELSE            
			BEGIN
                IF ((SELECT ISNULL(val,0) FROM #to_val) = 0 AND  (SELECT ISNULL(val,0) FROM #from_val) = 0)
                BEGIN
                    SELECT 'Invalid Move'
                END
                ELSE
                    BEGIN
                        --update FROM pos/value
                        DECLARE @update_from NVARCHAR(1000)
                        SET @update_from = 'update dbo.hanoi set '+@from_variable+' = (select 0 ) WHERE ID =  (SELECT val FROM #from_pos) '
                        EXEC sp_executesql @update_from

                        --update TO pos/value
                        DECLARE @update_to NVARCHAR(1000)
                        SET @update_to = 'update dbo.hanoi set '+@to_variable+' = (select val from #from_Val) WHERE ID = (SELECT val FROM #to_pos)'
                        EXEC sp_executesql @update_to

                    END
			END
			SELECT 
				 REPLICATE(' ',(@max - T1*2)/2) + REPLICATE('#', T1*2) + REPLICATE(' ',(@max - T1*2)/2)  AS T1
				,REPLICATE(' ',(@max - T2*2)/2) + REPLICATE('#', T2*2) + REPLICATE(' ',(@max - T2*2)/2)  AS T2
				,REPLICATE(' ',(@max - T3*2)/2) + REPLICATE('#', T3*2) + REPLICATE(' ',(@max - T3*2)/2)  AS T3 
			FROM dbo.hanoi
			ORDER BY ID ASC

			-- check Tower 2 and Tower 3
			DECLARE @t2 INT = (SELECT COUNT(T2) FROM Hanoi WHERE T2 <> 0)
			DECLARE @t3 INT = (SELECT COUNT(T3) FROM Hanoi WHERE T3 <> 0)
            IF (@T2 = @rings OR @T3 = @rings)
            BEGIN
                SELECT 'Game Won!'
                -- Initialize New Game
                EXEC dbo.INIT_Hanoi @rings
            END
END;
GO

Playing the game

After running the content of the Tower_Hanoi.sql file (which creates the two procedures dbo.INIT_Hanoi and dbo.PLAY_Hanoi), start the game with:

EXEC dbo.INIT_Hanoi   
        @rings = 4

And continue playing the game:

EXEC dbo.PLAY_Hanoi     
            @from = 1    
           ,@to = 2;
GO
EXEC dbo.PLAY_Hanoi     
           @from = 1    
          ,@to = 3;
GO


Opening the game in Azure Data Studio or in SSMS, the game board should look like this:

Actual gameplay (animated GIF) using Azure Data Studio:

Autosolver procedure


The Tower of Hanoi game can also be solved by a procedure that automatically finds the solution:

CREATE OR ALTER PROCEDURE dbo.AutoSolver

/**************************************************************
Procedure:          dbo.AutoSolver
Create Date:        2021-12-26
Author:             Tomaz Kastrun
Description:        Initializes and solves the Tower of Hanoi
                    game for the given number of rings.
                    All steps are temporarily stored in a log.
Procedure output:	[dbo].[Hanoi_log]
Parameter(s):       @rings - number of rings 

Usage:
		EXEC dbo.Autosolver
			 @rings = 4

ToDO:
					- Optimization for end result on rod 2 or 3
************************************************************* */

	@rings TINYINT
AS
BEGIN

	drop table if exists dbo.hanoi_log;
	
	CREATE TABLE dbo.hanoi_log
	(id int identity(1,1) NOT NULL
	,rodd varchar(10)
	,f_rod int
	,t_rod int
	)

	EXEC dbo.INIT_Hanoi 
		@rings = @rings;

	-- init set of @T2_count and @T3_count
	DECLARE @t2_count INT = -1 
	DECLARE @t3_count INT = -1

	DECLARE @from INT  
	DECLARE @to INT  

	WHILE (@t2_count <= @rings) OR (@t3_count <=  @rings)
	BEGIN

		DECLARE @t1 INT = (Select TOP 1 ISNULL(t1,0) from hanoi WHERE t1 <> 0 ORDER BY ID ASC)
		DECLARE @t2 INT = (Select TOP 1 ISNULL(t2,0) from hanoi WHERE t2 <> 0 ORDER BY ID ASC)
		DECLARE @t3 INT = (Select top 1 ISNULL(t3,0) from hanoi WHERE t3 <> 0 ORDER BY ID ASC)

		INSERT INTO dbo.hanoi_log (rodd, f_rod, t_rod)
		SELECT TOP 1
			 cast(right(t.rod,1) as varchar(10)) + ';' + cast(right(f.rod,1) as varchar(10)) as rodd
			,right(f.rod,1) AS from_rod
			,right(t.rod,1) as To_rod
			--,CAST(right(f.rod,1) as int) + cast(right(t.rod,1) as int) as suma_f
		FROM (
				SELECT ISNULL(@t1,0) as val, 't1' as rod, 'from' as pot
				union 
				SELECT ISNULL(@t2,0), 't2' as rod, 'from' as pot
				union 
				SELECT ISNULL(@t3,0), 't3' as rod, 'from' as pot
		) as f
		cross join (
				SELECT ISNULL(@t1,0) as val, 't1' as rod, 'to' as pot
				union 
				SELECT ISNULL(@t2,0), 't2' as rod, 'to' as pot
				union 
				SELECT ISNULL(@t3,0), 't3' as rod, 'to' as pot
		) as t
		WHERE
			f.rod <> t.rod
		AND f.val <> 0
		AND (t.val > f.val OR t.val = 0)
		AND CAST(right(f.rod,1) as varchar(10)) + ';' + cast(right(t.rod,1) as varchar(10)) NOT IN (SELECT rodd from dbo.hanoi_log where id = (SELECT max(id) from hanoi_log)) --last
		AND CAST(right(f.rod,1) as varchar(10)) + ';' + cast(right(t.rod,1) as varchar(10)) NOT IN (SELECT rodd from dbo.hanoi_log where id = (SELECT max(id)-1 from hanoi_log)) --before_last
		ORDER BY CAST(right(f.rod,1) as int) + cast(right(t.rod,1) as int)  desc

		DECLARE @max_id INT = (select max(id) from dbo.hanoi_log)

		SET @from = (SELECT f_rod FROM dbo.hanoi_log WHERE id = @max_id)
		SET @to = (SELECT t_rod FROM dbo.hanoi_log WHERE id = @max_id)
		

		EXEC dbo.PLAY_Hanoi 
			 @from = @from
			,@to = @to


	   	SET @t2_count  = (SELECT COUNT(t2) FROM dbo.Hanoi WHERE t2 <> 0)
		SET @t3_count  = (SELECT COUNT(t3) FROM dbo.Hanoi WHERE t3 <> 0)				
				

		IF (@t2_count = 0) AND (@t3_count = 0)
			BEGIN
				BREAK  			
			END
		END
			 
END;
GO


You can also use SQLCMD. On your client machine, open CMD, navigate to your MSSQLServer folder (e.g. C:\Program Files\Microsoft SQL Server\MSSQL15.MSSQLSERVER) and run the following command (please note, I am using a named instance, hence the -S switch):

sqlcmd -S .\MSSQLSERVER2019 -q "EXEC dbo.AutoSolver @rings = 4"

As always, the complete code is available on GitHub: https://github.com/tomaztk/Tower_of_Hanoi_sql_game

Stay healthy and happy T-SQLing! 🙂


Advent of 2021, Day 25 – Spark literature, documentation, courses and books

Series of Apache Spark posts:

To wrap up this year’s Advent of Spark 2021 – a series of blog posts on Spark – it is essential to look at a list of additional learning resources so you can continue with this journey. Let’s divide this list not by type of resource (book, online documentation, online courses, articles, YouTube channels, Discord channels, and others) but rather by language flavour: Scala/Spark, R, and Python.

Spark – Scala

  • Spark Official Documentation – link
  • Spark: The definitive Guide – link
  • Stream processing with Apache Spark – link
  • Data Engineering with Apache Spark, Delta Lake, and Lakehouse – link
  • Programming Scala – 3rd edition – link
  • Scala & Spark – Master Big Data with Scala and Spark – link
  • Getting started with Apache Spark on Databricks – link to course
  • Apache Spark – link

R Language

  • Mastering Spark with R – link
  • SparkR documentation – link
  • Sparklyr: R interface for Apache Spark – link
  • R and Spark: How to Analyze Data Using RStudio’s Sparklyr and H2O’s Rsparkling Packages – link
  • Sparklyr in SQL Server Big Data cluster – link
  • Big data in R – Intro to Sparklyr – link

Python

  • Spark with PySpark – link
  • Spark and Python for Big Data with PySpark – link to course
  • PySpark intro – link
  • Apache Spark 3 for Data Engineering and Analytics with Python – link

Wrapping up this year’s series of Advent of Spark! Merry Christmas and Happy New Year 2022!

Complete set of code, documents, notebooks, and all of the materials is available in the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021! 🙂


Advent of 2021, Day 24 – Data Visualisation with Spark

Series of Apache Spark posts:

In previous posts, we have seen that Spark DataFrames (datasets) are compatible with many other classes and functions, regardless of the preferred language (Scala, R, Python, Java).

Using Python

You can use any of the popular Python packages for visualisation: Plotly, Dash, Seaborn, Matplotlib, Bokeh, Leather, Glam, to name a couple, and many others. Once the data is persisted in a dataframe, you can use any of these packages. With PySpark, plug in Matplotlib. Here is an example:

from pyspark.sql import SparkSession
import matplotlib.pyplot as plt

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.read.format("csv").option("header", "true").load("sampleData.csv")
sampled_data = df.select('x','y').sample(False, 0.8).toPandas()

# and at the end lets use our beautiful matplotlib
plt.scatter(sampled_data.x,sampled_data.y)
plt.xlabel('x')
plt.ylabel('y')
plt.title('relation of y and x')
plt.show()

Using R

With the help of sparklyr, dplyr and ggplot2:

library(sparklyr)
library(ggplot2)
library(dplyr)

#connect
sc <- spark_connect(master = "local")

# data wrangling
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
delay <- flights_tbl %>%
  group_by(tailnum) %>%
  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect

# plot delays
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

Using Scala

The best way to do visualisation with Scala is to use notebooks: a Databricks notebook, a Binder notebook or a Zeppelin notebook. Store the results in a dataframe and you can visualise them quickly, easily and with practically no coding.

val ds = spark.read.json("/databricks-datasets/iot/iot_devices.json").as[DeviceIoTData]
display(ds)

And now we can create graphs, available as buttons at the bottom left of the cell output in Azure Databricks notebooks. Besides the graphs, you can also do data profiling out of the box.

Tomorrow we will look into Spark Literature and where to go for next steps.

Complete set of code, documents, notebooks, and all of the materials is available in the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021! 🙂


Advent of 2021, Day 23 – Delta live tables with Azure Databricks

Series of Apache Spark posts:

Delta Live Tables is a framework for building reliable, maintainable, and testable data processing pipelines. The user defines the transformations to be performed on the data sources, and the framework manages all the data engineering tasks: task orchestration, cluster management, monitoring, data quality, and error handling.

The Delta Live Tables framework manages how data is transformed with the help of a target schema, and it offers a slightly different experience from Databricks Tasks (which run Apache Spark tasks in the background).

As of writing this blog post, Delta Live Tables is still in private preview and access must be requested via the link. Once access is approved, you can simply log into Azure Databricks and create a new notebook (with Python or SQL).

With Python, you can create a simple notebook and start consuming Delta Live Tables (DLT):

# Databricks notebook source
# MAGIC %md # Delta Live Tables quickstart (Python)
# MAGIC 
# MAGIC A notebook that provides an example Delta Live Tables pipeline to:
# MAGIC 
# MAGIC - Read raw JSON clickstream data into a table.
# MAGIC - Read records from the raw data table and use a Delta Live Tables query and expectations to create a new table with cleaned and prepared data.
# MAGIC - Perform an analysis on the prepared data with a Delta Live Tables query.

# COMMAND ----------

# DBTITLE 1,Imports
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# COMMAND ----------

# DBTITLE 1,Ingest raw clickstream data
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
@dlt.create_table(
  comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
)
def clickstream_raw():          
  return (
    spark.read.json(json_path)
  )

# COMMAND ----------

# DBTITLE 1,Clean and prepare data
@dlt.table(
  comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
  return (
    dlt.read("clickstream_raw")
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_title", "click_count", "previous_page_title")
  )

# COMMAND ----------

# DBTITLE 1,Top referring pages
@dlt.table(
  comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
  return (
    dlt.read("clickstream_prepared")
      .filter(expr("current_page_title == 'Apache_Spark'"))
      .withColumnRenamed("previous_page_title", "referrer")
      .sort(desc("click_count"))
      .select("referrer", "click_count")
      .limit(10)
  )

And the same can be done with SQL:

-- Databricks notebook source
-- MAGIC %md # Delta Live Tables quickstart (SQL)
-- MAGIC 
-- MAGIC A notebook that provides an example Delta Live Tables pipeline to:
-- MAGIC 
-- MAGIC - Read raw JSON clickstream data into a table.
-- MAGIC - Read records from the raw data table and use a Delta Live Tables query and expectations to create a new table with cleaned and prepared data.
-- MAGIC - Perform an analysis on the prepared data with a Delta Live Tables query.

-- COMMAND ----------

-- DBTITLE 1,Ingest raw clickstream data
CREATE LIVE TABLE clickstream_raw
COMMENT "The raw wikipedia click stream dataset, ingested from /databricks-datasets."
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`

-- COMMAND ----------

-- DBTITLE 1,Clean and prepare data
CREATE LIVE TABLE clickstream_clean(
  CONSTRAINT valid_current_page EXPECT (current_page_title IS NOT NULL),
  CONSTRAINT valid_count EXPECT (click_count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "Wikipedia clickstream data cleaned and prepared for analysis."
AS SELECT
  curr_title AS current_page_title,
  CAST(n AS INT) AS click_count,
  prev_title AS previous_page_title
FROM live.clickstream_raw

-- COMMAND ----------

-- DBTITLE 1,Top referring pages
CREATE LIVE TABLE top_spark_referers
COMMENT "A table containing the top pages linking to the Apache Spark page."
AS SELECT
  previous_page_title as referrer,
  click_count
FROM live.clickstream_clean
WHERE current_page_title = 'Apache_Spark'
ORDER BY click_count DESC
LIMIT 10

Either version of the code will create the pipeline (as visualised below), and management, orchestration and monitoring are provided by the framework.

The Spark API also offers all the data manipulation needed, in both Python and SQL, to create functions, tables, views, …

With Python, you apply the @view or @table decorator to a function to define a view or table. You can use the function name or the name parameter to assign the table or view name. Pseudo-code with Python:

@dlt.view
def taxi_raw():
  return spark.read.json("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("clickstream_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("clickstream_raw").where(...)

And the same can be accomplished using SQL:

CREATE LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/`

CREATE LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.clickstream_raw

Tomorrow we will look into Data visualisation with Spark.

Complete set of code, documents, notebooks, and all of the materials is available in the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021! 🙂


Advent of 2021, Day 22 – Spark in Azure Databricks

Series of Apache Spark posts:

Azure Databricks is a platform built on top of the Spark-based analytical engine that unifies data, data manipulation, analytics and machine learning.

Databricks uses notebooks to tackle all the tasks, which makes collaboration easy. Let’s dig in and start using the Python API on top of the Spark API.

Sign into Azure Databricks, create a new notebook and attach the notebook to a cluster. For how to do this, check and follow my GitHub repository for the Advent of Databricks 2020.

Using the new notebook, we will create a dataset and start working with it, using the operations relevant for data engineering.

Here is a complete sample wrapped in a Databricks notebook file. The complete file is also available as an IPython notebook here.

# Databricks notebook source
# MAGIC %md
# MAGIC # Using Python Dataframes on Spark API for Data engineering tasks

# COMMAND ----------

# MAGIC %md
# MAGIC This notebook will explore basic and intermediate tasks and operators, that engineer should be comfortable to use. This tasks can be written similar in Scala (Spark).

# COMMAND ----------

# MAGIC %md
# MAGIC ## Create Dataframe

# COMMAND ----------

# import pyspark class Row from module sql
from pyspark.sql import *

# Create Example Data - Departments and Employees

# Create the Departments
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)
employee5 = Employee('michael', 'jackson', 'no-reply@neverla.nd', 80000)

# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee5, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

print(department1)
print(employee2)
print(departmentWithEmployees1.employees[0].email)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Create dataframes from list of rows

# COMMAND ----------

departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)

display(df1)

departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)

display(df2)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Union of two dataframes

# COMMAND ----------

unionDF = df1.union(df2)
display(unionDF)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Write the unioned DataFrame to a Parquet file

# COMMAND ----------

# Remove the file if it exists
dbutils.fs.rm("/tmp/databricks-df-example.parquet", True)
unionDF.write.parquet("/tmp/databricks-df-example.parquet")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Read from  a Parquet file

# COMMAND ----------

parquetDF = spark.read.parquet("/tmp/databricks-df-example.parquet")
display(parquetDF)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Explode the employee columns

# COMMAND ----------

from pyspark.sql.functions import explode

explodeDF = unionDF.select(explode("employees").alias("e"))
flattenDF = explodeDF.selectExpr("e.firstName", "e.lastName", "e.email", "e.salary")

flattenDF.show()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Filtering data (rows) to match the predicate

# COMMAND ----------

filterDF = flattenDF.filter(flattenDF.firstName == "xiangrui").sort(flattenDF.lastName)
display(filterDF)
## or

# COMMAND ----------

from pyspark.sql.functions import col, asc

# Use `|` instead of `or`
filterDF = flattenDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
display(filterDF)
## or

# COMMAND ----------

whereDF = flattenDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
display(whereDF)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Replacing values

# COMMAND ----------

nonNullDF = flattenDF.fillna("--")
display(nonNullDF)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Aggregating data (sum, count, groupby, summary, min, max, ...)

# COMMAND ----------

from pyspark.sql.functions import countDistinct

countDistinctDF = nonNullDF.select("firstName", "lastName")\
  .groupBy("firstName")\
  .agg(countDistinct("lastName").alias("distinct_last_names"))

display(countDistinctDF)

# COMMAND ----------

salarySumDF = nonNullDF.agg({"salary" : "sum"})
display(salarySumDF)

# COMMAND ----------

nonNullDF.describe("salary").show()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Clean up Parquet file

# COMMAND ----------

dbutils.fs.rm("/tmp/databricks-df-example.parquet", True)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Working with functions

# COMMAND ----------

# MAGIC %md
# MAGIC ### Create sample dataset

# COMMAND ----------

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Build an example DataFrame dataset to work with.
dbutils.fs.rm("/tmp/dataframe_sample.csv", True)
dbutils.fs.put("/tmp/dataframe_sample.csv", """id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD
""", True)

df = spark.read.format("csv").options(header='true', delimiter = '|').load("/tmp/dataframe_sample.csv")
df.printSchema()

# COMMAND ----------

# MAGIC %md
# MAGIC ### Using built-in functions

# COMMAND ----------

# Instead of registering a UDF, call the builtin functions to perform operations on the columns.
# This will provide a performance improvement as the builtins compile and run in the platform's JVM.

# Convert to a Date type
df = df.withColumn('date', F.to_date(df.end_date))

# Parse out the date only
df = df.withColumn('date_only', F.regexp_replace(df.end_date,' (\d+)[:](\d+)[:](\d+).*$', ''))

# Split a string and index a field
df = df.withColumn('city', F.split(df.location, '-')[1])

# Perform a date diff function
df = df.withColumn('date_diff', F.datediff(F.to_date(df.end_date), F.to_date(df.start_date)))

# COMMAND ----------

df.createOrReplaceTempView("sample_df")
display(sql("select * from sample_df"))

# COMMAND ----------

# MAGIC %md
# MAGIC ### Convert to JSON format

# COMMAND ----------

rdd_json = df.toJSON()
rdd_json.take(2)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Create user-defined function (UDF)

# COMMAND ----------

from pyspark.sql import functions as F

add_n = udf(lambda x, y: x + y, IntegerType())

# We register a UDF that adds a column to the DataFrame, and we cast the id column to an Integer type.
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))

# COMMAND ----------

# MAGIC %md
# MAGIC ### ... and pass the parameter to UDF

# COMMAND ----------

# any constants used by UDF will automatically pass through to workers
N = 90
last_n_days = udf(lambda x: x < N, BooleanType())

df_filtered = df.filter(last_n_days(df.date_diff))
display(df_filtered)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Aggregate over multiple columns

# COMMAND ----------

agg_df = df.groupBy("location").agg(F.min("id"), F.count("id"), F.avg("date_diff"))
display(agg_df)

# COMMAND ----------

# MAGIC %md
# MAGIC ### And store data to a Parquet file, partitioned by time (end date)

# COMMAND ----------

df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").parquet("/tmp/sample_table")
display(dbutils.fs.ls("/tmp/sample_table"))

Using Databricks for data manipulation is easy, fast and efficient, not only from the installation point of view, but also because Databricks unifies all the tasks in a single notebook, bringing different departments closer together to collaborate.

Tomorrow we will look into Delta live tables with Azure Databricks.

Complete set of code, documents, notebooks, and all of the materials is available in the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021! 🙂


Advent of 2021, Day 21 – Spark GraphX operators

Series of Apache Spark posts:

Property graphs have a collection of operators that take user-defined functions and produce new graphs with transformed properties and structure. Core operators are defined in Graph, while compositions of core operators are defined in GraphOps and are automatically available as members of Graph. Each graph representation must provide implementations of the core operations and can reuse many of the useful operations defined in GraphOps.

The list of all operators:

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

There are (i) property operators, (ii) structural operators, (iii) join operators, and (iv) neighbourhood operators.

The property operators yield a new graph with the vertex or edge properties modified by a user-defined map function. These operators are:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

The structural operators are reverse, subgraph and mask. The reverse operator returns a new graph with all edge directions reversed. The subgraph operator takes vertex and edge predicates and returns a graph containing only the vertices and edges that satisfy those predicates. And the mask operator constructs a subgraph by returning a graph that contains only the vertices and edges that are also present in the input graph.

Example of mask operator:

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

The join operators are joinVertices and outerJoinVertices. The joinVertices operator joins the vertices with the input RDD and returns a new graph with the vertex properties obtained by applying the user-defined map function to the joined vertices. outerJoinVertices works similarly to joinVertices, except that the user-defined map function is applied to all vertices and may change the vertex property type.

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

The neighbourhood operators are aggregateMessages, computing degree information, and collecting neighbours.

aggregateMessages applies a user-defined sendMsg function to each edge triplet in the graph and then uses the mergeMsg function to aggregate those messages at their destination vertex.

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

Computing degree information is a common aggregation task: the degree of a vertex is the number of edges adjacent to it. In the context of directed graphs, it is often necessary to know the in-degree, out-degree, and total degree of each vertex.

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

Tomorrow we will look into Spark in Databricks.

Complete set of code, documents, notebooks, and all of the materials is available in the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021! 🙂


Advent of 2021, Day 20 – Spark GraphX processing

Series of Apache Spark posts:

GraphX is Spark’s API component for graph and graph-parallel computation. GraphX uses Spark RDDs and builds a graph abstraction on top of them. The graph abstraction is a directed multigraph with properties attached to edges and vertices.


GraphX supports graph computation and exposes a set of fundamental operators (subgraph, joinVertices, aggregateMessages), and it also includes a growing collection of graph algorithms that simplify ETL and analytical tasks.

Spark GraphX offers the following features:
– Flexibility: work with the same RDD data both as graphs and as collections, transform and join graphs with RDDs efficiently, and write custom iterative graph algorithms using Google’s Pregel API.
– Computational speed: on par with the fastest specialised graph processing systems, while retaining flexibility and fault tolerance.
– Graph algorithms: popular algorithms for common business cases, such as PageRank, connected components, label propagation, SVD++, strongly connected components, and triangle count.

To get started, use the following Scala commands:

import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.util.IntParam 
import org.apache.spark.graphx.util.GraphGenerators 

Property graph

A property graph is a directed multigraph with user-defined objects attached to vertices and edges. A directed multigraph is a directed graph with potentially multiple parallel edges sharing the same source and destination vertex. Supporting parallel edges simplifies modelling scenarios where there can be multiple relationships (e.g., co-worker and friend) between the same vertices. Each vertex is keyed by a unique 64-bit long identifier (VertexId). GraphX does not impose any ordering constraints on the vertex identifiers.


The property graph is parameterized over the vertex (VD) and edge (ED) types. Defining a property graph:

class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

Since the graph is based on RDDs, which are immutable and fault-tolerant, the graph behaves the same way as an RDD. An example property graph can be constructed as follows:

val userGraph: Graph[(String, String), String]

Establishing the connection to the Spark engine and building the graph:

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
//Case class constructor to count edges
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

We can also work with the triplet view, which joins the vertex and edge properties into an RDD[EdgeTriplet[VD,ED]]. Logically, this join can be expressed with a SQL query:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

Tomorrow we will look into a couple of GraphX operators.

Complete set of code, documents, notebooks, and all of the materials is available in the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021! 🙂
