神创天陆活动专区

Hive 数据导入HBase的2种方法详解

最近经常被问到这个问题,所以简单写一下总结。

Hive数据导入到HBase基本有2个方案:

1、HBase中建表,然后Hive中建一个外部表,这样当Hive中写入数据后,HBase中也会同时更新

2、MapReduce读取Hive数据,然后写入(API或者Bulkload)到HBase

1、Hive 外部表

创建hbase表

(1) 建立一个表格classes具有1个列族user

create 'classes','user'

(2) 查看表的构造

hbase(main):005:0> describe 'classes'

DESCRIPTION ENABLED

'classes', {NAME => 'user', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', true

VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => '

false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}

(3) 加入2行数据

put 'classes','001','user:name','jack'

put 'classes','001','user:age','20'

put 'classes','002','user:name','liza'

put 'classes','002','user:age','18'

(4) 查看classes中的数据

hbase(main):016:0> scan 'classes'

ROW COLUMN+CELL

001 column=user:age, timestamp=1404980824151, value=20

001 column=user:name, timestamp=1404980772073, value=jack

002 column=user:age, timestamp=1404980963764, value=18

002 column=user:name, timestamp=1404980953897, value=liza

(5) 创建外部hive表,查询验证

create external table classes(id int, name string, age int)

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age")

TBLPROPERTIES("hbase.table.name" = "classes");

select * from classes;

OK

1 jack 20

2 liza 18

(6)再添加数据到HBase

put 'classes','003','user:age','1820183291839132'

hbase(main):025:0> scan 'classes'

ROW COLUMN+CELL

001 column=user:age, timestamp=1404980824151, value=20

001 column=user:name, timestamp=1404980772073, value=jack

002 column=user:age, timestamp=1404980963764, value=18

002 column=user:name, timestamp=1404980953897, value=liza

003 column=user:age, timestamp=1404981476497, value=1820183291839132

(7)Hive查询,看看新数据

select * from classes;

OK

1 jack 20

2 liza 18

3 NULL NULL --这里是null了,因为003没有name,所以补位Null,而age为Null是因为超过最大值

(8)如下作为验证

put 'classes','004','user:name','test'

put 'classes','004','user:age','1820183291839112312' -- 已经超int了

hbase(main):030:0> scan 'classes'

ROW COLUMN+CELL

001 column=user:age, timestamp=1404980824151, value=20

001 column=user:name, timestamp=1404980772073, value=jack

002 column=user:age, timestamp=1404980963764, value=18

002 column=user:name, timestamp=1404980953897, value=liza

003 column=user:age, timestamp=1404981476497, value=1820183291839132

004 column=user:age, timestamp=1404981558125, value=1820183291839112312

004 column=user:name, timestamp=1404981551508, value=test

select * from classes;

1 jack 20

2 liza 18

3 NULL NULL

4 test NULL -- 超int后也认为是null

put 'classes','005','user:age','1231342'

hbase(main):034:0* scan 'classes'

ROW COLUMN+CELL

001 column=user:age, timestamp=1404980824151, value=20

001 column=user:name, timestamp=1404980772073, value=jack

002 column=user:age, timestamp=1404980963764, value=18

002 column=user:name, timestamp=1404980953897, value=liza

003 column=user:age, timestamp=1404981476497, value=1820183291839132

004 column=user:age, timestamp=1404981558125, value=1820183291839112312

004 column=user:name, timestamp=1404981551508, value=test

005 column=user:age, timestamp=1404981720600, value=1231342

select * from classes;

1 jack 20

2 liza 18

3 NULL NULL

4 test NULL

5 NULL 1231342

注意点:

1、hbase中的空cell在hive中会补null

2、hive和hbase中不匹配的字段会补null

3、Bytes类型的数据,建hive表示加#b

http://stackoverflow.com/questions/12909118/number-type-value-in-hbase-not-recognized-by-hive

http://www.aboutyun.com/thread-8023-1-1.html

4、HBase CF to hive Map

https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

2、MapReduce 写入 HBase

MR写入到HBase有2个常用方法,1是直接调用HBase Api,使用Table 、Put写入;2是通过MR生成HFile,然后Bulkload到HBase,数据量很大的时候推荐使用。

注意点:

1、如果需要从hive的路径中读取一些值怎么办

private String reg = "stat_date=(.*?)\\/softid=([\\d]+)/";

private String stat_date;

private String softid;

------------厦门map函数中写入-------------

String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();

///user/hive/warehouse/snapshot.db/stat_all_info/stat_date=20150820/softid=201/000000_0

// 解析stat_date 和softid

Pattern pattern = Pattern.compile(reg);

Matcher matcher = pattern.matcher(filePathString);

while(matcher.find()){

stat_date = matcher.group(1);

softid = matcher.group(2);

}

2、hive中的map和list怎么处理

hive中的分隔符主要有8种,分别是\001-----> \008

默认 ^A \001

, ^B \002

: ^C \003

Hive中保存的Lis,最底层的数据格式为 jerrick, liza, tom, jerry , Map的数据格式为 jerrick:23, liza:18, tom:0

所以在MR读入时需要简单处理下,例如map需要: "{"+ mapkey.replace("\002", ",").replace("\003", ":")+"}", 由此再转为JSON, toString后再保存到HBase。

3、简单实例,代码删减很多,仅可参考!

public void map(

LongWritable key,

Text value,

Mapper.Context context) {

String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();

///user/hive/warehouse/snapshot.db/stat_all_info/stat_date=20150820/softid=201/000000_0

// 解析stat_date 和softid

Pattern pattern = Pattern.compile(reg);

Matcher matcher = pattern.matcher(filePathString);

while(matcher.find()){

stat_date = matcher.group(1);

softid = matcher.group(2);

}

rowMap.put("stat_date", stat_date);

rowMap.put("softid", softid);

String[] vals = value.toString().split("\001");

try {

Configuration conf = context.getConfiguration();

String cf = conf.get("hbase.table.cf", HBASE_TABLE_COLUME_FAMILY);

String arow = rowkey;

for(int index=10; index < vals.length; index++){

byte[] row = Bytes.toBytes(arow);

ImmutableBytesWritable k = new ImmutableBytesWritable(row);

KeyValue kv = new KeyValue();

if(index == vals.length-1){

//dict need

logger.info("d is :" + vals[index]);

logger.info("d is :" + "{"+vals[index].replace("\002", ",").replace("\003", ":")+"}");

JSONObject json = new JSONObject("{"+vals[index].replace("\002", ",").replace("\003", ":")+"}");

kv = new KeyValue(row, cf.getBytes(),Bytes.toBytes(valueKeys[index]), Bytes.toBytes(json.toString()));

}else{

kv = new KeyValue(row, cf.getBytes(),Bytes.toBytes(valueKeys[index]), Bytes.toBytes(vals[index]));

}

context.write(k, kv);

}

} catch (Exception e1) {

context.getCounter("offile2HBase", "Map ERROR").increment(1);

logger.info("map error:" + e1.toString());

}

context.getCounter("offile2HBase", "Map TOTAL").increment(1);

}

}

4、bulkload

int jobResult = (job.waitForCompletion(true)) ? 0 : 1;

logger.info("jobResult=" + jobResult);

Boolean bulkloadHfileToHbase = Boolean.valueOf(conf.getBoolean("hbase.table.hfile.bulkload", false));

if ((jobResult == 0) && (bulkloadHfileToHbase.booleanValue())) {

LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

loader.doBulkLoad(outputDir, hTable);

}