// blob: 519f76cc53519d77cc6218d42085c4007c838915 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.hbase;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;
/**
 * Storm {@link CustomStreamGrouping} that picks the target task for a tuple from the
 * HBase region its row key falls into.
 *
 * <p>The sorted region start keys of {@code tableName} are cached and re-read from HBase
 * once the configured refresh interval has elapsed. A tuple's row key (the value at
 * {@code rowKeyFieldIndex}) is located among those start keys, and the resulting region
 * index is mapped, modulo the number of target tasks, onto the task ids handed to
 * {@link #prepare(WorkerTopologyContext, GlobalStreamId, List)}.
 *
 * <p>NOTE(review): keys are compared as Java strings, which orders differently from
 * HBase's raw byte ordering for non-ASCII keys; confirm row keys are ASCII.
 */
public class HBaseStreamPartitioner implements CustomStreamGrouping {

  private static final long serialVersionUID = -148324019395976092L;

  /**
   * Sorted region start keys. Published as a whole new array on every refresh so that
   * readers never observe a half-filled array.
   */
  private volatile String[] regionStartKeys = { "0" };

  /** Region start key to region name, replaced wholesale on every refresh. */
  private volatile Map<String, String> regionStartKeyRegionNameMap = new HashMap<String, String>();

  /** Task ids supplied by Storm in {@code prepare}; {@code null} until then. */
  private List<Integer> targetTasks = null;
  private int targetTasksSize = 0;

  /** Position of the row key inside the tuple values. */
  private int rowKeyFieldIndex = 0;
  private String tableName = null;

  /** Wall-clock time (ms) of the last refresh attempt; 0 means "never". */
  private volatile long regionCheckTime = 0;
  private int regionInforRefreshIntervalInMins = 60;
  private int regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;

  /**
   * Table handle used to read region locations. Transient because the grouping is
   * serializable while the handle is a live connection; it is opened lazily by
   * {@link #refreshRegionInfo(String)}.
   */
  transient HTable hTable = null;

  /**
   * Creates a partitioner for the given table.
   *
   * @param tableName HBase table whose region boundaries drive the partitioning
   * @param rowKeyFieldIndex index of the row key within the tuple values
   * @param regionInforRefreshIntervalInMins minutes between region boundary refreshes
   */
  public HBaseStreamPartitioner(String tableName, int rowKeyFieldIndex, int regionInforRefreshIntervalInMins) {
    System.out.println("Created HBaseStreamPartitioner ");
    this.rowKeyFieldIndex = rowKeyFieldIndex;
    this.tableName = tableName;
    this.regionInforRefreshIntervalInMins = regionInforRefreshIntervalInMins;
    this.regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
  }

  /**
   * Records the target task ids and loads the initial region boundaries.
   *
   * @param context worker topology context (unused)
   * @param stream stream this grouping is attached to (only logged)
   * @param targetTasks task ids tuples may be routed to
   */
  public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
    System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
    this.targetTasks = targetTasks;
    this.targetTasksSize = this.targetTasks.size();
    loadInitialRegionInfo();
  }

  /**
   * Loads the initial region boundaries without registering any target tasks.
   * {@link #chooseTasks(int, List)} cannot be used after this overload alone;
   * {@link #getRegionIndex(String)} can.
   */
  public void prepare() {
    System.out.println("preparing HBaseStreamPartitioner for streamId " );
    loadInitialRegionInfo();
  }

  /**
   * Chooses the task that should receive the tuple.
   *
   * @param taskId id of the emitting task (only logged)
   * @param values tuple values; the element at {@code rowKeyFieldIndex} must be a String
   * @return a single-element list holding the chosen task id
   * @throws IllegalStateException if no target tasks were registered via {@code prepare}
   */
  public List<Integer> chooseTasks(int taskId, List<Object> values) {
    System.out.println("Choosing task for taskId " + taskId + " and values " + values);
    refreshRegionInfoIfStale();

    int regionIndex = getRegionIndex((String) values.get(rowKeyFieldIndex));
    if (targetTasksSize == 0) {
      throw new IllegalStateException(
          "No target tasks available; prepare(context, stream, targetTasks) must be called"
              + " with a non-empty task list before chooseTasks");
    }
    // regionIndex is never negative, so the modulo also covers regionIndex < targetTasksSize.
    // The result is a position in targetTasks, not a task id, hence the lookup.
    List<Integer> choosenTasks = Arrays.asList(targetTasks.get(regionIndex % targetTasksSize));
    System.out.println("Choosen tasks are " + choosenTasks);
    return choosenTasks;
  }

  /**
   * Returns the index, within the sorted region start keys, of the region containing
   * {@code key}: the last start key that is less than or equal to {@code key}.
   *
   * @param key row key to locate; {@code null} is mapped to the first region
   * @return zero-based region index, 0 when {@code key} sorts before every start key
   */
  public int getRegionIndex(String key) {
    if (key == null) {
      return 0;
    }
    int index = Arrays.binarySearch(regionStartKeys, key);
    if (index >= 0) {
      return index;
    }
    // Not found: binarySearch returns -(insertionPoint) - 1. The owning region is the
    // one just before the insertion point, clamped to the first region.
    int insertionPoint = -(index + 1);
    return Math.max(insertionPoint - 1, 0);
  }

  /** Loads region boundaries for {@code prepare}, logging instead of failing on I/O errors. */
  private void loadInitialRegionInfo() {
    try {
      refreshRegionInfo(tableName);
      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /** Re-reads region boundaries once the refresh interval has elapsed since the last attempt. */
  private void refreshRegionInfoIfStale() {
    long now = System.currentTimeMillis();
    if (now - regionCheckTime >= regionInforRefreshIntervalInMillis) {
      try {
        refreshRegionInfo(tableName);
      } catch (IOException e) {
        e.printStackTrace();
        // Back off until the next interval instead of retrying on every tuple.
        regionCheckTime = now;
      }
    }
  }

  /**
   * Reads the current region locations and publishes fresh start-key data.
   *
   * @param tableName table to open if no handle exists yet
   * @throws IOException if the table cannot be opened or its regions cannot be read
   */
  private synchronized void refreshRegionInfo(String tableName) throws IOException {
    System.out.println("in refreshRegionInfo ");
    if (hTable == null) {
      hTable = new HTable(HBaseConfiguration.create(), tableName);
    }
    Map<HRegionInfo, ServerName> regionMap = hTable.getRegionLocations();

    // Build into locals and publish at the end so concurrent readers see either the old
    // or the new snapshot, never a partially filled one.
    String[] startKeys = new String[regionMap.size()];
    Map<String, String> startKeyToRegionName = new HashMap<String, String>();
    int index = 0;
    for (HRegionInfo regionInfo : regionMap.keySet()) {
      // Explicit charset keeps decoding identical on every worker JVM.
      String startKey = new String(regionInfo.getStartKey(), "UTF-8");
      startKeyToRegionName.put(startKey, regionInfo.getRegionNameAsString());
      startKeys[index++] = startKey;
    }
    Arrays.sort(startKeys);

    regionStartKeys = startKeys;
    regionStartKeyRegionNameMap = startKeyToRegionName;
    regionCheckTime = System.currentTimeMillis();
  }
}