Updates for Accumulo 2.0 alpha2 release (#1068)


diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
index 9e9c84e..f6585de 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
@@ -17,6 +17,9 @@
 
 public class AccumuloProps {
 
+  public static final String CLIENT_INSTANCE_NAME = "instance.name";
+  public static final String CLIENT_ZOOKEEPERS = "instance.zookeepers";
+
   public static final String TABLE_BLOCKCACHE_ENABLED = "table.cache.block.enable";
   public static final String TABLE_CLASSPATH = "table.classpath.context";
   public static final String TABLE_DELETE_BEHAVIOR = "table.delete.behavior";
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
index ce6d459..464a904 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
@@ -49,17 +49,12 @@
 
   private static boolean hasNotifications(Environment env, TableRange range)
       throws TableNotFoundException {
-    Scanner scanner = null;
-    try {
-      scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+    try (Scanner scanner =
+        env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
       scanner.setRange(range.getRange());
       Notification.configureScanner(scanner);
 
       return scanner.iterator().hasNext();
-    } finally {
-      if (scanner != null) {
-        scanner.close();
-      }
     }
   }
 
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 95a8892..225cd90 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -117,9 +117,14 @@
           "Fluo application already initialized at " + config.getAppZookeepers());
     }
 
-    AccumuloClient conn = AccumuloUtil.getClient(config);
+    try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+      initialize(opts, client);
+    }
+  }
 
