Another thing StreamingListener can do is collect the information shown in the Streaming UI (the Streaming tab you see when running Spark in standalone mode). When you run your streaming program on YARN or Mesos, you may find it hard to locate the metrics shown in the Streaming UI. Fortunately, the BatchInfo passed to the StreamingListener callbacks contains almost all the stats you want.
Although there is not much information on the official website right now about writing a custom StreamingListener, you can find its API doc under org.apache.spark.streaming.scheduler.StreamingListener. Be careful: we are talking about StreamingListener, which is specific to Spark Streaming, NOT the scheduler listeners (SparkListener). spark.extraListeners only accepts scheduler listeners, so if you put your StreamingListener implementation in SparkConf, or use --conf spark.extraListeners=your.project.yourStreamListener to apply it, you will get:
java.lang.ClassCastException: your.project.yourStreamListener cannot be cast to org.apache.spark.scheduler.SparkListener
Let me demo how to use it. I will implement a StreamingListener that prints stats to the log whenever a new batch is submitted. If you use Kafka as the streaming source, the number of records in a batch is the number of messages in that batch. Tracking it is a good way to see how many messages are lagging behind in queued batches, which helps diagnose the throughput of the streaming pipeline. Here is the code example of the class that implements StreamingListener:
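A minimal sketch of such a listener follows. It assumes a Spark build on Scala 2.12 or later, where StreamingListener's no-op callbacks are visible to Java as default methods, so only onBatchSubmitted and onBatchCompleted are overridden (on Scala 2.10/2.11 builds every callback would have to be implemented). It also assumes SLF4J for logging. The helpers recordSubmitted and recordCompleted are hypothetical names, split out so the counting can be exercised without a running stream, and the message text is modeled on the log lines in this post.

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sketch: logs per-batch record counts when a batch is submitted.
 * Assumes Scala 2.12+ Spark, so unused callbacks keep their default no-op bodies.
 */
public class SimpleListener implements StreamingListener {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleListener.class);

    // Records seen in all batches submitted since the listener was registered.
    private final AtomicLong totalRecords = new AtomicLong();
    // Records in batches that were submitted but have not completed yet.
    private final AtomicLong queuedRecords = new AtomicLong();

    @Override
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
        // numRecords() sums the input records of all streams in this batch.
        LOG.info(recordSubmitted(batchSubmitted.batchInfo().numRecords()));
    }

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        recordCompleted(batchCompleted.batchInfo().numRecords());
    }

    /** Hypothetical helper: updates counters for a submitted batch and builds the log line. */
    public String recordSubmitted(long batchRecords) {
        long total = totalRecords.addAndGet(batchRecords);
        long queued = queuedRecords.addAndGet(batchRecords);
        return batchRecords + " came with this batch, total number records is " + total
                + ", number of queued records is " + queued;
    }

    /** Hypothetical helper: removes a finished batch from the queued count. */
    public void recordCompleted(long batchRecords) {
        queuedRecords.addAndGet(-batchRecords);
    }
}
```

AtomicLong is used so the counters stay safe to read from other threads, for example if you also expose them as metrics.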
When initializing the JavaStreamingContext, create a new SimpleListener object and pass it to addStreamingListener:
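A sketch of the registration step, assuming a Spark build on Scala 2.12 or later and a one-minute batch interval (the log timestamps in this post are one minute apart). To keep the snippet self-contained it registers a small anonymous StreamingListener; in your own job you would pass new SimpleListener() instead. The socket source on localhost:9999 is a hypothetical placeholder for your Kafka stream, and the master URL is expected to come from spark-submit.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;

public class StreamingApp {
    public static void main(String[] args) throws InterruptedException {
        // Master URL is supplied by spark-submit; do NOT use spark.extraListeners here.
        SparkConf conf = new SparkConf().setAppName("listener-demo");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(1));

        // Register the listener on the streaming context itself.
        jssc.addStreamingListener(new StreamingListener() {
            @Override
            public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
                System.out.println("records in batch: " + batchSubmitted.batchInfo().numRecords());
            }
        });

        // Hypothetical placeholder source; Spark Streaming needs at least one output operation.
        jssc.socketTextStream("localhost", 9999).count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```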
Here we can see the stats from the log:
16/05/17 01:28:00 INFO KafkaSparkListener: 6961601 came with this batch, total number records is 6961601, number of queued records is 6961601
16/05/17 01:29:00 INFO KafkaSparkListener: 9341310 came with this batch, total number records is 16302911, number of queued records is 9341310
16/05/17 01:30:00 INFO KafkaSparkListener: 10713928 came with this batch, total number records is 27016839, number of queued records is 10713928
16/05/17 01:31:00 INFO KafkaSparkListener: 9384364 came with this batch, total number records is 36401203, number of queued records is 9384364
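The numbers in these lines are internally consistent: each total is the previous total plus the new batch, and the queued count equals the current batch alone, which is what you would expect if each batch finished before the next one was submitted. A small Spark-free replay of that bookkeeping, assuming records are added on submission and removed on completion (LogReplay and replay are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;

public class LogReplay {
    /**
     * Replays per-batch record counts, assuming every batch completes before
     * the next one is submitted. Returns {total, queued} right after each submission.
     */
    public static List<long[]> replay(long[] batchRecords) {
        List<long[]> rows = new ArrayList<>();
        long total = 0;
        long queued = 0;
        for (long records : batchRecords) {
            total += records;   // batch submitted: add it to the running total...
            queued += records;  // ...and to the not-yet-processed backlog
            rows.add(new long[] {total, queued});
            queued -= records;  // batch completed before the next submission
        }
        return rows;
    }

    public static void main(String[] args) {
        // Per-batch counts taken from the four log lines in this post.
        long[] logged = {6961601L, 9341310L, 10713928L, 9384364L};
        for (long[] row : replay(logged)) {
            System.out.println("total=" + row[0] + " queued=" + row[1]);
        }
    }
}
```

If a batch were still running when the next one was submitted, the queued figure would instead be the sum of both batches; a queued count that keeps growing across log lines is the backlog this listener is meant to surface.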