block by daniarleagk c09566bcee322c64890f7ddf9414152e

Spark Java log4j smtp appender and byte-monkey library

In this brief post I want to share two approaches that I used to test and troubleshoot spark applications running on yarn clusters. These tools are easy to set up. The first one is a standard log4j smtp appender and a second one is a byte-monkey library. The first one is a simple tool for monitoring and the second one is used for simulating errors and observing the behaviour of your program on a cluster (e.g. is there any data loss etc.) I use these tools mainly in early development and testing stages, since there are no actions needed from a devops or platform engineering team.

I will go through the steps that you need to set up these tools.

Smtp Appender Log4j

A simple and quick approach to monitor the application is to send an email on e.g. ERRORs or other log events, especially for a long running tasks such as spark streaming. Apache Spark Framework uses log4j library in the version 1.2.x (I used this approach with Spark versions 1.5.x, 2.1.x, 2.3.x). In my application I also use a log event filter that sends only a single error warning message of a series of identical errors per jvm ( since I don’t want to flood smtp server and your mailbox). Worth to mention Spark offers also a reach monitoring options besides log4j logging (see https://spark.apache.org/docs/latest/monitoring.html).

The following code shows a simple filter implementation, for the sake of simplicity the following code sends only one error per Logger in a single JVM instance.

Package myApp.mailfilter

import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.TriggeringEventEvaluator;
/**
*
*/
public class MailEventFilter implements TriggeringEventEvaluator {
   public static final ConcurrentHashMap<String, String> messageMap = new ConcurrentHashMap();

   public boolean isTriggeringEvent(LoggingEvent loggingEvent) {
           String name = loggingEvent.getLoggerName();
          return messageMap.putIfAbsent(name, name) == null;
   }
}

So how we can accomplish this: Firstly, I provide a jar that contains our filter class (this post explains why we need to pack an extra jar with a filter) and upload to a cluster. You need additionally a mail.jar that contains javax.mail.* classes (main hadoop distributions should have already available this library on a cluster). Secondly, we create a log4j properties file and upload it as well.

# Spark Streaming Logging Configuration
# See also: http://spark.apache.org/docs/2.0.2/running-on-yarn.html#debugging-your-application

log4j.rootLogger=INFO, email

# Write all logs to standard Spark stderr file
log4j.appender.stderr=org.apache.log4j.RollingFileAppender
log4j.appender.stderr.file=${spark.yarn.app.container.log.dir}/stderr
log4j.appender.stderr.threshold=INFO
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=%d %p %c %m %n
log4j.appender.stderr.maxFileSize=50MB
log4j.appender.stderr.maxBackupIndex=10
log4j.appender.stderr.encoding=UTF-8

#CONFIGURE SMTP
log4j.appender.email=org.apache.log4j.net.SMTPAppender
log4j.appender.email.SMTPHost=<yourSmtpHost>
log4j.appender.email.SMTPPassword=<password>
log4j.appender.email.From=your@mail
log4j.appender.email.To=your@mail
log4j.appender.email.Subject=App Log Message
log4j.appender.email.EvaluatorClass=myApp.mailfilter.MailEventFilter
log4j.appender.email.BufferSize=10
log4j.appender.email.layout=org.apache.log4j.PatternLayout
log4j.appender.email.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

Thirdly, we prepare a spark submit command as follows, we use the following properties:

--files hdfs:/${PATH_TO_CONFIGS}/config/log4j-yarn.properties 
--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties 
--conf spark.executor.extraClassPath=${PATH_TO_YOUR_LIBS}/mail.jar:${PATH_TO_YOUR_LIBS}/my-mail-util.jar 
--conf spark.driver.extraClassPath=${PATH_TO_YOUR_LIBS}/mail.jar:${PATH_TO_YOUR_LIBS}/my-mail-util.jar

Now we are ready to receive error events.

Byte-Monkey:

It is important to test a distributed program how it reacts to different kinds of errors, especially for spark streaming applications, since it typically runs in a 24/7 mode. In a complex enterprise setups it is common to implement different custom receivers or directIputDStreams. Implementing business logic means sometimes that workflows and functions may contain side effects or actions such that data is written or read from e.g. HBase, HDFS or relational Databases. All these are typical examples where things can go wrong.

Therefore, it is reasonable to intentionally produce errors in your application and observe how resilient your code is ( https://principlesofchaos.org/ https://en.wikipedia.org/wiki/Chaos_engineering), besides other techniques such as killing JVMs and cutting connections.

Before I came to a byte-monkey library, in my team we developed a custom framework that throws errors in try catch blocks depending on external parameters (the exception was thrown after reading property file, by configuring the property files we managed to produce certain exceptions). This solution works fine. However, there are several cons: you need to write an additional framework that interacts with your spark code, you have to test it and manage it.

Byte-monkey library https://github.com/mrwilson/byte-monkey helped me to test the application. In order to use this library with a spark you need only to upload the library to a cluster and provide this properties to your spark submit command:

spark.executor.extraJavaOptions
spark.driver.extraJavaOptions

--conf spark.executor.extraJavaOptions=-javaagent:${PATH_TO_YOUR_LIBS}/lib/byte-monkey.jar=mode:fault,rate:0.5,filter:rewe/rsm/bigdata/tlog -Dlog4j.configuration=log4j-yarn.properties -XX:+UseG1GC

I hope this would help you to troubleshoot and test your spark code in early stages of development.