Friday, February 10, 2012

(limited) cross row transactions in HBase

Atomic operations in HBase are currently limited to single row operations. This is achieved by co-locating all cells with the same row-key (see introduction-to-hbase) in the same region.

The key observation is that atomic operations only require the cells involved to be co-located in the same region.

HBASE-5304 allows defining more flexible RegionSplitPolicies (note Javadoc is not yet updated, so this link leads to nowhere right now).
HBASE-5368 provides an example (KeyPrefixRegionSplitPolicy) that co-locates cells with the same row-key-prefix in the same region.
By choosing the prefix carefully, useful ranges of rows can be co-located. One needs to be careful, though: rows that share a prefix are never split apart, and regions cannot grow without bound.
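To make the co-location idea concrete, here is a minimal sketch of how a prefix-based split policy could pick its split point: take the row the region would normally split at and cut it down to the prefix length, so the boundary never falls between two rows that share a prefix. This is plain Java for illustration, not the HBase source; the names PrefixSplitSketch and alignToPrefix are made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: not the HBase implementation.
class PrefixSplitSketch {

    // Truncate a candidate split row to the configured prefix length.
    // Every row that starts with the returned prefix sorts at or after it,
    // so all rows sharing that prefix end up on the same side of the split.
    static byte[] alignToPrefix(byte[] candidateSplitRow, int prefixLength) {
        int len = Math.min(prefixLength, candidateSplitRow.length);
        return Arrays.copyOf(candidateSplitRow, len);
    }

    public static void main(String[] args) {
        byte[] candidate = "xxxabc".getBytes(StandardCharsets.UTF_8);
        byte[] split = alignToPrefix(candidate, 3);
        // prints xxx
        System.out.println(new String(split, StandardCharsets.UTF_8));
    }
}
```

With a prefix length of 3, a candidate split row of "xxxabc" becomes the boundary "xxx", which keeps "xxx123", "xxxabc" and "xxxzzz" together.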

HBASE-5229 finally allows atomic operations across multiple rows, as long as they are co-located in the same region.
This is a developer-only feature and only accessible through coprocessor endpoints. However, HBASE-5229 also includes an endpoint that can be used directly.

An example can be set up as follows:
1. Add this to hbase-site.xml:
  <property>
    <name>hbase.coprocessor.user.region.classes</name>
    <value>org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint</value>
  </property>

This loads the necessary coprocessor endpoint into all regions of all user tables.

2. Create a table that uses KeyPrefixRegionSplitPolicy:
HTableDescriptor myHtd = new HTableDescriptor("myTable");
myHtd.setValue(HTableDescriptor.SPLIT_POLICY,
               KeyPrefixRegionSplitPolicy.class.getName());

// set the prefix length to 3 in this example
myHtd.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY, 
               String.valueOf(3));
HColumnDescriptor hcd = new HColumnDescriptor(...);
myHtd.addFamily(hcd);
HBaseAdmin admin = ...;
admin.createTable(myHtd);

Regions of "myTable" are now split according to KeyPrefixRegionSplitPolicy with a prefix of 3.


3. Execute an atomic multirow transaction:
List<Mutation> ms = new ArrayList<Mutation>();
Put p = new Put(Bytes.toBytes("xxxabc"));
...
ms.add(p);
Put p1 = new Put(Bytes.toBytes("xxx123"));
...
ms.add(p1);
Delete d = new Delete(Bytes.toBytes("xxxzzz"));
...
ms.add(d);
// get a proxy for MultiRowMutationEndpoint
// (t is the HTable instance for "myTable")
// Note that the passed row is used to locate the 
// region. Any of the other row keys could have
// been used as well, as long as they identify 
// the same region.
MultiRowMutationProtocol mr = t.coprocessorProxy(
   MultiRowMutationProtocol.class,
   Bytes.toBytes("xxxabc"));
// perform the atomic operation
mr.mutateRows(ms);
 

Update, Saturday, February 18, 2012:
In fact, using just "xxx" in the call to get the coprocessorProxy would be correct as well, since it identifies the same region. "xxx" could be considered the "partition" key.

That's it... The two puts and the delete are executed atomically, even though these are different rows.
KeyPrefixRegionSplitPolicy ensures that the three rows are co-located, because they share the same 3-byte prefix.
If any of the involved row-keys is located in a different region, the operation will fail and the client will not try again.
Again, care must be taken to pick a useful prefix length for co-location here, or regions will grow without bound, or at least be very skewed in size.
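Since a batch that spans regions fails without a retry, it can be worth checking on the client that every row in the batch shares the prefix before calling the endpoint. The helper below is a hypothetical sketch in plain Java (PrefixCheck and sharePrefix are names invented here, not part of HBase). It assumes the same prefix length that was configured for the table, and it is deliberately conservative: rows with different prefixes may still happen to live in the same region, but only a shared prefix guarantees it.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical client-side guard; not part of the HBase API.
class PrefixCheck {

    // Returns true only if every row is at least prefixLength bytes long
    // and all rows start with the same prefixLength bytes.
    static boolean sharePrefix(List<byte[]> rows, int prefixLength) {
        if (rows.isEmpty()) {
            return true; // nothing to mutate, nothing can be misplaced
        }
        byte[] first = rows.get(0);
        if (first.length < prefixLength) {
            return false; // too short to carry a full prefix; treat as unsafe
        }
        byte[] prefix = Arrays.copyOf(first, prefixLength);
        for (byte[] row : rows) {
            if (row.length < prefixLength
                    || !Arrays.equals(prefix, Arrays.copyOf(row, prefixLength))) {
                return false;
            }
        }
        return true;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> rows = Arrays.asList(
            bytes("xxxabc"), bytes("xxx123"), bytes("xxxzzz"));
        // prints true
        System.out.println(sharePrefix(rows, 3));
    }
}
```

In the example above, such a check would be run over the row keys of the two puts and the delete before mutateRows is called.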

This feature is useful in certain multi-tenant setups, or for time-based data sets (for example the split boundaries could be set to whole hours, or days).
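
For the time-based case, one possible layout (an assumption for illustration, not something HBase prescribes) is to start each row key with a fixed-width hour bucket such as yyyyMMddHH. With a prefix length of 10, all rows written for the same hour share a prefix and would therefore stay in one region. The names HourBucketKey and rowKey are invented for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical row-key layout: <yyyyMMddHH><suffix>
class HourBucketKey {

    // Width of the hour bucket; this would be the value configured as
    // the prefix length of the split policy.
    static final int PREFIX_LENGTH = 10;

    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("yyyyMMddHH").withZone(ZoneOffset.UTC);

    // Build a row key whose first 10 bytes identify the UTC hour of the event.
    static byte[] rowKey(Instant eventTime, String suffix) {
        return (HOUR.format(eventTime) + suffix).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = rowKey(Instant.parse("2012-02-10T14:05:00Z"), "-event1");
        // prints 2012021014-event1
        System.out.println(new String(key, StandardCharsets.UTF_8));
    }
}
```

Two events at 14:05 and 14:59 on the same day get the same 10-byte prefix, so they could be mutated together atomically; an event at 15:00 gets a different prefix and may end up in another region.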