| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| == Replication |
| |
| === Overview |
| |
Replication is a feature of Accumulo which provides a mechanism to automatically
copy data to other systems, typically for the purpose of disaster recovery,
high availability, or geographic locality. It is best to consider this feature
as a framework for automatic replication rather than simply the ability to copy data
from one Accumulo instance to another, as copying to another Accumulo cluster is
only an implementation detail. The local Accumulo cluster is hereafter referred
to as the +primary+ while systems being replicated to are known as
+peers+.
| |
When one instance replicates to another, this replication framework makes the two Accumulo
instances eventually consistent with one another, as opposed to the strong consistency
that each single Accumulo instance still provides. That is to say, a scan of a table on a
peer which has pending replication from the primary will not wait for that data to be
replicated before the scan runs.
This is desirable for a number of reasons, the most important being that the replication
framework is not limited by network outages or offline peers, but only by the HDFS
space available on the primary system.
| |
Replication configurations can be considered as a directed graph which allows cycles.
The set of systems from which data has already been replicated is maintained in each Mutation,
which allows each system to determine whether a peer already has the data
that the system is about to send.
| |
Data is replicated by using the Write-Ahead Logs (WALs) that each TabletServer is
already maintaining. Each TabletServer records, in the +accumulo.metadata+ table, which
WALs contain data that needs to be replicated. The Master uses these records,
combined with the local Accumulo table that the WAL was used with, to create records
in the +replication+ table which track which peers the given WAL should be
replicated to. The Master later uses these work entries to assign the actual
replication task to a local TabletServer using ZooKeeper. A TabletServer will get
a lock in ZooKeeper for the replication of this file to a peer, and proceed to
replicate to the peer, recording progress in the +replication+ table as
data is successfully replicated on the peer. Later, the Master and Garbage Collector
will remove records from the +accumulo.metadata+ and +replication+ tables
and files from HDFS, respectively, after replication to all peers is complete.
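
The state of pending and in-progress replication can be inspected directly by scanning the
replication table. As a minimal sketch, assuming the replication table is named
+accumulo.replication+ and that the +root+ user has been granted read access to it:

----
root@accumulo_primary> grant -t accumulo.replication -u root Table.READ
root@accumulo_primary> scan -t accumulo.replication
----

The key-value formats found in this table are described in the +Table Schema+ section at the
end of this chapter.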
| |
| === Configuration |
| |
| Configuration of Accumulo to replicate data to another system can be categorized |
| into the following sections. |
| |
| ==== Site Configuration |
| |
| Each system involved in replication (even the primary) needs a name that uniquely |
| identifies it across all peers in the replication graph. This should be considered |
| fixed for an instance, and set in +accumulo-site.xml+. |
| |
| ---- |
| <property> |
| <name>replication.name</name> |
| <value>primary</value> |
| <description>Unique name for this system used by replication</description> |
| </property> |
| ---- |
| |
| ==== Instance Configuration |
| |
For each peer of this system, Accumulo needs to know the name of that peer,
the class used to replicate data to that system, and some configuration information
used to connect to the remote peer. In the case of Accumulo, this additional data
is the Accumulo instance name and ZooKeeper quorum; however, this varies with the
replication implementation used for the peer.
| |
| These can be set in the site configuration to ease deployments; however, as they may |
| change, it can be useful to set this information using the Accumulo shell. |
| |
| To configure a peer with the name +peer1+ which is an Accumulo system with an instance name of +accumulo_peer+ |
| and a ZooKeeper quorum of +10.0.0.1,10.0.2.1,10.0.3.1+, invoke the following |
| command in the shell. |
| |
| ---- |
root@accumulo_primary> config -s replication.peer.peer1=org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,accumulo_peer,10.0.0.1,10.0.2.1,10.0.3.1
| ---- |
| |
Since this is an Accumulo system, we also want to set a username and password
to use when authenticating with this peer. On our peer, we create a special user named
"replication", with a password of "password", which has permission to write to the tables
we want to replicate data into. We then need to record these credentials in the primary's configuration.
| |
| ---- |
| root@accumulo_primary> config -s replication.peer.user.peer1=replication |
| root@accumulo_primary> config -s replication.peer.password.peer1=password |
| ---- |
| |
Alternatively, when configuring replication on Accumulo running Kerberos, a keytab
file per peer can be configured instead of a password. The provided keytabs must be readable
by the unix user running Accumulo. The keytab for a peer can be different from the
keytab used by Accumulo itself or the keytabs for other peers.
| |
| ---- |
| accumulo@EXAMPLE.COM@accumulo_primary> config -s replication.peer.user.peer1=replication@EXAMPLE.COM |
| accumulo@EXAMPLE.COM@accumulo_primary> config -s replication.peer.keytab.peer1=/path/to/replication.keytab |
| ---- |
| |
| ==== Table Configuration |
| |
With a peer defined, we now need to configure which tables will
replicate to that peer. We also need to configure an identifier to determine where
this data will be replicated on the peer. Since we're replicating to another Accumulo
cluster, this is a table ID. In this example, we want to enable replication on
+my_table+ and configure our peer +accumulo_peer+ as a target, sending
the data to the table with an ID of +2+ in +accumulo_peer+.
| |
| ---- |
| root@accumulo_primary> config -t my_table -s table.replication=true |
| root@accumulo_primary> config -t my_table -s table.replication.target.accumulo_peer=2 |
| ---- |
| |
To replicate a single table on the primary to multiple peers, the second command
in the above shell snippet can be issued once for each peer and remote identifier pair,
as shown in the sketch below.
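
For example, to also replicate +my_table+ to a second, hypothetical peer named +peer2+
(already defined via +replication.peer.peer2+) whose destination table has an ID of +3+,
the following could be run. The second peer name and table ID here are illustrative only.

----
root@accumulo_primary> config -t my_table -s table.replication.target.accumulo_peer=2
root@accumulo_primary> config -t my_table -s table.replication.target.peer2=3
----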
| |
| === Monitoring |
| |
Basic information about replication status from a primary can be found on the Accumulo
Monitor server, using the +Replication+ link in the sidebar.
| |
| On this page, information is broken down into the following sections: |
| |
| 1. Files pending replication by peer and target |
| 2. Files queued for replication, with progress made |
| |
| === Work Assignment |
| |
Depending on the schema of a table, different implementations of the WorkAssigner could
be used. The implementation is controlled via the property +replication.work.assigner+,
whose value is the full class name of the implementation. This can be configured via the shell or
+accumulo-site.xml+.
| |
| ---- |
| <property> |
| <name>replication.work.assigner</name> |
| <value>org.apache.accumulo.master.replication.SequentialWorkAssigner</value> |
| <description>Implementation used to assign work for replication</description> |
| </property> |
| ---- |
| |
| ---- |
root@accumulo_primary> config -s replication.work.assigner=org.apache.accumulo.master.replication.SequentialWorkAssigner
| ---- |
| |
Two implementations are provided. By default, the +SequentialWorkAssigner+ is configured for an
instance. The SequentialWorkAssigner ensures that, for each peer and remote identifier, WALs are
replicated in the order in which they were created. This is sufficient to ensure that updates to a table
will be replayed in the correct order on the peer. This implementation has the downside of only replicating
a single WAL at a time per target.
| |
The second implementation, the +UnorderedWorkAssigner+, can be used to overcome the limitation
of only a single WAL being replicated to a target and peer at any time. Depending on the table schema,
it's possible that multiple versions of the same Key with different values are infrequent or nonexistent.
In this case, parallel replication to a peer and target is possible without any downsides. In cases
where this implementation is used and column updates are frequent, it is possible that there will be
an inconsistency between the primary and the peer.
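
To switch an instance to the +UnorderedWorkAssigner+, the work assigner property can be updated in
the shell; this sketch assumes the class lives in the same package as the +SequentialWorkAssigner+.

----
root@accumulo_primary> config -s replication.work.assigner=org.apache.accumulo.master.replication.UnorderedWorkAssigner
----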
| |
| === ReplicaSystems |
| |
| +ReplicaSystem+ is the interface which allows abstraction of replication of data |
| to peers of various types. Presently, only an +AccumuloReplicaSystem+ is provided |
| which will replicate data to another Accumulo instance. A +ReplicaSystem+ implementation |
| is run inside of the TabletServer process, and can be configured as mentioned in the |
| +Instance Configuration+ section of this document. Theoretically, an implementation |
| of this interface could send data to other filesystems, databases, etc. |
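
A custom implementation would be registered the same way the +AccumuloReplicaSystem+ is: by naming
its class, along with whatever configuration string it expects, in the peer definition. The class
name and configuration below are purely hypothetical.

----
root@accumulo_primary> config -s replication.peer.my_peer=com.example.replication.CustomReplicaSystem,custom-connection-options
----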
| |
| ==== AccumuloReplicaSystem |
| |
| The +AccumuloReplicaSystem+ uses Thrift to communicate with a peer Accumulo instance |
| and replicate the necessary data. The TabletServer running on the primary will communicate |
| with the Master on the peer to request the address of a TabletServer on the peer which |
| this TabletServer will use to replicate the data. |
| |
| The TabletServer on the primary will then replicate data in batches of a configurable |
| size (+replication.max.unit.size+). The TabletServer on the peer will report how many |
| records were applied back to the primary, which will be used to record how many records |
| were successfully replicated. The TabletServer on the primary will continue to replicate |
| data in these batches until no more data can be read from the file. |
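
The batch size is controlled by the +replication.max.unit.size+ property and can be adjusted in
the shell; the value shown below is simply the default from the configuration table later in this
chapter.

----
root@accumulo_primary> config -s replication.max.unit.size=64M
----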
| |
| === Other Configuration |
| |
| There are a number of configuration values that can be used to control how |
| the implementation of various components operate. |
| |
| [width="75%",cols=">,^2,^2"] |
| [options="header"] |
| |==== |
| |Property | Description | Default |
| |replication.max.work.queue | Maximum number of files queued for replication at one time | 1000 |
| |replication.work.assignment.sleep | Time between invocations of the WorkAssigner | 30s |
| |replication.worker.threads | Size of threadpool used to replicate data to peers | 4 |
| |replication.receipt.service.port | Thrift service port to listen for replication requests, can use '0' for a random port | 10002 |
| |replication.work.attempts | Number of attempts to replicate to a peer before aborting the attempt | 10 |
| |replication.receiver.min.threads | Minimum number of idle threads for handling incoming replication | 1 |
| |replication.receiver.threadcheck.time | Time between attempting adjustments of thread pool for incoming replications | 30s |
| |replication.max.unit.size | Maximum amount of data to be replicated in one RPC | 64M |
| |replication.work.assigner | Work Assigner implementation | org.apache.accumulo.master.replication.SequentialWorkAssigner |
| |tserver.replication.batchwriter.replayer.memory| Size of BatchWriter cache to use in applying replication requests | 50M |
| |==== |
| |
| === Example Practical Configuration |
| |
A real-life example is now provided to give a concrete application of replication configuration. This
example is a two-instance Accumulo system: one primary system and one peer system. They are called
primary and peer, respectively. Each system also has a table of the same name, "my_table". The instance
name for each is also the same (primary and peer), and each has a ZooKeeper host on a node with the
same hostname (primary:2181 and peer:2181).
| |
| We want to configure these systems so that "my_table" on "primary" replicates to "my_table" on "peer". |
| |
| ==== conf/accumulo-site.xml |
| |
| We can assign the "unique" name that identifies this Accumulo instance among all others that might participate |
| in replication together. In this example, we will use the names provided in the description. |
| |
| ===== Primary |
| |
| ---- |
| <property> |
| <name>replication.name</name> |
| <value>primary</value> |
| <description>Defines the unique name</description> |
| </property> |
| ---- |
| |
| ===== Peer |
| |
| ---- |
| <property> |
| <name>replication.name</name> |
| <value>peer</value> |
| </property> |
| ---- |
| |
| ==== conf/masters and conf/slaves |
| |
Be *sure* to use non-local IP addresses or hostnames in these files. Other nodes need to connect to
the addresses listed, and using localhost will likely result in a node talking to itself rather than
the intended host.
| |
| ==== Start both instances |
| |
The rest of the configuration is dynamic and is best configured on the fly (in ZooKeeper) rather than in accumulo-site.xml.
| |
| ==== Peer |
| |
The next series of commands are to be run on the peer system. Create a user account for the primary instance called
"peer". The password for this account will need to be saved in the configuration on the primary.
| |
| ---- |
| root@peer> createtable my_table |
| root@peer> createuser peer |
| root@peer> grant -t my_table -u peer Table.WRITE |
| root@peer> grant -t my_table -u peer Table.READ |
| root@peer> tables -l |
| ---- |
| |
Remember what the table ID for 'my_table' is. You'll need it to configure the primary instance.
| |
| ==== Primary |
| |
| Next, configure the primary instance. |
| |
| ===== Set up the table |
| |
| ---- |
| root@primary> createtable my_table |
| ---- |
| |
| ===== Define the Peer as a replication peer to the Primary |
| |
| We're defining the instance with replication.name of 'peer' as a peer. We provide the implementation of ReplicaSystem |
| that we want to use, and the configuration for the AccumuloReplicaSystem. In this case, the configuration is the Accumulo |
| Instance name for 'peer' and the ZooKeeper quorum string. The configuration key is of the form |
| "replication.peer.$peer_name". |
| |
| ---- |
| root@primary> config -s replication.peer.peer=org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,peer,$peer_zk_quorum |
| ---- |
| |
| ===== Set the authentication credentials |
| |
| We want to use that special username and password that we created on the peer, so we have a means to write data to |
| the table that we want to replicate to. The configuration key is of the form "replication.peer.user.$peer_name". |
| |
| ---- |
| root@primary> config -s replication.peer.user.peer=peer |
| root@primary> config -s replication.peer.password.peer=peer |
| ---- |
| |
| ===== Enable replication on the table |
| |
Now that we have defined the peer on the primary and provided the authentication credentials, we need to configure
our table with the replication target for that peer; the +AccumuloReplicaSystem+ we configured when defining the peer
will be used to send the data. Since our peer is an Accumulo instance, the target identifier is the table ID of the
table on the peer instance that we want to replicate into.

Be sure to use the correct value for $peer_table_id. The configuration key is of
the form "table.replication.target.$peer_name".
| |
| ---- |
| root@primary> config -t my_table -s table.replication.target.peer=$peer_table_id |
| ---- |
| |
| Finally, we can enable replication on this table. |
| |
| ---- |
| root@primary> config -t my_table -s table.replication=true |
| ---- |
| |
| === Extra considerations for use |
| |
| While this feature is intended for general-purpose use, its implementation does carry some baggage. Like any software, |
| replication is a feature that operates well within some set of use cases but is not meant to support all use cases. |
| For the benefit of the users, we can enumerate these cases. |
| |
| ==== Latency |
| |
As previously mentioned, the replication feature uses the Write-Ahead Log files for a number of reasons, one of which
is to prevent the need for data to be written to RFiles before it is available to be replicated. While this can help
reduce the latency for a batch of Mutations that have been written to Accumulo, the latency is still at least seconds to tens
of seconds for replication once ingest is active. For a table on which replication has just been enabled, it will likely
take a few minutes before replication begins.
| |
| Once ingest is active and flowing into the system at a regular rate, replication should be occurring at a similar rate, |
| given sufficient computing resources. Replication attempts to copy data at a rate that is to be considered low latency |
| but is not a replacement for custom indexing code which can ensure near real-time referential integrity on secondary indexes. |
| |
| ==== Table-Configured Iterators |
| |
Accumulo Iterators tend to be a heavy hammer which can be used to solve a variety of problems. In general, it is highly
recommended that Iterators which are applied at major compaction time be both idempotent and associative due to the
non-determinism in which sets of files for a Tablet might be compacted. In practice, this translates to common patterns,
such as aggregation, being implemented in a manner resilient to duplication (such as using a Set instead of a List).
| |
Due to the asynchronous nature of replication and the expectation that hardware failures and network partitions will occur,
it is generally not recommended to configure replication on a table which has non-idempotent Iterators set.
While the replication implementation can make some simple assertions to try to avoid re-replication of data, it is not
presently guaranteed that all data will only be sent to a peer once. Data will be replicated at least once. Typically,
this is not a problem, as the VersioningIterator will automatically deduplicate this over-replication because the duplicates
will have the same timestamp; however, certain Combiners may result in inaccurate aggregations.
| |
As a concrete example, consider a table which has the SummingCombiner configured to sum all values for
multiple versions of the same Key. For some key, consider a set of numeric values that are written to a table on the
primary: [1, 2, 3]. On the primary, all of these are successfully written and thus the current value for the given key
would be 6, (1 + 2 + 3). Consider, however, that each of these updates was replicated to the peer independently (because
other data was also included in the write-ahead log that needed to be replicated). The update with a value of 1 was
successfully replicated, and then we attempted to replicate the update with a value of 2, but the remote server never
responded. The primary does not know whether the update with a value of 2 was actually applied or not, so the
only recourse is to re-send the update. After we receive confirmation that the update with a value of 2 was replicated,
we will then replicate the update with 3. If the peer never applied the first attempt at the update of '2', the summation is
accurate. If the update was applied but the acknowledgement was lost for some reason (system failure, network partition), the
update will be applied a second time on the peer, which would then compute 1 + 2 + 2 + 3 = 8 instead of 6. Because
re-applying an addition is not idempotent, we have created an inconsistency between the
primary and peer. As such, the SummingCombiner wouldn't be recommended on a table being replicated.
| |
While there are changes that could be made to the replication implementation which could attempt to mitigate this risk,
it is presently not recommended to configure non-idempotent Iterators or Combiners on a replicated table in cases where
inaccurate aggregations are not acceptable.
| |
| ==== Duplicate Keys |
| |
In Accumulo, when multiple keys exist that are exactly the same (equal down to the timestamp),
the value that is retained is non-deterministic. Replication introduces another level of non-determinism in this case.
For a table that is being replicated and has multiple equal keys with different values inserted into it, the final
value in that table on the primary instance is not guaranteed to be the final value on all replicas.
| |
For example, if the values inserted on the primary instance were +value1+ and +value2+ and the final
value on the primary was +value1+, it is not guaranteed that all replicas will also have +value1+. The final value is
non-deterministic for each instance.
| |
As is the recommendation without replication enabled, if multiple values for the same key (sans timestamp) are written to
Accumulo, it is strongly recommended that the timestamp properly reflect the version ordering intended by
the client. That is to say, newer values inserted into the table should have larger timestamps. If the time between
writes of updates to the same key is significant (on the order of minutes), this concern can likely be ignored.
| |
| ==== Bulk Imports |
| |
Currently, files that are bulk imported into a table configured for replication are not replicated. There is no
technical reason why this was not implemented; it was simply omitted from the initial implementation. This is considered a
fair limitation because bulk importing the generated files into multiple locations is much simpler than bifurcating "live"
ingest data into two instances. Given some existing bulk import process which creates files and then imports them into an
Accumulo instance, it is trivial to copy those files to a new HDFS instance and import them into another Accumulo
instance using the same process. Hadoop's +distcp+ command provides an easy way to copy large amounts of data to another
HDFS instance, which makes the problem of duplicating bulk imports very easy to solve.
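
As a rough sketch, assuming both clusters can reach each other's HDFS and using placeholder NameNode
hostnames and paths, the generated files could be copied to the peer's HDFS and then imported there with
the same bulk import process used on the primary:

----
$ hadoop distcp hdfs://primary-namenode:8020/bulk/my_table hdfs://peer-namenode:8020/bulk/my_table
----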
| |
| === Table Schema |
| |
The following describes the kinds of keys, their format, and their general function, for the purpose of helping individuals
understand what the replication table describes. Because the replication table is essentially a state machine,
this data is often the source of truth for why Accumulo is doing what it is doing with respect to replication. There are
three "sections" in this table: "repl", "work", and "order".
| |
| ==== Repl section |
| |
This section is for the tracking of a WAL file that needs to be replicated to one or more remote Accumulo tables.
An entry here records that replication needs to happen on the given WAL file, and also that the local Accumulo table,
as specified by the column qualifier (the local table ID), has data in this WAL file.
| |
| The structure of the key-value is as follows: |
| ---- |
| <HDFS_uri_to_WAL> repl:<local_table_id> [] -> <protobuf> |
| ---- |
| |
This entry is created based on a replication entry from the +accumulo.metadata+ table, and is deleted from the replication table
when the WAL has been fully replicated to all remote Accumulo tables.
| |
| ==== Work section |
| |
| This section is for the tracking of a WAL file that needs to be replicated to a single Accumulo table in a remote |
| Accumulo cluster. If a WAL must be replicated to multiple tables, there will be multiple entries. The Value for this |
| Key is a serialized ProtocolBuffer message which encapsulates the portion of the WAL which was already sent for |
| this file. The "replication target" is the unique location of where the file needs to be replicated: the identifier |
| for the remote Accumulo cluster and the table ID in that remote Accumulo cluster. The protocol buffer in the value |
| tracks the progress of replication to the remote cluster. |
| |
| ---- |
| <HDFS_uri_to_WAL> work:<replication_target> [] -> <protobuf> |
| ---- |
| |
| The "work" entry is created when a WAL has an "order" entry, and deleted after the WAL is replicated to all |
| necessary remote clusters. |
| |
| ==== Order section |
| |
This section is used to order and schedule (create) replication work. In some cases, data with the same timestamp
may be provided multiple times, and it is then important that WALs are replicated in the same order they were
created/used. Even in cases where this ordering is not important, the order entry ensures that the oldest WALs
are processed first and pushed through the replication framework.
| |
| ---- |
| <time_of_WAL_closing>\x00<HDFS_uri_to_WAL> order:<local_table_id> [] -> <protobuf> |
| ---- |
| |
| The "order" entry is created when the WAL is closed (no longer being written to) and is removed when |
| the WAL is fully replicated to all remote locations. |