
Monday, July 11, 2016

Understanding Batch processing in Spark Streaming

Spark Streaming processes data in batches. In principle it is no different from a cron-scheduled program that runs once every slide interval and processes the data from the most recent window (the window length of the reduce). The difference is that it is designed for high performance and high scalability. What impressed me most is that, in standalone mode on a 4-node cluster, it can process several GB of data per second while deserializing complex Avro objects, filtering out bad data, mapping records to tuples, running different kinds of aggregation, and training machine learning models for prediction. On that 4-node cluster, CPU and memory usage are not a big deal; the only bottleneck is the network. It could be 16x faster than doing the same work on Hadoop YARN.

People often say that Spark Streaming is NOT a real streaming system. In my opinion, performance is the ultimate goal for Spark, and batch processing is the best way to achieve it. One big drawback, though, is that data may not arrive in time order. In a non-critical big data application this may not be a big deal, but sometimes you need another approach to fix the ordering (e.g. use checkpoints, keep a timestamp in your output, and add another step that merges or moves records from one RDD into another).

Here is a sample Spark Streaming program that does a word count over a stream. It does not keep a global counter of how many times each word has occurred since the beginning; it only counts the words that fall within the reduce window:



The above program requires you to run the netcat command in Line 10 before starting it. The command emits one word per second, picked at random from a five-word array, and the Java program prints the count of each word in that list. You should see something like this every 10 seconds:

-------------------------------------------
Time: 1468203730000 ms
-------------------------------------------
(orange,3)
(apple,2)
(banana,3)
(peach,1)
(lemon,6)

In the ideal case the counts of all words should sum to 15, since the command sends 1 word every second and the window size is 15 seconds. In practice Spark guarantees neither that nor the order of arrival. That is why, in a real production setup, we need to include a timestamp in each message sent to the stream and use another step to merge it with finished batches based on that timestamp.
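One way to picture the timestamp-based merge step is the plain-Java sketch below. It does not use Spark at all; the EventTimeMerger class, its Event type, the mergeByEventTime helper and the 10-second bucket size are assumptions made for illustration only. Each message carries its own timestamp, and counts are merged into the bucket that the timestamp belongs to, regardless of which batch delivered the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Spark: merges word counts by the event
// timestamp carried in each message rather than by arrival time.
final class EventTimeMerger {

    // A message as it might be sent to the stream: a word plus the time it was produced.
    static final class Event {
        final long timestampMs;
        final String word;

        Event(long timestampMs, String word) {
            this.timestampMs = timestampMs;
            this.word = word;
        }
    }

    // Adds one batch of events to the running result, keyed by the start of
    // the time bucket that each event's own timestamp falls into.
    static void mergeByEventTime(Map<Long, Map<String, Integer>> merged,
                                 List<Event> batch, long bucketMs) {
        for (Event e : batch) {
            long bucketStart = (e.timestampMs / bucketMs) * bucketMs;
            merged.computeIfAbsent(bucketStart, k -> new TreeMap<>())
                  .merge(e.word, 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        Map<Long, Map<String, Integer>> merged = new TreeMap<>();

        // First batch: two events produced in the 0-10 s bucket.
        List<Event> batch1 = new ArrayList<>();
        batch1.add(new Event(1_000L, "apple"));
        batch1.add(new Event(4_000L, "lemon"));

        // Second batch: one on-time event and one late event from the earlier bucket.
        List<Event> batch2 = new ArrayList<>();
        batch2.add(new Event(12_000L, "apple"));
        batch2.add(new Event(9_000L, "apple")); // arrived late

        mergeByEventTime(merged, batch1, 10_000L);
        mergeByEventTime(merged, batch2, 10_000L);

        System.out.println(merged);
    }
}
```

The late "apple" stamped at 9 seconds ends up in the 0-10 s bucket even though it was delivered with the second batch, which is the effect the extra merge step is meant to achieve.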

As the example shows, one important use of Spark Streaming is aggregating the data that falls into a specified time window. We usually do that by calling reduceByKeyAndWindow on a DStream. Three time parameters matter here: Batch Interval, Window Length and Slide Interval.
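The windowed count can be imitated in plain Java to make the arithmetic concrete. This is only a simulation under stated assumptions, not Spark code: the WindowedCountSketch class and countWindow helper are hypothetical, the words arrive exactly one per second in a fixed rotation instead of at random, and the window is 15 seconds with a 10-second slide as in the example output.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of a windowed word count; it only mimics the idea
// of counting what falls inside the last windowLengthSec seconds.
final class WindowedCountSketch {

    // Counts the words whose arrival second t satisfies
    // windowEndSec - windowLengthSec <= t < windowEndSec.
    static Map<String, Integer> countWindow(String[] wordsBySecond,
                                            int windowEndSec, int windowLengthSec) {
        Map<String, Integer> counts = new TreeMap<>();
        int from = Math.max(0, windowEndSec - windowLengthSec);
        int to = Math.min(wordsBySecond.length, windowEndSec);
        for (int t = from; t < to; t++) {
            counts.merge(wordsBySecond[t], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] vocabulary = {"apple", "banana", "lemon", "orange", "peach"};
        String[] wordsBySecond = new String[40];
        for (int t = 0; t < wordsBySecond.length; t++) {
            // Deterministic stand-in for the random word source.
            wordsBySecond[t] = vocabulary[t % vocabulary.length];
        }

        int windowLengthSec = 15;
        int slideSec = 10;
        // One result per slide interval, each covering the last 15 seconds.
        for (int end = slideSec; end <= wordsBySecond.length; end += slideSec) {
            System.out.println("Time: " + end + " s " + countWindow(wordsBySecond, end, windowLengthSec));
        }
    }
}
```

In this idealized run the first result covers only 10 seconds of data, and every later result sums to 15 words, which is the ideal figure a real stream is not guaranteed to hit.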

Batch Interval, Window Length and Slide Interval

Batch Interval is the length of time over which Spark collects data into one batch. Suppose Spark Streaming is consuming messages from a socket and the batch interval is 5 seconds: Spark will put the messages received within each 5-second period into one batch. You can set the batch interval to a small value, say a few milliseconds, to get a micro-batch pattern, but that is still not true stream processing (get one message, then consume it immediately).
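As a rough picture of what a 5-second batch interval means, the following sketch groups message arrival times into batches. It is plain Java rather than Spark, and the BatchGrouper class and groupIntoBatches method are made-up names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: groups message arrival times into fixed-length batches,
// the way a batch interval groups whatever arrived during each interval.
final class BatchGrouper {

    // Returns batch index -> arrival times (ms) that fall into that batch.
    static Map<Long, List<Long>> groupIntoBatches(List<Long> arrivalTimesMs, long batchIntervalMs) {
        Map<Long, List<Long>> batches = new TreeMap<>();
        for (long arrival : arrivalTimesMs) {
            long batchIndex = arrival / batchIntervalMs;
            batches.computeIfAbsent(batchIndex, k -> new ArrayList<>()).add(arrival);
        }
        return batches;
    }

    public static void main(String[] args) {
        // One message per second for 12 seconds.
        List<Long> arrivals = new ArrayList<>();
        for (long t = 0; t < 12_000L; t += 1_000L) {
            arrivals.add(t);
        }
        // With a 5-second batch interval the messages land in three batches.
        groupIntoBatches(arrivals, 5_000L)
            .forEach((index, batch) -> System.out.println("batch " + index + ": " + batch.size() + " messages"));
    }
}
```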

Batch Interval is specified when creating the streaming context (Line 17 above).

Window Length is the length of the time window over which data is gathered for the reduce. It is the second parameter of reduceByKeyAndWindow (Line 22 above). This value must be a multiple of the batch interval (N times the batch interval, where N >= 1). Otherwise, you will get the following message:

Exception in thread "main" java.lang.Exception: The window duration of windowed DStream (7000 ms) must be a multiple of the slide duration of parent DStream (5000 ms)

Slide Interval controls how often the reduce runs; it is the time between two consecutive reduce tasks. It is the third parameter of reduceByKeyAndWindow (Line 22 above). This value must also be a multiple of the batch interval (N times the batch interval, where N >= 1). Otherwise, you will get the following message:

Exception in thread "main" java.lang.Exception: The slide duration of windowed DStream (5000 ms) must be a multiple of the slide duration of parent DStream (10000 ms)
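If you would rather catch both mistakes before the streaming job starts, a small pre-flight check along these lines can help. This is a hypothetical helper written for this post, not Spark's own validation code; the WindowParams class, its method names and its messages are assumptions.

```java
// Hypothetical pre-flight check, not part of Spark: verifies that the window
// length and slide interval are both whole multiples of the batch interval.
final class WindowParams {

    static void validate(long batchMs, long windowMs, long slideMs) {
        if (batchMs <= 0 || windowMs <= 0 || slideMs <= 0) {
            throw new IllegalArgumentException("All durations must be positive");
        }
        if (windowMs % batchMs != 0) {
            throw new IllegalArgumentException("Window length (" + windowMs
                    + " ms) must be a multiple of the batch interval (" + batchMs + " ms)");
        }
        if (slideMs % batchMs != 0) {
            throw new IllegalArgumentException("Slide interval (" + slideMs
                    + " ms) must be a multiple of the batch interval (" + batchMs + " ms)");
        }
    }

    // Convenience wrapper that reports the result instead of throwing.
    static boolean isValid(long batchMs, long windowMs, long slideMs) {
        try {
            validate(batchMs, windowMs, slideMs);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(5_000L, 15_000L, 10_000L)); // accepted
        System.out.println(isValid(5_000L, 7_000L, 5_000L));   // rejected: 7000 is not a multiple of 5000
        System.out.println(isValid(10_000L, 20_000L, 5_000L)); // rejected: 5000 is not a multiple of 10000
    }
}
```

The two rejected combinations use the same numbers as the two exception messages quoted above.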

Usually the slide interval is not greater than the window length, because you don't want to leave gaps in time when consuming the stream. But Spark does accept a slide interval greater than the window length:

somePairDStream.reduceByKeyAndWindow((i, j) -> i + j, Durations.seconds(5), Durations.seconds(10))

Here window_length=5 and slide_interval=10, so every 10 seconds it processes only the last 5 seconds of data, which means the first 5 seconds of data in every 10-second period are missed.
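The size of the gap is simple arithmetic, sketched here in plain Java. The WindowCoverage class and its methods are hypothetical names used only to show the calculation; nothing here calls Spark.

```java
// Hypothetical helper: works out how much of each slide interval a window
// actually reads, given the window length and slide interval in seconds.
final class WindowCoverage {

    // Seconds in each slide interval that no window ever reads.
    static long skippedSeconds(long windowSec, long slideSec) {
        return Math.max(0L, slideSec - windowSec);
    }

    // Fraction of each slide interval that is covered by a window.
    static double coveredFraction(long windowSec, long slideSec) {
        return Math.min(windowSec, slideSec) / (double) slideSec;
    }

    // The half-open range [start, end) of seconds read by the window that fires at fireTimeSec.
    static long[] windowRange(long fireTimeSec, long windowSec) {
        return new long[] {fireTimeSec - windowSec, fireTimeSec};
    }

    public static void main(String[] args) {
        long windowSec = 5;
        long slideSec = 10;
        for (long fire = slideSec; fire <= 30; fire += slideSec) {
            long[] range = windowRange(fire, windowSec);
            System.out.println("fires at " + fire + " s, reads [" + range[0] + ", " + range[1] + ")");
        }
        System.out.println("skipped per slide: " + skippedSeconds(windowSec, slideSec) + " s");
    }
}
```

With a 15-second window and a 10-second slide, as in the word count example, skippedSeconds returns 0 because consecutive windows overlap instead of leaving a gap.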

Saturday, July 9, 2016

Convert Java List to RDD with Spark

Sometimes you want to convert a list of Java objects to an RDD, and you may know that calling sc.parallelize(list) will do that. The tricky point is that in Java, sc here is not a SparkContext. You need a JavaSparkContext to do that:


The Spark documentation may give you the impression that you can parallelize any collection, but in Java you can only parallelize a List.
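If your data lives in some other collection, such as a Set, one simple workaround is to copy it into a List first. The sketch below is plain Java so that it runs without Spark; the CollectionToList class and toList helper are made-up names, and the parallelize call appears only as a comment that assumes a JavaSparkContext named jsc already exists.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: copies any collection into a List so that it can be
// passed to a method that only accepts java.util.List.
final class CollectionToList {

    static <T> List<T> toList(Collection<T> items) {
        return new ArrayList<>(items);
    }

    public static void main(String[] args) {
        // A Set cannot be passed where a List is required.
        Set<String> words = new LinkedHashSet<>();
        words.add("apple");
        words.add("banana");
        words.add("lemon");

        List<String> list = toList(words);

        // With Spark on the classpath and a JavaSparkContext named jsc (assumed),
        // the list could then be parallelized:
        // JavaRDD<String> rdd = jsc.parallelize(list);
        System.out.println(list);
    }
}
```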

The relationship between JavaSparkContext and SparkContext is that JavaSparkContext is a wrapper around SparkContext. Sometimes you can use a JavaSparkContext where a SparkContext is expected, and you can always get the underlying SparkContext from a JavaSparkContext by calling jsc.sc(). For example, if you want to write a Spark ML model to a filesystem and the API requires a SparkContext rather than a JavaSparkContext, here is the code to make it work:

Tuesday, May 17, 2016

A Guide to Spark StreamingListener

The Spark Streaming library lets you register callbacks that are invoked when certain events happen. For example, if you want to be notified when an operation fails, along with the reason for the failure, you could use the onOutputOperationCompleted callback.

Another thing a StreamingListener can do is give you all the information shown in the Streaming UI (available when running Spark in standalone mode). When you run your streaming program on YARN or Mesos, you will find it hard to get to the metrics shown in the Streaming UI. Fortunately, when the methods you implement in StreamingListener are called, the BatchInfo they receive contains almost all the stats you want.

Although there is not much information on the official website right now about how to customize a StreamingListener, you can find it in the API documentation. Be careful: what we are talking about is StreamingListener, which is specific to streaming, NOT the scheduler listener (SparkListener). If you put your StreamingListener implementation in the Spark configuration, or use --conf spark.extraListeners=your.project.yourStreamListener to apply it, you could get:

java.lang.ClassCastException: your.project.yourStreamListener cannot be cast to org.apache.spark.scheduler.SparkListener

Let me demo how to use it. I will implement a StreamingListener and use it to print stats to the log whenever a new batch is submitted. If you use Kafka as the streaming source, the number of events in a batch is the number of messages in that batch. This is a good way to keep track of how many messages are lagging behind in queued batches and to diagnose the throughput of the streaming pipeline. Here is the code for the class that implements StreamingListener:



When initializing the JavaStreamingContext, you can create a new SimpleListener object and call addStreamingListener to register it:

Here we can see the stats from the log:
16/05/17 01:28:00 INFO KafkaSparkListener: 6961601 came with this batch, total number records is 6961601, number of queued records is 6961601
16/05/17 01:29:00 INFO KafkaSparkListener: 9341310 came with this batch, total number records is 16302911, number of queued records is 9341310
16/05/17 01:30:00 INFO KafkaSparkListener: 10713928 came with this batch, total number records is 27016839, number of queued records is 10713928
16/05/17 01:31:00 INFO KafkaSparkListener: 9384364 came with this batch, total number records is 36401203, number of queued records is 9384364
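
The bookkeeping behind log lines like these can be sketched without Spark. The BatchStatsTracker class below is a hypothetical stand-in, not the listener used above: it assumes the record count of each batch is handed in by the caller (in a real listener it would come from the BatchInfo of the event) and that queued records are those submitted but not yet completed.

```java
// Hypothetical bookkeeping class, independent of Spark: keeps a running total
// of records seen and the number of records in batches that are still queued.
final class BatchStatsTracker {

    private long totalRecords;
    private long queuedRecords;

    // Call when a batch is submitted; returns a message in the style of the log above.
    String onBatchSubmitted(long numRecords) {
        totalRecords += numRecords;
        queuedRecords += numRecords;
        return numRecords + " came with this batch, total number records is " + totalRecords
                + ", number of queued records is " + queuedRecords;
    }

    // Call when a batch has finished processing, so it no longer counts as queued.
    void onBatchCompleted(long numRecords) {
        queuedRecords -= numRecords;
    }

    long getTotalRecords() {
        return totalRecords;
    }

    long getQueuedRecords() {
        return queuedRecords;
    }

    public static void main(String[] args) {
        BatchStatsTracker tracker = new BatchStatsTracker();
        long[] batchSizes = {6961601L, 9341310L, 10713928L, 9384364L};
        for (long size : batchSizes) {
            System.out.println(tracker.onBatchSubmitted(size));
            // Assume each batch finishes before the next one is submitted.
            tracker.onBatchCompleted(size);
        }
    }
}
```

If batches start to pile up, onBatchCompleted is called less often than onBatchSubmitted and the queued figure grows from one log line to the next, which is the lag signal described earlier.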