Monday, July 11, 2016

Understanding Batch processing in Spark Streaming

Spark Streaming is essentially batch-based data processing. In principle it is no different from running a cron-scheduled program once every slide interval over the data of the last window length (the window of the reduce), but it is designed to be fast and highly scalable. What impressed me most is that a 4-node cluster in standalone mode can process several GB of data per second while deserializing Avro into complex objects, filtering out bad data, mapping records to tuples, running different kinds of aggregation, and training machine learning models for prediction. On that 4-node cluster, CPU and memory usage were not a concern; the only bottleneck was the network. It could be 16x faster than doing the same work on Hadoop YARN.

As people say, Spark Streaming is NOT a true streaming system. In my opinion, performance is the ultimate goal for Spark, and batch processing is the best way to achieve it. One big drawback is that data may arrive out of time order. In a non-critical big data application this may not be a big deal, but sometimes you need another approach to fix the ordering (e.g. use checkpoints, keep a timestamp in your output, and add another step that merges or moves records from one RDD into another).

Here is a sample Spark Streaming program that does a word count over a stream. It does not keep a global counter of how many times each word has occurred since the beginning; it only counts the words that fall in the reduce window:



The program above requires you to run the netcat command shown in Line 10 before starting it. That command emits a random word from a five-word array every second, and the Java program prints the counts of the words in that five-word list. You should see something like this every 10 seconds:

-------------------------------------------
Time: 1468203730000 ms
-------------------------------------------
(orange,3)
(apple,2)
(banana,3)
(peach,1)
(lemon,6)

In the ideal case the counts of all words sum to 15, since the command sends 1 word every second and the window length is 15 seconds. In practice Spark guarantees neither that total nor the order of arrival. That is why, in a real production system, we need to include a timestamp in each message sent to the stream and use another step to merge results with already-finished batches based on that timestamp.
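To make the ideal-case arithmetic concrete, here is a minimal sketch in plain Java (no Spark involved) that models one word arriving per second and counts the words in a 15-second window evaluated every 10 seconds. The class and method names are hypothetical, and a fixed round-robin word order stands in for the random choice so that the result is repeatable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the ideal case; this is not Spark code.
public class WindowCountSketch {

    // One word per second, cycling through the five words (an assumption:
    // the real command picks words at random).
    public static List<String> roundRobinStream(int seconds) {
        String[] words = {"orange", "apple", "banana", "peach", "lemon"};
        List<String> stream = new ArrayList<>();
        for (int t = 0; t < seconds; t++) {
            stream.add(words[t % words.length]);
        }
        return stream;
    }

    // Counts the words that arrived in [windowEnd - windowLength, windowEnd).
    public static Map<String, Integer> countWindow(List<String> stream,
                                                   int windowEnd,
                                                   int windowLength) {
        Map<String, Integer> counts = new TreeMap<>();
        int start = Math.max(0, windowEnd - windowLength);
        int end = Math.min(windowEnd, stream.size());
        for (int t = start; t < end; t++) {
            counts.merge(stream.get(t), 1, Integer::sum);
        }
        return counts;
    }

    // Sums the per-word counts of one window.
    public static int total(Map<String, Integer> counts) {
        int sum = 0;
        for (int c : counts.values()) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> stream = roundRobinStream(60);
        int windowLength = 15;   // seconds, as in the text
        int slideInterval = 10;  // seconds, as in the text
        for (int end = slideInterval; end <= 60; end += slideInterval) {
            Map<String, Integer> counts = countWindow(stream, end, windowLength);
            System.out.println("t=" + end + "s " + counts + " total=" + total(counts));
        }
    }
}
```

In this model the first window (t=10s) holds only 10 words because less than a full window has elapsed, and every later window holds exactly 15. A real Spark job can deviate from these totals, which is the point made above.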

One important use of Spark Streaming, as seen above, is aggregating the data that falls into a specified time window. We usually do this by calling reduceByKeyAndWindow on a DStream. Three time parameters matter here: Batch Interval, Window Length and Slide Interval.

Batch Interval, Window Length and Slide Interval

Batch Interval is the time span Spark uses to form one batch of data. Suppose Spark Streaming is consuming messages from a socket and the batch interval is 5 seconds: Spark puts the messages received within each 5-second span into one batch. You can set the batch interval to a small value, say several milliseconds, to get a micro-batch pattern, but it is still not stream processing (get one message, then consume it immediately).

Batch Interval is specified when creating the streaming context (Line 17 above).
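The grouping that a batch interval implies can be sketched in plain Java. This is only an illustration of the idea, not how Spark is implemented; the class name, method name, and sample arrival times are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model only: assigns each message to a batch by arrival time.
public class BatchGroupingSketch {

    // Message i arrived at arrivalMs[i]; batch k covers
    // [k * batchIntervalMs, (k + 1) * batchIntervalMs).
    public static Map<Long, List<String>> groupIntoBatches(long[] arrivalMs,
                                                           String[] messages,
                                                           long batchIntervalMs) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (int i = 0; i < messages.length; i++) {
            long batchIndex = arrivalMs[i] / batchIntervalMs;
            batches.computeIfAbsent(batchIndex, k -> new ArrayList<>()).add(messages[i]);
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] arrivalMs = {0, 1200, 4999, 5000, 9100};
        String[] messages = {"a", "b", "c", "d", "e"};
        // With a 5-second batch interval, the first three messages share a batch.
        System.out.println(groupIntoBatches(arrivalMs, messages, 5000));
    }
}
```

However small the interval, each message waits for its batch to close before it is processed, which is why a tiny batch interval still is not per-message streaming.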

Window Length is the length of the time window over which data is gathered for the reduce. It is the second parameter of reduceByKeyAndWindow (Line 22 above). This value must be a multiple of the batch interval (N times the batch interval, where N >= 1). Otherwise, you will get the following message:

Exception in thread "main" java.lang.Exception: The window duration of windowed DStream (7000 ms) must be a multiple of the slide duration of parent DStream (5000 ms)

Slide Interval controls how often the reduce runs: it is the time between two consecutive reduce tasks. It is the third parameter of reduceByKeyAndWindow (Line 22 above). This value must also be a multiple of the batch interval (N times the batch interval, where N >= 1). Otherwise, you will get the following message:

Exception in thread "main" java.lang.Exception: The slide duration of windowed DStream (5000 ms) must be a multiple of the slide duration of parent DStream (10000 ms)
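Both rules can be checked before the job is submitted. The following is a minimal sketch in plain Java; WindowParamsCheck and its method are hypothetical helpers, not part of Spark, and the messages they produce are my own wording rather than Spark's.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check for the "multiple of the batch interval" rules.
public class WindowParamsCheck {

    // Returns a list of human-readable problems; an empty list means the
    // three durations (all in milliseconds) are compatible.
    public static List<String> problems(long batchMs, long windowMs, long slideMs) {
        List<String> found = new ArrayList<>();
        if (batchMs <= 0) {
            found.add("batch interval must be positive");
            return found;
        }
        if (windowMs <= 0 || windowMs % batchMs != 0) {
            found.add("window length " + windowMs + " ms is not a multiple of batch interval " + batchMs + " ms");
        }
        if (slideMs <= 0 || slideMs % batchMs != 0) {
            found.add("slide interval " + slideMs + " ms is not a multiple of batch interval " + batchMs + " ms");
        }
        return found;
    }

    public static void main(String[] args) {
        // The two failing cases mirror the durations in the exception messages above.
        System.out.println(problems(5000, 7000, 5000));    // window 7000 vs batch 5000
        System.out.println(problems(10000, 10000, 5000));  // slide 5000 vs batch 10000
        System.out.println(problems(5000, 15000, 10000));  // compatible, prints []
    }
}
```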

Usually the slide interval is not greater than the window length, because you do not want to create blind spots in time when consuming the stream. But Spark accepts a slide interval greater than the window length:

somePairDStream.reduceByKeyAndWindow((i, j) -> i + j, Durations.seconds(5), Durations.seconds(10))

Here window_length=5 and slide_interval=10, so it processes the last 5 seconds of data every 10 seconds, which misses the first 5 seconds of data in every 10-second span.
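The gap can be made visible with a small plain-Java simulation. This is a sketch of the window arithmetic only, with hypothetical names, and it assumes a window is evaluated at every multiple of the slide interval.

```java
import java.util.ArrayList;
import java.util.List;

// Models which seconds of a stream are never covered by any window.
public class SlideGapSketch {

    // A window ending at second `end` covers [end - windowLength, end).
    // Windows are evaluated at end = slide, 2 * slide, ... up to horizon.
    public static List<Integer> missedSeconds(int windowLength, int slide, int horizon) {
        boolean[] covered = new boolean[horizon];
        for (int end = slide; end <= horizon; end += slide) {
            for (int t = Math.max(0, end - windowLength); t < end; t++) {
                covered[t] = true;
            }
        }
        List<Integer> missed = new ArrayList<>();
        for (int t = 0; t < horizon; t++) {
            if (!covered[t]) {
                missed.add(t);
            }
        }
        return missed;
    }

    public static void main(String[] args) {
        // window_length=5, slide_interval=10: half of the stream is skipped.
        System.out.println(missedSeconds(5, 10, 20));
        // window_length=15, slide_interval=10: nothing is skipped.
        System.out.println(missedSeconds(15, 10, 20));
    }
}
```

With a window of 5 and a slide of 10, seconds 0 to 4 and 10 to 14 are never read; once the window is at least as long as the slide, the list of missed seconds is empty.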

Saturday, July 9, 2016

Convert Java List to RDD with Spark

Sometimes you want to convert a list of Java objects to an RDD, and you may know that calling sc.parallelize(list) will do that. The tricky point is that in Java, sc here is not a SparkContext. You need a JavaSparkContext to do that:


The Spark documentation may give the impression that any collection can be parallelized; in Java, you can only parallelize a List.

The relation between JavaSparkContext and SparkContext is that JavaSparkContext is a wrapper around SparkContext. Sometimes you can use a JavaSparkContext where a SparkContext is expected, and you can also get the underlying SparkContext object from a JavaSparkContext by calling jsc.sc(). For example, you want to write a SparkML model to a filesystem, but saving requires a SparkContext object, not a JavaSparkContext; here is the code to get it to work: