# SQL queries over the registered temp tables ----
# NOTE(review): `sqlContext`, `acTransDF`, `goodTransRecords` and the "trans"
# temp table are not created in this part of the script; they are assumed to
# exist from earlier in the session.
registerTempTable(goodTransRecords, "goodtrans")
showDF(goodTransRecords)

# Good records with an amount above 1000.
highValueTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM goodtrans WHERE TranAmount > 1000")
showDF(highValueTransRecords)

# Bad records: wrong account prefix, or a negative amount. unionAll() keeps
# duplicates, so a row failing both checks is listed twice.
badAccountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo NOT like 'SB%'")
showDF(badAccountRecords)
badAmountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE TranAmount < 0")
showDF(badAmountRecords)
badTransRecords <- unionAll(badAccountRecords, badAmountRecords)
showDF(badTransRecords)

# Sum, maximum and minimum amount over the good records.
sumAmount <- sql(sqlContext, "SELECT sum(TranAmount) as sum FROM goodtrans")
showDF(sumAmount)
maxAmount <- sql(sqlContext, "SELECT max(TranAmount) as max FROM goodtrans")
showDF(maxAmount)
minAmount <- sql(sqlContext, "SELECT min(TranAmount)as min FROM goodtrans")
showDF(minAmount)

# Distinct, sorted account numbers that carry the "SB" prefix.
goodAccNos <- sql(sqlContext, "SELECT DISTINCT AccNo FROM trans WHERE AccNo like 'SB%' ORDER BY AccNo")
showDF(goodAccNos)

# Conversion of the built-in R data frame `faithful` to Spark DataFrames ----
df <- createDataFrame(sqlContext, faithful)
showDF(df)
faithfulDF <- createDataFrame(sqlContext, faithful)
showDF(faithfulDF)