-    boolean tableExists = conn.tableOperations().exists(config.getAccumuloTable());
+  private void initialize(InitializationOptions opts, AccumuloClient client)
+      throws TableExistsException, AlreadyInitializedException {
+    boolean tableExists = client.tableOperations().exists(config.getAccumuloTable());
     if (tableExists && !opts.getClearTable()) {
       throw new TableExistsException("Accumulo table already exists " + config.getAccumuloTable());
     }
@@ -130,7 +135,7 @@
       logger.info("The Accumulo table '{}' will be dropped and created as requested by user",
           config.getAccumuloTable());
       try {
-        conn.tableOperations().delete(config.getAccumuloTable());
+        client.tableOperations().delete(config.getAccumuloTable());
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -151,7 +156,7 @@
     }
 
     try {
-      initializeApplicationInZooKeeper(conn);
+      initializeApplicationInZooKeeper(client);
 
       String accumuloJars;
       if (!config.getAccumuloJars().trim().isEmpty()) {
@@ -180,7 +185,7 @@
 
       if (!accumuloClasspath.isEmpty()) {
         String contextName = "fluo-" + config.getApplicationName();
-        conn.instanceOperations().setProperty(
+        client.instanceOperations().setProperty(
             AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, accumuloClasspath);
         ntcProps.put(AccumuloProps.TABLE_CLASSPATH, contextName);
       }
@@ -201,7 +206,7 @@
       configureIterators(ntc);
 
       ntc.setProperties(ntcProps);
-      conn.tableOperations().create(config.getAccumuloTable(), ntc);
+      client.tableOperations().create(config.getAccumuloTable(), ntc);
 
       updateSharedConfig();
     } catch (NodeExistsException nee) {
@@ -246,16 +251,16 @@
       throw new FluoException("Must stop the oracle server to remove an application");
     }
 
-    AccumuloClient conn = AccumuloUtil.getClient(config);
-
-    boolean tableExists = conn.tableOperations().exists(config.getAccumuloTable());
-    // With preconditions met, it's now OK to delete table & zookeeper root (if they exist)
-    if (tableExists) {
-      logger.info("The Accumulo table '{}' will be dropped", config.getAccumuloTable());
-      try {
-        conn.tableOperations().delete(config.getAccumuloTable());
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+    try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+      boolean tableExists = client.tableOperations().exists(config.getAccumuloTable());
+      // With preconditions met, it's now OK to delete table & zookeeper root (if they exist)
+      if (tableExists) {
+        logger.info("The Accumulo table '{}' will be dropped", config.getAccumuloTable());
+        try {
+          client.tableOperations().delete(config.getAccumuloTable());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
       }
     }
 
@@ -276,7 +281,8 @@
 
   private void initializeApplicationInZooKeeper(AccumuloClient client) throws Exception {
 
-    final String accumuloInstanceName = client.info().getInstanceName();
+    final String accumuloInstanceName =
+        client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
     final String accumuloInstanceID = client.getInstanceID();
     final String fluoApplicationID = UUID.randomUUID().toString();
 
@@ -545,7 +551,8 @@
     if (!config.hasRequiredAdminProps()) {
       throw new IllegalArgumentException("Admin configuration is missing required properties");
     }
-    AccumuloClient client = AccumuloUtil.getClient(config);
-    return client.tableOperations().exists(config.getAccumuloTable());
+    try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+      return client.tableOperations().exists(config.getAccumuloTable());
+    }
   }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 766a4a9..a5c389f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -91,9 +91,10 @@
 
     ensureDeletesAreDisabled();
 
-    if (!client.info().getInstanceName().equals(accumuloInstance)) {
-      throw new IllegalArgumentException("unexpected accumulo instance name "
-          + client.info().getInstanceName() + " != " + accumuloInstance);
+    String instanceName = client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
+    if (!instanceName.equals(accumuloInstance)) {
+      throw new IllegalArgumentException(
+          "unexpected accumulo instance name " + instanceName + " != " + accumuloInstance);
     }
 
     if (!client.getInstanceID().equals(accumuloInstanceID)) {
@@ -251,5 +252,6 @@
   @Override
   public void close() {
     resources.close();
+    client.close();
   }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index 51cdd5b..33f8487 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -317,10 +317,9 @@
       }
     }
 
-    BatchScanner bscanner = null;
-    try {
-      bscanner =
-          env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1);
+
+    try (BatchScanner bscanner =
+        env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1)) {
 
       bscanner.setRanges(ranges);
       IteratorSetting iterCfg = new IteratorSetting(10, OpenReadLockIterator.class);
@@ -336,10 +335,6 @@
 
       return ret;
 
-    } finally {
-      if (bscanner != null) {
-        bscanner.close();
-      }
     }
   }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 99551d0..b898492 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -674,25 +674,22 @@
 
           Range range = new Range(startKey, endKey);
 
-          Scanner scanner;
-          try {
-            // TODO reuse or share scanner
-            scanner =
-                env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+          try (Scanner scanner =
+              env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
+            scanner.setRange(range);
+
+            // TODO could use iterator that stops after 1st ACK. thought of using versioning iter
+            // but
+            // it scans to ACK
+            if (scanner.iterator().hasNext()) {
+              env.getSharedResources().getBatchWriter()
+                  .writeMutationAsync(notification.newDelete(env));
+              return true;
+            }
           } catch (TableNotFoundException e) {
             // TODO proper exception handling
             throw new RuntimeException(e);
           }
-
-          scanner.setRange(range);
-
-          // TODO could use iterator that stops after 1st ACK. thought of using versioning iter but
-          // it scans to ACK
-          if (scanner.iterator().hasNext()) {
-            env.getSharedResources().getBatchWriter()
-                .writeMutationAsync(notification.newDelete(env));
-            return true;
-          }
         }
       }
     }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
