blob: 1feb6d57de16921492106984f09bb35d1d8d2b16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.daemon.logviewer.handler;
import static java.util.stream.Collectors.toList;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.drop;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.first;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.last;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.rest;
import static org.apache.storm.daemon.utils.PathUtil.truncatePathToLastElements;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import javax.ws.rs.core.Response;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.common.JsonResponseBuilder;
import org.apache.storm.daemon.logviewer.LogviewerConstant;
import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
import org.apache.storm.daemon.logviewer.utils.ExceptionMeterNames;
import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
import org.apache.storm.daemon.ui.InvalidRequestException;
import org.apache.storm.daemon.utils.StreamUtil;
import org.apache.storm.daemon.utils.UrlBuilder;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.jooq.lambda.Unchecked;
import org.json.simple.JSONAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogviewerLogSearchHandler {
private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
public static final int GREP_MAX_SEARCH_SIZE = 1024;
public static final int GREP_BUF_SIZE = 2048;
public static final int GREP_CONTEXT_SIZE = 128;
public static final Pattern WORKER_LOG_FILENAME_PATTERN = Pattern.compile("^worker.log(.*)");
private final Meter numDeepSearchNoResult;
private final Histogram numFileScanned;
private final Meter numSearchRequestNoResult;
private final Meter numFileOpenExceptions;
private final Meter numFileReadExceptions;
private final Map<String, Object> stormConf;
private final Path logRoot;
private final Path daemonLogRoot;
private final ResourceAuthorizer resourceAuthorizer;
private final Integer logviewerPort;
private final String scheme;
private final DirectoryCleaner directoryCleaner;
/**
* Constructor.
*
* @param stormConf storm configuration
* @param logRoot log root directory
* @param daemonLogRoot daemon log root directory
* @param resourceAuthorizer {@link ResourceAuthorizer}
* @param metricsRegistry The logviewer metrics registry
*/
public LogviewerLogSearchHandler(Map<String, Object> stormConf, Path logRoot, Path daemonLogRoot,
ResourceAuthorizer resourceAuthorizer, StormMetricsRegistry metricsRegistry) {
this.stormConf = stormConf;
this.logRoot = logRoot.toAbsolutePath().normalize();
this.daemonLogRoot = daemonLogRoot.toAbsolutePath().normalize();
this.resourceAuthorizer = resourceAuthorizer;
Object httpsPort = stormConf.get(DaemonConfig.LOGVIEWER_HTTPS_PORT);
if (httpsPort == null) {
this.logviewerPort = ObjectReader.getInt(stormConf.get(DaemonConfig.LOGVIEWER_PORT));
this.scheme = "http";
} else {
this.logviewerPort = ObjectReader.getInt(httpsPort);
this.scheme = "https";
}
this.numDeepSearchNoResult = metricsRegistry.registerMeter("logviewer:num-deep-search-no-result");
this.numFileScanned = metricsRegistry.registerHistogram("logviewer:num-files-scanned-per-deep-search");
this.numSearchRequestNoResult = metricsRegistry.registerMeter("logviewer:num-search-request-no-result");
this.numFileOpenExceptions = metricsRegistry.registerMeter(ExceptionMeterNames.NUM_FILE_OPEN_EXCEPTIONS);
this.numFileReadExceptions = metricsRegistry.registerMeter(ExceptionMeterNames.NUM_FILE_READ_EXCEPTIONS);
this.directoryCleaner = new DirectoryCleaner(metricsRegistry);
}
/**
* Search from a worker log file.
*
* @param fileName log file
* @param user username
* @param isDaemon whether the log file is regarding worker or daemon
* @param search search string
* @param numMatchesStr the count of maximum matches
* @param offsetStr start offset for log file
* @param callback callbackParameterName for JSONP
* @param origin origin
* @return Response containing JSON content representing search result
*/
public Response searchLogFile(String fileName, String user, boolean isDaemon, String search,
String numMatchesStr, String offsetStr, String callback, String origin)
throws IOException, InvalidRequestException {
boolean noResult = true;
Path rootDir = isDaemon ? daemonLogRoot : logRoot;
Path rawFile = rootDir.resolve(fileName);
Path absFile = rawFile.toAbsolutePath().normalize();
if (!absFile.startsWith(rootDir) || !rawFile.normalize().toString().equals(rawFile.toString())) {
//Ensure filename doesn't contain ../ parts
return searchLogFileNotFound(callback);
}
if (isDaemon && Paths.get(fileName).getNameCount() != 1) {
//Don't permit path traversal for calls intended to read from the daemon logs
return searchLogFileNotFound(callback);
}
Response response;
if (absFile.toFile().exists()) {
if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) {
Integer numMatchesInt = numMatchesStr != null ? tryParseIntParam("num-matches", numMatchesStr) : null;
Integer offsetInt = offsetStr != null ? tryParseIntParam("start-byte-offset", offsetStr) : null;
try {
if (StringUtils.isNotEmpty(search) && search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) {
Map<String, Object> entity = new HashMap<>();
entity.put("isDaemon", isDaemon ? "yes" : "no");
Map<String, Object> res = substringSearch(absFile, search, isDaemon, numMatchesInt, offsetInt);
entity.putAll(res);
noResult = ((List) res.get("matches")).isEmpty();
response = LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin);
} else {
throw new InvalidRequestException("Search substring must be between 1 and 1024 "
+ "UTF-8 bytes in size (inclusive)");
}
} catch (Exception ex) {
response = LogviewerResponseBuilder.buildExceptionJsonResponse(ex, callback);
}
} else {
// unauthorized
response = LogviewerResponseBuilder.buildUnauthorizedUserJsonResponse(user, callback);
}
} else {
response = searchLogFileNotFound(callback);
}
if (noResult) {
numSearchRequestNoResult.mark();
}
return response;
}
private Response searchLogFileNotFound(String callback) {
Map<String, String> entity = new HashMap<>();
entity.put("error", "Not Found");
entity.put("errorMessage", "The file was not found on this node.");
return new JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build();
}
/**
* Advanced search across worker log files in a topology.
*
* @param topologyId topology ID
* @param user username
* @param search search string
* @param numMatchesStr the count of maximum matches. Note that this number is with respect to each port, not to each log or each search
* request
* @param portStr worker port, null or '*' if the request wants to search from all worker logs
* @param fileOffsetStr index (offset) of the log files
* @param offsetStr start offset for log file
* @param searchArchived true if the request wants to search also archived files, false if not
* @param callback callbackParameterName for JSONP
* @param origin origin
* @return Response containing JSON content representing search result
*/
public Response deepSearchLogsForTopology(String topologyId, String user, String search,
String numMatchesStr, String portStr, String fileOffsetStr, String offsetStr,
Boolean searchArchived, String callback, String origin) throws IOException {
int numMatchedFiles = 0;
int numScannedFiles = 0;
Path rootDir = logRoot;
Path absTopoDir = rootDir.resolve(topologyId).toAbsolutePath().normalize();
Object returnValue;
if (StringUtils.isEmpty(search) || !absTopoDir.toFile().exists() || !absTopoDir.startsWith(rootDir)) {
returnValue = new ArrayList<>();
} else {
int fileOffset = ObjectReader.getInt(fileOffsetStr, 0);
int offset = ObjectReader.getInt(offsetStr, 0);
int numMatches = ObjectReader.getInt(numMatchesStr, 1);
if (StringUtils.isEmpty(portStr) || portStr.equals("*")) {
try (Stream<Path> topoDir = Files.list(absTopoDir)) {
// check for all ports
Stream<List<Path>> portsOfLogs = topoDir
.map(portDir -> logsForPort(user, portDir))
.filter(logs -> logs != null && !logs.isEmpty());
if (BooleanUtils.isNotTrue(searchArchived)) {
portsOfLogs = portsOfLogs.map(fl -> Collections.singletonList(first(fl)));
}
final List<Matched> matchedList = portsOfLogs
.map(logs -> findNMatches(logs, numMatches, 0, 0, search))
.collect(toList());
numMatchedFiles = matchedList.stream().mapToInt(match -> match.getMatches().size()).sum();
numScannedFiles = matchedList.stream().mapToInt(match -> match.openedFiles).sum();
returnValue = matchedList;
}
} else {
int port = Integer.parseInt(portStr);
// check just the one port
@SuppressWarnings("unchecked")
List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS,
new ArrayList<>());
boolean containsPort = slotsPorts.stream()
.anyMatch(slotPort -> slotPort != null && (slotPort == port));
if (!containsPort) {
returnValue = new ArrayList<>();
} else {
Path absPortDir = absTopoDir.resolve(Integer.toString(port)).toAbsolutePath().normalize();
if (!absPortDir.toFile().exists()
|| !absPortDir.startsWith(absTopoDir)) {
returnValue = new ArrayList<>();
} else {
List<Path> filteredLogs = logsForPort(user, absPortDir);
if (BooleanUtils.isNotTrue(searchArchived)) {
filteredLogs = Collections.singletonList(first(filteredLogs));
fileOffset = 0;
}
returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search);
numMatchedFiles = ((Matched) returnValue).getMatches().size();
numScannedFiles = ((Matched) returnValue).openedFiles;
}
}
}
}
if (numMatchedFiles == 0) {
numDeepSearchNoResult.mark();
}
numFileScanned.update(numScannedFiles);
return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, callback, origin);
}
private Integer tryParseIntParam(String paramName, String value) throws InvalidRequestException {
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new InvalidRequestException("Could not parse " + paramName + " to an integer");
}
}
@VisibleForTesting
Map<String, Object> substringSearch(Path file, String searchString) throws InvalidRequestException {
return substringSearch(file, searchString, false, 10, 0);
}
@VisibleForTesting
Map<String, Object> substringSearch(Path file, String searchString, int numMatches) throws InvalidRequestException {
return substringSearch(file, searchString, false, numMatches, 0);
}
@VisibleForTesting
Map<String, Object> substringSearch(Path file,
String searchString,
int numMatches,
int startByteOffset) throws InvalidRequestException {
return substringSearch(file, searchString, false, numMatches, startByteOffset);
}
private Map<String, Object> substringSearch(Path file, String searchString, boolean isDaemon, Integer numMatches,
Integer startByteOffset) throws InvalidRequestException {
if (StringUtils.isEmpty(searchString)) {
throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
}
if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) {
throw new IllegalArgumentException("Precondition fails: the length of search string should be less than "
+ GREP_MAX_SEARCH_SIZE);
}
boolean isZipFile = file.toString().endsWith(".gz");
try (InputStream fis = Files.newInputStream(file)) {
try (InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis;
BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) {
//It's more likely to be a file read exception here, so we don't differentiate
int fileLength = isZipFile ? (int) ServerUtils.zipFileSize(file.toFile()) : (int) Files.size(file);
ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE);
final byte[] bufArray = buf.array();
final byte[] searchBytes = searchString.getBytes(StandardCharsets.UTF_8);
numMatches = numMatches != null ? numMatches : 10;
startByteOffset = startByteOffset != null ? startByteOffset : 0;
// Start at the part of the log file we are interested in.
// Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files
if (startByteOffset > fileLength) {
throw new InvalidRequestException("Cannot search past the end of the file");
}
if (startByteOffset > 0) {
StreamUtil.skipBytes(stream, startByteOffset);
}
Arrays.fill(bufArray, (byte) 0);
int totalBytesRead = 0;
int bytesRead = stream.read(bufArray, 0, Math.min(fileLength, GREP_BUF_SIZE));
buf.limit(bytesRead);
totalBytesRead += bytesRead;
List<Map<String, Object>> initialMatches = new ArrayList<>();
int initBufOffset = 0;
int byteOffset = startByteOffset;
byte[] beforeBytes = null;
Map<String, Object> ret = new HashMap<>();
while (true) {
SubstringSearchResult searchRet = bufferSubstringSearch(isDaemon, file, fileLength, byteOffset, initBufOffset,
stream, startByteOffset, totalBytesRead, buf, searchBytes, initialMatches, numMatches, beforeBytes);
List<Map<String, Object>> matches = searchRet.getMatches();
Integer newByteOffset = searchRet.getNewByteOffset();
byte[] newBeforeBytes = searchRet.getNewBeforeBytes();
if (matches.size() < numMatches && totalBytesRead + startByteOffset < fileLength) {
// The start index is positioned to find any possible
// occurrence search string that did not quite fit in the
// buffer on the previous read.
final int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length;
totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, fileLength);
if (totalBytesRead < 0) {
throw new InvalidRequestException("Cannot search past the end of the file");
}
initialMatches = matches;
initBufOffset = newBufOffset;
byteOffset = newByteOffset;
beforeBytes = newBeforeBytes;
} else {
ret.put("isDaemon", isDaemon ? "yes" : "no");
Integer nextByteOffset = null;
if (matches.size() >= numMatches || totalBytesRead < fileLength) {
nextByteOffset = (Integer) last(matches).get("byteOffset") + searchBytes.length;
if (fileLength <= nextByteOffset) {
nextByteOffset = null;
}
}
ret.putAll(mkGrepResponse(searchBytes, startByteOffset, matches, nextByteOffset));
break;
}
}
return ret;
} catch (UnknownHostException | UnsupportedEncodingException e) {
throw new RuntimeException(e);
} catch (IOException e) {
numFileReadExceptions.mark();
throw new RuntimeException(e);
}
} catch (IOException e) {
numFileOpenExceptions.mark();
throw new RuntimeException(e);
}
}
@VisibleForTesting
Map<String, Object> substringSearchDaemonLog(Path file, String searchString) throws InvalidRequestException {
return substringSearch(file, searchString, true, 10, 0);
}
/**
* Get the filtered, authorized, sorted log files for a port.
*/
@VisibleForTesting
List<Path> logsForPort(String user, Path portDir) {
try {
List<Path> workerLogs = directoryCleaner.getFilesForDir(portDir).stream()
.filter(file -> WORKER_LOG_FILENAME_PATTERN.asPredicate().test(file.getFileName().toString()))
.collect(toList());
return workerLogs.stream()
.filter(log -> resourceAuthorizer.isUserAllowedToAccessFile(user, WorkerLogs.getTopologyPortWorkerLog(log)))
.map(Unchecked.function(p -> Pair.of(p, Files.getLastModifiedTime(p))))
.sorted(Comparator.comparing((Pair<Path, FileTime> p) -> p.getRight()).reversed())
.map(p -> p.getLeft())
.collect(toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Find the first N matches of target string in files.
*
* @param logs all candidate log files to search
* @param numMatches number of matches expected
* @param fileOffset number of log files to skip initially
* @param startByteOffset number of byte to be ignored in each log file
* @param targetStr searched string
* @return all matched results
*/
@VisibleForTesting
Matched findNMatches(List<Path> logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) {
logs = drop(logs, fileOffset);
LOG.debug("{} files to scan", logs.size());
List<Map<String, Object>> matches = new ArrayList<>();
int matchCount = 0;
int scannedFiles = 0;
while (true) {
if (logs.isEmpty()) {
//fileOffset = one past last scanned file
break;
}
Path firstLog = logs.get(0);
Map<String, Object> matchInLog;
try {
LOG.debug("Looking through {}", firstLog);
matchInLog = substringSearch(firstLog, targetStr, numMatches - matchCount, startByteOffset);
scannedFiles++;
} catch (InvalidRequestException e) {
LOG.error("Can't search past end of file.", e);
matchInLog = new HashMap<>();
}
String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog);
//This section simply put the formatted log filename and corresponding port in the matching.
final List<Map<String, Object>> newMatches = new ArrayList<>(matches);
Map<String, Object> currentFileMatch = new HashMap<>(matchInLog);
currentFileMatch.put("fileName", fileName);
Path firstLogAbsPath = firstLog.toAbsolutePath().normalize();
currentFileMatch.put("port", truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString());
newMatches.add(currentFileMatch);
int newCount = matchCount + ((List<?>) matchInLog.getOrDefault("matches", Collections.emptyList())).size();
if (newCount == matchCount) {
// matches and matchCount is not changed
logs = rest(logs);
startByteOffset = 0;
fileOffset = fileOffset + 1;
} else if (newCount >= numMatches) {
matches = newMatches;
//fileOffset = the index of last scanned file
break;
} else {
matches = newMatches;
logs = rest(logs);
startByteOffset = 0;
fileOffset = fileOffset + 1;
matchCount = newCount;
}
}
LOG.debug("scanned {} files", scannedFiles);
return new Matched(fileOffset, targetStr, matches, scannedFiles);
}
/**
* As the file is read into a buffer, 1/2 the buffer's size at a time, we search the buffer for matches of the substring and return a
* list of zero or more matches.
*/
private SubstringSearchResult bufferSubstringSearch(boolean isDaemon, Path file, int fileLength, int offsetToBuf,
int initBufOffset, BufferedInputStream stream, Integer bytesSkipped,
int bytesRead, ByteBuffer haystack, byte[] needle,
List<Map<String, Object>> initialMatches, Integer numMatches, byte[] beforeBytes)
throws IOException {
int bufOffset = initBufOffset;
List<Map<String, Object>> matches = initialMatches;
byte[] newBeforeBytes;
Integer newByteOffset;
while (true) {
int offset = offsetOfBytes(haystack.array(), needle, bufOffset);
if (matches.size() < numMatches && offset >= 0) {
final int fileOffset = offsetToBuf + offset;
final int bytesNeededAfterMatch = haystack.limit() - GREP_CONTEXT_SIZE - needle.length;
byte[] beforeArg = null;
byte[] afterArg = null;
if (offset < GREP_CONTEXT_SIZE) {
beforeArg = beforeBytes;
}
if (offset > bytesNeededAfterMatch) {
afterArg = tryReadAhead(stream, haystack, offset, fileLength, bytesRead);
}
bufOffset = offset + needle.length;
matches.add(mkMatchData(needle, haystack, offset, fileOffset,
file.toAbsolutePath().normalize(), isDaemon, beforeArg, afterArg));
} else {
int beforeStrToOffset = Math.min(haystack.limit(), GREP_MAX_SEARCH_SIZE);
int beforeStrFromOffset = Math.max(0, beforeStrToOffset - GREP_CONTEXT_SIZE);
newBeforeBytes = Arrays.copyOfRange(haystack.array(), beforeStrFromOffset, beforeStrToOffset);
// It's OK if new-byte-offset is negative.
// This is normal if we are out of bytes to read from a small file.
if (matches.size() >= numMatches) {
newByteOffset = ((Number) last(matches).get("byteOffset")).intValue() + needle.length;
} else {
newByteOffset = bytesSkipped + bytesRead - GREP_MAX_SEARCH_SIZE;
}
break;
}
}
return new SubstringSearchResult(matches, newByteOffset, newBeforeBytes);
}
private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, int fileLength) throws IOException {
byte[] bufArray = buf.array();
// Copy the 2nd half of the buffer to the first half.
System.arraycopy(bufArray, GREP_MAX_SEARCH_SIZE, bufArray, 0, GREP_MAX_SEARCH_SIZE);
// Zero-out the 2nd half to prevent accidental matches.
Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0);
// Fill the 2nd half with new bytes from the stream.
int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min(fileLength, GREP_MAX_SEARCH_SIZE));
buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead);
return totalBytesRead + bytesRead;
}
private Map<String, Object> mkMatchData(byte[] needle, ByteBuffer haystack, int haystackOffset, int fileOffset, Path canonicalPath,
boolean isDaemon, byte[] beforeBytes, byte[] afterBytes)
throws UnsupportedEncodingException, UnknownHostException {
String url;
if (isDaemon) {
url = urlToMatchCenteredInLogPageDaemonFile(needle, canonicalPath, fileOffset, logviewerPort);
} else {
url = urlToMatchCenteredInLogPage(needle, canonicalPath, fileOffset, logviewerPort);
}
byte[] haystackBytes = haystack.array();
String beforeString;
String afterString;
if (haystackOffset >= GREP_CONTEXT_SIZE) {
beforeString = new String(haystackBytes, (haystackOffset - GREP_CONTEXT_SIZE), GREP_CONTEXT_SIZE, "UTF-8");
} else {
int numDesired = Math.max(0, GREP_CONTEXT_SIZE - haystackOffset);
int beforeSize = beforeBytes != null ? beforeBytes.length : 0;
int numExpected = Math.min(beforeSize, numDesired);
if (numExpected > 0) {
StringBuilder sb = new StringBuilder();
sb.append(new String(beforeBytes, beforeSize - numExpected, numExpected, "UTF-8"));
sb.append(new String(haystackBytes, 0, haystackOffset, "UTF-8"));
beforeString = sb.toString();
} else {
beforeString = new String(haystackBytes, 0, haystackOffset, "UTF-8");
}
}
int needleSize = needle.length;
int afterOffset = haystackOffset + needleSize;
int haystackSize = haystack.limit();
if ((afterOffset + GREP_CONTEXT_SIZE) < haystackSize) {
afterString = new String(haystackBytes, afterOffset, GREP_CONTEXT_SIZE, "UTF-8");
} else {
int numDesired = GREP_CONTEXT_SIZE - (haystackSize - afterOffset);
int afterSize = afterBytes != null ? afterBytes.length : 0;
int numExpected = Math.min(afterSize, numDesired);
if (numExpected > 0) {
StringBuilder sb = new StringBuilder();
sb.append(new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8"));
sb.append(new String(afterBytes, 0, numExpected, "UTF-8"));
afterString = sb.toString();
} else {
afterString = new String(haystackBytes, afterOffset, (haystackSize - afterOffset), "UTF-8");
}
}
Map<String, Object> ret = new HashMap<>();
ret.put("byteOffset", fileOffset);
ret.put("beforeString", beforeString);
ret.put("afterString", afterString);
ret.put("matchString", new String(needle, "UTF-8"));
ret.put("logviewerURL", url);
return ret;
}
/**
* Tries once to read ahead in the stream to fill the context and resets the stream to its position before the call.
*/
private byte[] tryReadAhead(BufferedInputStream stream, ByteBuffer haystack, int offset, int fileLength, int bytesRead)
throws IOException {
int numExpected = Math.min(fileLength - bytesRead, GREP_CONTEXT_SIZE);
byte[] afterBytes = new byte[numExpected];
stream.mark(numExpected);
// Only try reading once.
stream.read(afterBytes, 0, numExpected);
stream.reset();
return afterBytes;
}
/**
* Searches a given byte array for a match of a sub-array of bytes. Returns the offset to the byte that matches, or -1 if no match was
* found.
*/
private int offsetOfBytes(byte[] buffer, byte[] search, int initOffset) {
if (search.length <= 0) {
throw new IllegalArgumentException("Search array should not be empty.");
}
if (initOffset < 0) {
throw new IllegalArgumentException("Start offset shouldn't be negative.");
}
int offset = initOffset;
int candidateOffset = initOffset;
int valOffset = 0;
int retOffset = 0;
while (true) {
if (search.length - valOffset <= 0) {
// found
retOffset = candidateOffset;
break;
} else {
if (offset >= buffer.length) {
// We ran out of buffer for the search.
retOffset = -1;
break;
} else {
if (search[valOffset] != buffer[offset]) {
// The match at this candidate offset failed, so start over with the
// next candidate byte from the buffer.
int newOffset = candidateOffset + 1;
offset = newOffset;
candidateOffset = newOffset;
valOffset = 0;
} else {
// So far it matches. Keep going...
offset = offset + 1;
valOffset = valOffset + 1;
}
}
}
}
return retOffset;
}
/**
* This response data only includes a next byte offset if there is more of the file to read.
*/
private Map<String, Object> mkGrepResponse(byte[] searchBytes, Integer offset, List<Map<String, Object>> matches,
Integer nextByteOffset) throws UnsupportedEncodingException {
Map<String, Object> ret = new HashMap<>();
ret.put("searchString", new String(searchBytes, "UTF-8"));
ret.put("startByteOffset", offset);
ret.put("matches", matches);
if (nextByteOffset != null) {
ret.put("nextByteOffset", nextByteOffset);
}
return ret;
}
@VisibleForTesting
String urlToMatchCenteredInLogPage(byte[] needle, Path canonicalPath, int offset, Integer port) throws UnknownHostException {
final String host = Utils.hostname();
final Path truncatedFilePath = truncatePathToLastElements(canonicalPath, 3);
Map<String, Object> parameters = new HashMap<>();
parameters.put("file", truncatedFilePath.toString());
parameters.put("start", Math.max(0, offset - (LogviewerConstant.DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2)));
parameters.put("length", LogviewerConstant.DEFAULT_BYTES_PER_PAGE);
return UrlBuilder.build(String.format(this.scheme + "://%s:%d/api/v1/log", host, port), parameters);
}
@VisibleForTesting
String urlToMatchCenteredInLogPageDaemonFile(byte[] needle, Path canonicalPath, int offset, Integer port) throws UnknownHostException {
final String host = Utils.hostname();
final Path truncatedFilePath = truncatePathToLastElements(canonicalPath, 1);
Map<String, Object> parameters = new HashMap<>();
parameters.put("file", truncatedFilePath.toString());
parameters.put("start", Math.max(0, offset - (LogviewerConstant.DEFAULT_BYTES_PER_PAGE / 2) - (needle.length / -2)));
parameters.put("length", LogviewerConstant.DEFAULT_BYTES_PER_PAGE);
return UrlBuilder.build(String.format(this.scheme + "://%s:%d/api/v1/daemonlog", host, port), parameters);
}
@VisibleForTesting
public static class Matched implements JSONAware {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private int fileOffset;
private String searchString;
private List<Map<String, Object>> matches;
@JsonIgnore
private final int openedFiles;
/**
* Constructor.
*
* @param fileOffset offset (index) of the files
* @param searchString search string
* @param matches map representing matched search result
* @param openedFiles number of files scanned, used for metrics only
*/
public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches, int openedFiles) {
this.fileOffset = fileOffset;
this.searchString = searchString;
this.matches = matches;
this.openedFiles = openedFiles;
}
public int getFileOffset() {
return fileOffset;
}
public String getSearchString() {
return searchString;
}
public List<Map<String, Object>> getMatches() {
return matches;
}
@Override
public String toJSONString() {
try {
return OBJECT_MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
private static class SubstringSearchResult {
private List<Map<String, Object>> matches;
private Integer newByteOffset;
private byte[] newBeforeBytes;
SubstringSearchResult(List<Map<String, Object>> matches, Integer newByteOffset, byte[] newBeforeBytes) {
this.matches = matches;
this.newByteOffset = newByteOffset;
this.newBeforeBytes = newBeforeBytes;
}
public List<Map<String, Object>> getMatches() {
return matches;
}
public Integer getNewByteOffset() {
return newByteOffset;
}
public byte[] getNewBeforeBytes() {
return newBeforeBytes;
}
}
}