blob: 1feb6d57de16921492106984f09bb35d1d8d2b16 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.storm.daemon.logviewer.handler;
import static;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.drop;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.first;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.last;
import static;
import static org.apache.storm.daemon.utils.PathUtil.truncatePathToLastElements;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.common.JsonResponseBuilder;
import org.apache.storm.daemon.logviewer.LogviewerConstant;
import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
import org.apache.storm.daemon.logviewer.utils.ExceptionMeterNames;
import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
import org.apache.storm.daemon.ui.InvalidRequestException;
import org.apache.storm.daemon.utils.StreamUtil;
import org.apache.storm.daemon.utils.UrlBuilder;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.jooq.lambda.Unchecked;
import org.json.simple.JSONAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogviewerLogSearchHandler {
private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
public static final int GREP_MAX_SEARCH_SIZE = 1024;
public static final int GREP_BUF_SIZE = 2048;
public static final int GREP_CONTEXT_SIZE = 128;
public static final Pattern WORKER_LOG_FILENAME_PATTERN = Pattern.compile("^worker.log(.*)");
private final Meter numDeepSearchNoResult;
private final Histogram numFileScanned;
private final Meter numSearchRequestNoResult;
private final Meter numFileOpenExceptions;
private final Meter numFileReadExceptions;
private final Map<String, Object> stormConf;
private final Path logRoot;
private final Path daemonLogRoot;
private final ResourceAuthorizer resourceAuthorizer;
private final Integer logviewerPort;
private final String scheme;
private final DirectoryCleaner directoryCleaner;
* Constructor.
* @param stormConf storm configuration
* @param logRoot log root directory
* @param daemonLogRoot daemon log root directory
* @param resourceAuthorizer {@link ResourceAuthorizer}
* @param metricsRegistry The logviewer metrics registry
public LogviewerLogSearchHandler(Map<String, Object> stormConf, Path logRoot, Path daemonLogRoot,
ResourceAuthorizer resourceAuthorizer, StormMetricsRegistry metricsRegistry) {
this.stormConf = stormConf;
this.logRoot = logRoot.toAbsolutePath().normalize();
this.daemonLogRoot = daemonLogRoot.toAbsolutePath().normalize();
this.resourceAuthorizer = resourceAuthorizer;
Object httpsPort = stormConf.get(DaemonConfig.LOGVIEWER_HTTPS_PORT);
if (httpsPort == null) {
this.logviewerPort = ObjectReader.getInt(stormConf.get(DaemonConfig.LOGVIEWER_PORT));
this.scheme = "http";
} else {
this.logviewerPort = ObjectReader.getInt(httpsPort);
this.scheme = "https";
this.numDeepSearchNoResult = metricsRegistry.registerMeter("logviewer:num-deep-search-no-result");
this.numFileScanned = metricsRegistry.registerHistogram("logviewer:num-files-scanned-per-deep-search");
this.numSearchRequestNoResult = metricsRegistry.registerMeter("logviewer:num-search-request-no-result");
this.numFileOpenExceptions = metricsRegistry.registerMeter(ExceptionMeterNames.NUM_FILE_OPEN_EXCEPTIONS);
this.numFileReadExceptions = metricsRegistry.registerMeter(ExceptionMeterNames.NUM_FILE_READ_EXCEPTIONS);
this.directoryCleaner = new DirectoryCleaner(metricsRegistry);
* Search from a worker log file.
* @param fileName log file
* @param user username
* @param isDaemon whether the log file is regarding worker or daemon
* @param search search string
* @param numMatchesStr the count of maximum matches
* @param offsetStr start offset for log file
* @param callback callbackParameterName for JSONP
* @param origin origin
* @return Response containing JSON content representing search result
public Response searchLogFile(String fileName, String user, boolean isDaemon, String search,
String numMatchesStr, String offsetStr, String callback, String origin)
throws IOException, InvalidRequestException {
boolean noResult = true;
Path rootDir = isDaemon ? daemonLogRoot : logRoot;
Path rawFile = rootDir.resolve(fileName);
Path absFile = rawFile.toAbsolutePath().normalize();
if (!absFile.startsWith(rootDir) || !rawFile.normalize().toString().equals(rawFile.toString())) {
//Ensure filename doesn't contain ../ parts
return searchLogFileNotFound(callback);
if (isDaemon && Paths.get(fileName).getNameCount() != 1) {
//Don't permit path traversal for calls intended to read from the daemon logs
return searchLogFileNotFound(callback);
Response response;
if (absFile.toFile().exists()) {
if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) {
Integer numMatchesInt = numMatchesStr != null ? tryParseIntParam("num-matches", numMatchesStr) : null;
Integer offsetInt = offsetStr != null ? tryParseIntParam("start-byte-offset", offsetStr) : null;
try {
if (StringUtils.isNotEmpty(search) && search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) {
Map<String, Object> entity = new HashMap<>();
entity.put("isDaemon", isDaemon ? "yes" : "no");
Map<String, Object> res = substringSearch(absFile, search, isDaemon, numMatchesInt, offsetInt);
noResult = ((List) res.get("matches")).isEmpty();
response = LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin);
} else {
throw new InvalidRequestException("Search substring must be between 1 and 1024 "
+ "UTF-8 bytes in size (inclusive)");
} catch (Exception ex) {
response = LogviewerResponseBuilder.buildExceptionJsonResponse(ex, callback);
} else {
// unauthorized
response = LogviewerResponseBuilder.buildUnauthorizedUserJsonResponse(user, callback);
} else {
response = searchLogFileNotFound(callback);
if (noResult) {
return response;
private Response searchLogFileNotFound(String callback) {
Map<String, String> entity = new HashMap<>();
entity.put("error", "Not Found");
entity.put("errorMessage", "The file was not found on this node.");
return new JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build();
* Advanced search across worker log files in a topology.
* @param topologyId topology ID
* @param user username
* @param search search string
* @param numMatchesStr the count of maximum matches. Note that this number is with respect to each port, not to each log or each search
* request
* @param portStr worker port, null or '*' if the request wants to search from all worker logs
* @param fileOffsetStr index (offset) of the log files
* @param offsetStr start offset for log file
* @param searchArchived true if the request wants to search also archived files, false if not
* @param callback callbackParameterName for JSONP
* @param origin origin
* @return Response containing JSON content representing search result
public Response deepSearchLogsForTopology(String topologyId, String user, String search,
String numMatchesStr, String portStr, String fileOffsetStr, String offsetStr,
Boolean searchArchived, String callback, String origin) throws IOException {
int numMatchedFiles = 0;
int numScannedFiles = 0;
Path rootDir = logRoot;
Path absTopoDir = rootDir.resolve(topologyId).toAbsolutePath().normalize();
Object returnValue;
if (StringUtils.isEmpty(search) || !absTopoDir.toFile().exists() || !absTopoDir.startsWith(rootDir)) {
returnValue = new ArrayList<>();
} else {
int fileOffset = ObjectReader.getInt(fileOffsetStr, 0);
int offset = ObjectReader.getInt(offsetStr, 0);
int numMatches = ObjectReader.getInt(numMatchesStr, 1);
if (StringUtils.isEmpty(portStr) || portStr.equals("*")) {
try (Stream<Path> topoDir = Files.list(absTopoDir)) {
// check for all ports
Stream<List<Path>> portsOfLogs = topoDir
.map(portDir -> logsForPort(user, portDir))
.filter(logs -> logs != null && !logs.isEmpty());
if (BooleanUtils.isNotTrue(searchArchived)) {
portsOfLogs = -> Collections.singletonList(first(fl)));
final List<Matched> matchedList = portsOfLogs
.map(logs -> findNMatches(logs, numMatches, 0, 0, search))
numMatchedFiles = -> match.getMatches().size()).sum();
numScannedFiles = -> match.openedFiles).sum();
returnValue = matchedList;
} else {
int port = Integer.parseInt(portStr);
// check just the one port
List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS,
new ArrayList<>());
boolean containsPort =
.anyMatch(slotPort -> slotPort != null && (slotPort == port));
if (!containsPort) {
returnValue = new ArrayList<>();
} else {
Path absPortDir = absTopoDir.resolve(Integer.toString(port)).toAbsolutePath().normalize();
if (!absPortDir.toFile().exists()
|| !absPortDir.startsWith(absTopoDir)) {
returnValue = new ArrayList<>();
} else {
List<Path> filteredLogs = logsForPort(user, absPortDir);
if (BooleanUtils.isNotTrue(searchArchived)) {
filteredLogs = Collections.singletonList(first(filteredLogs));
fileOffset = 0;
returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search);
numMatchedFiles = ((Matched) returnValue).getMatches().size();
numScannedFiles = ((Matched) returnValue).openedFiles;
if (numMatchedFiles == 0) {
return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, callback, origin);
private Integer tryParseIntParam(String paramName, String value) throws InvalidRequestException {
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new InvalidRequestException("Could not parse " + paramName + " to an integer");
Map<String, Object> substringSearch(Path file, String searchString) throws InvalidRequestException {
return substringSearch(file, searchString, false, 10, 0);
Map<String, Object> substringSearch(Path file, String searchString, int numMatches) throws InvalidRequestException {
return substringSearch(file, searchString, false, numMatches, 0);
Map<String, Object> substringSearch(Path file,
String searchString,
int numMatches,
int startByteOffset) throws InvalidRequestException {
return substringSearch(file, searchString, false, numMatches, startByteOffset);
private Map<String, Object> substringSearch(Path file, String searchString, boolean isDaemon, Integer numMatches,
Integer startByteOffset) throws InvalidRequestException {
if (StringUtils.isEmpty(searchString)) {
throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) {
throw new IllegalArgumentException("Precondition fails: the length of search string should be less than "
boolean isZipFile = file.toString().endsWith(".gz");
try (InputStream fis = Files.newInputStream(file)) {
try (InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis;
BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) {
//It's more likely to be a file read exception here, so we don't differentiate
int fileLength = isZipFile ? (int) ServerUtils.zipFileSize(file.toFile()) : (int) Files.size(file);
ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE);
final byte[] bufArray = buf.array();
final byte[] searchBytes = searchString.getBytes(StandardCharsets.UTF_8);
numMatches = numMatches != null ? numMatches : 10;
startByteOffset = startByteOffset != null ? startByteOffset : 0;
// Start at the part of the log file we are interested in.
// Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files
if (startByteOffset > fileLength) {
throw new InvalidRequestException("Cannot search past the end of the file");
if (startByteOffset > 0) {
StreamUtil.skipBytes(stream, startByteOffset);
Arrays.fill(bufArray, (byte) 0);
int totalBytesRead = 0;
int bytesRead =, 0, Math.min(fileLength, GREP_BUF_SIZE));
totalBytesRead += bytesRead;
List<Map<String, Object>> initialMatches = new ArrayList<>();
int initBufOffset = 0;
int byteOffset = startByteOffset;
byte[] beforeBytes = null;
Map<String, Object> ret = new HashMap<>();
while (true) {
SubstringSearchResult searchRet = bufferSubstringSearch(isDaemon, file, fileLength, byteOffset, initBufOffset,
stream, startByteOffset, totalBytesRead, buf, searchBytes, initialMatches, numMatches, beforeBytes);
List<Map<String, Object>> matches = searchRet.getMatches();
Integer newByteOffset = searchRet.getNewByteOffset();
byte[] newBeforeBytes = searchRet.getNewBeforeBytes();
if (matches.size() < numMatches && totalBytesRead + startByteOffset < fileLength) {
// The start index is positioned to find any possible
// occurrence search string that did not quite fit in the
// buffer on the previous read.
final int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length;
totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, fileLength);
if (totalBytesRead < 0) {
throw new InvalidRequestException("Cannot search past the end of the file");
initialMatches = matches;
initBufOffset = newBufOffset;
byteOffset = newByteOffset;
beforeBytes = newBeforeBytes;
} else {
ret.put("isDaemon", isDaemon ? "yes" : "no");
Integer nextByteOffset = null;
if (matches.size() >= numMatches || totalBytesRead < fileLength) {
nextByteOffset = (Integer) last(matches).get("byteOffset") + searchBytes.length;
if (fileLength <= nextByteOffset) {
nextByteOffset = null;
ret.putAll(mkGrepResponse(searchBytes, startByteOffset, matches, nextByteOffset));
return ret;
} catch (UnknownHostException | UnsupportedEncodingException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
Map<String, Object> substringSearchDaemonLog(Path file, String searchString) throws InvalidRequestException {
return substringSearch(file, searchString, true, 10, 0);
* Get the filtered, authorized, sorted log files for a port.
List<Path> logsForPort(String user, Path portDir) {
try {
List<Path> workerLogs = directoryCleaner.getFilesForDir(portDir).stream()
.filter(file -> WORKER_LOG_FILENAME_PATTERN.asPredicate().test(file.getFileName().toString()))
.filter(log -> resourceAuthorizer.isUserAllowedToAccessFile(user, WorkerLogs.getTopologyPortWorkerLog(log)))
.map(Unchecked.function(p -> Pair.of(p, Files.getLastModifiedTime(p))))
.sorted(Comparator.comparing((Pair<Path, FileTime> p) -> p.getRight()).reversed())
.map(p -> p.getLeft())
} catch (IOException e) {
throw new RuntimeException(e);
* Find the first N matches of target string in files.
* @param logs all candidate log files to search
* @param numMatches number of matches expected
* @param fileOffset number of log files to skip initially
* @param startByteOffset number of byte to be ignored in each log file
* @param targetStr searched string
* @return all matched results
Matched findNMatches(List<Path> logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) {
logs = drop(logs, fileOffset);
LOG.debug("{} files to scan", logs.size());
List<Map<String, Object>> matches = new ArrayList<>();
int matchCount = 0;
int scannedFiles = 0;
while (true) {
if (logs.isEmpty()) {
//fileOffset = one past last scanned file
Path firstLog = logs.get(0);
Map<String, Object> matchInLog;
try {
LOG.debug("Looking through {}", firstLog);
matchInLog = substringSearch(firstLog, targetStr, numMatches - matchCount, startByteOffset);
} catch (InvalidRequestException e) {
LOG.error("Can't search past end of file.", e);
matchInLog = new HashMap<>();
String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog);
//This section simply put the formatted log filename and corresponding port in the matching.
final List<Map<String, Object>> newMatches = new ArrayList<>(matches);
Map<String, Object> currentFileMatch = new HashMap<>(matchInLog);
currentFileMatch.put("fileName", fileName);
Path firstLogAbsPath = firstLog.toAbsolutePath().normalize();
currentFileMatch.put("port", truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString());
int newCount = matchCount + ((List<?>) matchInLog.getOrDefault("matches", Collections.emptyList())).size();
if (newCount == matchCount) {
// matches and matchCount is not changed
logs = rest(logs);
startByteOffset = 0;
fileOffset = fileOffset + 1;
} else if (newCount >= numMatches) {
matches = newMatches;
//fileOffset = the index of last scanned file
} else {
matches = newMatches;
logs = rest(logs);
startByteOffset = 0;
fileOffset = fileOffset + 1;
matchCount = newCount;
LOG.debug("scanned {} files", scannedFiles);
return new Matched(fileOffset, targetStr, matches, scannedFiles);
* As the file is read into a buffer, 1/2 the buffer's size at a time, we search the buffer for matches of the substring and return a
* list of zero or more matches.
private SubstringSearchResult bufferSubstringSearch(boolean isDaemon, Path file, int fileLength, int offsetToBuf,
int initBufOffset, BufferedInputStream stream, Integer bytesSkipped,
int bytesRead, ByteBuffer haystack, byte[] needle,
List<Map<String, Object>> initialMatches, Integer numMatches, byte[] beforeBytes)
throws IOException {
int bufOffset = initBufOffset;
List<Map<String, Object>> matches = initialMatches;
byte[] newBeforeBytes;
Integer newByteOffset;
while (true) {
int offset = offsetOfBytes(haystack.array(), needle, bufOffset);
if (matches.size() < numMatches && offset >= 0) {
final int fileOffset = offsetToBuf + offset;
final int bytesNeededAfterMatch = haystack.limit() - GREP_CONTEXT_SIZE - needle.length;
byte[] beforeArg = null;
byte[] afterArg = null;
if (offset < GREP_CONTEXT_SIZE) {
beforeArg = beforeBytes;
if (offset > bytesNeededAfterMatch) {
afterArg = tryReadAhead(stream, haystack, offset, fileLength, bytesRead);
bufOffset = offset + needle.length;
matches.add(mkMatchData(needle, haystack, offset, fileOffset,
file.toAbsolutePath().normalize(), isDaemon, beforeArg, afterArg));
} else {
int beforeStrToOffset = Math.min(haystack.limit(), GREP_MAX_SEARCH_SIZE);
int beforeStrFromOffset = Math.max(0, beforeStrToOffset - GREP_CONTEXT_SIZE);
newBeforeBytes = Arrays.copyOfRange(haystack.array(), beforeStrFromOffset, beforeStrToOffset);
// It's OK if new-byte-offset is negative.
// This is normal if we are out of bytes to read from a small file.
if (matches.size() >= numMatches) {
newByteOffset = ((Number) last(matches).get("byteOffset")).intValue() + needle.length;
} else {
newByteOffset = bytesSkipped + bytesRead - GREP_MAX_SEARCH_SIZE;
return new SubstringSearchResult(matches, newByteOffset, newBeforeBytes);
private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, int fileLength) throws IOException {
byte[] bufArray = buf.array();
// Copy the 2nd half of the buffer to the first half.
System.arraycopy(bufArray, GREP_MAX_SEARCH_SIZE, bufArray, 0, GREP_MAX_SEARCH_SIZE);
// Zero-out the 2nd half to prevent accidental matches.
Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0);
// Fill the 2nd half with new bytes from the stream.
int bytesRead =, GREP_MAX_SEARCH_SIZE, Math.min(fileLength, GREP_MAX_SEARCH_SIZE));
buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead);
return totalBytesRead + bytesRead;
private Map<String, Object> mkMatchData(byte[] needle, ByteBuffer haystack, int haystackOffset, int fileOffset, Path canonicalPath,
boolean isDaemon, byte[] beforeBytes, byte[] afterBytes)
throws UnsupportedEncodingException, UnknownHostException {
String url;
if (isDaemon) {
url = urlToMatchCenteredInLogPageDaemonFile(needle, canonicalPath, fileOffset, logviewerPort);
} else {
url = urlToMatchCenteredInLogPage(needle, canonicalPath, fileOffset, logviewerPort);
byte[] haystackBytes = haystack.array();
String beforeString;
String afterString;
if (haystackOffset >= GREP_CONTEXT_SIZE) {
beforeString = new String(haystackBytes, (haystackOffset - GREP_CONTEXT_SIZE), GREP_CONTEXT_SIZE, "UTF-8");
} else {
int numDesired = Math.max(0, GREP_CONTEXT_SIZE - haystackOffset);
int beforeSize = beforeBytes != null ? beforeBytes.length : 0;
int numExpected = Math.min(beforeSize, numDesired);
if (numExpected > 0) {
StringBuilder sb = new StringBuilder();
sb.append(new String(beforeBytes, beforeSize - numExpected, numExpected, "UTF-8"));
sb.append(new String(haystackBytes, 0, haystackOffset, "UTF-8"));
beforeString = sb.toString();
} else {
beforeString = new String(haystackBytes, 0, haystackOffset, "UTF-8");
int needleSize = needle.length;
int afterOffset = haystackOffset + needleSize;
int haystackSize = haystack.limit();
if ((afterOffset + GREP_CONTEXT_SIZE) < haystackSize) {
afterString = new String(haystackBytes, afterOffset, GREP_CONTEXT_SIZE, "UTF-8");
} else {
int numDesired = GREP_CONTEXT_SIZE - (haystackSize - afterOffset);
int afterSize = afterBytes != null ? afterBytes.length : 0;
int numExpected = Math.min(afterSize, numDesired);
if (numExpected > 0) {
StringBuilder sb = new StringBuilder();
sb.append(new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8"));
sb.append(new String(afterBytes, 0, numExpected, "UTF-8"));
afterString = sb.toString();
} else {
afterString = new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8");
Map<String, Object> ret = new HashMap<>();
ret.put("byteOffset", fileOffset);
ret.put("beforeString", beforeString);
ret.put("afterString", afterString);
ret.put("matchString", new String(needle, "UTF-8"));
ret.put("logviewerURL", url);
return ret;
* Tries once to read ahead in the stream to fill the context and resets the stream to its position before the call.
private byte[] tryReadAhead(BufferedInputStream stream, ByteBuffer haystack, int offset, int fileLength, int bytesRead)
throws IOException {
int numExpected = Math.min(fileLength - bytesRead, GREP_CONTEXT_SIZE);
byte[] afterBytes = new byte[numExpected];
// Only try reading once., 0, numExpected);
return afterBytes;
* Searches a given byte array for a match of a sub-array of bytes. Returns the offset to the byte that matches, or -1 if no match was
* found.
private int offsetOfBytes(byte[] buffer, byte[] search, int initOffset) {
if (search.length <= 0) {
throw new IllegalArgumentException("Search array should not be empty.");
if (initOffset < 0) {
throw new IllegalArgumentException("Start offset shouldn't be negative.");
int offset = initOffset;
int candidateOffset = initOffset;
int valOffset = 0;
int retOffset = 0;
while (true) {
if (search.length - valOffset <= 0) {
// found
retOffset = candidateOffset;
} else {
if (offset >= buffer.length) {
// We ran out of buffer for the search.
retOffset = -1;
} else {
if (search[valOffset] != buffer[offset]) {
// The match at this candidate offset failed, so start over with the
// next candidate byte from the buffer.
int newOffset = candidateOffset + 1;
offset = newOffset;
candidateOffset = newOffset;
valOffset = 0;
} else {
// So far it matches. Keep going...
offset = offset + 1;
valOffset = valOffset + 1;
return retOffset;
* This response data only includes a next byte offset if there is more of the file to read.
private Map<String, Object> mkGrepResponse(byte[] searchBytes, Integer offset, List<Map<String, Object>> matches,
Integer nextByteOffset) throws UnsupportedEncodingException {
Map<String, Object> ret = new HashMap<>();
ret.put("searchString", new String(searchBytes, "UTF-8"));
ret.put("startByteOffset", offset);
ret.put("matches", matches);
if (nextByteOffset != null) {
ret.put("nextByteOffset", nextByteOffset);
return ret;
String urlToMatchCenteredInLogPage(byte[] needle, Path canonicalPath, int offset, Integer port) throws UnknownHostException {
final String host = Utils.hostname();
final Path truncatedFilePath = truncatePathToLastElements(canonicalPath, 3);
Map<String, Object> parameters = new HashMap<>();
parameters.put("file", truncatedFilePath.toString());
parameters.put("start", Math.max(0, offset - (LogviewerConstant.DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2)));
parameters.put("length", LogviewerConstant.DEFAULT_BYTES_PER_PAGE);
return + "://%s:%d/api/v1/log", host, port), parameters);
String urlToMatchCenteredInLogPageDaemonFile(byte[] needle, Path canonicalPath, int offset, Integer port) throws UnknownHostException {
final String host = Utils.hostname();
final Path truncatedFilePath = truncatePathToLastElements(canonicalPath, 1);
Map<String, Object> parameters = new HashMap<>();
parameters.put("file", truncatedFilePath.toString());
parameters.put("start", Math.max(0, offset - (LogviewerConstant.DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2)));
parameters.put("length", LogviewerConstant.DEFAULT_BYTES_PER_PAGE);
return + "://%s:%d/api/v1/daemonlog", host, port), parameters);
public static class Matched implements JSONAware {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private int fileOffset;
private String searchString;
private List<Map<String, Object>> matches;
private final int openedFiles;
* Constructor.
* @param fileOffset offset (index) of the files
* @param searchString search string
* @param matches map representing matched search result
* @param openedFiles number of files scanned, used for metrics only
public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches, int openedFiles) {
this.fileOffset = fileOffset;
this.searchString = searchString;
this.matches = matches;
this.openedFiles = openedFiles;
public int getFileOffset() {
return fileOffset;
public String getSearchString() {
return searchString;
public List<Map<String, Object>> getMatches() {
return matches;
public String toJSONString() {
try {
return OBJECT_MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
private static class SubstringSearchResult {
private List<Map<String, Object>> matches;
private Integer newByteOffset;
private byte[] newBeforeBytes;
SubstringSearchResult(List<Map<String, Object>> matches, Integer newByteOffset, byte[] newBeforeBytes) {
this.matches = matches;
this.newByteOffset = newByteOffset;
this.newBeforeBytes = newBeforeBytes;
public List<Map<String, Object>> getMatches() {
return matches;
public Integer getNewByteOffset() {
return newByteOffset;
public byte[] getNewBeforeBytes() {
return newBeforeBytes;