Spark SQL Hive

In this post we will explore how Spark SQL reads, processes, and writes data stored in Apache Hive.

Apache Hive
  • Translates SQL queries into MapReduce or Tez jobs on your cluster
  • Hive distributes SQL queries with Hadoop
  • Hive can be used for analytical queries, while HBase handles real-time querying
  • Easy OLAP queries – WAY easier than writing MapReduce in Java
  • Highly optimized and highly extensible
    • User-defined functions
    • Thrift server
    • JDBC / ODBC drivers
  • Hive maintains a “metastore” that imparts a structure you define onto the unstructured data stored on HDFS and other file systems
  • Hive with Spark lets you use Hive SQL functions from Spark (see the sketch after this list)
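
As a minimal sketch (assuming the Hive-enabled SparkSession created later in this post), once enableHiveSupport() is on, metastore databases and Hive-compatible SQL functions can be used straight from spark.sql:

// Minimal sketch, assuming the Hive-enabled `spark` session built below.
spark.sql("SHOW DATABASES").show(false)   // databases registered in the Hive metastore
spark.sql("SELECT parse_url('https://spark.apache.org/docs', 'HOST') AS host").show(false)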

Put the text file into the Hadoop File System (HDFS)

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val hdfsURI = new URI("hdfs://192.168.1.200:9000")
val fs = FileSystem.get(hdfsURI, conf)

def hdfsCopyFromLocal() = {
    // The source file is on the local disk. It is added to HDFS at the
    // given destination path, and the source is kept intact afterwards.
    val sourcePath = new Path("data/flight-data/csv/2015-summary.csv")
    val hdfsDestPath = new Path("/user/hyper/bookdata")
    fs.copyFromLocalFile(sourcePath, hdfsDestPath)
  }
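
A small usage sketch (assuming /user/hyper/bookdata already exists as a directory on HDFS; the verification path below is an assumption based on that layout):

// Run the copy and check that the file landed on HDFS.
hdfsCopyFromLocal()
println(fs.exists(new Path("/user/hyper/bookdata/2015-summary.csv")))  // expected: true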

Verify the upload in the HDFS NameNode web UI: http://192.168.1.131:9870

Create a SparkSession with Hive support.

// Enable Hive support
import java.io.File
import org.apache.spark.sql.SparkSession

val hiveWarehousePath = "hdfs://XXX.XXX.XX1.XXX:9000/user/hive/warehouse"
val hiveWarehouse = new File(hiveWarehousePath).getAbsolutePath

val spark = SparkSession.builder()
   .appName("Stocks App")
   .master("local[*]")
   // Option 1 (with hive --service metastore running): point Spark at the warehouse directory
   .config("spark.sql.warehouse.dir", hiveWarehouse)
   // Option 2: connect to the Hive metastore thrift service instead
   // .config("hive.metastore.uris", "thrift://192.168.1.200:9083/default")
   .enableHiveSupport()
   .getOrCreate()

// Make spark.sql available as sql(...) and bring in the implicits needed for toDF
import spark.implicits._
import spark.sql

// Option 3: set the configuration after Spark initialization
spark.sqlContext.setConf("hive.metastore.warehouse.dir", hiveWarehouse)
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")

Spark read and write with Hive

  def hiveSparkBasics() = {
    spark.catalog.listDatabases().show(false)

    sql("use firstdb;")

    // List tables
    spark.catalog.listTables().show(false)
    println(spark.conf.getAll.mkString("\n"))

    // Drop Tables 
    //  sql(
    //    """
    //      |DROP TABLE FLIGHTS_DATA;
    //      |""".stripMargin)


    // Create tables with properties
    sql(
      """
        |CREATE TABLE IF NOT EXISTS FLIGHTS_DATA (DEST_COUNTRY_NAME STRING,ORIGIN_COUNTRY_NAME STRING ,count INT)
        |ROW FORMAT DELIMITED
        |FIELDS TERMINATED BY ','
        |LINES TERMINATED BY '\n'
        |STORED AS TEXTFILE
        |TBLPROPERTIES("skip.header.line.count"='1');
        |""".stripMargin)

    spark.catalog.listTables().show(false)
  
    // Load data
    sql(
      """
        |LOAD DATA LOCAL INPATH 'data/flight-data/csv/2015-summary.csv'
        |OVERWRITE INTO TABLE FLIGHTS_DATA
        |""".stripMargin)

    // Alter table
    sql(
      """
        |ALTER TABLE flights_data SET TBLPROPERTIES ("skip.header.line.count"="1");
        |""".stripMargin)

    // Describe table
    sql(
      """
        |describe table extended flights_data;
        |""".stripMargin).show(false)


  }
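
A quick verification sketch after the LOAD DATA step – plain Spark SQL, nothing beyond what the function above already sets up:

// Sanity-check the load: row count and a few sample rows.
sql("select count(*) as row_count from flights_data").show()
sql("select * from flights_data limit 5").show(false)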

// Spark Hive Read Write
  def hiveReadWrite() ={
    // Create table using DataFrame API
    sql("use firstdb;")

    val flightsDFSql = sql(
      """
        |select * from flights_data where ORIGIN_COUNTRY_NAME="United States";
        |""".stripMargin)

    val flightsTableDF = spark.table("flights_data")

    flightsTableDF
      .filter(col("ORIGIN_COUNTRY_NAME") === "United States")
      .write.mode(SaveMode.Overwrite)
      .saveAsTable("spark_us_flights")

    spark.catalog.listTables().show()

  }


Partitioning and Bucketing Using the DataFrame API

// Use Partitions and Bucketing
  def hivePartitions() = {

    sql(s"use firstdb")
    val dataDir = "hdfs://192.168.1.200:9000/user/hyper/bookdata/parquet"
    spark.range(1000).write.parquet(dataDir)

    sql(s"CREATE TABLE IF NOT EXISTS big_int_parquet(id bigint) STORED AS PARQUET LOCATION '$dataDir';")

//    spark.catalog.listTables().show()
    val numDF = spark.table("big_int_parquet")
    numDF.persist(StorageLevel.MEMORY_AND_DISK)

    // Schema with an appended rowId column (defined for reference, not used below)
    val pairSchema = StructType(numDF.schema.fields ++ Array(StructField("rowId", LongType)))

    // Create a key/value DF: zipWithIndex pairs each row with a sequential index
    val zipIndexRDD = numDF.rdd.zipWithIndex()
    zipIndexRDD.persist(StorageLevel.MEMORY_AND_DISK)
    val zipMapRDD = zipIndexRDD.map(row => (row._2, row._1.getLong(0)))
    val kvDF = zipMapRDD.toDF("id", "value")
//    kvDF.show()



    // Turn on flag for Hive Dynamic Partitioning
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")


    // Create a Hive bucketed table using the DataFrame API
    kvDF.write
      // .partitionBy("id")
      .bucketBy(20, "id")
      .format("parquet")  // format("csv") // format("hive")
      .saveAsTable("hive_buckets_tbl")

    // With partitionBy, the partition column would be moved to the end of the schema.
    sql("SELECT * FROM hive_buckets_tbl").show()


  }
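
The snippet above only exercises bucketBy (partitionBy is commented out there; note that partitioning by the 1000-value id column would create one directory per value). A minimal sketch of a partitioned write with the same DataFrame API, reusing kvDF with a hypothetical low-cardinality mod column and table name:

// Sketch only: derive a low-cardinality column and partition the Hive table by it.
val partitionedDF = kvDF.withColumn("mod", col("value") % 10)

partitionedDF.write
  .partitionBy("mod")                 // one HDFS directory per distinct mod value
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .saveAsTable("hive_partitions_tbl") // hypothetical table name

sql("SHOW PARTITIONS hive_partitions_tbl").show(false)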

// Overwrite, Append, and Insert
  def hiveInsertAppend() = {

    sql(s"use firstdb")
    Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("append_table")
    Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("append_table")

    // overwrites old data with new
    Seq((1,5)).toDF("i","j")
      .write.mode("overwrite").saveAsTable("append_table")
    /*
    +---+---+
    |  i|  j|
    +---+---+
    |  1|  5|
    +---+---+
     */
    // append
    Seq((1,6)).toDF("i","j")
      .write.mode("append")
      .saveAsTable("append_table")
    /*
    +---+---+
    |  i|  j|
    +---+---+
    |  1|  5|
    |  1|  6|
    +---+---+
  */
    // insert into
    Seq((1,5)).toDF("i","j")
          .write
          .insertInto("append_table")

    /*
    +---+---+
    |  i|  j|
    +---+---+
    |  1|  5|
    |  1|  5|
    |  1|  6|
    +---+---+
     */

    sql(s"select * from append_table").show()
  }
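
One subtlety worth calling out (an illustrative sketch, not part of the listing above): insertInto resolves columns by position, while saveAsTable in append mode resolves them by name – which is why the (3, 4) row written with swapped column names above lands as i = 4, j = 3.

// Column resolution with deliberately swapped column names, assuming append_table(i, j).
Seq((10, 20)).toDF("j", "i").write.mode("append").saveAsTable("append_table") // by name: i = 20, j = 10
Seq((10, 20)).toDF("j", "i").write.insertInto("append_table")                 // by position: i = 10, j = 20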

Hive UDTF: User-Defined Table-Generating Functions

 def hiveUTDF() = {

    val carSeq = Seq(
      ("chevrolet chevelle malibu",Array(18.0,8L,307.0,130L,3504L,12.0),"1970-01-01","USA"),
      ("buick skylark 320",Array(15.0,8L,350.0,165L,3693L,11.5),"1970-01-01","USA"),
      ("plymouth satellite",Array(18.0,8L,318.0,150L,3436L,11.0),"1970-01-01","USA"),
      ("amc rebel sst",Array(16.0,8L,304.0,150L,3433L,12.0),"1970-01-01","USA")
    )

    carSeq.toDF("name","specs","year","country").write.mode(SaveMode.Overwrite).saveAsTable("cars_table")

    // Explode
    sql("select name, explode(specs) from cars_table;").show()

    sql(
      """
        |select name, specs, specs_view from cars_table
        |lateral view explode(specs) specs_view as specs_view;
        |""".stripMargin).show()

    /*
      +--------------------+--------------------+----------+
      |      name          |               specs|specs_view|
      +--------------------+--------------------+----------+
      |chevrolet chevell...|[18.0, 8.0, 307.0...|      18.0|
      |chevrolet chevell...|[18.0, 8.0, 307.0...|       8.0|
      |chevrolet chevell...|[18.0, 8.0, 307.0...|     307.0|
      |chevrolet chevell...|[18.0, 8.0, 307.0...|     130.0|
      |chevrolet chevell...|[18.0, 8.0, 307.0...|    3504.0|
      |chevrolet chevell...|[18.0, 8.0, 307.0...|      12.0|
     */
 
//posexplode
    sql(
      """
        |select name, posexplode(specs) from cars_table;
        |""".stripMargin).show()

    /*
    +--------------------+---+------+
    |                name|pos|   col|
    +--------------------+---+------+
    |chevrolet chevell...|  0|  18.0|
    |chevrolet chevell...|  1|   8.0|
    |chevrolet chevell...|  2| 307.0|
    |chevrolet chevell...|  3| 130.0|
    |chevrolet chevell...|  4|3504.0|
    |chevrolet chevell...|  5|  12.0|
     */

    sql(
      """
        |select name, specs[0] as MPG, specs[1] as cylinders, specs[3] as hp from cars_table;
        |""".stripMargin).show()

    /*
    +--------------------+----+---------+-----+
    |                name| MPG|cylinders|   hp|
    +--------------------+----+---------+-----+
    |chevrolet chevell...|18.0|      8.0|130.0|
    |  plymouth satellite|18.0|      8.0|150.0|
    |   buick skylark 320|15.0|      8.0|165.0|
    |       amc rebel sst|16.0|      8.0|150.0|
    +--------------------+----+---------+-----+
     */
  }


Arrays and Maps Data Structure

//chevrolet#chevelle malibu,mpg:15.0#cylinders:6#horsepower:130,1970-01-01,USA
//buick#skylark 320,mpg:12.0#cylinders:8#horsepower:165,1970-01-01,USA
//plymouth#satellite,mpg:18.0#cylinders:6#horsepower:150,1970-01-01,USA
//amc#rebel sst,mpg:16.0#cylinders:6#horsepower:140,1970-01-01,USA

  // Arrays and maps data structures (hiveMapsArrays is just a wrapper name for this listing)
  def hiveMapsArrays() = {
    sql(
          """
            |create table if not exists cars_map(cars struct<brand:string,model:string>,
            |specs map<string,string>, modelyear string,origin string)
            |row format delimited
            |fields terminated by ','
            |collection items terminated by '#'
            |map keys terminated by ':';
            |""".stripMargin)


    sql(
      """
        |load data local inpath 'data/cars/cars.txt' overwrite into table cars_map;
        |""".stripMargin)


    sql(
      """
        |select cars.brand, cars.model, specs["mpg"] as mpg, specs["cylinders"] as cylinders,
        |specs["horsepower"] as horsepower, modelyear, origin from cars_map;
        |""".stripMargin).show()

    /*
    +---------+---------------+----+---------+----------+----------+------+
    |    brand|          model| mpg|cylinders|horsepower| modelyear|origin|
    +---------+---------------+----+---------+----------+----------+------+
    |chevrolet|chevelle malibu|15.0|        6|       130|1970-01-01|   USA|
    |    buick|    skylark 320|12.0|        8|       165|1970-01-01|   USA|
    | plymouth|      satellite|18.0|        6|       150|1970-01-01|   USA|
    |      amc|      rebel sst|16.0|        6|       140|1970-01-01|   USA|
    +---------+---------------+----+---------+----------+----------+------+
     */


  }
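
As a small follow-on sketch, Spark's standard map helpers also work on the Hive map column (map_keys and map_values are built-in Spark SQL functions; nothing here is specific to this table beyond the column names above):

// Inspect the map column with map_keys / map_values.
sql(
  """
    |select cars.brand, map_keys(specs) as spec_names, map_values(specs) as spec_values
    |from cars_map;
    |""".stripMargin).show(false)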

Nested JSON

Process nested JSON using Spark and Hive.

Create a schema based on the JSON structure.

 /*
  root
 |-- CarsSpecs: struct (nullable = true)
 |    |-- cars: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Acceleration: double (nullable = true)
 |    |    |    |-- Cylinders: long (nullable = true)
 |    |    |    |-- Displacement: long (nullable = true)
 |    |    |    |-- Horsepower: long (nullable = true)
 |    |    |    |-- Miles_per_Gallon: long (nullable = true)
 |    |    |    |-- Name: string (nullable = true)
 |    |    |    |-- Origin: string (nullable = true)
 |    |    |    |-- Weight_in_lbs: long (nullable = true)
 |    |    |    |-- Year: string (nullable = true)
   */
Print the schema of the JSON first, then map it to a Spark StructType as below.
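
A quick way to let Spark infer and print that schema before hand-writing one (a minimal sketch; the multiline option and HDFS path match the read further below):

// Inspect the inferred schema of the nested JSON first.
spark.read
  .option("multiline", "true")
  .json("hdfs://192.168.1.200:9000/user/hyper/cars/specs.json")
  .printSchema()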

  import org.apache.spark.sql.types._

  val carsSchema = new StructType()
    .add("CarsSpecs",
      new StructType()
        .add("cars",
          ArrayType(
            new StructType()
              .add("Name", StringType)
              .add("Miles_per_Gallon", IntegerType)
              .add("Cylinders", StringType)
              .add("Displacement", StringType)
              .add("Horsepower", StringType)
              .add("Weight_in_lbs", StringType)
              .add("Acceleration", StringType)
              .add("Year", StringType)
              .add("Origin", StringType)
          )
        )

    )

Process the JSON in Spark and write it to Hive.

  def nestedJson() = {
    val carsJsonDF = spark.read
      .option("multiline", "true")
      .schema(carsSchema)
      .json("hdfs://192.168.1.200:9000/user/hyper/cars/specs.json")

    // carsJsonDF.printSchema()
    carsJsonDF.show(false)
    val carsSpecDF = carsJsonDF.select(col("CarsSpecs").getField("cars").as("carDetails"))
    val explodedDF = carsSpecDF.select(explode(col("carDetails")))

    val carsDF = explodedDF.select(col("col.*"))
    carsDF.write.mode(SaveMode.Overwrite).saveAsTable("cars_json_table")

    sql(s"select * from cars_json_table;").show()
  }

+--------------------+----------------+---------+------------+----------+-------------+------------+----------+------+
|                Name|Miles_per_Gallon|Cylinders|Displacement|Horsepower|Weight_in_lbs|Acceleration|      Year|Origin|
+--------------------+----------------+---------+------------+----------+-------------+------------+----------+------+
|chevrolet chevell...|              18|        8|         307|       130|         3504|        12.0|01-01-1970|   USA|
|   buick skylark 320|              15|        8|         350|       165|         3693|        11.5|01-01-1970|   USA|
+--------------------+----------------+---------+------------+----------+-------------+------------+----------+------+

Using the inline UDTF

inline: explodes an array of structs into columns, as an alternative to using the explode UDTF on an array of structs.

    val carsJsonDF = spark.read
      .option("multiline", "true")
      .schema(carsSchema)
      .json("data/cars/device.json")

    // carsJsonDF.printSchema()
    carsJsonDF.show(false)
    val carsSpecDF = carsJsonDF.select(col("CarsSpecs").getField("cars").as("CarDetails"))

    carsSpecDF.createOrReplaceTempView("cars_view")

    sql(
      """
        |select inline(CarDetails) from cars_view;
        |""".stripMargin).show(false)

Min By, Max By, and Stack Functions

Spark 3 allows recursive reading from directories with the new recursiveFileLookup option.


    val df = spark.createDataFrame(Seq(
      ("1", 10),
      ("2", 20),
      ("3", 30),
      ("4", 40)
    )).toDF("id","value")
    df.createOrReplaceTempView("table")

    // max_by and min_by functions
    // Each takes two parameters:
    // the first is the column whose value is returned,
    // the second is the column over which the maximum/minimum is computed.
    // Here the id of the max value is 4 and the id of the min value is 1.

   spark.sql("select max_by(id,value) max_id, min_by(id,value) min_id from table").show(false)
+------+------+
|max_id|min_id|
+------+------+
|4     |1     |
+------+------+

//recursive reading 
    val recursiveDf  = spark.read
      .option("delimiter","||")
      .option("recursiveFileLookup","true")
      .option("header","true")
      .csv("data/nested")

    assert(recursiveDf.count() == 4)
    recursiveDf.show(false)

+---+---+---+---+
|a  |b  |c  |d  |
+---+---+---+---+
|1  |2  |3  |4  |
|5  |6  |7  |8  |
|1  |2  |3  |4  |
|5  |6  |7  |8  |
+---+---+---+---+


// stack function: stack(n, expr1, ..., exprk) splits the k expressions into n rows
// (here each input row of columns a, b, c, d becomes 2 rows of 2 columns)
    recursiveDf.createOrReplaceTempView("nested_stack")
    spark.sql(
      """
        |select stack(2,a,b,c,d) from nested_stack;
        |""".stripMargin).show(false)
 

//stack 
+----+----+
|col0|col1|
+----+----+
|1   |2   |
|3   |4   |
|5   |6   |
|7   |8   |
|1   |2   |
|3   |4   |
|5   |6   |
|7   |8   |
+----+----+

Spark-submit using the YARN master

spark-submit \
 --class com.forsynet.sparkHdfsHive \
 --master yarn \
 --deploy-mode cluster \
sparkhdfshive_2.12-0.1.jar

Verify table creation in Hive.

hive/bin/hive
hive> select * from movies limit 20;

hive> describe movies;
OK
userid                  bigint
movieid                 bigint
rating                  int
date                    string
Time taken: 0.136 seconds, Fetched: 4 row(s)

Verify Count of Records in HDFS

val readHiveFiles = spark.read
  .load("hdfs://192.168.1.200:9000/user/hive/warehouse/firstDB/hive_buckets_tbl")
  .count()

Verify file creation in the Hive warehouse.

hdfs://node-master:9000/user/hive/warehouse

hyper@node-master:~/spark/$ hdfs dfs -ls /user/hive/warehouse/movies
Found 2 items
-rw-r--r--   2 hyper supergroup          0 2020-04-12 14:01 /user/hive/warehouse/movies/_SUCCESS
-rw-r--r--   2 hyper supergroup     863266 2020-04-12 14:01 /user/hive/warehouse/movies/part-00000-a3f7e79a-e9ee-4027-b7d7-f2a31c282e21-c000.snappy.parquet

Spark History Server

## Start Spark history server
spark/sbin/start-history-server.sh

Specify the configuration required for the history server in the spark-defaults.conf file.

spark.history.provider            org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory     hdfs://node-master:9000/spark-logs
spark.history.fs.update.interval  10s
spark.history.ui.port             18080
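
Applications only show up in the history server if they write event logs to that directory. A sketch of the matching entries for the same spark-defaults.conf (standard Spark properties; the log path is assumed to match spark.history.fs.logDirectory above):

spark.eventLog.enabled            true
spark.eventLog.dir                hdfs://node-master:9000/spark-logs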

Hive Start Commands

## Start Hive services

hive/bin/hive

hive --service hiveserver2 --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.root.logger=INFO,console --hiveconf hive.server2.thrift.bind.host=192.168.1.131

hive --service metastore