Spark Structured Streaming: Integrating Kafka Streams with a Cassandra Sink

Kafka streams integrated with a Cassandra sink using Spark Structured Streaming.
Stream Apache web server logs from Kafka, parse them with Spark Structured Streaming, and save the parsed results to a Cassandra table. The results are also written back to Kafka, on a separate output topic.

  1. Configure 3 ZooKeeper nodes and 4 Kafka brokers.
  2. Configure Cassandra DB.
  3. Configure Spark.

Start ZooKeeper and the Kafka brokers on the nodes.

kafka/bin/ kafka/config/

kafka/bin/ kafka/config/

Create Kafka Topic

kafka/bin/  --create --zookeeper XXX.168.1.XXX:2181 \
          --replication-factor 3 --partitions 1 \
          --topic streams-spark-input3

Verify Topics Using Kafka Tool

Kafka Producer

To simulate streaming access-log data, we use a shell loop that pipes the log file to the Kafka producer line by line, pausing one second after each line.

while IFS= read -r line; do echo "$line" && sleep 1; done < \
/home/hyper/data/access_log.txt |\
kafka/bin/ --broker-list\,, --topic streams-spark-input3
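The same replay can be sketched in Scala if you prefer to drive the simulation from the JVM. This is a minimal, hypothetical sketch rather than part of the project: `LogReplay` and `replay` are illustrative names, and the `emit` callback stands in for the Kafka producer, so no Kafka client is involved.

```scala
object LogReplay {
  // Sends each line to `emit`, pausing `delayMillis` after each one, like `sleep 1` in the shell loop.
  // `emit` is a stand-in for a Kafka producer (hypothetical). Returns the number of lines sent.
  def replay(lines: Iterator[String], delayMillis: Long)(emit: String => Unit): Int = {
    var sent = 0
    lines.foreach { line =>
      emit(line)
      sent += 1
      if (delayMillis > 0) Thread.sleep(delayMillis)
    }
    sent
  }
}
```

For example, `LogReplay.replay(scala.io.Source.fromFile("/home/hyper/data/access_log.txt").getLines(), 1000L)(println)` would print the log to the console at roughly one line per second; swapping `println` for a call to a real producer would feed the topic instead.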

Spark Structured Streaming

Create Spark Session with Cassandra Options.

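  val spark = SparkSession.builder()
    .appName("Spark Kafka Stream")
    .config("spark.cassandra.connection.host", cassandraHost) // assumed option; cassandraHost is a hypothetical val, the host is not given here
    .getOrCreate()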

Read the stream from a single Kafka partition

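  def readKafka() =
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", brokers) // brokers: hypothetical val, the broker list is not given here
      //      .option("subscribe","streams-spark-input")
      .option("assign", "{\"streams-spark-input3\":[0]}") // read only partition 0 of the topic
      .load()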
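The `assign` option takes a JSON object that maps each topic to the list of partitions to read, whereas `subscribe` takes topic names and reads every partition. The helper below is a small, hypothetical sketch that builds that JSON string from Scala values; `KafkaAssign` and `assignJson` are illustrative names, and it assumes topic names need no JSON escaping (Kafka restricts them to ASCII letters, digits, `.`, `_` and `-`).

```scala
object KafkaAssign {
  // Builds the value for the Kafka source's "assign" option, e.g.
  // Seq("streams-spark-input3" -> Seq(0)) becomes {"streams-spark-input3":[0]}.
  // Assumes topic names need no JSON escaping.
  def assignJson(assignments: Seq[(String, Seq[Int])]): String =
    assignments
      .map { case (topic, partitions) => "\"" + topic + "\":[" + partitions.mkString(",") + "]" }
      .mkString("{", ",", "}")
}
```

With it, the option could be written as `.option("assign", KafkaAssign.assignJson(Seq("streams-spark-input3" -> Seq(0))))`, which yields the same string as the literal.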

Regex functions to parse the log file data

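  import scala.util.matching.Regex

  // Building block reused by the "ip" pattern
  val ddd = "\\d{1,3}"

  // Each pattern is applied with findFirstIn, so it returns the first match anywhere in the line
  val regexPatterns: Map[String, Regex] = Map(
    "ddd" -> ddd.r,
    "ip" -> s"($ddd\\.$ddd\\.$ddd\\.$ddd)?".r,
    "client" -> "(\\S+)".r,
    "user" -> "(\\S+)".r, // first non-space token, which in a standard log line is the client IP
    "dateTime" -> "(\\[.+?\\])".r,
    "datetimeNoBrackets" -> "(?<=\\[).+?(?=\\])".r,
    "request" -> "\"(.*?)\"".r,
    "status" -> "(\\d{3})".r,
    "bytes" -> "(\\S+)".r,
    "referer" -> "\"(.*?)\"".r, // same pattern as "request", so it also returns the first quoted field
    "agent" -> """\"(.*)\"""".r // greedy: from the first double quote to the last one
  )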

//  "agent" -> "\"(.*?)\"".r

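  import org.apache.spark.sql.functions.udf

  // Returns a UDF that applies the named pattern to a log line and yields the first match, or "unknown"
  def parseLog(regExPattern: String) = udf((url: String) =>
    regexPatterns(regExPattern).findFirstIn(url) match {
      case Some(parsedValue) => parsedValue
      case None              => "unknown"
    })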
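Because each pattern is applied with `findFirstIn`, it returns the first match anywhere in the line, including any surrounding brackets or quotes. The plain-Scala sketch below (no Spark needed) runs a subset of the patterns on a sample line modeled on the Combined Log Format example in the Apache httpd documentation; `LogRegexDemo` and `parse` are hypothetical names that mirror the `parseLog` UDF. It shows that the generic "user" pattern picks up the first token (the client IP) and that "status" applied to the whole line finds the first three digits of the IP, which is presumably why the status is taken from the greedy "agent" match instead.

```scala
import scala.util.matching.Regex

object LogRegexDemo {
  // A subset of the regexPatterns map, copied so the sketch runs without Spark
  val patterns: Map[String, Regex] = Map(
    "user" -> "(\\S+)".r,
    "datetimeNoBrackets" -> "(?<=\\[).+?(?=\\])".r,
    "request" -> "\"(.*?)\"".r,
    "status" -> "(\\d{3})".r,
    "agent" -> "\"(.*)\"".r // greedy, equivalent to the triple-quoted "agent" pattern
  )

  // Plain-function version of the parseLog UDF: first match anywhere in the text, or "unknown"
  def parse(name: String, text: String): String =
    patterns(name).findFirstIn(text).getOrElse("unknown")

  // Sample line modeled on the Combined Log Format example in the Apache httpd docs
  val Sample: String =
    "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326 " +
      "\"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\""
}
```

On the sample line, "request" yields the request with its quotes still attached, "datetimeNoBrackets" yields `10/Oct/2000:13:55:36 -0700`, "user" yields `127.0.0.1`, and "status" yields `127` on the raw line but `200` on the "agent" match.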
Parse the Log Files
import spark.implicits._

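  def parseKafka() = {
    readKafka()
      .selectExpr("offset", "CAST(value AS STRING) AS value") // assumed step: Kafka delivers value as binary
//      .withColumn("client",regexp_extract(col("value"),requestRegex,0))
      .withColumn("user", parseLog("user")($"value"))
      .withColumn("dateTime", parseLog("datetimeNoBrackets")($"value"))
      .withColumn("request", parseLog("request")($"value"))
      .withColumn("agent", parseLog("agent")($"value"))
      // "agent" is a greedy match starting at the request, so the first three-digit
      // run inside it is normally the status code rather than part of the IP
      .withColumn("status", parseLog("status")($"agent"))
  }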
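An alternative worth considering (a swapped-in technique, not the approach used above) is one anchored regex with a capture group per field of the Combined Log Format, so every field comes from its own position in the line rather than from the first match anywhere. In Spark the same pattern could be applied column by column with `regexp_extract(col("value"), pattern, groupIndex)`, as in the commented-out `client` line; the sketch below shows the pattern in plain Scala. `CombinedLogParser`, `AccessLog` and `parse` are hypothetical names.

```scala
object CombinedLogParser {
  // One pattern for the whole line; groups: ip, client, user, dateTime, request, status, bytes, referer, agent
  private val Combined =
    """^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"$""".r

  final case class AccessLog(ip: String, client: String, user: String, dateTime: String, request: String,
                             status: String, bytes: String, referer: String, agent: String)

  // Returns None when the line does not follow the Combined Log Format
  def parse(line: String): Option[AccessLog] = line match {
    case Combined(ip, client, user, dateTime, request, status, bytes, referer, agent) =>
      Some(AccessLog(ip, client, user, dateTime, request, status, bytes, referer, agent))
    case _ => None
  }

  // Sample line modeled on the Combined Log Format example in the Apache httpd docs
  val Sample: String =
    "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326 " +
      "\"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\""
}
```

Lines that deviate from the format (for example, a missing referer field) return `None` here, where the per-field approach would still return partial, possibly misattributed, values.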
Create Keyspace and Table in Cassandra DB

cqlsh:hyper> CREATE TABLE hyper.kafkalogsdate ("offset" text PRIMARY KEY, "status" text, "user" text, "request" text, "dateTime" timestamp);
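The `dateTime` column is a CQL `timestamp`, while the `datetimeNoBrackets` pattern yields text such as `10/Oct/2000:13:55:36 -0700`, so the value has to be converted before it is written (the `parseDate` step used for this is not shown). Below is a minimal plain-Scala sketch of that conversion, assuming the standard Apache time layout; `ApacheTime` and `toTimestamp` are hypothetical names. Inside a Spark query, the built-in `to_timestamp(column, pattern)` with the same pattern is the usual route, though supported pattern letters vary between Spark versions.

```scala
import java.sql.Timestamp
import java.time.OffsetDateTime
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import java.util.Locale

object ApacheTime {
  // Layout of the Apache access-log time field, e.g. 10/Oct/2000:13:55:36 -0700
  private val Layout = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.US)

  // Converts a datetimeNoBrackets value to java.sql.Timestamp (the type of dateTime in logsData).
  // Returns None for text that does not parse, such as the "unknown" fallback of parseLog.
  def toTimestamp(text: String): Option[Timestamp] =
    try Some(Timestamp.from(OffsetDateTime.parse(text, Layout).toInstant))
    catch { case _: DateTimeParseException => None }
}
```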
Write to Cassandra DB in micro-batches using foreachBatch.
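import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.cassandra._ // adds cassandraFormat(table, keyspace) to DataFrameWriter
import org.apache.spark.sql.functions.col
case class logsData(offset: BigInt,
                    status: String,
                    user: String,
                    request: String,
                    dateTime: Timestamp)
def writeToCassandra(): Unit = {
    // parseDate() is not shown; it is assumed to return the parsed stream with dateTime as a timestamp
    parseDate().select(col("offset"), col("status"), col("user"), col("request"), col("dateTime"))
      .as[logsData] // encoder from import spark.implicits._
      .writeStream
      .foreachBatch { (batch: Dataset[logsData], batchId: Long) =>
        // each micro-batch is saved with the regular batch Cassandra writer
        batch.write.cassandraFormat("kafkalogsdate", "hyper").mode("append").save()
      }
      .start()
}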


Verify Data in Cassandra DB
cqlsh:hyper> select * from kafkalogsdate;

Write to Kafka

Create Output Topic
kafka/bin/  --create --zookeeper \
          --replication-factor 3 --partitions 1 \
          --topic streams-spark-output

Write from Spark to Kafka

The Kafka sink expects key-value records: each row must have a value column (string or binary) and may also have a key column.
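To make the key-value shape concrete for a single record, here is a small plain-Scala sketch: the Kafka offset becomes the key and the remaining fields, tab-separated, become the value. The value format is a hypothetical choice (JSON is another common one), and `KafkaRecord`, `LogsData` (a copy of the `logsData` fields) and `toKeyValue` are illustrative names, not part of the original code. In a Spark query, the same shaping is done column-wise before `writeStream.format("kafka")`.

```scala
import java.sql.Timestamp

object KafkaRecord {
  // Copy of the logsData fields, redeclared so this sketch compiles on its own
  final case class LogsData(offset: BigInt, status: String, user: String, request: String, dateTime: Timestamp)

  // Key: the Kafka offset as text. Value: the remaining fields, tab-separated (hypothetical format).
  def toKeyValue(r: LogsData): (String, String) =
    (r.offset.toString, Seq(r.status, r.user, r.request, r.dateTime.toInstant.toString).mkString("\t"))
}
```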

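def writeToKafka(): Unit =
  parseKafka().selectExpr("CAST(offset AS STRING) AS key", "to_json(struct(*)) AS value") // assumed projection; brokers below is a hypothetical val
    .writeStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("topic", "streams-spark-output")
    .option("checkpointLocation", "checkpoint").start() // without a checkpoint location the Kafka sink fails to start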

Verify the output stream using the Kafka console consumer.

kafka/bin/ --bootstrap-server,,\
 --topic streams-spark-output --partition 0 --from-beginning