blob: e08a3acce3616dffd4a4d35a0ed307106a7b5b5f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.protocol.FlowFileTransaction;
import org.apache.nifi.remote.protocol.HandshakenProperties;
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.nifi.util.NiFiProperties.DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL;
import static org.apache.nifi.util.NiFiProperties.SITE_TO_SITE_HTTP_TRANSACTION_TTL;
/**
 * Process-wide registry of HTTP Site-to-Site transactions.
 *
 * <p>Each transaction is identified by a generated id and is stored in a concurrent map together with
 * the time of its last communication. A single background task, scheduled by {@link #start()},
 * periodically removes transactions whose last communication is older than the configured TTL and
 * rolls back their sessions.</p>
 *
 * <p>Instances are obtained through {@link #getInstance()}; the constructor is private.</p>
 */
public class HttpRemoteSiteListener implements RemoteSiteListener {

    private static final Logger logger = LoggerFactory.getLogger(HttpRemoteSiteListener.class);

    // Must be volatile: getInstance() uses double-checked locking, which is only safe
    // when the published reference is a volatile field.
    private static volatile HttpRemoteSiteListener instance;

    private final int transactionTtlSec;
    private final Map<String, TransactionWrapper> transactions = new ConcurrentHashMap<>();
    private final ScheduledExecutorService taskExecutor;
    private ProcessGroup rootGroup;
    private ScheduledFuture<?> transactionMaintenanceTask;

    /**
     * Creates the single-threaded maintenance executor (daemon thread) and reads the transaction TTL
     * from {@link NiFiProperties}. If the configured value cannot be read or parsed, the default TTL is used.
     */
    private HttpRemoteSiteListener() {
        super();

        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        taskExecutor = Executors.newScheduledThreadPool(1, runnable -> {
            final Thread thread = defaultFactory.newThread(runnable);
            thread.setName("Http Site-to-Site Transaction Maintenance");
            thread.setDaemon(true);
            return thread;
        });

        final NiFiProperties properties = NiFiProperties.getInstance();
        int txTtlSec;
        try {
            final String configuredTtl = properties.getProperty(SITE_TO_SITE_HTTP_TRANSACTION_TTL, DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL);
            txTtlSec = (int) FormatUtils.getTimeDuration(configuredTtl, TimeUnit.SECONDS);
        } catch (final Exception e) {
            txTtlSec = (int) FormatUtils.getTimeDuration(DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL, TimeUnit.SECONDS);
            logger.warn("Failed to parse {} due to {}, use default as {} secs.",
                    SITE_TO_SITE_HTTP_TRANSACTION_TTL, e.getMessage(), txTtlSec);
        }
        transactionTtlSec = txTtlSec;
    }

    /**
     * Returns the shared listener, creating it on first use.
     *
     * @return the singleton instance
     */
    public static HttpRemoteSiteListener getInstance() {
        HttpRemoteSiteListener result = instance;
        if (result == null) {
            synchronized (HttpRemoteSiteListener.class) {
                result = instance;
                if (result == null) {
                    result = new HttpRemoteSiteListener();
                    instance = result;
                }
            }
        }
        return result;
    }

    /**
     * Pairs a transaction (and the properties negotiated for it) with the time of its last communication.
     * A wrapper created by {@link #createTransaction()} carries {@code null} for both references until
     * {@link #holdTransaction} replaces it.
     */
    private class TransactionWrapper {

        private final FlowFileTransaction transaction;
        private final HandshakenProperties handshakenProperties;
        // Written by request threads (extend) and read by the maintenance thread (isExpired);
        // volatile guarantees visibility and atomic access to the 64-bit value.
        private volatile long lastCommunicationAt;

        private TransactionWrapper(final FlowFileTransaction transaction, final HandshakenProperties handshakenProperties) {
            this.transaction = transaction;
            this.handshakenProperties = handshakenProperties;
            this.lastCommunicationAt = System.currentTimeMillis();
        }

        /**
         * @return true if more whole seconds than the TTL have elapsed since the last communication
         */
        private boolean isExpired() {
            final long elapsedMillis = System.currentTimeMillis() - lastCommunicationAt;
            final long elapsedSec = TimeUnit.SECONDS.convert(elapsedMillis, TimeUnit.MILLISECONDS);
            return elapsedSec > transactionTtlSec;
        }

