blob: 02ed35d6c3fa47144c246e671690edb9b1273acd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.fluo.accumulo.iterators.NotificationIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.api.client.AbstractTransactionBase;
import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.AlreadySetException;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.impl.TransactorNode;
import org.apache.fluo.core.impl.TxStats;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.io.Text;
public class TestTransaction extends AbstractTransactionBase implements TransactionBase {
private TransactionImpl tx;
private Environment env;
public static long getNotificationTS(Environment env, String row, Column col) {
Scanner scanner;
try {
scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
IteratorSetting iterCfg = new IteratorSetting(11, NotificationIterator.class);
scanner.addScanIterator(iterCfg);
Text cv = ByteUtil.toText(col.getVisibility());
scanner.setRange(SpanUtil.toRange(Span.prefix(row)));
scanner.fetchColumn(ByteUtil.toText(ColumnConstants.NOTIFY_CF),
new Text(NotificationUtil.encodeCol(col)));
for (Entry<Key, org.apache.accumulo.core.data.Value> entry : scanner) {
if (entry.getKey().getColumnVisibility().equals(cv)) {
return Notification.from(entry.getKey()).getTimestamp();
}
}
throw new RuntimeException("No notification found");
}
@SuppressWarnings("resource")
public TestTransaction(Environment env, TransactorNode transactor) {
this(new TransactionImpl(env).setTransactor(transactor), env);
}
public TestTransaction(Environment env) {
this(new TransactionImpl(env), env);
}
private TestTransaction(TransactionImpl transactionImpl, Environment env) {
this.tx = transactionImpl;
this.env = env;
}
public TestTransaction(Environment env, String trow, Column tcol) {
this(env, trow, tcol, getNotificationTS(env, trow, tcol));
}
public TestTransaction(Environment env, String trow, Column tcol, long notificationTS) {
this(new TransactionImpl(env, new Notification(Bytes.of(trow), tcol, notificationTS)), env);
}
/**
* Calls commit() and then close()
*/
public void done() throws CommitException {
try {
commit();
} finally {
close();
}
}
public void commit() throws CommitException {
tx.commit();
env.getSharedResources().getBatchWriter().waitForAsyncFlush();
}
public void close() {
tx.close();
}
public CommitData createCommitData() {
return tx.createCommitData();
}
public boolean preCommit(CommitData cd) throws AlreadyAcknowledgedException {
return tx.preCommit(cd);
}
public boolean preCommit(CommitData cd, RowColumn primary) {
return tx.preCommit(cd, primary);
}
public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
return tx.commitPrimaryColumn(cd, commitStamp);
}
public void finishCommit(CommitData cd, Stamp commitStamp) {
tx.finishCommit(cd, commitStamp);
env.getSharedResources().getBatchWriter().waitForAsyncFlush();
}
public long getStartTs() {
return tx.getStartTs();
}
public TxStats getStats() {
return tx.getStats();
}
@Override
public void delete(Bytes row, Column col) {
tx.delete(row, col);
}
@Override
public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
tx.set(row, col, value);
}
@Override
public void setWeakNotification(Bytes row, Column col) {
tx.setWeakNotification(row, col);
}
@Override
public Bytes get(Bytes row, Column column) {
return tx.get(row, column);
}
@Override
public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
return tx.get(row, columns);
}
@Override
public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
return tx.get(rows, columns);
}
@Override
public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
return tx.get(rowColumns);
}
@Override
public ScannerBuilder scanner() {
return tx.scanner();
}
@Override
public long getStartTimestamp() {
return tx.getStartTimestamp();
}
@Override
public SnapshotBase withReadLock() {
return tx.withReadLock();
}
}