Original poster: Lisrelchen

[Data Science] Pig Design Patterns

11
ReneeBK posted on 2017-01-16 06:49:03
The HBase ingress and egress pattern
The egress code

The following code illustrates storing the contents of a Pig relation into an HBase table:

  /*
  Load the transactions dataset using PigStorage into the relation transactions
  */
  transactions = LOAD '/user/cloudera/pdp/datasets/hbase/transactions.csv' USING PigStorage(',') AS (
      listing_id: chararray,
      transaction_date: chararray,
      customer_id: int,
      age: chararray,
      residence_area: chararray,
      product_subclass: int,
      product_id: long,
      amount: int,
      asset: int,
      sales_price: int);

  /*
  * Some processing logic goes here which is deliberately left out to improve readability
  */

  /*
  Use HBaseStorage to store data from the Pig relation transactions into the HBase table hbase://retail_transactions.
  The first field, listing_id, becomes the HBase row key; the remaining fields are mapped to the three column families transaction_details, customer_details, and product_details.
  */
  STORE transactions INTO 'hbase://retail_transactions' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('transaction_details:transaction_date customer_details:customer_id customer_details:age customer_details:residence_area product_details:product_subclass product_details:product_id product_details:amount product_details:asset product_details:sales_price');
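
The post shows only the egress half of the pattern. For completeness, here is a minimal sketch (not from the original post) of the ingress direction, reading the same table back with HBaseStorage; the '-loadKey true' option prepends the row key to each returned tuple, and the values are declared as chararray here for simplicity:

  /*
  Sketch: load two columns plus the row key from the retail_transactions table
  */
  stored_txns = LOAD 'hbase://retail_transactions' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('transaction_details:transaction_date customer_details:customer_id', '-loadKey true') AS (listing_id:chararray, transaction_date:chararray, customer_id:chararray);
  DUMP stored_txns;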

12
ReneeBK posted on 2017-01-16 06:50:54
The Hive ingress and egress patterns
Importing data using RCFile

The following code illustrates the usage of HiveColumnarLoader, which loads data from a Hive table stored in the RCFile format:

  /*
  Register the Piggybank jar file to be able to use the UDFs in it
  */
  REGISTER '/usr/share/pig/contrib/piggybank/java/piggybank.jar';

  -- Register the Hive common and exec jars
  REGISTER '/usr/lib/hive/lib/hive-common-0.11.0.1.3.0.0-107.jar';
  REGISTER '/usr/lib/hive/lib/hive-exec-0.11.0.1.3.0.0-107.jar';

  /*
  Load the retail_transactions_rc RCFile, specifying the names of the table's columns and their types in the constructor of HiveColumnarLoader.
  */
  transactions = LOAD '/apps/hive/warehouse/transactions_db.db/retail_transactions_rc' USING org.apache.pig.piggybank.storage.HiveColumnarLoader('transaction_no int,transaction_date string,cust_no int,amount double,category string,product string,city string,state string,spendby string');

  /*
  * Some processing logic goes here which is deliberately left out to improve readability
  */

  /*
  Display the contents of the relation transactions on the console
  */
  DUMP transactions;
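
A quick way to verify what the loader produced before writing real processing logic is to inspect the derived schema and project a couple of columns; a minimal sketch using only standard Pig operators:

  -- Print the schema Pig derives from the HiveColumnarLoader constructor string
  DESCRIBE transactions;

  -- Project a subset of columns for a cheap sanity check
  amounts = FOREACH transactions GENERATE transaction_no, amount;
  DUMP amounts;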

13
ReneeBK posted on 2017-01-16 06:51:38
The Hive ingress and egress patterns
Importing data using HCatalog

The following code illustrates the loading of data from Hive using HCatalog:

  /*
  Specify the table name as the input to the HCatLoader function provided by HCatalog.
  This function abstracts the storage location, file type, and schema from the user, and takes only the table name as input
  */
  transactions = LOAD 'transactions_db.retail_transactions' USING org.apache.hcatalog.pig.HCatLoader();

  /*
  * Some processing logic goes here which is deliberately left out to improve readability
  */

  /*
  Display the contents of the relation transactions on the console
  */
  DUMP transactions;
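
HCatLoader needs the HCatalog jars and the Hive metastore configuration on Pig's classpath; on most distributions, running the script with pig -useHCatalog takes care of this. One practical benefit of going through the metastore is partition pruning: a FILTER on a partition column placed directly after the LOAD lets HCatLoader read only the matching partitions. A minimal sketch, assuming (hypothetically) that the table is partitioned by a chararray column named txn_date:

  -- txn_date is a hypothetical partition column; filtering on it right after the LOAD enables partition pruning
  recent = FILTER transactions BY txn_date == '2017-01-16';
  DUMP recent;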

14
ReneeBK posted on 2017-01-16 06:52:09
The Hive ingress and egress patterns
The following code illustrates the egress of data to Hive using HCatStorer:

  -- Register the Piggybank and hcatalog-pig-adapter jars
  REGISTER '/usr/share/pig/contrib/piggybank/java/piggybank.jar';
  REGISTER '/usr/lib/hcatalog/share/hcatalog/hcatalog-pig-adapter.jar';

  /*
  Load the transactions dataset into the relation transactions
  */
  transactions = LOAD '/user/cloudera/pdp/datasets/hive/retail_transactions.csv' USING org.apache.pig.piggybank.storage.CSVLoader() AS (transaction_no:int, transaction_date:chararray, cust_no:int, amount:double, category:chararray, product:chararray, city:chararray, state:chararray, spendby:chararray);

  /*
  * Some processing logic goes here which is deliberately left out to improve readability
  */

  /*
  Specify the Hive table name transactions_db.retail_transactions as the input to the HCatStorer function.
  The contents of the relation transactions are stored into the Hive table.
  */
  STORE transactions INTO 'transactions_db.retail_transactions' USING org.apache.hcatalog.pig.HCatStorer();
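
HCatStorer also accepts a partition specification as a constructor argument, which is how writes are directed at one partition of a partitioned table. A minimal sketch, assuming (hypothetically) that the target table is partitioned by a column named load_date:

  -- 'load_date=20170116' is a hypothetical partition spec; the key must match a partition column of the target table
  STORE transactions INTO 'transactions_db.retail_transactions' USING org.apache.hcatalog.pig.HCatStorer('load_date=20170116');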

15
ReneeBK posted on 2017-01-16 06:53:57
The mainframe ingestion pattern
The following is a Java code snippet of VSAMLoader, which is a custom loader implementation:

  @Override
  public ResourceSchema getSchema(String arg0, Job arg1) throws IOException {
    ...
    while (it.hasNext()) {
      // Get the next key/value pair
      Map.Entry pairs = (Map.Entry) it.next();
      String key = (String) pairs.getKey();
      String value = (String) pairs.getValue();
      // For Group and Alphanumeric types in the copybook, return the Pig-compliant type chararray
      if (value.equals("class net.sf.cb2java.copybook.Group")
          || value.equals("class net.sf.cb2java.copybook.AlphaNumeric")) {
        fieldSchemaList.add(new FieldSchema(key, org.apache.pig.data.DataType.CHARARRAY));
      }
      // For the Decimal type in the copybook, return the Pig-compliant type integer
      else if (value.equals("class net.sf.cb2java.copybook.Decimal")) {
        fieldSchemaList.add(new FieldSchema(key, org.apache.pig.data.DataType.INTEGER));
      }
      // Otherwise return the default bytearray
      else {
        fieldSchemaList.add(new FieldSchema(key, org.apache.pig.data.DataType.BYTEARRAY));
      }
    }
    return new ResourceSchema(new Schema(fieldSchemaList));
  }
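
For context, a custom LoadFunc like this is used from Pig the same way as the built-in loaders; a minimal sketch, where the jar path, the package-qualified class name com.mainframe.VSAMLoader, and the copybook argument are all hypothetical placeholders:

  -- Hypothetical usage sketch: register the jar containing the custom loader and load a VSAM export with it
  REGISTER '/home/cloudera/pdp/jars/vsamloader.jar';
  vsam_data = LOAD '/user/cloudera/pdp/datasets/vsam/customers.dat' USING com.mainframe.VSAMLoader('customers.cpy');
  DUMP vsam_data;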

16
ReneeBK posted on 2017-01-16 06:54:57
The code for simple JSON

The code to load JSON files using JsonLoader is shown as follows:

  /*
  Use the JsonLoader UDF; it takes the JSON schema as a parameter and loads the contents of the JSON file emails.json into the relation enron_emails
  */
  enron_emails = LOAD '/user/cloudera/pdp/datasets/json/emails.json' USING JsonLoader('body:chararray, from:chararray, tos:chararray, ccs:chararray, bccs:chararray, date:chararray, message_id:chararray, subject:chararray');

  /*
  * Some processing logic goes here which is deliberately left out to improve readability
  */

  /*
  Display the contents of the relation enron_emails on the console
  */
  DUMP enron_emails;

It is important to note that JsonLoader does not use the AS clause to supply the schema; the schema string is passed to the loader's constructor instead.
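
The fields named in the constructor string are still addressable by name downstream, exactly as if they had come from an AS clause; a minimal sketch using only standard Pig operators:

  -- Project two of the fields declared in the JsonLoader constructor
  senders = FOREACH enron_emails GENERATE from, subject;
  DUMP senders;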

17
ReneeBK posted on 2017-01-16 06:55:20
The code for nested JSON

The Pig script to load nested JSON is shown as follows; we use the elephant-bird libraries to accomplish this:

  /*
  Register the elephant-bird and JSON jar files
  */
  REGISTER '/home/cloudera/pdp/jars/elephant-bird-core-3.0.5.jar';
  REGISTER '/home/cloudera/pdp/jars/elephant-bird-pig-3.0.5.jar';
  REGISTER '/home/cloudera/pdp/jars/json-simple-1.1.1.jar';

  /*
  Use elephant-bird's JsonLoader for loading a nested JSON file.
  The parameter '-nestedLoad' enables the nested loading operation
  */
  emails = LOAD '/user/cloudera/pdp/datasets/json/emails.json' USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad');

  /*
  * Some processing logic goes here which is deliberately left out to improve readability
  */

  /*
  Display the contents of the relation emails on the console
  */
  DUMP emails;
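
With -nestedLoad, elephant-bird's JsonLoader returns each record as a single map field (named json), with nested objects appearing as nested maps; a minimal sketch of extracting values with Pig's map-dereference operator #, using key names from the emails.json schema shown in the previous post:

  -- Dereference top-level keys of the json map; a nested object would come back as a map that can be dereferenced again
  email_meta = FOREACH emails GENERATE (chararray) json#'from' AS sender, (chararray) json#'subject' AS subject;
  DUMP email_meta;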

18
ReneeBK posted on 2017-01-16 06:56:00
The egress code

The following code egresses data stored in a Pig relation to the JSON format:

  /*
  Load the JSON file using JsonLoader into the relation enron_emails
  */
  enron_emails = LOAD '/user/cloudera/pdp/datasets/json/emails.json' USING JsonLoader('body:chararray, from:chararray, tos:chararray, ccs:chararray, bccs:chararray, date:chararray, message_id:chararray, subject:chararray');

  /*
  * Some processing logic goes here which is deliberately left out to improve readability
  */

  /*
  Use JsonStorage to store the contents of the relation to a JSON file
  */
  STORE enron_emails INTO '/user/cloudera/pdp/output/json/output.json' USING JsonStorage();
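
Pig's bundled JsonStorage also records the relation's schema in a side file alongside the output (treat this as an assumption for your Pig version), which lets JsonLoader read the data back without a schema string; a minimal sketch:

  -- Read the stored output back; with no constructor argument, JsonLoader is expected to pick up the schema side file written by JsonStorage (assumption about the bundled JSON support)
  emails_back = LOAD '/user/cloudera/pdp/output/json/output.json' USING JsonLoader();
  DESCRIBE emails_back;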

19
ReneeBK posted on 2017-01-16 07:08:54
Pig script

The following is the Pig script illustrating the implementation of this pattern:

  /*
  Register the datatypeinferer and custom storage jar files
  */
  REGISTER '/home/cloudera/pdp/jars/datatypeinfererudf.jar';
  REGISTER '/home/cloudera/pdp/jars/customdatatypeinfererstorage.jar';

  /*
  Load the transactions dataset into the relation transactions
  */
  transactions = LOAD '/user/cloudera/pdp/datasets/data_profiling/transactions.csv' USING PigStorage(',') AS (transaction_id:long, transaction_date:chararray, cust_id:chararray, age:chararray, area:chararray, prod_subclass:int, prod_id:long, amt:int, asset:int, sales_price:int, phone_no:chararray, country_code:chararray);

  /*
  Infer the data type of the field cust_id by invoking the DataTypeInfererUDF.
  It returns a tuple with the inferred data type.
  */
  data_types = FOREACH transactions GENERATE com.profiler.DataTypeInfererUDF(cust_id) AS inferred_data_type;

  /*
  Compute the count of each data type, the total count, and the percentage.
  The data type with the highest count is considered the dominant data type
  */
  grpd = GROUP data_types BY inferred_data_type;
  inferred_type_count = FOREACH grpd GENERATE group AS inferred_type, COUNT(data_types) AS count;
  grpd_inf_type_count_all = GROUP inferred_type_count ALL;
  total_count = FOREACH grpd_inf_type_count_all GENERATE SUM(inferred_type_count.count) AS tot_sum, MAX(inferred_type_count.count) AS max_val;
  percentage = FOREACH inferred_type_count GENERATE inferred_type AS type, count AS total_cnt, CONCAT((chararray) ROUND(count * 100.0 / total_count.tot_sum), '%') AS percent, (count == total_count.max_val ? 'Dominant' : 'Other') AS inferred_dominant_other_datatype;
  percentage_ord = ORDER percentage BY inferred_dominant_other_datatype ASC;

  /*
  CustomDatatypeInfererStorage extends StoreFunc. All the abstract methods have been overridden to implement logic that writes the contents of the relation into a file in a custom, report-like format.
  The results are stored on HDFS in the directory datatype_inferer
  */
  STORE percentage_ord INTO '/user/cloudera/pdp/output/data_profiling/datatype_inferer' USING com.profiler.CustomDatatypeInfererStorage('cust_id', 'chararray');
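
One detail worth calling out: total_count.tot_sum and total_count.max_val are used inside a FOREACH over a different relation. This is Pig's scalar projection, which works here because total_count holds exactly one tuple (it comes from GROUP ... ALL), so Pig substitutes its fields as constants. A minimal sketch for inspecting the intermediate relations while developing the script:

  -- Check the intermediate shapes: total_count must contain a single tuple for the scalar projection to succeed
  DESCRIBE inferred_type_count;
  DESCRIBE total_count;
  DUMP total_count;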
