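netcat (Windows): start a listener on port 9999 with `nc -L -p 9999`, then type space-separated words into it once the job below is running.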
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Windowed word count over a text stream read from a TCP socket.
 *
 * Each received line is paired with a timestamp by the socket source (`includeTimestamp`),
 * split into words, and counted per (sliding time window, word). After every micro-batch the
 * complete result table is printed to the console, ordered by window descending.
 */
object Test {

  val host: String = "localhost"
  val port: Int = 9999

  /** Window width, in seconds. */
  val windowSize: Int = 10

  /** Slide interval, in seconds; must be positive and no larger than `windowSize`. */
  val slideSize: Int = 5

  val windowDuration: String = s"$windowSize seconds"
  val slideDuration: String = s"$slideSize seconds"

  // An explicit `main` is used instead of `extends App`: Spark's documentation warns that
  // subclasses of scala.App may not work correctly.
  def main(args: Array[String]): Unit = {
    // Fail fast with a complete message. The original printed a truncated warning and carried on
    // with invalid window parameters.
    require(windowSize > 0, s"windowSize ($windowSize) must be positive")
    require(
      slideSize > 0 && slideSize <= windowSize,
      s"slideSize ($slideSize) must be positive and less than or equal to windowSize ($windowSize)")

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCountWindowed")
      .master("local[3]")
      .config("spark.sql.shuffle.partitions", 3)
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._

    try {
      // DataFrame of input lines from host:port; with includeTimestamp each row can be read
      // as (line text, timestamp).
      val lines = spark.readStream
        .format("socket")
        .option("host", host)
        .option("port", port)
        .option("includeTimestamp", true)
        .load()

      // Split lines into (word, timestamp) rows; every word inherits its line's timestamp.
      // Empty tokens (blank lines, repeated spaces) are dropped so they are not counted as words.
      val words = lines.as[(String, Timestamp)]
        .flatMap { row =>
          val (text, ts) = row
          text.split(" ").filter(_.nonEmpty).map(word => (word, ts))
        }
        .toDF("word", "timestamp")

      // Count occurrences per (sliding window, word), newest window first.
      val windowedCounts = words
        .groupBy(window($"timestamp", windowDuration, slideDuration), $"word")
        .count()
        .orderBy($"window".desc)

      // "complete" mode re-emits the whole result table on every trigger; truncate=false
      // prints the full window bounds.
      val query = windowedCounts.writeStream
        .outputMode("complete")
        .format("console")
        .option("truncate", "false")
        .start()

      query.awaitTermination()
    } finally {
      spark.stop()
    }
  }
}
Result:
-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------------------------------+----+-----+
|window                                       |word|count|
+---------------------------------------------+----+-----+
|[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|b   |3    |
|[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|a   |3    |
|[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|c   |1    |
|[2017-10-24 16:09:30.0,2017-10-24 16:09:40.0]|d   |1    |
|[2017-10-24 16:06:40.0,2017-10-24 16:06:50.0]|a   |4    |
|[2017-10-24 16:06:35.0,2017-10-24 16:06:45.0]|a   |8    |
|[2017-10-24 16:06:30.0,2017-10-24 16:06:40.0]|a   |4    |
+---------------------------------------------+----+-----+
窗口每次滑动 5 秒（slideDuration），窗口宽度为 10 秒（windowDuration）。因此相邻窗口重叠 5 秒，每条数据会同时落入 2 个窗口。
聚合维度（groupBy）：window, word