Original poster: ReneeBK

[Apache Spark] Building Lambda Architecture with Spark Streaming


Building Lambda Architecture with Spark Streaming

August 29, 2014 | By Gwen Shapira (@gwenshap) | 5 Comments
Categories: Kafka, Spark

The versatility of Apache Spark's API for both batch/ETL and streaming workloads brings the promise of lambda architecture to the real world.

Few things help you concentrate like a last-minute change to a major project.

One time, after working with a customer for three weeks to design and implement a proof-of-concept data ingest pipeline, the customer's chief architect told us:

"You know, I really like the design. I like how data is validated on arrival. I like how we store the raw data to allow for exploratory analysis while giving the business analysts pre-computed aggregates for faster response times. I like how we automatically handle data that arrives late and changes to the data structure or algorithms."

"But," he continued, "I really wish there was a real-time component here. There is a one-hour delay between the point when data is collected and the point when it's available in our dashboards. I understand that this is to improve efficiency and protect us from unclean data. But for some of our use cases, being able to react immediately to new data is more important than being 100% certain of data validity."

"Can we quickly add a real-time component to the POC? It will make the results much more impressive for our users."

Without directly articulating it, the architect was referring to what we call the lambda architecture, originally proposed by Nathan Marz, which usually combines batch and real-time components. One often needs both because data arriving in real time has inherent issues: there is no guarantee that each event will arrive exactly once, so there may be duplicates that add noise to the data. Data that arrives late due to network or server instability also routinely causes problems. The lambda architecture handles these issues by processing the data twice, once in the real-time view and a second time in the batch process, to give you one view that is fast and one that is reliable.
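The split between the two views can be made concrete with a small, purely illustrative sketch (not from the article): it assumes two hypothetical in-memory views keyed by date, batchView and realtimeView, and simply lets the reliable batch result win wherever both views cover the same day.

// Illustrative sketch only: merging a reliable batch view with a fast real-time view.
// batchView and realtimeView are hypothetical stand-ins for a real serving layer.
object LambdaMergeSketch {
  def mergedErrorCounts(batchView: Map[String, Int],
                        realtimeView: Map[String, Int]): Map[String, Int] =
    realtimeView ++ batchView // values from the batch (reliable) view override the real-time ones

  def main(args: Array[String]): Unit = {
    val batchView    = Map("14/08/06" -> 42)                  // recomputed periodically, trustworthy
    val realtimeView = Map("14/08/06" -> 45, "14/08/07" -> 3) // approximate but up to date
    println(mergedErrorCounts(batchView, realtimeView))       // e.g. Map(14/08/06 -> 42, 14/08/07 -> 3)
  }
}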

Why Spark?

But this approach comes with a cost: you'll have to implement and maintain the same business logic in two different systems. For example, if your batch system is implemented with Apache Hive or Apache Pig and your real-time system is implemented with Apache Storm, you need to write and maintain the same aggregates in SQL and in Java. As Jay Kreps noted in his article "Questioning the Lambda Architecture," this situation very quickly becomes a maintenance nightmare.

Had we implemented the customer's POC system in Hive, I would have had to tell him: "No, there is not enough time left to re-implement our entire aggregation logic in Storm." But fortunately, we were using Apache Spark, not Hive, for the customer's aggregation logic.

Spark is well known as a framework for machine learning, but it is quite capable at ETL tasks as well. Spark has clean and easy-to-use APIs (far more readable and with less boilerplate code than MapReduce), and its REPL interface allows for fast prototyping of logic with business users. And, obviously, no one complains when the aggregates execute significantly faster than they would with MapReduce.

But the biggest advantage Spark gave us in this case was Spark Streaming, which allowed us to re-use the same aggregates we wrote for our batch application on a real-time data stream. We didn't need to re-implement the business logic, nor test and maintain a second code base. As a result, we could rapidly deploy a real-time component in the limited time left, and impress not just the users but also the developers and their management.

DIY

Here's a quick and simple example of how this was done. (For simplicity, only the most important steps are included.) You can see the complete source code here.

First, we wrote a function to implement the business logic. In this example, we want to count the number of errors per day in a collection of log events. The log events comprise a date and time, followed by a log level, the logging process, and the actual message:

14/08/07 19:19:26 INFO Executor: Finished task ID 11


To count the number of errors per day, we need to filter by the log level and then count the number of messages for each day:

import org.apache.spark.rdd.RDD

def countErrors(rdd: RDD[String]): RDD[(String, Int)] = {
  rdd
    .filter(_.contains("ERROR"))      // keep only the "ERROR" lines
    .map(s => (s.split(" ")(0), 1))   // pair the date (first field) with a count of 1
    .reduceByKey(_ + _)               // sum the counts for each date
}


In the function we keep only the lines that contain "ERROR", then use a map to set the first word in each line (the date) as the key with a count of 1. Then we run reduceByKey to count the number of errors we got for each day.

As you can see, the function transforms one RDD into another. RDDs are Spark's main data structure: essentially partitioned, distributed collections. Spark hides the complexity of handling distributed collections from us, and we can work with them much as we would with any other collection.
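Because countErrors is just an RDD-to-RDD function, it is easy to try out on a few hand-written lines. The snippet below is my own illustration rather than part of the original example; it uses a local master and sc.parallelize to turn three sample log lines into an RDD.

import org.apache.spark.{SparkConf, SparkContext}

// Quick local check of countErrors (illustrative only).
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("countErrorsTest"))

val sample = sc.parallelize(Seq(
  "14/08/07 19:19:26 INFO Executor: Finished task ID 11",
  "14/08/07 19:19:27 ERROR Executor: Exception in task ID 12",
  "14/08/08 08:01:02 ERROR Executor: Exception in task ID 13"
))

countErrors(sample).collect().foreach(println)  // prints (14/08/07,1) and (14/08/08,1); order may vary
sc.stop()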

We can use this function in a Spark ETL process to read data from HDFS to an RDD, count errors, and save the results to HDFS:

val sc = new SparkContext(conf)

val lines = sc.textFile(...)
val errCount = countErrors(lines)
errCount.saveAsTextFile(...)


In this example we initialized a SparkContext to execute our code within a Spark cluster. (Note that this is not necessary if you use the Spark REPL, where the SparkContext is initialized automatically.) Once the SparkContext is initialized, we use it to read lines from a file into an RDD, execute our error-count function, and save the result back to a file.

The URLs passed to sc.textFile and errCount.saveAsTextFile can point to data in HDFS (using hdfs://...), the local filesystem, Amazon S3, and so on.
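As a sketch of what those placeholder URLs might look like (the paths below are hypothetical, not the article's actual locations):

// Hypothetical paths illustrating the input/output options mentioned above.
val fromHdfs  = sc.textFile("hdfs:///logs/app/2014-08-07/")  // HDFS
val fromLocal = sc.textFile("file:///var/log/app/app.log")   // local filesystem
val fromS3    = sc.textFile("s3n://my-bucket/logs/")         // Amazon S3 (s3n:// scheme in Spark of that era)

countErrors(fromHdfs).saveAsTextFile("hdfs:///reports/errors-per-day/")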

Now, suppose we can't wait an entire day for the error counts, and need to publish updated results every minute during the day. We don't have to re-implement the aggregation; we can just reuse it in our streaming code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(60))

// Create the DStream from data sent over the network
val dStream = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK_SER)

// Count the errors in each RDD in the stream
val errCountStream = dStream.transform(rdd => ErrorCount.countErrors(rdd))

// Print out the current error count
errCountStream.foreachRDD(rdd => {
  System.out.println("Errors this minute:%d".format(rdd.first()._2))
})

// Create a stream with the running error count (updateFunc is defined in the complete source)
val stateStream = errCountStream.updateStateByKey[Int](updateFunc)

// Print the running error count
stateStream.foreachRDD(rdd => {
  System.out.println("Errors today:%d".format(rdd.first()._2))
})


Once again, we are initializing a context, this time a StreamingContext. The StreamingContext takes a stream of events (in this case from a network socket; a production architecture would use a reliable service such as Apache Kafka instead) and turns it into a stream of RDDs.

Each RDD represents a micro-batch of the stream. The duration of each micro-batch is configurable (in this case, 60-second batches), and it serves to balance throughput (larger batches) against latency (smaller batches).

We run our countErrors function on the DStream, using transform to turn each RDD of lines from the stream into an RDD of (date, errorCount) pairs.

For each RDD we print the error count for that specific batch, and we also use the same RDD to update a stream that keeps running totals of the counts. We use this stream to print the running totals.
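The updateFunc passed to updateStateByKey above is defined in the complete source. A minimal version, assuming we simply fold each batch's counts into the running total for a date, might look like this (my sketch, not necessarily the article's exact implementation):

// Minimal sketch of an update function for updateStateByKey:
// add this batch's counts for a key to the running total kept in state.
val updateFunc = (newCounts: Seq[Int], runningTotal: Option[Int]) =>
  Some(newCounts.sum + runningTotal.getOrElse(0))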

For simplicity we just print the output to the screen here, but we could also save it to HDFS, Apache HBase, or Kafka, where real-time applications and users can consume it.
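For example, persisting each batch's results to HDFS takes only one more call on the DStream; the output prefix below is a hypothetical placeholder:

// Write each batch's (date, count) RDD as text files; Spark Streaming appends the
// batch timestamp to the prefix, producing one output directory per interval.
errCountStream.saveAsTextFiles("hdfs:///reports/errors-per-minute/part", "txt")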

Conclusion

To recap: Spark Streaming lets you implement your business logic function once and then reuse the code in a batch ETL process as well as in a streaming process. In the customer engagement I described previously, this versatility allowed us to very quickly implement (within hours) a real-time layer to complement the batch-processing one, impress users and management with a snazzy demo, and make our flight home. But it's not just a short-term POC win. In the long term, our architecture will require less maintenance overhead and carry a lower risk of errors resulting from duplicate code bases.

Acknowledgements

Thanks to Hari Shreedharan, Ted Malaska, Grant Henke, and Sean Owen for their valuable input and feedback.

Gwen Shapira is a Software Engineer (and former Solutions Architect) at Cloudera. She is also a co-author of the forthcoming book Hadoop Application Architectures from O'Reilly Media.

5 responses on "Building Lambda Architecture with Spark Streaming"


Edward Capriolo
August 29, 2014 at 1:29 pm

"But this approach comes with a cost: you'll have to implement and maintain the same business logic in two different systems. For example, if your batch system is implemented with Apache Hive or Apache Pig and your real-time system is implemented with Apache Storm, you need to write and maintain the same aggregates in SQL and in Java."

False

https://github.com/twitter/summingbird

Gwen Shapira
August 30, 2014 at 12:10 pm

Thanks for commenting, Ed.

Summingbird is awesome and indeed brings to Scalding+Storm the same benefits I see in Spark and Spark Streaming (which can also use the Summingbird library).

I think this reinforces the point that a maintainable Lambda Architecture depends on a smart choice of frameworks.


Reply #1
ReneeBK, posted 2017-3-10 09:32:43
Summingbird

Summingbird is a library that lets you write MapReduce programs that look like native Scala or Java collection transformations and execute them on a number of well-known distributed MapReduce platforms, including Storm and Scalding.


https://github.com/twitter/summingbird


While a word-counting aggregation in pure Scala might look like this:

def wordCount(source: Iterable[String], store: MutableMap[String, Long]) =
  source.flatMap { sentence =>
    toWords(sentence).map(_ -> 1L)
  }.foreach { case (k, v) => store.update(k, store.get(k) + v) }

Counting words in Summingbird looks like this:

def wordCount[P <: Platform[P]]
  (source: Producer[P, String], store: P#Store[String, Long]) =
    source.flatMap { sentence =>
      toWords(sentence).map(_ -> 1L)
    }.sumByKey(store)



