blob: 95fe81eadfd5ab486be0c95a847797e5cef1b650 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.recipes.accumulo.export.function;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
/**
 * Writes mutations to Accumulo using a shared batch writer. One instance (and therefore one
 * {@link BatchWriter} and one background flushing thread) is maintained per unique combination of
 * cluster, user, and table — see {@link #getInstance}.
 *
 * @since 1.0.0
 */
// intentionally package private
class AccumuloWriter {

  /**
   * A batch of mutations paired with a latch, so the thread that submitted the batch can block
   * until the background task has flushed it to Accumulo.
   */
  private static class Mutations {
    final List<Mutation> mutations;
    final CountDownLatch cdl = new CountDownLatch(1);

    Mutations(Collection<Mutation> mutations) {
      // defensive copy: callers may reuse/mutate their collection after submitting
      this.mutations = new ArrayList<>(mutations);
    }
  }

  /**
   * Background task that repeatedly drains queued batches, writes them through a shared
   * {@link BatchWriter}, flushes, and then releases the latch of every batch covered by the flush.
   */
  private static class ExportTask implements Runnable {

    private final BatchWriter bw;
    private final LinkedBlockingQueue<Mutations> queue;

    ExportTask(LinkedBlockingQueue<Mutations> queue, String instanceName, String zookeepers,
        String user, String password, String table)
        throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
      this.queue = queue;
      AccumuloClient client =
          Accumulo.newClient().to(instanceName, zookeepers).as(user, password).build();
      // TODO need to close batch writer and client (this task runs on a daemon thread for the
      // lifetime of the process, so they are currently reclaimed only at JVM exit)
      BatchWriter writer;
      try {
        writer = client.createBatchWriter(table, new BatchWriterConfig());
      } catch (TableNotFoundException tnfe) {
        try {
          client.tableOperations().create(table);
        } catch (TableExistsException e) {
          // another process created the table concurrently; nothing to do
        }
        writer = client.createBatchWriter(table, new BatchWriterConfig());
      }
      bw = writer;
    }

    @Override
    public void run() {
      ArrayList<Mutations> exports = new ArrayList<>();
      while (true) {
        try {
          exports.clear();
          // gather exports from all threads that have placed an item on the queue; take() blocks
          // for the first batch, drainTo() opportunistically coalesces any others
          exports.add(queue.take());
          queue.drainTo(exports);
          for (Mutations ml : exports) {
            bw.addMutations(ml.mutations);
          }
          bw.flush();
          // notify all waiting threads only after the flush succeeded
          for (Mutations ml : exports) {
            ml.cdl.countDown();
          }
        } catch (InterruptedException e) {
          // restore interrupt status before propagating
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (MutationsRejectedException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  // Per-instance queue of pending batches. This was previously a static field that every
  // constructor call overwrote, which would re-point the first writer's ExportTask at a new
  // queue when a second writer (different cluster/table key) was created, stranding queued
  // batches and deadlocking their write() callers.
  // NOTE(review): with the static write gone, the findbugs max rank override in pom.xml may be
  // removable — verify.
  private final LinkedBlockingQueue<Mutations> exportQueue;

  private AccumuloWriter(String instanceName, String zookeepers, String user, String password,
      String table) {
    exportQueue = new LinkedBlockingQueue<>(10000);
    try {
      Thread queueProcessingTask =
          new Thread(new ExportTask(exportQueue, instanceName, zookeepers, user, password, table));
      queueProcessingTask.setDaemon(true);
      queueProcessingTask.start();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }

  // cache of writers keyed by connection info + table, so each destination shares one BatchWriter
  private static final Map<String, AccumuloWriter> exporters = new HashMap<>();

  /**
   * Returns the shared writer for the given cluster/user/table combination, creating it (and its
   * background flushing thread) on first use.
   */
  static synchronized AccumuloWriter getInstance(String instanceName, String zookeepers,
      String user, String password, String table) {
    String key =
        instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table;
    return exporters.computeIfAbsent(key,
        k -> new AccumuloWriter(instanceName, zookeepers, user, password, table));
  }

  /**
   * Queues the given mutations for writing and blocks until the background task has flushed them
   * to Accumulo.
   *
   * @param mutations mutations to write; copied internally, so the caller may reuse the collection
   */
  void write(Collection<Mutation> mutations) {
    Mutations work = new Mutations(mutations);
    exportQueue.add(work);
    try {
      work.cdl.await();
    } catch (InterruptedException e) {
      // restore interrupt status before propagating
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}