OP: ReneeBK

[Featured Book] Storm Real-Time Processing Cookbook



Efficiently process unbounded streams of data in real time

Overview

  • Learn the key concepts of processing data in real time with Storm
  • Covers concepts ranging from log stream processing to mastering data management with Storm
  • Written in a cookbook style, with plenty of practical recipes featuring well-explained code examples and relevant screenshots and diagrams

In Detail

Storm is a free and open source distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!

Storm Real-Time Processing Cookbook offers basic to advanced recipes on Storm for real-time computation.

The book begins with setting up the development environment and then teaches log stream processing. This is followed by a real-time payments workflow, distributed RPC, integration with other software such as Hadoop and Apache Camel, and more.

What you will learn from this book

  • Create a log spout
  • Consume messages from a JMS queue
  • Implement unidirectional synchronization based on a data stream
  • Execute disaster recovery on a separate AWS region

Approach

A Cookbook with plenty of practical recipes for different uses of Storm.

Who this book is written for

If you are a Java developer with basic knowledge of real-time processing and would like to learn Storm to process unbounded streams of data in real time, then this book is for you.


Product Details
  • Paperback: 254 pages
  • Publisher: Packt Publishing (May 13 2013)
  • Language: English
  • ISBN-10: 1782164421
  • ISBN-13: 978-1782164425
  • http://www.packtpub.com/big-data-and-business-intelligence/storm-real-time-processing-cookbook
  • http://www.amazon.com/Real-Time-Processing-Cookbook-Quinton-Anderson/dp/1782164421

Hidden content in this post

Packt.Storm Real-time Processing Cookbook 2013.pdf (2.03 MB, requires 20 forum coins)






This post was rated by 7 members. Totals: experience +420, forum coins +60, academic level +14, helpfulness +14, credit rating +13.


2
Nicolle (student verified), posted 2014-8-10 21:14:25

Creating an association rules model in R

Note: the author has been banned or deleted; the content is automatically hidden.

3
pensifia, posted 2014-8-10 22:25:20

Creating a Recommendation Engine

A recommendation engine, made famous by leaders such as Amazon, makes intelligent guesses as to what a customer may want to buy, based on lists of products. These lists may come from the current selection within the current session, from previous purchases by the particular customer, or simply from the products the customer has viewed within a given session. Whichever approach you choose, the training data and the scoring data used during the operational phase must follow the same principles.

In this recipe, we will use the association rules model from the previous recipe to create a recommendation engine. The concept behind the engine is that lists are supplied as asynchronous inputs and recommendations are forwarded as asynchronous outputs where applicable.
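To make the message contract concrete, here is the shape of the input and output, inferred from the coercion functions shown later in this recipe (the transaction ID and product names are hypothetical examples):

Input on the spout's Kafka topic is a JSON array whose first element is the transaction ID:

    ["tx-1001", "beer", "chips"]

Output on the recommendation-output topic is a JSON object:

    {"transaction-id": "tx-1001", "recommendation": "salsa"}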

Tip

There are product combinations that aren't strongly supported by the model; in these cases, no recommendation is emitted. If you need a recommendation for every single input, you can either emit a random recommendation when there is no strongly supported one, or improve your model through better and generally larger training datasets.
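A minimal sketch of that fallback idea follows, written as another static inner class alongside the ListRFunction shown later in this recipe. The class name and the list of popular items are hypothetical, not part of the chapter code:

    public static class FallbackListRFunction extends ListRFunction {

       // Illustrative defaults only; in practice these might be your best sellers.
       private static final List<String> POPULAR_ITEMS =
             Arrays.asList("item-a", "item-b", "item-c");
       private final Random random = new Random();

       public FallbackListRFunction(List<String> libraries, String functionName) {
          super(libraries, functionName);
       }

       @Override
       public void execute(TridentTuple tuple, TridentCollector collector) {
          @SuppressWarnings("unchecked")
          List<String> items = (List<String>) tuple.get(0);
          JSONArray functionInput = new JSONArray();
          functionInput.addAll(items);
          JSONArray result = performFunction(functionInput);
          if (result != null) {
             // The model produced a strongly supported recommendation.
             collector.emit(coerceResponce(result));
          } else {
             // No supported rule for this combination: fall back to a random pick.
             collector.emit(new Values(
                   POPULAR_ITEMS.get(random.nextInt(POPULAR_ITEMS.size()))));
          }
       }
    }

This keeps the topology wiring unchanged: substitute FallbackListRFunction for ListRFunction in the stream definition if every input must yield an output.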



How to do it…
  • Start by creating a Maven project called arules-topology and add the following dependencies:
    <dependency>
       <groupId>com.github.quintona</groupId>
       <artifactId>trident-kafka-push</artifactId>
       <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
       <groupId>storm</groupId>
       <artifactId>storm-kafka</artifactId>
       <version>0.9.0-wip16b-scala292</version>
    </dependency>
    <dependency>
       <groupId>com.github.quintona</groupId>
       <artifactId>storm-r</artifactId>
       <version>0.0.1-SNAPSHOT</version>
    </dependency>

  • Next, create a main topology class called RecommendationTopology using the idiomatic Storm main method. For this recipe, we will be receiving the product list as a JSON array on a Kafka topic. We will therefore need to coerce the byte array input into a tuple containing two separate values, one being the transaction ID and the other being the list of products, as shown in the following lines of code:
    public static class CoerceInFunction extends BaseFunction {

       @Override
       public void execute(TridentTuple tuple, TridentCollector collector) {
          // The Kafka spout delivers the raw message as a byte array.
          String text = new String(tuple.getBinary(0));
          JSONArray array = (JSONArray) JSONValue.parse(text);
          // The first element of the JSON array is the transaction ID;
          // the remaining elements are the product names.
          List<String> values = new ArrayList<String>(array.size() - 1);
          String id = (String) array.get(0);
          array.remove(0);
          for (Object obj : array) {
             values.add((String) obj);
          }
          // Only emit when at least one product accompanied the transaction ID.
          if (array.size() > 0) {
             collector.emit(new Values(id, values));
          }
       }
    }

  • We will also need to publish the output message using the Kafka partition persist. The recommendation and transaction ID need to be coerced into a single value consisting of a JSON array as follows:
    public static class CoerceOutFunction extends BaseFunction {

       @Override
       public void execute(TridentTuple tuple, TridentCollector collector) {
          // Combine the transaction ID and the recommendation into a single
          // JSON object so they can be published as one Kafka message.
          JSONObject obj = new JSONObject();
          obj.put("transaction-id", tuple.getStringByField("transaction-id"));
          obj.put("recommendation", tuple.getStringByField("recommendation"));
          collector.emit(new Values(obj.toJSONString()));
       }
    }

  • We then need to define the topology as described here:
    topology.newStream("kafka",
          new TransactionalTridentKafkaSpout(spoutConfig))
       // Decode the raw Kafka bytes into a transaction ID and product list.
       .each(new Fields("bytes"), new CoerceInFunction(),
             new Fields("transaction-id", "current-list"))
       // Score the product list against the R association rules model.
       .each(new Fields("current-list"),
             new ListRFunction(Arrays.asList(new String[] { "arules" }), "recommend")
                   .withNamedInitCode("recommend"),
             new Fields("recommendation"))
       // Encode the result as a single JSON message.
       .each(new Fields("transaction-id", "recommendation"),
             new CoerceOutFunction(), new Fields("message"))
       // Publish the message to the recommendation-output Kafka topic.
       .partitionPersist(KafkaState.transactional("recommendation-output",
             new KafkaState.Options()), new Fields("message"),
             new KafkaStateUpdater("message"), new Fields());

  • The Storm-R project's standard function supports only a known input array size. This works for most use cases; however, for the association case, the input size will vary for each tuple. It is therefore necessary to override the execute function to cater for this particular case as shown here:
    public static class ListRFunction extends RFunction {

       public ListRFunction(List<String> libraries, String functionName) {
          super(libraries, functionName);
       }

       @Override
       public void execute(TridentTuple tuple, TridentCollector collector) {
          // The first tuple field holds the variable-length product list.
          @SuppressWarnings("unchecked")
          List<String> items = (List<String>) tuple.get(0);
          JSONArray functionInput = new JSONArray();
          functionInput.addAll(items);
          // Invoke the R "recommend" function with the full list.
          JSONArray result = performFunction(functionInput);
          // Emit only when the model returned a recommendation.
          if (result != null) {
             collector.emit(coerceResponce(result));
          }
       }
    }

  • These elements are all that is required to create the recommendation engine. You can now start your topology in local mode from Eclipse. To test it, a script named sendSelection.py is provided with the chapter code bundle; it takes a single parameter, the number of transactions to publish onto the queue (a Java alternative is sketched after these steps):
    python sendSelection.py 1000

  • You can view the output recommendations by issuing the following command from the Kafka command line:
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic recommendation-output --from-beginning
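If you would rather publish a test selection from Java than via the Python script, the following is a minimal sketch using the Kafka 0.8-era producer API. The broker address and the topic name (product-selections) are assumptions, since the recipe does not show the spout configuration; use whatever topic your spoutConfig reads from.

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class SendSelectionExample {
       public static void main(String[] args) {
          Properties props = new Properties();
          // Hypothetical broker address; adjust to your Kafka installation.
          props.put("metadata.broker.list", "localhost:9092");
          props.put("serializer.class", "kafka.serializer.StringEncoder");
          Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
          // JSON array: transaction ID first, then the selected products.
          producer.send(new KeyedMessage<String, String>(
                "product-selections", "[\"tx-1001\",\"beer\",\"chips\"]"));
          producer.close();
       }
    }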




4
fin-qq, posted 2014-8-11 08:16:00
thank you for sharing~

5
xxka917, posted 2014-8-11 08:53:08
Thanks, taking a look.

6
igs816 (employment verified), posted 2014-8-11 09:08:09
Already have it, thanks.

7
lisiheng, posted 2014-8-11 15:03:54
Support! Support! Support!

8
rhapsodyr, posted 2014-8-11 20:26:28
Could you add a short synopsis too? It would make things clearer.

9
Nicolle (student verified), posted 2015-8-24 02:17:21

Rule-based analysis of the log stream

Note: the author has been banned or deleted; the content is automatically hidden.

10
Nicolle (student verified), posted 2015-8-24 02:19:13

Indexing and persisting the log data

Note: the author has been banned or deleted; the content is automatically hidden.