index 3257bd5..b6eaeb6 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
@@ -17,9 +17,6 @@
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.fluo.api.config.FluoConfiguration;
 
 /**
@@ -32,18 +29,7 @@
    * Creates Accumulo connector given FluoConfiguration
    */
   public static AccumuloClient getClient(FluoConfiguration config) {
-    try {
-      return Accumulo.newClient()
-          .forInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers())
-          .usingPassword(config.getAccumuloUser(), config.getAccumuloPassword()).build();
-    } catch (AccumuloException | AccumuloSecurityException e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
-  public static ClientInfo getClientInfo(FluoConfiguration config) {
-    return Accumulo.newClient()
-        .forInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers())
-        .usingPassword(config.getAccumuloUser(), config.getAccumuloPassword()).info();
+    return Accumulo.newClient().to(config.getAccumuloInstance(), config.getAccumuloZookeepers())
+        .as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
   }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index bf32a61..51e4f15 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -75,34 +75,32 @@
       Column col) {
     Span span = Span.exact(row, col);
 
-    Scanner scanner;
-    try {
-      // TODO reuse or share scanner
-      scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+    try (Scanner scanner =
+        env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
+      scanner.setRange(SpanUtil.toRange(span));
+      scanner.addScanIterator(iterConf);
+
+      Iterator<Entry<Key, Value>> iter = scanner.iterator();
+      if (iter.hasNext()) {
+        Entry<Key, Value> entry = iter.next();
+
+        Key k = entry.getKey();
+        Bytes r = Bytes.of(k.getRowData().toArray());
+        Bytes cf = Bytes.of(k.getColumnFamilyData().toArray());
+        Bytes cq = Bytes.of(k.getColumnQualifierData().toArray());
+        Bytes cv = Bytes.of(k.getColumnVisibilityData().toArray());
+
+        if (r.equals(row) && cf.equals(col.getFamily()) && cq.equals(col.getQualifier())
+            && cv.equals(col.getVisibility())) {
+          return entry;
+        } else {
+          throw new RuntimeException("unexpected key " + k + " " + row + " " + col);
+        }
+      }
     } catch (TableNotFoundException e) {
       // TODO proper exception handling
       throw new RuntimeException(e);
     }
-    scanner.setRange(SpanUtil.toRange(span));
-    scanner.addScanIterator(iterConf);
-
-    Iterator<Entry<Key, Value>> iter = scanner.iterator();
-    if (iter.hasNext()) {
-      Entry<Key, Value> entry = iter.next();
-
-      Key k = entry.getKey();
-      Bytes r = Bytes.of(k.getRowData().toArray());
-      Bytes cf = Bytes.of(k.getColumnFamilyData().toArray());
-      Bytes cq = Bytes.of(k.getColumnQualifierData().toArray());
-      Bytes cv = Bytes.of(k.getColumnVisibilityData().toArray());
-
-      if (r.equals(row) && cf.equals(col.getFamily()) && cq.equals(col.getQualifier())
-          && cv.equals(col.getVisibility())) {
-        return entry;
-      } else {
-        throw new RuntimeException("unexpected key " + k + " " + row + " " + col);
-      }
-    }
 
     return null;
   }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 2c66cbf..7628274 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -162,10 +162,7 @@
     Span span = getSpan(options);
     Collection<Column> columns = getColumns(options);
 
-    Scanner scanner = null;
-    try {
-      scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
-
+    try (Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY)) {
       scanner.setRange(SpanUtil.toRange(span));
 
       NotificationScanner ntfyScanner = new NotificationScanner(scanner, columns);
@@ -173,10 +170,6 @@
       scan(options, out, ntfyScanner);
     } catch (TableNotFoundException e) {
       throw new RuntimeException(e);
-    } finally {
-      if (scanner != null) {
-        scanner.close();
-      }
     }
   }
 
