| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.test.replication; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import org.apache.accumulo.core.client.BatchWriter; |
| import org.apache.accumulo.core.client.Connector; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; |
| import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; |
| import org.apache.accumulo.core.replication.ReplicationTable; |
| import org.apache.accumulo.core.replication.ReplicationTarget; |
| import org.apache.accumulo.core.security.TablePermission; |
| import org.apache.accumulo.master.replication.WorkMaker; |
| import org.apache.accumulo.server.replication.StatusUtil; |
| import org.apache.accumulo.server.replication.proto.Replication.Status; |
| import org.apache.accumulo.test.functional.ConfigurableMacBase; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| |
| @Ignore("Replication ITs are not stable and not currently maintained") |
| public class WorkMakerIT extends ConfigurableMacBase { |
| |
| private Connector conn; |
| |
| private static class MockWorkMaker extends WorkMaker { |
| |
| public MockWorkMaker(Connector conn) { |
| super(null, conn); |
| } |
| |
| @Override |
| public void setBatchWriter(BatchWriter bw) { |
| super.setBatchWriter(bw); |
| } |
| |
| @Override |
| public void addWorkRecord(Text file, Value v, Map<String,String> targets, |
| String sourceTableId) { |
| super.addWorkRecord(file, v, targets, sourceTableId); |
| } |
| |
| @Override |
| public boolean shouldCreateWork(Status status) { |
| return super.shouldCreateWork(status); |
| } |
| |
| } |
| |
| @Before |
| public void setupInstance() throws Exception { |
| conn = getConnector(); |
| ReplicationTable.setOnline(conn); |
| conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, |
| TablePermission.WRITE); |
| conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, |
| TablePermission.READ); |
| } |
| |
| @Test |
| public void singleUnitSingleTarget() throws Exception { |
| String table = testName.getMethodName(); |
| conn.tableOperations().create(table); |
| String tableId = conn.tableOperations().tableIdMap().get(table); |
| String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678"; |
| |
| // Create a status record for a file |
| long timeCreated = System.currentTimeMillis(); |
| Mutation m = new Mutation(new Path(file).toString()); |
| m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(timeCreated)); |
| BatchWriter bw = ReplicationTable.getBatchWriter(conn); |
| bw.addMutation(m); |
| bw.flush(); |
| |
| // Assert that we have one record in the status section |
| Scanner s = ReplicationTable.getScanner(conn); |
| StatusSection.limit(s); |
| assertEquals(1, Iterables.size(s)); |
| |
| MockWorkMaker workMaker = new MockWorkMaker(conn); |
| |
| // Invoke the addWorkRecord method to create a Work record from the Status record earlier |
| ReplicationTarget expected = new ReplicationTarget("remote_cluster_1", "4", tableId); |
| workMaker.setBatchWriter(bw); |
| workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(timeCreated), |
| ImmutableMap.of("remote_cluster_1", "4"), tableId); |
| |
| // Scan over just the WorkSection |
| s = ReplicationTable.getScanner(conn); |
| WorkSection.limit(s); |
| |
| Entry<Key,Value> workEntry = Iterables.getOnlyElement(s); |
| Key workKey = workEntry.getKey(); |
| ReplicationTarget actual = ReplicationTarget.from(workKey.getColumnQualifier()); |
| |
| assertEquals(file, workKey.getRow().toString()); |
| assertEquals(WorkSection.NAME, workKey.getColumnFamily()); |
| assertEquals(expected, actual); |
| assertEquals(workEntry.getValue(), StatusUtil.fileCreatedValue(timeCreated)); |
| } |
| |
| @Test |
| public void singleUnitMultipleTargets() throws Exception { |
| String table = testName.getMethodName(); |
| conn.tableOperations().create(table); |
| |
| String tableId = conn.tableOperations().tableIdMap().get(table); |
| |
| String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678"; |
| |
| Mutation m = new Mutation(new Path(file).toString()); |
| m.put(StatusSection.NAME, new Text(tableId), |
| StatusUtil.fileCreatedValue(System.currentTimeMillis())); |
| BatchWriter bw = ReplicationTable.getBatchWriter(conn); |
| bw.addMutation(m); |
| bw.flush(); |
| |
| // Assert that we have one record in the status section |
| Scanner s = ReplicationTable.getScanner(conn); |
| StatusSection.limit(s); |
| assertEquals(1, Iterables.size(s)); |
| |
| MockWorkMaker workMaker = new MockWorkMaker(conn); |
| |
| Map<String,String> targetClusters = |
| ImmutableMap.of("remote_cluster_1", "4", "remote_cluster_2", "6", "remote_cluster_3", "8"); |
| Set<ReplicationTarget> expectedTargets = new HashSet<>(); |
| for (Entry<String,String> cluster : targetClusters.entrySet()) { |
| expectedTargets.add(new ReplicationTarget(cluster.getKey(), cluster.getValue(), tableId)); |
| } |
| workMaker.setBatchWriter(bw); |
| workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(System.currentTimeMillis()), |
| targetClusters, tableId); |
| |
| s = ReplicationTable.getScanner(conn); |
| WorkSection.limit(s); |
| |
| Set<ReplicationTarget> actualTargets = new HashSet<>(); |
| for (Entry<Key,Value> entry : s) { |
| assertEquals(file, entry.getKey().getRow().toString()); |
| assertEquals(WorkSection.NAME, entry.getKey().getColumnFamily()); |
| |
| ReplicationTarget target = ReplicationTarget.from(entry.getKey().getColumnQualifier()); |
| actualTargets.add(target); |
| } |
| |
| for (ReplicationTarget expected : expectedTargets) { |
| assertTrue("Did not find expected target: " + expected, actualTargets.contains(expected)); |
| actualTargets.remove(expected); |
| } |
| |
| assertTrue("Found extra replication work entries: " + actualTargets, actualTargets.isEmpty()); |
| } |
| |
| @Test |
| public void dontCreateWorkForEntriesWithNothingToReplicate() throws Exception { |
| String table = testName.getMethodName(); |
| conn.tableOperations().create(table); |
| String tableId = conn.tableOperations().tableIdMap().get(table); |
| String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678"; |
| |
| Mutation m = new Mutation(new Path(file).toString()); |
| m.put(StatusSection.NAME, new Text(tableId), |
| StatusUtil.fileCreatedValue(System.currentTimeMillis())); |
| BatchWriter bw = ReplicationTable.getBatchWriter(conn); |
| bw.addMutation(m); |
| bw.flush(); |
| |
| // Assert that we have one record in the status section |
| Scanner s = ReplicationTable.getScanner(conn); |
| StatusSection.limit(s); |
| assertEquals(1, Iterables.size(s)); |
| |
| MockWorkMaker workMaker = new MockWorkMaker(conn); |
| |
| conn.tableOperations().setProperty(ReplicationTable.NAME, |
| Property.TABLE_REPLICATION_TARGET.getKey() + "remote_cluster_1", "4"); |
| |
| workMaker.setBatchWriter(bw); |
| |
| // If we don't shortcircuit out, we should get an exception because |
| // ServerConfiguration.getTableConfiguration |
| // won't work with MockAccumulo |
| workMaker.run(); |
| |
| s = ReplicationTable.getScanner(conn); |
| WorkSection.limit(s); |
| |
| assertEquals(0, Iterables.size(s)); |
| } |
| |
| } |