2015年3月18日水曜日

[Hadoop] MapReduceでHBaseにバルクロードする

JavaでHBaseへデータを登録する方法についてのメモ。
一連の流れが具体的に書かれてるようなのが見つからなかったので。
詳しくは馬本を読めとのこと。。

[目的]
HDFSファイルの内容をHBaseに登録すること。
一行ずつ直接HBaseにPutしてもいいけど、ファイルの中身が膨大なんでMapReduceを使いたいのと、登録はバルクロードでやりたい。

[概要]
・MapReduceでHFile(※)を生成する
・HFileのあるディレクトリを指定してHBaseにバルクロードする
※HBase用データファイル

[方法]
全体のソースは丸ごと最後に書くので、部分的に。

まずMapReduceJobの設定。
Inputはファイルの中身のテキストを読み込むので、
job.setInputFormatClass(TextInputFormat.class);


OutputはHBaseへの登録。なのだが、それは最終的な出力であって、MapReduceの出力はあくまで"HFileの生成"。
OutputFormatには以下のように"HFIleOutputFormat"を指定。
job.setOutputFormatClass(HFileOutputFormat.class);

そして、HFileOutputFormat.configureIncrementalLoad() というメソッドを使って設定をしておく。これをやっとけば勝手に指定HTbaleのHFile生成用Reducerを作ってくれる。
HFileOutputFormat.configureIncrementalLoad(job, hTable);

ちなみに、MapReduceではJobの結果ファイルを出力するが、その時指定した出力先ディレクトリがHFileの出力先になる。指定ディレクトリ配下に、HBaseへPutするファミリーの名前で階層が自動で作られ、そこにHFileができる。
FileOutputFormat.setOutputPath(job, outputDir); 

Mapper側では、InputはTextでOutputは以下のキー・バリューを使用する。
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

ImmutableBytesWritableはHBaseに登録するRowKeyになる。
そして、そのキーに対して登録する内容をPutクラスで作ってバリューにセットする。
ここで出力をPutにしたことで、HFileOutputFormatは自動で"PutSortReducer"というReducerを自動で使ってくれる。あとはKeyValueクラスも使えるが、他の形式には対応していない。
正直、Incrementにも対応していて欲しかった。拡張もできねーし。。

話戻りますと、↑のような感じなのでReducerはナシ。作るのはMapperのみ。

で、Jobを実行。無事Jobが終わったら、HFileが出来ているので、ディレクトリを指定してバルクロード。"LoadIncrementalHFiles"というAPIを使う。(JavaのAPIがあることに中々気づけなくて苦労した。。)
以下のように実行。
LoadIncrementalHFiles loadTool = new LoadIncrementalHFiles(conf);
loadTool.doBulkLoad(outputDir, hTable);

これでバルクロードしてくれる。超早いです。
Jobが終わってからファイルでロードするので、Jobが途中でこけたりしても問題なし。
中途半端にデータが入ったりせず、簡単に再実行できる。


以下サンプル。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestBulkLoadJob {

 /**
  * 
 
  * args[0]:InputPath
  * args[1]:OutputDir
  * args[2]:HTableName
  * 
* @param args */ public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String inputPath = args[0]; Path outputDir = new Path(args[1]); HTable hTable = new HTable(conf, args[2]); Job job = new Job(conf, "hbase bulk load job"); job.setJarByClass(TestBulkLoadJob.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setMapperClass(TestMapper.class); job.setNumReduceTasks(0); // Reduceは勝手にPutSortReducerでやってくれる job.setInputFormatClass(TextInputFormat.class); // ファイルから入力 job.setOutputFormatClass(HFileOutputFormat.class); // HBaseへ出力 // Input設定 FileInputFormat.setInputPaths(job, inputPath); // Output設定 FileOutputFormat.setOutputPath(job, outputDir); // 結果ファイル(HFileを含む)出力先 HFileOutputFormat.configureIncrementalLoad(job, hTable); // HFileのOutput設定 // Job実行 boolean success = job.waitForCompletion(true); if (success) { // Jobが成功したらHFileのあるディレクトリを指定してバルクロード LoadIncrementalHFiles loadTool = new LoadIncrementalHFiles(conf); loadTool.doBulkLoad(outputDir, hTable); } System.exit(success ? 0 : 1); } public static class TestMapper extends Mapper { @Override protected void map(Writable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); // HBaseのRowKey(とりあえずlineをそのままkeyとしておく) byte[] rowKey = Bytes.toBytes(line); // 登録用Putオブジェクト作成(とりあえずカウント(1)をセットしておく) Put put = new Put(rowKey); put.add(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes(1L)); // MapのOutputKeyはRowKeyで作成 ImmutableBytesWritable imKey = new ImmutableBytesWritable(rowKey); // contextにkeyとvalue(Put)をセット context.write(imKey, put); } } }