blob: d8264f110d0818e4daaf347ed7f319f9ddfb2651 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.test.replication;
import static org.junit.Assert.assertEquals;
import java.util.Map.Entry;
import java.util.UUID;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.master.replication.FinishedWorkUpdater;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.Iterables;
@Ignore("Replication ITs are not stable and not currently maintained")
public class FinishedWorkUpdaterIT extends ConfigurableMacBase {
private Connector conn;
private FinishedWorkUpdater updater;
@Before
public void configureUpdater() throws Exception {
conn = getConnector();
updater = new FinishedWorkUpdater(conn);
}
@Test
public void offlineReplicationTableFailsGracefully() {
updater.run();
}
@Test
public void recordsWithProgressUpdateBothTables() throws Exception {
conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME,
TablePermission.READ);
conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME,
TablePermission.WRITE);
ReplicationTable.setOnline(conn);
String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
Status stat =
Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build();
ReplicationTarget target = new ReplicationTarget("peer", "table1", "1");
// Create a single work record for a file to some peer
BatchWriter bw = ReplicationTable.getBatchWriter(conn);
Mutation m = new Mutation(file);
WorkSection.add(m, target.toText(), ProtobufUtil.toValue(stat));
bw.addMutation(m);
bw.close();
updater.run();
Scanner s = ReplicationTable.getScanner(conn);
s.setRange(Range.exact(file));
StatusSection.limit(s);
Entry<Key,Value> entry = Iterables.getOnlyElement(s);
assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
assertEquals(entry.getKey().getColumnQualifier().toString(), target.getSourceTableId());
// We should only rely on the correct begin attribute being returned
Status actual = Status.parseFrom(entry.getValue().get());
assertEquals(stat.getBegin(), actual.getBegin());
}
@Test
public void chooseMinimumBeginOffset() throws Exception {
conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME,
TablePermission.READ);
conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME,
TablePermission.WRITE);
ReplicationTable.setOnline(conn);
String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true)
.setInfiniteEnd(false).build();
Status stat2 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true)
.setInfiniteEnd(false).build();
Status stat3 =
Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(false).build();
ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1");
ReplicationTarget target2 = new ReplicationTarget("peer2", "table2", "1");
ReplicationTarget target3 = new ReplicationTarget("peer3", "table3", "1");
// Create a single work record for a file to some peer
BatchWriter bw = ReplicationTable.getBatchWriter(conn);
Mutation m = new Mutation(file);
WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
bw.addMutation(m);
bw.close();
updater.run();
Scanner s = ReplicationTable.getScanner(conn);
s.setRange(Range.exact(file));
StatusSection.limit(s);
Entry<Key,Value> entry = Iterables.getOnlyElement(s);
assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
assertEquals(entry.getKey().getColumnQualifier().toString(), target1.getSourceTableId());
// We should only rely on the correct begin attribute being returned
Status actual = Status.parseFrom(entry.getValue().get());
assertEquals(1, actual.getBegin());
}
@Test
public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception {
conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME,
TablePermission.READ);
conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME,
TablePermission.WRITE);
ReplicationTable.setOnline(conn);
String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
Status stat1 =
Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build();
Status stat2 =
Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(true).build();
Status stat3 =
Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(true).build();
ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1");
ReplicationTarget target2 = new ReplicationTarget("peer2", "table2", "1");
ReplicationTarget target3 = new ReplicationTarget("peer3", "table3", "1");
// Create a single work record for a file to some peer
BatchWriter bw = ReplicationTable.getBatchWriter(conn);
Mutation m = new Mutation(file);
WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
bw.addMutation(m);
bw.close();
updater.run();
Scanner s = ReplicationTable.getScanner(conn);
s.setRange(Range.exact(file));
StatusSection.limit(s);
Entry<Key,Value> entry = Iterables.getOnlyElement(s);
assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
assertEquals(entry.getKey().getColumnQualifier().toString(), target1.getSourceTableId());
// We should only rely on the correct begin attribute being returned
Status actual = Status.parseFrom(entry.getValue().get());
assertEquals(1, actual.getBegin());
}
}