blob: 693b3c77241704a39ceb5982abae14dfd0b9ed07 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.protocol.http;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.AbstractTransaction;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class HttpClientTransaction extends AbstractTransaction {
private SiteToSiteRestApiClient apiClient;
private String transactionUrl;
public HttpClientTransaction(final int protocolVersion, final Peer peer, TransferDirection direction,
final boolean useCompression, final String portId, int penaltyMillis, EventReporter eventReporter) throws IOException {
super(peer, direction, useCompression, new StandardFlowFileCodec(), eventReporter, protocolVersion, penaltyMillis, portId);
}
public void initialize(SiteToSiteRestApiClient apiUtil, String transactionUrl) throws IOException {
this.transactionUrl = transactionUrl;
this.apiClient = apiUtil;
if(TransferDirection.RECEIVE.equals(direction)){
dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, peer);
} else {
apiUtil.openConnectionForSend(transactionUrl, peer);
}
}
@Override
protected Response readTransactionResponse() throws IOException {
HttpCommunicationsSession commSession = (HttpCommunicationsSession) peer.getCommunicationsSession();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
if(TransferDirection.RECEIVE.equals(direction)){
switch (state){
case TRANSACTION_STARTED:
case DATA_EXCHANGED:
logger.debug("{} {} readTransactionResponse. checksum={}", this, peer, commSession.getChecksum());
if(StringUtils.isEmpty(commSession.getChecksum())){
// We don't know if there's more data to receive, so just continue it.
ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
} else {
// We got a checksum to send to server.
if (TransactionState.TRANSACTION_STARTED.equals(state)) {
logger.debug("{} {} There's no transaction to confirm.", this, peer);
ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
} else {
TransactionResultEntity transactionResult
= apiClient.commitReceivingFlowFiles(transactionUrl, ResponseCode.CONFIRM_TRANSACTION, commSession.getChecksum());
ResponseCode responseCode = ResponseCode.fromCode(transactionResult.getResponseCode());
if(responseCode.containsMessage()){
String message = transactionResult.getMessage();
responseCode.writeResponse(dos, message == null ? "" : message);
} else {
responseCode.writeResponse(dos);
}
}
}
break;
}
} else {
switch (state){
case DATA_EXCHANGED:
// Some flow files have been sent via stream, finish transferring.
apiClient.finishTransferFlowFiles(commSession);
ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, commSession.getChecksum());
break;
case TRANSACTION_CONFIRMED:
TransactionResultEntity resultEntity = apiClient.commitTransferFlowFiles(transactionUrl, ResponseCode.CONFIRM_TRANSACTION);
ResponseCode responseCode = ResponseCode.fromCode(resultEntity.getResponseCode());
if(responseCode.containsMessage()){
responseCode.writeResponse(dos, resultEntity.getMessage());
} else {
responseCode.writeResponse(dos);
}
break;
}
}
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
return Response.read(new DataInputStream(bis));
}
@Override
protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException {
HttpCommunicationsSession commSession = (HttpCommunicationsSession) peer.getCommunicationsSession();
if(TransferDirection.RECEIVE.equals(direction)){
switch (response) {
case CONFIRM_TRANSACTION:
logger.debug("{} Confirming transaction. checksum={}", this, explanation);
commSession.setChecksum(explanation);
break;
case TRANSACTION_FINISHED:
logger.debug("{} Finishing transaction.", this);
break;
case CANCEL_TRANSACTION:
logger.debug("{} Canceling transaction. explanation={}", this, explanation);
TransactionResultEntity resultEntity = apiClient.commitReceivingFlowFiles(transactionUrl, ResponseCode.CANCEL_TRANSACTION, null);
ResponseCode cancelResponse = ResponseCode.fromCode(resultEntity.getResponseCode());
switch (cancelResponse) {
case CANCEL_TRANSACTION:
logger.debug("{} CANCEL_TRANSACTION, The transaction is canceled on server properly.", this);
break;
default:
logger.warn("{} CANCEL_TRANSACTION, Expected the transaction is canceled on server, but received {}.", this, cancelResponse);
break;
}
break;
}
} else {
switch (response) {
case FINISH_TRANSACTION:
// The actual HTTP request will be sent in readTransactionResponse.
logger.debug("{} Finished sending flow files.", this);
break;
case BAD_CHECKSUM: {
TransactionResultEntity resultEntity = apiClient.commitTransferFlowFiles(transactionUrl, ResponseCode.BAD_CHECKSUM);
ResponseCode badChecksumCancelResponse = ResponseCode.fromCode(resultEntity.getResponseCode());
switch (badChecksumCancelResponse) {
case CANCEL_TRANSACTION:
logger.debug("{} BAD_CHECKSUM, The transaction is canceled on server properly.", this);
break;
default:
logger.warn("{} BAD_CHECKSUM, Expected the transaction is canceled on server, but received {}.", this, badChecksumCancelResponse);
break;
}
}
break;
case CONFIRM_TRANSACTION:
// The actual HTTP request will be sent in readTransactionResponse.
logger.debug("{} Transaction is confirmed.", this);
break;
case CANCEL_TRANSACTION: {
logger.debug("{} Canceling transaction.", this);
TransactionResultEntity resultEntity = apiClient.commitTransferFlowFiles(transactionUrl, ResponseCode.CANCEL_TRANSACTION);
ResponseCode cancelResponse = ResponseCode.fromCode(resultEntity.getResponseCode());
switch (cancelResponse) {
case CANCEL_TRANSACTION:
logger.debug("{} CANCEL_TRANSACTION, The transaction is canceled on server properly.", this);
break;
default:
logger.warn("{} CANCEL_TRANSACTION, Expected the transaction is canceled on server, but received {}.", this, cancelResponse);
break;
}
}
break;
}
}
}
@Override
protected void close() throws IOException {
if (apiClient != null) {
apiClient.close();
}
}
}