| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.pcapservice; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import javax.annotation.Resource; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.Produces; |
| import javax.ws.rs.core.Response; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.MasterNotRunningException; |
| import org.apache.hadoop.hbase.ZooKeeperConnectionException; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.HTable; |
| import org.apache.hadoop.hbase.client.NoServerForRegionException; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.log4j.Logger; |
| import org.springframework.util.Assert; |
| import org.springframework.util.CollectionUtils; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Singleton class which integrates with HBase table and returns pcaps sorted by |
| * timestamp(dsc) for the given list of keys. Creates HConnection if it is not |
| * already created and the same connection instance is being used for all |
| * requests |
| * |
| * @author sheetal |
| * @version $Revision: 1.0 $ |
| */ |
| |
| @Path("/") |
| public class PcapGetterHBaseImpl implements IPcapGetter { |
| |
| /** The pcap getter h base. */ |
| private static IPcapGetter pcapGetterHBase = null; |
| |
| /** The Constant LOG. */ |
| private static final Logger LOGGER = Logger |
| .getLogger(PcapGetterHBaseImpl.class); |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List, |
| * java.lang.String, long, long, boolean, boolean, long) |
| */ |
| |
| |
| @GET |
| @Path("pcap/test") |
| @Produces("text/html") |
| public Response index() throws URISyntaxException { |
| return Response.ok("ALL GOOD").build(); |
| } |
| |
| |
| public PcapsResponse getPcaps(List<String> keys, String lastRowKey, |
| long startTime, long endTime, boolean includeReverseTraffic, |
| boolean includeDuplicateLastRow, long maxResultSize) throws IOException { |
| Assert |
| .isTrue( |
| checkIfValidInput(keys, lastRowKey), |
| "No valid input. One of the value must be present from {keys, lastRowKey}"); |
| LOGGER.info(" keys=" + keys.toString() + "; lastRowKey=" |
| + lastRowKey); |
| |
| PcapsResponse pcapsResponse = new PcapsResponse(); |
| // 1. Process partial response key |
| if (StringUtils.isNotEmpty(lastRowKey)) { |
| pcapsResponse = processKey(pcapsResponse, lastRowKey, startTime, |
| endTime, true, includeDuplicateLastRow, maxResultSize); |
| // LOGGER.debug("after scanning lastRowKey=" + |
| // pcapsResponse.toString()+"*********************************************************************"); |
| if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) { |
| return pcapsResponse; |
| } |
| } |
| // 2. Process input keys |
| List<String> sortedKeys = sortKeysByAscOrder(keys, includeReverseTraffic); |
| List<String> unprocessedKeys = new ArrayList<String>(); |
| unprocessedKeys.addAll(sortedKeys); |
| if (StringUtils.isNotEmpty(lastRowKey)) { |
| unprocessedKeys.clear(); |
| unprocessedKeys = getUnprocessedSublistOfKeys(sortedKeys, |
| lastRowKey); |
| } |
| LOGGER.info("unprocessedKeys in getPcaps" + unprocessedKeys.toString()); |
| if (!CollectionUtils.isEmpty(unprocessedKeys)) { |
| for (int i = 0; i < unprocessedKeys.size(); i++) { |
| pcapsResponse = processKey(pcapsResponse, unprocessedKeys.get(i), |
| startTime, endTime, false, includeDuplicateLastRow, maxResultSize); |
| // LOGGER.debug("after scanning input unprocessedKeys.get(" + i + ") =" |
| // + |
| // pcapsResponse.toString()+"*********************************************************************"); |
| if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) { |
| return pcapsResponse; |
| } |
| } |
| } |
| return pcapsResponse; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String, long, |
| * long, boolean) |
| */ |
| |
| public PcapsResponse getPcaps(String key, long startTime, long endTime, |
| boolean includeReverseTraffic) throws IOException { |
| Assert.hasText(key, "key must not be null or empty"); |
| return getPcaps(Arrays.asList(key), null, startTime, endTime, |
| includeReverseTraffic, false, ConfigurationUtil.getDefaultResultSize()); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List) |
| */ |
| |
| public PcapsResponse getPcaps(List<String> keys) throws IOException { |
| Assert.notEmpty(keys, "'keys' must not be null or empty"); |
| return getPcaps(keys, null, -1, -1, |
| ConfigurationUtil.isDefaultIncludeReverseTraffic(), false, |
| ConfigurationUtil.getDefaultResultSize()); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String) |
| */ |
| |
| public PcapsResponse getPcaps(String key) throws IOException { |
| Assert.hasText(key, "key must not be null or empty"); |
| return getPcaps(Arrays.asList(key), null, -1, -1, |
| ConfigurationUtil.isDefaultIncludeReverseTraffic(), false, |
| ConfigurationUtil.getDefaultResultSize()); |
| } |
| |
| /** |
| * Always returns the singleton instance. |
| * |
| * @return IPcapGetter singleton instance |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| public static IPcapGetter getInstance() throws IOException { |
| if (pcapGetterHBase == null) { |
| synchronized (PcapGetterHBaseImpl.class) { |
| if (pcapGetterHBase == null) { |
| pcapGetterHBase = new PcapGetterHBaseImpl(); |
| } |
| } |
| } |
| return pcapGetterHBase; |
| } |
| |
| /** |
| * Instantiates a new pcap getter h base impl. |
| */ |
| private PcapGetterHBaseImpl() { |
| } |
| |
| /** |
| * Adds reverse keys to the list if the flag 'includeReverseTraffic' is set to |
| * true; removes duplicates and sorts the list by ascending order;. |
| * |
| * @param keys |
| * input keys |
| * @param includeReverseTraffic |
| * flag whether or not to include reverse traffic |
| * @return List<String> |
| */ |
| @VisibleForTesting |
| List<String> sortKeysByAscOrder(List<String> keys, |
| boolean includeReverseTraffic) { |
| Assert.notEmpty(keys, "'keys' must not be null"); |
| if (includeReverseTraffic) { |
| keys.addAll(PcapHelper.reverseKey(keys)); |
| } |
| List<String> deDupKeys = removeDuplicateKeys(keys); |
| Collections.sort(deDupKeys); |
| return deDupKeys; |
| } |
| |
| /** |
| * Removes the duplicate keys. |
| * |
| * @param keys |
| * the keys |
| * @return the list |
| */ |
| @VisibleForTesting |
| public |
| List<String> removeDuplicateKeys(List<String> keys) { |
| Set<String> set = new HashSet<String>(keys); |
| return new ArrayList<String>(set); |
| } |
| |
| /** |
| * <p> |
| * Returns the sublist starting from the element after the lastRowKey |
| * to the last element in the list; if the 'lastRowKey' is not matched |
| * the complete list will be returned. |
| * </p> |
| * |
| * <pre> |
| * Eg : |
| * keys = [18800006-1800000b-06-0019-caac, 18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810] |
| * lastRowKey = "18800006-1800000b-06-0019-caac-65140-40815" |
| * and the response from this method [18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810] |
| * </pre> |
| * |
| * @param keys |
| * keys |
| * @param lastRowKey |
| * last row key of the previous partial response |
| * @return List<String> |
| */ |
| @VisibleForTesting |
| List<String> getUnprocessedSublistOfKeys(List<String> keys, |
| String lastRowKey) { |
| Assert.notEmpty(keys, "'keys' must not be null"); |
| Assert.hasText(lastRowKey, "'lastRowKey' must not be null"); |
| String partialKey = getTokens(lastRowKey, 5); |
| int startIndex = 0; |
| for (int i = 0; i < keys.size(); i++) { |
| if (partialKey.equals(keys.get(i))) { |
| startIndex = i + 1; |
| break; |
| } |
| } |
| List<String> unprocessedKeys = keys.subList(startIndex, keys.size()); |
| return unprocessedKeys; |
| } |
| |
| /** |
| * Returns the first 'noOfTokens' tokens from the given key; token delimiter |
| * "-";. |
| * |
| * @param key |
| * given key |
| * @param noOfTokens |
| * number of tokens to retrieve |
| * @return the tokens |
| */ |
| @VisibleForTesting |
| String getTokens(String key, int noOfTokens) { |
| String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER; |
| String regex = "\\" + delimeter; |
| String[] keyTokens = key.split(regex); |
| Assert.isTrue(noOfTokens < keyTokens.length, |
| "Invalid value for 'noOfTokens'"); |
| StringBuffer sbf = new StringBuffer(); |
| for (int i = 0; i < noOfTokens; i++) { |
| sbf.append(keyTokens[i]); |
| if (i != (noOfTokens - 1)) { |
| sbf.append(HBaseConfigConstants.PCAP_KEY_DELIMETER); |
| } |
| |
| } |
| return sbf.toString(); |
| } |
| |
| /** |
| * Process key. |
| * |
| * @param pcapsResponse |
| * the pcaps response |
| * @param key |
| * the key |
| * @param startTime |
| * the start time |
| * @param endTime |
| * the end time |
| * @param isPartialResponse |
| * the is partial response |
| * @param includeDuplicateLastRow |
| * the include duplicate last row |
| * @param maxResultSize |
| * the max result size |
| * @return the pcaps response |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| @VisibleForTesting |
| PcapsResponse processKey(PcapsResponse pcapsResponse, String key, |
| long startTime, long endTime, boolean isPartialResponse, |
| boolean includeDuplicateLastRow, long maxResultSize) throws IOException { |
| HTable table = null; |
| Scan scan = null; |
| List<Cell> scannedCells = null; |
| try { |
| // 1. Create start and stop row for the key; |
| Map<String, String> keysMap = createStartAndStopRowKeys(key, |
| isPartialResponse, includeDuplicateLastRow); |
| |
| // 2. if the input key contains all fragments (7) and it is not part |
| // of previous partial response (isPartialResponse), |
| // 'keysMap' will be null; do a Get; currently not doing any |
| // response size related checks for Get; |
| // by default all cells from a specific row are sorted by timestamp |
| if (keysMap == null) { |
| Get get = createGetRequest(key, startTime, endTime); |
| List<Cell> cells = executeGetRequest(table, get); |
| for (Cell cell : cells) { |
| pcapsResponse.addPcaps(CellUtil.cloneValue(cell)); |
| } |
| return pcapsResponse; |
| } |
| // 3. Create and execute Scan request |
| scan = createScanRequest(pcapsResponse, keysMap, startTime, endTime, |
| maxResultSize); |
| scannedCells = executeScanRequest(table, scan); |
| LOGGER.info("scannedCells size :" + scannedCells.size()); |
| addToResponse(pcapsResponse, scannedCells, maxResultSize); |
| |
| } catch (IOException e) { |
| LOGGER.error("Exception occurred while fetching Pcaps for the keys :" |
| + key, e); |
| if (e instanceof ZooKeeperConnectionException |
| || e instanceof MasterNotRunningException |
| || e instanceof NoServerForRegionException) { |
| int maxRetryLimit = ConfigurationUtil.getConnectionRetryLimit(); |
| System.out.println("maxRetryLimit =" + maxRetryLimit); |
| for (int attempt = 1; attempt <= maxRetryLimit; attempt++) { |
| System.out.println("attempting =" + attempt); |
| try { |
| HBaseConfigurationUtil.closeConnection(); // closing the |
| // existing |
| // connection |
| // and retry, |
| // it will |
| // create a new |
| // HConnection |
| scannedCells = executeScanRequest(table, scan); |
| addToResponse(pcapsResponse, scannedCells, maxResultSize); |
| break; |
| } catch (IOException ie) { |
| if (attempt == maxRetryLimit) { |
| LOGGER.error("Throwing the exception after retrying " |
| + maxRetryLimit + " times."); |
| throw e; |
| } |
| } |
| } |
| } |
| |
| } finally { |
| if (table != null) { |
| table.close(); |
| } |
| } |
| return pcapsResponse; |
| } |
| |
| /** |
| * Adds the to response. |
| * |
| * @param pcapsResponse |
| * the pcaps response |
| * @param scannedCells |
| * the scanned cells |
| * @param maxResultSize |
| * the max result size |
| */ |
| private void addToResponse(PcapsResponse pcapsResponse, |
| List<Cell> scannedCells, long maxResultSize) { |
| String lastKeyFromCurrentScan = null; |
| if (scannedCells != null && scannedCells.size() > 0) { |
| lastKeyFromCurrentScan = new String(CellUtil.cloneRow(scannedCells |
| .get(scannedCells.size() - 1))); |
| } |
| // 4. calculate the response size |
| Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator()); |
| for (Cell sortedCell : scannedCells) { |
| pcapsResponse.addPcaps(CellUtil.cloneValue(sortedCell)); |
| } |
| if (!pcapsResponse.isResonseSizeWithinLimit(maxResultSize)) { |
| pcapsResponse.setStatus(PcapsResponse.Status.PARTIAL); // response size |
| // reached |
| pcapsResponse.setLastRowKey(new String(lastKeyFromCurrentScan)); |
| } |
| } |
| |
| /** |
| * Builds start and stop row keys according to the following logic : 1. |
| * Creates tokens out of 'key' using pcap_id delimiter ('-') 2. if the input |
| * 'key' contains (assume : configuredTokensInRowKey=7 and |
| * minimumTokensIninputKey=5): a). 5 tokens |
| * ("srcIp-dstIp-protocol-srcPort-dstPort") startKey = |
| * "srcIp-dstIp-protocol-srcPort-dstPort-00000-00000" stopKey = |
| * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999" b). 6 tokens |
| * ("srcIp-dstIp-protocol-srcPort-dstPort-id1") startKey = |
| * "srcIp-dstIp-protocol-srcPort-dstPort-id1-00000" stopKey = |
| * "srcIp-dstIp-protocol-srcPort-dstPort-id1-99999" |
| * |
| * c). 7 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1-id2") 1>. if the |
| * key is NOT part of the partial response from previous request, return |
| * 'null' 2>. if the key is part of partial response from previous request |
| * startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-(id2+1)"; 1 is added |
| * to exclude this key as it was included in the previous request stopKey = |
| * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999" |
| * |
| * @param key |
| * the key |
| * @param isLastRowKey |
| * if the key is part of partial response |
| * @param includeDuplicateLastRow |
| * the include duplicate last row |
| * @return Map<String, String> |
| */ |
| @VisibleForTesting |
| Map<String, String> createStartAndStopRowKeys(String key, |
| boolean isLastRowKey, boolean includeDuplicateLastRow) { |
| String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER; |
| String regex = "\\" + delimeter; |
| String[] keyTokens = key.split(regex); |
| |
| String startKey = null; |
| String endKey = null; |
| Map<String, String> map = new HashMap<String, String>(); |
| |
| int configuredTokensInRowKey = ConfigurationUtil |
| .getConfiguredTokensInRowkey(); |
| int minimumTokensIninputKey = ConfigurationUtil |
| .getMinimumTokensInInputkey(); |
| Assert |
| .isTrue( |
| minimumTokensIninputKey <= configuredTokensInRowKey, |
| "tokens in the input key (separated by '-'), must be less than or equal to the tokens used in hbase table row key "); |
| // in case if the input key contains 'configuredTokensInRowKey' tokens and |
| // it is NOT a |
| // partial response key, do a Get instead of Scan |
| if (keyTokens.length == configuredTokensInRowKey) { |
| if (!isLastRowKey) { |
| return null; |
| } |
| // it is a partial response key; 'startKey' is same as input partial |
| // response key; 'endKey' can be built by replacing |
| // (configuredTokensInRowKey - minimumTokensIninputKey) tokens |
| // of input partial response key with '99999' |
| if (keyTokens.length == minimumTokensIninputKey) { |
| return null; |
| } |
| int appendingTokenSlots = configuredTokensInRowKey |
| - minimumTokensIninputKey; |
| if (appendingTokenSlots > 0) { |
| String partialKey = getTokens(key, minimumTokensIninputKey); |
| StringBuffer sbfStartNew = new StringBuffer(partialKey); |
| StringBuffer sbfEndNew = new StringBuffer(partialKey); |
| for (int i = 0; i < appendingTokenSlots; i++) { |
| if (i == (appendingTokenSlots - 1)) { |
| if (!includeDuplicateLastRow) { |
| sbfStartNew |
| .append(HBaseConfigConstants.PCAP_KEY_DELIMETER) |
| .append( |
| Integer.valueOf(keyTokens[minimumTokensIninputKey + i]) + 1); |
| } else { |
| sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER) |
| .append(keyTokens[minimumTokensIninputKey + i]); |
| } |
| } else { |
| sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append( |
| keyTokens[minimumTokensIninputKey + i]); |
| } |
| sbfEndNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append( |
| getMaxLimitForAppendingTokens()); |
| } |
| startKey = sbfStartNew.toString(); |
| endKey = sbfEndNew.toString(); |
| } |
| } else { |
| StringBuffer sbfStart = new StringBuffer(key); |
| StringBuffer sbfEnd = new StringBuffer(key); |
| for (int i = keyTokens.length; i < configuredTokensInRowKey; i++) { |
| sbfStart.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append( |
| getMinLimitForAppendingTokens()); |
| sbfEnd.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append( |
| getMaxLimitForAppendingTokens()); |
| } |
| startKey = sbfStart.toString(); |
| endKey = sbfEnd.toString(); |
| } |
| map.put(HBaseConfigConstants.START_KEY, startKey); |
| map.put(HBaseConfigConstants.END_KEY, endKey); |
| |
| return map; |
| } |
| |
| /** |
| * Returns false if keys is empty or null AND lastRowKey is null or |
| * empty; otherwise returns true;. |
| * |
| * @param keys |
| * input row keys |
| * @param lastRowKey |
| * partial response key |
| * @return boolean |
| */ |
| @VisibleForTesting |
| boolean checkIfValidInput(List<String> keys, String lastRowKey) { |
| if (CollectionUtils.isEmpty(keys) |
| && StringUtils.isEmpty(lastRowKey)) { |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Executes the given Get request. |
| * |
| * @param table |
| * hbase table |
| * @param get |
| * Get |
| * @return List<Cell> |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| private List<Cell> executeGetRequest(HTable table, Get get) |
| throws IOException { |
| LOGGER.info("Get :" + get.toString()); |
| table = (HTable) HBaseConfigurationUtil.getConnection().getTable( |
| ConfigurationUtil.getTableName()); |
| Result result = table.get(get); |
| List<Cell> cells = result.getColumnCells( |
| ConfigurationUtil.getColumnFamily(), |
| ConfigurationUtil.getColumnQualifier()); |
| return cells; |
| } |
| |
| /** |
| * Execute scan request. |
| * |
| * @param table |
| * hbase table |
| * @param scan |
| * the scan |
| * @return the list |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| private List<Cell> executeScanRequest(HTable table, Scan scan) |
| throws IOException { |
| LOGGER.info("Scan :" + scan.toString()); |
| table = (HTable) HBaseConfigurationUtil.getConnection().getTable( |
| ConfigurationUtil.getConfiguration().getString("hbase.table.name")); |
| ResultScanner resultScanner = table.getScanner(scan); |
| List<Cell> scannedCells = new ArrayList<Cell>(); |
| for (Result result = resultScanner.next(); result != null; result = resultScanner |
| .next()) { |
| List<Cell> cells = result.getColumnCells( |
| ConfigurationUtil.getColumnFamily(), |
| ConfigurationUtil.getColumnQualifier()); |
| if (cells != null) { |
| for (Cell cell : cells) { |
| scannedCells.add(cell); |
| } |
| } |
| } |
| return scannedCells; |
| } |
| |
| /** |
| * Creates the get request. |
| * |
| * @param key |
| * the key |
| * @param startTime |
| * the start time |
| * @param endTime |
| * the end time |
| * @return the gets the |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| @VisibleForTesting |
| Get createGetRequest(String key, long startTime, long endTime) |
| throws IOException { |
| Get get = new Get(Bytes.toBytes(key)); |
| // set family name |
| get.addFamily(ConfigurationUtil.getColumnFamily()); |
| |
| // set column family, qualifier |
| get.addColumn(ConfigurationUtil.getColumnFamily(), |
| ConfigurationUtil.getColumnQualifier()); |
| |
| // set max versions |
| get.setMaxVersions(ConfigurationUtil.getMaxVersions()); |
| |
| // set time range |
| setTimeRangeOnGet(get, startTime, endTime); |
| return get; |
| } |
| |
| /** |
| * Creates the scan request. |
| * |
| * @param pcapsResponse |
| * the pcaps response |
| * @param keysMap |
| * the keys map |
| * @param startTime |
| * the start time |
| * @param endTime |
| * the end time |
| * @param maxResultSize |
| * the max result size |
| * @return the scan |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| @VisibleForTesting |
| Scan createScanRequest(PcapsResponse pcapsResponse, |
| Map<String, String> keysMap, long startTime, long endTime, |
| long maxResultSize) throws IOException { |
| Scan scan = new Scan(); |
| // set column family, qualifier |
| scan.addColumn(ConfigurationUtil.getColumnFamily(), |
| ConfigurationUtil.getColumnQualifier()); |
| |
| // set start and stop keys |
| scan.setStartRow(keysMap.get(HBaseConfigConstants.START_KEY).getBytes()); |
| scan.setStopRow(keysMap.get(HBaseConfigConstants.END_KEY).getBytes()); |
| |
| // set max results size : remaining size = max results size - ( current |
| // pcaps response size + possible maximum row size) |
| long remainingSize = maxResultSize |
| - (pcapsResponse.getResponseSize() + ConfigurationUtil.getMaxRowSize()); |
| |
| if (remainingSize > 0) { |
| scan.setMaxResultSize(remainingSize); |
| } |
| // set max versions |
| scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt( |
| "hbase.table.column.maxVersions")); |
| |
| // set time range |
| setTimeRangeOnScan(scan, startTime, endTime); |
| return scan; |
| } |
| |
| /** |
| * Sets the time range on scan. |
| * |
| * @param scan |
| * the scan |
| * @param startTime |
| * the start time |
| * @param endTime |
| * the end time |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| private void setTimeRangeOnScan(Scan scan, long startTime, long endTime) |
| throws IOException { |
| boolean setTimeRange = true; |
| if (startTime < 0 && endTime < 0) { |
| setTimeRange = false; |
| } |
| if (setTimeRange) { |
| if (startTime < 0) { |
| startTime = 0; |
| } else { |
| startTime = PcapHelper.convertToDataCreationTimeUnit(startTime); |
| } |
| if (endTime < 0) { |
| endTime = Long.MAX_VALUE; |
| } else { |
| endTime = PcapHelper.convertToDataCreationTimeUnit(endTime); |
| } |
| Assert.isTrue(startTime < endTime, |
| "startTime value must be less than endTime value"); |
| scan.setTimeRange(startTime, endTime); |
| } |
| } |
| |
| /** |
| * Sets the time range on get. |
| * |
| * @param get |
| * the get |
| * @param startTime |
| * the start time |
| * @param endTime |
| * the end time |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| private void setTimeRangeOnGet(Get get, long startTime, long endTime) |
| throws IOException { |
| boolean setTimeRange = true; |
| if (startTime < 0 && endTime < 0) { |
| setTimeRange = false; |
| } |
| if (setTimeRange) { |
| if (startTime < 0) { |
| startTime = 0; |
| } else { |
| startTime = PcapHelper.convertToDataCreationTimeUnit(startTime); |
| } |
| if (endTime < 0) { |
| endTime = Long.MAX_VALUE; |
| } else { |
| endTime = PcapHelper.convertToDataCreationTimeUnit(endTime); |
| } |
| Assert.isTrue(startTime < endTime, |
| "startTime value must be less than endTime value"); |
| get.setTimeRange(startTime, endTime); |
| } |
| } |
| |
| /** |
| * Gets the min limit for appending tokens. |
| * |
| * @return the min limit for appending tokens |
| */ |
| private String getMinLimitForAppendingTokens() { |
| int digits = ConfigurationUtil.getAppendingTokenDigits(); |
| StringBuffer sbf = new StringBuffer(); |
| for (int i = 0; i < digits; i++) { |
| sbf.append("0"); |
| } |
| return sbf.toString(); |
| } |
| |
| /** |
| * Gets the max limit for appending tokens. |
| * |
| * @return the max limit for appending tokens |
| */ |
| private String getMaxLimitForAppendingTokens() { |
| int digits = ConfigurationUtil.getAppendingTokenDigits(); |
| StringBuffer sbf = new StringBuffer(); |
| for (int i = 0; i < digits; i++) { |
| sbf.append("9"); |
| } |
| return sbf.toString(); |
| } |
| |
| /** |
| * The main method. |
| * |
| * @param args |
| * the arguments |
| * |
| * @throws IOException |
| * Signals that an I/O exception has occurred. |
| */ |
| public static void main(String[] args) throws IOException { |
| if (args == null || args.length < 2) { |
| usage(); |
| return; |
| } |
| String outputFileName = null; |
| outputFileName = args[1]; |
| List<String> keys = Arrays.asList(StringUtils.split(args[2], ",")); |
| System.out.println("Geting keys " + keys); |
| long startTime = 0; |
| long endTime = Long.MAX_VALUE; |
| if (args.length > 3) { |
| startTime = Long.valueOf(args[3]); |
| } |
| if (args.length > 4) { |
| endTime = Long.valueOf(args[4]); |
| } |
| System.out.println("With start time " + startTime + " and end time " |
| + endTime); |
| PcapGetterHBaseImpl downloader = new PcapGetterHBaseImpl(); |
| PcapsResponse pcaps = downloader.getPcaps(keys, null, startTime, endTime, |
| false, false, 6); |
| File file = new File(outputFileName); |
| FileUtils.write(file, "", false); |
| FileUtils.writeByteArrayToFile(file, pcaps.getPcaps(), true); |
| } |
| |
| /** |
| * Usage. |
| */ |
| private static void usage() { |
| System.out.println("java " + PcapGetterHBaseImpl.class.getName() // $codepro.audit.disable |
| // debuggingCode |
| + " <zk quorum> <output file> <start key> [stop key]"); |
| } |
| |
| } |