OP: Lisrelchen

【Jacek Laskowski】Spark Structured Streaming


1 (OP)
Lisrelchen posted on 2017-5-15 07:33:16

Spark Structured Streaming

Welcome to Spark Structured Streaming notebook!

I’m Jacek Laskowski, an independent consultant who is passionate about Apache Spark, Apache Kafka, Scala and sbt (with some flavour of Apache Mesos, Hadoop YARN, and quite recently DC/OS). I lead the Warsaw Scala Enthusiasts and Warsaw Spark meetups in Warsaw, Poland.

Contact me at jacek@japila.pl or @jaceklaskowski to discuss Apache Spark opportunities, e.g. courses, workshops, mentoring or application development services.

If you like the Apache Spark notes, you should seriously consider participating in my own, very hands-on Spark Workshops.

Tip: I’m also writing Mastering Apache Spark 2 Notebook, Apache Kafka Notebook and Spark Streaming Notebook.

Spark Structured Streaming Notebook serves as the ultimate place of mine to collect all the nuts and bolts of using Spark Structured Streaming. The notes aim to help me design and develop better products with Apache Spark. They are also a viable proof of my understanding of Apache Spark. I do eventually want to reach the highest level of mastery in Apache Spark (as do you!).

The collection of notes serves as the study material for my trainings, workshops, videos and courses about Apache Spark. Follow me on Twitter as @jaceklaskowski to hear about them early. You will also learn about upcoming Apache Spark events.

Expect text and code snippets from Spark’s mailing lists, the official documentation of Apache Spark, StackOverflow, blog posts, books from O’Reilly (and other publishers), press releases, conferences, YouTube or Vimeo videos, Quora, the source code of Apache Spark, etc. Attribution follows.

Hidden content in this post:

Spark Structured Streaming Jacek Laskowski.pdf (421.27 KB)





2
Lisrelchen posted on 2017-5-15 07:42:14
DataStreamReader

DataStreamReader is an interface for reading streaming data into a DataFrame from data sources with a specified format, schema and options.

DataStreamReader supports the built-in formats json, csv, parquet and text. parquet is the default data source, as configured by the spark.sql.sources.default setting.

DataStreamReader is available through the SparkSession.readStream method.

val spark: SparkSession = ...

// Infer the schema once from a static (batch) read of the same files
val schema = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("csv-logs/*.csv")
  .schema

// Reuse the inferred schema for the streaming read
val df = spark.readStream
  .format("csv")
  .schema(schema)
  .load("csv-logs/*.csv")
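The same pattern works for the other built-in file formats mentioned above. A minimal sketch for a json streaming read, assuming a hypothetical input directory json-logs and made-up field names; note that file-based streaming sources need the schema up front (automatic schema inference is off by default for streaming reads):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark: SparkSession = SparkSession.builder.getOrCreate()

// Streaming file sources require an explicit schema
val jsonSchema = new StructType()
  .add("id", LongType)
  .add("message", StringType)

val jsonStream = spark.readStream
  .format("json")
  .schema(jsonSchema)
  .load("json-logs")  // hypothetical input directory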

3
Lisrelchen posted on 2017-5-15 07:43:09
FileStreamSource

FileStreamSource is a Source that reads text files from the path directory as they appear. It uses LongOffset offsets.

Note: It is used by DataSource.createSource for FileFormat.

You can provide the schema of the data and dataFrameBuilder, the function that builds a DataFrame in getBatch, at instantiation time.

// NOTE The source directory must exist
// mkdir text-logs

val df = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("text-logs")

scala> df.printSchema
root
 |-- value: string (nullable = true)
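A minimal follow-up sketch, assuming the df above, that writes the stream to the console so you can watch files being picked up one per trigger (the file name and content below are made up):

val query = df.writeStream
  .format("console")
  .start

// In another terminal, drop a new file into the monitored directory:
// $ echo "hello structured streaming" > text-logs/1.txt
// With maxFilesPerTrigger = 1, each micro-batch picks up at most one new file.

query.awaitTermination()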

4
Lisrelchen posted on 2017-5-15 07:43:46
Creating KafkaSourceRDD Instance

KafkaSourceRDD takes the following when created:

- SparkContext
- Collection of key-value settings for executors reading records from Kafka topics
- Collection of KafkaSourceRDDOffsetRange offsets
- Timeout (in milliseconds) to poll data from Kafka, used when KafkaSourceRDD is requested for records (for given offsets) and in turn requests CachedKafkaConsumer to poll for Kafka’s ConsumerRecords
- Flag to…FIXME
- Flag to…FIXME

KafkaSourceRDD initializes the internal registries and counters.
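KafkaSourceRDD itself is internal; at the user level a Kafka source is created through DataStreamReader with the kafka format. A minimal sketch, assuming a hypothetical broker at localhost:9092 and a hypothetical topic named events (the spark-sql-kafka-0-10 artifact has to be on the classpath):

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.getOrCreate()
import spark.implicits._

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // hypothetical broker
  .option("subscribe", "events")                         // hypothetical topic
  .load()

// Kafka records arrive with binary key/value columns; cast the value to a string
val values = kafkaStream.selectExpr("CAST(value AS STRING)").as[String]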

5
Lisrelchen posted on 2017-5-15 07:45:13
MemoryStream

MemoryStream is an in-memory streaming source, mostly useful for tests and demos.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
import scala.concurrent.duration._

val spark: SparkSession = ???
import spark.implicits._

// MemoryStream uses two implicits: Encoder[Int] and SQLContext
implicit val ctx = spark.sqlContext
val intsIn = MemoryStream[Int]

val ints = intsIn.toDF
  .withColumn("t", current_timestamp())
  .withWatermark("t", "5 minutes")
  .groupBy(window($"t", "5 minutes") as "window")
  .agg(count("*") as "total")

val totalsOver5mins = ints.writeStream
  .format("memory")
  .queryName("totalsOver5mins")
  .outputMode(OutputMode.Append)
  .trigger(ProcessingTime(30.seconds))
  .start

scala> val zeroOffset = intsIn.addData(0, 1, 2)
zeroOffset: org.apache.spark.sql.execution.streaming.Offset = #0

totalsOver5mins.processAllAvailable()
spark.table("totalsOver5mins").show

6
Lisrelchen posted on 2017-5-15 07:46:32
TextSocketSource

TextSocketSource is a streaming source that reads lines from a socket at the host and port defined by its parameters.

It keeps every line ever read from the socket in an internal in-memory buffer called lines; nothing is ever dropped.
import org.apache.spark.sql.{Dataset, SparkSession}

val spark: SparkSession = SparkSession.builder.getOrCreate()
import spark.implicits._

// Connect to localhost:9999
// You can use "nc -lk 9999" for demos
val textSocket = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load

val lines: Dataset[String] = textSocket.as[String].map(_.toUpperCase)

val query = lines.writeStream.format("console").start

// Start typing lines in the nc session
// They will appear UPPERCASE in the console output
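The socket source also supports an includeTimestamp option; when enabled, each line comes with the time it arrived. A small sketch under that assumption:

// With includeTimestamp the schema becomes (value: string, timestamp: timestamp)
val timedSocket = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true)
  .load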

7
Lisrelchen posted on 2017-5-15 07:46:56
DataStreamWriter

DataStreamWriter is the part of Structured Streaming that is responsible for writing the output of streaming queries to sinks and thereby starting their execution.

val people: Dataset[Person] = ...

import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.OutputMode.Complete
import scala.concurrent.duration._

people.writeStream
  .queryName("textStream")
  .outputMode(Complete)
  .trigger(ProcessingTime(10.seconds))
  .format("console")
  .start
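start returns a StreamingQuery handle. A short sketch, reusing the people Dataset above, of how that handle is typically kept around to block the application and to stop the query:

val query = people.writeStream
  .queryName("textStream")
  .format("console")
  .start

query.awaitTermination()  // block until the query stops or fails
// query.stop()           // or stop it programmatically from another thread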

8
Lisrelchen posted on 2017-5-15 07:47:57
ConsoleSink

ConsoleSink is a streaming sink that is registered as the console format.

val spark: SparkSession = ...
import spark.implicits._

spark.readStream
  .format("text")
  .load("server-logs/*.out")
  .as[String]
  .writeStream
  .queryName("server-logs processor")
  .format("console")  // <-- uses ConsoleSink
  .start

scala> spark.streams.active.foreach(println)
Streaming Query - server-logs processor [state = ACTIVE]

// in another terminal
$ echo hello > server-logs/hello.out
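ConsoleSink also takes options that control how much of each micro-batch is printed; a small sketch, assuming streamingDF is any streaming DataFrame and the option values are arbitrary:

// Print up to 50 rows per micro-batch and do not truncate long values
streamingDF.writeStream
  .format("console")
  .option("numRows", 50)
  .option("truncate", false)
  .start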

9
Lisrelchen posted on 2017-5-15 07:48:35
FileStreamSink

FileStreamSink is the streaming sink for the parquet format.

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}

// in: a streaming DataFrame/Dataset created with spark.readStream
val out = in.writeStream
  .format("parquet")
  .option("path", "parquet-output-dir")
  .option("checkpointLocation", "checkpoint-dir")
  .trigger(ProcessingTime(5.seconds))
  .outputMode(OutputMode.Append)
  .start()

FileStreamSink supports the Append output mode only.
It uses the spark.sql.streaming.fileSink.log.deletion setting (as isDeletingExpiredLog).
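For completeness, a minimal sketch of inspecting what the sink wrote, using an ordinary batch read of the same output directory:

// The sink keeps a metadata log under the output directory, so Spark's own
// batch reader sees a consistent view of the committed files
val parquetOut = spark.read.parquet("parquet-output-dir")
parquetOut.show()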

10
Lisrelchen posted on 2017-5-15 07:49:26
ForeachSink

ForeachSink is a typed Sink that passes records (of type T) to a ForeachWriter, one record at a time per partition.

It is used exclusively by the foreach operator.

import org.apache.spark.sql.ForeachWriter

val spark: SparkSession = ...
import spark.implicits._

val records = spark.readStream
  .format("text")
  .load("server-logs/*.out")
  .as[String]

val writer = new ForeachWriter[String] {
  override def open(partitionId: Long, version: Long) = true
  override def process(value: String) = println(value)
  override def close(errorOrNull: Throwable) = {}
}

records.writeStream
  .queryName("server-logs processor")
  .foreach(writer)
  .start
