StructuredStreamingConnector is a connector added in version 0.4.3. It combines the Spark Structured Streaming API with the Connector API, so users can read and write streaming data the same way they would with any static connector.

Here is an implementation of the word count program from the Spark Structured Streaming documentation:

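  import org.apache.spark.sql.SparkSession
  // Conf, ConnectorBuilder and StructuredStreamingConnector must also be imported;
  // their package path depends on the library version in use

  // Configuration: socket source on localhost:9999, console sink in complete mode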
  val input = Map(
    "storage" -> "STRUCTURED_STREAMING",
    "format" -> "socket",
    "host" -> "localhost",
    "port" -> "9999"
  )

  val output = Map(
    "storage" -> "STRUCTURED_STREAMING",
    "outputMode" -> "complete",
    "format" -> "console"
  )

  val spark = SparkSession
    .builder
    .appName("StructuredNetworkWordCount")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  val inputConnector = new ConnectorBuilder(Conf.fromMap(input)).getOrCreate()
  val outputConnector = new ConnectorBuilder(Conf.fromMap(output)).getOrCreate().asInstanceOf[StructuredStreamingConnector]
  
  // read lines
  val lines = inputConnector.read()
  // Split the lines into words
  val words = lines.as[String].flatMap(_.split(" "))
  // Generate running word count
  val wordCounts = words.groupBy("value").count()
  // Write the running counts to the console sink
  outputConnector.write(wordCounts)
  // Block until the streaming query terminates
  outputConnector.awaitTermination()
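
For comparison, here is a sketch of the same word count written against the Spark Structured Streaming API directly, following the example in the Spark programming guide. It shows the reader and writer calls that the two configuration maps stand in for. How the connector translates each configuration key into these calls is an assumption here, not something taken from the connector's source, and the object name `PlainStructuredNetworkWordCount` is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this illustration
object PlainStructuredNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    // Same settings as the "input" map, passed directly to the stream reader
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split the lines into words and keep a running count per word
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    // Same settings as the "output" map, passed directly to the stream writer
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Block until the streaming query terminates
    query.awaitTermination()
  }
}
```

Both versions expect a text server to be listening on localhost:9999 before they start. The Spark guide uses `nc -lk 9999` for this and then types lines into that terminal.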