- 阅读权限
- 255
- 威望
- 0 级
- 论坛币
- 49957 个
- 通用积分
- 79.5487
- 学术水平
- 253 点
- 热心指数
- 300 点
- 信用等级
- 208 点
- 经验
- 41518 点
- 帖子
- 3256
- 精华
- 14
- 在线时间
- 766 小时
- 注册时间
- 2006-5-4
- 最后登录
- 2022-11-6
|
- Example 49. Using Additional Parameters
- val sc = new SparkContext("local", "test")
- val config = new HBaseConfiguration()
- val hbaseContext = new HBaseContext(sc, config)
- val stagingFolder = ...
- val rdd = sc.parallelize(Array(
- (Bytes.toBytes("1"),
- (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
- (Bytes.toBytes("3"),
- (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
- val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
- val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
- familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
- rdd.hbaseBulkLoad(TableName.valueOf(tableName),
- t => {
- val rowKey = t._1
- val family: Array[Byte] = t._2(0)._1
- val qualifier = t._2(0)._2
- val value = t._2(0)._3
- val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
- Seq((keyFamilyQualifier, value)).iterator
- },
- stagingFolder.getPath,
- familyHBaseWriterOptions,
- compactionExclude = false,
- HConstants.DEFAULT_MAX_FILE_SIZE)
- val load = new LoadIncrementalHFiles(config)
- load.doBulkLoad(new Path(stagingFolder.getPath),
- conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
复制代码
|
|