##
#
# Spark for Python - Chapter 5 - Code
#
##
#
# Spark Streaming Wordcount - netcat client
#
# Spark example code from:
# https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/network_wordcount.py
#
"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: network_wordcount.py <hostname> <port>
<hostname> and <port> describe the TCP server that Spark Streaming connects to in order to receive data.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    # Network word count: read text lines from a TCP socket and print the
    # per-word counts for every 1-second batch.
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        # sys.exit rather than the bare exit(): the latter is injected by the
        # `site` module for interactive use and is absent under `python -S`.
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    # Batch interval of 1 second.
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    # split() with no argument splits on any whitespace run and never yields
    # empty strings, so repeated spaces are not counted as the word ''.
    counts = (lines.flatMap(lambda line: line.split())
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
#
#
# Twitter Streaming API -> Spark Streaming via an RDD queue, processing tweets live
#
#
"""
Stream tweets from the Twitter Streaming API into Spark Streaming through an RDD queue.
Create a queue of RDDs (one tweet each) that Spark Streaming consumes and prints
one at a time, one RDD per 1-second batch interval.
To run this example use
'$ bin/spark-submit examples/AN_Spark/AN_Spark_Code/twitterstreaming.py'
"""
#
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import twitter
import dateutil.parser
import json
#
#
# Connecting Streaming Twitter with Streaming Spark via Queue
#
class Tweet(dict):
    """A flattened, JSON-serializable view of one raw Twitter status dict.

    For a real status the keys are: 'timestamp' (ISO-8601 string),
    'text', 'hashtags' (list of hashtag texts), 'geo' (the coordinates,
    or None), 'id', 'screen_name' and 'user_id'. For an empty/falsy input
    or a deletion notice (a dict containing 'delete') the result is an
    empty dict.
    """

    def __init__(self, tweet_in):
        # dict.__init__ takes the initial contents; start empty.
        super(Tweet, self).__init__()
        if not tweet_in or 'delete' in tweet_in:
            return
        # Parse the timestamp and drop its tzinfo (no conversion is done),
        # so isoformat() yields a naive ISO-8601 string.
        self['timestamp'] = dateutil.parser.parse(
            tweet_in[u'created_at']).replace(tzinfo=None).isoformat()
        # Text fields stay text rather than UTF-8 bytes: bytes values make
        # json.dumps() of this dict raise TypeError on Python 3.
        self['text'] = tweet_in['text']
        self['hashtags'] = [x['text'] for x in tweet_in['entities']['hashtags']]
        # .get() so a status without a 'geo' key maps to None, not KeyError.
        geo = tweet_in.get('geo')
        self['geo'] = geo['coordinates'] if geo else None
        self['id'] = tweet_in['id']
        self['screen_name'] = tweet_in['user']['screen_name']
        self['user_id'] = tweet_in['user']['id']
def connect_twitter(token=None, token_secret=None,
                    consumer_key=None, consumer_secret=None):
    """Return a twitter.TwitterStream authenticated with OAuth credentials.

    Each credential may be passed explicitly. Otherwise it is read from the
    environment variable TWITTER_TOKEN, TWITTER_TOKEN_SECRET,
    TWITTER_CONSUMER_KEY or TWITTER_CONSUMER_SECRET respectively. As a last
    resort it falls back to the placeholder "get_your_own_credentials".
    This keeps real secrets out of the source file. Calling with no
    arguments still works as before.
    """
    import os

    placeholder = "get_your_own_credentials"

    def _pick(value, env_name):
        # Explicit argument wins, then the environment, then the placeholder.
        if value is not None:
            return value
        return os.environ.get(env_name, placeholder)

    auth = twitter.OAuth(
        token=_pick(token, "TWITTER_TOKEN"),
        token_secret=_pick(token_secret, "TWITTER_TOKEN_SECRET"),
        consumer_key=_pick(consumer_key, "TWITTER_CONSUMER_KEY"),
        consumer_secret=_pick(consumer_secret, "TWITTER_CONSUMER_SECRET"))
    return twitter.TwitterStream(auth=auth)
def get_next_tweet(twitter_stream):
    """Fetch one tweet from the public sample stream and return it as JSON.

    Opens a sample stream on `twitter_stream` (as returned by
    connect_twitter()). It skips messages that are not tweets and returns
    json.dumps(Tweet(...)) for the first real one.

    Raises:
        StopIteration: propagated from next() if the stream is exhausted
            before a tweet arrives.
    """
    stream = twitter_stream.statuses.sample(block=True)
    # next(stream) works on Python 2 and 3; stream.next() is Python 2 only.
    tweet_in = next(stream)
    # Skip empty (falsy) messages and deletion notices. Also skip anything
    # without 'created_at' (presumably stream control messages such as a
    # hangup or limit notice), because Tweet() requires that key.
    while not tweet_in or 'delete' in tweet_in or 'created_at' not in tweet_in:
        tweet_in = next(stream)
    return json.dumps(Tweet(tweet_in))
def process_rdd_queue(twitter_stream, num_batches=3, streaming_context=None):
    """Queue single-tweet RDDs as an input DStream and print each batch.

    Args:
        twitter_stream: authenticated stream handed to get_next_tweet().
        num_batches: how many tweets to fetch up front. Each tweet becomes
            its own RDD in the queue. Defaults to 3.
        streaming_context: StreamingContext to attach the queue stream to.
            When None, the module-level `ssc` created in the __main__ block
            is used, as before.
    """
    if streaming_context is None:
        streaming_context = ssc
    # Tweets are fetched eagerly, in order, before the stream starts.
    rdd_queue = [
        streaming_context.sparkContext.parallelize(
            [get_next_tweet(twitter_stream)], 5)
        for _ in range(num_batches)
    ]
    lines = streaming_context.queueStream(rdd_queue)
    lines.pprint()
if __name__ == "__main__":
    # Twitter -> Spark queue stream demo: fetch a few tweets, print them in
    # 1-second batches, then shut down.
    sc = SparkContext(appName="PythonStreamingQueueStream")
    # Batch interval of 1 second. `ssc` stays module-level because
    # process_rdd_queue() reads it by default.
    ssc = StreamingContext(sc, 1)
    twitter_stream = connect_twitter()
    # Fetches 3 tweets and registers them as a queue stream on `ssc`.
    process_rdd_queue(twitter_stream)
    ssc.start()
    try:
        # 3 queued RDDs are consumed one per 1-second batch. Wait for all
        # three plus one spare interval; a shorter wait stops the context
        # before the last batch has run.
        time.sleep(4)
    finally:
        # Always release Spark, even on Ctrl-C during the wait.
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
#
#
# Kafka and Spark Streaming
#
#
#
# kafka producer
#
#
import time
from kafka.common import LeaderNotAvailableError
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from datetime import datetime
def print_response(response=None):
    """Print the error code and offset of the first produce response.

    `response` is the value returned by the producer's send call. It is
    indexed, and its first element must expose `.error` and `.offset`.
    Nothing is printed when `response` is None or empty.
    """
    if not response:
        return
    head = response[0]
    print('Error: {0}'.format(head.error))
    print('Offset: {0}'.format(head.offset))
def main():
    """Send five timestamped test messages to the Kafka topic 'test'.

    Connects to the broker at localhost:9092 and waits 5 seconds. It then
    sends one message per second and prints each produce response. If a
    send raises LeaderNotAvailableError (typical right after the topic is
    auto-created), that message is retried once after 1 second and the
    loop continues with the remaining messages. The client is closed even
    if a send ultimately fails.
    """
    kafka = KafkaClient("localhost:9092")
    try:
        producer = SimpleProducer(kafka)
        time.sleep(5)
        topic = 'test'
        for _ in range(5):
            time.sleep(1)
            # One clock read so both renderings describe the same instant.
            now = datetime.now()
            msg = ('This is a message sent from the kafka producer: '
                   + str(now.time()) + ' -- '
                   + now.strftime("%A, %d %B %Y %I:%M%p"))
            # SimpleProducer expects bytes payloads (str is rejected on
            # Python 3).
            payload = msg.encode('utf-8')
            try:
                print_response(producer.send_messages(topic, payload))
            except LeaderNotAvailableError:
                # https://github.com/mumrah/kafka-python/issues/249
                time.sleep(1)
                print_response(producer.send_messages(topic, payload))
    finally:
        kafka.close()
# Run the Kafka producer only when this file is executed as a script.
if __name__ == "__main__":
    main()
#
# kafka consumer
# consumes messages from a
|