Scala: download a data set and convert it to a DataFrame
Statistical analysis with Spark DataFrame
If you have used Python or participated in a Kaggle competition, you should be familiar with the Pandas library. Pandas provides many functions that help users easily analyze a given dataset. Spark DataFrame supports similar functions but processes data in a distributed manner, thereby improving speed and performance. Although the functionality of Spark DataFrame is not yet comparable to that of Pandas, Spark keeps improving this API (Note: we can convert between a Spark DataFrame and a Pandas DataFrame using Apache Arrow).
In this tutorial, we will use Spark DataFrame to perform statistical analysis with the same steps as in Pandas. We will use the dataset of the Kaggle competition Porto Seguro's Safe Driver Prediction and follow the steps described in Data Preparation & Exploration. The only difference is that we will use Spark DataFrame instead of Pandas.
We first create a Scala object named DFProcessing and create a Spark session as follows:
import org.apache.spark.sql.SparkSession

//Define the Spark Session
val spark = SparkSession.builder()
  .appName("Spark DataFrame Example")
  .master("local")
  .getOrCreate()

//Implicit methods available in Scala for converting common Scala objects into DataFrames
import spark.implicits._

//Set the log level
spark.sparkContext.setLogLevel("WARN")
After downloading and extracting the dataset of Porto Seguro's Safe Driver Prediction, we use DataFrame to load data from the train.csv file as described in the tutorial Loading data from various sources (with Pandas we just need pd.read_csv(), but in Spark we need to configure the reading options to get the desired format):
//The file path
val file = "E:/Workspace/BigData/Data/train.csv"

//Create a DataFrame
var df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(file)
We can use .head() or .show(5) to display the first five rows of the DataFrame as follows (currently, Spark does not provide a tail() function):
//Print the first five rows
println("The first five rows: ")
df.show(5)
We use the code below to count the number of rows and columns and to check for duplicate data (similar to .shape and .drop_duplicates() in Pandas). The number of rows before and after calling .dropDuplicates() is unchanged, indicating that there is no duplicate data in the given dataset.
//Get the number of rows and columns
print("The number of rows and columns: ")
println(df.count() + "," + df.columns.length)

//Check if there are duplicate rows
df = df.dropDuplicates()
val totalRow = df.count()
print("The number of rows and columns after removing duplicate rows: ")
println(totalRow + "," + df.columns.length)
The .printSchema() function of Spark returns similar information to the .info() function of Pandas:
println("nThe type of each column variable") df.printSchema() |
To get the metadata of each variable, we use the following code with the same logic as described in the original article, converted from Python to Spark code: classify variables by their role (target, id and input); classify by their level (binary: either 0 or 1; nominal (category number): natural numbers representing a specific category; interval: real numbers; ordinal: natural numbers representing a specific order); and classify by data type (Integer, Double, ...).
import java.util
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

//Initialize the values of role, level, keep and dtype
var role = ""
var level = ""
var keep = true
var dtype = ""
val data = new util.ArrayList[Row]()

for (col <- df.columns) {
  //Define the role
  if (col.contains("target"))
    role = "target"
  else if (col.contains("id"))
    role = "id"
  else
    role = "input"

  //Define the level
  dtype = df.select(col).dtypes(0)._2
  if (col.contains("bin") || col.equals("target"))
    level = "binary"
  else if (col.contains("cat") || col.equals("id"))
    level = "nominal"
  else if (dtype.equals("DoubleType"))
    level = "interval"
  else if (dtype.equals("IntegerType"))
    level = "ordinal"

  //Set keep to true for all variables except id
  keep = true
  if (col.equals("id"))
    keep = false

  //Add a Row to the ArrayList
  data.add(Row(col, role, level, keep, dtype))
}

//Define the DataFrame schema
val schema = StructType(
  List(
    StructField("varname", StringType, true),
    StructField("role", StringType, true),
    StructField("level", StringType, true),
    StructField("keep", BooleanType, true),
    StructField("dtype", StringType, true)
  )
)

//Create the meta DataFrame
val meta = spark.createDataFrame(data, schema)

//Show the content of the meta DataFrame
println("The metadata of the dataset")
meta.show(df.columns.length)
Next, we list all nominal variables using the .filter() function of DataFrame as follows (Note: we need to use "===" instead of "=="):
//Extract all nominal variables that are not dropped
println("All nominal variables that are not dropped: ")
meta.filter($"level" === "nominal" && $"keep").select("varname").show()
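The same condition can also be written as an SQL-style expression string, which avoids the "===" operator entirely; a small alternative sketch, assuming the meta DataFrame above:

//Alternative sketch: the same filter expressed as an SQL-style string
meta.where("level = 'nominal' AND keep = true").select("varname").show()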
To list all variables by their role and level, we use .groupBy() and .count() as below:
//Count the number of variables per role and level
println("The number of variables per role and level: ")
meta.groupBy("role", "level").count().show()
In the Descriptive Statistics section, we use .describe() to get the count, mean, std, min and max of each variable. With Pandas, we only need two lines of code to get this information, but in Spark we need a few more steps. For convenience, we write a function named getVarDF() for this task. First, we use .filter() to select the variables of a particular level (binary, interval, nominal, ...), then we perform the following steps: convert the values of the varname column to a List of String, convert this List of String to a List of Column, and then use .select() to keep only the necessary variables (here we use the col() function to convert a String to the Column type; for more information about Spark functions, please visit Spark SQL functions).
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def getVarDF(varName: String, metaDF: DataFrame, dataFrame: DataFrame, describe: Boolean = true): DataFrame = {
  //Get the list of variables of the given level that are kept
  val varCols = metaDF.filter($"level" === varName && $"keep").select("varname").map(r => r.getString(0)).collect.toList
  //Convert the List of String to a List of Column
  val colNames = varCols.map(name => col(name))
  //Create a new DataFrame with the specified columns from the original DataFrame
  val varDF = dataFrame.select(colNames: _*)
  //Print the description of the DataFrame if the boolean flag is true
  if (describe == true) {
    println(varName + " variables: ")
    varDF.describe().show()
  }
  return varDF
}
Using the getVarDF() function above to get the statistics of the interval, ordinal and binary variables, we obtain the following results:
//Interval variables
getVarDF("interval", meta, df)
//Ordinal variables
getVarDF("ordinal", meta, df)
//Binary variables
getVarDF("binary", meta, df)
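Because getVarDF() returns the selected sub-DataFrame, it can also be reused for further processing without printing the statistics; a small usage sketch (the nominalDF name is just for illustration):

//Reuse getVarDF() to grab the nominal variables without printing describe()
val nominalDF = getVarDF("nominal", meta, df, describe = false)
println("Number of nominal columns: " + nominalDF.columns.length)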
To check the missing values of each variable, we use .filter() and .count() as follows (Note: the returned result is different from the result in Data Preparation & Exploration because we do not perform the under-sampling step):
//Check for missing values (encoded as -1 in this dataset)
var vars_with_missing = new util.ArrayList[String]()
var missing = 0L
for (column <- df.columns) {
  missing = df.filter(col(column) === -1).count()
  if (missing > 0) {
    println(column + " has " + missing + "/" + totalRow + " records with missing values")
    vars_with_missing.add(column)
  }
}
println("In total, there are " + vars_with_missing.size() + " variables with missing values")
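As a side note, the loop above launches one Spark job per column. If that becomes slow, the same counts can be computed in a single pass with a conditional aggregation; this is only a minimal sketch of an alternative, not part of the original article's workflow:

//Alternative sketch (single pass): count values equal to -1 in every column at once
import org.apache.spark.sql.functions.{sum, when, col}
val missingCounts = df.select(
  df.columns.map(c => sum(when(col(c) === -1, 1).otherwise(0)).alias(c)): _*
)
missingCounts.show()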
To count the distinct values of each categorical variable, we use .distinct().count() as follows:
//Checking the cardinality of the categorical variables
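//A minimal sketch of one possible implementation of this step
//(assumes the meta and df DataFrames defined above)
val nominalVars = meta.filter($"level" === "nominal" && $"keep")
  .select("varname").map(r => r.getString(0)).collect.toList
for (column <- nominalVars) {
  println(column + " has " + df.select(column).distinct().count() + " distinct values")
}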