blob: dd89d910995acd07446a55ed6a6c198b2a1f2218 [file] [log] [blame]
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"><html xmlns="http://www.w3.org/1999/xhtml" lang="en"><head><meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/><link rel="stylesheet" href="../jacoco-resources/report.css" type="text/css"/><link rel="shortcut icon" href="../jacoco-resources/report.gif" type="image/gif"/><title>UDDIReplicationImpl.java</title><link rel="stylesheet" href="../jacoco-resources/prettify.css" type="text/css"/><script type="text/javascript" src="../jacoco-resources/prettify.js"></script></head><body onload="window['PR_TAB_WIDTH']=4;prettyPrint()"><div class="breadcrumb" id="breadcrumb"><span class="info"><a href="../jacoco-sessions.html" class="el_session">Sessions</a></span><a href="../index.html" class="el_report">jUDDI Core Services</a> &gt; <a href="index.source.html" class="el_package">org.apache.juddi.api.impl</a> &gt; <span class="el_source">UDDIReplicationImpl.java</span></div><h1>UDDIReplicationImpl.java</h1><pre class="source lang-java linenums">/*
* Copyright 2001-2008 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.juddi.api.impl;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jws.WebParam;
import javax.jws.WebResult;
import javax.jws.WebService;
import javax.jws.soap.SOAPBinding;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.xml.bind.JAXB;
import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.ws.BindingProvider;
import org.apache.juddi.api.util.QueryStatus;
import org.apache.juddi.api.util.ReplicationQuery;
import org.apache.juddi.config.AppConfig;
import org.apache.juddi.config.PersistenceManager;
import org.apache.juddi.config.Property;
import org.apache.juddi.mapping.MappingApiToModel;
import org.apache.juddi.mapping.MappingModelToApi;
import org.apache.juddi.model.BindingTemplate;
import org.apache.juddi.model.BusinessEntity;
import org.apache.juddi.model.BusinessService;
import org.apache.juddi.model.Operator;
import org.apache.juddi.model.PublisherAssertion;
import org.apache.juddi.model.PublisherAssertionId;
import org.apache.juddi.model.Tmodel;
import org.apache.juddi.model.UddiEntity;
import org.apache.juddi.replication.ReplicationNotifier;
import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
import org.apache.juddi.v3.client.UDDIService;
import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
import org.apache.juddi.v3.error.ErrorMessage;
import org.apache.juddi.v3.error.FatalErrorException;
import org.apache.juddi.v3.error.TransferNotAllowedException;
import org.apache.juddi.validation.ValidateReplication;
import org.uddi.custody_v3.TransferEntities;
import org.uddi.repl_v3.ChangeRecord;
import org.uddi.repl_v3.ChangeRecordAcknowledgement;
import org.uddi.repl_v3.ChangeRecordIDType;
import org.uddi.repl_v3.ChangeRecords;
import org.uddi.repl_v3.CommunicationGraph.Edge;
import org.uddi.repl_v3.DoPing;
import org.uddi.repl_v3.GetChangeRecords;
import org.uddi.repl_v3.HighWaterMarkVectorType;
import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
import org.uddi.repl_v3.ReplicationConfiguration;
import org.uddi.repl_v3.TransferCustody;
import org.uddi.v3_service.DispositionReportFaultMessage;
import org.uddi.v3_service.UDDIReplicationPortType;
/**
* UDDI Replication defines four APIs. The first two presented here are used to
* perform replication and issue notifications. The latter ancillary APIs
* provide support for other aspects of UDDI Replication.
* &lt;ul&gt;
* &lt;li&gt;get_changeRecords&lt;/li&gt;
* &lt;li&gt;notify_changeRecordsAvailable&lt;/li&gt;
* &lt;li&gt;do_ping&lt;/li&gt;
* &lt;li&gt;get_highWaterMarks&lt;/li&gt;&lt;/ul&gt;
*
* @author &lt;a href=&quot;mailto:alexoree@apache.org&quot;&gt;Alex O'Ree&lt;/a&gt;
*/
@WebService(serviceName = &quot;UDDI_Replication_PortType&quot;, targetNamespace = &quot;urn:uddi-org:api_v3_portType&quot;,
endpointInterface = &quot;org.uddi.v3_service.UDDIReplicationPortType&quot;)
@XmlSeeAlso({
org.uddi.custody_v3.ObjectFactory.class,
org.uddi.repl_v3.ObjectFactory.class,
org.uddi.subr_v3.ObjectFactory.class,
org.uddi.api_v3.ObjectFactory.class,
org.uddi.vscache_v3.ObjectFactory.class,
org.uddi.vs_v3.ObjectFactory.class,
org.uddi.sub_v3.ObjectFactory.class,
org.w3._2000._09.xmldsig_.ObjectFactory.class,
org.uddi.policy_v3.ObjectFactory.class,
org.uddi.policy_v3_instanceparms.ObjectFactory.class
})
public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType {
static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig, AuthenticatedService service) {
//if the config is different
<span class="fc" id="L113"> Set&lt;String&gt; oldnodes = getNodes(oldConfig);</span>
<span class="fc" id="L114"> Set&lt;String&gt; newNodes = getNodes(newConfig);</span>
<span class="fc" id="L116"> Set&lt;String&gt; addedNodes = diffNodeList(oldnodes, newNodes);</span>
<span class="pc bpc" id="L117" title="1 of 2 branches missed."> if (queue == null) {</span>
<span class="nc" id="L118"> queue = new ConcurrentLinkedQueue&lt;NotifyChangeRecordsAvailable&gt;();</span>
}
<span class="fc bfc" id="L120" title="All 2 branches covered."> for (String s : addedNodes) {</span>
<span class="pc bpc" id="L121" title="1 of 2 branches missed."> if (!s.equals(service.getNode())) {</span>
<span class="fc" id="L122"> logger.info(&quot;This node: &quot; + service.getNode() + &quot;. New replication node queue for synchronization: &quot; + s);</span>
<span class="fc" id="L123"> HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();</span>
<span class="fc" id="L124"> highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L));</span>
<span class="fc" id="L125"> queue.add(new NotifyChangeRecordsAvailable(s, highWaterMarkVectorType));</span>
}
<span class="fc" id="L127"> }</span>
<span class="fc" id="L129"> }</span>
private static Set&lt;String&gt; getNodes(ReplicationConfiguration oldConfig) {
<span class="fc" id="L132"> Set&lt;String&gt; ret = new HashSet&lt;String&gt;();</span>
<span class="fc bfc" id="L133" title="All 2 branches covered."> if (oldConfig == null) {</span>
<span class="fc" id="L134"> return ret;</span>
}
<span class="fc bfc" id="L136" title="All 2 branches covered."> for (org.uddi.repl_v3.Operator o : oldConfig.getOperator()) {</span>
<span class="fc" id="L137"> ret.add(o.getOperatorNodeID());</span>
<span class="fc" id="L138"> }</span>
<span class="pc bpc" id="L139" title="1 of 2 branches missed."> if (oldConfig.getCommunicationGraph() != null) {</span>
<span class="fc" id="L140"> ret.addAll(oldConfig.getCommunicationGraph().getNode());</span>
}
<span class="fc" id="L142"> return ret;</span>
}
/**
* returns items in &quot;newNodes&quot; that are not in &quot;oldNodes&quot;
*
* @param oldnodes
* @param newNodes
* @return
*/
private static Set&lt;String&gt; diffNodeList(Set&lt;String&gt; oldnodes, Set&lt;String&gt; newNodes) {
<span class="fc" id="L153"> Set&lt;String&gt; diff = new HashSet&lt;String&gt;();</span>
<span class="fc" id="L154"> Iterator&lt;String&gt; iterator = newNodes.iterator();</span>
<span class="fc bfc" id="L155" title="All 2 branches covered."> while (iterator.hasNext()) {</span>
<span class="fc" id="L156"> String lhs = iterator.next();</span>
<span class="fc" id="L157"> Iterator&lt;String&gt; iterator1 = oldnodes.iterator();</span>
<span class="fc" id="L158"> boolean found = false;</span>
<span class="fc bfc" id="L159" title="All 2 branches covered."> while (iterator1.hasNext()) {</span>
<span class="fc" id="L160"> String rhs = iterator1.next();</span>
<span class="pc bpc" id="L161" title="1 of 2 branches missed."> if (rhs.equalsIgnoreCase(lhs)) {</span>
<span class="nc" id="L162"> found = true;</span>
<span class="nc" id="L163"> break;</span>
}
<span class="fc" id="L165"> }</span>
<span class="pc bpc" id="L166" title="1 of 2 branches missed."> if (!found) {</span>
<span class="fc" id="L167"> diff.add(lhs);</span>
}
<span class="fc" id="L170"> }</span>
<span class="fc" id="L171"> return diff;</span>
}
private UDDIServiceCounter serviceCounter;
<span class="fc" id="L176"> private static PullTimerTask timer = null;</span>
private long startBuffer;
private long interval;
<span class="fc" id="L180"> private static UDDIPublicationImpl pub = null;</span>
public UDDIReplicationImpl() {
<span class="fc" id="L183"> super();</span>
try {
<span class="fc" id="L185"> this.interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000L);</span>
<span class="fc" id="L186"> this.startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000L);</span>
<span class="nc" id="L187"> } catch (Exception ex) {</span>
<span class="nc" id="L188"> logger.warn(&quot;Config error!&quot;, ex);</span>
<span class="fc" id="L189"> }</span>
<span class="fc" id="L191"> serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);</span>
<span class="fc" id="L192"> init();</span>
<span class="fc" id="L194"> }</span>
private synchronized void init() {
<span class="fc bfc" id="L197" title="All 2 branches covered."> if (pub == null) {</span>
<span class="fc" id="L198"> pub = new UDDIPublicationImpl();</span>
}
<span class="fc bfc" id="L200" title="All 2 branches covered."> if (queue == null) {</span>
<span class="fc" id="L201"> queue = new ConcurrentLinkedQueue&lt;NotifyChangeRecordsAvailable&gt;();</span>
}
<span class="fc" id="L203"> timer = new PullTimerTask();</span>
<span class="fc" id="L205"> }</span>
/**
* handles when a remote node tells me that there's an update(s)
* available
*/
private class PullTimerTask extends TimerTask {
<span class="fc" id="L214"> private Timer timer = null;</span>
<span class="fc" id="L216"> public PullTimerTask() {</span>
<span class="fc" id="L217"> super();</span>
<span class="fc" id="L218"> timer = new Timer(true);</span>
<span class="fc" id="L219"> timer.scheduleAtFixedRate(this, startBuffer, interval);</span>
<span class="fc" id="L220"> }</span>
<span class="fc" id="L221"> boolean firstrun = true;</span>
@Override
public void run() {
<span class="nc bnc" id="L225" title="All 2 branches missed."> if (firstrun) {</span>
<span class="nc" id="L226"> enqueueAllReceivingNodes();</span>
<span class="nc" id="L227"> firstrun = false;</span>
}
<span class="nc bnc" id="L230" title="All 2 branches missed."> if (!queue.isEmpty()) {</span>
<span class="nc" id="L231"> logger.info(&quot;Replication change puller thread started. Queue size: &quot; + queue.size());</span>
}
//ok someone told me there's a change available
<span class="nc bnc" id="L234" title="All 2 branches missed."> while (!queue.isEmpty()) {</span>
<span class="nc" id="L235"> NotifyChangeRecordsAvailable poll = queue.poll();</span>
<span class="nc bnc" id="L236" title="All 4 branches missed."> if (poll != null &amp;&amp; !poll.getNotifyingNode().equalsIgnoreCase(getNode())) {</span>
<span class="nc" id="L237"> UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());</span>
<span class="nc bnc" id="L238" title="All 2 branches missed."> if (replicationClient == null) {</span>
<span class="nc" id="L239"> logger.fatal(&quot;unable to obtain a replication client to node &quot; + poll);</span>
} else {
try {
//get the high water marks for this node
//ok now get all the changes
//done replace with last known record from the given node
//for (int xx = 0; xx &lt; poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
// logger.info(&quot;Node &quot; + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()
// + &quot; USN &quot; + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN());
//}
<span class="nc" id="L250"> Set&lt;String&gt; nodesHitThisCycle = new HashSet&lt;String&gt;();</span>
<span class="nc bnc" id="L251" title="All 2 branches missed."> for (int xx = 0; xx &lt; poll.getChangesAvailable().getHighWaterMark().size(); xx++) {</span>
<span class="nc" id="L252"> int recordsreturned = 21;</span>
<span class="nc bnc" id="L253" title="All 2 branches missed."> while (recordsreturned &gt;= 20) {</span>
<span class="nc bnc" id="L254" title="All 2 branches missed."> if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())) {</span>
<span class="nc" id="L255"> logger.info(&quot;i've already hit the node &quot; + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + &quot; this cycle, skipping&quot;);</span>
<span class="nc" id="L256"> break;</span>
}
<span class="nc bnc" id="L258" title="All 2 branches missed."> if (poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID().equalsIgnoreCase(getNode())) {</span>
<span class="nc" id="L259"> logger.info(&quot;ignoring updates that were generated here &quot; + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN() + &quot; sent by &quot; + poll.getNotifyingNode() + &quot; this node is &quot; + getNode());</span>
<span class="nc" id="L260"> break;</span>
}
<span class="nc" id="L262"> nodesHitThisCycle.add(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID());</span>
<span class="nc" id="L263"> GetChangeRecords body = new GetChangeRecords();</span>
<span class="nc" id="L264"> body.setRequestingNode(getNode());</span>
<span class="nc" id="L265"> body.setResponseLimitCount(BigInteger.valueOf(100L));</span>
<span class="nc" id="L267"> body.setChangesAlreadySeen(getLastChangeRecordFrom(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()));</span>
<span class="nc" id="L268"> logger.info(&quot;fetching updates from &quot; + poll.getNotifyingNode() + &quot; since &quot; + body.getChangesAlreadySeen().getHighWaterMark().get(0).getNodeID() + &quot;:&quot; + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + &quot;, items still in the queue: &quot; + queue.size());</span>
//JAXB.marshal(body, System.out);
<span class="nc" id="L270"> List&lt;ChangeRecord&gt; records</span>
<span class="nc" id="L271"> = replicationClient.getChangeRecords(body).getChangeRecord();</span>
//ok now we need to persist the change records
<span class="nc" id="L273"> logger.info(&quot;Change records retrieved from &quot; + poll.getNotifyingNode() + &quot;, &quot; + records.size());</span>
<span class="nc bnc" id="L274" title="All 2 branches missed."> for (int i = 0; i &lt; records.size(); i++) {</span>
<span class="nc" id="L275"> logger.info(&quot;Change records retrieved &quot; + records.get(i).getChangeID().getNodeID() + &quot; USN &quot; + records.get(i).getChangeID().getOriginatingUSN());</span>
<span class="nc" id="L276"> persistChangeRecord(records.get(i));</span>
}
<span class="nc" id="L278"> recordsreturned = records.size();</span>
<span class="nc" id="L279"> }</span>
}
<span class="nc" id="L281"> } catch (Exception ex) {</span>
<span class="nc" id="L282"> logger.error(&quot;Error caught fetching replication changes from &quot; + poll + &quot; @&quot; + ((BindingProvider) replicationClient).getRequestContext().get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY), ex);</span>
<span class="nc" id="L283"> }</span>
}
<span class="nc" id="L285"> } else {</span>
<span class="nc bnc" id="L286" title="All 2 branches missed."> if (poll == null) {</span>
<span class="nc" id="L287"> logger.warn(&quot;strange, popped a null object&quot;);</span>
<span class="nc bnc" id="L288" title="All 2 branches missed."> } else if (poll.getNotifyingNode().equalsIgnoreCase(getNode())) {</span>
<span class="nc" id="L289"> logger.warn(&quot;strange, popped an object from the queue but it was from myself. This probably indicates a configuration error! ignoring...first record: &quot; + poll.getChangesAvailable().getHighWaterMark().get(0).getNodeID()+&quot;:&quot; + poll.getChangesAvailable().getHighWaterMark().get(0).getOriginatingUSN());</span>
}
}
<span class="nc" id="L292"> }</span>
<span class="nc" id="L293"> }</span>
@Override
public boolean cancel() {
<span class="nc" id="L297"> timer.cancel();</span>
<span class="nc" id="L298"> return super.cancel();</span>
}
/**
* someone told me there's a change available, we retrieved it
* and are processing the changes locally.
*
* @param rec
*/
private void persistChangeRecord(ChangeRecord rec) {
<span class="nc bnc" id="L308" title="All 2 branches missed."> if (rec == null) {</span>
<span class="nc" id="L309"> return;</span>
}
<span class="nc" id="L311"> logger.debug(&quot;_______________________Remote change request &quot; + rec.getChangeID().getNodeID() + &quot;:&quot; + rec.getChangeID().getOriginatingUSN());</span>
<span class="nc bnc" id="L313" title="All 2 branches missed."> if (rec.getChangeID().getNodeID().equalsIgnoreCase(getNode())) {</span>
<span class="nc" id="L314"> logger.info(&quot;Just received a change record that i created, ignoring....&quot;);</span>
<span class="nc" id="L315"> return;</span>
}
<span class="nc" id="L317"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="nc" id="L318"> EntityTransaction tx = em.getTransaction();</span>
<span class="nc" id="L319"> org.apache.juddi.model.ChangeRecord mapChangeRecord = null;</span>
/**
* In nodes that support pre-bundled replication
* responses, the recipient of the get_changeRecords
* message MAY return more change records than requested
* by the caller. In this scenario, the caller MUST also
* be prepared to deal with such redundant changes where
* a USN is less than the USN specified in the
* changesAlreadySeen highWaterMarkVector.
*/
try {
<span class="nc" id="L331"> tx.begin();</span>
//check to see if we have this update already
<span class="nc" id="L333"> Query createQuery = em.createQuery(&quot;select c from ChangeRecord c where c.nodeID=:node and c.originatingUSN=:oid&quot;);</span>
<span class="nc" id="L334"> createQuery.setParameter(&quot;node&quot;, rec.getChangeID().getNodeID());</span>
<span class="nc" id="L335"> createQuery.setParameter(&quot;oid&quot;, rec.getChangeID().getOriginatingUSN());</span>
<span class="nc" id="L336"> Object existingrecord = null;</span>
try {
<span class="nc" id="L338"> existingrecord = createQuery.getSingleResult();</span>
<span class="nc" id="L339"> } catch (Exception ex) {</span>
<span class="nc" id="L340"> logger.debug(&quot;error checking to see if change record exists already (expected failure)&quot;, ex);</span>
<span class="nc" id="L341"> }</span>
<span class="nc bnc" id="L342" title="All 2 branches missed."> if (existingrecord != null) {</span>
<span class="nc" id="L343"> logger.info(&quot;I've already processed change record &quot; + rec.getChangeID().getNodeID() + &quot; &quot; + rec.getChangeID().getOriginatingUSN());</span>
<span class="nc" id="L344"> return;</span>
}
//if it didn't come from here and i haven't seen it yet
<span class="nc" id="L347"> ReplicationNotifier.EnqueueRetransmit(rec);</span>
//the remotechange record rec must also be persisted!!
<span class="nc" id="L349"> mapChangeRecord = MappingApiToModel.mapChangeRecord(rec);</span>
<span class="nc" id="L350"> mapChangeRecord.setId(null);</span>
<span class="nc" id="L351"> mapChangeRecord.setIsAppliedLocally(true);</span>
<span class="nc" id="L352"> em.persist(mapChangeRecord);</span>
<span class="nc" id="L353"> tx.commit();</span>
<span class="nc" id="L354"> logger.info(&quot;Remote CR saved, it was from &quot; + mapChangeRecord.getNodeID() //this is the origin of the change</span>
<span class="nc" id="L355"> + &quot; USN:&quot; + mapChangeRecord.getOriginatingUSN()</span>
<span class="nc" id="L356"> + &quot; Type:&quot; + mapChangeRecord.getRecordType().name()</span>
<span class="nc" id="L357"> + &quot; Key:&quot; + mapChangeRecord.getEntityKey()</span>
<span class="nc" id="L358"> + &quot; Local id from sender:&quot; + mapChangeRecord.getId());</span>
<span class="nc" id="L359"> tx = em.getTransaction();</span>
<span class="nc" id="L360"> tx.begin();</span>
//&lt;editor-fold defaultstate=&quot;collapsed&quot; desc=&quot;delete a record&quot;&gt;
<span class="nc bnc" id="L363" title="All 2 branches missed."> if (rec.getChangeRecordDelete() != null) {</span>
<span class="nc bnc" id="L364" title="All 6 branches missed."> if (rec.getChangeRecordDelete() != null &amp;&amp; rec.getChangeRecordDelete().getBindingKey() != null &amp;&amp; !&quot;&quot;.equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {</span>
//delete a binding template
<span class="nc" id="L366"> UddiEntity ue = em.find(BindingTemplate.class, rec.getChangeRecordDelete().getBindingKey());</span>
<span class="nc" id="L367"> validateNodeIdMisMatches(ue, getNode());</span>
<span class="nc" id="L368"> pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);</span>
}
<span class="nc bnc" id="L370" title="All 6 branches missed."> if (rec.getChangeRecordDelete() != null &amp;&amp; rec.getChangeRecordDelete().getBusinessKey() != null &amp;&amp; !&quot;&quot;.equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {</span>
//delete a business
<span class="nc" id="L372"> UddiEntity ue = em.find(BusinessEntity.class, rec.getChangeRecordDelete().getBusinessKey());</span>
<span class="nc" id="L373"> validateNodeIdMisMatches(ue, getNode());</span>
<span class="nc" id="L374"> pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);</span>
}
<span class="nc bnc" id="L376" title="All 6 branches missed."> if (rec.getChangeRecordDelete() != null &amp;&amp; rec.getChangeRecordDelete().getServiceKey() != null &amp;&amp; !&quot;&quot;.equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {</span>
<span class="nc" id="L377"> UddiEntity ue = em.find(BusinessService.class, rec.getChangeRecordDelete().getServiceKey());</span>
<span class="nc" id="L378"> validateNodeIdMisMatches(ue, getNode());</span>
//delete a service
<span class="nc" id="L380"> pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);</span>
}
<span class="nc bnc" id="L382" title="All 6 branches missed."> if (rec.getChangeRecordDelete() != null &amp;&amp; rec.getChangeRecordDelete().getTModelKey() != null &amp;&amp; !&quot;&quot;.equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {</span>
//delete a tmodel
/**
* The changeRecordDelete for a
* tModel does not correspond to
* any API described in this
* specification and should only
* appear in the replication
* stream as the result of an
* administrative function to
* permanently remove a tModel.
*/
<span class="nc" id="L394"> UddiEntity tm = em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey());</span>
<span class="nc bnc" id="L395" title="All 2 branches missed."> if (tm != null) {</span>
<span class="nc" id="L396"> validateNodeIdMisMatches(tm, getNode());</span>
<span class="nc" id="L397"> em.remove(tm);</span>
} else {
<span class="nc" id="L399"> logger.error(&quot;failed to adminstratively delete tmodel because it doesn't exist. &quot; + rec.getChangeRecordDelete().getTModelKey());</span>
}
//pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
}
}
<span class="nc bnc" id="L404" title="All 4 branches missed."> if (rec.getChangeRecordDeleteAssertion() != null &amp;&amp; rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {</span>
//delete a pa template
<span class="nc" id="L406"> pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion(), em);</span>
}
//&lt;/editor-fold&gt;
//&lt;editor-fold defaultstate=&quot;collapsed&quot; desc=&quot;New Data&quot;&gt;
<span class="nc bnc" id="L411" title="All 2 branches missed."> if (rec.getChangeRecordNewData() != null) {</span>
//The operationalInfo element MUST contain the operational information associated with the indicated new data.
<span class="nc bnc" id="L414" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo() == null) {</span>
<span class="nc" id="L415"> logger.warn(&quot;Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored&quot;);</span>
} else {
<span class="nc bnc" id="L417" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {</span>
<span class="nc" id="L418"> throw new Exception(&quot;Inbound replication data is missiong node id! Change will not be applied&quot;);</span>
}
<span class="nc bnc" id="L420" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equalsIgnoreCase(getNode())) {</span>
<span class="nc" id="L421"> logger.warn(&quot;Inbound replication data is modifying locally owned data. This is not allowed, except for custody transfer&quot;);</span>
}
<span class="nc bnc" id="L423" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getBindingTemplate() != null) {</span>
//fetch the binding template if it exists already
//if it exists,
// confirm the owning node, it shouldn't be the local node id, if it is, throw
// the owning node should be the same as it was before
<span class="nc" id="L429"> BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());</span>
<span class="nc bnc" id="L430" title="All 2 branches missed."> if (model == null) {</span>
<span class="nc" id="L431"> logger.error(&quot;Replication error, attempting to insert a binding where the service doesn't exist yet&quot;);</span>
} else {
<span class="nc" id="L433"> validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span>
<span class="nc" id="L435"> org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey());</span>
<span class="nc bnc" id="L436" title="All 2 branches missed."> if (bt != null) {</span>
//ValidateNodeIdMatches(node, bt.getNodeId());
<span class="nc" id="L438"> em.remove(bt);</span>
}
<span class="nc" id="L440"> bt = new BindingTemplate();</span>
<span class="nc" id="L441"> MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), bt, model);</span>
<span class="nc" id="L442"> MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewData().getOperationalInfo());</span>
// MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
<span class="nc" id="L444"> em.persist(bt);</span>
}
<span class="nc bnc" id="L447" title="All 2 branches missed."> } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {</span>
<span class="nc" id="L449"> BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());</span>
<span class="nc bnc" id="L450" title="All 2 branches missed."> if (model != null) {</span>
//if the owner of the new data is me, and the update didn't originate from me
<span class="nc bnc" id="L452" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span>
<span class="nc bnc" id="L453" title="All 2 branches missed."> &amp;&amp; !model.getNodeId().equals(getNode())) {</span>
<span class="nc bnc" id="L454" title="All 2 branches missed."> if (model.getIsTransferInProgress()) {</span>
//allow the transfer
<span class="nc" id="L456"> MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);</span>
<span class="nc" id="L457"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L458"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L459"> model.setIsTransferInProgress(false);</span>
<span class="nc" id="L460"> em.merge(model);</span>
} else {
//block it, unexpected transfer
<span class="nc" id="L463"> throw new Exception(&quot;Unexpected entity transfer to to node &quot; + getNode() + &quot; from &quot; + rec.getChangeID().getNodeID());</span>
}
<span class="nc bnc" id="L466" title="All 2 branches missed."> } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span>
<span class="nc bnc" id="L467" title="All 2 branches missed."> &amp;&amp; model.getNodeId().equals(getNode())) {</span>
//if destination is here and it's staying here, then this is strange also
//someone else updated one of my records
<span class="nc" id="L470"> throw new Exception(&quot;unexpected modification of records that this server owns, &quot; + model.getEntityKey());</span>
<span class="nc bnc" id="L471" title="All 2 branches missed."> } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span>
<span class="nc bnc" id="L472" title="All 2 branches missed."> &amp;&amp; model.getNodeId().equals(getNode())) {</span>
//this is also strange, destination is elsewhere however it's owned by me.
<span class="nc" id="L474"> throw new Exception(&quot;unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, &quot; + model.getEntityKey());</span>
<span class="nc bnc" id="L476" title="All 2 branches missed."> } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span>
<span class="nc bnc" id="L477" title="All 2 branches missed."> &amp;&amp; !model.getNodeId().equals(getNode())) {</span>
//changes on a remote node, for an existing item
<span class="nc" id="L479"> MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);</span>
<span class="nc" id="L480"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L481"> em.merge(model);</span>
}
} else {
<span class="nc" id="L486"> model = new BusinessEntity();</span>
<span class="nc" id="L487"> MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);</span>
<span class="nc" id="L488"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L489"> em.persist(model);</span>
}
}
<span class="nc bnc" id="L492" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getBusinessService() != null) {</span>
<span class="nc" id="L493"> BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());</span>
<span class="nc bnc" id="L494" title="All 2 branches missed."> if (find == null) {</span>
<span class="nc" id="L495"> logger.error(&quot;Replication error, attempting to insert a service where the business doesn't exist yet&quot;);</span>
} else {
<span class="nc" id="L498"> org.apache.juddi.model.BusinessService model = null;</span>
<span class="nc" id="L499"> model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey());</span>
<span class="nc bnc" id="L500" title="All 2 branches missed."> if (model != null) {</span>
<span class="nc" id="L501"> validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span>
<span class="nc" id="L502"> em.remove(model);</span>
}
<span class="nc" id="L505"> model = new org.apache.juddi.model.BusinessService();</span>
<span class="nc" id="L506"> MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);</span>
<span class="nc" id="L507"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L508"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L510"> em.persist(model);</span>
}
<span class="nc bnc" id="L513" title="All 2 branches missed."> } else if (rec.getChangeRecordNewData().getTModel() != null) {</span>
<span class="nc" id="L515"> Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey());</span>
<span class="nc bnc" id="L516" title="All 2 branches missed."> if (model != null) {</span>
//in the case of a transfer
//if the new entity is being transfer to ME, accept and i didn't previously own it, but only if the local record is flagged as transferable
//meaning, only accept if i'm expecting a transfer
<span class="nc bnc" id="L520" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span>
<span class="nc bnc" id="L521" title="All 2 branches missed."> &amp;&amp; !model.getNodeId().equals(getNode())) {</span>
<span class="nc bnc" id="L522" title="All 2 branches missed."> if (model.getIsTransferInProgress()) {</span>
//allow the transfer
<span class="nc" id="L524"> em.remove(model);</span>
<span class="nc" id="L525"> model = new Tmodel();</span>
<span class="nc" id="L526"> MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);</span>
<span class="nc" id="L527"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L528"> model.setIsTransferInProgress(false);</span>
<span class="nc" id="L529"> em.persist(model);</span>
} else {
//block it, unexpected transfer
<span class="nc" id="L532"> throw new Exception(&quot;Unexpected entity transfer to this node from &quot; + rec.getChangeID().getNodeID());</span>
}
<span class="nc bnc" id="L535" title="All 2 branches missed."> } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span>
<span class="nc bnc" id="L536" title="All 2 branches missed."> &amp;&amp; model.getNodeId().equals(getNode())) {</span>
//if destination is here and it's staying here, then this is strange also
//someone else updated one of my records
<span class="nc" id="L539"> throw new Exception(&quot;unexpected modification of records that this server owns, &quot; + model.getEntityKey());</span>
<span class="nc bnc" id="L540" title="All 2 branches missed."> } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span>
<span class="nc bnc" id="L541" title="All 2 branches missed."> &amp;&amp; model.getNodeId().equals(getNode())) {</span>
//this is also strange, destination is elsewhere however it's owned by me.
<span class="nc" id="L543"> throw new Exception(&quot;unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, &quot; + model.getEntityKey());</span>
<span class="nc bnc" id="L545" title="All 2 branches missed."> } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span>
<span class="nc bnc" id="L546" title="All 2 branches missed."> &amp;&amp; !model.getNodeId().equals(getNode())) {</span>
//changes on a remote node, for an existing item
<span class="nc" id="L548"> em.remove(model);</span>
<span class="nc" id="L549"> model = new Tmodel();</span>
<span class="nc" id="L550"> MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);</span>
<span class="nc" id="L552"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L554"> em.persist(model);</span>
}
} else {
<span class="nc" id="L558"> model = new Tmodel();</span>
<span class="nc" id="L559"> MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);</span>
<span class="nc" id="L561"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L563"> em.persist(model);</span>
}
}
}
}
//&lt;/editor-fold&gt;
// changeRecordNull no action needed
// changeRecordHide tmodel only
//&lt;editor-fold defaultstate=&quot;collapsed&quot; desc=&quot;hide tmodel&quot;&gt;
<span class="nc bnc" id="L575" title="All 2 branches missed."> if (rec.getChangeRecordHide() != null) {</span>
/*
A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification. A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.
The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.
*/
<span class="nc" id="L581"> String key = rec.getChangeRecordHide().getTModelKey();</span>
<span class="nc" id="L582"> org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);</span>
<span class="nc bnc" id="L583" title="All 2 branches missed."> if (existing == null) {</span>
<span class="nc" id="L584"> logger.error(&quot;Unexpected delete/hide tmodel message received for non existing key &quot; + key);</span>
} else {
//no one else can delete/hide my tmodel
<span class="nc" id="L587"> validateNodeIdMisMatches(existing, getNode());</span>
<span class="nc" id="L588"> existing.setDeleted(true);</span>
<span class="nc" id="L589"> existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());</span>
<span class="nc" id="L590"> existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());</span>
<span class="nc" id="L591"> em.persist(existing);</span>
}
}
//&lt;/editor-fold&gt;
//&lt;editor-fold defaultstate=&quot;collapsed&quot; desc=&quot;changeRecordPublisherAssertion&quot;&gt;
<span class="nc bnc" id="L597" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion() != null) {</span>
<span class="nc" id="L599"> logger.info(&quot;Repl CR Publisher Assertion&quot;);</span>
//TODO are publisher assertions owned by a given node?
<span class="nc" id="L601"> PublisherAssertionId paid = new PublisherAssertionId(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey(), rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey());</span>
<span class="nc" id="L602"> org.apache.juddi.model.PublisherAssertion model = em.find(org.apache.juddi.model.PublisherAssertion.class, paid);</span>
<span class="nc bnc" id="L603" title="All 2 branches missed."> if (model != null) {</span>
<span class="nc" id="L604"> logger.info(&quot;Repl CR Publisher Assertion - Existing&quot;);</span>
<span class="nc bnc" id="L606" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {</span>
<span class="nc" id="L607"> model.setFromCheck(&quot;true&quot;);</span>
} else {
<span class="nc" id="L609"> model.setFromCheck(&quot;false&quot;);</span>
}
<span class="nc bnc" id="L612" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {</span>
<span class="nc" id="L613"> model.setToCheck(&quot;true&quot;);</span>
} else {
<span class="nc" id="L615"> model.setToCheck(&quot;false&quot;);</span>
}
<span class="nc" id="L618"> model.setKeyName(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyName());</span>
<span class="nc" id="L619"> model.setKeyValue(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyValue());</span>
<span class="nc" id="L620"> model.setTmodelKey(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getTModelKey());</span>
<span class="nc" id="L621"> model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());</span>
//model.setSignatures(MappingApiToModel.mapApiSignaturesToModelSignatures(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getSignature()));
<span class="nc bnc" id="L623" title="All 2 branches missed."> if (&quot;false&quot;.equalsIgnoreCase(model.getFromCheck())</span>
<span class="nc bnc" id="L624" title="All 2 branches missed."> &amp;&amp; &quot;false&quot;.equalsIgnoreCase(model.getToCheck())) {</span>
<span class="nc" id="L625"> logger.warn(&quot;!!!New publisher assertion is both false and false, strange. no need to save it then!&quot;);</span>
<span class="nc" id="L626"> em.remove(model);</span>
}
<span class="nc" id="L628"> em.merge(model);</span>
} else {
<span class="nc" id="L630"> logger.info(&quot;Repl CR Publisher Assertion - new PA&quot;);</span>
<span class="nc" id="L632"> model = new PublisherAssertion();</span>
<span class="nc" id="L633"> MappingApiToModel.mapPublisherAssertion(rec.getChangeRecordPublisherAssertion().getPublisherAssertion(), model);</span>
<span class="nc" id="L634"> model.setBusinessEntityByFromKey(null);</span>
<span class="nc" id="L635"> model.setBusinessEntityByToKey(null);</span>
<span class="nc" id="L636"> model.setBusinessEntityByFromKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey()));</span>
<span class="nc" id="L637"> model.setBusinessEntityByToKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey()));</span>
<span class="nc bnc" id="L639" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {</span>
<span class="nc" id="L640"> model.setFromCheck(&quot;true&quot;);</span>
} else {
<span class="nc" id="L642"> model.setFromCheck(&quot;false&quot;);</span>
}
<span class="nc bnc" id="L645" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {</span>
<span class="nc" id="L646"> model.setToCheck(&quot;true&quot;);</span>
} else {
<span class="nc" id="L648"> model.setToCheck(&quot;false&quot;);</span>
}
<span class="nc" id="L650"> model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());</span>
<span class="nc" id="L651"> em.persist(model);</span>
}
}
//&lt;/editor-fold&gt;
<span class="nc bnc" id="L656" title="All 2 branches missed."> if (rec.isAcknowledgementRequested()) {</span>
<span class="nc" id="L657"> ChangeRecord posack = new ChangeRecord();</span>
<span class="nc" id="L658"> posack.setChangeRecordAcknowledgement(new ChangeRecordAcknowledgement());</span>
<span class="nc" id="L659"> posack.getChangeRecordAcknowledgement().setAcknowledgedChange(rec.getChangeID());</span>
<span class="nc" id="L660"> posack.setAcknowledgementRequested(false);</span>
<span class="nc" id="L661"> ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(posack));</span>
}
<span class="nc bnc" id="L663" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional() != null) {</span>
<span class="nc bnc" id="L665" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {</span>
<span class="nc" id="L666"> throw new Exception(&quot;Inbound replication data is missiong node id!&quot;);</span>
}
//The operationalInfo element MUST contain the operational information associated with the indicated new data.
<span class="nc bnc" id="L670" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo() == null) {</span>
<span class="nc" id="L671"> logger.warn(&quot;Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored&quot;);</span>
} else {
<span class="nc bnc" id="L673" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate() != null) {</span>
//fetch the binding template if it exists already
//if it exists,
// confirm the owning node, it shouldn't be the local node id, if it is, throw
// the owning node should be the same as it was before
<span class="nc" id="L679"> BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getServiceKey());</span>
<span class="nc bnc" id="L680" title="All 2 branches missed."> if (model == null) {</span>
<span class="nc" id="L681"> logger.error(&quot;Replication error, attempting to insert a binding where the service doesn't exist yet&quot;);</span>
} else {
<span class="nc" id="L684"> org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getBindingKey());</span>
<span class="nc bnc" id="L685" title="All 2 branches missed."> if (bt != null) {</span>
<span class="nc" id="L686"> validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), bt.getNodeId());</span>
<span class="nc" id="L688"> em.remove(bt);</span>
}
<span class="nc" id="L690"> bt = new BindingTemplate();</span>
<span class="nc" id="L691"> MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate(), bt, model);</span>
<span class="nc" id="L692"> MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span>
// MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
<span class="nc" id="L694"> em.persist(bt);</span>
}
<span class="nc bnc" id="L697" title="All 2 branches missed."> } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity() != null) {</span>
<span class="nc" id="L699"> BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity().getBusinessKey());</span>
<span class="nc bnc" id="L700" title="All 2 branches missed."> if (model != null) {</span>
<span class="nc" id="L701"> validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span>
//TODO revisit access control rules
<span class="nc" id="L703"> em.remove(model);</span>
}
<span class="nc" id="L705"> model = new BusinessEntity();</span>
<span class="nc" id="L706"> MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity(), model);</span>
// MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
<span class="nc" id="L709"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L710"> logger.warn(&quot;Name size on save is &quot; + model.getBusinessNames().size());</span>
<span class="nc" id="L711"> em.persist(model);</span>
}
<span class="nc bnc" id="L714" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService() != null) {</span>
<span class="nc" id="L715"> BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getBusinessKey());</span>
<span class="nc bnc" id="L716" title="All 2 branches missed."> if (find == null) {</span>
<span class="nc" id="L717"> logger.error(&quot;Replication error, attempting to insert a service where the business doesn't exist yet&quot;);</span>
} else {
<span class="nc" id="L720"> org.apache.juddi.model.BusinessService model = null;</span>
<span class="nc" id="L721"> model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getServiceKey());</span>
<span class="nc bnc" id="L722" title="All 2 branches missed."> if (model != null) {</span>
<span class="nc" id="L723"> validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span>
<span class="nc" id="L724"> em.remove(model);</span>
}
<span class="nc" id="L727"> model = new org.apache.juddi.model.BusinessService();</span>
<span class="nc" id="L728"> MappingApiToModel.mapBusinessService(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService(), model, find);</span>
<span class="nc" id="L729"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L730"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L732"> em.persist(model);</span>
}
<span class="nc bnc" id="L735" title="All 2 branches missed."> } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel() != null) {</span>
<span class="nc" id="L737"> Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().getTModelKey());</span>
<span class="nc bnc" id="L738" title="All 2 branches missed."> if (model != null) {</span>
<span class="nc" id="L739"> validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span>
<span class="nc" id="L740"> em.remove(model);</span>
}
<span class="nc" id="L742"> model = new Tmodel();</span>
<span class="nc" id="L743"> MappingApiToModel.mapTModel(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel(), model);</span>
<span class="nc" id="L745"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span>
<span class="nc" id="L747"> em.persist(model);</span>
}
}
}
<span class="nc bnc" id="L753" title="All 2 branches missed."> if (rec.getChangeRecordNull() != null) {</span>
//No action required
}
<span class="nc bnc" id="L757" title="All 2 branches missed."> if (rec.getChangeRecordCorrection() != null) {</span>
//TODO implement
}
<span class="nc bnc" id="L761" title="All 2 branches missed."> if (rec.getChangeRecordConditionFailed() != null) {</span>
//TODO implement
}
<span class="nc" id="L765"> tx.commit();</span>
<span class="nc" id="L767"> } catch (Exception drfm) {</span>
<span class="nc" id="L769"> logger.warn(&quot;Error applying the change record! &quot;, drfm);</span>
<span class="nc" id="L770"> StringWriter sw = new StringWriter();</span>
<span class="nc" id="L771"> JAXB.marshal(rec, sw);</span>
<span class="nc" id="L772"> logger.warn(&quot;This is the record that failed to persist: &quot; + sw.toString());</span>
<span class="nc bnc" id="L773" title="All 2 branches missed."> if (tx.isActive()) {</span>
<span class="nc" id="L774"> tx.rollback();</span>
}
<span class="nc bnc" id="L776" title="All 2 branches missed."> if (mapChangeRecord != null) {</span>
//set the change record's isApplied to false
try {
<span class="nc" id="L779"> tx = em.getTransaction();</span>
<span class="nc" id="L780"> tx.begin();</span>
<span class="nc" id="L781"> mapChangeRecord.setIsAppliedLocally(false);</span>
<span class="nc" id="L782"> em.merge(mapChangeRecord);</span>
<span class="nc" id="L783"> tx.commit();</span>
<span class="nc" id="L784"> } catch (Exception e) {</span>
<span class="nc" id="L785"> logger.error(&quot;error updating change record!!&quot;, e);</span>
<span class="nc bnc" id="L786" title="All 2 branches missed."> if (tx.isActive()) {</span>
<span class="nc" id="L787"> tx.rollback();</span>
}
<span class="nc" id="L789"> }</span>
} else {
<span class="nc" id="L791"> logger.fatal(&quot;whoa! change record is null when saving a remote change record, this is unexpected and should be reported&quot;);</span>
}
} finally {
<span class="nc bnc" id="L794" title="All 8 branches missed."> if (tx.isActive()) {</span>
<span class="nc" id="L795"> tx.rollback();</span>
}
<span class="nc" id="L797"> em.close();</span>
<span class="nc" id="L798"> }</span>
<span class="nc" id="L799"> }</span>
private HighWaterMarkVectorType getLastChangeRecordFrom(String sourcenode) {
<span class="nc" id="L802"> HighWaterMarkVectorType ret = new HighWaterMarkVectorType();</span>
<span class="nc" id="L803"> ChangeRecordIDType cid = new ChangeRecordIDType();</span>
<span class="nc" id="L804"> cid.setNodeID(sourcenode);</span>
<span class="nc" id="L805"> cid.setOriginatingUSN(0L);</span>
<span class="nc" id="L806"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="nc" id="L807"> EntityTransaction tx = em.getTransaction();</span>
try {
<span class="nc" id="L809"> tx.begin();</span>
//Long id = 0L;
try {
<span class="nc" id="L812"> cid.setOriginatingUSN((Long) em.createQuery(&quot;select MAX(e.originatingUSN) from ChangeRecord e where e.nodeID = :node&quot;)</span>
<span class="nc" id="L813"> .setParameter(&quot;node&quot;, sourcenode)</span>
<span class="nc" id="L814"> .getSingleResult());</span>
<span class="nc" id="L815"> } catch (Exception ex) {</span>
<span class="nc" id="L816"> logger.info(&quot;unexpected error searching for last record from &quot; + sourcenode, ex);</span>
<span class="nc" id="L817"> }</span>
<span class="nc" id="L819"> tx.rollback();</span>
<span class="nc" id="L821"> } catch (Exception drfm) {</span>
<span class="nc" id="L822"> logger.warn(&quot;error caught fetching newest record from node &quot; + sourcenode, drfm);</span>
} finally {
<span class="nc bnc" id="L824" title="All 6 branches missed."> if (tx.isActive()) {</span>
<span class="nc" id="L825"> tx.rollback();</span>
}
<span class="nc" id="L827"> em.close();</span>
<span class="nc" id="L828"> }</span>
<span class="nc" id="L829"> logger.info(&quot;Highest known record for &quot; + sourcenode + &quot; is &quot; + cid.getOriginatingUSN());</span>
<span class="nc" id="L830"> ret.getHighWaterMark().add(cid);</span>
<span class="nc" id="L832"> return ret;</span>
}
private void enqueueAllReceivingNodes() {
<span class="nc bnc" id="L836" title="All 2 branches missed."> if (queue == null) {</span>
<span class="nc" id="L837"> queue = new ConcurrentLinkedQueue&lt;NotifyChangeRecordsAvailable&gt;();</span>
}
//get the replication config
//get everyone we are expecting to receive data from, then enqueue them for pulling
<span class="nc" id="L841"> ReplicationConfiguration repcfg = ReplicationNotifier.FetchEdges();</span>
<span class="nc bnc" id="L842" title="All 2 branches missed."> if (repcfg == null) {</span>
<span class="nc" id="L843"> return;</span>
}
<span class="nc" id="L845"> Set&lt;String&gt; allnodes = new HashSet&lt;String&gt;();</span>
<span class="nc bnc" id="L846" title="All 2 branches missed."> for (int i = 0; i &lt; repcfg.getOperator().size(); i++) {</span>
<span class="nc" id="L847"> allnodes.add(repcfg.getOperator().get(i).getOperatorNodeID());</span>
}
<span class="nc" id="L849"> Set&lt;String&gt; receivers = new HashSet&lt;String&gt;();</span>
<span class="nc bnc" id="L850" title="All 2 branches missed."> if (repcfg.getCommunicationGraph() == null</span>
<span class="nc bnc" id="L851" title="All 2 branches missed."> || repcfg.getCommunicationGraph().getEdge().isEmpty()) {</span>
//no edges or graph defined, default to the operator list
<span class="nc bnc" id="L853" title="All 2 branches missed."> for (org.uddi.repl_v3.Operator o : repcfg.getOperator()) {</span>
//no need to tell myself about a change at myself
<span class="nc bnc" id="L855" title="All 2 branches missed."> if (!o.getOperatorNodeID().equalsIgnoreCase(getNode())) {</span>
<span class="nc" id="L856"> receivers.add(o.getOperatorNodeID());</span>
}
<span class="nc" id="L858"> }</span>
} else {
//repcfg.getCommunicationGraph()
<span class="nc" id="L861"> Iterator&lt;Edge&gt; iterator = repcfg.getCommunicationGraph().getEdge().iterator();</span>
<span class="nc bnc" id="L862" title="All 2 branches missed."> while (iterator.hasNext()) {</span>
<span class="nc" id="L863"> Edge next = iterator.next();</span>
<span class="nc bnc" id="L865" title="All 2 branches missed."> if (next.getMessageReceiver().equalsIgnoreCase(getNode())) {</span>
<span class="nc" id="L866"> receivers.add(next.getMessageSender());</span>
}
<span class="nc" id="L869"> }</span>
}
<span class="nc bnc" id="L872" title="All 2 branches missed."> for (String s : receivers) {</span>
//this is a list of nodes that this node is expecting updates from
//here are we ticking the notification engine to ping the remove service for updates
<span class="nc bnc" id="L875" title="All 2 branches missed."> for (String nodeping : allnodes) {</span>
<span class="nc" id="L876"> queue.add(new NotifyChangeRecordsAvailable(s, getLastChangeRecordFrom(nodeping)));</span>
//for each node we are expecting data from, go fetch it, along the way, we'll request all data for all nodes
//that we know about
<span class="nc" id="L879"> }</span>
<span class="nc" id="L881"> }</span>
<span class="nc" id="L882"> }</span>
}
/**
* used to check for alterations on *this node's data from another node,
* which isn't allowed
*
* @param ue
* @param node
* @throws Exception
*/
private static void validateNodeIdMisMatches(UddiEntity ue, String node) throws Exception {
<span class="nc bnc" id="L895" title="All 2 branches missed."> if (ue == null) {</span>
<span class="nc" id="L896"> return;//object doesn't exist</span>
}
<span class="nc bnc" id="L898" title="All 2 branches missed."> if (ue.getNodeId().equals(node)) {</span>
<span class="nc" id="L899"> throw new Exception(&quot;Alert! attempt to alter locally owned entity &quot; + ue.getEntityKey() + &quot; owned by &quot; + ue.getAuthorizedName() + &quot;@&quot; + ue.getNodeId());</span>
}
<span class="nc" id="L901"> }</span>
/**
* use to validate that changed data maintained ownership, except for
* business entities and tmodels since they allow transfer
*
* @param newNodeId
* @param currentOwningNode
* @throws Exception
*/
private void validateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception {
<span class="nc bnc" id="L912" title="All 4 branches missed."> if (newNodeId == null || currentOwningNode == null) {</span>
<span class="nc" id="L913"> throw new Exception(&quot;either the local node ID is null or the inbound replication data's node id is null&quot;);</span>
}
//only time this is allowed is custody transfer
<span class="nc bnc" id="L916" title="All 2 branches missed."> if (!newNodeId.equals(currentOwningNode)) {</span>
<span class="nc" id="L917"> logger.info(&quot;AUDIT, custody transfer from node, &quot; + currentOwningNode + &quot; to &quot; + newNodeId + &quot; current node is &quot; + getNode());</span>
//throw new Exception(&quot;node id mismatch!&quot;);
}
//if i already have a record and &quot;own it&quot; and the remote node has a record with the same key, reject the update
//1.5.8
/**
* Each node has custody of a portion of the aggregate data
* managed by the registry of which it is a part. Each datum is
* by definition in the custody of exactly one such node. A
* datum in this context can be a businessEntity, a
* businessService, a bindingTemplate, a tModel, or a
* publisherAssertion. Changes to a datum in the registry MUST
* originate at the node which is the custodian of the datum.
* The registry defines the policy for data custody and, if
* allowed, the custodian node for a given datum can be changed;
* such custody transfer processes are discussed in Section 5.4
* Custody and Ownership Transfer API.
*/
//so someone else attempted to update one of my records, reject it
<span class="nc bnc" id="L937" title="All 2 branches missed."> if (newNodeId.equals(getNode())) {</span>
//throw new Exception(&quot;node id mismatch! this node already has a record for key &quot; + newDataOperationalInfo.getEntityKey() + &quot; and I'm the authority for it.&quot;);
}
<span class="nc" id="L940"> }</span>
private synchronized UDDIReplicationPortType getReplicationClient(String node) {
<span class="nc bnc" id="L943" title="All 2 branches missed."> if (cache.containsKey(node)) {</span>
<span class="nc" id="L944"> return cache.get(node);</span>
}
<span class="nc" id="L946"> UDDIService svc = new UDDIService();</span>
<span class="nc" id="L947"> UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();</span>
<span class="nc" id="L948"> TransportSecurityHelper.applyTransportSecurity((BindingProvider) replicationClient);</span>
<span class="nc" id="L950"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="nc" id="L951"> EntityTransaction tx = em.getTransaction();</span>
try {
<span class="nc" id="L953"> tx.begin();</span>
<span class="nc" id="L954"> StringBuilder sql = new StringBuilder();</span>
<span class="nc" id="L955"> sql.append(&quot;select c from ReplicationConfiguration c order by c.serialNumber desc&quot;);</span>
//sql.toString();
<span class="nc" id="L957"> Query qry = em.createQuery(sql.toString());</span>
<span class="nc" id="L958"> qry.setMaxResults(1);</span>
<span class="nc" id="L960"> org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();</span>
<span class="nc bnc" id="L961" title="All 2 branches missed."> for (Operator o : resultList.getOperator()) {</span>
<span class="nc bnc" id="L962" title="All 2 branches missed."> if (o.getOperatorNodeID().equalsIgnoreCase(node)) {</span>
<span class="nc" id="L963"> ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());</span>
<span class="nc" id="L964"> cache.put(node, replicationClient);</span>
<span class="nc" id="L965"> return replicationClient;</span>
}
<span class="nc" id="L967"> }</span>
<span class="nc" id="L968"> tx.rollback();</span>
<span class="nc" id="L970"> } catch (Exception ex) {</span>
<span class="nc" id="L971"> logger.fatal(&quot;Node not found!&quot; + node, ex);</span>
} finally {
<span class="nc bnc" id="L973" title="All 8 branches missed."> if (tx.isActive()) {</span>
<span class="nc" id="L974"> tx.rollback();</span>
}
<span class="nc" id="L976"> em.close();</span>
<span class="nc" id="L977"> }</span>
//em.close();
<span class="nc" id="L979"> return null;</span>
}
<span class="fc" id="L982"> private Map&lt;String, UDDIReplicationPortType&gt; cache = new HashMap&lt;String, UDDIReplicationPortType&gt;();</span>
/**
* @since 3.3
* @param body
* @return
* @throws DispositionReportFaultMessage
*/
public String doPing(DoPing body) throws DispositionReportFaultMessage {
<span class="fc" id="L991"> long startTime = System.currentTimeMillis();</span>
<span class="fc" id="L992"> long procTime = System.currentTimeMillis() - startTime;</span>
<span class="fc" id="L993"> serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);</span>
<span class="fc" id="L995"> return getNode();</span>
}
@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
@WebResult(name = &quot;changeRecords&quot;, targetNamespace = &quot;urn:uddi-org:repl_v3&quot;, partName = &quot;body&quot;)
// @WebMethod(operationName = &quot;get_changeRecords&quot;, action = &quot;get_changeRecords&quot;)
@Override
public org.uddi.repl_v3.ChangeRecords getChangeRecords(
@WebParam(partName = &quot;body&quot;, name = &quot;get_changeRecords&quot;, targetNamespace = &quot;urn:uddi-org:repl_v3&quot;) org.uddi.repl_v3.GetChangeRecords body
) throws DispositionReportFaultMessage, RemoteException {
<span class="nc" id="L1006"> long startTime = System.currentTimeMillis();</span>
<span class="nc" id="L1007"> String requestingNode = body.getRequestingNode();</span>
<span class="nc" id="L1008"> HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen();</span>
<span class="nc" id="L1009"> BigInteger responseLimitCount = body.getResponseLimitCount();</span>
<span class="nc" id="L1010"> HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector();</span>
<span class="nc" id="L1012"> new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);</span>
//TODO should we validate that &quot;requestingNode&quot; is in the replication config?
<span class="nc" id="L1015"> List&lt;ChangeRecord&gt; ret = new ArrayList&lt;ChangeRecord&gt;();</span>
<span class="nc" id="L1016"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="nc" id="L1017"> EntityTransaction tx = em.getTransaction();</span>
/**
* More specifically, the recipient determines the particular
* change records that are returned by comparing the originating
* USNs in the caller’s high water mark vector with the
* originating USNs of each of the changes the recipient has
* seen from others or generated by itself. The recipient SHOULD
* only return change records that have originating USNs that
* are greater than those listed in the changesAlreadySeen
* highWaterMarkVector and less than the limit required by
* either the responseLimitCount or the responseLimitVector.
*
*
* Part of the message is a high water mark vector that contains
* for each node of the registry the originating USN of the most
* recent change record that has been successfully processed by
* the invocating node
*/
try {
<span class="nc" id="L1037"> int maxrecords = AppConfig.getConfiguration().getInt(Property.JUDDI_REPLICATION_GET_CHANGE_RECORDS_MAX, 100);</span>
<span class="nc bnc" id="L1038" title="All 2 branches missed."> if (responseLimitCount != null) {</span>
<span class="nc" id="L1039"> maxrecords = responseLimitCount.intValue();</span>
}
<span class="nc" id="L1041"> tx.begin();</span>
<span class="nc" id="L1042"> Long firstrecord = 0L;</span>
<span class="nc" id="L1043"> Long lastrecord = null;</span>
<span class="nc" id="L1044"> Query createQuery = null;</span>
//SELECT t0.id, t0.change_contents, t0.entity_key, t0.appliedlocal, t0.node_id, t0.orginating_usn, t0.record_type FROM j3_chg_record t0 WHERE (t0.id &gt; NULL AND t0.node_id = ?) ORDER BY t0.id ASC
<span class="nc bnc" id="L1046" title="All 2 branches missed."> if (changesAlreadySeen != null) {</span>
//this is basically a lower limit (i.e. the newest record that was processed by the requestor
//therefore we want the oldest record stored locally to return to the requestor for processing
<span class="nc bnc" id="L1049" title="All 2 branches missed."> for (int i = 0; i &lt; changesAlreadySeen.getHighWaterMark().size(); i++) {</span>
<span class="nc" id="L1050"> firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN();</span>
<span class="nc bnc" id="L1051" title="All 2 branches missed."> if (firstrecord == null) {</span>
<span class="nc" id="L1052"> firstrecord = 0L;</span>
}
<span class="nc bnc" id="L1054" title="All 2 branches missed."> if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(getNode())) {</span>
//special case, search by database id
<span class="nc" id="L1056"> createQuery = em.createQuery(&quot;select e from ChangeRecord e where &quot;</span>
+ &quot;(e.id &gt; :inbound AND e.nodeID = :node) &quot;
+ &quot;order by e.id ASC&quot;);
} else {
<span class="nc" id="L1061"> createQuery = em.createQuery(&quot;select e from ChangeRecord e where &quot;</span>
+ &quot;e.originatingUSN &gt; :inbound AND e.nodeID = :node &quot;
+ &quot;order by e.originatingUSN ASC&quot;);
}
<span class="nc" id="L1065"> logger.info(&quot;Query db for replication changes, lower index is &quot; + (firstrecord) + &quot; last index &quot; + lastrecord + &quot; record limit &quot; + maxrecords);</span>
<span class="nc" id="L1066"> logger.info(&quot;This node is &quot; + getNode() + &quot;, request is for data originated from &quot; + changesAlreadySeen.getHighWaterMark().get(i).getNodeID() + &quot; and it's being sent back to &quot; + requestingNode);</span>
<span class="nc" id="L1068"> createQuery.setMaxResults(maxrecords);</span>
<span class="nc" id="L1069"> createQuery.setParameter(&quot;inbound&quot;, firstrecord);</span>
<span class="nc" id="L1070"> createQuery.setParameter(&quot;node&quot;, changesAlreadySeen.getHighWaterMark().get(i).getNodeID());</span>
<span class="nc" id="L1071"> List&lt;org.apache.juddi.model.ChangeRecord&gt; records = (List&lt;org.apache.juddi.model.ChangeRecord&gt;) createQuery.getResultList();</span>
<span class="nc" id="L1072"> logger.info(records.size() + &quot; CR records returned from query&quot;);</span>
<span class="nc bnc" id="L1073" title="All 2 branches missed."> for (int x = 0; x &lt; records.size(); x++) {</span>
<span class="nc" id="L1074"> ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(x));</span>
//if (!Excluded(changesAlreadySeen, r)) {
<span class="nc" id="L1076"> ret.add(r);</span>
//}
}
}
} /*if (responseLimitVector != null) {
//using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned.
//upper limit basically
for (int i = 0; i &lt; responseLimitVector.getHighWaterMark().size(); i++) {
//if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
//}
}
}*/ else {
<span class="nc bnc" id="L1090" title="All 2 branches missed."> if (firstrecord == null) {</span>
<span class="nc" id="L1091"> firstrecord = 0L;</span>
}
//assume that they just want records that originated from here?
<span class="nc" id="L1094"> logger.info(&quot;Query db for replication changes, lower index is &quot; + (firstrecord) + &quot; last index &quot; + lastrecord + &quot; record limit &quot; + maxrecords);</span>
<span class="nc" id="L1095"> logger.info(&quot;This node is &quot; + getNode() + &quot; requesting node &quot; + requestingNode);</span>
<span class="nc bnc" id="L1097" title="All 2 branches missed."> if (lastrecord != null) {</span>
<span class="nc" id="L1098"> createQuery = em.createQuery(&quot;select e from ChangeRecord e where &quot;</span>
+ &quot;(e.id &gt; :inbound AND e.nodeID = :node AND e.id &lt; :lastrecord) &quot;
+ &quot;order by e.id ASC&quot;);
<span class="nc" id="L1101"> createQuery.setParameter(&quot;lastrecord&quot;, lastrecord);</span>
} else {
<span class="nc" id="L1103"> createQuery = em.createQuery(&quot;select e from ChangeRecord e where &quot;</span>
+ &quot;(e.id &gt; :inbound AND e.nodeID = :node) &quot;
+ &quot;order by e.id ASC&quot;);
}
<span class="nc" id="L1107"> createQuery.setMaxResults(maxrecords);</span>
<span class="nc" id="L1108"> createQuery.setParameter(&quot;inbound&quot;, firstrecord);</span>
<span class="nc" id="L1109"> createQuery.setParameter(&quot;node&quot;, getNode());</span>
<span class="nc" id="L1111"> List&lt;org.apache.juddi.model.ChangeRecord&gt; records = (List&lt;org.apache.juddi.model.ChangeRecord&gt;) createQuery.getResultList();</span>
<span class="nc" id="L1112"> logger.info(records.size() + &quot; CR records returned from query&quot;);</span>
<span class="nc bnc" id="L1113" title="All 2 branches missed."> for (int i = 0; i &lt; records.size(); i++) {</span>
<span class="nc" id="L1114"> ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));</span>
//if (!Excluded(changesAlreadySeen, r)) {
<span class="nc" id="L1116"> ret.add(r);</span>
//}
}
}
<span class="nc" id="L1121"> tx.rollback();</span>
<span class="nc" id="L1122"> long procTime = System.currentTimeMillis() - startTime;</span>
<span class="nc" id="L1123"> serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,</span>
QueryStatus.SUCCESS, procTime);
<span class="nc" id="L1126"> } catch (Exception ex) {</span>
<span class="nc" id="L1127"> logger.fatal(&quot;Error, this node is: &quot; + getNode(), ex);</span>
<span class="nc" id="L1128"> throw new FatalErrorException(new ErrorMessage(&quot;E_fatalError&quot;, ex.getMessage()));</span>
} finally {
<span class="nc bnc" id="L1131" title="All 4 branches missed."> if (tx.isActive()) {</span>
<span class="nc" id="L1132"> tx.rollback();</span>
}
<span class="nc" id="L1134"> em.close();</span>
<span class="nc" id="L1135"> }</span>
<span class="nc" id="L1136"> logger.info(&quot;Change records returned for &quot; + requestingNode + &quot;: &quot; + ret.size());</span>
//JAXB.marshal(ret, System.out);
<span class="nc" id="L1138"> ChangeRecords x = new ChangeRecords();</span>
<span class="nc" id="L1139"> x.getChangeRecord().addAll(ret);</span>
//JAXB.marshal(x, System.out);
<span class="nc" id="L1141"> return x;</span>
}
/**
* This UDDI API message provides a means to obtain a list of
* highWaterMark element containing the highest known USN for all nodes
* in the replication graph. If there is no graph, we just return the
* local bits
*
* @return
* @throws DispositionReportFaultMessage
*/
@Override
public List&lt;ChangeRecordIDType&gt; getHighWaterMarks()
throws DispositionReportFaultMessage {
<span class="fc" id="L1156"> long startTime = System.currentTimeMillis();</span>
<span class="fc" id="L1158"> List&lt;ChangeRecordIDType&gt; ret = new ArrayList&lt;ChangeRecordIDType&gt;();</span>
//fetch from database the highest known watermark
<span class="fc" id="L1161"> ReplicationConfiguration FetchEdges = FetchEdges();</span>
<span class="fc" id="L1163"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="fc" id="L1164"> EntityTransaction tx = em.getTransaction();</span>
<span class="fc" id="L1165"> HashMap&lt;String, Long&gt; map = new HashMap&lt;String, Long&gt;();</span>
try {
<span class="fc" id="L1167"> tx.begin();</span>
<span class="pc bpc" id="L1168" title="1 of 2 branches missed."> if (FetchEdges != null) {</span>
<span class="fc" id="L1169"> Iterator&lt;String&gt; it = FetchEdges.getCommunicationGraph().getNode().iterator();</span>
<span class="fc bfc" id="L1170" title="All 2 branches covered."> while (it.hasNext()) {</span>
<span class="fc" id="L1171"> String nextNode = it.next();</span>
<span class="pc bpc" id="L1172" title="1 of 2 branches missed."> if (!nextNode.equals(getNode())) {</span>
<span class="pc bpc" id="L1173" title="1 of 2 branches missed."> if (!map.containsKey(nextNode)) {</span>
<span class="fc" id="L1174"> Long id = 0L;</span>
try {
<span class="nc" id="L1176"> id = (Long) em.createQuery(&quot;select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc&quot;).setParameter(&quot;node&quot;, nextNode).setMaxResults(1).getSingleResult();</span>
<span class="fc" id="L1177"> } catch (Exception ex) {</span>
<span class="fc" id="L1178"> logger.debug(ex);</span>
<span class="nc" id="L1179"> }</span>
<span class="pc bpc" id="L1180" title="1 of 2 branches missed."> if (id == null) {</span>
<span class="nc" id="L1181"> id = 0L;</span>
//per the spec
}
<span class="fc" id="L1184"> map.put(nextNode, id);</span>
}
}
<span class="fc" id="L1188"> }</span>
}
//dont forget this node
<span class="fc" id="L1191"> Long id = (Long) em.createQuery(&quot;select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc&quot;)</span>
<span class="fc" id="L1192"> .setParameter(&quot;node&quot;, getNode()).setMaxResults(1).getSingleResult();</span>
<span class="pc bpc" id="L1193" title="1 of 2 branches missed."> if (id == null) {</span>
<span class="nc" id="L1194"> id = 0L;</span>
}
<span class="fc" id="L1196"> ChangeRecordIDType x = new ChangeRecordIDType();</span>
<span class="fc" id="L1197"> x.setNodeID(getNode());</span>
<span class="fc" id="L1198"> x.setOriginatingUSN(id);</span>
<span class="fc" id="L1199"> ret.add(x);</span>
<span class="fc" id="L1201"> tx.rollback();</span>
<span class="fc" id="L1202"> long procTime = System.currentTimeMillis() - startTime;</span>
<span class="fc" id="L1203"> serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);</span>
<span class="nc" id="L1205"> } catch (Exception drfm) {</span>
<span class="nc" id="L1206"> throw new FatalErrorException(new ErrorMessage(&quot;E_fatalError&quot;, drfm.getMessage()));</span>
} finally {
<span class="pc bpc" id="L1209" title="3 of 4 branches missed."> if (tx.isActive()) {</span>
<span class="nc" id="L1210"> tx.rollback();</span>
}
<span class="pc" id="L1212"> em.close();</span>
<span class="fc" id="L1213"> }</span>
<span class="fc" id="L1215"> Iterator&lt;Map.Entry&lt;String, Long&gt;&gt; iterator = map.entrySet().iterator();</span>
<span class="fc bfc" id="L1216" title="All 2 branches covered."> while (iterator.hasNext()) {</span>
<span class="fc" id="L1217"> Map.Entry&lt;String, Long&gt; next = iterator.next();</span>
<span class="fc" id="L1218"> ret.add(new ChangeRecordIDType(next.getKey(), next.getValue()));</span>
<span class="fc" id="L1219"> }</span>
<span class="fc" id="L1220"> return ret;</span>
}
/**
* this means that another node has a change and we need to pick up the
* change and apply it to our local database.
*
* @param body
* @throws DispositionReportFaultMessage
*/
@Override
public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
throws DispositionReportFaultMessage {
<span class="nc" id="L1233"> long startTime = System.currentTimeMillis();</span>
//some other node just told us there's new records available, call
//getChangeRecords from the remote node asynch
<span class="nc" id="L1237"> new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);</span>
<span class="nc" id="L1239"> logger.info(body.getNotifyingNode() + &quot; just told me that there are change records available, enqueuing...size is &quot; + queue.size() + &quot; this node is &quot; + getNode());</span>
//if (!queue.contains(body.getNotifyingNode())) {
<span class="nc" id="L1241"> queue.add(body);</span>
//}
<span class="nc" id="L1243"> long procTime = System.currentTimeMillis() - startTime;</span>
<span class="nc" id="L1244"> serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,</span>
QueryStatus.SUCCESS, procTime);
<span class="nc" id="L1246"> }</span>
<span class="fc" id="L1247"> private static Queue&lt;NotifyChangeRecordsAvailable&gt; queue = null;</span>
/**
* transfers custody of an entity from node1/user1 to node2/user2
*
* assume this node is node 2.
*
* user1 on node1 requests a transfer token. node 1 issues the token.
*
* user1 now has a transfer token for their stuff user now takes the
* token to node 2 and calls transferEntities
* &lt;img src=&quot;http://www.uddi.org/pubs/uddi-v3.0.2-20041019_files/image086.gif&quot;&gt;
*
* @param body
* @throws DispositionReportFaultMessage
*/
@Override
public void transferCustody(TransferCustody body)
throws DispositionReportFaultMessage {
<span class="nc" id="L1266"> long startTime = System.currentTimeMillis();</span>
<span class="nc" id="L1267"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="nc" id="L1268"> EntityTransaction tx = em.getTransaction();</span>
<span class="nc" id="L1269"> logger.info(&quot;Inbound transfer request (via replication api, node to node&quot;);</span>
try {
<span class="nc" id="L1271"> tx.begin();</span>
//*this node is transfering data to another node
//ValidateReplication.unsupportedAPICall();
//a remote node just told me to give up control of some of my entities
//EntityTransaction tx = em.getTransaction();
//confirm i have a replication config
<span class="nc" id="L1278"> boolean ok = false;</span>
<span class="nc" id="L1279"> ReplicationConfiguration FetchEdges = ReplicationNotifier.FetchEdges();</span>
<span class="nc bnc" id="L1280" title="All 2 branches missed."> if (FetchEdges != null) {</span>
<span class="nc bnc" id="L1281" title="All 2 branches missed."> for (int i = 0; i &lt; FetchEdges.getOperator().size(); i++) {</span>
//confirm that the destination node is in the replication config
<span class="nc bnc" id="L1283" title="All 2 branches missed."> if (FetchEdges.getOperator().get(i).getOperatorNodeID().equals(body.getTransferOperationalInfo().getNodeID())) {</span>
<span class="nc" id="L1284"> ok = true;</span>
<span class="nc" id="L1285"> break;</span>
}
}
}
<span class="nc bnc" id="L1289" title="All 2 branches missed."> if (!ok) {</span>
<span class="nc" id="L1290"> throw new TransferNotAllowedException(new ErrorMessage(&quot;E_transferNotAllowedUnknownNode&quot;));</span>
}
<span class="nc" id="L1293"> new ValidateReplication(null).validateTransfer(em, body);</span>
<span class="nc" id="L1295"> TransferEntities te = new TransferEntities();</span>
<span class="nc" id="L1296"> te.setKeyBag(body.getKeyBag());</span>
<span class="nc" id="L1297"> te.setTransferToken(body.getTransferToken());</span>
<span class="nc" id="L1298"> te.setAuthInfo(null);</span>
//make the change
//enqueue in replication notifier
//discard the token
<span class="nc" id="L1302"> logger.debug(&quot;request validated, processing transfer&quot;);</span>
<span class="nc" id="L1303"> List&lt;ChangeRecord&gt; executeTransfer = new UDDICustodyTransferImpl().executeTransfer(te, em, body.getTransferOperationalInfo().getAuthorizedName(), body.getTransferOperationalInfo().getNodeID());</span>
<span class="nc bnc" id="L1305" title="All 2 branches missed."> for (ChangeRecord c : executeTransfer) {</span>
try {
<span class="nc" id="L1307"> c.setChangeID(new ChangeRecordIDType());</span>
<span class="nc" id="L1308"> c.getChangeID().setNodeID(getNode());</span>
<span class="nc" id="L1309"> c.getChangeID().setOriginatingUSN(null);</span>
<span class="nc" id="L1310"> ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(c));</span>
<span class="nc" id="L1311"> } catch (UnsupportedEncodingException ex) {</span>
<span class="nc" id="L1312"> logger.error(&quot;&quot;, ex);</span>
<span class="nc" id="L1313"> }</span>
<span class="nc" id="L1314"> }</span>
/**
* The custodial node must verify that it has granted
* permission to transfer the entities identified and
* that this permission is still valid. This operation
* is comprised of two steps:
*
* 1. Verification that the transferToken was issued by
* it, that it has not expired, that it represents the
* authority to transfer no more and no less than those
* entities identified by the businessKey and tModelKey
* elements and that all these entities are still valid
* and not yet transferred. The transferToken is
* invalidated if any of these conditions are not met.
*
* 2. If the conditions above are met, the custodial
* node will prevent any further changes to the entities
* identified by the businessKey and tModelKey elements
* identified. The entity will remain in this state
* until the replication stream indicates it has been
* successfully processed via the replication stream.
* Upon successful verification of the custody transfer
* request by the custodial node, an empty message is
* returned by it indicating the success of the request
* and acknowledging the custody transfer. Following the
* issue of the empty message, the custodial node will
* submit into the replication stream a
* changeRecordNewData providing in the operationalInfo,
* the nodeID accepting custody of the datum and the
* authorizedName of the publisher accepting ownership.
* The acknowledgmentRequested attribute of this change
* record MUST be set to &quot;true&quot;.
*
*
*
* Finally, the custodial node invalidates the
* transferToken in order to prevent additional calls of
* the transfer_entities API.
*/
<span class="nc" id="L1353"> tx.commit();</span>
<span class="nc" id="L1354"> long procTime = System.currentTimeMillis() - startTime;</span>
<span class="nc" id="L1355"> serviceCounter.update(ReplicationQuery.TRANSFER_CUSTODY,</span>
QueryStatus.SUCCESS, procTime);
<span class="nc" id="L1357"> } catch (DispositionReportFaultMessage d) {</span>
<span class="nc" id="L1358"> logger.error(&quot;Unable to process node to node custody transfer &quot;, d);</span>
<span class="nc" id="L1359"> throw d;</span>
} finally {
<span class="nc bnc" id="L1361" title="All 8 branches missed."> if (em != null &amp;&amp; em.isOpen()) {</span>
<span class="nc" id="L1362"> em.close();</span>
}
<span class="nc bnc" id="L1364" title="All 4 branches missed."> if (tx.isActive()) {</span>
<span class="nc" id="L1365"> tx.rollback();</span>
}
}
<span class="nc" id="L1368"> }</span>
}
</pre><div class="footer"><span class="right">Created with <a href="http://www.jacoco.org/jacoco">JaCoCo</a> 0.7.9.201702052155</span></div></body></html>