@@ -210,13 +203,12 @@
 
   public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig, PrintStream out) {
 
-    AccumuloClient client = AccumuloUtil.getClient(sConfig);
-
     Span span = getSpan(options);
     Collection<Column> columns = getColumns(options);
 
-    try {
-      Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
+    try (AccumuloClient client = AccumuloUtil.getClient(sConfig);
+        Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY)) {
+
       scanner.setRange(SpanUtil.toRange(span));
       for (Column col : columns) {
         if (col.isQualifierSet()) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
index ffe9194..1a47f2a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -180,34 +180,35 @@
 
   private ScanCounts scan(Session session, PartitionInfo pi, Range range)
       throws TableNotFoundException {
-    Scanner scanner =
-        env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+    try (Scanner scanner =
+        env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
 
-    scanner.setRange(range);
+      scanner.setRange(range);
 
-    Notification.configureScanner(scanner);
+      Notification.configureScanner(scanner);
 
-    IteratorSetting iterCfg = new IteratorSetting(30, "nhf", NotificationHashFilter.class);
-    NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(), pi.getMyIdInGroup());
-    scanner.addScanIterator(iterCfg);
+      IteratorSetting iterCfg = new IteratorSetting(30, "nhf", NotificationHashFilter.class);
+      NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(), pi.getMyIdInGroup());
+      scanner.addScanIterator(iterCfg);
 
-    ScanCounts counts = new ScanCounts();
+      ScanCounts counts = new ScanCounts();
 
-    for (Entry<Key, Value> entry : scanner) {
-      if (!pi.equals(partitionManager.getPartitionInfo())) {
-        throw new PartitionInfoChangedException();
+      for (Entry<Key, Value> entry : scanner) {
+        if (!pi.equals(partitionManager.getPartitionInfo())) {
+          throw new PartitionInfoChangedException();
+        }
+
+        if (stopped.get()) {
+          return counts;
+        }
+
+        counts.seen++;
+
+        if (session.addNotification(finder, Notification.from(entry.getKey()))) {
+          counts.added++;
+        }
       }
-
-      if (stopped.get()) {
-        return counts;
-      }
-
-      counts.seen++;
-
-      if (session.addNotification(finder, Notification.from(entry.getKey()))) {
-        counts.added++;
-      }
+      return counts;
     }
-    return counts;
   }
 }
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
index 92436f4..3bb62e3 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
@@ -16,12 +16,12 @@
 package org.apache.fluo.integration;
 
 import java.io.File;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
 import org.apache.commons.io.FileUtils;
@@ -48,7 +48,6 @@
 
   protected static String instanceName;
   protected static AccumuloClient aClient;
-  protected static ClientInfo clientInfo;
   private static MiniAccumuloCluster cluster;
   private static boolean startedCluster = false;
 
@@ -87,8 +86,8 @@
       cluster.start();
       startedCluster = true;
     }
-    clientInfo = MiniAccumuloCluster.getClientInfo(instanceDir);
-    aClient = Accumulo.newClient().usingClientInfo(clientInfo).build();
+    Properties props = MiniAccumuloCluster.getClientProperties(instanceDir);
+    aClient = Accumulo.newClient().from(props).build();
   }
 
   protected Class<? extends ObserverProvider> getObserverProviderClass() {
@@ -128,6 +127,10 @@
 
   @AfterClass
   public static void tearDownAccumulo() throws Exception {
+    if (aClient != null) {
+      aClient.close();
+    }
+
     if (startedCluster) {
       cluster.stop();
     }
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
index 43bc869..1f77d3f 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -15,8 +15,10 @@
 
 package org.apache.fluo.integration;
 
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.fluo.accumulo.util.AccumuloProps;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoAdmin.InitializationOptions;
 import org.apache.fluo.api.client.FluoFactory;
@@ -66,14 +68,15 @@
 
     table = getNextTableName();
 
+    Properties props = aClient.properties();
     config = new FluoConfiguration();
     config.setApplicationName("impl-test" + testCounter.getAndIncrement());
-    config.setAccumuloInstance(clientInfo.getInstanceName());
+    config.setAccumuloInstance(props.getProperty(AccumuloProps.CLIENT_INSTANCE_NAME));
     config.setAccumuloUser(USER);
     config.setAccumuloPassword(PASSWORD);
     config.setAccumuloTable(table);
-    config.setAccumuloZookeepers(clientInfo.getZooKeepers());
-    config.setInstanceZookeepers(clientInfo.getZooKeepers() + "/fluo");
+    config.setAccumuloZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS));
+    config.setInstanceZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS) + "/fluo");
     config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
     setupObservers(config);
     config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
