blob: 3db14495a9497c7ef43547c35f1b67d3ef7b6fb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.commons.clam.internal;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.commons.clam.ClamService;
import org.apache.sling.commons.clam.ScanResult;
import org.apache.sling.commons.clam.ScanResult.Status;
import org.apache.sling.commons.content.analyzing.ContentAnalyzer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Service for scanning data with Clam daemon.<br>Connections are established via TCP/IP.
*/
@Component(
property = {
Constants.SERVICE_DESCRIPTION + "=Sling Commons Clamd Service",
Constants.SERVICE_VENDOR + "=The Apache Software Foundation"
}
)
@Designate(
ocd = ClamdServiceConfiguration.class
)
public final class ClamdService implements ClamService, ContentAnalyzer {
private ClamdServiceConfiguration configuration;
private static final byte[] PING_COMMAND = "nPING\n".getBytes(StandardCharsets.US_ASCII);
private static final byte[] PONG_REPLY = "PONG\n".getBytes(StandardCharsets.US_ASCII);
private static final byte[] INSTREAM_COMMAND = "nINSTREAM\n".getBytes(StandardCharsets.US_ASCII);
private static final String OK_REPLY_PATTERN = "stream: OK";
private static final String FOUND_REPLY_PATTERN = "stream: .+ FOUND";
private static final String INSTREAM_SIZE_LIMIT_EXCEEDED_PATTERN = "INSTREAM size limit exceeded. ERROR";
// length is the size of the following data in bytes expressed as a 4 byte unsigned integer in network byte order
private static final int CHUNK_LENGTH_DATUM_SIZE = 4;
private final Logger logger = LoggerFactory.getLogger(ClamdService.class);
public ClamdService() { //
}
@Activate
private void activate(final ClamdServiceConfiguration configuration) {
logger.debug("activating");
configure(configuration);
}
@Modified
private void modified(final ClamdServiceConfiguration configuration) {
logger.debug("modifying");
configure(configuration);
}
@Deactivate
private void deactivate() {
logger.debug("deactivating");
}
private void configure(final ClamdServiceConfiguration configuration) {
this.configuration = configuration;
playPingPong();
}
@Override
public @NotNull ScanResult scan(@NotNull final InputStream inputStream) throws IOException {
try {
return doInstream(inputStream);
} catch (InstreamSizeLimitExceededException e) {
logger.error("doing INSTREAM failed", e);
return new ScanResult(ScanResult.Status.ERROR, e.getMessage(), e.getStarted(), e.getSize());
}
}
@Override
public @NotNull CompletableFuture<Void> analyze(@NotNull final InputStream input, @Nullable final Map<String, Object> parameters, @NotNull final Map<String, Object> report) {
return CompletableFuture.runAsync(() -> {
try {
scan(input, report);
} catch (IOException e) {
throw new CompletionException(e);
}
});
}
private void scan(@NotNull final InputStream inputStream, @NotNull final Map<String, Object> report) throws IOException {
final ScanResult scanResult = scan(inputStream);
report.put("sling.commons.clam.scanresult.timestamp", scanResult.getTimestamp());
report.put("sling.commons.clam.scanresult.status", scanResult.getStatus());
report.put("sling.commons.clam.scanresult.message", scanResult.getMessage());
report.put("sling.commons.clam.scanresult.started", scanResult.getStarted());
report.put("sling.commons.clam.scanresult.size", scanResult.getSize());
}
private byte[] doPing() throws IOException {
logger.info("pinging clam daemon at {}:{}", configuration.clamd_host(), configuration.clamd_port());
try (Socket socket = new Socket(configuration.clamd_host(), configuration.clamd_port());
OutputStream out = new BufferedOutputStream(socket.getOutputStream());
InputStream in = socket.getInputStream()) {
socket.setSoTimeout(configuration.connection_timeout());
// send command
out.write(PING_COMMAND);
out.flush();
return IOUtils.toByteArray(in);
}
}
private void playPingPong() {
try {
final byte[] reply = doPing();
if (Arrays.equals(reply, PONG_REPLY)) {
logger.info("clam daemon replied with PONG");
} else {
final String message = new String(reply, StandardCharsets.US_ASCII);
logger.error("clam daemon replied with unknown message: {}", message);
}
} catch (IOException e) {
logger.error("pinging clam daemon failed: {}", e.getMessage());
}
}
/**
* man (8) clamd
* INSTREAM
* It is mandatory to prefix this command with n or z.
* Scan a stream of data. The stream is sent to clamd in chunks, after INSTREAM, on the same socket on which the
* command was sent. This avoids the overhead of establishing new TCP connections and problems with NAT.
* The format of the chunk is: '&lt;length&gt;&lt;data&gt;' where &lt;length&gt; is the size of the following data in bytes
* expressed as a 4 byte unsigned integer in network byte order and &lt;data&gt; is the actual chunk.
* Streaming is terminated by sending a zero-length chunk.
* Note: do not exceed StreamMaxLength as defined in clamd.conf, otherwise clamd will reply with INSTREAM size
* limit exceeded and close the connection.
*
* @param inputStream data sent to clamd in chunks
* @return scan result from clamd
*/
private ScanResult doInstream(final InputStream inputStream) throws IOException, InstreamSizeLimitExceededException {
logger.info("connecting to clam daemon at {}:{} for scanning", configuration.clamd_host(), configuration.clamd_port());
final long started = System.currentTimeMillis();
try (Socket socket = new Socket(configuration.clamd_host(), configuration.clamd_port());
OutputStream out = new BufferedOutputStream(socket.getOutputStream());
InputStream in = socket.getInputStream()) {
socket.setSoTimeout(configuration.connection_timeout());
// send command
out.write(INSTREAM_COMMAND);
out.flush();
// send data in chunks
final byte[] data = new byte[configuration.chunk_length()];
long total = 0;
int read = inputStream.read(data);
while (read >= 0) {
logger.trace("current chunk length: {}", read);
total = total + read;
final byte[] length = ByteBuffer.allocate(CHUNK_LENGTH_DATUM_SIZE).putInt(read).array();
out.write(length);
out.write(data, 0, read);
// handle premature reply
if (in.available() > 0) {
logger.info("total bytes sent: {}", total);
final byte[] reply = IOUtils.toByteArray(in);
throw new InstreamSizeLimitExceededException(reply, started, total);
}
read = inputStream.read(data);
}
logger.info("total bytes sent: {}", total);
// terminate by sending a zero-length chunk
out.write(new byte[]{0, 0, 0, 0});
out.flush();
// return reply on complete
final byte[] reply = IOUtils.toByteArray(in);
return parseClamdReply(reply, started, total);
}
}
private ScanResult parseClamdReply(final byte[] reply, final long started, final long size) {
final String message = new String(reply, StandardCharsets.US_ASCII).trim();
logger.info("reply message from clam daemon: '{}'", message);
final ScanResult result;
if (message.matches(OK_REPLY_PATTERN)) {
result = new ScanResult(Status.OK, message, started, size);
} else if (message.matches(FOUND_REPLY_PATTERN)) {
result = new ScanResult(Status.FOUND, message, started, size);
} else if (message.matches(INSTREAM_SIZE_LIMIT_EXCEEDED_PATTERN)) {
result = new ScanResult(Status.ERROR, message, started, size);
} else {
result = new ScanResult(Status.UNKNOWN, message, started, size);
}
return result;
}
}