block by daniarleagk 49103f75a5d3dac03f0406fed90abf6b

Getting started example Apache Spark Streaming and Apache Kafka

Example consists of three parts:

For deep dive into Apache Kafka and Apache Spark I recommend this blog post

The application uses a batch interval of 10 seconds. For recovery I use this option: a combination of

ConsumerConfig.GROUP_ID_CONFIG -> groupId
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"

and explicit commit after processing batch RDDs

dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

For more information on different recovery options please refer to Apache Spark Kafka integration guide


import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
 *
 */
object KafkaStreamSimpleController {

 val logger = LoggerFactory.getLogger(this.getClass.getName)

 def createKafkaStreamingContext(conf: SparkConf, batchDuration: Long = 10L): StreamingContext = new StreamingContext(conf, Seconds(batchDuration))

 /**
   * simple factory function for kafka dstream
   *
   * using group id
   *
   * @param topic
   * @param offsetReset
   * @param groupId
   * @return
   */
 def connectToKafka(ssc: StreamingContext, topic: String, offsetReset: String, groupId: String): InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String, String]] = {
   val kafkaParams = Map[String, String](ConsumerConfig.GROUP_ID_CONFIG -> groupId,
     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
       "org.apache.kafka.common.serialization.StringDeserializer",
     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
       "org.apache.kafka.common.serialization.StringDeserializer",
     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false")
   val consumer = ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams)
   org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumer)
 }

 /**
   * e.g. custom transform pipeline
   *
   * @param rdd
   * @return
   */
 def customTransform(rdd: RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String, String]]): RDD[(String, String)] = {
   rdd.map(t => (t.key(), t.value()))
 }


 /**
   *
   *
   * @param args
   */
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("myTest") //
   val topic = "kafkaToopic"
   val groupId = "myGroupId"
   val offsetReset = "latest"
   val triggerDirectory: String = "/user/myUser/shutdown"
   val triggerFilePrefix: String = "kafkaReader"
   val ssc = createKafkaStreamingContext(conf)
   // set connection properties for kafka and return dstream definition
   val dstream: InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String, String]] = connectToKafka(ssc, topic, offsetReset, groupId)
   // basic pattern
   dstream.foreachRDD {
     rdd =>
       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
       // show kafka partition meta info
       offsetRanges.foreach(t => logger.info("Topic: %s, Partition: %d, fromOffset: %d, untilOffset: %d ".format(t.topic, t.partition, t.fromOffset, t.untilOffset)))
       // add at this place custom transformations and actions
       customTransform(rdd).foreachPartition(
         iterator =>
           iterator.foreach(x => logger.info(s"Key: ${x._1} => Value: ${x._2}"))
       )
       // commit
       dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
   }
   // start
   ssc.start()
   // set file based watchdog
   FileBasedShutdown.checkRepeatedlyShutdownAndStop(ssc, triggerDirectory, triggerFilePrefix)
 }

}

For shutting down the application I used the solution described in this blog post. At the end of the application I call a

  // start
   ssc.start()
   // set file based watchdog
   FileBasedShutdown.checkRepeatedlyShutdownAndStop(ssc, triggerDirectory, triggerFilePrefix)

The later function periodically checks if a file in HDFS exists. If it is the case streaming context is gracefully shutdown

streamingContext.stop(false, true)

Shutdown code:

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext

object FileBasedShutdown {

 /**
   * last statement in init
   *
   * @param triggerDirectory
   * @param triggerFilePrefix
   * @param triggerSuffix
   * @param successSuffix
   */
 def checkRepeatedlyShutdownAndStop( streamingContext : StreamingContext,
                                     triggerDirectory : String, triggerFilePrefix : String,
                                     triggerSuffix : String = ".shutdown", successSuffix: String = ".shutdown.SUCCESS",
                                     waitTimeInMs : Long = 10000L): Unit = {
   var stopped: Boolean = false
   while(!stopped){
     try {
       // check
       checkAndStopContextIfFileExistsAndClear(streamingContext, triggerDirectory : String, triggerFilePrefix : String,
         triggerSuffix : String, successSuffix: String)
       stopped = streamingContext.awaitTerminationOrTimeout(waitTimeInMs)
     }catch{
       case ex : Throwable => {
         stopped = true
         throw new RuntimeException("Shutdown problem", ex )
       }
     }
   }
 }

