Inject a Spark Stream into an Elasticsearch Sink, with Kibana

Elasticsearch is a distributed document search and analytics engine with real-time search.

Kibana, paired with Elasticsearch, provides interactive exploration, dashboard creation, and analysis.

Amazon also offers Elasticsearch as a managed service.

In this example we will inject Structured Streaming data into Elasticsearch and visualize the results using Kibana.

Download and unzip Elasticsearch and Kibana, then configure the respective .yml files in each distribution's config directory.
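For a single-node development setup the relevant settings are minimal. A sketch of the two files, assuming the host IP 192.168.1.200 in place of the placeholders used in the commands below (all values here are assumptions, not from the original setup):

# config/elasticsearch.yml
network.host: 192.168.1.200
http.port: 9200

# config/kibana.yml
server.port: 5601
elasticsearch.hosts: ["http://192.168.1.200:9200"]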

Start Elasticsearch and Kibana:

./bin/elasticsearch
./bin/kibana

# Verify Elasticsearch is running, using the Elasticsearch IP and port
curl -X GET "XXX.168.1.XXX:9200/"

Spark

Initialize the Spark session with the Elasticsearch connection settings:

  val spark = SparkSession.builder()
    .appName("Spark Kafka ElastiSearch")
    .master("local[*]")
    .config("spark.es.nodes","XXX.XXX.1.200")
    .config("spark.es.port","9200")
    .config("es.index.auto.create", "true")
    .getOrCreate()

Read the log data from the Kafka stream and parse the lines.
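The readKafka() helper used below is not shown in the original; a minimal sketch, assuming a local broker at localhost:9092 and a topic named weblogs (both are assumptions):

  def readKafka() =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumption: local broker
      .option("subscribe", "weblogs")                      // assumption: topic name
      .load()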

  val ddd = "\\d{1,3}" // one octet of an IPv4 address
  val regexPatterns = Map(
    "ip" -> s"($ddd\\.$ddd\\.$ddd\\.$ddd)?".r,
    "client" -> "(\\S+)".r,
    "user" -> "(\\S+)".r,
    "dateTime" -> "(\\[.+?\\])".r,
    "datetimeNoBrackets" -> "(?<=\\[).+?(?=\\])".r,
    "request" -> "\"(.*?)\"".r,
    "status" -> "(\\d{3})".r,
    "bytes" -> "(\\S+)".r,
    "referer" -> "\"(.*?)\"".r,
    "agent" -> "\"(.*)\"".r
  )
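For reference, a hypothetical Combined Log Format line of the kind these patterns are meant to pick apart (not taken from real data):

  // hypothetical example of a web-server log line
  val sampleLine =
    "192.168.1.10 - frank [10/Oct/2020:13:55:36 -0700] " +
      "\"GET /index.html HTTP/1.1\" 200 2326 \"http://example.com/start\" \"Mozilla/5.0\""

Note that running regexPatterns("status").findFirstIn directly on such a line would return an IP octet (192) rather than the status code; the pipeline below works around this by applying it to the already-extracted agent column instead.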

  def parseLog(regExPattern: String) = udf((line: String) =>
    regexPatterns(regExPattern).findFirstIn(line) match {
      case Some(parsedValue) => parsedValue
      case None              => "unknown"
    }
  )



  import spark.implicits._

  def parseKafka() = {
    readKafka()
      .select(col("topic").cast(StringType),
              col("offset"),
              col("value").cast(StringType))
      .withColumn("user", parseLog("user")($"value"))
      .withColumn("dateTime", parseLog("datetimeNoBrackets")($"value"))
      .withColumn("request", parseLog("request")($"value"))
      .withColumn("agent", parseLog("agent")($"value"))
      // status is parsed from the greedy "agent" match rather than the raw line:
      // everything before the first quote (including the IP octets) has already
      // been stripped, so the first three-digit run left is the status code
      .withColumn("status", parseLog("status")($"agent"))
  }

  case class LogsData(offset: Double,
                      status: Double,
                      user: String,
                      request: String,
                      dateTime: Timestamp)

  val DATE_FORMAT = "dd/MMM/yyyy:HH:mm:ss ZZZZ"
  // English locale so month abbreviations like "Oct" parse regardless of the JVM default
  val dateFormat = new SimpleDateFormat(DATE_FORMAT, java.util.Locale.ENGLISH)

  // return a java.sql.Timestamp so the column lines up with LogsData.dateTime
  val convertStringToTimestamp = udf((dateString: String) =>
    new java.sql.Timestamp(dateFormat.parse(dateString).getTime)
  )


  def parseDate() = {
    parseKafka()
      .select(col("offset").cast("double"),
              col("status").cast("double"),
              col("user"), col("request"),
              col("dateTime"))
      // alternatively: to_timestamp(col("dateTime"), DATE_FORMAT)
      .withColumn("dateTime", convertStringToTimestamp(col("dateTime")))
      .as[LogsData]
  }
Write the streaming Dataset to Elasticsearch in batches:

  def writeKafkaElastiSearch() = {
    parseDate()
      .writeStream
      // the checkpoint location belongs on the stream, not on the per-batch write
      .option("checkpointLocation", "checkpoint")
      .foreachBatch { (batch: Dataset[LogsData], batchId: Long) =>
        batch.write
          .format("org.elasticsearch.spark.sql")
          .mode(SaveMode.Append)
          .save("dblkafkastream/dblkafkatype")
      }
      .start()
      .awaitTermination()
  }
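A minimal entry point to run the whole pipeline (the object name is hypothetical; it assumes the definitions above live inside this object):

  object LogsToElasticsearch {
    def main(args: Array[String]): Unit =
      writeKafkaElastiSearch()
  }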

Elasticsearch

Verify index creation in Elasticsearch:

curl -XGET "http://XXX.168.1.XXX:9200/_cat/indices"
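To spot-check the injected documents themselves, the standard _count and _search endpoints work against the new index (same host placeholder as above):

curl -XGET "http://XXX.168.1.XXX:9200/dblkafkastream/_count?pretty"
curl -XGET "http://XXX.168.1.XXX:9200/dblkafkastream/_search?size=1&pretty"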

Kibana

In the Kibana web UI, under Settings, create an index pattern matching the new index (dblkafkastream).

In the Kibana Discover tab, view the raw data and verify that it is being injected in real time.

Kibana Real-Time Visualization

Visualize the error codes extracted from the web logs and monitor them in real time.

Example of visualizations in Kibana: a plot of horsepower vs. MPG based on the cars dataset.

Dependencies

  libraryDependencies ++= Seq(
    // Kafka
    "org.apache.kafka" %% "kafka" % kafkaVersion,
    "org.apache.kafka" % "kafka-streams" % kafkaVersion,

    // Structured Streaming Kafka source (Scala 2.11 build)
    "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % sparkVersion,

    // low-level (DStream) Kafka integration
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,

    // Elasticsearch-Hadoop Spark connector
    "org.elasticsearch" %% "elasticsearch-spark-20" % "7.6.1"
  )