index d31bf9f..f516a10 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
@@ -15,8 +15,10 @@
 
 package org.apache.fluo.integration;
 
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.fluo.accumulo.util.AccumuloProps;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoAdmin.InitializationOptions;
 import org.apache.fluo.api.client.FluoFactory;
@@ -40,13 +42,14 @@
   @Before
   public void setUpFluo() throws Exception {
 
+    Properties props = aClient.properties();
     config = new FluoConfiguration();
     config.setApplicationName("mini-test" + testCounter.getAndIncrement());
-    config.setAccumuloInstance(clientInfo.getInstanceName());
+    config.setAccumuloInstance(props.getProperty(AccumuloProps.CLIENT_INSTANCE_NAME));
     config.setAccumuloUser(USER);
     config.setAccumuloPassword(PASSWORD);
-    config.setAccumuloZookeepers(clientInfo.getZooKeepers());
-    config.setInstanceZookeepers(clientInfo.getZooKeepers() + "/fluo");
+    config.setAccumuloZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS));
+    config.setInstanceZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS) + "/fluo");
     config.setAccumuloTable(getNextTableName());
     config.setWorkerThreads(5);
     setupObservers(config);
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
index ebb02d1..7d62f48 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -55,28 +55,28 @@
   private Environment env;
 
   public static long getNotificationTS(Environment env, String row, Column col) {
-    Scanner scanner;
-    try {
-      scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+    try (Scanner scanner =
+        env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
+
+      IteratorSetting iterCfg = new IteratorSetting(11, NotificationIterator.class);
+      scanner.addScanIterator(iterCfg);
+
+      Text cv = ByteUtil.toText(col.getVisibility());
+
+      scanner.setRange(SpanUtil.toRange(Span.prefix(row)));
+      scanner.fetchColumn(ByteUtil.toText(ColumnConstants.NOTIFY_CF),
+          new Text(NotificationUtil.encodeCol(col)));
+
+      for (Entry<Key, org.apache.accumulo.core.data.Value> entry : scanner) {
+        if (entry.getKey().getColumnVisibility().equals(cv)) {
+          return Notification.from(entry.getKey()).getTimestamp();
+        }
+      }
+
+      throw new RuntimeException("No notification found");
     } catch (TableNotFoundException e) {
       throw new RuntimeException(e);
     }
-    IteratorSetting iterCfg = new IteratorSetting(11, NotificationIterator.class);
-    scanner.addScanIterator(iterCfg);
-
-    Text cv = ByteUtil.toText(col.getVisibility());
-
-    scanner.setRange(SpanUtil.toRange(Span.prefix(row)));
-    scanner.fetchColumn(ByteUtil.toText(ColumnConstants.NOTIFY_CF),
-        new Text(NotificationUtil.encodeCol(col)));
-
-    for (Entry<Key, org.apache.accumulo.core.data.Value> entry : scanner) {
-      if (entry.getKey().getColumnVisibility().equals(cv)) {
-        return Notification.from(entry.getKey()).getTimestamp();
-      }
-    }
-
-    throw new RuntimeException("No notification found");
   }
 
   @SuppressWarnings("resource")
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index a9222b0..87996e7 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -114,18 +114,19 @@
       admin.initialize(opts);
 
       // verify locality groups were set on the table
-      AccumuloClient client = AccumuloUtil.getClient(config);
-      Map<String, Set<Text>> localityGroups =
-          client.tableOperations().getLocalityGroups(config.getAccumuloTable());
-      Assert.assertEquals("Unexpected locality group count.", 1, localityGroups.size());
-      Entry<String, Set<Text>> localityGroup = localityGroups.entrySet().iterator().next();
-      Assert.assertEquals("'notify' locality group not found.",
-          ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, localityGroup.getKey());
-      Assert.assertEquals("'notify' locality group does not contain exactly 1 column family.", 1,
-          localityGroup.getValue().size());
-      Text colFam = localityGroup.getValue().iterator().next();
-      Assert.assertTrue("'notify' locality group does not contain the correct column family.",
-          ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0, colFam.getLength()));
+      try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+        Map<String, Set<Text>> localityGroups =
+            client.tableOperations().getLocalityGroups(config.getAccumuloTable());
+        Assert.assertEquals("Unexpected locality group count.", 1, localityGroups.size());
+        Entry<String, Set<Text>> localityGroup = localityGroups.entrySet().iterator().next();
+        Assert.assertEquals("'notify' locality group not found.",
+            ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, localityGroup.getKey());
+        Assert.assertEquals("'notify' locality group does not contain exactly 1 column family.", 1,
+            localityGroup.getValue().size());
+        Text colFam = localityGroup.getValue().iterator().next();
+        Assert.assertTrue("'notify' locality group does not contain the correct column family.",
+            ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0, colFam.getLength()));
+      }
     }
 
     try (FluoClientImpl client = new FluoClientImpl(localConfig)) {
diff --git a/modules/mapreduce/pom.xml b/modules/mapreduce/pom.xml
index a6d95d7..c4a1339 100644
--- a/modules/mapreduce/pom.xml
+++ b/modules/mapreduce/pom.xml
@@ -28,10 +28,6 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
-      <artifactId>accumulo-client-mapreduce</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
     </dependency>
     <dependency>
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
index e09b86f..9d58a86 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
@@ -25,6 +25,7 @@
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
@@ -34,7 +35,6 @@
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl;
-import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.SpanUtil;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -154,7 +154,10 @@
         conf.getConfiguration().set(PROPS_CONF_KEY,
             new String(baos.toByteArray(), StandardCharsets.UTF_8));
 
-        AccumuloInputFormat.setClientInfo(conf, AccumuloUtil.getClientInfo(fconfig));
+        AccumuloInputFormat.setZooKeeperInstance(conf, fconfig.getAccumuloInstance(),
+            fconfig.getAccumuloZookeepers());
+        AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
+            new PasswordToken(fconfig.getAccumuloPassword()));
         AccumuloInputFormat.setInputTableName(conf, env.getTable());
         AccumuloInputFormat.setScanAuthorizations(conf, env.getAuthorizations());
       }
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
index 8993f38..1c5616d 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
@@ -25,6 +25,7 @@
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
@@ -34,7 +35,6 @@
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl;
-import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.SpanUtil;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -155,7 +155,10 @@
         conf.getConfiguration().set(PROPS_CONF_KEY,
             new String(baos.toByteArray(), StandardCharsets.UTF_8));
 
-        AccumuloInputFormat.setClientInfo(conf, AccumuloUtil.getClientInfo(fconfig));
+        AccumuloInputFormat.setZooKeeperInstance(conf, fconfig.getAccumuloInstance(),
+            fconfig.getAccumuloZookeepers());
+        AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
+            new PasswordToken(fconfig.getAccumuloPassword()));
         AccumuloInputFormat.setInputTableName(conf, env.getTable());
         AccumuloInputFormat.setScanAuthorizations(conf, env.getAuthorizations());
       }
diff --git a/pom.xml b/pom.xml
index 2dfa651..58abc63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
     <url>https://github.com/apache/fluo/issues</url>
   </issueManagement>
   <properties>
-    <accumulo.version>2.0.0-alpha-1</accumulo.version>
+    <accumulo.version>2.0.0-alpha-2</accumulo.version>
     <curator.version>4.0.1</curator.version>
     <dropwizard.version>0.8.1</dropwizard.version>
     <findbugs.maxRank>11</findbugs.maxRank>