# The same results through the DataFrame API instead of SQL ----
goodTransRecordsFromAPI <- filter(acTransDF, "AccNo like 'SB%' AND TranAmount > 0")
showDF(goodTransRecordsFromAPI)
highValueTransRecordsFromAPI <- filter(goodTransRecordsFromAPI, "TranAmount > 1000")
showDF(highValueTransRecords<br>FromAPI)
# SPARK_HOME settings and SparkR library path settings
Sys.setenv(SPARK_HOME="/Users/RajT/source-code/spark-source/spark-1.5.2l")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
acTransDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/TransList1.json")
print(acTransDF)
showDF(acTransDF)
registerTempTable(acTransDF, "trans")
goodTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo like 'SB%' AND TranAmount > 0")
registerTempTable(goodTransRecords, "goodtrans")
showDF(goodTransRecords)
highValueTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM goodtrans WHERE TranAmount > 1000")
showDF(highValueTransRecords)
badAccountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo NOT like 'SB%'")
showDF(badAccountRecords)
badAmountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE TranAmount < 0")
showDF(badAmountRecords)
badTransRecords <- unionAll(badAccountRecords, badAmountRecords)
showDF(badTransRecords)
sumAmount <- sql(sqlContext, "SELECT sum(TranAmount) as sum FROM goodtrans")
showDF(sumAmount)
maxAmount <- sql(sqlContext, "SELECT max(TranAmount) as max FROM goodtrans")
showDF(maxAmount)
minAmount <- sql(sqlContext, "SELECT min(TranAmount)as min FROM goodtrans")
showDF(minAmount)
goodAccNos <- sql(sqlContext, "SELECT DISTINCT AccNo FROM trans WHERE AccNo like 'SB%' ORDER BY AccNo")
showDF(goodAccNos)
# Conversion of R dataframe to Spark DataFrame
faithfulDF <- createDataFrame(sqlContext, faithful)
showDF(faithfulDF)
goodTransRecordsFromAPI <- filter(acTransDF, "AccNo like 'SB%' AND TranAmount > 0")
showDF(goodTransRecordsFromAPI)
highValueTransRecordsFromAPI = filter(goodTransRecordsFromAPI, "TranAmount > 1000")
showDF(highValueTransRecordsFromAPI)
badAccountRecordsFromAPI <- filter(acTransDF, "AccNo NOT like 'SB%'")
showDF(badAccountRecordsFromAPI)
badAmountRecordsFromAPI <- filter(acTransDF, "TranAmount < 0")
showDF(badAmountRecordsFromAPI)
badTransRecordsFromAPI <- unionAll(badAccountRecordsFromAPI, badAmountRecordsFromAPI)
showDF(badTransRecordsFromAPI)
sumAmountFromAPI <- agg(goodTransRecordsFromAPI, sumAmount = sum(goodTransRecordsFromAPI$TranAmount))
showDF(sumAmountFromAPI)
maxAmountFromAPI <- agg(goodTransRecordsFromAPI, maxAmount = max(goodTransRecordsFromAPI$TranAmount))
showDF(maxAmountFromAPI)
minAmountFromAPI <- agg(goodTransRecordsFromAPI, minAmount = min(goodTransRecordsFromAPI$TranAmount))
showDF(minAmountFromAPI)
filteredTransRecordsFromAPI <- filter(goodTransRecordsFromAPI, "AccNo like 'SB%'")
accNosFromAPI <- select(filteredTransRecordsFromAPI, "AccNo")
distinctAccNoFromAPI <- distinct(accNosFromAPI)
sortedAccNoFromAPI <- arrange(distinctAccNoFromAPI, "AccNo")
showDF(sortedAccNoFromAPI)
write.df(acTransDF, "r.trans.parquet", "parquet", "overwrite")
acTransDFFromFile <- loadDF(sqlContext, "r.trans.parquet", "parquet", mergeSchema = "true")
showDF(acTransDFFromFile)
acTransDFForAgg <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/TransList2.json")
acTransDFForAgg.registerTempTable("trans")
acSummary <- sql(sqlContext,"SELECT AccNo, sum(TranAmount) as TransTotal FROM trans GROUP BY AccNo")
showDF(acSummary)
showDF(acTransDFForAgg)
showDF(acSummary)
showDF(acTransDFForAgg)
acTransDFForAgg.registerTempTable("transnew")
registerTempTable(acTransDFForAgg, "transnew")
showDF(acTransDFForAgg)
acSummary <- sql(sqlContext,"SELECT AccNo, sum(TranAmount) as TransTotal FROM transnew GROUP BY AccNo")
showDF(acSummary)
# SPARK_HOME settings and SparkR library path settings
Sys.setenv(SPARK_HOME="/Users/RajT/source-code/spark-source/spark-1.5.2l")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
acTransDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/TransList1.json")
print(acTransDF)
showDF(acTransDF)
registerTempTable(acTransDF, "trans")
goodTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo like 'SB%' AND TranAmount > 0")
registerTempTable(goodTransRecords, "goodtrans")
showDF(goodTransRecords)
highValueTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM goodtrans WHERE TranAmount > 1000")
showDF(highValueTransRecords)
badAccountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo NOT like 'SB%'")
showDF(badAccountRecords)
badAmountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE TranAmount < 0")
showDF(badAmountRecords)
badTransRecords <- unionAll(badAccountRecords, badAmountRecords)
showDF(badTransRecords)
sumAmount <- sql(sqlContext, "SELECT sum(TranAmount) as sum FROM goodtrans")
showDF(sumAmount)
maxAmount <- sql(sqlContext, "SELECT max(TranAmount) as max FROM goodtrans")
showDF(maxAmount)
minAmount <- sql(sqlContext, "SELECT min(TranAmount)as min FROM goodtrans")
showDF(minAmount)
goodAccNos <- sql(sqlContext, "SELECT DISTINCT AccNo FROM trans WHERE AccNo like 'SB%' ORDER BY AccNo")
showDF(goodAccNos)
# Conversion of R dataframe to Spark DataFrame
faithfulDF <- createDataFrame(sqlContext, faithful)
showDF(faithfulDF)
goodTransRecordsFromAPI <- filter(acTransDF, "AccNo like 'SB%' AND TranAmount > 0")
showDF(goodTransRecordsFromAPI)
highValueTransRecordsFromAPI = filter(goodTransRecordsFromAPI, "TranAmount > 1000")
showDF(highValueTransRecordsFromAPI)
badAccountRecordsFromAPI <- filter(acTransDF, "AccNo NOT like 'SB%'")
showDF(badAccountRecordsFromAPI)
badAmountRecordsFromAPI <- filter(acTransDF, "TranAmount < 0")
showDF(badAmountRecordsFromAPI)
badTransRecordsFromAPI <- unionAll(badAccountRecordsFromAPI, badAmountRecordsFromAPI)
showDF(badTransRecordsFromAPI)
sumAmountFromAPI <- agg(goodTransRecordsFromAPI, sumAmount = sum(goodTransRecordsFromAPI$TranAmount))
showDF(sumAmountFromAPI)
maxAmountFromAPI <- agg(goodTransRecordsFromAPI, maxAmount = max(goodTransRecordsFromAPI$TranAmount))
showDF(maxAmountFromAPI)
minAmountFromAPI <- agg(goodTransRecordsFromAPI, minAmount = min(goodTransRecordsFromAPI$TranAmount))
showDF(minAmountFromAPI)
filteredTransRecordsFromAPI <- filter(goodTransRecordsFromAPI, "AccNo like 'SB%'")
accNosFromAPI <- select(filteredTransRecordsFromAPI, "AccNo")
distinctAccNoFromAPI <- distinct(accNosFromAPI)
sortedAccNoFromAPI <- arrange(distinctAccNoFromAPI, "AccNo")
showDF(sortedAccNoFromAPI)
write.df(acTransDF, "r.trans.parquet", "parquet", "overwrite")
acTransDFFromFile <- loadDF(sqlContext, "r.trans.parquet", "parquet", mergeSchema = "true")
showDF(acTransDFFromFile)
acTransDFForAgg <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/TransList2.json")
registerTempTable(acTransDFForAgg, "transnew")
showDF(acTransDFForAgg)
acSummary <- sql(sqlContext,"SELECT AccNo, sum(TranAmount) as TransTotal FROM transnew GROUP BY AccNo")
showDF(acSummary)
acSummaryFromAPI <- agg(groupBy(acTransDFForAgg, "AccNo"), TranAmount="sum")
showDF(acSummaryFromAPI)
acMasterDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/MasterList.json")
showDF(acMasterDF)
acBalDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/BalList.json")
showDF(acBalDF)
registerTempTable(acMasterDF, "master")
registerTempTable(acBalDF, "balance")
acDetail <- sql(sqlContext,"SELECT master.AccNo, FirstName, LastName, BalAmount FROM master, balance WHERE master.AccNo = balance.AccNo ORDER BY BalAmount DESC")
showDF(acDetail)
showDF(acDetail)
write.df(acDetail, "r.acdetails.parquet", "parquet", "overwrite")
acDetailFromFile <- loadDF(sqlContext, "r.acdetails.parquet", "parquet", mergeSchema = "true")
showDF(acDetailFromFile)
# SPARK_HOME settings and SparkR library path settings
Sys.setenv(SPARK_HOME="/Users/RajT/source-code/spark-source/spark-1.5.2l")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
acTransDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/TransList1.json")
print(acTransDF)
showDF(acTransDF)
registerTempTable(acTransDF, "trans")
goodTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo like 'SB%' AND TranAmount > 0")
registerTempTable(goodTransRecords, "goodtrans")
showDF(goodTransRecords)
highValueTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM goodtrans WHERE TranAmount > 1000")
showDF(highValueTransRecords)
badAccountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo NOT like 'SB%'")
showDF(badAccountRecords)
badAmountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE TranAmount < 0")
showDF(badAmountRecords)
badTransRecords <- unionAll(badAccountRecords, badAmountRecords)
showDF(badTransRecords)
sumAmount <- sql(sqlContext, "SELECT sum(TranAmount) as sum FROM goodtrans")
showDF(sumAmount)
maxAmount <- sql(sqlContext, "SELECT max(TranAmount) as max FROM goodtrans")
showDF(maxAmount)
minAmount <- sql(sqlContext, "SELECT min(TranAmount)as min FROM goodtrans")
showDF(minAmount)
goodAccNos <- sql(sqlContext, "SELECT DISTINCT AccNo FROM trans WHERE AccNo like 'SB%' ORDER BY AccNo")
showDF(goodAccNos)
# Conversion of R dataframe to Spark DataFrame
faithfulDF <- createDataFrame(sqlContext, faithful)
showDF(faithfulDF)
goodTransRecordsFromAPI <- filter(acTransDF, "AccNo like 'SB%' AND TranAmount > 0")
showDF(goodTransRecordsFromAPI)
highValueTransRecordsFromAPI = filter(goodTransRecordsFromAPI, "TranAmount > 1000")
showDF(highValueTransRecordsFromAPI)
badAccountRecordsFromAPI <- filter(acTransDF, "AccNo NOT like 'SB%'")
showDF(badAccountRecordsFromAPI)
badAmountRecordsFromAPI <- filter(acTransDF, "TranAmount < 0")
showDF(badAmountRecordsFromAPI)
badTransRecordsFromAPI <- unionAll(badAccountRecordsFromAPI, badAmountRecordsFromAPI)
showDF(badTransRecordsFromAPI)
sumAmountFromAPI <- agg(goodTransRecordsFromAPI, sumAmount = sum(goodTransRecordsFromAPI$TranAmount))
showDF(sumAmountFromAPI)
maxAmountFromAPI <- agg(goodTransRecordsFromAPI, maxAmount = max(goodTransRecordsFromAPI$TranAmount))
showDF(maxAmountFromAPI)
minAmountFromAPI <- agg(goodTransRecordsFromAPI, minAmount = min(goodTransRecordsFromAPI$TranAmount))
showDF(minAmountFromAPI)
filteredTransRecordsFromAPI <- filter(goodTransRecordsFromAPI, "AccNo like 'SB%'")
accNosFromAPI <- select(filteredTransRecordsFromAPI, "AccNo")
distinctAccNoFromAPI <- distinct(accNosFromAPI)
sortedAccNoFromAPI <- arrange(distinctAccNoFromAPI, "AccNo")
showDF(sortedAccNoFromAPI)
write.df(acTransDF, "r.trans.parquet", "parquet", "overwrite")
acTransDFFromFile <- loadDF(sqlContext, "r.trans.parquet", "parquet", mergeSchema = "true")
showDF(acTransDFFromFile)
acTransDFForAgg <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/TransList2.json")
registerTempTable(acTransDFForAgg, "transnew")
showDF(acTransDFForAgg)
acSummary <- sql(sqlContext,"SELECT AccNo, sum(TranAmount) as TransTotal FROM transnew GROUP BY AccNo")
showDF(acSummary)
acSummaryFromAPI <- agg(groupBy(acTransDFForAgg, "AccNo"), TranAmount="sum")
showDF(acSummaryFromAPI)
acMasterDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/MasterList.json")
showDF(acMasterDF)
registerTempTable(acMasterDF, "master")
acBalDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/BalList.json")
showDF(acBalDF)
registerTempTable(acBalDF, "balance")
acDetail <- sql(sqlContext,"SELECT master.AccNo, FirstName, LastName, BalAmount FROM master, balance WHERE master.AccNo = balance.AccNo ORDER BY BalAmount DESC")
showDF(acDetail)
write.df(acDetail, "r.acdetails.parquet", "parquet", "overwrite")
acDetailFromFile <- loadDF(sqlContext, "r.acdetails.parquet", "parquet", mergeSchema = "true")
showDF(acDetailFromFile)
acBalDFWithDiffColName <- selectExpr(acBalDF, "AccNo as AccNoBal", "BalAmount")
showDF(acBalDFWithDiffColName)
acDetailFromAPI <- join(acMasterDF, acBalDFWithDiffColName, acMasterDF$AccNo == acBalDFWithDiffColName$AccNoBal)
showDF(acDetailFromAPI)
acDetailFromAPIRequiredFields <- select(acDetailFromAPI, "AccNo", "FirstName", "LastName", "BalAmount")
showDF(acDetailFromAPIRequiredFields)
# SPARK_HOME settings and SparkR library path settings
Sys.setenv(SPARK_HOME="/Users/RajT/source-code/spark-source/spark-1.5.2l")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
#Spark DataFrame programming with R - Basics
acTransDF <- jsonFile(sqlContext, "/Users/RajT/source-code/spark-source/spark-1.5.2l/TransList1.json")
print(acTransDF)
showDF(acTransDF)
registerTempTable(acTransDF, "trans")
goodTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo like 'SB%' AND TranAmount > 0")
registerTempTable(goodTransRecords, "goodtrans")
showDF(goodTransRecords)
highValueTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM goodtrans WHERE TranAmount > 1000")
showDF(highValueTransRecords)
badAccountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo NOT like 'SB%'")
showDF(badAccountRecords)
badAmountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE TranAmount < 0")
showDF(badAmountRecords)
badTransRecords <- unionAll(badAccountRecords, badAmountRecords)
showDF(badTransRecords)
sumAmount <- sql(sqlContext, "SELECT sum(TranAmount) as sum FROM goodtrans")
showDF(sumAmount)
maxAmount <- sql(sqlContext, "SELECT max(TranAmount) as max FROM goodtrans")
showDF(maxAmount)
minAmount <- sql(sqlContext, "SELECT min(TranAmount)as min FROM goodtrans")
showDF(minAmount)
goodAccNos <- sql(sqlContext, "SELECT DISTINCT AccNo FROM trans WHERE AccNo like 'SB%' ORDER BY AccNo")
showDF(goodAccNos)
goodTransRecordsFromAPI <- filter(acTransDF, "AccNo like 'SB%' AND TranAmount > 0")
showDF(goodTransRecordsFromAPI)
highValueTransRecordsFromAPI = filter(goodTransRecordsFromAPI, "TranAmount > 1000")
showDF(highValueTransRecordsFromAPI)
badAccountRecordsFromAPI <- filter(acTransDF, "AccNo NOT like 'SB%'")
showDF(badAccountRecordsFromAPI)
badAmountRecordsFromAPI <- filter(acTransDF, "TranAmount < 0")
showDF(badAmountRecordsFromAPI)
badTransRecordsFromAPI <- unionAll(badAccountRecordsFromAPI, badAmountRecordsFromAPI)
showDF(badTransRecordsFromAPI)
sumAmountFromAPI <- agg(goodTransRecordsFromAPI, sumAmount = sum(goodTransRecordsFromAPI$TranAmount))
showDF(sumAmountFromAPI)
maxAmountFromAPI <- agg(goodTransRecordsFromAPI, maxAmount = max(goodTransRecordsFromAPI$TranAmount))
showDF(maxAmountFromAPI)
minAmountFromAPI <- agg(goodTransRecordsFromAPI, minAmount = min(goodTransRecordsFromAPI$TranAmount))
showDF(minAmountFromAPI)
filteredTransRecordsFromAPI <- filter(goodTransRecordsFromAPI, "AccNo like 'SB%'")
accNosFromAPI <- select(filteredTransRecordsFromAPI, "AccNo")
distinctAccNoFromAPI <- distinct(accNosFromAPI)
sortedAccNoFromAPI <- arrange(distinctAccNoFromAPI, "AccNo")
showDF(sortedAccNoFromAPI)
write.df(acTransDF, "r.trans.parquet", "parquet", "overwrite")
acTransDFFromFile <- loadDF(sqlContext, "r.trans.parquet", "parquet", mergeSchema = "true")
showDF(acTransDFFromFile)
#Aggregations
acTransDFForAgg <- jsonFile(sqlContext, "/Users/RajT/source-code/spark-source/spark-1.5.2l/TransList2.json")
registerTempTable(acTransDFForAgg, "transnew")
showDF(acTransDFForAgg)
acSummary <- sql(sqlContext,"SELECT AccNo, sum(TranAmount) as TransTotal FROM transnew GROUP BY AccNo")
showDF(acSummary)
acSummaryFromAPI <- agg(groupBy(acTransDFForAgg, "AccNo"), TranAmount="sum")
showDF(acSummaryFromAPI)
# Multi-data source joins
acMasterDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/MasterList.json")
showDF(acMasterDF)
registerTempTable(acMasterDF, "master")
acBalDF <- jsonFile(sqlContext, "/Users/RajT/source-code/spark-source/spark-1.5.2l/BalList.json")
showDF(acBalDF)
registerTempTable(acBalDF, "balance")
acDetail <- sql(sqlContext,"SELECT master.AccNo, FirstName, LastName, BalAmount FROM master, balance WHERE master.AccNo = balance.AccNo ORDER BY BalAmount DESC")
showDF(acDetail)
write.df(acDetail, "r.acdetails.parquet", "parquet", "overwrite")
acDetailFromFile <- loadDF(sqlContext, "r.acdetails.parquet", "parquet", mergeSchema = "true")
showDF(acDetailFromFile)
acBalDFWithDiffColName <- selectExpr(acBalDF, "AccNo as AccNoBal", "BalAmount")
showDF(acBalDFWithDiffColName)
acDetailFromAPI <- join(acMasterDF, acBalDFWithDiffColName, acMasterDF$AccNo == acBalDFWithDiffColName$AccNoBal)
showDF(acDetailFromAPI)
acDetailFromAPIRequiredFields <- select(acDetailFromAPI, "AccNo", "FirstName", "LastName", "BalAmount")
showDF(acDetailFromAPIRequiredFields)
# SPARK_HOME settings and SparkR library path settings
Sys.setenv(SPARK_HOME="/Users/RajT/source-code/spark-source/spark-1.5.2l")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
#Fundamentals of R
x <- 5
x
aNumericVector <- c(10,10.5,31.2,100)
aNumericVector
x <- 5
x
aNumericVector <- c(10,10.5,31.2,100)
aNumericVector
aCharVector <- c("apple", "orange", "mango")
aCharVector
aBooleanVector <- c(TRUE, FALSE, TRUE, FALSE, FALSE)
aBooleanVector
aList <- list(aNumericVector, aCharVector)
aList
aMatrix <- matrix(c(100, 210, 76, 65, 34, 45),nrow=3,ncol=2,byrow = TRUE)
aMatrix
bMatrix <- matrix(c(100, 210, 76, 65, 34, 45),nrow=3,ncol=2,byrow = FALSE)
bMatrix
ageVector <- c(21, 35, 52)
nameVector <- c("Thomas", "Mathew", "John")
marriedVector <- c(FALSE, TRUE, TRUE)
aDataFrame <- data.frame(ageVector, nameVector, marriedVector)
colnames(aDataFrame) <- c("Age","Name", "Married")
head(aDataFrame)
tail(aDataFrame)
nrow(aDataFrame)
ncol(aDataFrame)
aDataFrame[1]
aDataFrame[2]
aDataFrame[c("Age", "Name")]
aDataFrame[[2]]
aDataFrame[2,]
aDataFrame[c(1,2),]
#Spark DataFrame programming with R - Basics
# SPARK_HOME settings and SparkR library path settings
Sys.setenv(SPARK_HOME="/Users/RajT/source-code/spark-source/spark-1.5.2l")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
#Fundamentals of R
x <- 5
x
aNumericVector <- c(10,10.5,31.2,100)
aNumericVector
aCharVector <- c("apple", "orange", "mango")
aCharVector
aBooleanVector <- c(TRUE, FALSE, TRUE, FALSE, FALSE)
aBooleanVector
aList <- list(aNumericVector, aCharVector)
aList
aMatrix <- matrix(c(100, 210, 76, 65, 34, 45),nrow=3,ncol=2,byrow = TRUE)
aMatrix
bMatrix <- matrix(c(100, 210, 76, 65, 34, 45),nrow=3,ncol=2,byrow = FALSE)
bMatrix
ageVector <- c(21, 35, 52)
nameVector <- c("Thomas", "Mathew", "John")
marriedVector <- c(FALSE, TRUE, TRUE)
aDataFrame <- data.frame(ageVector, nameVector, marriedVector)
colnames(aDataFrame) <- c("Age","Name", "Married")
head(aDataFrame)
tail(aDataFrame)
nrow(aDataFrame)
ncol(aDataFrame)
aDataFrame[1]
aDataFrame[2]
aDataFrame[c("Age", "Name")]
aDataFrame[[2]]
aDataFrame[2,]
aDataFrame[c(1,2),]
#Spark DataFrame programming with R - Basics
acTransDF <- jsonFile(sqlContext, "/Users/RajT/source-code/spark-source/spark-1.5.2l/TransList1.json")
print(acTransDF)
showDF(acTransDF)
registerTempTable(acTransDF, "trans")
goodTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo like 'SB%' AND TranAmount > 0")
registerTempTable(goodTransRecords, "goodtrans")
showDF(goodTransRecords)
highValueTransRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM goodtrans WHERE TranAmount > 1000")
showDF(highValueTransRecords)
badAccountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE AccNo NOT like 'SB%'")
showDF(badAccountRecords)
badAmountRecords <- sql(sqlContext, "SELECT AccNo, TranAmount FROM trans WHERE TranAmount < 0")
showDF(badAmountRecords)
badTransRecords <- unionAll(badAccountRecords, badAmountRecords)
showDF(badTransRecords)
sumAmount <- sql(sqlContext, "SELECT sum(TranAmount) as sum FROM goodtrans")
showDF(sumAmount)
maxAmount <- sql(sqlContext, "SELECT max(TranAmount) as max FROM goodtrans")
showDF(maxAmount)
minAmount <- sql(sqlContext, "SELECT min(TranAmount)as min FROM goodtrans")
showDF(minAmount)
goodAccNos <- sql(sqlContext, "SELECT DISTINCT AccNo FROM trans WHERE AccNo like 'SB%' ORDER BY AccNo")
showDF(goodAccNos)
goodTransRecordsFromAPI <- filter(acTransDF, "AccNo like 'SB%' AND TranAmount > 0")
showDF(goodTransRecordsFromAPI)
highValueTransRecordsFromAPI = filter(goodTransRecordsFromAPI, "TranAmount > 1000")
showDF(highValueTransRecordsFromAPI)
badAccountRecordsFromAPI <- filter(acTransDF, "AccNo NOT like 'SB%'")
showDF(badAccountRecordsFromAPI)
badAmountRecordsFromAPI <- filter(acTransDF, "TranAmount < 0")
showDF(badAmountRecordsFromAPI)
badTransRecordsFromAPI <- unionAll(badAccountRecordsFromAPI, badAmountRecordsFromAPI)
showDF(badTransRecordsFromAPI)
sumAmountFromAPI <- agg(goodTransRecordsFromAPI, sumAmount = sum(goodTransRecordsFromAPI$TranAmount))
showDF(sumAmountFromAPI)
maxAmountFromAPI <- agg(goodTransRecordsFromAPI, maxAmount = max(goodTransRecordsFromAPI$TranAmount))
showDF(maxAmountFromAPI)
minAmountFromAPI <- agg(goodTransRecordsFromAPI, minAmount = min(goodTransRecordsFromAPI$TranAmount))
showDF(minAmountFromAPI)
filteredTransRecordsFromAPI <- filter(goodTransRecordsFromAPI, "AccNo like 'SB%'")
accNosFromAPI <- select(filteredTransRecordsFromAPI, "AccNo")
distinctAccNoFromAPI <- distinct(accNosFromAPI)
sortedAccNoFromAPI <- arrange(distinctAccNoFromAPI, "AccNo")
showDF(sortedAccNoFromAPI)
write.df(acTransDF, "r.trans.parquet", "parquet", "overwrite")
acTransDFFromFile <- loadDF(sqlContext, "r.trans.parquet", "parquet", mergeSchema = "true")
showDF(acTransDFFromFile)
#Aggregations
acTransDFForAgg <- jsonFile(sqlContext, "/Users/RajT/source-code/spark-source/spark-1.5.2l/TransList2.json")
registerTempTable(acTransDFForAgg, "transnew")
showDF(acTransDFForAgg)
acSummary <- sql(sqlContext,"SELECT AccNo, sum(TranAmount) as TransTotal FROM transnew GROUP BY AccNo")
showDF(acSummary)
acSummaryFromAPI <- agg(groupBy(acTransDFForAgg, "AccNo"), TranAmount="sum")
showDF(acSummaryFromAPI)
# Multi-data source joins
acMasterDF <- jsonFile(sqlContext, "/Users/RajT/Documents/Writing/Spark for Beginners/To-PACKTPUB/Contents/B05289-04-SparkProgrammingWithR/Code/R/MasterList.json")
showDF(acMasterDF)
registerTempTable(acMasterDF, "master")
acBalDF <- jsonFile(sqlContext, "/Users/RajT/source-code/spark-source/spark-1.5.2l/BalList.json")
showDF(acBalDF)
registerTempTable(acBalDF, "balance")
acDetail <- sql(sqlContext,"SELECT master.AccNo, FirstName, LastName, BalAmount FROM master, balance WHERE master.AccNo = balance.AccNo ORDER BY BalAmount DESC")
showDF(acDetail)
write.df(acDetail, "r.acdetails.parquet", "parquet", "overwrite")
acDetailFromFile <- loadDF(sqlContext, "r.acdetails.parquet", "parquet", mergeSchema = "true")
showDF(acDetailFromFile)
acBalDFWithDiffColName <- selectExpr(acBalDF, "AccNo as AccNoBal", "BalAmount")
showDF(acBalDFWithDiffColName)
acDetailFromAPI <- join(acMasterDF, acBalDFWithDiffColName, acMasterDF$AccNo == acBalDFWithDiffColName$AccNoBal)
showDF(acDetailFromAPI)
acDetailFromAPIRequiredFields <- select(acDetailFromAPI, "AccNo", "FirstName", "LastName", "BalAmount")
showDF(acDetailFromAPIRequiredFields)
