Original poster: Nicolle

Mastering Spark for Data Science

#11
Nicolle (verified student), posted 2017-5-8 05:15:09
Note: the author has been banned or deleted; this content was automatically hidden.

#12
Nicolle (verified student), posted 2017-5-8 05:15:49
Note: the author has been banned or deleted; this content was automatically hidden.

#13
Nicolle (verified student), posted 2017-5-8 05:16:20
Note: the author has been banned or deleted; this content was automatically hidden.

#14
Nicolle (verified student), posted 2017-5-8 05:16:54
Note: the author has been banned or deleted; this content was automatically hidden.

#15
Nicolle (verified student), posted 2017-5-8 05:17:21
Note: the author has been banned or deleted; this content was automatically hidden.

#16
HappyAndy_Lo, posted 2017-5-8 12:40:00

Like it!!!

#17
Lisrelchen, posted 2017-8-17 09:59:58
Introducing character class masks

There is another simple type of data profiling that we can also apply that helps with file inspection. It involves profiling the actual bytes that make up a whole file. It is an old method, one that originally comes from cryptography, where frequency analysis of letters in texts was used to gain an edge on deciphering substitution codes.

While not a common technique in data science circles today, byte level analysis is surprisingly useful when it's needed. In the past, data encodings were a massive problem. Files were encoded in a range of code pages, across ASCII and EBCDIC standards. Byte frequency reporting was often critical to discover the actual encoding, delimiters, and line endings used in the files. Back then, the number of people who could create files but not technically describe them was surprising. Today, as the world moves increasingly to Unicode-based character encodings, these old methods need updating. In Unicode, the concept of a byte is modernized to multi-byte code points, which can be revealed in Scala using the following function:

val tst = "Andrew Introducing character class masks"

def toCodePointVector(input: String) = input.map {
  // supplementary code points (above U+FFFF) are written as a surrogate pair
  case i if i > 65535 =>
    val hchar = (i - 0x10000) / 0x400 + 0xD800
    val lchar = (i - 0x10000) % 0x400 + 0xDC00
    f"\\u$hchar%04x\\u$lchar%04x"
  // everything else is written as a single escaped code point
  case i if i > 0 => f"\\u${i.toInt}%04x"
  // kudos to Ben Reich: http://k.bytefreq.com/1MjyvNz
}

val out = toCodePointVector(tst)

val rows = sc.parallelize(out)
rows.countByValue().foreach(println)

// results in the following: [codepoint], [Frequency_count]
// (note: this sample output reflects a test string containing non-ASCII
//  characters such as 漢字 and ϖ, not just the ASCII value of tst shown above)
(\u0065,1)
(\u03d6,1)
(\u006e,1)
(\u0072,1)
(\u0077,1)
(\u0041,1)
(\u0020,2)
(\u6f22,1)
(\u0064,1)
(\u5b57,1)
Using this function, we can begin to profile any international character-level data we receive in our GDELT dataset and start to understand the complexities we might face in exploiting the data. But unlike the other masks, to create interpretable results from code points we require a dictionary that we can use to look up meaningful contextual information, such as the Unicode category and the Unicode character names.

To generate a contextual lookup, we can use this quick command line hack to generate a reduced dictionary from the main one found at unicode.org, which should help us to better report on our findings:

$ wget ftp://ftp.unicode.org/Public/UNIDATA/UnicodeData.txt
$ cat UnicodeData.txt | gawk -F";" '{OFS=";"} {print $1,$3,$2}' \
    | sed 's/-/ /g' | gawk '{print $1,$2}' | gawk -F";" '{OFS="\t"} \
    length($1) < 5 {print $1,$2,$3}' > codepoints.txt

# use "hdfs dfs -put" to load codepoints.txt to HDFS, so
# you can use it later

$ head -1300 codepoints.txt | tail -4
0513      Ll    CYRILLIC SMALL
0514      Lu    CYRILLIC CAPITAL
0515      Ll    CYRILLIC SMALL
0516      Lu    CYRILLIC CAPITAL

We will use this dictionary, joined to our discovered code points, to report on the character class frequencies of each code point in the file. While it seems like a simple form of analysis, the results can often be surprising and offer a forensic level of understanding of the data we are handling, its source, and the types of algorithms and methods we can apply successfully to it. We will also look up the general Unicode category to simplify our reports, using the following lookup table (a sketch of the dictionary join in Spark follows the table):

Cc  Other, Control
Cf  Other, Format
Cn  Other, Not Assigned
Co  Other, Private Use
Cs  Other, Surrogate
LC  Letter, Cased
Ll  Letter, Lowercase
Lm  Letter, Modifier
Lo  Letter, Other
Lt  Letter, Titlecase
Lu  Letter, Uppercase
Mc  Mark, Spacing Combining
Me  Mark, Enclosing
Mn  Mark, Nonspacing
Nd  Number, Decimal Digit
Nl  Number, Letter
No  Number, Other
Pc  Punctuation, Connector
Pd  Punctuation, Dash
Pe  Punctuation, Close
Pf  Punctuation, Final quote
Pi  Punctuation, Initial quote
Po  Punctuation, Other
Ps  Punctuation, Open
Sc  Symbol, Currency
Sk  Symbol, Modifier
Sm  Symbol, Math
So  Symbol, Other
Zl  Separator, Line
Zp  Separator, Paragraph
Zs  Separator, Space
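Joining the code point frequencies to the codepoints.txt dictionary is straightforward in Spark. Here is a minimal sketch, not the book's implementation; it reuses tst and toCodePointVector from above, and the HDFS location of codepoints.txt is a placeholder assumption.

// Minimal sketch: attach Unicode category and (truncated) character name
// to each discovered code point. The HDFS path below is an assumption.
val codePointDict = sc.textFile("hdfs:///data/codepoints.txt")
  .map(_.split("\t"))
  .filter(_.length >= 3)
  .map(a => (f"\\u${Integer.parseInt(a(0), 16)}%04x", (a(1), a(2))))

// code point frequencies for the text under study
val freqs = sc.parallelize(toCodePointVector(tst))
  .map(cp => (cp, 1L))
  .reduceByKey(_ + _)

// left outer join so code points missing from the dictionary are still reported
freqs.leftOuterJoin(codePointDict)
  .map { case (cp, (count, meta)) =>
    val (category, name) = meta.getOrElse(("??", "UNKNOWN"))
    (cp, category, name, count)
  }
  .collect()
  .foreach(println)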


#18
Lisrelchen, posted 2017-8-17 10:01:27
Introducing mask based data profiling

A simple but effective method for quickly exploring new types of data is to make use of mask based data profiling. A mask, in this context, is a transformation function that generalizes a string data item into a feature; the resulting collection of masks has a much lower cardinality than the original values in the field of study.

When a column of data is summarized into mask frequency counts, a process commonly called data profiling, it can offer rapid insights into the common structures and content of the strings, and hence reveal how the raw data was encoded. Consider the following mask for exploring data:

Translate uppercase letters to A
Translate lowercase letters to a
Translate numbers, 0 through 9, to 9

It seems like a very simple transformation at first glance. As an example, let's apply this mask to a high cardinality field of data, such as the GDELT GKG file's V2.1 Source Common Name field. The documentation suggests it records the common name of the source of the news article being studied, which typically is the name of the website the news article was scraped from. Our expectation is that it contains domain names, such as nytimes.com.

Before implementing the production solution in Spark, let's prototype a profiler on the Unix command line to provide an example that we can run anywhere:

$ cat 20150218230000.gkg.csv | gawk -F"\t" '{print $4}' | \
    sed "s/[0-9]/9/g; s/[a-z]/a/g; s/[A-Z]/A/g" | sort |   \
    uniq -c | sort -r -n | head -20

232 aaaa.aaa
195 aaaaaaaaaa.aaa
186 aaaaaa.aaa
182 aaaaaaaa.aaa
168 aaaaaaa.aaa
167 aaaaaaaaaaaa.aaa
167 aaaaa.aaa
153 aaaaaaaaaaaaa.aaa
147 aaaaaaaaaaa.aaa
120 aaaaaaaaaaaaaa.aaa

The output is a sorted count of records found in the Source Common Name column alongside the mask generated by the regular expression (regex). It should be very clear looking at the results of this profiled data that the field contains domain names - or does it? As we have only looked at the most common masks (the top 20 in this case), perhaps the long tail of masks at the other end of the sorted list holds potential data quality issues at a lower frequency.

Rather than looking at just the top 20 masks, or even the bottom 20, we can introduce a subtle change to improve the generalization ability of our mask function. By making the regex collapse multiple adjacent occurrences of lowercase letters into a single a character, the mask's cardinality can be reduced without really diminishing our ability to interpret the results. We can prototype this improvement with just a small change to our regex and hopefully view all the masks in one page of output:


$ # note: on a mac use gsed, on linux use sed.
$ hdfs dfs -cat 20150218230000.gkg.csv |                      \
    gawk -F"\t" '{print $4}' | sed "s/[0-9]/9/g; s/[A-Z]/A/g; \
    s/[a-z]/a/g; s/a*a/a/g" | sort | uniq -c | sort -r -n

2356 a.a
 508 a.a.a
  83 a-a.a
  58 a99.a
  36 a999.a
  24 a-9.a
  21 99a.a
  21 9-a.a
  15 a9.a
  15 999a.a
  12 a9a.a
  11 a99a.a
   8 a-a.a.a
   7 9a.a
   3 a-a-a.a
   2 AAA Aa     <--- note here the pattern that stands out
   2 9a99a.a
   2 9a.a.a
   1 a9.a.a
   1 a.99a.a
   1 9a9a.a
   1 9999a.a

Very quickly, we have prototyped a mask that reduces the three thousand or so raw values down to a very short list of 22 values that are easily inspected by eye. As the long tail is now a much shorter tail, we can easily spot any possible outliers in this data field that could represent quality issues or special cases. This type of inspection, although manual, can be very powerful.

Notice, for instance, there is a particular mask in the output, AAA Aa, which doesn't have a dot within it, as we would expect in a domain name. We interpret this finding to mean we've spotted two rows of raw data that are not valid domain names, but perhaps general descriptors. Perhaps this is an error, or an example of what is known as illogical field use, meaning there could be other values slipping into this column that should logically go elsewhere.

This is worth investigating, and it is easy to inspect those exact two records. We do so by generating the masks alongside the original data, then filtering on the offending mask to locate the original strings for manual inspection.

Rather than code a very long one-liner on the command line, we can inspect these records using a legacy data profiler called bytefreq (short for byte frequencies) written in awk. It has switches to generate formatted reports, database ready metrics, and also a switch to output masks and data side by side. We have open-sourced bytefreq specifically for readers of this book, and suggest you play with it to really understand how useful this technique can be: https://bitbucket.org/bytesumo/bytefreq.

$ # here is a Low Granularity report from bytefreq
$ hdfs dfs -cat 20150218230000.gkg.csv |                      \
    gawk -F"\t" '{print $4}' | awk -F"," -f                   \
    ~/bytefreq/bytefreq_v1.04.awk -v header="0" -v report="0" \
    -v grain="L"

-  ##column_100000001  2356  a.a      sfgate.com
-  ##column_100000001   508  a.a.a    theaustralian.com.au
-  ##column_100000001   109  a9.a     france24.com
-  ##column_100000001    83  a-a.a    news-gazette.com
-  ##column_100000001    44  9a.a     927thevan.com
-  ##column_100000001    24  a-9.a    abc-7.com
-  ##column_100000001    23  a9a.a    abc10up.com
-  ##column_100000001    21  9-a.a    4-traders.com
-  ##column_100000001     8  a-a.a.a  gazette-news.co.uk
-  ##column_100000001     3  9a9a.a   8points9seconds.com
-  ##column_100000001     3  a-a-a.a  the-american-interest.com
-  ##column_100000001     2  9a.a.a   9news.com.au
-  ##column_100000001     2  A Aa     BBC Monitoring
-  ##column_100000001     1  a.9a.a   vancouver.24hrs.ca
-  ##column_100000001     1  a9.a.a   guide2.co.nz

$ hdfs dfs -cat 20150218230000.gkg.csv | gawk -F"\t" '{print $4}' | \
    gawk -F"," -f ~/bytefreq/bytefreq_v1.04.awk                     \
    -v header="0" -v report="2" -v grain="L" | grep ",A Aa"

BBC Monitoring,A Aa
BBC Monitoring,A Aa

When we inspect the odd mask, A Aa, we can see the offending text found is BBC Monitoring, and on re-reading the GDELT documentation we see that this is not an error, but a known special case. It means that when using this field we must remember to handle this special case. One way to handle it could be to include a correction rule that swaps this string value for one that works better, for example the valid domain name www.monitor.bbc.co.uk, which is the data source to which the text string refers.

The idea we are introducing here is that a mask can be used as a key to retrieve offending records in particular fields. This logic leads us to the next major benefit of mask based profiling: the output masks are a form of Data Quality Error Code. These error codes can fall into two categories: a whitelist of good masks, and a blacklist of bad masks that are used to find poor quality data. Thought of this way, masks then form the basis for searching and retrieving data cleansing methods, or perhaps for throwing an alarm or rejecting a record.
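To make the error code idea concrete, here is a minimal sketch, not taken from the book, of masks checked against a whitelist and a blacklist; the contents of both sets are illustrative assumptions based on the output above.

// Minimal sketch: masks used as data quality error codes.
// The whitelist and blacklist below are illustrative assumptions.
val maskWhitelist = Set("a.a", "a.a.a", "a-a.a", "a9.a", "9a.a")
val maskBlacklist = Set("A Aa")   // e.g. the "BBC Monitoring" special case

def classify(mask: String): String =
  if (maskBlacklist.contains(mask)) "reject"        // known bad: route to a fix
  else if (maskWhitelist.contains(mask)) "accept"   // known good
  else "quarantine"                                 // unseen mask: needs review

// classify("A Aa") == "reject"; classify("a99a.a") == "quarantine"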

The lesson is that we can create Treatment functions to remediate raw strings that are found using a particular mask calculated over data in a particular field. This thinking leads to the following conclusion: we can create a general framework around mask based profiling for doing data quality control and remediation as we read data within our data reading pipeline. This has some really advantageous solution properties:

Generating data quality masks is an on-read process; we can accept new raw data and write it to disk, then, on read, generate masks only when needed at query time - so data cleansing can be a dynamic process.
Treatment functions can then be dynamically applied to target remediation efforts that help to cleanse our data at the time of read.
Because previously unseen strings are generalized into masks, new strings can be flagged as having quality issues even if that exact string has never been seen before. This generality helps us to reduce complexity, simplify our processes, and create reusable smart solutions - even across subject areas.
Data items that create masks that do not fall into mask whitelists, fix-lists, or blacklists can potentially be quarantined for attention; human analysts can inspect the records and hopefully whitelist them, or perhaps create new Treatment functions that help to get the data out of quarantine and back into production.
Data quarantines can be implemented simply as an on-read filter, and when new remediation functions are created to cleanse or fix data, the dynamic treatments applied at read time will automatically release the corrected data to users without long delays.
Eventually, a data quality Treatment library will be created that stabilizes over time. New work is mainly done by mapping and applying the existing treatments to new data. A phone number reformatting Treatment function, for example, can be widely reused over many datasets and projects.
With the method and architectural benefits now explained, the requirements for building a generalized mask based profiler should be clearer. Note that the mask generation process is a classic Hadoop MapReduce process: map input data out to masks, and reduce those masks back down to summarized frequency counts. Note also how, even in this short example, we have already used two types of masks, each made up of a pipeline of underlying transformations. It suggests we need a tool that supports a library of predefined masks as well as allowing for user defined masks that can be created quickly and on demand. It also suggests there should be ways to stack the masks to build them up into complex pipelines.
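As a bridge from the command-line prototype to Spark, here is a minimal sketch of that map-to-mask, reduce-to-counts process; it is not the book's profiler, and the HDFS path and the hard-coded GKG column index are assumptions.

// Minimal sketch of mask based profiling in Spark: map each value to a mask,
// then reduce the masks down to frequency counts.
// Assumptions: the GKG file path and the Source Common Name column (field 4,
// tab separated) used in the shell prototypes above.

// the "high grain" mask: A/a/9 character classes
def highGrainMask(s: String): String =
  s.replaceAll("[0-9]", "9").replaceAll("[a-z]", "a").replaceAll("[A-Z]", "A")

// the "low grain" mask: additionally collapse runs of lowercase letters
def lowGrainMask(s: String): String =
  highGrainMask(s).replaceAll("a+", "a")

val sourceCommonName = sc
  .textFile("hdfs:///data/gdelt/20150218230000.gkg.csv")
  .map(_.split("\t", -1))
  .filter(_.length > 3)
  .map(_(3))                          // V2.1 Source Common Name column

// profile: mask frequency counts, highest first
sourceCommonName
  .map(v => (lowGrainMask(v), 1L))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .collect()
  .foreach { case (mask, count) => println(f"$count%6d $mask") }

// masks double as retrieval keys: pull back the records behind the odd mask
sourceCommonName
  .filter(v => lowGrainMask(v) == "AAA Aa")   // the pattern spotted earlier
  .collect()
  .foreach(println)                           // e.g. "BBC Monitoring"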

What may not be so obvious yet is that all data profiling done in this way can write profiler metrics to a common output format. This helps to improve reusability of our code through simplifying the logging, storing, retrieval, and consumption of the profiling data.

As an example, we should be able to report all mask based profiler metrics using the following schema:

Metric Descriptor
Source Studied
IngestTime
MaskType
FieldName
Occurrence Count
KeyCount
MaskCount
Description
Once our metrics are captured in this single schema format, we can then build secondary reports using a user interface, such as a Zeppelin notebook.
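One possible shape for this schema in code is sketched below as a Scala case class; the field types (and the Timestamp choice for IngestTime) are assumptions rather than the book's definitions.

import java.sql.Timestamp

// A possible representation of the common profiler metric schema above.
// Field types are assumed for illustration only.
case class ProfilerMetric(
  metricDescriptor: String,     // Metric Descriptor
  sourceStudied:    String,     // Source Studied
  ingestTime:       Timestamp,  // IngestTime
  maskType:         String,     // MaskType
  fieldName:        String,     // FieldName
  occurrenceCount:  Long,       // Occurrence Count
  keyCount:         Long,       // KeyCount
  maskCount:        Long,       // MaskCount
  description:      String      // Description
)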

Before we walk through implementing these functions, an introduction to the character class masks is in order.


#19
Lisrelchen, posted 2017-8-17 10:03:32
GDELT Ingest

The next stage is to obtain the GDELT data and load it into GeoMesa. There are a number of options here, depending upon how you plan to proceed; if you are just working through this chapter, then you can use a script to download the data in one go:

$ mkdir gdelt && cd gdelt
$ wget http://data.gdeltproject.org/events/md5sums
$ for file in `cat md5sums | cut -d' ' -f3 | grep '^201[56]'` ; do wget http://data.gdeltproject.org/events/$file ; done
$ md5sum -c md5sums 2>&1 | grep '^201[56]'

This will download and verify all of the GDELT events data for 2015 and 2016. The amount of data required is something we need to estimate at this stage, as we do not know how our algorithm is going to work out, so we have chosen two years' worth to start with.

An alternative to the script is to read Chapter 2, Data Acquisition, which explains in detail how to configure Apache NiFi to download the GDELT data in real time and load it into HDFS ready for use. Otherwise, a script to transfer the preceding data to HDFS is shown as follows:

$ ls -1 *.zip | xargs -n 1 unzip
$ rm *.zip
$ hdfs dfs -copyFromLocal *.CSV hdfs:///data/gdelt/

Note

HDFS uses data blocks; we want to ensure that files are stored as efficiently as possible. Writing a method to aggregate files to the HDFS block size (64 MB by default) will ensure the NameNode memory is not filled with many entries for lots of small files, and will also make processing more efficient. Large files that use more than one block (file size > 64 MB) are known as split files.
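One way to do that aggregation, sketched here as an assumption rather than the book's method, is to let Spark itself repack the small CSVs into roughly block-sized output files; the staging and output directories below are placeholders.

import org.apache.hadoop.fs.Path

// Repack many small GDELT CSVs into ~64 MB output files using Spark.
// The directories below are placeholder paths, not from the book.
val inputDir  = "hdfs:///data/gdelt/raw"
val outputDir = "hdfs:///data/gdelt/repacked"
val blockSize = 64L * 1024 * 1024   // default HDFS block size

val inputPath = new Path(inputDir)
val fs = inputPath.getFileSystem(sc.hadoopConfiguration)
val totalBytes = fs.listStatus(inputPath)
  .filter(_.getPath.getName.endsWith(".CSV"))
  .map(_.getLen).sum
val numPartitions = math.max(1, math.ceil(totalBytes.toDouble / blockSize).toInt)

sc.textFile(s"$inputDir/*.CSV")     // read all the small files
  .coalesce(numPartitions)          // pack them into ~block-sized partitions
  .saveAsTextFile(outputDir)        // each partition becomes one output file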

We have a substantial amount of data in HDFS (approximately 48 GB for 2015/16). Now, we will load this to Accumulo via GeoMesa.


#20
Lisrelchen, posted 2017-8-17 10:05:36
MapReduce to Spark

Since MapReduce (MR) is generally considered dead, or at least dying, it is very useful to know how to create Spark jobs from those existing in MR. The following method can be applied to any MR job. We will consider the GeoMesa Accumulo loading job described in the GeoMesa tutorial (geomesa-examples-gdelt) for this case.

An MR job is typically made up of three parts: the mapper, the reducer, and the driver. The GeoMesa example is a map-only job and therefore requires no reducer. The job takes a GDELT input line, creates a (Key, Value) pair from an empty Text object and the created GeoMesa SimpleFeature, and uses the GeoMesaOutputFormat to load the data to Accumulo. The full code of the MR job can be found in our repository; next we will work through the key parts and suggest the changes required for Spark.

The job is initiated from the main method; the first few lines are related to parsing the required options from the command line, such as the Accumulo username and password. We then reach:

SimpleFeatureType featureType =
    buildGDELTFeatureType(featureName);
DataStore ds = DataStoreFinder.getDataStore(dsConf);
ds.createSchema(featureType);
runMapReduceJob(featureName, dsConf,
    new Path(cmd.getOptionValue(INGEST_FILE)));
The GeoMesa SimpleFeatureType is the primary mechanism used to store data in a GeoMesa data store and it needs to be initialized once, along with the data store initialization. Once this is done we execute the MR job itself. In Spark, we can pass the arguments via the command line as before, and then do the one-off setup:

spark-submit --class io.gzet.geomesa.ingest \
             --master yarn \
             geomesa-ingest.jar <accumulo-instance-id>
...
The jar contains a standard Spark job:

val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Geomesa Ingest"))
We parse the command line arguments as before and perform the one-off initialization:

val featureType = buildGDELTFeatureType(featureName)
// createSchema returns Unit, so keep hold of the DataStore reference itself
val ds = DataStoreFinder.getDataStore(dsConf)
ds.createSchema(featureType)
Now we can load the data from HDFS, using wildcards if required. This creates one partition for each block of the file (64 MB default), resulting in an RDD[String]:

val distDataRDD = sc.textFile("/data/gdelt/*.CSV")

Or we can fix the number of partitions, depending upon our available resources:

val distDataRDD = sc.textFile("/data/gdelt/*.CSV", 20)
Then we can perform the map, where we can embed the function to replace the process in the original MR map method. We create a tuple (Text, SimpleFeatureType) to replicate a (Key, Value) pair so that we can use the OutputFormat in the next step. When Scala tuples are created in this way, the resulting RDD gains extra methods, such as reduceByKey, which is functionally equivalent to the MR Reducer (see below for what we should really be using: mapPartitions):

val processedRDD = distDataRDD.map(s => {
   // Processing as before to build the SimpleFeatureType
   (new Text, simpleFeatureType)
})
Then, we can finally output to Accumulo using the GeomesaOutputFormat from the original job:

processedRDD.saveAsNewAPIHadoopFile("output/path", classOf[Text],
  classOf[SimpleFeatureType], classOf[GeomesaOutputFormat])
At this stage, we have not mentioned the setup method in the MR job. This method is called before any input is processed, to allocate an expensive resource such as a database connection or, in our case, a reusable object; a cleanup method is then used to release that resource if it would otherwise persist out of scope. In our case, the setup method is used to create a SimpleFeatureBuilder, which can be reused during each call of the mapper to build SimpleFeatures for output; there is no cleanup method, as the memory is automatically released when the object goes out of scope (the code has completed).

The Spark map function only operates on one input at a time, and provides no means to execute code before or after transforming a batch of values. It might seem reasonable to simply put the setup and cleanup code before and after a call to map:

// do setup work
val processedRDD = distDataRDD.map(s => {
   // Processing as before to build the SimpleFeatureType
   (new Text, simpleFeatureType)
})
// do cleanup work
But this fails for several reasons:

It puts any objects used in map into the map function's closure, which requires them to be serializable (for example, by implementing java.io.Serializable). Not all objects will be serializable, so exceptions may be thrown.
The map function is a transformation, rather than an action, and is lazily evaluated. Thus, instructions after the map function are not guaranteed to be executed immediately.
Even if the preceding issues were covered for a particular implementation, we would only be executing code on the driver, not necessarily freeing resources allocated by serialized copies.
The closest counterpart to a mapper in Spark is the mapPartitions method. This method does not map just one value to another value, but rather maps an Iterator of values to an Iterator of other values, akin to a bulk-map method. This means that mapPartitions can allocate resources locally at its start:

val processedRDD = distDataRDD.mapPartitions { valueIterator =>
   // setup code for SimpleFeatureBuilder
   val transformed = valueIterator.map( . . . )
   transformed
}
However, releasing resources (cleanup) is not straightforward, as we still experience the lazy evaluation problem; if resources are freed after the map, then the iterator may not have been evaluated before those resources disappear. One solution to this is as follows:

val processedRDD = distDataRDD.mapPartitions { valueIterator =>
  if (valueIterator.isEmpty) {
    // nothing in this partition: return an empty Iterator
    Iterator.empty
  } else {
    // setup code for SimpleFeatureBuilder
    valueIterator.map { s =>
      // Processing as before to build the SimpleFeature
      val simpleFeature = ??? // build the feature from s, as in the original map
      if (!valueIterator.hasNext) {
        // cleanup here: we have just processed the last element
      }
      simpleFeature
    }
  }
}

