Statistical analysis with Spark DataFrame

If you have used Python or participated in a Kaggle competition, you should be familiar with the Pandas library. Pandas provides many functions that help users easily perform analysis on a given dataset. Spark DataFrame supports similar functions but processes data in a distributed manner, thereby improving the speed and performance of the analysis. Although the functions of Spark DataFrame are not yet as comprehensive as those of Pandas, Spark is making steady progress in improving this API (note: we can convert between a Spark DataFrame and a Pandas DataFrame using Apache Arrow).

In this tutorial, we will use Spark DataFrame to perform statistical analysis with the same steps as using Pandas. We will use the dataset of the Kaggle competition Porto Seguro's Safe Driver Prediction and follow the steps described in Data Preparation & Exploration. The only difference here is that we will use Spark DataFrame instead of Pandas.

We first create a Scala object named DFProcessing and create a Spark session as follows:

  //Required imports
  import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  import org.apache.spark.sql.functions.col
  import org.apache.spark.sql.types._
  import java.util

  //Define Spark Session
  val spark = SparkSession.builder()
    .appName("Spark DataFrame Example")
    .master("local")
    .getOrCreate()
  //Implicit methods available in Scala for converting common Scala objects into DataFrames
  import spark.implicits._
  //Set the Log file level
  spark.sparkContext.setLogLevel("WARN")

After downloading and extracting the dataset of Porto Seguro's Safe Driver Prediction, we use DataFrame to load data from the train.csv file as described in the tutorial Loading data from various sources (with Pandas, we just need to call pd.read_csv(), but in Spark we need to configure the reading options to get the desired format):

    //The file path
    val file = "E:/Workspace/BigData/Data/train.csv"
    //Create a DataFrame
    var df = spark.read.format("csv")
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(file)
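
As a side note, the same read can also be written with the .csv() shorthand of DataFrameReader instead of .format("csv").load(). This is just an equivalent sketch; the rest of the tutorial keeps using the df defined above:

    //Equivalent shorthand: csv() combines format("csv") and load()
    val dfCsv = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .csv(file)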

We can use .head() or .show(5) to display the first five rows of the DataFrame as follows (currently, Spark does not support a tail() function):

    //Print the first five rows
    println("The first five rows: ")
    df.show(5)
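
For comparison, .head(n) does not pretty-print anything; it returns the first n rows to the driver as an Array[Row], which you can then print yourself (a minimal sketch):

    //head(5) collects the first five rows to the driver as Array[Row]
    val firstRows = df.head(5)
    firstRows.foreach(row => println(row))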

We use the code below to count the number of rows and columns and to check for duplicate data (similar to .shape and .drop_duplicates() in Pandas). The number of rows before and after calling .dropDuplicates() is unchanged, indicating that there is no duplicate data in the given dataset.

    //Get the number of rows and columns
    print("The number of rows and columns: ")
    println(df.count() + "," + df.columns.length)
    //Check if there are duplicate rows
    df = df.dropDuplicates()
    val totalRow = df.count()
    print("The number of rows and columns after removing duplicate rows: ")
    println(totalRow + "," + df.columns.length)

The .printSchema() function of Spark returns information similar to the .info() function of Pandas:

    println("nThe type of each column variable")
    df.printSchema()

To get the metadata of each variable, we use the following code with the same logic as described in the original article, but converted from Python to Spark code: classify variables by their role (target, id and input); classify by their level (binary: either 0 or 1; nominal (categorical): natural numbers representing a specific category; interval: real numbers; ordinal: natural numbers representing a specific order); and classify by data type (Integer, Double, ...).

    //Initialize the value of role, level, keep and dtype
    var role = ""
    var level = ""
    var keep = true
    var dtype = ""
    val data = new util.ArrayList[Row]()
    for (col <- df.columns)
    {
      //Define the role
      if (col.contains("target"))
        role = "target"
      else if (col.contains("id"))
        role = "id"
      else
        role = "input"
      //Define the level
      dtype = df.select(col).dtypes(0)._2
      if (col.contains("bin") || col.equals("target"))
        level = "binary"
      else if (col.contains("cat") || col.equals("id"))
        level = "nominal"
      else if (dtype.equals("DoubleType"))
        level = "interval"
      else if (dtype.equals("IntegerType"))
        level = "ordinal"
      //Set True to all variables except id
      keep = true
      if (col.equals("id"))
        keep = false
      //Add Row to the ArrayList
      data.add(Row(col, role, level, keep, dtype))
    }
    //Define a DataFrame Schema
    val schema = StructType(
      List(
        StructField("varname", StringType, true),
        StructField("role", StringType, true),
        StructField("level", StringType, true),
        StructField("keep", BooleanType, true),
        StructField("dtype", StringType, true)
      )
    )
    //Create meta DataFrame
    val meta = spark.createDataFrame(data, schema)
    //Show the value of meta DataFrame
    println("The metadata of the dataset")
    meta.show(df.columns.length)

Next, we list all nominal variables using the .filter() function of DataFrame as follows (note: we need to use "===" instead of "=="):

    //Extract all nominal variables that are not dropped
    println("All nominal variables that are not dropped: ")
    meta.filter($"level" === "nominal" && $"keep").select("varname").show()
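
If the Column syntax with "===" looks unfamiliar, the same filter can also be expressed as a SQL condition string. This is just an equivalent sketch of the call above:

    //Equivalent filter written as a SQL expression string
    meta.filter("level = 'nominal' AND keep = true").select("varname").show()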

To count the variables by their role and level, we use .groupBy() and .count() as shown below:

    //Count the number of variables per role and level
    println("The number of variables per role and level: ")
    meta.groupBy("role", "level").count().show()
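
For a more compact overview, the same counts can be pivoted so that each level becomes its own column, with one row per role (a small optional sketch):

    //Pivot the counts: one row per role, one column per level
    meta.groupBy("role").pivot("level").count().show()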

In the Descriptive Statistics section, we use .describe() to get the count, mean, std, min and max of each variable. With Pandas, we just need two lines of code to get this information, but in Spark we need a few more steps. For convenience, we will write a function named getVarDF() for this task. First, we use .filter() to select the variables of a particular level (binary, interval, nominal, ...). Then we collect the matching variable names into a list of Strings, convert this list of Strings into a list of Columns, and use .select() to keep only those columns from the original DataFrame (here we use the col() function to convert a String into a Column; for more information about Spark functions, please see Spark SQL functions).

  def getVarDF(varName: String, metaDF: DataFrame, dataFrame: DataFrame, describe: Boolean = true): DataFrame = {
    //Get the list of variable names of the given level
    val varCols = metaDF.filter($"level" === varName && $"keep").select("varname").map(r => r.getString(0)).collect.toList
    //Convert the List of String to a List of Column
    val colNames = varCols.map(name => col(name))
    //Create a new DataFrame with the specified columns from the original DataFrame
    val varDF = dataFrame.select(colNames: _*)
    //Print the description of the DataFrame if the boolean value is true
    if (describe == true)
    {
      println(varName + " variables: ")
      varDF.describe().show()
    }
    return varDF
  }

Using the getVarDF() function above to get information about the interval, ordinal and binary variables, we obtain the following results:

    //Interval variables
    getVarDF("interval", meta, df)
    //Ordinal variables
    getVarDF("ordinal", meta, df)
    //Binary variables
    getVarDF("binary", meta, df)

To check for missing values in each variable, we use .filter() and .count() as follows (note: the returned result is different from the result in Data Preparation & Exploration because we do not perform the under-sampling step):

    //Checking missing values
    var vars_with_missing = new util.ArrayList[String]()
    var missing = 0L
    for (column <- df.columns)
    {
      missing = df.filter(col(column) === -1).count()
      if (missing > 0)
      {
        println(column + " has " + missing + "/" + totalRow + " records with missing values")
        vars_with_missing.add(column)
      }
    }
    println("In total, there are " + vars_with_missing.size() + " variables with missing values")

To check the cardinality (the number of distinct values) of each categorical variable, we use .distinct().count() as follows:

    //Checking the cardinality of the categorical variables
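    //A minimal sketch (assumed reconstruction, following the pattern of the steps above):
    //reuse getVarDF() to select the nominal variables, then report each variable's
    //number of distinct values with .distinct().count()
    val nominalDF = getVarDF("nominal", meta, df, describe = false)
    for (column <- nominalDF.columns)
    {
      println(column + " has " + nominalDF.select(column).distinct().count() + " distinct values")
    }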
