Programming with Spark Structured Streaming

In my last blog post I experimented with Spark Streaming, which uses the DStream API.

In this article I program with Spark Structured Streaming, which is the newer way of handling streams.

What’s the difference between DStream and Structured Streaming? The former is based on Spark’s traditional RDD API, while the latter is built on Spark’s SQL engine.

In my view, if you need many transformations on the raw data, such as data cleaning, DStream is more useful, since the RDD API provides plenty of higher-order functions for transforming data, such as map, reduce, etc.
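To make the contrast concrete, here is a minimal sketch of a word count written against the DStream API; the object name, the host/port, and the 5-second batch interval are my own choices for illustration.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStreamWordCount")
    // Micro-batch interval of 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))

    // Receive lines of text from the socket server
    val lines = ssc.socketTextStream("localhost", 9999)

    // RDD-style higher-order functions: flatMap, map, reduceByKey
    val counts = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}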

If you want more aggregation, such as statistics and reports, Structured Streaming is more convenient, since the SQL engine is well suited to this kind of job: group, count, sort, etc.

That said, Structured Streaming is much easier to use than DStream, and it is more highly optimized. You can program with Structured Streaming in the same way as with a regular Spark session.
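For example, the batch (non-streaming) version of the word count differs from the streaming job further down only in how the data is read and written; the file name words.txt and the object name below are just placeholders.

import org.apache.spark.sql.SparkSession

object StaticWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("StaticWordCount").getOrCreate()
    import spark.implicits._

    // Same transformation and aggregation as the streaming job below,
    // but using read/show instead of readStream/writeStream
    val lines = spark.read.textFile("words.txt")
    val counts = lines.flatMap(_.split(" ")).groupBy("value").count()
    counts.show()

    spark.stop()
  }
}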

To play with the demo, we first need a socket server that continuously writes data to a remote socket; a streaming client will then receive the data from that socket.

The socket server was given in my last blog post. Please start it up in a separate terminal.
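If you don't have that server handy, any program that writes lines to a TCP socket will do. Below is a throwaway stand-in (not the server from the previous post) that accepts one client and emits a random fruit word every 100 ms on port 9999.

import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

object WordServer {
  def main(args: Array[String]): Unit = {
    val words = Seq("apple", "orange", "mango", "banana", "cherry")
    val server = new ServerSocket(9999)
    // Wait for the streaming client to connect, then write forever
    val client = server.accept()
    val out = new PrintWriter(client.getOutputStream, true)
    while (true) {
      out.println(words(Random.nextInt(words.length)))
      Thread.sleep(100)
    }
  }
}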

Then, following the official demo, I wrote the Scala code below to handle the stream.

import org.apache.spark.sql.SparkSession

object Myjob {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

How do we package and deploy the app? Please see my last blog post; the steps are almost the same.
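For reference, a build.sbt along these lines should work; the project name and version are guessed from the jar name used below, and the Scala and Spark versions come from the environment listed at the end (Structured Streaming ships with the spark-sql module).

// build.sbt
name := "my-strucutre-streaming-job"
version := "1.0"
scalaVersion := "2.12.15"

// "provided" because spark-submit supplies the Spark jars at runtime
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.0" % "provided"

Running sbt package then produces the jar under target/scala-2.12/.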

The final step is to submit the Structured Streaming job to Spark:

$ spark-submit --class "Myjob" --master local[2] target/scala-2.12/my-strucutre-streaming-job_2.12-1.0.jar localhost 9999

We can see continuous word-count output on the screen; because the output mode is "complete", each batch prints the full table of running counts:

-------------------------------------------
Batch: 190
-------------------------------------------
+-------+-----+
|  value|count|
+-------+-----+
| orange|  360|
|  apple|  372|
|  mango|  380|
| tomato|  362|
|apricot|  357|
| cherry|  366|
| banana|  328|
|  lemon|  341|
|   plum|  365|
|  peach|  372|
+-------+-----+


-------------------------------------------
Batch: 191
-------------------------------------------
+-------+-----+
|  value|count|
+-------+-----+
| orange|  362|
|  apple|  378|
|  mango|  382|
| tomato|  363|
|apricot|  358|
| cherry|  368|
| banana|  332|
|  lemon|  342|
|   plum|  367|
|  peach|  377|
+-------+-----+

-------------------------------------------
Batch: 192
-------------------------------------------
+-------+-----+
|  value|count|
+-------+-----+
| orange|  363|
|  apple|  381|
|  mango|  386|
| tomato|  364|
|apricot|  361|
| cherry|  374|
| banana|  333|
|  lemon|  344|
|   plum|  369|
|  peach|  378|
+-------+-----+

The application environment:

  • OS: Ubuntu 18.04 x86_64, a KVM instance
  • RAM: 4 GB dedicated
  • CPU: dual AMD 7302 processors
  • Spark: 3.2.0
  • Scala: 2.12.15