blob: d5f45b6ca281da8a60a83fbbab767697f390a92e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.corda;
import java.util.List;
import net.corda.core.contracts.ContractState;
import net.corda.core.messaging.CordaRPCOps;
import net.corda.core.messaging.DataFeed;
import net.corda.core.messaging.FlowProgressHandle;
import net.corda.core.messaging.StateMachineInfo;
import net.corda.core.messaging.StateMachineTransactionMapping;
import net.corda.core.messaging.StateMachineUpdate;
import net.corda.core.node.NodeInfo;
import net.corda.core.node.services.NetworkMapCache;
import net.corda.core.node.services.Vault;
import net.corda.core.node.services.vault.PageSpecification;
import net.corda.core.node.services.vault.QueryCriteria;
import net.corda.core.node.services.vault.Sort;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import static org.apache.camel.component.corda.CordaConstants.NETWORK_MAP_FEED;
import static org.apache.camel.component.corda.CordaConstants.START_TRACKED_FLOW_DYNAMIC;
import static org.apache.camel.component.corda.CordaConstants.STATE_MACHINE_FEED;
import static org.apache.camel.component.corda.CordaConstants.STATE_MACHINE_RECORDED_TRANSACTION_MAPPING_FEED;
import static org.apache.camel.component.corda.CordaConstants.VAULT_TRACK;
import static org.apache.camel.component.corda.CordaConstants.VAULT_TRACK_BY;
import static org.apache.camel.component.corda.CordaConstants.VAULT_TRACK_BY_CRITERIA;
import static org.apache.camel.component.corda.CordaConstants.VAULT_TRACK_BY_WITH_PAGING_SPEC;
import static org.apache.camel.component.corda.CordaConstants.VAULT_TRACK_BY_WITH_SORTING;
/**
* The corda consumer.
*/
public class CordaConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(CordaConsumer.class);
private final CordaConfiguration configuration;
private CordaRPCOps cordaRPCOps;
private Subscription subscription;
public CordaConsumer(CordaEndpoint endpoint, Processor processor, CordaConfiguration configuration, CordaRPCOps cordaRPCOps) {
super(endpoint, processor);
this.configuration = configuration;
this.cordaRPCOps = cordaRPCOps;
}
@Override
public CordaEndpoint getEndpoint() {
return (CordaEndpoint) super.getEndpoint();
}
@Override
protected void doStart() throws Exception {
super.doStart();
Exchange exchange = this.getEndpoint().createExchange();
Class<ContractState> contractStateClass = configuration.getContractStateClass();
QueryCriteria criteria = configuration.getQueryCriteria();
PageSpecification pageSpec = configuration.getPageSpecification();
Sort sorting = configuration.getSort();
DataFeed<Vault.Page<ContractState>, Vault.Update<ContractState>> pageUpdateDataFeed = null;
switch (configuration.getOperation()) {
case VAULT_TRACK:
LOG.debug("subscribing for operation: " + VAULT_TRACK);
pageUpdateDataFeed = cordaRPCOps.vaultTrack(contractStateClass);
processSnapshot(exchange, pageUpdateDataFeed.getSnapshot());
subscription = pageUpdateDataFeed.getUpdates().subscribe(
x -> processContractStateUpdate(x),
t -> processError(t, CordaConstants.VAULT_TRACK),
() -> processDone(CordaConstants.VAULT_TRACK)
);
break;
case VAULT_TRACK_BY:
LOG.debug("subscribing for operation: " + VAULT_TRACK_BY);
pageUpdateDataFeed = cordaRPCOps.vaultTrackBy(criteria, pageSpec, sorting, contractStateClass);
processSnapshot(exchange, pageUpdateDataFeed.getSnapshot());
subscription = pageUpdateDataFeed.getUpdates().subscribe(
x -> processContractStateUpdate(x),
t -> processError(t, CordaConstants.VAULT_TRACK_BY),
() -> processDone(CordaConstants.VAULT_TRACK_BY)
);
break;
case VAULT_TRACK_BY_CRITERIA:
LOG.debug("subscribing for operation: " + VAULT_TRACK_BY_CRITERIA);
pageUpdateDataFeed = cordaRPCOps.vaultTrackByCriteria(contractStateClass, criteria);
processSnapshot(exchange, pageUpdateDataFeed.getSnapshot());
subscription = pageUpdateDataFeed.getUpdates().subscribe(
x -> processContractStateUpdate(x),
t -> processError(t, CordaConstants.VAULT_TRACK_BY_CRITERIA),
() -> processDone(CordaConstants.VAULT_TRACK_BY_CRITERIA)
);
break;
case VAULT_TRACK_BY_WITH_PAGING_SPEC:
LOG.debug("subscribing for operation: " + VAULT_TRACK_BY_WITH_PAGING_SPEC);
pageUpdateDataFeed = cordaRPCOps.vaultTrackByWithPagingSpec(contractStateClass, criteria, pageSpec);
processSnapshot(exchange, pageUpdateDataFeed.getSnapshot());
subscription = pageUpdateDataFeed.getUpdates().subscribe(
x -> processContractStateUpdate(x),
t -> processError(t, CordaConstants.VAULT_TRACK_BY_WITH_PAGING_SPEC),
() -> processDone(CordaConstants.VAULT_TRACK_BY_WITH_PAGING_SPEC)
);
break;
case VAULT_TRACK_BY_WITH_SORTING:
LOG.debug("subscribing for operation: " + VAULT_TRACK_BY_WITH_SORTING);
pageUpdateDataFeed = cordaRPCOps.vaultTrackByWithSorting(contractStateClass, criteria, sorting);
processSnapshot(exchange, pageUpdateDataFeed.getSnapshot());
subscription = pageUpdateDataFeed.getUpdates().subscribe(
x -> processContractStateUpdate(x),
t -> processError(t, CordaConstants.VAULT_TRACK_BY_WITH_SORTING),
() -> processDone(CordaConstants.VAULT_TRACK_BY_WITH_SORTING)
);
break;
case STATE_MACHINE_FEED:
LOG.debug("subscribing for operation: " + STATE_MACHINE_FEED);
DataFeed<List<StateMachineInfo>, StateMachineUpdate> stateFeed = cordaRPCOps.stateMachinesFeed();
processSnapshot(exchange, stateFeed.getSnapshot());
subscription = stateFeed.getUpdates().subscribe(
x -> processStateMachineUpdate(x),
t -> processError(t, CordaConstants.STATE_MACHINE_FEED),
() -> processDone(CordaConstants.STATE_MACHINE_FEED)
);
break;
case NETWORK_MAP_FEED:
LOG.debug("subscribing for operation: " + NETWORK_MAP_FEED);
DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> networkMapFeed = cordaRPCOps.networkMapFeed();
processSnapshot(exchange, networkMapFeed.getSnapshot());
subscription = networkMapFeed.getUpdates().subscribe(
x -> proceedNetworkMapFeed(x),
t -> processError(t, CordaConstants.NETWORK_MAP_FEED),
() -> processDone(CordaConstants.NETWORK_MAP_FEED)
);
break;
case STATE_MACHINE_RECORDED_TRANSACTION_MAPPING_FEED:
LOG.debug("subscribing for operation: " + STATE_MACHINE_RECORDED_TRANSACTION_MAPPING_FEED);
DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> transactionFeed = cordaRPCOps.stateMachineRecordedTransactionMappingFeed();
processSnapshot(exchange, transactionFeed.getSnapshot());
subscription = transactionFeed.getUpdates().subscribe(
x -> processTransactionMappingFeed(x),
t -> processError(t, CordaConstants.STATE_MACHINE_RECORDED_TRANSACTION_MAPPING_FEED),
() -> processDone(CordaConstants.STATE_MACHINE_RECORDED_TRANSACTION_MAPPING_FEED)
);
break;
case START_TRACKED_FLOW_DYNAMIC:
LOG.debug("subscribing for operation: " + START_TRACKED_FLOW_DYNAMIC);
FlowProgressHandle<Object> objectFlowProgressHandle = cordaRPCOps.startTrackedFlowDynamic(configuration.getFlowLociClass(), configuration.getArguments());
Object result = objectFlowProgressHandle.getReturnValue().get();
Observable<String> progress = objectFlowProgressHandle.getProgress();
processSnapshot(exchange, result);
subscription = progress.subscribe(
x -> processFlowProcess(x),
t -> processError(t, CordaConstants.START_TRACKED_FLOW_DYNAMIC),
() -> processDone(CordaConstants.START_TRACKED_FLOW_DYNAMIC)
);
break;
default:
throw new IllegalArgumentException("Unsupported operation " + configuration.getOperation());
}
LOG.info("Subscribed: {}", this.configuration);
}
private void processSnapshot(Exchange exchange, Object page) {
if (configuration.isProcessSnapshot()) {
try {
exchange.getIn().setBody(page);
getProcessor().process(exchange);
} catch (Exception e) {
LOG.error("Error processing snapshot", e);
}
}
}
private void processFlowProcess(String x) {
LOG.debug("processFlowProcess {}", x);
Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void processTransactionMappingFeed(StateMachineTransactionMapping x) {
LOG.debug("processTransactionMappingFeed {}", x);
Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void proceedNetworkMapFeed(NetworkMapCache.MapChange x) {
LOG.debug("proceedNetworkMapFeed {}", x);
Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void processStateMachineUpdate(StateMachineUpdate x) {
LOG.debug("processStateMachineUpdate {}", x);
Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void processContractStateUpdate(Vault.Update<ContractState> x) {
LOG.debug("processContractStateUpdate {}", x);
Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void processError(Throwable throwable, String operation) {
LOG.debug("processError for operation: " + operation + " " + throwable);
Exchange exchange = this.getEndpoint().createExchange();
exchange.setException(throwable);
processEvent(exchange);
}
public void processEvent(Exchange exchange) {
LOG.debug("processEvent {}", exchange);
try {
getProcessor().process(exchange);
} catch (Exception e) {
LOG.error("Error processing event ", e);
}
}
private void processDone(String operation) {
LOG.debug("processDone for operation: {}", operation);
}
@Override
protected void doStop() throws Exception {
if (subscription != null) {
subscription.unsubscribe();
}
super.doStop();
}
}