Updates for Accumulo 2.0 alpha2 release (#1068)
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
index 9e9c84e..f6585de 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
@@ -17,6 +17,9 @@
public class AccumuloProps {
+ public static final String CLIENT_INSTANCE_NAME = "instance.name";
+ public static final String CLIENT_ZOOKEEPERS = "instance.zookeepers";
+
public static final String TABLE_BLOCKCACHE_ENABLED = "table.cache.block.enable";
public static final String TABLE_CLASSPATH = "table.classpath.context";
public static final String TABLE_DELETE_BEHAVIOR = "table.delete.behavior";
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
index ce6d459..464a904 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
@@ -49,17 +49,12 @@
private static boolean hasNotifications(Environment env, TableRange range)
throws TableNotFoundException {
- Scanner scanner = null;
- try {
- scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
scanner.setRange(range.getRange());
Notification.configureScanner(scanner);
return scanner.iterator().hasNext();
- } finally {
- if (scanner != null) {
- scanner.close();
- }
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 95a8892..225cd90 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -117,9 +117,14 @@
"Fluo application already initialized at " + config.getAppZookeepers());
}
- AccumuloClient conn = AccumuloUtil.getClient(config);
+ try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+ initialize(opts, client);
+ }
+ }
- boolean tableExists = conn.tableOperations().exists(config.getAccumuloTable());
+ private void initialize(InitializationOptions opts, AccumuloClient client)
+ throws TableExistsException, AlreadyInitializedException {
+ boolean tableExists = client.tableOperations().exists(config.getAccumuloTable());
if (tableExists && !opts.getClearTable()) {
throw new TableExistsException("Accumulo table already exists " + config.getAccumuloTable());
}
@@ -130,7 +135,7 @@
logger.info("The Accumulo table '{}' will be dropped and created as requested by user",
config.getAccumuloTable());
try {
- conn.tableOperations().delete(config.getAccumuloTable());
+ client.tableOperations().delete(config.getAccumuloTable());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -151,7 +156,7 @@
}
try {
- initializeApplicationInZooKeeper(conn);
+ initializeApplicationInZooKeeper(client);
String accumuloJars;
if (!config.getAccumuloJars().trim().isEmpty()) {
@@ -180,7 +185,7 @@
if (!accumuloClasspath.isEmpty()) {
String contextName = "fluo-" + config.getApplicationName();
- conn.instanceOperations().setProperty(
+ client.instanceOperations().setProperty(
AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, accumuloClasspath);
ntcProps.put(AccumuloProps.TABLE_CLASSPATH, contextName);
}
@@ -201,7 +206,7 @@
configureIterators(ntc);
ntc.setProperties(ntcProps);
- conn.tableOperations().create(config.getAccumuloTable(), ntc);
+ client.tableOperations().create(config.getAccumuloTable(), ntc);
updateSharedConfig();
} catch (NodeExistsException nee) {
@@ -246,16 +251,16 @@
throw new FluoException("Must stop the oracle server to remove an application");
}
- AccumuloClient conn = AccumuloUtil.getClient(config);
-
- boolean tableExists = conn.tableOperations().exists(config.getAccumuloTable());
- // With preconditions met, it's now OK to delete table & zookeeper root (if they exist)
- if (tableExists) {
- logger.info("The Accumulo table '{}' will be dropped", config.getAccumuloTable());
- try {
- conn.tableOperations().delete(config.getAccumuloTable());
- } catch (Exception e) {
- throw new RuntimeException(e);
+ try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+ boolean tableExists = client.tableOperations().exists(config.getAccumuloTable());
+ // With preconditions met, it's now OK to delete table & zookeeper root (if they exist)
+ if (tableExists) {
+ logger.info("The Accumulo table '{}' will be dropped", config.getAccumuloTable());
+ try {
+ client.tableOperations().delete(config.getAccumuloTable());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
@@ -276,7 +281,8 @@
private void initializeApplicationInZooKeeper(AccumuloClient client) throws Exception {
- final String accumuloInstanceName = client.info().getInstanceName();
+ final String accumuloInstanceName =
+ client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
final String accumuloInstanceID = client.getInstanceID();
final String fluoApplicationID = UUID.randomUUID().toString();
@@ -545,7 +551,8 @@
if (!config.hasRequiredAdminProps()) {
throw new IllegalArgumentException("Admin configuration is missing required properties");
}
- AccumuloClient client = AccumuloUtil.getClient(config);
- return client.tableOperations().exists(config.getAccumuloTable());
+ try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+ return client.tableOperations().exists(config.getAccumuloTable());
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 766a4a9..a5c389f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -91,9 +91,10 @@
ensureDeletesAreDisabled();
- if (!client.info().getInstanceName().equals(accumuloInstance)) {
- throw new IllegalArgumentException("unexpected accumulo instance name "
- + client.info().getInstanceName() + " != " + accumuloInstance);
+ String instanceName = client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
+ if (!instanceName.equals(accumuloInstance)) {
+ throw new IllegalArgumentException(
+ "unexpected accumulo instance name " + instanceName + " != " + accumuloInstance);
}
if (!client.getInstanceID().equals(accumuloInstanceID)) {
@@ -251,5 +252,6 @@
@Override
public void close() {
resources.close();
+ client.close();
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index 51cdd5b..33f8487 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -317,10 +317,9 @@
}
}
- BatchScanner bscanner = null;
- try {
- bscanner =
- env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1);
+
+ try (BatchScanner bscanner =
+ env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1)) {
bscanner.setRanges(ranges);
IteratorSetting iterCfg = new IteratorSetting(10, OpenReadLockIterator.class);
@@ -336,10 +335,6 @@
return ret;
- } finally {
- if (bscanner != null) {
- bscanner.close();
- }
}
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 99551d0..b898492 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -674,25 +674,22 @@
Range range = new Range(startKey, endKey);
- Scanner scanner;
- try {
- // TODO reuse or share scanner
- scanner =
- env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
+ scanner.setRange(range);
+
+ // TODO could use iterator that stops after 1st ACK. thought of using versioning iter
+ // but
+ // it scans to ACK
+ if (scanner.iterator().hasNext()) {
+ env.getSharedResources().getBatchWriter()
+ .writeMutationAsync(notification.newDelete(env));
+ return true;
+ }
} catch (TableNotFoundException e) {
// TODO proper exception handling
throw new RuntimeException(e);
}
-
- scanner.setRange(range);
-
- // TODO could use iterator that stops after 1st ACK. thought of using versioning iter but
- // it scans to ACK
- if (scanner.iterator().hasNext()) {
- env.getSharedResources().getBatchWriter()
- .writeMutationAsync(notification.newDelete(env));
- return true;
- }
}
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
index 3257bd5..b6eaeb6 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
@@ -17,9 +17,6 @@
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientInfo;
import org.apache.fluo.api.config.FluoConfiguration;
/**
@@ -32,18 +29,7 @@
* Creates Accumulo connector given FluoConfiguration
*/
public static AccumuloClient getClient(FluoConfiguration config) {
- try {
- return Accumulo.newClient()
- .forInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers())
- .usingPassword(config.getAccumuloUser(), config.getAccumuloPassword()).build();
- } catch (AccumuloException | AccumuloSecurityException e) {
- throw new IllegalStateException(e);
- }
- }
-
- public static ClientInfo getClientInfo(FluoConfiguration config) {
- return Accumulo.newClient()
- .forInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers())
- .usingPassword(config.getAccumuloUser(), config.getAccumuloPassword()).info();
+ return Accumulo.newClient().to(config.getAccumuloInstance(), config.getAccumuloZookeepers())
+ .as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index bf32a61..51e4f15 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -75,34 +75,32 @@
Column col) {
Span span = Span.exact(row, col);
- Scanner scanner;
- try {
- // TODO reuse or share scanner
- scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
+ scanner.setRange(SpanUtil.toRange(span));
+ scanner.addScanIterator(iterConf);
+
+ Iterator<Entry<Key, Value>> iter = scanner.iterator();
+ if (iter.hasNext()) {
+ Entry<Key, Value> entry = iter.next();
+
+ Key k = entry.getKey();
+ Bytes r = Bytes.of(k.getRowData().toArray());
+ Bytes cf = Bytes.of(k.getColumnFamilyData().toArray());
+ Bytes cq = Bytes.of(k.getColumnQualifierData().toArray());
+ Bytes cv = Bytes.of(k.getColumnVisibilityData().toArray());
+
+ if (r.equals(row) && cf.equals(col.getFamily()) && cq.equals(col.getQualifier())
+ && cv.equals(col.getVisibility())) {
+ return entry;
+ } else {
+ throw new RuntimeException("unexpected key " + k + " " + row + " " + col);
+ }
+ }
} catch (TableNotFoundException e) {
// TODO proper exception handling
throw new RuntimeException(e);
}
- scanner.setRange(SpanUtil.toRange(span));
- scanner.addScanIterator(iterConf);
-
- Iterator<Entry<Key, Value>> iter = scanner.iterator();
- if (iter.hasNext()) {
- Entry<Key, Value> entry = iter.next();
-
- Key k = entry.getKey();
- Bytes r = Bytes.of(k.getRowData().toArray());
- Bytes cf = Bytes.of(k.getColumnFamilyData().toArray());
- Bytes cq = Bytes.of(k.getColumnQualifierData().toArray());
- Bytes cv = Bytes.of(k.getColumnVisibilityData().toArray());
-
- if (r.equals(row) && cf.equals(col.getFamily()) && cq.equals(col.getQualifier())
- && cv.equals(col.getVisibility())) {
- return entry;
- } else {
- throw new RuntimeException("unexpected key " + k + " " + row + " " + col);
- }
- }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 2c66cbf..7628274 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -162,10 +162,7 @@
Span span = getSpan(options);
Collection<Column> columns = getColumns(options);
- Scanner scanner = null;
- try {
- scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
-
+ try (Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY)) {
scanner.setRange(SpanUtil.toRange(span));
NotificationScanner ntfyScanner = new NotificationScanner(scanner, columns);
@@ -173,10 +170,6 @@
scan(options, out, ntfyScanner);
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
}
}
@@ -210,13 +203,12 @@
public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig, PrintStream out) {
- AccumuloClient client = AccumuloUtil.getClient(sConfig);
-
Span span = getSpan(options);
Collection<Column> columns = getColumns(options);
- try {
- Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
+ try (AccumuloClient client = AccumuloUtil.getClient(sConfig);
+ Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY)) {
+
scanner.setRange(SpanUtil.toRange(span));
for (Column col : columns) {
if (col.isQualifierSet()) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
index ffe9194..1a47f2a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -180,34 +180,35 @@
private ScanCounts scan(Session session, PartitionInfo pi, Range range)
throws TableNotFoundException {
- Scanner scanner =
- env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
- scanner.setRange(range);
+ scanner.setRange(range);
- Notification.configureScanner(scanner);
+ Notification.configureScanner(scanner);
- IteratorSetting iterCfg = new IteratorSetting(30, "nhf", NotificationHashFilter.class);
- NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(), pi.getMyIdInGroup());
- scanner.addScanIterator(iterCfg);
+ IteratorSetting iterCfg = new IteratorSetting(30, "nhf", NotificationHashFilter.class);
+ NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(), pi.getMyIdInGroup());
+ scanner.addScanIterator(iterCfg);
- ScanCounts counts = new ScanCounts();
+ ScanCounts counts = new ScanCounts();
- for (Entry<Key, Value> entry : scanner) {
- if (!pi.equals(partitionManager.getPartitionInfo())) {
- throw new PartitionInfoChangedException();
+ for (Entry<Key, Value> entry : scanner) {
+ if (!pi.equals(partitionManager.getPartitionInfo())) {
+ throw new PartitionInfoChangedException();
+ }
+
+ if (stopped.get()) {
+ return counts;
+ }
+
+ counts.seen++;
+
+ if (session.addNotification(finder, Notification.from(entry.getKey()))) {
+ counts.added++;
+ }
}
-
- if (stopped.get()) {
- return counts;
- }
-
- counts.seen++;
-
- if (session.addNotification(finder, Notification.from(entry.getKey()))) {
- counts.added++;
- }
+ return counts;
}
- return counts;
}
}
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
index 92436f4..3bb62e3 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
@@ -16,12 +16,12 @@
package org.apache.fluo.integration;
import java.io.File;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.commons.io.FileUtils;
@@ -48,7 +48,6 @@
protected static String instanceName;
protected static AccumuloClient aClient;
- protected static ClientInfo clientInfo;
private static MiniAccumuloCluster cluster;
private static boolean startedCluster = false;
@@ -87,8 +86,8 @@
cluster.start();
startedCluster = true;
}
- clientInfo = MiniAccumuloCluster.getClientInfo(instanceDir);
- aClient = Accumulo.newClient().usingClientInfo(clientInfo).build();
+ Properties props = MiniAccumuloCluster.getClientProperties(instanceDir);
+ aClient = Accumulo.newClient().from(props).build();
}
protected Class<? extends ObserverProvider> getObserverProviderClass() {
@@ -128,6 +127,10 @@
@AfterClass
public static void tearDownAccumulo() throws Exception {
+ if (aClient != null) {
+ aClient.close();
+ }
+
if (startedCluster) {
cluster.stop();
}
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
index 43bc869..1f77d3f 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -15,8 +15,10 @@
package org.apache.fluo.integration;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.fluo.accumulo.util.AccumuloProps;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.InitializationOptions;
import org.apache.fluo.api.client.FluoFactory;
@@ -66,14 +68,15 @@
table = getNextTableName();
+ Properties props = aClient.properties();
config = new FluoConfiguration();
config.setApplicationName("impl-test" + testCounter.getAndIncrement());
- config.setAccumuloInstance(clientInfo.getInstanceName());
+ config.setAccumuloInstance(props.getProperty(AccumuloProps.CLIENT_INSTANCE_NAME));
config.setAccumuloUser(USER);
config.setAccumuloPassword(PASSWORD);
config.setAccumuloTable(table);
- config.setAccumuloZookeepers(clientInfo.getZooKeepers());
- config.setInstanceZookeepers(clientInfo.getZooKeepers() + "/fluo");
+ config.setAccumuloZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS));
+ config.setInstanceZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS) + "/fluo");
config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
setupObservers(config);
config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
index d31bf9f..f516a10 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
@@ -15,8 +15,10 @@
package org.apache.fluo.integration;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.fluo.accumulo.util.AccumuloProps;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.InitializationOptions;
import org.apache.fluo.api.client.FluoFactory;
@@ -40,13 +42,14 @@
@Before
public void setUpFluo() throws Exception {
+ Properties props = aClient.properties();
config = new FluoConfiguration();
config.setApplicationName("mini-test" + testCounter.getAndIncrement());
- config.setAccumuloInstance(clientInfo.getInstanceName());
+ config.setAccumuloInstance(props.getProperty(AccumuloProps.CLIENT_INSTANCE_NAME));
config.setAccumuloUser(USER);
config.setAccumuloPassword(PASSWORD);
- config.setAccumuloZookeepers(clientInfo.getZooKeepers());
- config.setInstanceZookeepers(clientInfo.getZooKeepers() + "/fluo");
+ config.setAccumuloZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS));
+ config.setInstanceZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS) + "/fluo");
config.setAccumuloTable(getNextTableName());
config.setWorkerThreads(5);
setupObservers(config);
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
index ebb02d1..7d62f48 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -55,28 +55,28 @@
private Environment env;
public static long getNotificationTS(Environment env, String row, Column col) {
- Scanner scanner;
- try {
- scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
+
+ IteratorSetting iterCfg = new IteratorSetting(11, NotificationIterator.class);
+ scanner.addScanIterator(iterCfg);
+
+ Text cv = ByteUtil.toText(col.getVisibility());
+
+ scanner.setRange(SpanUtil.toRange(Span.prefix(row)));
+ scanner.fetchColumn(ByteUtil.toText(ColumnConstants.NOTIFY_CF),
+ new Text(NotificationUtil.encodeCol(col)));
+
+ for (Entry<Key, org.apache.accumulo.core.data.Value> entry : scanner) {
+ if (entry.getKey().getColumnVisibility().equals(cv)) {
+ return Notification.from(entry.getKey()).getTimestamp();
+ }
+ }
+
+ throw new RuntimeException("No notification found");
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
- IteratorSetting iterCfg = new IteratorSetting(11, NotificationIterator.class);
- scanner.addScanIterator(iterCfg);
-
- Text cv = ByteUtil.toText(col.getVisibility());
-
- scanner.setRange(SpanUtil.toRange(Span.prefix(row)));
- scanner.fetchColumn(ByteUtil.toText(ColumnConstants.NOTIFY_CF),
- new Text(NotificationUtil.encodeCol(col)));
-
- for (Entry<Key, org.apache.accumulo.core.data.Value> entry : scanner) {
- if (entry.getKey().getColumnVisibility().equals(cv)) {
- return Notification.from(entry.getKey()).getTimestamp();
- }
- }
-
- throw new RuntimeException("No notification found");
}
@SuppressWarnings("resource")
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index a9222b0..87996e7 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -114,18 +114,19 @@
admin.initialize(opts);
// verify locality groups were set on the table
- AccumuloClient client = AccumuloUtil.getClient(config);
- Map<String, Set<Text>> localityGroups =
- client.tableOperations().getLocalityGroups(config.getAccumuloTable());
- Assert.assertEquals("Unexpected locality group count.", 1, localityGroups.size());
- Entry<String, Set<Text>> localityGroup = localityGroups.entrySet().iterator().next();
- Assert.assertEquals("'notify' locality group not found.",
- ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, localityGroup.getKey());
- Assert.assertEquals("'notify' locality group does not contain exactly 1 column family.", 1,
- localityGroup.getValue().size());
- Text colFam = localityGroup.getValue().iterator().next();
- Assert.assertTrue("'notify' locality group does not contain the correct column family.",
- ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0, colFam.getLength()));
+ try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+ Map<String, Set<Text>> localityGroups =
+ client.tableOperations().getLocalityGroups(config.getAccumuloTable());
+ Assert.assertEquals("Unexpected locality group count.", 1, localityGroups.size());
+ Entry<String, Set<Text>> localityGroup = localityGroups.entrySet().iterator().next();
+ Assert.assertEquals("'notify' locality group not found.",
+ ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, localityGroup.getKey());
+ Assert.assertEquals("'notify' locality group does not contain exactly 1 column family.", 1,
+ localityGroup.getValue().size());
+ Text colFam = localityGroup.getValue().iterator().next();
+ Assert.assertTrue("'notify' locality group does not contain the correct column family.",
+ ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0, colFam.getLength()));
+ }
}
try (FluoClientImpl client = new FluoClientImpl(localConfig)) {
diff --git a/modules/mapreduce/pom.xml b/modules/mapreduce/pom.xml
index a6d95d7..c4a1339 100644
--- a/modules/mapreduce/pom.xml
+++ b/modules/mapreduce/pom.xml
@@ -28,10 +28,6 @@
<dependencies>
<dependency>
<groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-client-mapreduce</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
<dependency>
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
index e09b86f..9d58a86 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
@@ -25,6 +25,7 @@
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
@@ -34,7 +35,6 @@
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl;
-import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -154,7 +154,10 @@
conf.getConfiguration().set(PROPS_CONF_KEY,
new String(baos.toByteArray(), StandardCharsets.UTF_8));
- AccumuloInputFormat.setClientInfo(conf, AccumuloUtil.getClientInfo(fconfig));
+ AccumuloInputFormat.setZooKeeperInstance(conf, fconfig.getAccumuloInstance(),
+ fconfig.getAccumuloZookeepers());
+ AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
+ new PasswordToken(fconfig.getAccumuloPassword()));
AccumuloInputFormat.setInputTableName(conf, env.getTable());
AccumuloInputFormat.setScanAuthorizations(conf, env.getAuthorizations());
}
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
index 8993f38..1c5616d 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
@@ -25,6 +25,7 @@
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
@@ -34,7 +35,6 @@
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl;
-import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -155,7 +155,10 @@
conf.getConfiguration().set(PROPS_CONF_KEY,
new String(baos.toByteArray(), StandardCharsets.UTF_8));
- AccumuloInputFormat.setClientInfo(conf, AccumuloUtil.getClientInfo(fconfig));
+ AccumuloInputFormat.setZooKeeperInstance(conf, fconfig.getAccumuloInstance(),
+ fconfig.getAccumuloZookeepers());
+ AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
+ new PasswordToken(fconfig.getAccumuloPassword()));
AccumuloInputFormat.setInputTableName(conf, env.getTable());
AccumuloInputFormat.setScanAuthorizations(conf, env.getAuthorizations());
}
diff --git a/pom.xml b/pom.xml
index 2dfa651..58abc63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
<url>https://github.com/apache/fluo/issues</url>
</issueManagement>
<properties>
- <accumulo.version>2.0.0-alpha-1</accumulo.version>
+ <accumulo.version>2.0.0-alpha-2</accumulo.version>
<curator.version>4.0.1</curator.version>
<dropwizard.version>0.8.1</dropwizard.version>
<findbugs.maxRank>11</findbugs.maxRank>