| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| |
| Accumulo Multi-DataCenter Replication |
| ===================================== |
| |
| ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication of tables. Data which is |
| written to one Accumulo instance will automatically be replicated to a separate Accumulo instance. |
| |
| |
| Justification |
| ------------- |
| |
| Losing an entire instance really stinks. In addition to natural disasters or facility problems, Hadoop always has the |
| potential for failure. In the newest versions of Hadoop, the high availability (HA) namenode functionality increases the |
| redundancy of Hadoop with regard to the single point of failure which the namenode previously was. Despite this, a |
| varying amount of administrative intervention is always required to ensure that none of the following failures results in data loss: |
| userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem implementations), “expected” |
| hardware failures (hard drives), unexpected compute hardware failures (NICs, CPU, Memory), and infrastructure failures |
| (switches and routers). Accumulo currently has the ability for manual snapshots/copies across multiple instances; |
| however, this is not sufficient for multiple reasons with the biggest reason being a lack of automated replication. |
| |
| |
| Background |
| ---------- |
| |
| Apache HBase has had master-master replication, cyclic replication and multi-peer replication since 0.92. This |
| satisfies a wide range of cross-site replication strategies. Master-master replication lets us have two systems which |
| both replicate to each other. Both systems can service new writes and will update their “view” of a table from one |
| another. Cyclic replication allows us to have cycles in our replication graph. This is a generalization of the |
| master-master strategy in which we may ultimately have a system which replicates to a system that it receives data |
| from. A system with three masters, A, B and C, which replicate in a row (A to B, B to C and C to A) is an example of |
| this. More complicated examples of this can be envisioned when dealing with multiple replicas inside one geographic |
| region or data center. Multi-peer replication is relatively simple in that a single master system will replicate to |
| multiple peers instead of just one. |
| |
| |
| While these are relatively different from one another, I believe most can be satisfied through a single, master-push, |
| replication implementation. That said, the proposed data structure should also be capable of supporting a |
| peer-pull strategy. |
| |
| |
| Implementation |
| -------------- |
| |
| As a first implementation, I will prototype a single master with multiple peer replication strategy. This should grant |
| us the most flexibility and the most functionality. The general implementation should be capable of application to the |
| other replication structures (master-master and cyclic-replication). I’ll outline a simple master-peer replication use |
| case, followed by application of this approach to replication cycles and master-master replication. This approach does |
| not consider conditional mutations. |
| |
| |
| ### Replication Framework |
| |
| In an attempt to be as clear as possible, I’ll use the following terminology when explaining the implementation: master |
| will refer to the “master” Accumulo cluster (the system accepting new writes), peer will refer to the “peer” Accumulo |
| cluster (the system which does not receive new data through the Accumulo client API, but only from master through |
| replication). The design results in an eventual consistency model of replication which will allow for peers to |
| be offline and the online master to still process new updates. |
| |
| |
| In the simplest notion, when a new file is created by master, we want to ensure that this file is also sent to the |
| peer. In practice, this new file can either be an RFile that was bulk-imported to master or this can be a write-ahead |
| log (WAL) file. The bulk-imported RFile is the easy case, but the WAL case merits additional explanation. While data is |
| being written to Accumulo, it is written to a sorted, in-memory map and an append-only WAL file. While the in-memory map |
| provides a very useful interface for the TabletServer to use for scans and compactions, it is difficult to extract new |
| updates at the RFile level. As such, this proposed implementation uses the WAL as the transport “file format”[a]. Note |
| that when a WAL is sent to multiple peers, each peer must reprocess the WAL to create the Mutations to apply, whereas |
| the WAL could likely be transformed once; that is left as a future optimization. |
| |
| |
| To increase the speed at which eventual consistency can be achieved, WAL offsets can be tracked to begin the replication |
| process before a WAL is closed. We can bin these mutations together for lazy replication, combining them per |
| target server so that the cost is amortized into a single write-set message. It is not apparent that this requires |
| co-location within each source tablet in the Accumulo metadata table, which means that the worry of inadvertent errors |
| caused by placing this data in the metadata table is entirely removed. |
| |
| |
| In every replication graph, which consists of master(s) and peer(s), each system should have a unique identifier. It is |
| desirable to be able to uniquely identify each system, and each system should have knowledge of the other systems |
| participating. |
| |
| |
| These identifiers also make implementing cyclic replication easier, as a cluster can ignore any requests to replicate |
| some data when that request already contains the current cluster’s identifier. In other words, data we try to replicate |
| will contain a linked list of identifiers recording the provenance of where that data came from, and each cluster can |
| determine whether or not it has seen this data already (and thus whether it needs to process and propagate it). This also |
| lets us treat replication rules as a graph which grants us a common terminology to use when describing replication. |
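| |
| The provenance check described above can be sketched as follows. This is a minimal Python model, not Accumulo code: |
| the function name and the use of a plain list of cluster identifiers are assumptions made for illustration. |
| |

```python
from typing import List, Optional


def provenance_to_forward(provenance: List[str], local_id: str) -> Optional[List[str]]:
    """Return the provenance list to propagate, or None if the data was already seen here."""
    if local_id in provenance:
        # This cluster already appears in the data's history: ignore the request.
        return None
    # First time the data reaches this cluster: process it and record the hop.
    return provenance + [local_id]


# Cycle A -> B -> C -> A: the data originates on A and stops when it returns to A.
hops = ["A"]
for cluster in ["B", "C", "A"]:
    result = provenance_to_forward(hops, cluster)
    if result is None:
        break
    hops = result
```

| |
| In this model the cycle terminates because A finds its own identifier in the list it receives back from C. |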
| |
| |
| This framework provides a general strategy to allow pluggable replication strategies to export data out of an Accumulo |
| cluster. An AccumuloReplicationStrategy is the only presently targeted replication strategy; however, the implementation |
| should not prohibit alternative approaches to replication such as other databases or filesystems. |
| |
| |
| ### Replication Strategy Implementation |
| |
| |
| Henceforth, both RFiles and WAL files that need replication can be treated as a chunk of data. This chunk |
| references a start offset and length from the source (RFile or WAL) which needs to be replicated. This has the nice |
| property of being able to use a Combiner to combine multiple, sequential chunks into one larger chunk to amortize RPC |
| costs. |
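| |
| As a rough illustration of combining sequential chunks, the sketch below merges chunks of the same source whose |
| ranges are adjacent. It is a plain Python model rather than an Accumulo Combiner, and the Chunk type, its field |
| names, and the source names are hypothetical. |
| |

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Chunk:
    source: str  # URI of the RFile or WAL the chunk comes from
    offset: int  # start offset within the source
    length: int  # number of bytes/records covered


def combine_sequential(chunks: List[Chunk]) -> List[Chunk]:
    """Merge chunks of the same source that are directly adjacent to one another."""
    merged: List[Chunk] = []
    for chunk in sorted(chunks, key=lambda c: (c.source, c.offset)):
        last = merged[-1] if merged else None
        if (last is not None and last.source == chunk.source
                and last.offset + last.length == chunk.offset):
            # Adjacent to the previous chunk: extend it instead of adding a new one.
            merged[-1] = Chunk(last.source, last.offset, last.length + chunk.length)
        else:
            merged.append(chunk)
    return merged
```

| |
| Chunks separated by a gap are left as separate entries, so only truly sequential ranges are sent as one unit. |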
| |
| |
| #### Make the master aware of file to replicate |
| |
| |
| Let us define a column family that is used to denote a chunk that needs to be replicated: REPL. We first need to let |
| master know that it has a new chunk which needs to be replicated. When the file comes from a bulk-import, we need to |
| create a new entry in the !METADATA table for the given tablet with the REPL column family. If the file is a WAL, we |
| also want to write an entry for the REPL column[b]. In both cases, the chunk’s URI will be stored in the column |
| qualifier. The Value can contain some serialized data structure to track cluster replication provenance and offset |
| values. Each row (tablet) in the !METADATA table will contain zero to many REPL columns. As such, the garbage collector |
| needs to be modified to not delete these files on the master’s HDFS instance until these files are replicated (copied to |
| the peer). |
| |
| |
| #### Choose local TabletServer to perform replication |
| |
| |
| The Accumulo Master can have a thread that scans the replication table to look for chunks to replicate. When it finds |
| some, it chooses a TabletServer to perform the replication to all peers. The master should use a FATE operation to manage |
| the state machine of this replication process. The expected principles, such as exponential backoff on network errors, |
| should be followed. When all peers have reported successfully receiving the file, the master can remove the REPL |
| column for the given chunk. |
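| |
| The exponential backoff mentioned above could look roughly like the following. This is a generic Python sketch, not |
| part of FATE or any Accumulo API; the function name, the delay parameters, and the use of ConnectionError to stand in |
| for a network error are all assumptions. |
| |

```python
import time
from typing import Callable


def replicate_with_backoff(send: Callable[[], None],
                           max_attempts: int = 5,
                           base_delay: float = 0.5,
                           max_delay: float = 30.0,
                           sleep: Callable[[float], None] = time.sleep) -> int:
    """Call send() until it succeeds; return the number of attempts that were made."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            send()
            return attempt
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up and let the caller record the failure
            # Double the wait after each failure, capped at max_delay.
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
    raise AssertionError("unreachable")
```

| |
| Passing the sleep function as a parameter keeps the sketch easy to exercise without actually waiting. |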
| |
| |
| On the peer, before beginning transfer, the peer should ascertain a new local, unique filename to use for the remote |
| file. When the transfer is complete, the file should be treated like log recovery and brought into the appropriate |
| Tablet. If the peer is also a master (replicating to other nodes), the replicated data should create a new REPL column |
| in the peer’s table to repeat the replication process, adding in its cluster identifier to the provenance list. |
| Otherwise, the file can be a candidate for deletion by the garbage collector. |
| |
| |
| The tserver chosen to replicate the data from the master cluster should ideally be the tserver that created that data. |
| This helps reduce the complexity of dealing with locality later on. If the HDFS blocks written by the tserver are local, |
| then we gain the same locality perks. |
| |
| |
| #### Recurse |
| |
| |
| In our simple master and peer replication scheme, we are done after the new updates are made available on peer. As |
| mentioned above, it is relatively easy to “schedule” replication of a new file on peer because we just repeat the same |
| process that master did to replicate to peer in the first place. |
| |
| |
| ### Master cluster replication “bookkeeping” |
| |
| |
| This section outlines the steps on the master cluster to manage the lifecycle of data: when/what data needs to be |
| replicated and when is a file safe to be removed. |
| |
| |
| Two key structures are used to implement this bookkeeping: |
| |
| |
| 1. Tablet-level entry: tabletId repl:fully-qualified-file [] value |
| |
| |
| 2. Row-prefix space at end of accumulo.metadata or its own table: *~repl*_fully-qualified-file |
| clusterName:remoteTableID [] value |
| |
| |
| These two key structures will be outlined below, with “*repl* column” and “*~repl* row” denoting which is being referred to. |
| |
| |
| #### Data Structure in Value |
| |
| |
| To avoid the necessity of using conditional mutations or other “transaction-like” operations, we can use a combiner to |
| generate an aggregate view of replication information. Protobuf is a decent choice; however, the description isn’t tied to |
| any implementation. I believe a Combiner used in conjunction with the following data structure provides all necessary |
| functionality: |
| |
| |
| ``// Tracks general lifecycle of the data: file is open and might have new data to replicate, or the file has been`` |
| ``// closed and will have no new data to replicate`` |
| |
| |
| ``State:Enum { OPEN, CLOSED }`` |
| |
| |
| ``ReplicationStatus { State state, long replication_needed_offset, long replication_finished_offset }`` |
| |
| |
| The offsets refer to the contiguous ranges of records (key-values) written to the WAL. The replication_finished_offset |
| value tracks what data has been replicated to the given cluster, while the replication_needed_offset value tracks how |
| much data has been written to the WAL that is ready for replication. replication_finished_offset should always be less |
| than or equal to replication_needed_offset. For RFiles instead of WALs, state is always CLOSED and |
| replication_needed_offset is initialized to the length of the RFile. In this context, one can consider the RFile as a |
| read-only file and the WAL as an append-only file. |
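| |
| One way to picture this structure is the Python model below. It is only a sketch of the fields and the invariant |
| stated above, not the serialized (e.g. Protobuf) form; the helper names status_for_rfile and status_for_new_wal are |
| hypothetical. |
| |

```python
from dataclasses import dataclass
from enum import Enum


class State(Enum):
    OPEN = "OPEN"      # file may still receive new data to replicate
    CLOSED = "CLOSED"  # file will have no new data to replicate


@dataclass(frozen=True)
class ReplicationStatus:
    state: State = State.OPEN
    replication_needed_offset: int = 0
    replication_finished_offset: int = 0

    def __post_init__(self) -> None:
        # Invariant from the text: finished must never exceed needed.
        if not 0 <= self.replication_finished_offset <= self.replication_needed_offset:
            raise ValueError("replication_finished_offset must be <= replication_needed_offset")


def status_for_rfile(length: int) -> ReplicationStatus:
    """An RFile is read-only: always CLOSED, with the whole file needing replication."""
    return ReplicationStatus(State.CLOSED, replication_needed_offset=length)


def status_for_new_wal() -> ReplicationStatus:
    """A new WAL is append-only: OPEN, with nothing written yet."""
    return ReplicationStatus(State.OPEN)
```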
| |
| |
| For *~repl* entries, the target clusterName and remote tableId would be stored in the key to preserve uniqueness. Using |
| this information, we would be able to implement the following methods: |
| |
| |
| ``bool isFullyReplicated(ReplicationStatus)`` |
| ``Pair<long,long> rangeNeedingReplication(ReplicationStatus)`` |
| |
| |
| The isFullyReplicated method is straightforward: given the offsets stored for data that needs to be replicated and the |
| offsets stored for data that has been replicated, it answers whether any data for this ReplicationStatus still needs to |
| be replicated for the given clusterName/tableID. The status is fully replicated only when the state is CLOSED and no such data remains. |
| |
| |
| rangeNeedingReplication is a bit more complicated. Given the end of a range of data that has already been replicated |
| and the end of a range of data that still needs replication, return the range of data that has |
| not yet been replicated. For example, if keyvalues up to offset 100 in a WAL have already been |
| replicated and keyvalues up to offset 300 are marked as needing replication, this method should |
| return [101,300]. Ranges of data replicated and data needing replication must always be |
| disjoint and contiguous to ensure that data is replayed in the correct order on the peer. |
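| |
| A minimal Python sketch of these two methods follows, assuming the ReplicationStatus fields described earlier and |
| inclusive offsets as in the [101,300] example. Returning None when nothing remains to replicate is an assumption of |
| this sketch; the text does not specify that case. |
| |

```python
from enum import Enum
from typing import NamedTuple, Optional, Tuple


class State(Enum):
    OPEN = 1
    CLOSED = 2


class ReplicationStatus(NamedTuple):
    state: State
    replication_needed_offset: int
    replication_finished_offset: int


def is_fully_replicated(status: ReplicationStatus) -> bool:
    """True only when the file is CLOSED and everything needed has been replicated."""
    return (status.state is State.CLOSED
            and status.replication_finished_offset == status.replication_needed_offset)


def range_needing_replication(status: ReplicationStatus) -> Optional[Tuple[int, int]]:
    """Return the inclusive (start, end) range not yet replicated, or None if there is none."""
    if status.replication_finished_offset >= status.replication_needed_offset:
        return None
    return (status.replication_finished_offset + 1, status.replication_needed_offset)
```

| |
| With 100 replicated and 300 needed, range_needing_replication returns (101, 300), matching the example above. |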
| |
| |
| A Combiner is used to create a basic notion of “addition” and “subtraction”. We cannot use deletes to manage |
| this without creating a custom iterator, which would not be desirable since it would be required to run over the entire |
| accumulo.metadata table. Avoiding deletions except on cleanup is also desired to avoid “tombstone’ing” |
| future versions of a Key. The addition operation is when new data is appended to the WAL which signifies new data to be |
| replicated. This equates to an addition to replication_needed_offset. The subtraction operation is when data from the |
| WAL has been successfully replicated to the peer for this *~repl* record. This is implemented as an addition to the |
| replication_finished_offset. |
| |
| |
| When CLOSED is set on a ReplicationStatus, this implies that the WAL has been closed and no new offsets will be added; this |
| is tracked via the *repl* column. As such, a ReplicationStatus “object” is a candidate for deletion when the state is |
| CLOSED and replication_finished_offset is equal to replication_needed_offset. A value of CLOSED for state is always |
| propagated over the OPEN state. An addition after the state is CLOSED is an invalid operation and would be a logic error. |
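| |
| The aggregation rules above can be modeled as a simple reduce over the values written for one key. This Python sketch |
| is not an Accumulo Combiner implementation; it assumes each value carries offset deltas and that values are supplied |
| oldest first, which is what allows it to detect an addition after CLOSED. |
| |

```python
from enum import Enum
from typing import Iterable, NamedTuple


class State(Enum):
    OPEN = 1
    CLOSED = 2


class ReplicationStatus(NamedTuple):
    state: State
    replication_needed_offset: int    # in an update: amount newly appended to the WAL
    replication_finished_offset: int  # in an update: amount newly replicated to the peer


def combine(updates: Iterable[ReplicationStatus]) -> ReplicationStatus:
    """Aggregate updates (assumed oldest first) into a single ReplicationStatus."""
    state, needed, finished = State.OPEN, 0, 0
    for update in updates:
        if state is State.CLOSED and update.replication_needed_offset > 0:
            raise ValueError("addition after CLOSED is a logic error")
        needed += update.replication_needed_offset      # "addition"
        finished += update.replication_finished_offset  # "subtraction", stored as an addition
        if update.state is State.CLOSED:
            state = State.CLOSED  # CLOSED always wins over OPEN
    return ReplicationStatus(state, needed, finished)
```

| |
| Because both operations are sums, no deletes or conditional mutations are needed to maintain the aggregate. |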
| |
| |
| Consider the case of new data being ingested to the cluster: the following discrete steps should happen. The |
| assumption that replication is enabled is made to not distract from the actual steps. As previously mentioned, a |
| combiner must be set on the *repl* column to aggregate the values to properly maintain replication state. The following is |
| what a tserver will do. |
| |
| |
| 1) When a new WAL is created at the request of a tserver and the log column is created, a *repl* column is also created |
| within the tablet’s row to track that this WAL will need to be replicated. |
| |
| |
| INSERT |
| tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(state=OPEN) |
| |
| |
| 2) As the tserver using this WAL finishes commits to the WAL, it should submit a new mutation to track the current |
| length of data in the WAL that it just wrote that needs to be read for purposes of replication. |
| |
| |
| INSERT |
| tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(addition |
| offset) |
| |
| |
| 3) Eventually, the tablet server will finish using a WAL, minor compact (minc) the contents of memory to disk, and mark the WAL as unused. |
| This results in updating the state to be CLOSED. |
| |
| |
| INSERT |
| tablet repl:hdfs://localhost:8020/accumulo/.../wal/… -> ReplicationStatus(state=CLOSED) |
| |
| |
| The master also needs a new thread to process the *repl* columns across all tablets in a table and create *~repl* row |
| entries for the file and where it should be replicated to. The high-level goals for this thread are as follows: |
| |
| |
| 1) Create mutations for a WAL that outline where the file must be replicated to (cluster and tableID) |
| |
| |
| INSERT |
| *~repl*_hdfs://localhost:8020/accumulo/.../wal/… clusterName:tableId -> ReplicationStatus(addition |
| offset) |
| |
| |
| 2) Determine when the *repl* column in a tablet is safe for deletion (all data for it has been replicated). This is the |
| sign that the GC can then remove this file. |
| |
| |
| DELETE |
| tablet repl:hdfs://localhost:8020/accumulo/.../wal/… |
| |
| |
| This can be accomplished with a single thread that scans the metadata table: |
| |
| |
| 1) Construct “snapshot” of tablet *repl* file entries with aggregated offsets, sorted by file, |
| |
| |
| [hdfs://localhost:8020/.../file1 => {[tablet1, RS], [tablet2, RS], ... }, |
| hdfs://localhost:8020/.../file2 => {[tablet3, RS], [tablet4, RS], ... }, |
| hdfs://localhost:8020/.../file3 => {[tablet5, [RS:CLOSED]], [tablet6, [RS:CLOSED]], ... } ] |
| |
| |
| 2) Begin scanning *~repl* row-prefix with Scanner, perform merged read to join “state” from aggregate *repl* column across |
| tablets, and columns in *~repl* row for the file. |
| |
| |
| for each file in *~repl* rowspace: |
|     if all columns in *~repl*_file row isFullyReplicated: |
|         issue deletes for file in *repl* column for all tablets with references |
|         if delete of *repl* is successful: |
|             delete *~repl* row |
|     else if *~repl* row exists but no *repl* columns: |
|         // Catch failure case from first conditional |
|         delete *~repl* row |
|     else: |
|         for each file in “snapshot” of *repl* columns: |
|             make mutation for *~repl*_file |
|             for each peer cluster in configuration: |
|                 if file should be replicated on peer: |
|                     add column for clusterid:remote_tableID -> RS |
| |
| |
| A Combiner should be set on all columns in the *~repl* prefix rowspace and the *repl* colfam so that multiple runs of the |
| described procedure, without actual replication occurring, correctly aggregate the data that needs replication. |
| |
| |
| Configuration |
| ------------- |
| |
| |
| Replication can be configured per locality group, replicating that locality group to one or more peers. Given that |
| we have dynamic column families, trying to track per-column-family replication would be unnecessarily difficult. |
| New configuration variables need to be introduced to carry the necessary information. Each |
| peer is defined with a name and the zookeeper quorum of the remote cluster to locate the active Accumulo Master. The |
| API should ease configuration of replication across all locality groups. Replication cannot be configured on the root or |
| metadata table. |
| |
| |
| Site-wide: |
| // The name and location of other clusters |
| instance.cluster.$name.zookeepers=zk1,zk2,zk3[c] |
| // The name of this cluster |
| instance.replication.name=my_cluster_name[d] |
| |
| Per-table: |
| // Declare the locality group(s) that should be replicated and the clusters that they should be replicated to |
| table.replication.$locality_group_name=cluster1,cluster2,... |
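| |
| To make the relationship between the site-wide and per-table properties concrete, the Python sketch below resolves |
| which peers each locality group replicates to. It only models the proposed property names; the function names and |
| the choice to reject clusters that were never defined are assumptions of this sketch. |
| |

```python
from typing import Dict, List

CLUSTER_PREFIX = "instance.cluster."
CLUSTER_SUFFIX = ".zookeepers"
TABLE_PREFIX = "table.replication."


def _split(value: str) -> List[str]:
    """Split a comma-separated property value, dropping empty entries."""
    return [item.strip() for item in value.split(",") if item.strip()]


def peers_from_site_config(props: Dict[str, str]) -> Dict[str, List[str]]:
    """Map each defined cluster name to its list of zookeeper hosts."""
    peers: Dict[str, List[str]] = {}
    for key, value in props.items():
        if key.startswith(CLUSTER_PREFIX) and key.endswith(CLUSTER_SUFFIX):
            name = key[len(CLUSTER_PREFIX):-len(CLUSTER_SUFFIX)]
            peers[name] = _split(value)
    return peers


def replication_targets(table_props: Dict[str, str],
                        known_peers: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Map each replicated locality group to the clusters it should be sent to."""
    targets: Dict[str, List[str]] = {}
    for key, value in table_props.items():
        if key.startswith(TABLE_PREFIX):
            group = key[len(TABLE_PREFIX):]
            clusters = _split(value)
            unknown = [c for c in clusters if c not in known_peers]
            if unknown:
                raise ValueError(f"undefined clusters for {group}: {unknown}")
            targets[group] = clusters
    return targets
```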
| |
| |
| Shell commands can also be created to make this configuration easier. |
| |
| |
| definecluster cluster_name zookeeper_quorum |
| |
| |
| e.g. definecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181 |
| |
| |
| |
| |
| deletecluster cluster_name zookeeper_quorum |
| |
| |
| e.g. deletecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181 |
| |
| |
| |
| |
| enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name |
| |
| |
| e.g. enablereplication -t foo -lg cf1 peer1 |
| enablereplication -t foo --all-loc-groups peer1 |
| |
| |
| |
| |
| |
| |
| disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name |
| |
| |
| e.g. disablereplication -t foo -lg cf1 peer1 |
| disablereplication -t foo --all-loc-groups peer1 |
| |
| |
| For peers, we likely do not want to allow users to perform writes against the cluster. Thus, they should be read-only. |
| This likely requires custom configuration and some ZK state to not accept regular API connections. This should be |
| exposed/controllable by the shell, too. |
| |
| |
| Common Questions |
| ---------------- |
| |
| |
| *How do conditional mutations work with this approach?* |
| |
| |
| They do not. They will need to throw an Exception. |
| |
| |
| *How does replication work on a table which already contains data?* |
| |
| |
| When replication is enabled on a table, all new data will be replicated. This implementation does not attempt to |
| replicate data that already exists in the table, as the existing importtable and exporttable commands already support this. |
| |
| |
| *When I update a table property on the master, will it propagate to the peer?* |
| |
| |
| There are both arguments for and against this. We likely want to revisit this later as a configuration parameter that |
| could allow the user to choose if this should happen. We should avoid implementations that would tie us to one or the |
| other. |
| |
| |
| As an argument against this, consider a production and a backup cluster where the backup cluster is smaller in number of |
| nodes, but contains more disks. Despite wanting to replicate the data in a table, the configuration of that table may |
| not be desired (e.g. split threshold, compression codecs, etc). Another argument against could be age-off. If a replica |
| cluster is not the same size as the production cluster (which is extremely plausible) you would not want the same |
| age-off rules for both the production and replica. |
| |
| |
| An argument for this feature is that you would want custom compaction iterators (as a combiner, for example) to only be |
| configured on a table once. You would want these iterators to appear on all replicas. Such an implementation is also |
| difficult in master-master situations as we don’t have a shared ZooKeeper instance that we can use to reliably commit |
| these changes. |
| |
| |
| *What happens in master-master if two Keys are exactly the same with different values?* |
| |
| |
| Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528 |
| |
| |
| *Did you come up with this all on your own?* |
| |
| |
| Ha, no. Big thanks goes out to HBase’s documentation, Enis Söztutar (HBase), and other Accumulo devs that I’ve bounced |
| these ideas off of (too many to enumerate). |
| |
| |
| |
| |
| Goals |
| ----- |
| 1. Master-Peer configuration that doesn’t exclude future master-master work |
| 2. Per locality-group replication configuration |
| 3. Shell administration of replication |
| 4. Accumulo Monitor integration/insight to replication status |
| 5. State machines for lifecycle of chunks |
| 6. Versionable (read-as protobuf) datastructure to track chunk metadata |
| 7. Thrift for RPC |
| 8. Replication does not require “closed” files (can send incremental updates to peers) |
| 9. Ability to replicate “live inserts” and “bulk imports” |
| 10. Provide replication interface with Accumulo->Accumulo implementation |
| 11. Do not rely on active Accumulo Master to perform replication (send or receive) -- delegate to a TabletServer |
| 12. Use FATE where applicable |
| 13. Gracefully handle offline peers |
| 14. Implement read-only variant Master/TabletServer[e] |
| |
| |
| Non-Goals |
| --------- |
| 1. Replicate on smaller granularity than locality group (not individual colfams/colquals or based on visibilities) |
| 2. Wire security between master and peer |
| 3. Support replication of encrypted data[f] |
| 4. Replication of existing data (use importtable & exporttable) |
| 5. Enforce replication of table configuration |
| |
| |
| References |
| ---------- |
| |
| |
| * http://www.cs.mcgill.ca/~kemme/papers/vldb00.html |
| [a] While the WAL is a useful file format for shipping updates (an append-only file), the actual LogFileKey and |
| LogFileValue pairs may not be sufficient? Might need some extra data internally? Maybe the DFSLogger header could |
| contain that? |
| [b] This approach makes the assumption that we only begin the replication process when a WAL is closed. |
| This is likely too long a period of time: an offset and length likely need to be introduced to decrease latency. |
| [c] This needs to be consistent across clusters. Do we need to control access to ensure that it is? Is it excessive to |
| force users to configure it correctly? |
| [d] Same as instance.cluster.$name: Do we need to enforce these values? |
| [e] This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal for the first implementation |
| [f] While not in the original scope, it is definitely of great concern. |