How to use SQL with Apache Spark and Scala

Here we show how to use SQL with Apache Spark and Scala. We also show the Databricks CSV-to-data-frame converter.

This tutorial is designed to be easy to understand. As you probably know, many of the explanations given on Stack Overflow are complicated. The writers often write about items not directly related to the question posed, and they use very compact code. The instructions on the Spark website are not very simple either, because they do not explain the code much.

Here we walk through the code one line at a time and explain Scala and Spark concepts at the same time.

Convert CSV File to Data Frame

In this example we process a .csv file. The example file is comma-delimited with no header row. It is a credit card statement with the values date, vendor, and amount. The top of the file looks like this:

    11/01/2014,GODADDY.COM,9.96
    11/03/2014,APPLE MOUNTAIN VIEW CA,20
    11/03/2014,ONLINE PAYMENT - THANK YOU,-120

First start the Apache Spark Scala command line interpreter:

    spark-shell

Now create a case class Transaction to hold the credit card transactions. We use a case class because it is very simple to use. For example, it creates member variables automatically.

    case class Transaction(date: String, vendor: String, amount: String)

Now create an RDD[Array[String]], that is, an RDD of arrays of Strings, by reading the csv file and then running a map operation over each line. The split function turns each line into an array of String elements:

    val csvTrans = sc.textFile("/home/walker/Documents/scala/amex.csv").map(_.split(","))

Now create an RDD[Transaction], which is an RDD of Transaction objects:

    val trans = csvTrans.map(p => Transaction(p(0),p(1),p(2)))

We cannot run SQL over an RDD, so we will convert it to a data frame first. Depending on what version of Spark you are using, you might have to create the sqlContext object yourself before doing so, like this:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Then create the data frame:

    val transDf = sqlContext.createDataFrame(trans)

Now register it as a temporary table so that we can run SQL commands on it:

    transDf.registerTempTable("transDf")

Now you can run SQL on the data frame. The result is a new data frame.

    val amazon = sqlContext.sql("select * from transDf where substring(vendor,1,6) = 'AMAZON'")
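To see what the split, map, and filter steps do without starting Spark, the same logic can be sketched on an ordinary Scala collection. This is only an illustration, not the Spark code itself: the object name CsvSketch is hypothetical, and the AMAZON row is made up so that the filter has something to match.

```scala
// Plain-Scala sketch of the parsing and filtering steps; no Spark required.
object CsvSketch {
  // Same shape as the Transaction case class used in the tutorial.
  case class Transaction(date: String, vendor: String, amount: String)

  // The first three rows come from the sample statement;
  // the AMAZON row is invented for illustration only.
  val lines: Seq[String] = Seq(
    "11/01/2014,GODADDY.COM,9.96",
    "11/03/2014,APPLE MOUNTAIN VIEW CA,20",
    "11/03/2014,ONLINE PAYMENT - THANK YOU,-120",
    "11/05/2014,AMAZON MKTPLACE PMTS,15.00"
  )

  // split(",") turns one line into an Array[String], as in the csvTrans step.
  val fields: Seq[Array[String]] = lines.map(_.split(","))

  // Build one Transaction per row, as in the trans step.
  val trans: Seq[Transaction] = fields.map(p => Transaction(p(0), p(1), p(2)))

  // Rough equivalent of: where substring(vendor,1,6) = 'AMAZON'
  // SQL substring positions start at 1; take(6) keeps the first six characters.
  val amazon: Seq[Transaction] = trans.filter(_.vendor.take(6) == "AMAZON")
}
```

Note that amount is still a String here, just as in the tutorial's case class, so any arithmetic on it would need a conversion first.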

How to Use the Databricks CSV Data Source

The data file we used above was an American Express credit card statement that we edited to delete the columns that we did not want. The original statement has more columns than just the date, vendor, and amount. So here is how to load the full csv file and then set the column headings, and thus the schema, on the data frame. We use the Databricks CSV data source library to make converting the csv file to a data frame easier.

First run the spark-shell, but tell it on the command line to download the Databricks spark-csv package:

    spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

Import the Spark SQL data types:

    import org.apache.spark.sql.types._

Now put the column names that you want into a string. We use names such as “a1” and “e1” for the columns that are not important. The columns we want are date, vendor, and amount.

    val colnames = "date e1 vendor cardholder card e2 e3 amount a1 a2 a3 a4 a5 a6 a7"

Now we split the string on spaces, run the map operation over the resulting names, and pass each name to the StructField constructor. This creates an array of StructField items, which we then wrap in a StructType to form the schema for the data frame that we will create.

    val fields = colnames.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))

Create the StructType object:

    val schema = StructType(fields)

Now load the csv file using the Databricks library, telling it what schema to use. This creates a data frame.

    val amex = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(schema).load("/home/walker/Documents/scala/Transactions.csv")

From here we could run SQL commands as we showed above.
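As a side check on the column-name string used above, the following plain-Scala sketch shows how split produces one name per column and where the three columns of interest fall. It does not need Spark. The object name ColumnNames and the helper values are hypothetical, introduced only for this illustration.

```scala
// Plain-Scala check of the column-name string; no Spark required.
object ColumnNames {
  // Same string as in the tutorial.
  val colnames = "date e1 vendor cardholder card e2 e3 amount a1 a2 a3 a4 a5 a6 a7"

  // One entry per column, in file order.
  val names: Array[String] = colnames.split(" ")

  // The columns the tutorial actually cares about.
  val wanted: Seq[String] = Seq("date", "vendor", "amount")

  // Zero-based position of each wanted column within the full list.
  val positions: Map[String, Int] = wanted.map(n => n -> names.indexOf(n)).toMap
}
```

Checking names.length against the number of columns in the statement file is a quick way to spot a missing or extra placeholder name before building the schema.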