        /** Resets the last-communication time to now. */
        private void extend() {
            lastCommunicationAt = System.currentTimeMillis();
        }
    }

    @Override
    public void setRootGroup(ProcessGroup rootGroup) {
        this.rootGroup = rootGroup;
    }

    /**
     * Hands the current root process group to the given server protocol.
     *
     * @param serverProtocol protocol instance to configure
     */
    public void setupServerProtocol(HttpFlowFileServerProtocol serverProtocol) {
        serverProtocol.setRootProcessGroup(rootGroup);
    }

    /**
     * Schedules the transaction maintenance task, which runs immediately and then repeatedly with a
     * delay of half the transaction TTL (at least one second). Does nothing if a previously scheduled
     * task is still active.
     */
    @Override
    public void start() throws IOException {
        if (transactionMaintenanceTask != null && !transactionMaintenanceTask.isDone()) {
            // Re-scheduling would orphan the running task: its handle would be overwritten
            // and stop() could no longer cancel it.
            return;
        }

        // scheduleWithFixedDelay rejects a delay <= 0, which transactionTtlSec / 2 yields for a TTL of 0 or 1 sec.
        final long delaySec = Math.max(1, transactionTtlSec / 2);
        transactionMaintenanceTask = taskExecutor.scheduleWithFixedDelay(this::maintainTransactions, 0, delaySec, TimeUnit.SECONDS);
    }

    /**
     * One run of the maintenance task: removes every expired transaction and rolls back its session.
     * Exceptions are logged and swallowed so that the scheduled task keeps running.
     */
    private void maintainTransactions() {
        final int originalSize = transactions.size();
        logger.trace("Transaction maintenance task started.");
        try {
            for (final Map.Entry<String, TransactionWrapper> entry : transactions.entrySet()) {
                final String transactionId = entry.getKey();
                final TransactionWrapper wrapper = entry.getValue();
                // Conditional removal: if the wrapper was replaced after it was read here
                // (e.g. by holdTransaction), the newer wrapper must not be cancelled.
                if (wrapper.isExpired() && transactions.remove(transactionId, wrapper)) {
                    rollbackCancelled(transactionId, wrapper);
                }
            }
        } catch (final Exception e) {
            // Swallow exception so that this thread can keep working.
            logger.error("An exception occurred while maintaining transactions", e);
        }
        logger.debug("Transaction maintenance task finished. originalSize={}, currentSize={}", originalSize, transactions.size());
    }

    /**
     * Removes the transaction and, if it holds a session, rolls that session back.
     * An unknown transaction id is only logged.
     *
     * @param transactionId id of the transaction to cancel
     */
    public void cancelTransaction(String transactionId) {
        final TransactionWrapper wrapper = transactions.remove(transactionId);
        if (wrapper == null) {
            logger.debug("The transaction was not found. transactionId={}", transactionId);
            return;
        }
        rollbackCancelled(transactionId, wrapper);
    }

    /**
     * Rolls back the session of a wrapper that has already been removed from the registry.
     * Rollback failures are logged, not propagated.
     */
    private void rollbackCancelled(final String transactionId, final TransactionWrapper wrapper) {
        logger.debug("Cancel a transaction. transactionId={}", transactionId);
        final FlowFileTransaction t = wrapper.transaction;
        if (t == null || t.getSession() == null) {
            return;
        }
        logger.info("Cancel a transaction, rollback its session. transactionId={}", transactionId);
        try {
            t.getSession().rollback();
        } catch (final Exception e) {
            // Swallow exception so that it can keep expiring other transactions.
            logger.error("Failed to rollback. transactionId={}", transactionId, e);
        }
    }

    /** Cancels the maintenance task if one has been scheduled. The executor itself is left running. */
    @Override
    public void stop() {
        if (transactionMaintenanceTask != null) {
            logger.debug("Stopping transactionMaintenanceTask...");
            transactionMaintenanceTask.cancel(true);
        }
    }

    /**
     * Registers a new, not yet started transaction under a random UUID.
     *
     * @return the id of the new transaction
     */
    public String createTransaction() {
        final String transactionId = UUID.randomUUID().toString();
        transactions.put(transactionId, new TransactionWrapper(null, null));
        logger.debug("Created a new transaction: {}", transactionId);
        return transactionId;
    }