 /**
   *
   *
   * @param triggerDirectory
   * @param triggerFilePrefix
   * @param triggerSuffix
   * @param successSuffix
   */
 def checkAndStopContextIfFileExistsAndClear(streamingContext : StreamingContext, triggerDirectory : String, triggerFilePrefix : String,
                                             triggerSuffix : String, successSuffix: String): Unit ={
   val triggerFilePath: Path = new Path(triggerDirectory + Path.SEPARATOR + triggerFilePrefix + triggerSuffix)
   val successFilePath: Path = new Path(triggerDirectory + Path.SEPARATOR + triggerFilePrefix + successSuffix)
   var fs: FileSystem = null
   try {
     fs = triggerFilePath.getFileSystem(new Configuration)
     if (fs.exists(triggerFilePath)){
       try {
         streamingContext.stop(false, true)
       }catch {
         case _ : Throwable =>
       }
       fs.delete(triggerFilePath, false)
       fs.createNewFile(successFilePath)
     }
   }
   catch {
     case ioe: IOException => throw new RuntimeException("IO Problem!", ioe)
     case _ : Throwable => throw new RuntimeException("")
   }finally {
     if(fs != null){
       try{
         fs.close()
       }catch{
         // do nothing
         case _ : Throwable =>
       }
     }
   }
 }

}

Start and stop scripts can be implemented as follows:

startScript

..\spark-submit --deploy-mode cluster --master yarn --queue $queueName --num-executors 2 --executor-cores 2 --executor-memory 1G --driver-memory 1G --conf spark.yarn.driver.memoryOverhead=768 --files hdfs:///user/myUser/log4j-yarn.properties --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties --conf \”spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties -XX:+UseG1GC\”

stop script to stop the application. The script creates a file kafkaReader.shutdown.

#/bin/bash
APPID=kafkaReader
SHUTDOWN_DIR=/user/myUser/shutdown
hadoop fs -mkdir -p "$SHUTDOWN_DIR"
hadoop fs -mkdir -p "$CHECKPOINT_DIR"
hadoop fs -rm -f "$SHUTDOWN_DIR/${APPID}.shutdown.SUCCESS"
hadoop fs -touchz "$SHUTDOWN_DIR/${APPID}.shutdown"
hadoop fs -ls "$SHUTDOWN_DIR/${APPID}.shutdown.SUCCESS"
while [ ! -f "$SHUTDOWN_DIR/${APPID}.shutdown.SUCCESS" ]
do
  sleep 5
done
set -e
hadoop fs -ls $SHUTDOWN_DIR
hadoop fs -rm -f "$SHUTDOWN_DIR/${APPID}.shutdown"
hadoop fs -rm -f "$SHUTDOWN_DIR/${APPID}.shutdown.SUCCESS"
echo "streaming job shutdown success"
# Spark Streaming Logging Configuration
# See also: http://spark.apache.org/docs/2.0.2/running-on-yarn.html#debugging-your-application

log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.spark.storage.BlockManager=OFF
log4j.logger.org.apache.spark.storage.BlockManagerInfo=OFF

# Write all logs to standard Spark stderr file
log4j.appender.stderr=org.apache.log4j.RollingFileAppender
log4j.appender.stderr.file=${spark.yarn.app.container.log.dir}/stderr
log4j.appender.stderr.threshold=INFO
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=%d %p %c %m %n
log4j.appender.stderr.maxFileSize=50MB
log4j.appender.stderr.maxBackupIndex=10
log4j.appender.stderr.encoding=UTF-8

# Write application logs to stdout file
log4j.appender.stdout=org.apache.log4j.RollingFileAppender
log4j.appender.stdout.append=true
log4j.appender.stdout.file=${spark.yarn.app.container.log.dir}/stdout
log4j.appender.stdout.threshold=INFO
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p %c %m %n
log4j.appender.stdout.maxFileSize=50MB
log4j.appender.stdout.maxBackupIndex=10
log4j.appender.stdout.encoding=UTF-8