blob: f3240cf2aa6ecbd6cb3d2577152a60269e3c2235 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.scraper;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Plc Scraper Task that scrapes one source.
 * One {@link ScrapeJob} gets split into multiple tasks.
 * One task for each source that is defined in the {@link org.apache.plc4x.java.scraper.config.JobConfiguration}.
 * <p>
 * Each invocation of {@link #run()} performs exactly one scrape: it opens a connection, reads all
 * configured fields in a single read request, records statistics, hands the values to the
 * {@link ResultHandler} and closes the connection again.
 * <p>
 * NOTE(review): {@link DescriptiveStatistics} is documented as not thread-safe. If the statistics
 * getters are called from a different thread than {@link #run()}, external synchronization
 * (or {@code SynchronizedDescriptiveStatistics}) may be required - confirm against the callers.
 */
public class ScraperTask implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(ScraperTask.class);

    /** The connection attempt may take this many times longer than a single read request. */
    private static final int CONNECT_TIMEOUT_FACTOR = 10;

    /** Number of most recent scrapes that are taken into account by the statistics. */
    private static final int STATISTICS_WINDOW = 1000;

    private final PlcDriverManager driverManager;
    private final String jobName;
    private final String connectionAlias;
    private final String connectionString;
    /** Fields to read: alias -> field query. */
    private final Map<String, String> fields;
    private final long requestTimeoutMs;
    /** Executor used for establishing the connection and for invoking the result handler. */
    private final ExecutorService handlerService;
    private final ResultHandler resultHandler;

    /** Number of scrapes that were started. */
    private final AtomicLong requestCounter = new AtomicLong(0);
    /** Number of scrapes for which a read response was received in time. */
    private final AtomicLong successCounter = new AtomicLong(0);
    /** Latency (in nanoseconds) of the successful scrapes. */
    private final DescriptiveStatistics latencyStatistics = new DescriptiveStatistics(STATISTICS_WINDOW);
    /** One sample per scrape: {@code 0.0} for a success, {@code 1.0} for a failure. */
    private final DescriptiveStatistics failedStatistics = new DescriptiveStatistics(STATISTICS_WINDOW);

    /**
     * Creates a new task.
     *
     * @param driverManager    driver manager used to obtain the connection, not {@code null}
     * @param jobName          name of the job this task belongs to, not blank
     * @param connectionAlias  alias of the scraped source, not blank
     * @param connectionString connection string of the scraped source, not blank
     * @param fields           fields to read (alias -> field query), not empty
     * @param requestTimeoutMs timeout for a read request in milliseconds, must be positive;
     *                         the connection attempt is allowed to take ten times as long
     * @param handlerService   executor used for connecting and for result handling, not {@code null}
     * @param resultHandler    receiver of the scraped values, not {@code null}
     * @throws NullPointerException     if a required argument is {@code null}
     * @throws IllegalArgumentException if an argument is blank, empty or not positive
     */
    public ScraperTask(PlcDriverManager driverManager, String jobName, String connectionAlias, String connectionString,
                       Map<String, String> fields, long requestTimeoutMs, ExecutorService handlerService, ResultHandler resultHandler) {
        Validate.notNull(driverManager);
        Validate.notBlank(jobName);
        Validate.notBlank(connectionAlias);
        Validate.notBlank(connectionString);
        Validate.notEmpty(fields);
        Validate.isTrue(requestTimeoutMs > 0);
        // Fail fast: without an executor every single scrape would fail inside run().
        Validate.notNull(handlerService);
        Validate.notNull(resultHandler);
        this.driverManager = driverManager;
        this.jobName = jobName;
        this.connectionAlias = connectionAlias;
        this.connectionString = connectionString;
        this.fields = fields;
        this.requestTimeoutMs = requestTimeoutMs;
        this.handlerService = handlerService;
        this.resultHandler = resultHandler;
    }

    /**
     * Performs a single scrape.
     * <p>
     * Failures are not propagated to the caller; they are logged and recorded in the
     * failure statistics via {@link #handleException(Exception)}.
     */
    @Override
    public void run() {
        LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
        requestCounter.incrementAndGet();
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        CompletableFuture<PlcConnection> connectionFuture = null;
        PlcConnection connection = null;
        try {
            // Connect asynchronously so that the attempt can be bounded by a timeout.
            connectionFuture = CompletableFuture.supplyAsync(this::openConnection, handlerService);
            connection = connectionFuture.get(CONNECT_TIMEOUT_FACTOR * requestTimeoutMs, TimeUnit.MILLISECONDS);
            LOGGER.trace("Connection to {} established: {}", connectionString, connection);
            PlcReadResponse response;
            try {
                PlcReadRequest.Builder builder = connection.readRequestBuilder();
                fields.forEach((alias, qry) -> {
                    LOGGER.trace("Requesting: {} -> {}", alias, qry);
                    builder.addItem(alias, qry);
                });
                response = builder
                    .build()
                    .execute()
                    .get(requestTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (ExecutionException e) {
                // The read request itself failed
                handleException(e);
                return;
            }
            // Add statistics
            stopWatch.stop();
            latencyStatistics.addValue(stopWatch.getNanoTime());
            failedStatistics.addValue(0.0);
            successCounter.incrementAndGet();
            // Validate response
            validateResponse(response);
            // Handle response (Async)
            CompletableFuture.runAsync(() -> deliverResult(response), handlerService);
        } catch (InterruptedException e) {
            // Preserve the interrupt so that the executing thread / scheduler can react to it.
            Thread.currentThread().interrupt();
            LOGGER.debug("Exception during scrape", e);
            handleException(e);
        } catch (Exception e) {
            LOGGER.debug("Exception during scrape", e);
            handleException(e);
        } finally {
            if (connection != null) {
                closeQuietly(connection);
            } else if (connectionFuture != null) {
                // We gave up waiting (timeout / interrupt) or the attempt failed. If the connection
                // still gets established later on, nobody would close it - so close it on arrival.
                // For a failed attempt this callback is simply never invoked.
                connectionFuture.thenAccept(this::closeQuietly);
            }
        }
    }

    /**
     * Opens the connection to the configured source.
     *
     * @return the connection
     * @throws PlcRuntimeException if the connection cannot be instantiated
     */
    private PlcConnection openConnection() {
        try {
            return driverManager.getConnection(connectionString);
        } catch (PlcConnectionException e) {
            LOGGER.warn("Unable to instantiate connection to {}", connectionString, e);
            throw new PlcRuntimeException(e);
        }
    }

    /**
     * Passes the values of the response to the result handler.
     * Runs on the handler executor, therefore failures are logged here as nobody
     * inspects the outcome of the asynchronous invocation.
     */
    private void deliverResult(PlcReadResponse response) {
        try {
            resultHandler.handle(jobName, connectionAlias, transformResponseToMap(response));
        } catch (RuntimeException e) {
            LOGGER.warn("Result handler failed for job {} and connection {}", jobName, connectionAlias, e);
        }
    }

    /** Closes the connection (if any); a failure to close is only logged as the scrape itself is already done. */
    private void closeQuietly(PlcConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception ignored) {
            LOGGER.debug("Unable to close connection to {}", connectionString, ignored);
        }
    }

    /**
     * Checks the response code of every field and reports all fields whose code is not
     * {@link PlcResponseCode#OK} to {@link #handleErrorResponse(Map)}.
     */
    private void validateResponse(PlcReadResponse response) {
        // Plain map instead of Collectors.toMap, which rejects null values.
        Map<String, PlcResponseCode> failedFields = new HashMap<>();
        for (String name : response.getFieldNames()) {
            PlcResponseCode code = response.getResponseCode(name);
            if (!PlcResponseCode.OK.equals(code)) {
                failedFields.put(name, code);
            }
        }
        if (!failedFields.isEmpty()) {
            handleErrorResponse(failedFields);
        }
    }

    /**
     * Converts the response into a map of field name -> value.
     * <p>
     * A field without a value is mapped to {@code null} (presumably this can happen for fields
     * that were not read successfully - confirm against the driver). {@code Collectors.toMap}
     * is deliberately not used here, as it throws a {@link NullPointerException} on null values.
     */
    private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
        Map<String, Object> values = new HashMap<>();
        for (String name : response.getFieldNames()) {
            values.put(name, response.getObject(name));
        }
        return values;
    }

    /** @return name of the job this task belongs to */
    public String getJobName() {
        return jobName;
    }

    /** @return alias of the source scraped by this task */
    public String getConnectionAlias() {
        return connectionAlias;
    }

    /** @return number of scrapes started so far */
    public long getRequestCounter() {
        return requestCounter.get();
    }

    /** @return number of scrapes for which a read response was received in time */
    public long getSuccessfullRequestCounter() {
        return successCounter.get();
    }

    /** @return latency statistics (values in nanoseconds) of the most recent successful scrapes */
    public DescriptiveStatistics getLatencyStatistics() {
        return latencyStatistics;
    }

    /**
     * @return percentage (0 - 100) of failed scrapes within the statistics window;
     * {@code NaN} as long as no scrape has been recorded
     */
    public double getPercentageFailed() {
        return 100.0 * failedStatistics.getMean();
    }

    /**
     * Records a failed scrape.
     *
     * @param e the cause of the failure
     */
    public void handleException(Exception e) {
        LOGGER.debug("Exception: ", e);
        failedStatistics.addValue(1.0);
    }

    /**
     * Reports fields that were answered with a response code other than OK.
     * This only logs; the scrape is still counted as successful in the statistics.
     *
     * @param failed failed field names and their response codes
     */
    public void handleErrorResponse(Map<String, PlcResponseCode> failed) {
        LOGGER.warn("Handling error responses: {}", failed);
    }
}