blob: d5f46ef96801a119141caf8298fe6d7fc581fe5e [file] [log] [blame]
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Accumulo Multi-DataCenter Replication
=====================================
ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication of tables. Data which is
written to one Accumulo instance will automatically be replicated to a separate Accumulo instance.
Justification
-------------
Losing an entire instance really stinks. In addition to natural disasters or facility problems, Hadoop always has the
potential for failure. In the newest versions of Hadoop, the high availability (HA) namenode functionality increases the
redundancy of Hadoop in regards to the single point of failure which the namenode previously was. Despite this, there is
always a varying amount of required administrative intervention to ensure that failure does not result in data loss:
userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem implementations), expected
hardware failures (hard drives), unexpected compute hardware failures (NICs, CPU, Memory), and infrastructure failures
(switches and routers). Accumulo currently has the ability for manual snapshots/copies across multiple instances;
however, this is not sufficient for multiple reasons with the biggest reason being a lack of automated replication.
Background
----------
Apache HBase has had master-master replication, cyclic replication and multi-peer replication since 0.92. This
satisfies a wide range of cross-site replication strategies. Master-master replication lets us have two systems which
both replicate to each other. Both systems can service new writes and will update their view of a table from one
another. Cyclic replication allows us to have cycles in our replication graph. This is a generalization of the
master-master strategy in which we may have ultimately have a system which replicates to a system that it receives data
from. A system with three masters, A, B and C, which replicate in a row (A to B, B to C and C to A) is an example of
this. More complicated examples of this can be envisioned when dealing with multiple replicas inside one geographic
region or data center. Multi-peer replication is a relatively simple in that a single master system will replicate to
multiple peers instead of just one.
While these are relatively different to one another, I believe most can be satisfied through a single, master-push,
replication implementation. Although, the proposed data structure should also be capable of supporting a
peer-pull strategy.
Implementation
--------------
As a first implementation, I will prototype a single master with multiple peer replication strategy. This should grant
us the most flexibility and the most functionality. The general implementation should be capable of application to the
other replication structures (master-master and cyclic-replication). Ill outline a simple master-peer replication use
case, followed by application of this approach to replication cycles and master-master replication. This approach does
not consider conditional mutations.
### Replication Framework
In an attempt to be as clear as possible, Ill use the following terminology when explaining the implementation: master
will refer to the master Accumulo cluster (the system accepting new writes), peer will refer to the peer Accumulo
cluster (the system which does not receive new data through the Accumulo client API, but only from master through
replication). The design results in an eventual consistency model of replication which will allow for peers to
be offline and the online master to still process new updates.
In the simplest notion, when a new file is created by master, we want to ensure that this file is also sent to the
peer. In practice, this new file can either be an RFile that was bulk-imported to master or this can be a write-ahead
log (WAL) file. The bulk-imported RFile is the easy case, but the WAL case merits additional explanation. While data is
being written to Accumulo is it written to a sorted, in-memory map and an append-only WAL file. While the in-memory map
provides a very useful interface for the TabletServer to use for scans and compactions, it is difficult to extract new
updates at the RFile level. As such, this proposed implementation uses the WAL as the transport file format”[a]. While
it is noted that in sending a WAL to multiple peers, each peer will need to reprocess each WAL to make Mutations to
apply whereas they could likely be transformed once, that is left as a future optimization.
To increase the speed in eventual consistency can be achieved, WAL offsets can be tracked to begin the replication
process before a WAL is closed. We can bin these mutations together for a lazy replication which can be combined to each
target server which amortizes the cost into a single write set message. It is not apparent that this requires
co-location within each source tablet in the Accumulo metadata table which means that the worry of inadvertent errors
caused by placing this data in the metadata table is entirely removed.
In every replication graph, which consists of master(s) and peer(s), each system should have a unique identifier. It is
desirable to be able to uniquely identify each system, and each system should have knowledge of the other systems
participating.
These identifiers also make implementing cyclic replication easier, as a cluster can ignore any requests to replicate
some data when that request already contains the current clusters identifier. In other words, data we try to replicate
will contain a linked list of identifiers with the provenance of where that data came and each cluster can make the
determination of whether or not it has seen this data already (and thus needs to process and propagate it). This also
lets us treat replication rules as a graph which grants us a common terminology to use when describing replication.
This framework provides a general strategy to allow pluggable replication strategies to export data out of an Accumulo
cluster. An AccumuloReplicationStrategy is the only presently targeted replication strategy; however, the implementation
should not prohibit alternative approaches to replication such as other databases or filesystems.
### Replication Strategy Implementation
Henceforth, both of the RFiles and WAL files that need replication can be treated as a chunk of data. This chunk
references a start offset and length from the source (RFile or WAL) which needs to be replicated. This has the nice
property of being able to use a Combiner to combine multiple, sequential chunks into one larger chunk to amortize RPC
costs.
#### Make the master aware of file to replicate
Let us define a column family that is used to denote a chunk that needs to be replicated: REPL. We first need to let
master know that it has a new chunk which needs to be replicated. When the file comes from a bulk-import, we need to
create a new entry in the !METADATA table for the given tablet with the REPL column family. If the file is a WAL, we
also want to write an entry for the REPL column[b]. In both cases, the chunks URI will be stored in the column
qualifier. The Value can contain some serialized data structure to track cluster replication provenance and offset
values. Each row (tablet) in the !METADATA table will contain zero to many REPL columns. As such, the garbage collector
needs to be modified to not delete these files on the masters HDFS instance until these files are replicated (copied to
the peer).
#### Choose local TabletServer to perform replication
The Accumulo Master can have a thread that scans the replication table to look for chunks to replicate. When it finds
some, choose a TabletServer to perform the replication to all peers. The master should use a FATE operation to manage
the state machine of this replication process. The expected principles, such as exponential backoff on network errors,
should be followed. When all peers have reported successfully receiving the file, the master can remove the REPL
column for the given chunk.
On the peer, before beginning transfer, the peer should ascertain a new local, unique filename to use for the remote
file. When the transfer is complete, the file should be treated like log recovery and brought into the appropriate
Tablet. If the peer is also a master (replicating to other nodes), the replicated data should create a new REPL column
in the peers table to repeat the replication process, adding in its cluster identifier to the provenance list.
Otherwise, the file can be a candidate for deletion by the garbage collection.
The tserver chosen to replicate the data from the master cluster should ideally be the tserver that created that data.
This helps reduce the complexity of dealing with locality later on. If the HDFS blocks written by the tserver are local,
then we gain the same locality perks.
#### Recurse
In our simple master and peer replication scheme, we are done after the new updates are made available on peer. As
aforementioned, it is relatively easy to schedule replication of a new file on peer because we just repeat the same
process that master did to replicate to peer in the first place.
### Master cluster replication “bookkeeping”
This section outlines the steps on the master cluster to manage the lifecycle of data: when/what data needs to be
replicated and when is a file safe to be removed.
Two key structures are used to implement this bookkeeping:
1. Tablet-level entry: tabletId repl:fully-qualified-file [] value
2. Row-prefix space at end of accumulo.metadata or its own table: *~repl*_fully-qualified-file
clusterName:remoteTableID [] value
These two key structures will be outlined below, with “*repl* column and “*~repl* row denoting which is being referred to.
#### Data Structure in Value
To avoid the necessity of using conditional mutations or other transaction-like operations, we can use a combiner to
generate an aggregate view of replication information. Protobuf is decent choice, however, the description isnt tied to
any implementation. I believe a Combiner used in conjunction with the following data structure provides all necessary
functionality:
``// Tracks general lifecycle of the data: file is open and might have new data to replicate, or the file has been``
``// closed and will have no new data to replicate``
``State:Enum { OPEN, CLOSED }``
``ReplicationStatus { State state, long replication_needed_offset, long replication_finished_offset }``
The offsets refer to the contiguous ranges of records (key-values) written to the WAL. The replication_finished_offset
value tracks what data has been replicated to the given cluster and while the replication_needed_offset value tracks how
much data has been written to the WAL that is ready for replication. replication_finished_offset should always be less
than or equal to replication_needed_offset. For RFiles instead of WALs, state is always CLOSED and
replication_needed_offset is initialized to the length of the RFile. In this context, one can consider the RFile as a
read-only file and the WAL as an append-only file.
For *~repl* entries, the target clusterName and remote tableId would be stored in the key to preserve uniqueness. Using
this information, we would be able to implement the following methods:
``bool isFullyReplicated(ReplicationStatus)``
``Pair<long,long> rangeNeedingReplication(ReplicationStatus)``
The isFullyReplicated method is straightforward: given the values for start/finish stored for data that needs to be
replicated, and the values for start/finish stored for data that has been replicated and the state is CLOSED, is there
still more data for this ReplicationStatus that needs to be replicated for the given clustername/tableID.
rangeNeedingReplication is a bit more complicated. Given the end of a range of data that has already been replicated,
some the end of a range of data that still needs replication, return a range of data that has
not yet been replicated. For example, if keyvalues up to offset 100 in a WAL have already been
replicated and keyvalues up to offset 300 are marked as needing replication, this method should
return [101,300]. Ranges of data replicated, and data needing replication must always be
disjoint and contiguous to ensure that data is replayed in the correct order on the peer.
The use of a Combiner is used to create a basic notion of addition and subtraction”. We cannot use deletes to manage
this without creating a custom iterator, which would not be desirable since it would required to run over the entire
accumulo.metadata table. Avoiding deletions exception on cleanup is also desired to avoid handling tombstoneing
future version of a Key. The addition operation is when new data is appended to the WAL which signifies new data to be
replicated. This equates to an addition to replication_needed_offset. The subtraction operation is when data from the
WAL has be successfully replicated to the peer for this *~repl* record. This is implemented as an addition to the
replication_finished_offset.
When CLOSED is set on a ReplicationStatus, this implies that the WAL has been closed and no new offsets will be added is
would be tracked via the *repl* column. As such, a ReplicationStatus object is candidate for deletion when the state is
CLOSED and replication_finished_offset is equal to replication_needed_offset. A value of CLOSED for state is always
propagated over the NEW state. An addition after the state is CLOSED is an invalid operation and would be a logic error.
Consider the case of a new data being ingested to the cluster: the following discrete steps should happen. The
assumption that replication is enabled is made to not distract from the actual steps. As previously mentioned, a
combiner must be set on the *repl* column to aggregate the values to properly maintain replication state. The following is
what a tserver will do.
1) When a new WAL is created by request of a tserver and the log column is created for a *repl* column within the tablets
row to track that this WAL will need to be replicated.
INSERT
tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(state=OPEN)
2) As the tserver using this WAL finishes commits to the WAL, it should submit a new mutation to track the current
length of data in the WAL that it just wrote that needs to be read for purposes of replication.
INSERT
tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(addition
offset)
3) Eventually, the tablet server will finish using a WAL, minc contents of memory to disk, and mark the WAL as unused.
This results in updating the state to be CLOSED.
INSERT
tablet repl:hdfs://localhost:8020/accumulo/.../wal/… -> ReplicationStatus(state=CLOSED)
The master also needs a new thread to process the *repl* columns across all tablets in a table and create *~repl* row
entries for the file and where it should be replicated to. The high-level goals for this thread are as follows:
1) Create mutations for a WAL that outline where the file must be replicated to (cluster and tabletID)
INSERT
*~repl*_hdfs://localhost:8020/accumulo/.../wal/… clusterName:tableId -> ReplicationStatus(addition
offset)
2) Determine when the *repl* column in a tablet is safe for deletion (all data for it has been replicated). This is the
sign that the GC can then remove this file.
DELETE
tablet repl:hdfs://localhost:8020/accumulo/.../wal/…
This can be accomplished with a single thread that scans the metadata table:
1) Construct snapshot of tablet *repl* file entries with aggregated offsets, sorted by file,
[hdfs://localhost:8020/.../file1 => {[tablet1, RS], [tablet2, RS], ... },
hdfs://localhost:8020/.../file2 => {[tablet3, RS], [tablet4, RS], ... },
hdfs://localhost:8020/.../file3 => {[tablet5, [RS:CLOSED]], [tablet6, [RS:CLOSED]], ...] ]
2) Begin scanning *~repl* row-prefix with Scanner, perform merged read to join state from aggregate *repl* column across
tablets, and columns in *~repl* row for the file.
for each file in *~repl* rowspace:
if all columns in *~repl*_file row isFullyReplicated:
issue deletes for file in *repl* column for all tablets with references
if delete of *repl* is successful:
delete *~repl* row
else if *~repl* row exists but no *repl* columns:
// Catch failure case from first conditional
delete *~repl* row
else
for each file in snapshot of *repl* columns:
make mutation for *~repl*_file
for each peer cluster in configuration:
if file should be replicated on peer:
add column for clusterid:remote_tableID -> RS
Combiner should be set on all columns in *~repl* prefix rowspace and the *repl* colfam to ensure multiple runs of the
described procedure without actual replication occurring to aggregate data that needs replication. Configuration
Replication can be configured on a per-locality-group, replicated that locality group to one or more peers. Given that
we have dynamic column families, trying to track per-column-family replication would be unnecessarily difficult.
Configuration requires new configuration variables that need to be introduced to support the necessary information. Each
peer is defined with a name and the zookeeper quorum of the remote cluster to locate the active Accumulo Master. The
API should ease configuration on replication across all locality groups. Replication cannot be configured on the root or
metadata table.
Site-wide:
// The name and location of other clusters
instance.cluster.$name.zookeepers=zk1,zk2,zk3[c]
// The name of this cluster
instance.replication.name=my_cluster_name[d]
Per-table:
// Declare the locality group(s) that should be replicated and the clusters that they should be replicated to
table.replication.$locality_group_name=cluster1,cluster2,...
Shell commands can also be created to make this configuration easier.
definecluster cluster_name zookeeper_quorum
e.g. definecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181
deletecluster cluster_name zookeeper_quorum
e.g. deletecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181
enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
e.g. enablereplication -t foo -lg cf1 peer1 enablereplication -t foo -all-loc-groups peer1
disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name
e.g. disablereplication -t foo -lg cf1 peer1 disablereplication -t foo -all-loc-groups peer1
For peers, we likely do not want to allow users to perform writes against the cluster. Thus, they should be read-only.
This likely requires custom configuration and some ZK state to not accept regular API connections. Should be
exposed/controllable by the shell, too. Common Questions
*How do conditional mutations work with this approach?*
They do not. They will need to throw an Exception.
*How does replication work on a table which already contains data?*
When replication is enabled on a table, all new data will be replicated. This implementation does not attempt to support
this as the existing importtable and exporttable already provide support to do this.
*When I update a table property on the master, will it propagate to the peer?*
There are both arguments for and against this. We likely want to revisit this later as a configuration parameter that
could allow the user to choose if this should happen. We should avoid implementations that would tie us to one or the
other.
As an argument against this, consider a production and a backup cluster where the backup cluster is smaller in number of
nodes, but contains more disks. Despite wanting to replicate the data in a table, the configuration of that table may
not be desired (e.g. split threshold, compression codecs, etc). Another argument against could be age-off. If a replica
cluster is not the same size as the production cluster (which is extremely plausible) you would not want the same
age-off rules for both the production and replica.
An argument for this feature is that you would want custom compaction iterators (as a combiner, for example) to only be
configured on a table once. You would want these iterators to appear on all replicas. Such an implementation is also
difficult in master-master situations as we dont have a shared ZooKeeper instance that we can use to reliably commit
these changes.
*What happens in master-master if two Keys are exactly the same with different values?*
Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528
*Did you come up with this all on your own?*
Ha, no. Big thanks goes out to HBases documentation, Enis Söztutar (HBase), and other Accumulo devs that Ive bounced
these ideas off of (too many to enumerate).
Goals
1. Master-Peer configuration that doesnt exclude future master-master work Per locality-group replication configuration
2. Shell administration of replication Accumulo Monitor integration/insight to replication status State machines for
3. lifecycle of chunks Versionable (read-as protobuf) datastructure to track chunk metadata Thrift for RPC Replication
4. does not require closed files (can send incremental updates to peers) Ability to replicate live inserts and bulk
5. imports Provide replication interface with Accumulo->Accumulo implementation Do not rely on active Accumulo Master to
6. perform replication (send or receive) -- delegate to a TabletServer Use FATE where applicable Gracefully handle
7. offline peers Implement read-only variant Master/TabletServer[e]
Non-Goals
1. Replicate on smaller granularity than locality group (not individual colfams/colquals or based on visibilities)
2. Wire security between master and peer
3. Support replication of encrypted data[f]
4. Replication of existing data (use importtable & exporttable)
5. Enforce replication of table configuration
References
* http://www.cs.mcgill.ca/~kemme/papers/vldb00.html
[a] While the WAL is a useful file format for shipping updates (an append-only file), the actual LogFileKey and
LogFileValue pairs may not be sufficient? Might need some extra data internally? Maybe the DFSLogger header could
contain that?
[b] This approach makes the assumption that we only begin the replication process when a WAL is closed.
This is likely too long of a period of time: an offset and length likely needed to be interested to decrease latency.
[c] This needs to be consistent across clusters. Do we need to control access to ensure that it is? Is it excessive to
force users to configure it correctly?
[d] Same as instance.cluster.$name: Do we need to enforce these values?
[e] This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal for the first implementation
[f] While not in the original scope, it is definitely of great concern.