blob: d41323be0270cf85f207420294e445745ee04b3b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration.impl;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.api.observer.StringObserver;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
public class WeakNotificationOverlapIT extends ITBaseImpl {
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
private static final Column STAT_TOTAL = new Column("stat", "total");
private static final Column STAT_PROCESSED = new Column("stat", "processed");
private static final Column STAT_CHANGED = new Column("stat", "changed");
private static final StringObserver TOTAL_OBSERVER = (tx, row, col) -> {
String totalStr = tx.gets(row, STAT_TOTAL);
if (totalStr == null) {
return;
}
Integer total = Integer.parseInt(totalStr);
int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
tx.set(row, new Column("stat", "processed"), total + "");
TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
};
public static class WeakNtfyObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
or.forColumn(STAT_CHANGED, WEAK).useStrObserver(TOTAL_OBSERVER);
}
}
@Override
protected Class<? extends ObserverProvider> getObserverProviderClass() {
return WeakNtfyObserverProvider.class;
}
@Test
public void testOverlap() throws Exception {
// this test ensures that processing of weak notification deletes based on startTs and not
// commitTs
final TestTransaction ttx1 = new TestTransaction(env);
TestUtil.increment(ttx1, "1", STAT_TOTAL, 1);
ttx1.setWeakNotification("1", STAT_CHANGED);
ttx1.done();
final TestTransaction ttx2 = new TestTransaction(env, "1", STAT_CHANGED);
final TestTransaction ttx3 = new TestTransaction(env);
TestUtil.increment(ttx3, "1", STAT_TOTAL, 1);
ttx3.setWeakNotification("1", STAT_CHANGED);
ttx3.done();
Assert.assertEquals(1, countNotifications());
TOTAL_OBSERVER.process(ttx2, Bytes.of("1"), STAT_CHANGED);
// should not delete notification created by ttx3
ttx2.done();
final TestTransaction snap1 = new TestTransaction(env);
Assert.assertEquals("1", snap1.gets("all", STAT_TOTAL));
snap1.done();
Assert.assertEquals(1, countNotifications());
final TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
TOTAL_OBSERVER.process(ttx4, Bytes.of("1"), STAT_CHANGED);
ttx4.done();
Assert.assertEquals(0, countNotifications());
final TestTransaction snap2 = new TestTransaction(env);
Assert.assertEquals("2", snap2.gets("all", STAT_TOTAL));
snap2.done();
// the following code is a repeat of the above with a slight diff. The following tx creates a
// notification, but deletes the data so there is no work for the
// observer. This test the case where a observer deletes a notification w/o making any updates.
final TestTransaction ttx5 = new TestTransaction(env);
ttx5.delete("1", STAT_TOTAL);
ttx5.delete("1", STAT_PROCESSED);
ttx5.setWeakNotification("1", STAT_CHANGED);
ttx5.done();
Assert.assertEquals(1, countNotifications());
final TestTransaction ttx6 = new TestTransaction(env, "1", STAT_CHANGED);
final TestTransaction ttx7 = new TestTransaction(env);
TestUtil.increment(ttx7, "1", STAT_TOTAL, 1);
ttx7.setWeakNotification("1", STAT_CHANGED);
ttx7.done();
Assert.assertEquals(1, countNotifications());
TOTAL_OBSERVER.process(ttx6, Bytes.of("1"), STAT_CHANGED);
// should not delete notification created by ttx7
ttx6.done();
Assert.assertEquals(1, countNotifications());
final TestTransaction snap3 = new TestTransaction(env);
Assert.assertEquals("2", snap3.gets("all", STAT_TOTAL));
snap3.done();
final TestTransaction ttx8 = new TestTransaction(env, "1", STAT_CHANGED);
TOTAL_OBSERVER.process(ttx8, Bytes.of("1"), STAT_CHANGED);
ttx8.done();
Assert.assertEquals(0, countNotifications());
final TestTransaction snap4 = new TestTransaction(env);
Assert.assertEquals("3", snap4.gets("all", STAT_TOTAL));
snap4.done();
}
@Test
public void testOverlap2() throws Exception {
// this test ensures that setting weak notification is based on commitTs and not startTs
final TestTransaction ttx1 = new TestTransaction(env);
TestUtil.increment(ttx1, "1", STAT_TOTAL, 1);
ttx1.setWeakNotification("1", STAT_CHANGED);
ttx1.done();
Assert.assertEquals(1, countNotifications());
final TestTransaction ttx2 = new TestTransaction(env);
TestUtil.increment(ttx2, "1", STAT_TOTAL, 1);
ttx2.setWeakNotification("1", STAT_CHANGED);
CommitData cd2 = ttx2.createCommitData();
Assert.assertTrue(ttx2.preCommit(cd2));
// simulate an observer processing the notification created by ttx1 while ttx2 is in the middle
// of committing. Processing this observer should not delete
// the notification for ttx2. It should delete the notification for ttx1.
final TestTransaction ttx3 = new TestTransaction(env, "1", STAT_CHANGED);
Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
Assert.assertTrue(ttx2.commitPrimaryColumn(cd2, commitTs));
ttx2.finishCommit(cd2, commitTs);
ttx2.close();
Assert.assertEquals(1, countNotifications());
TOTAL_OBSERVER.process(ttx3, Bytes.of("1"), STAT_CHANGED);
ttx3.done();
Assert.assertEquals(1, countNotifications());
try (Snapshot snapshot = client.newSnapshot()) {
Assert.assertEquals("1", snapshot.gets("all", STAT_TOTAL));
}
final TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
TOTAL_OBSERVER.process(ttx4, Bytes.of("1"), STAT_CHANGED);
ttx4.done();
Assert.assertEquals(0, countNotifications());
try (Snapshot snapshot = client.newSnapshot()) {
Assert.assertEquals("2", snapshot.gets("all", STAT_TOTAL));
}
}
private int countNotifications() throws Exception {
// deletes of notifications are queued async at end of transaction
env.getSharedResources().getBatchWriter().waitForAsyncFlush();
Scanner scanner = aClient.createScanner(getCurTableName(), Authorizations.EMPTY);
Notification.configureScanner(scanner);
int count = 0;
for (Iterator<Entry<Key, Value>> iterator = scanner.iterator(); iterator.hasNext();) {
iterator.next();
count++;
}
return count;
}
}