blob: bb2d17c7da58219d48c08efb3f332f96f799bef6 [file] [log] [blame]
/*
* Copyright 2001-2008 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.juddi.api.impl;
import java.io.StringWriter;
import java.math.BigInteger;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedDeque;
import javax.jws.WebParam;
import javax.jws.WebResult;
import javax.jws.WebService;
import javax.jws.soap.SOAPBinding;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.xml.bind.JAXB;
import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.ws.BindingProvider;
import org.apache.commons.configuration.ConfigurationException;
import static org.apache.juddi.api.impl.AuthenticatedService.logger;
import org.apache.juddi.api.util.QueryStatus;
import org.apache.juddi.api.util.ReplicationQuery;
import org.apache.juddi.config.AppConfig;
import org.apache.juddi.config.PersistenceManager;
import org.apache.juddi.config.Property;
import org.apache.juddi.mapping.MappingApiToModel;
import org.apache.juddi.mapping.MappingModelToApi;
import org.apache.juddi.model.BindingTemplate;
import org.apache.juddi.model.BusinessEntity;
import org.apache.juddi.model.BusinessService;
import org.apache.juddi.model.Operator;
import org.apache.juddi.model.Tmodel;
import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
import org.apache.juddi.v3.client.UDDIService;
import org.apache.juddi.v3.error.ErrorMessage;
import org.apache.juddi.v3.error.FatalErrorException;
import org.apache.juddi.validation.ValidateReplication;
import org.uddi.api_v3.OperationalInfo;
import org.uddi.custody_v3.DiscardTransferToken;
import org.uddi.repl_v3.ChangeRecord;
import org.uddi.repl_v3.ChangeRecordIDType;
import org.uddi.repl_v3.ChangeRecords;
import org.uddi.repl_v3.DoPing;
import org.uddi.repl_v3.GetChangeRecords;
import org.uddi.repl_v3.HighWaterMarkVectorType;
import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
import org.uddi.repl_v3.ReplicationConfiguration;
import org.uddi.repl_v3.TransferCustody;
import org.uddi.v3_service.DispositionReportFaultMessage;
import org.uddi.v3_service.UDDIReplicationPortType;
//@WebService(serviceName="UDDIReplicationService",
// endpointInterface="org.uddi.v3_service.UDDIReplicationPortType",
// targetNamespace = "urn:uddi-org:v3_service")
/**
* UDDI Replication defines four APIs. The first two presented here are used to
* perform replication and issue notifications. The latter ancillary APIs
* provide support for other aspects of UDDI Replication.
* <ul><li>get_changeRecords</li>
* <li>notify_changeRecordsAvailable</li>
* <li>do_ping</li>
* <li>get_highWaterMarks</li>
*
* @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>
*/
@WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:repl_v3_portType",
endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType")
@XmlSeeAlso({
org.uddi.custody_v3.ObjectFactory.class,
org.uddi.repl_v3.ObjectFactory.class,
org.uddi.subr_v3.ObjectFactory.class,
org.uddi.api_v3.ObjectFactory.class,
org.uddi.vscache_v3.ObjectFactory.class,
org.uddi.vs_v3.ObjectFactory.class,
org.uddi.sub_v3.ObjectFactory.class,
org.w3._2000._09.xmldsig_.ObjectFactory.class,
org.uddi.policy_v3.ObjectFactory.class,
org.uddi.policy_v3_instanceparms.ObjectFactory.class
})
public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType {
static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig) {
//if the config is different
Set<String> oldnodes = getNodes(oldConfig);
Set<String> newNodes = getNodes(newConfig);
Set<String> addedNodes = diffNodeList(oldnodes, newNodes);
if (queue == null) {
queue = new ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>();
}
for (String s : addedNodes) {
if (!s.equals(node)) {
logger.info("This node: " + node + ". New replication node queue for synchronization: " + s);
HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();
highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L));
queue.add(new NotifyChangeRecordsAvailable(s, highWaterMarkVectorType));
}
}
}
private static Set<String> getNodes(ReplicationConfiguration oldConfig) {
Set<String> ret = new HashSet<String>();
if (oldConfig == null) {
return ret;
}
for (org.uddi.repl_v3.Operator o : oldConfig.getOperator()) {
ret.add(o.getOperatorNodeID());
}
if (oldConfig.getCommunicationGraph() != null) {
ret.addAll(oldConfig.getCommunicationGraph().getNode());
}
return ret;
}
private static Set<String> diffNodeList(Set<String> oldnodes, Set<String> newNodes) {
Set<String> diff = new HashSet<String>();
Iterator<String> iterator = null;
/*oldnodes.iterator();
while (iterator.hasNext()){
String lhs=iterator.next();
if (!newNodes.contains(lhs))
diff.add(lhs);
}*/
iterator = newNodes.iterator();
while (iterator.hasNext()) {
String lhs = iterator.next();
if (!oldnodes.contains(lhs)) {
diff.add(lhs);
}
}
return diff;
}
private UDDIServiceCounter serviceCounter;
private static PullTimerTask timer = null;
private long startBuffer = 20000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
private long interval = 5000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
private static UDDIPublicationImpl pub = null;
public UDDIReplicationImpl() {
super();
if (pub == null) {
pub = new UDDIPublicationImpl();
}
serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
Init();
try {
startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
} catch (ConfigurationException ex) {
logger.fatal(ex);
}
}
private synchronized void Init() {
if (queue == null) {
queue = new ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>();
}
timer = new PullTimerTask();
}
private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) {
if (changesAlreadySeen != null) {
for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(r.getChangeID().getNodeID())
&& changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN().equals(r.getChangeID().getOriginatingUSN())) {
return true;
}
}
}
return false;
}
/**
* handles when a remote node tells me that there's an update(s)
* available
*/
private class PullTimerTask extends TimerTask {
private Timer timer = null;
public PullTimerTask() {
super();
timer = new Timer(true);
timer.scheduleAtFixedRate(this, startBuffer, interval);
}
@Override
public void run() {
logger.info("Replication change puller thread started. Queue size: " + queue.size());
//ok someone told me there's a change available
while (!queue.isEmpty()) {
NotifyChangeRecordsAvailable poll = queue.poll();
if (poll != null) {
UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
if (replicationClient == null) {
logger.fatal("unable to obtain a replication client to node " + poll.getNotifyingNode());
} else {
try {
//get the high water marks for this node
//ok now get all the changes
//done TODO replace with last known record from the given node
//for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
// logger.info("Node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()
// + " USN " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN());
//}
GetChangeRecords body = new GetChangeRecords();
body.setRequestingNode(node);
body.setResponseLimitCount(BigInteger.valueOf(100));
body.setChangesAlreadySeen(getLastChangeRecordFrom(poll.getNotifyingNode()));
logger.info("fetching updates from " + poll.getNotifyingNode() + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN());
List<ChangeRecord> records
= replicationClient.getChangeRecords(body).getChangeRecord();
//ok now we need to persist the change records
logger.info("Change records retrieved " + records.size());
for (int i = 0; i < records.size(); i++) {
logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN());
PersistChangeRecord(records.get(i));
}
} catch (Exception ex) {
logger.error("Error caught fetching replication changes from " + poll.getNotifyingNode(), ex);
}
}
} else {
logger.warn("weird, popped an object from the queue but it was null.");
}
}
}
@Override
public boolean cancel() {
timer.cancel();
return super.cancel();
}
/**
* someone told me there's a change available, we retrieved it
* and are processing the changes locally
*
* @param rec
*/
private void PersistChangeRecord(ChangeRecord rec) {
if (rec == null) {
return;
}
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = em.getTransaction();
/**
* In nodes that support pre-bundled replication
* responses, the recipient of the get_changeRecords
* message MAY return more change records than requested
* by the caller. In this scenario, the caller MUST also
* be prepared to deal with such redundant changes where
* a USN is less than the USN specified in the
* changesAlreadySeen highWaterMarkVector.
*/
// StringWriter sw = new StringWriter();
//JAXB.marshal(rec, sw);
logger.info("_______________________Remote change request " + rec.getChangeID().getNodeID() + ":" + rec.getChangeID().getOriginatingUSN());
try {
tx.begin();
//the change record rec must also be persisted!!
org.apache.juddi.model.ChangeRecord mapChangeRecord = MappingApiToModel.mapChangeRecord(rec);
mapChangeRecord.setId(null);
em.persist(mapChangeRecord);
tx.commit();
tx = em.getTransaction();
tx.begin();
//<editor-fold defaultstate="collapsed" desc="delete a record">
if (rec.getChangeRecordDelete() != null) {
if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
//delete a binding template
pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
}
if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
//delete a business
pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
}
if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
//delete a service
pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
}
if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
//delete a tmodel
/**
* The changeRecordDelete for a
* tModel does not correspond to
* any API described in this
* specification and should only
* appear in the replication
* stream as the result of an
* administrative function to
* permanently remove a tModel.
*/
pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
}
}
if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
//delete a pa template
pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion().getPublisherAssertion(), em);
}
//</editor-fold>
//<editor-fold defaultstate="collapsed" desc="New Data">
if (rec.getChangeRecordNewData() != null) {
if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
throw new Exception("Inbound replication data is missiong node id!");
}
//The operationalInfo element MUST contain the operational information associated with the indicated new data.
if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
} else {
if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
//fetch the binding template if it exists already
//if it exists,
// confirm the owning node, it shouldn't be the local node id, if it is, throw
// the owning node should be the same as it was before
BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
if (model == null) {
logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
} else {
ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey());
if (bt != null) {
em.remove(bt);
}
bt = new BindingTemplate();
MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), bt, model);
MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewData().getOperationalInfo());
// MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
em.persist(bt);
}
} else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {
BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
if (model != null) {
ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
//TODO revisit access control rules
em.remove(model);
}
model = new BusinessEntity();
MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
// MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
logger.warn("Name size on save is " + model.getBusinessNames().size());
em.persist(model);
}
if (rec.getChangeRecordNewData().getBusinessService() != null) {
BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
if (find == null) {
logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
} else {
org.apache.juddi.model.BusinessService model = null;
model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey());
if (model != null) {
ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
em.remove(model);
}
model = new org.apache.juddi.model.BusinessService();
MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
em.persist(model);
}
} else if (rec.getChangeRecordNewData().getTModel() != null) {
Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey());
if (model != null) {
ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
em.remove(model);
}
model = new Tmodel();
MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
em.persist(model);
}
}
}
//</editor-fold>
// changeRecordNull no action needed
// changeRecordHide tmodel only
//<editor-fold defaultstate="collapsed" desc="hide tmodel">
if (rec.getChangeRecordHide() != null) {
/*
A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification. A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.
The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.
*/
String key = rec.getChangeRecordHide().getTModelKey();
org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
if (existing == null) {
logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
} else {
existing.setDeleted(true);
existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
em.persist(existing);
}
}
//</editor-fold>
//<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">
if (rec.getChangeRecordPublisherAssertion() != null) {
//TODO implement
}
//</editor-fold>
tx.commit();
} catch (Exception drfm) {
logger.warn("Error persisting change record!", drfm);
} finally {
if (tx.isActive()) {
tx.rollback();
}
em.close();
}
}
private HighWaterMarkVectorType getLastChangeRecordFrom(String notifyingNode) {
HighWaterMarkVectorType ret = new HighWaterMarkVectorType();
ChangeRecordIDType cid = new ChangeRecordIDType();
cid.setNodeID(notifyingNode);
cid.setOriginatingUSN(0L);
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = em.getTransaction();
try {
tx.begin();
Long id = 0L;
try {
cid.setOriginatingUSN((Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", notifyingNode).setMaxResults(1).getSingleResult());
} catch (Exception ex) {
logger.info(ex);
}
tx.rollback();
} catch (Exception drfm) {
logger.warn("error caught fetching newest record from node " + notifyingNode, drfm);
} finally {
if (tx.isActive()) {
tx.rollback();
}
em.close();
}
ret.getHighWaterMark().add(cid);
return ret;
}
}
// private void ValidateDontChangeMyRecordsAtAnotherNode(String )
private void ValidateNodeIdMatches(String modelNodeId, OperationalInfo newDataOperationalInfo) throws Exception {
if (modelNodeId == null || newDataOperationalInfo == null) {
throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
}
//only time this is allowed is custody transfer
if (!modelNodeId.equals(newDataOperationalInfo.getNodeID())) {
throw new Exception("node id mismatch!");
}
//if i already have a record and "own it" and the remote node has a record with the same key, reject the update
//1.5.8
/**
* Each node has custody of a portion of the aggregate data
* managed by the registry of which it is a part. Each datum is
* by definition in the custody of exactly one such node. A
* datum in this context can be a businessEntity, a
* businessService, a bindingTemplate, a tModel, or a
* publisherAssertion. Changes to a datum in the registry MUST
* originate at the node which is the custodian of the datum.
* The registry defines the policy for data custody and, if
* allowed, the custodian node for a given datum can be changed;
* such custody transfer processes are discussed in Section 5.4
* Custody and Ownership Transfer API.
*/
//so someone else attempted to update one of my records, reject it
if (modelNodeId.equals(node)) {
throw new Exception("node id mismatch! this node already has a record for key " + newDataOperationalInfo.getEntityKey() + " and I'm the authority for it.");
}
}
private synchronized UDDIReplicationPortType getReplicationClient(String node) {
if (cache.containsKey(node)) {
return cache.get(node);
}
UDDIService svc = new UDDIService();
UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = em.getTransaction();
try {
StringBuilder sql = new StringBuilder();
sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc");
sql.toString();
Query qry = em.createQuery(sql.toString());
qry.setMaxResults(1);
org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
for (Operator o : resultList.getOperator()) {
if (o.getOperatorNodeID().equalsIgnoreCase(node)) {
((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());
cache.put(node, replicationClient);
return replicationClient;
}
}
tx.rollback();
} catch (Exception ex) {
logger.fatal("Node not found!" + node, ex);
} finally {
if (tx.isActive()) {
tx.rollback();
}
em.close();
}
//em.close();
return null;
}
private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
/**
* @since 3.3
* @param body
* @return
* @throws DispositionReportFaultMessage
*/
public String doPing(DoPing body) throws DispositionReportFaultMessage {
long startTime = System.currentTimeMillis();
long procTime = System.currentTimeMillis() - startTime;
serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
return node;
}
@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
@WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body")
// @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords")
@Override
public org.uddi.repl_v3.ChangeRecords getChangeRecords(
@WebParam(partName = "body", name = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3") org.uddi.repl_v3.GetChangeRecords body
) throws DispositionReportFaultMessage, RemoteException {/*
@WebResult(name = "changeRecord", targetNamespace = "urn:uddi-org:repl_v3")
@RequestWrapper(localName = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.GetChangeRecords")
@ResponseWrapper(localName = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.ChangeRecords")
@Override
public List<ChangeRecord> getChangeRecords(@WebParam(name = "requestingNode", targetNamespace = "urn:uddi-org:repl_v3") String requestingNode,
@WebParam(name = "changesAlreadySeen", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType changesAlreadySeen,
@WebParam(name = "responseLimitCount", targetNamespace = "urn:uddi-org:repl_v3") BigInteger responseLimitCount,
@WebParam(name = "responseLimitVector", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType responseLimitVector)
throws DispositionReportFaultMessage {*/
long startTime = System.currentTimeMillis();
String requestingNode = body.getRequestingNode();
HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen();
BigInteger responseLimitCount = body.getResponseLimitCount();
HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector();
new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
//TODO should we validate that "requestingNode" is in the replication config?
List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = em.getTransaction();
/**
* More specifically, the recipient determines the particular
* change records that are returned by comparing the originating
* USNs in the caller’s high water mark vector with the
* originating USNs of each of the changes the recipient has
* seen from others or generated by itself. The recipient SHOULD
* only return change records that have originating USNs that
* are greater than those listed in the changesAlreadySeen
* highWaterMarkVector and less than the limit required by
* either the responseLimitCount or the responseLimitVector.
*
*
* Part of the message is a high water mark vector that contains
* for each node of the registry the originating USN of the most
* recent change record that has been successfully processed by
* the invocating node
*/
int maxrecords = 100;
if (responseLimitCount != null) {
maxrecords = responseLimitCount.intValue();
}
try {
tx.begin();
Long firstrecord = 0L;
Long lastrecord = null;
if (changesAlreadySeen != null) {
//this is basically a lower limit (i.e. the newest record that was processed by the requestor
//therefore we want the oldest record stored locally to return to the requestor for processing
for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(node)) {
firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN();
}
}
}
if (responseLimitVector != null) {
//using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned.
//upper limit basically
for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) {
if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
}
}
}
logger.info("Query db for replication changes, lower index is " + (firstrecord - 1) + " last index " + lastrecord + " record limit " + maxrecords);
Query createQuery = null;
if (lastrecord != null) {
createQuery = em.createQuery("select e from ChangeRecord e where "
+ "((e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) OR "
+ "(e.originatingUSN > :inbound AND e.nodeID <> :node AND e.originatingUSN < :lastrecord)) "
+ "order by e.id ASC");
createQuery.setParameter("lastrecord", lastrecord);
} else {
createQuery = em.createQuery("select e from ChangeRecord e where "
+ "((e.id > :inbound AND e.nodeID = :node) OR "
+ "(e.originatingUSN > :inbound AND e.nodeID <> :node)) "
+ "order by e.id ASC");
}
createQuery.setMaxResults(maxrecords);
createQuery.setParameter("inbound", firstrecord - 1);
createQuery.setParameter("node", node);
List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
logger.info(records.size() + " CR records returned from query");
for (int i = 0; i < records.size(); i++) {
ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
//if (!Excluded(changesAlreadySeen, r)) {
ret.add(r);
//}
}
tx.rollback();
long procTime = System.currentTimeMillis() - startTime;
serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
QueryStatus.SUCCESS, procTime);
} catch (Exception ex) {
logger.fatal("Error, this node is: " + node, ex);
throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
} finally {
if (tx.isActive()) {
tx.rollback();
}
em.close();
}
logger.info("Change records returned for " + requestingNode + ": " + ret.size());
//JAXB.marshal(ret, System.out);
ChangeRecords x = new ChangeRecords();
x.getChangeRecord().addAll(ret);
return x;
}
/**
* This UDDI API message provides a means to obtain a list of
* highWaterMark element containing the highest known USN for all nodes
* in the replication graph. If there is no graph, we just return the
* local bits
*
* @return
* @throws DispositionReportFaultMessage
*/
@Override
public List<ChangeRecordIDType> getHighWaterMarks()
throws DispositionReportFaultMessage {
long startTime = System.currentTimeMillis();
List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
//fetch from database the highest known watermark
ReplicationConfiguration FetchEdges = FetchEdges();
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = em.getTransaction();
HashMap<String, Long> map = new HashMap<String, Long>();
try {
tx.begin();
if (FetchEdges != null) {
Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
while (it.hasNext()) {
String nextNode = it.next();
if (!nextNode.equals(node)) {
if (!map.containsKey(nextNode)) {
Long id = 0L;
try {
id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
} catch (Exception ex) {
logger.debug(ex);
}
if (id == null) {
id = 0L;
//per the spec
}
map.put(nextNode, id);
}
}
}
}
//dont forget this node
Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc")
.setParameter("node", node).setMaxResults(1).getSingleResult();
if (id == null) {
id = 0L;
}
ChangeRecordIDType x = new ChangeRecordIDType();
x.setNodeID(node);
x.setOriginatingUSN(id);
ret.add(x);
tx.rollback();
long procTime = System.currentTimeMillis() - startTime;
serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
} catch (Exception drfm) {
throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));
} finally {
if (tx.isActive()) {
tx.rollback();
}
em.close();
}
Iterator<Map.Entry<String, Long>> iterator = map.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> next = iterator.next();
ret.add(new ChangeRecordIDType(next.getKey(), next.getValue()));
}
return ret;
}
/**
* this means that another node has a change and we need to pick up the
* change and apply it to our local database.
*
* @param body
* @throws DispositionReportFaultMessage
*/
@Override
public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
throws DispositionReportFaultMessage {
long startTime = System.currentTimeMillis();
long procTime = System.currentTimeMillis() - startTime;
serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
QueryStatus.SUCCESS, procTime);
//some other node just told us there's new records available, call
//getChangeRecords from the remote node asynch
new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
queue.add(body);
logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size());
//ValidateReplication.unsupportedAPICall();
}
private static Queue<NotifyChangeRecordsAvailable> queue = null;
/**
* transfers custody of an entity from node1/user1 to node2/user2
*
* @param body
* @throws DispositionReportFaultMessage
*/
@Override
public void transferCustody(TransferCustody body)
throws DispositionReportFaultMessage {
long startTime = System.currentTimeMillis();
//*this node is transfering data to another node
//body.getTransferOperationalInfo().
ValidateReplication.unsupportedAPICall();
EntityManager em = PersistenceManager.getEntityManager();
//EntityTransaction tx = em.getTransaction();
/**
* The custodial node must verify that it has granted permission
* to transfer the entities identified and that this permission
* is still valid. This operation is comprised of two steps:
*
* 1. Verification that the transferToken was issued by it, that
* it has not expired, that it represents the authority to
* transfer no more and no less than those entities identified
* by the businessKey and tModelKey elements and that all these
* entities are still valid and not yet transferred. The
* transferToken is invalidated if any of these conditions are
* not met.
*
* 2. If the conditions above are met, the custodial node will
* prevent any further changes to the entities identified by the
* businessKey and tModelKey elements identified. The entity
* will remain in this state until the replication stream
* indicates it has been successfully processed via the
* replication stream. Upon successful verification of the
* custody transfer request by the custodial node, an empty
* message is returned by it indicating the success of the
* request and acknowledging the custody transfer. Following the
* issue of the empty message, the custodial node will submit
* into the replication stream a changeRecordNewData providing
* in the operationalInfo, the nodeID accepting custody of the
* datum and the authorizedName of the publisher accepting
* ownership. The acknowledgmentRequested attribute of this
* change record MUST be set to "true".
*
* TODO enqueue Replication message
*
* Finally, the custodial node invalidates the transferToken in
* order to prevent additional calls of the transfer_entities
* API.
*/
DiscardTransferToken dtt = new DiscardTransferToken();
dtt.setKeyBag(body.getKeyBag());
dtt.setTransferToken(body.getTransferToken());
new UDDICustodyTransferImpl().discardTransferToken(dtt);
}
}