MapReduce Integration

There are two mapreduce packages in HBase, just as there are in MapReduce itself: org.apache.hadoop.hbase.mapred and org.apache.hadoop.hbase.mapreduce. The former uses the old-style API and the latter the new style. The latter has more facilities, though you can usually find an equivalent in the older package. Pick the package that matches your MapReduce deployment; when in doubt or starting over, pick org.apache.hadoop.hbase.mapreduce. The notes below refer to o.a.h.h.mapreduce, but substitute o.a.h.h.mapred if that is what you are using.

By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.

To give MapReduce jobs the access they need, you could add hbase-site.xml to $HADOOP_HOME/conf and add the HBase jars to the $HADOOP_HOME/lib directory, then copy these changes across your cluster. Alternatively, you could edit $HADOOP_HOME/conf/hadoop-env.sh and add them to the HADOOP_CLASSPATH variable. Neither approach is recommended, because it pollutes your Hadoop install with HBase references and requires you to restart the Hadoop cluster before Hadoop can use the HBase data.

The recommended approach is to let HBase add its dependency jars itself and use HADOOP_CLASSPATH or -libjars.

Since HBase 0.90.x, HBase adds its dependency JARs to the job configuration itself, so the dependencies only need to be available on the local CLASSPATH. The following example runs the bundled HBase RowCounter MapReduce job against a table named usertable. If you have not set the environment variables expected in the command (the parts prefixed by a $ sign and surrounded by curly braces), you can use the actual system paths instead. Be sure to use the correct version of the HBase JAR for your system. The backticks (` symbols) cause the shell to execute the sub-commands, assigning the output of hbase classpath (the command to dump the HBase CLASSPATH) to HADOOP_CLASSPATH. This example assumes you use a BASH-compatible shell.

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-VERSION.jar rowcounter usertable

When the command runs, internally the HBase JAR finds the dependencies it needs (ZooKeeper, Guava, and the rest) on the passed HADOOP_CLASSPATH and adds the JARs to the MapReduce job configuration. See the source of TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) for how this is done.
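
If you write your own driver rather than using a bundled job, you can make the same call yourself. The following is a minimal sketch under that assumption; the MyJobDriver class and job name are illustrative, not part of HBase:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class MyJobDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "my-hbase-job");
    job.setJarByClass(MyJobDriver.class);

    // Ships the HBase jars (ZooKeeper, Guava, etc.) found on the local
    // CLASSPATH with the job, so cluster nodes need no HBase install.
    // The initTableMapperJob/initTableReducerJob helpers call this for you.
    TableMapReduceUtil.addDependencyJars(job);

    // ... configure mapper, reducer, input/output formats, then submit ...
  }
}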

The hbase mapredcp command can also dump the CLASSPATH entries required by MapReduce, which are the same jars TableMapReduceUtil#addDependencyJars would add. You can add them, together with the HBase conf directory, to HADOOP_CLASSPATH. For jobs that do not package their dependencies or call TableMapReduceUtil#addDependencyJars, the following command structure is necessary:

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') …

The example may not work if you are running HBase from its build directory rather than an installed location. You may see an error like the following:

java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper

If this occurs, try modifying the command as follows, so that it uses the HBase JARs from the target/ directory within the build environment.

$ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-server/target/hbase-server-VERSION-SNAPSHOT.jar:`${HBASE_BUILD_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-server/target/hbase-server-VERSION-SNAPSHOT.jar rowcounter usertable

Some MapReduce jobs that use HBase fail to launch. The symptom is an exception similar to the following:

Exception in thread "main" java.lang.IllegalAccessError: class
    com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass
    com.google.protobuf.LiteralByteString
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818)
        at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433)
        at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186)
        at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147)
        at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270)
        at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100)

This is caused by a classloader dependency. It affects both jobs using the -libjars option and "fat jar" jobs, which package their runtime dependencies in a nested lib folder. In order to satisfy the new classloader requirements, hbase-protocol.jar must be included in Hadoop's classpath. This can be resolved system-wide by including a reference to hbase-protocol.jar in Hadoop's lib directory, via a symlink or by copying the jar into the new location.

This can also be achieved on a per-job launch basis by including it in the HADOOP_CLASSPATH environment variable at job submission time. When launching jobs that package their dependencies, all three of the following job launching commands satisfy this requirement:

$ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass

$ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass

$ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass

For jars that do not package their dependencies, the following command structure is necessary:

$ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') …

MapReduce Scan Caching

TableMapReduceUtil now restores the option to set scanner caching (the number of rows which are cached before returning the result to the client) on the Scan object that is passed in. This functionality was lost due to a bug in HBase 0.95 (HBASE-11558), which is fixed for HBase 0.98.5 and 0.96.3. The priority order for choosing the scanner caching is as follows:

  • Caching settings which are set on the scan object.
  • Caching settings which are specified via the configuration option hbase.client.scanner.caching, which can either be set manually in hbase-site.xml or via the helper method TableMapReduceUtil.setScannerCaching().
  • The default value HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING, which is set to 100.

Optimizing the caching settings is a balance between the time the client waits for a result and the number of sets of results the client needs to receive. If the caching setting is too large, the client could end up waiting for a long time or the request could even time out. If the setting is too small, the scan needs to return results in several pieces. If you think of the scan as a shovel, a bigger cache setting is analogous to a bigger shovel, and a smaller cache setting is equivalent to more shoveling in order to fill the bucket.

The list of priorities mentioned above allows you to set a reasonable default, and override it for specific operations.
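
As a rough sketch of the first two levels of that priority order (the values and job name here are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class ScanCachingExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "scan-caching-example");

    // Priority 2: a job-wide default, written into hbase.client.scanner.caching.
    TableMapReduceUtil.setScannerCaching(job, 200);

    // Priority 1: a setting on the Scan object itself, which wins over the
    // configuration value for this particular scan.
    Scan scan = new Scan();
    scan.setCaching(500);

    // Pass 'scan' to TableMapReduceUtil.initTableMapperJob(...) as usual.
    // With neither setting, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING (100) applies.
  }
}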

Bundled HBase MapReduce Jobs

The HBase JAR also serves as a Driver for some bundled MapReduce jobs. To learn about the bundled MapReduce jobs, run the following command.

$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar

An example program must be given as the first argument.
Valid program names are:
  copytable: Export a table from local cluster to peer cluster
  completebulkload: Complete a bulk data load.
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table

Each of the valid program names is a bundled MapReduce job. To run one of the jobs, model your command after the following example.

$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar rowcounter myTable

HBase as a MapReduce Job Data Source and Data Sink

HBase can be used as a data source, TableInputFormat, and data sink, TableOutputFormat or MultiTableOutputFormat, for MapReduce jobs. When writing MapReduce jobs that read or write HBase, it is advisable to subclass TableMapper and/or TableReducer. See the do-nothing pass-through classes IdentityTableMapper and IdentityTableReducer for basic usage. For a more involved example, see RowCounter or review the org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.
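
For illustration, a minimal read-only job that subclasses TableMapper and counts rows might look like the following sketch; the table name usertable and the class names are placeholders, not HBase APIs:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MyReadJob {
  // Counts the rows it sees; emits nothing.
  static class MyMapper extends TableMapper<NullWritable, NullWritable> {
    enum Counters { ROWS }
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      context.getCounter(Counters.ROWS).increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "read-usertable");
    job.setJarByClass(MyReadJob.class);

    Scan scan = new Scan();
    scan.setCaching(500);        // fetch larger batches of rows per RPC
    scan.setCacheBlocks(false);  // don't fill the block cache with a one-off full scan

    // Uses 'usertable' as the job's data source via TableInputFormat and
    // ships the HBase dependency jars with the job.
    TableMapReduceUtil.initTableMapperJob("usertable", scan, MyMapper.class,
        NullWritable.class, NullWritable.class, job);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}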

If you run MapReduce jobs that use HBase as a source or sink, you need to specify the source and sink table and column names in your configuration.

When you read from HBase, the TableInputFormat requests the list of regions from HBase and launches a map task per region, or mapreduce.job.maps map tasks, whichever is smaller. If your job only has two maps, raise mapreduce.job.maps to a number greater than the number of regions. Maps will run on the adjacent TaskTracker/NodeManager if you are running a TaskTracker/NodeManager and RegionServer per node.

When writing to HBase, it may make sense to avoid the Reduce step and write back into HBase from within your map. This approach works when your job does not need the sort and collation that MapReduce does on the map-emitted data. On insert, HBase 'sorts', so there is no point double-sorting (and shuffling data around your MapReduce cluster) unless you need to. If you do not need the Reduce, your map might emit counts of records processed for reporting at the end of the job; set the number of Reduces to zero and use TableOutputFormat. If running the Reduce step makes sense in your case, you should typically use multiple reducers so that load is spread across the HBase cluster.
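
As a sketch of such a map-only write, the following copies rows from one table to another, emitting Puts from the mapper and letting TableOutputFormat (wired up by initTableReducerJob with a null reducer) do the writing; the table names are placeholders:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class MyCopyJob {
  // Re-emits every cell of the source row as a Put for the target table.
  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      Put put = new Put(row.get());
      for (Cell cell : value.rawCells()) {
        put.add(cell);
      }
      context.write(row, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "copy-sourceTable-to-targetTable");
    job.setJarByClass(MyCopyJob.class);

    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    TableMapReduceUtil.initTableMapperJob("sourceTable", scan, CopyMapper.class,
        ImmutableBytesWritable.class, Put.class, job);

    // Sink: TableOutputFormat against 'targetTable'. A null reducer plus zero
    // reduce tasks makes this map-only, since HBase sorts on insert anyway.
    TableMapReduceUtil.initTableReducerJob("targetTable", null, job);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}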

A new HBase partitioner, the HRegionPartitioner, can run as many reducers as there are existing regions. The HRegionPartitioner is suitable when your table is large and your upload will not greatly alter the number of existing regions upon completion. Otherwise use the default partitioner.
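
A sketch of the reduce-side wiring with HRegionPartitioner might look like the following; the table name, the trivial reducer, and the assumption that the map side emits (ImmutableBytesWritable, Put) pairs are illustrative only:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Job;

public class MyPartitionedWriteJob {
  // Writes each incoming Put to the sink table configured on the job.
  static class MyTableReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> puts, Context context)
        throws IOException, InterruptedException {
      for (Put put : puts) {
        context.write(key, put);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "write-with-region-partitioner");
    job.setJarByClass(MyPartitionedWriteJob.class);

    // ... set up a map side that emits (ImmutableBytesWritable, Put) pairs,
    //     e.g. with TableMapReduceUtil.initTableMapperJob(...) ...

    // The partitioner overload also registers HRegionPartitioner, which routes
    // each row to the reducer responsible for its target region, so no more
    // reducers do useful work than there are regions in 'targetTable'.
    TableMapReduceUtil.initTableReducerJob("targetTable", MyTableReducer.class, job,
        HRegionPartitioner.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}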