    /**
     * @param transactionId transactionId to check
     * @return true if the transaction is registered and has not expired
     */
    public boolean isTransactionActive(final String transactionId) {
        return isTransactionActive(transactions.get(transactionId));
    }

    private boolean isTransactionActive(TransactionWrapper transaction) {
        return transaction != null && !transaction.isExpired();
    }

    /**
     * @param transactionId transactionId to check
     * @return Returns a HandshakenProperties instance which is created when this transaction is started,
     * only if the transaction is active,
     * and it holds a HandshakenProperties,
     * otherwise return null
     */
    public HandshakenProperties getHandshakenProperties(final String transactionId) {
        final TransactionWrapper transaction = transactions.get(transactionId);
        if (isTransactionActive(transaction)) {
            return transaction.handshakenProperties;
        }
        return null;
    }

    /**
     * Stores the given transaction under the id so that it can later be finalized, and restarts its TTL.
     * If the call is rejected, the currently registered entry is left untouched.
     *
     * @param transactionId id the transaction is held under
     * @param transaction transaction to hold; it must already have a session
     * @param handshakenProperties properties to keep alongside the transaction
     * @throws IllegalStateException if a transaction is already held under this id,
     * or if the passed transaction has no session
     */
    public void holdTransaction(final String transactionId, final FlowFileTransaction transaction,
                                final HandshakenProperties handshakenProperties) throws IllegalStateException {
        // We don't check expiration of the transaction here, to support large file transport or slow network.
        // The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource.
        final boolean hasSession = transaction.getSession() != null;

        // compute() makes check-and-replace atomic; when the function throws, the existing mapping is kept,
        // so a rejected call can no longer drop a transaction that is already held.
        transactions.compute(transactionId, (id, current) -> {
            if (current == null) {
                logger.debug("The transaction was not found, it looks it took longer than transaction TTL.");
            } else if (current.transaction != null) {
                throw new IllegalStateException("Transaction has already been processed. It can only be finalized. transactionId=" + transactionId);
            }
            if (!hasSession) {
                throw new IllegalStateException("Passed transaction is not associated any session yet, can not hold. transactionId=" + transactionId);
            }
            logger.debug("Holding a transaction: {}", transactionId);
            // Server has received or sent all data, and transaction TTL count down starts here.
            // However, if the client doesn't consume data fast enough, server might expire and rollback the transaction.
            return new TransactionWrapper(transaction, handshakenProperties);
        });
    }

    /**
     * Removes a held transaction from the registry and returns it to the caller.
     * If the call is rejected, the registered entry is left untouched.
     *
     * @param transactionId id of the transaction to finalize
     * @return the transaction that was held under the id
     * @throws IllegalStateException if the transaction is unknown or expired, has not been held yet,
     * or was removed concurrently
     */
    public FlowFileTransaction finalizeTransaction(final String transactionId) throws IllegalStateException {
        final TransactionWrapper wrapper = transactions.get(transactionId);
        if (!isTransactionActive(wrapper)) {
            throw new IllegalStateException("Transaction was not found or not active anymore. transactionId=" + transactionId);
        }
        if (wrapper.transaction == null) {
            throw new IllegalStateException("Transaction has not started yet.");
        }
        // Conditional removal guarantees that exactly one caller obtains this wrapper.
        if (!transactions.remove(transactionId, wrapper)) {
            throw new IllegalStateException("Transaction was not found anymore. It's already finalized or expired. transactionId=" + transactionId);
        }
        logger.debug("Finalized a transaction: {}", transactionId);
        return wrapper.transaction;
    }

    /**
     * Restarts the TTL countdown of an active transaction.
     *
     * @param transactionId id of the transaction to extend
     * @throws IllegalStateException if the transaction is unknown or already expired
     */
    public void extendTransaction(final String transactionId) throws IllegalStateException {
        final TransactionWrapper wrapper = transactions.get(transactionId);
        if (!isTransactionActive(wrapper)) {
            throw new IllegalStateException("Transaction was not found or not active anymore. transactionId=" + transactionId);
        }
        logger.debug("Extending transaction TTL, transactionId={}", transactionId);
        wrapper.extend();
    }

    /**
     * @return the transaction TTL in seconds
     */
    public int getTransactionTtlSec() {
        return transactionTtlSec;
    }
}