| <?xml version="1.0" encoding="UTF-8"?><!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"><html xmlns="http://www.w3.org/1999/xhtml" lang="en"><head><meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/><link rel="stylesheet" href="../jacoco-resources/report.css" type="text/css"/><link rel="shortcut icon" href="../jacoco-resources/report.gif" type="image/gif"/><title>UDDIReplicationImpl.java</title><link rel="stylesheet" href="../jacoco-resources/prettify.css" type="text/css"/><script type="text/javascript" src="../jacoco-resources/prettify.js"></script></head><body onload="window['PR_TAB_WIDTH']=4;prettyPrint()"><div class="breadcrumb" id="breadcrumb"><span class="info"><a href="../jacoco-sessions.html" class="el_session">Sessions</a></span><a href="../index.html" class="el_report">jUDDI Core - OpenJPA</a> > <a href="index.source.html" class="el_package">org.apache.juddi.api.impl</a> > <span class="el_source">UDDIReplicationImpl.java</span></div><h1>UDDIReplicationImpl.java</h1><pre class="source lang-java linenums">/* |
| * Copyright 2001-2008 The Apache Software Foundation. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| package org.apache.juddi.api.impl; |
| |
| import java.io.StringWriter; |
| import java.io.UnsupportedEncodingException; |
| import java.math.BigInteger; |
| import java.rmi.RemoteException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import javax.jws.WebParam; |
| import javax.jws.WebResult; |
| import javax.jws.WebService; |
| import javax.jws.soap.SOAPBinding; |
| import javax.persistence.EntityManager; |
| import javax.persistence.EntityTransaction; |
| import javax.persistence.Query; |
| import javax.xml.bind.JAXB; |
| import javax.xml.bind.annotation.XmlSeeAlso; |
| import javax.xml.ws.BindingProvider; |
| import org.apache.juddi.api.util.QueryStatus; |
| import org.apache.juddi.api.util.ReplicationQuery; |
| import org.apache.juddi.config.AppConfig; |
| import org.apache.juddi.config.PersistenceManager; |
| import org.apache.juddi.config.Property; |
| import org.apache.juddi.mapping.MappingApiToModel; |
| import org.apache.juddi.mapping.MappingModelToApi; |
| import org.apache.juddi.model.BindingTemplate; |
| import org.apache.juddi.model.BusinessEntity; |
| import org.apache.juddi.model.BusinessService; |
| import org.apache.juddi.model.Operator; |
| import org.apache.juddi.model.PublisherAssertion; |
| import org.apache.juddi.model.PublisherAssertionId; |
| import org.apache.juddi.model.Tmodel; |
| import org.apache.juddi.model.UddiEntity; |
| import org.apache.juddi.replication.ReplicationNotifier; |
| import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges; |
| import org.apache.juddi.v3.client.UDDIService; |
| import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper; |
| import org.apache.juddi.v3.error.ErrorMessage; |
| import org.apache.juddi.v3.error.FatalErrorException; |
| import org.apache.juddi.v3.error.TransferNotAllowedException; |
| import org.apache.juddi.validation.ValidateReplication; |
| import org.uddi.custody_v3.TransferEntities; |
| import org.uddi.repl_v3.ChangeRecord; |
| import org.uddi.repl_v3.ChangeRecordAcknowledgement; |
| import org.uddi.repl_v3.ChangeRecordIDType; |
| import org.uddi.repl_v3.ChangeRecords; |
| import org.uddi.repl_v3.CommunicationGraph.Edge; |
| import org.uddi.repl_v3.DoPing; |
| import org.uddi.repl_v3.GetChangeRecords; |
| import org.uddi.repl_v3.HighWaterMarkVectorType; |
| import org.uddi.repl_v3.NotifyChangeRecordsAvailable; |
| import org.uddi.repl_v3.ReplicationConfiguration; |
| import org.uddi.repl_v3.TransferCustody; |
| import org.uddi.v3_service.DispositionReportFaultMessage; |
| import org.uddi.v3_service.UDDIReplicationPortType; |
| |
| /** |
| * UDDI Replication defines four APIs. The first two presented here are used to |
| * perform replication and issue notifications. The latter ancillary APIs |
| * provide support for other aspects of UDDI Replication. |
| * <ul> |
| * <li>get_changeRecords</li> |
| * <li>notify_changeRecordsAvailable</li> |
| * <li>do_ping</li> |
| * <li>get_highWaterMarks</li></ul> |
| * |
| * @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a> |
| */ |
| @WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:api_v3_portType", |
| endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType") |
| @XmlSeeAlso({ |
| org.uddi.custody_v3.ObjectFactory.class, |
| org.uddi.repl_v3.ObjectFactory.class, |
| org.uddi.subr_v3.ObjectFactory.class, |
| org.uddi.api_v3.ObjectFactory.class, |
| org.uddi.vscache_v3.ObjectFactory.class, |
| org.uddi.vs_v3.ObjectFactory.class, |
| org.uddi.sub_v3.ObjectFactory.class, |
| org.w3._2000._09.xmldsig_.ObjectFactory.class, |
| org.uddi.policy_v3.ObjectFactory.class, |
| org.uddi.policy_v3_instanceparms.ObjectFactory.class |
| }) |
| public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType { |
| |
| static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig, AuthenticatedService service) { |
| |
| //if the config is different |
| <span class="fc" id="L113"> Set<String> oldnodes = getNodes(oldConfig);</span> |
| <span class="fc" id="L114"> Set<String> newNodes = getNodes(newConfig);</span> |
| |
| <span class="fc" id="L116"> Set<String> addedNodes = diffNodeList(oldnodes, newNodes);</span> |
| <span class="pc bpc" id="L117" title="1 of 2 branches missed."> if (queue == null) {</span> |
| <span class="nc" id="L118"> queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();</span> |
| } |
| <span class="fc bfc" id="L120" title="All 2 branches covered."> for (String s : addedNodes) {</span> |
| <span class="pc bpc" id="L121" title="1 of 2 branches missed."> if (!s.equals(service.getNode())) {</span> |
| <span class="fc" id="L122"> logger.info("This node: " + service.getNode() + ". New replication node queue for synchronization: " + s);</span> |
| <span class="fc" id="L123"> HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();</span> |
| <span class="fc" id="L124"> highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L));</span> |
| <span class="fc" id="L125"> queue.add(new NotifyChangeRecordsAvailable(s, highWaterMarkVectorType));</span> |
| } |
| <span class="fc" id="L127"> }</span> |
| |
| <span class="fc" id="L129"> }</span> |
| |
| private static Set<String> getNodes(ReplicationConfiguration oldConfig) { |
| <span class="fc" id="L132"> Set<String> ret = new HashSet<String>();</span> |
| <span class="fc bfc" id="L133" title="All 2 branches covered."> if (oldConfig == null) {</span> |
| <span class="fc" id="L134"> return ret;</span> |
| } |
| <span class="fc bfc" id="L136" title="All 2 branches covered."> for (org.uddi.repl_v3.Operator o : oldConfig.getOperator()) {</span> |
| <span class="fc" id="L137"> ret.add(o.getOperatorNodeID());</span> |
| <span class="fc" id="L138"> }</span> |
| <span class="pc bpc" id="L139" title="1 of 2 branches missed."> if (oldConfig.getCommunicationGraph() != null) {</span> |
| <span class="fc" id="L140"> ret.addAll(oldConfig.getCommunicationGraph().getNode());</span> |
| } |
| <span class="fc" id="L142"> return ret;</span> |
| } |
| |
| /** |
| * returns items in "newNodes" that are not in "oldNodes" |
| * |
| * @param oldnodes |
| * @param newNodes |
| * @return |
| */ |
| private static Set<String> diffNodeList(Set<String> oldnodes, Set<String> newNodes) { |
| <span class="fc" id="L153"> Set<String> diff = new HashSet<String>();</span> |
| <span class="fc" id="L154"> Iterator<String> iterator = newNodes.iterator();</span> |
| <span class="fc bfc" id="L155" title="All 2 branches covered."> while (iterator.hasNext()) {</span> |
| <span class="fc" id="L156"> String lhs = iterator.next();</span> |
| <span class="fc" id="L157"> Iterator<String> iterator1 = oldnodes.iterator();</span> |
| <span class="fc" id="L158"> boolean found = false;</span> |
| <span class="fc bfc" id="L159" title="All 2 branches covered."> while (iterator1.hasNext()) {</span> |
| <span class="fc" id="L160"> String rhs = iterator1.next();</span> |
| <span class="pc bpc" id="L161" title="1 of 2 branches missed."> if (rhs.equalsIgnoreCase(lhs)) {</span> |
| <span class="nc" id="L162"> found = true;</span> |
| <span class="nc" id="L163"> break;</span> |
| } |
| <span class="fc" id="L165"> }</span> |
| <span class="pc bpc" id="L166" title="1 of 2 branches missed."> if (!found) {</span> |
| <span class="fc" id="L167"> diff.add(lhs);</span> |
| } |
| |
| <span class="fc" id="L170"> }</span> |
| <span class="fc" id="L171"> return diff;</span> |
| } |
| |
| private UDDIServiceCounter serviceCounter; |
| |
| <span class="fc" id="L176"> private static PullTimerTask timer = null;</span> |
| private long startBuffer; |
| private long interval; |
| |
| <span class="fc" id="L180"> private static UDDIPublicationImpl pub = null;</span> |
| |
| public UDDIReplicationImpl() { |
| <span class="fc" id="L183"> super();</span> |
| try { |
| <span class="fc" id="L185"> this.interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000L);</span> |
| <span class="fc" id="L186"> this.startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000L);</span> |
| <span class="nc" id="L187"> } catch (Exception ex) {</span> |
| <span class="nc" id="L188"> logger.warn("Config error!", ex);</span> |
| <span class="fc" id="L189"> }</span> |
| |
| <span class="fc" id="L191"> serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);</span> |
| <span class="fc" id="L192"> init();</span> |
| |
| <span class="fc" id="L194"> }</span> |
| |
| private synchronized void init() { |
| <span class="fc bfc" id="L197" title="All 2 branches covered."> if (pub == null) {</span> |
| <span class="fc" id="L198"> pub = new UDDIPublicationImpl();</span> |
| } |
| <span class="fc bfc" id="L200" title="All 2 branches covered."> if (queue == null) {</span> |
| <span class="fc" id="L201"> queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();</span> |
| } |
| <span class="fc" id="L203"> timer = new PullTimerTask();</span> |
| |
| <span class="fc" id="L205"> }</span> |
| |
| |
| /** |
| * handles when a remote node tells me that there's an update(s) |
| * available |
| */ |
| private class PullTimerTask extends TimerTask { |
| |
/** True until run() has enqueued all receiving nodes once. */
boolean firstrun = true;

/** The timer that executes this task; created as a daemon timer. */
private Timer timer;

/**
 * Creates the task together with its own daemon timer and schedules itself
 * at a fixed rate, using the enclosing instance's start buffer as the
 * initial delay and its interval as the period.
 */
public PullTimerTask() {
    super();
    this.timer = new Timer(true);
    this.timer.scheduleAtFixedRate(this, startBuffer, interval);
}
| |
| @Override |
| public void run() { |
| <span class="pc bpc" id="L225" title="1 of 2 branches missed."> if (firstrun) {</span> |
| <span class="fc" id="L226"> enqueueAllReceivingNodes();</span> |
| <span class="fc" id="L227"> firstrun = false;</span> |
| } |
| |
| <span class="pc bpc" id="L230" title="1 of 2 branches missed."> if (!queue.isEmpty()) {</span> |
| <span class="fc" id="L231"> logger.info("Replication change puller thread started. Queue size: " + queue.size());</span> |
| } |
| //ok someone told me there's a change available |
| <span class="pc bpc" id="L234" title="1 of 2 branches missed."> while (!queue.isEmpty()) {</span> |
| <span class="fc" id="L235"> NotifyChangeRecordsAvailable poll = queue.poll();</span> |
| <span class="pc bpc" id="L236" title="2 of 4 branches missed."> if (poll != null && !poll.getNotifyingNode().equalsIgnoreCase(getNode())) {</span> |
| <span class="fc" id="L237"> UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());</span> |
| <span class="fc bfc" id="L238" title="All 2 branches covered."> if (replicationClient == null) {</span> |
| <span class="fc" id="L239"> logger.fatal("unable to obtain a replication client to node " + poll);</span> |
| } else { |
| try { |
| //get the high water marks for this node |
| //ok now get all the changes |
| |
| //done replace with last known record from the given node |
| //for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) { |
| // logger.info("Node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() |
| // + " USN " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN()); |
| //} |
| <span class="fc" id="L250"> Set<String> nodesHitThisCycle = new HashSet<String>();</span> |
| <span class="pc bpc" id="L251" title="1 of 2 branches missed."> for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {</span> |
| <span class="fc" id="L252"> int recordsreturned = 21;</span> |
| <span class="pc bpc" id="L253" title="1 of 2 branches missed."> while (recordsreturned >= 20) {</span> |
| <span class="pc bpc" id="L254" title="1 of 2 branches missed."> if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())) {</span> |
| <span class="nc" id="L255"> logger.info("i've already hit the node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + " this cycle, skipping");</span> |
| <span class="nc" id="L256"> break;</span> |
| } |
| <span class="pc bpc" id="L258" title="1 of 2 branches missed."> if (poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID().equalsIgnoreCase(getNode())) {</span> |
| <span class="nc" id="L259"> logger.info("ignoring updates that were generated here " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN() + " sent by " + poll.getNotifyingNode() + " this node is " + getNode());</span> |
| <span class="nc" id="L260"> break;</span> |
| } |
| <span class="fc" id="L262"> nodesHitThisCycle.add(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID());</span> |
| <span class="fc" id="L263"> GetChangeRecords body = new GetChangeRecords();</span> |
| <span class="fc" id="L264"> body.setRequestingNode(getNode());</span> |
| <span class="fc" id="L265"> body.setResponseLimitCount(BigInteger.valueOf(100L));</span> |
| |
| <span class="fc" id="L267"> body.setChangesAlreadySeen(getLastChangeRecordFrom(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()));</span> |
| <span class="fc" id="L268"> logger.info("fetching updates from " + poll.getNotifyingNode() + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getNodeID() + ":" + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + ", items still in the queue: " + queue.size());</span> |
| //JAXB.marshal(body, System.out); |
| <span class="fc" id="L270"> List<ChangeRecord> records</span> |
| <span class="nc" id="L271"> = replicationClient.getChangeRecords(body).getChangeRecord();</span> |
| //ok now we need to persist the change records |
| <span class="nc" id="L273"> logger.info("Change records retrieved from " + poll.getNotifyingNode() + ", " + records.size());</span> |
| <span class="nc bnc" id="L274" title="All 2 branches missed."> for (int i = 0; i < records.size(); i++) {</span> |
| <span class="nc" id="L275"> logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN());</span> |
| <span class="nc" id="L276"> persistChangeRecord(records.get(i));</span> |
| } |
| <span class="nc" id="L278"> recordsreturned = records.size();</span> |
| <span class="nc" id="L279"> }</span> |
| } |
| <span class="fc" id="L281"> } catch (Exception ex) {</span> |
| <span class="fc" id="L282"> logger.error("Error caught fetching replication changes from " + poll + " @" + ((BindingProvider) replicationClient).getRequestContext().get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY), ex);</span> |
| <span class="nc" id="L283"> }</span> |
| } |
| <span class="fc" id="L285"> } else {</span> |
| <span class="nc bnc" id="L286" title="All 2 branches missed."> if (poll == null) {</span> |
| <span class="nc" id="L287"> logger.warn("strange, popped a null object");</span> |
| <span class="nc bnc" id="L288" title="All 2 branches missed."> } else if (poll.getNotifyingNode().equalsIgnoreCase(getNode())) {</span> |
| <span class="nc" id="L289"> logger.warn("strange, popped an object from the queue but it was from myself. This probably indicates a configuration error! ignoring...first record: " + poll.getChangesAvailable().getHighWaterMark().get(0).getNodeID()+":" + poll.getChangesAvailable().getHighWaterMark().get(0).getOriginatingUSN());</span> |
| } |
| } |
| <span class="fc" id="L292"> }</span> |
| <span class="nc" id="L293"> }</span> |
| |
| @Override |
| public boolean cancel() { |
| <span class="nc" id="L297"> timer.cancel();</span> |
| <span class="nc" id="L298"> return super.cancel();</span> |
| } |
| |
| /** |
| * someone told me there's a change available, we retrieved it |
| * and are processing the changes locally. |
| * |
| * @param rec |
| */ |
| private void persistChangeRecord(ChangeRecord rec) { |
| <span class="nc bnc" id="L308" title="All 2 branches missed."> if (rec == null) {</span> |
| <span class="nc" id="L309"> return;</span> |
| } |
| <span class="nc" id="L311"> logger.debug("_______________________Remote change request " + rec.getChangeID().getNodeID() + ":" + rec.getChangeID().getOriginatingUSN());</span> |
| |
| <span class="nc bnc" id="L313" title="All 2 branches missed."> if (rec.getChangeID().getNodeID().equalsIgnoreCase(getNode())) {</span> |
| <span class="nc" id="L314"> logger.info("Just received a change record that i created, ignoring....");</span> |
| <span class="nc" id="L315"> return;</span> |
| } |
| <span class="nc" id="L317"> EntityManager em = PersistenceManager.getEntityManager();</span> |
| <span class="nc" id="L318"> EntityTransaction tx = em.getTransaction();</span> |
| <span class="nc" id="L319"> org.apache.juddi.model.ChangeRecord mapChangeRecord = null;</span> |
| /** |
| * In nodes that support pre-bundled replication |
| * responses, the recipient of the get_changeRecords |
| * message MAY return more change records than requested |
| * by the caller. In this scenario, the caller MUST also |
| * be prepared to deal with such redundant changes where |
| * a USN is less than the USN specified in the |
| * changesAlreadySeen highWaterMarkVector. |
| */ |
| |
| try { |
| <span class="nc" id="L331"> tx.begin();</span> |
| //check to see if we have this update already |
| <span class="nc" id="L333"> Query createQuery = em.createQuery("select c from ChangeRecord c where c.nodeID=:node and c.originatingUSN=:oid");</span> |
| <span class="nc" id="L334"> createQuery.setParameter("node", rec.getChangeID().getNodeID());</span> |
| <span class="nc" id="L335"> createQuery.setParameter("oid", rec.getChangeID().getOriginatingUSN());</span> |
| <span class="nc" id="L336"> Object existingrecord = null;</span> |
| try { |
| <span class="nc" id="L338"> existingrecord = createQuery.getSingleResult();</span> |
| <span class="nc" id="L339"> } catch (Exception ex) {</span> |
| <span class="nc" id="L340"> logger.debug("error checking to see if change record exists already (expected failure)", ex);</span> |
| <span class="nc" id="L341"> }</span> |
| <span class="nc bnc" id="L342" title="All 2 branches missed."> if (existingrecord != null) {</span> |
| <span class="nc" id="L343"> logger.info("I've already processed change record " + rec.getChangeID().getNodeID() + " " + rec.getChangeID().getOriginatingUSN());</span> |
| <span class="nc" id="L344"> return;</span> |
| } |
| //if it didn't come from here and i haven't seen it yet |
| <span class="nc" id="L347"> ReplicationNotifier.EnqueueRetransmit(rec);</span> |
| //the remotechange record rec must also be persisted!! |
| <span class="nc" id="L349"> mapChangeRecord = MappingApiToModel.mapChangeRecord(rec);</span> |
| <span class="nc" id="L350"> mapChangeRecord.setId(null);</span> |
| <span class="nc" id="L351"> mapChangeRecord.setIsAppliedLocally(true);</span> |
| <span class="nc" id="L352"> em.persist(mapChangeRecord);</span> |
| <span class="nc" id="L353"> tx.commit();</span> |
| <span class="nc" id="L354"> logger.info("Remote CR saved, it was from " + mapChangeRecord.getNodeID() //this is the origin of the change</span> |
| <span class="nc" id="L355"> + " USN:" + mapChangeRecord.getOriginatingUSN()</span> |
| <span class="nc" id="L356"> + " Type:" + mapChangeRecord.getRecordType().name()</span> |
| <span class="nc" id="L357"> + " Key:" + mapChangeRecord.getEntityKey()</span> |
| <span class="nc" id="L358"> + " Local id from sender:" + mapChangeRecord.getId());</span> |
| <span class="nc" id="L359"> tx = em.getTransaction();</span> |
| <span class="nc" id="L360"> tx.begin();</span> |
| //<editor-fold defaultstate="collapsed" desc="delete a record"> |
| |
| <span class="nc bnc" id="L363" title="All 2 branches missed."> if (rec.getChangeRecordDelete() != null) {</span> |
| <span class="nc bnc" id="L364" title="All 6 branches missed."> if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {</span> |
| //delete a binding template |
| <span class="nc" id="L366"> UddiEntity ue = em.find(BindingTemplate.class, rec.getChangeRecordDelete().getBindingKey());</span> |
| <span class="nc" id="L367"> validateNodeIdMisMatches(ue, getNode());</span> |
| <span class="nc" id="L368"> pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);</span> |
| } |
| <span class="nc bnc" id="L370" title="All 6 branches missed."> if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {</span> |
| //delete a business |
| <span class="nc" id="L372"> UddiEntity ue = em.find(BusinessEntity.class, rec.getChangeRecordDelete().getBusinessKey());</span> |
| <span class="nc" id="L373"> validateNodeIdMisMatches(ue, getNode());</span> |
| <span class="nc" id="L374"> pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);</span> |
| } |
| <span class="nc bnc" id="L376" title="All 6 branches missed."> if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {</span> |
| <span class="nc" id="L377"> UddiEntity ue = em.find(BusinessService.class, rec.getChangeRecordDelete().getServiceKey());</span> |
| <span class="nc" id="L378"> validateNodeIdMisMatches(ue, getNode());</span> |
| //delete a service |
| <span class="nc" id="L380"> pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);</span> |
| } |
| <span class="nc bnc" id="L382" title="All 6 branches missed."> if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {</span> |
| //delete a tmodel |
| /** |
| * The changeRecordDelete for a |
| * tModel does not correspond to |
| * any API described in this |
| * specification and should only |
| * appear in the replication |
| * stream as the result of an |
| * administrative function to |
| * permanently remove a tModel. |
| */ |
| <span class="nc" id="L394"> UddiEntity tm = em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey());</span> |
| <span class="nc bnc" id="L395" title="All 2 branches missed."> if (tm != null) {</span> |
| <span class="nc" id="L396"> validateNodeIdMisMatches(tm, getNode());</span> |
| <span class="nc" id="L397"> em.remove(tm);</span> |
| } else { |
| <span class="nc" id="L399"> logger.error("failed to adminstratively delete tmodel because it doesn't exist. " + rec.getChangeRecordDelete().getTModelKey());</span> |
| } |
| //pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em); |
| } |
| } |
| <span class="nc bnc" id="L404" title="All 4 branches missed."> if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {</span> |
| //delete a pa template |
| <span class="nc" id="L406"> pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion(), em);</span> |
| } |
| |
| //</editor-fold> |
| //<editor-fold defaultstate="collapsed" desc="New Data"> |
| <span class="nc bnc" id="L411" title="All 2 branches missed."> if (rec.getChangeRecordNewData() != null) {</span> |
| |
| //The operationalInfo element MUST contain the operational information associated with the indicated new data. |
| <span class="nc bnc" id="L414" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo() == null) {</span> |
| <span class="nc" id="L415"> logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");</span> |
| } else { |
| <span class="nc bnc" id="L417" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {</span> |
| <span class="nc" id="L418"> throw new Exception("Inbound replication data is missiong node id! Change will not be applied");</span> |
| } |
| <span class="nc bnc" id="L420" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equalsIgnoreCase(getNode())) {</span> |
| <span class="nc" id="L421"> logger.warn("Inbound replication data is modifying locally owned data. This is not allowed, except for custody transfer");</span> |
| } |
| <span class="nc bnc" id="L423" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getBindingTemplate() != null) {</span> |
| //fetch the binding template if it exists already |
| //if it exists, |
| // confirm the owning node, it shouldn't be the local node id, if it is, throw |
| // the owning node should be the same as it was before |
| |
| <span class="nc" id="L429"> BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());</span> |
| <span class="nc bnc" id="L430" title="All 2 branches missed."> if (model == null) {</span> |
| <span class="nc" id="L431"> logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");</span> |
| } else { |
| <span class="nc" id="L433"> validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span> |
| |
| <span class="nc" id="L435"> org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey());</span> |
| <span class="nc bnc" id="L436" title="All 2 branches missed."> if (bt != null) {</span> |
| //ValidateNodeIdMatches(node, bt.getNodeId()); |
| <span class="nc" id="L438"> em.remove(bt);</span> |
| } |
| <span class="nc" id="L440"> bt = new BindingTemplate();</span> |
| <span class="nc" id="L441"> MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), bt, model);</span> |
| <span class="nc" id="L442"> MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); |
| <span class="nc" id="L444"> em.persist(bt);</span> |
| } |
| |
| <span class="nc bnc" id="L447" title="All 2 branches missed."> } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {</span> |
| |
| <span class="nc" id="L449"> BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());</span> |
| <span class="nc bnc" id="L450" title="All 2 branches missed."> if (model != null) {</span> |
| //if the owner of the new data is me, and the update didn't originate from me |
| <span class="nc bnc" id="L452" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span> |
| <span class="nc bnc" id="L453" title="All 2 branches missed."> && !model.getNodeId().equals(getNode())) {</span> |
| <span class="nc bnc" id="L454" title="All 2 branches missed."> if (model.getIsTransferInProgress()) {</span> |
| //allow the transfer |
| <span class="nc" id="L456"> MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);</span> |
| <span class="nc" id="L457"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| <span class="nc" id="L458"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| <span class="nc" id="L459"> model.setIsTransferInProgress(false);</span> |
| <span class="nc" id="L460"> em.merge(model);</span> |
| } else { |
| //block it, unexpected transfer |
| <span class="nc" id="L463"> throw new Exception("Unexpected entity transfer to to node " + getNode() + " from " + rec.getChangeID().getNodeID());</span> |
| } |
| |
| <span class="nc bnc" id="L466" title="All 2 branches missed."> } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span> |
| <span class="nc bnc" id="L467" title="All 2 branches missed."> && model.getNodeId().equals(getNode())) {</span> |
| //if destination is here and it's staying here, then this is strange also |
| //someone else updated one of my records |
| <span class="nc" id="L470"> throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());</span> |
| <span class="nc bnc" id="L471" title="All 2 branches missed."> } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span> |
| <span class="nc bnc" id="L472" title="All 2 branches missed."> && model.getNodeId().equals(getNode())) {</span> |
| //this is also strange, destination is elsewhere however it's owned by me. |
| <span class="nc" id="L474"> throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());</span> |
| |
| <span class="nc bnc" id="L476" title="All 2 branches missed."> } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span> |
| <span class="nc bnc" id="L477" title="All 2 branches missed."> && !model.getNodeId().equals(getNode())) {</span> |
| //changes on a remote node, for an existing item |
| <span class="nc" id="L479"> MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);</span> |
| <span class="nc" id="L480"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| <span class="nc" id="L481"> em.merge(model);</span> |
| |
| } |
| |
| } else { |
| <span class="nc" id="L486"> model = new BusinessEntity();</span> |
| <span class="nc" id="L487"> MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);</span> |
| <span class="nc" id="L488"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| <span class="nc" id="L489"> em.persist(model);</span> |
| } |
| } |
| <span class="nc bnc" id="L492" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getBusinessService() != null) {</span> |
| <span class="nc" id="L493"> BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());</span> |
| <span class="nc bnc" id="L494" title="All 2 branches missed."> if (find == null) {</span> |
| <span class="nc" id="L495"> logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");</span> |
| } else { |
| |
| <span class="nc" id="L498"> org.apache.juddi.model.BusinessService model = null;</span> |
| <span class="nc" id="L499"> model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey());</span> |
| <span class="nc bnc" id="L500" title="All 2 branches missed."> if (model != null) {</span> |
| <span class="nc" id="L501"> validateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span> |
| <span class="nc" id="L502"> em.remove(model);</span> |
| } |
| |
| <span class="nc" id="L505"> model = new org.apache.juddi.model.BusinessService();</span> |
| <span class="nc" id="L506"> MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);</span> |
| <span class="nc" id="L507"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| <span class="nc" id="L508"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| |
| <span class="nc" id="L510"> em.persist(model);</span> |
| } |
| |
| <span class="nc bnc" id="L513" title="All 2 branches missed."> } else if (rec.getChangeRecordNewData().getTModel() != null) {</span> |
| |
| <span class="nc" id="L515"> Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey());</span> |
| <span class="nc bnc" id="L516" title="All 2 branches missed."> if (model != null) {</span> |
| //in the case of a transfer |
| //if the new entity is being transfer to ME, accept and i didn't previously own it, but only if the local record is flagged as transferable |
| //meaning, only accept if i'm expecting a transfer |
| <span class="nc bnc" id="L520" title="All 2 branches missed."> if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span> |
| <span class="nc bnc" id="L521" title="All 2 branches missed."> && !model.getNodeId().equals(getNode())) {</span> |
| <span class="nc bnc" id="L522" title="All 2 branches missed."> if (model.getIsTransferInProgress()) {</span> |
| //allow the transfer |
| <span class="nc" id="L524"> em.remove(model);</span> |
| <span class="nc" id="L525"> model = new Tmodel();</span> |
| <span class="nc" id="L526"> MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);</span> |
| <span class="nc" id="L527"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| <span class="nc" id="L528"> model.setIsTransferInProgress(false);</span> |
| <span class="nc" id="L529"> em.persist(model);</span> |
| } else { |
| //block it, unexpected transfer |
| <span class="nc" id="L532"> throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID());</span> |
| } |
| |
| <span class="nc bnc" id="L535" title="All 2 branches missed."> } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span> |
| <span class="nc bnc" id="L536" title="All 2 branches missed."> && model.getNodeId().equals(getNode())) {</span> |
| //if destination is here and it's staying here, then this is strange also |
| //someone else updated one of my records |
| <span class="nc" id="L539"> throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());</span> |
| <span class="nc bnc" id="L540" title="All 2 branches missed."> } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span> |
| <span class="nc bnc" id="L541" title="All 2 branches missed."> && model.getNodeId().equals(getNode())) {</span> |
| //this is also strange, destination is elsewhere however it's owned by me. |
| <span class="nc" id="L543"> throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());</span> |
| |
| <span class="nc bnc" id="L545" title="All 2 branches missed."> } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(getNode())</span> |
| <span class="nc bnc" id="L546" title="All 2 branches missed."> && !model.getNodeId().equals(getNode())) {</span> |
| //changes on a remote node, for an existing item |
| <span class="nc" id="L548"> em.remove(model);</span> |
| <span class="nc" id="L549"> model = new Tmodel();</span> |
| <span class="nc" id="L550"> MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);</span> |
| |
| <span class="nc" id="L552"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| |
| <span class="nc" id="L554"> em.persist(model);</span> |
| |
| } |
| } else { |
| <span class="nc" id="L558"> model = new Tmodel();</span> |
| <span class="nc" id="L559"> MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);</span> |
| |
| <span class="nc" id="L561"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());</span> |
| |
| <span class="nc" id="L563"> em.persist(model);</span> |
| } |
| } |
| |
| } |
| |
| } |
| //</editor-fold> |
| |
| // changeRecordNull no action needed |
| // changeRecordHide tmodel only |
| //<editor-fold defaultstate="collapsed" desc="hide tmodel"> |
| <span class="nc bnc" id="L575" title="All 2 branches missed."> if (rec.getChangeRecordHide() != null) {</span> |
| /* |
| A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification. A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call. |
| |
| The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure. |
| */ |
| <span class="nc" id="L581"> String key = rec.getChangeRecordHide().getTModelKey();</span> |
| <span class="nc" id="L582"> org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);</span> |
| <span class="nc bnc" id="L583" title="All 2 branches missed."> if (existing == null) {</span> |
| <span class="nc" id="L584"> logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);</span> |
| } else { |
| //no one else can delete/hide my tmodel |
| <span class="nc" id="L587"> validateNodeIdMisMatches(existing, getNode());</span> |
| <span class="nc" id="L588"> existing.setDeleted(true);</span> |
| <span class="nc" id="L589"> existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());</span> |
| <span class="nc" id="L590"> existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());</span> |
| <span class="nc" id="L591"> em.persist(existing);</span> |
| } |
| } |
| //</editor-fold> |
| |
| //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion"> |
| <span class="nc bnc" id="L597" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion() != null) {</span> |
| |
| <span class="nc" id="L599"> logger.info("Repl CR Publisher Assertion");</span> |
| //TODO are publisher assertions owned by a given node? |
| <span class="nc" id="L601"> PublisherAssertionId paid = new PublisherAssertionId(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey(), rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey());</span> |
| <span class="nc" id="L602"> org.apache.juddi.model.PublisherAssertion model = em.find(org.apache.juddi.model.PublisherAssertion.class, paid);</span> |
| <span class="nc bnc" id="L603" title="All 2 branches missed."> if (model != null) {</span> |
| <span class="nc" id="L604"> logger.info("Repl CR Publisher Assertion - Existing");</span> |
| |
| <span class="nc bnc" id="L606" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {</span> |
| <span class="nc" id="L607"> model.setFromCheck("true");</span> |
| } else { |
| <span class="nc" id="L609"> model.setFromCheck("false");</span> |
| } |
| |
| <span class="nc bnc" id="L612" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {</span> |
| <span class="nc" id="L613"> model.setToCheck("true");</span> |
| } else { |
| <span class="nc" id="L615"> model.setToCheck("false");</span> |
| } |
| |
| <span class="nc" id="L618"> model.setKeyName(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyName());</span> |
| <span class="nc" id="L619"> model.setKeyValue(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyValue());</span> |
| <span class="nc" id="L620"> model.setTmodelKey(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getTModelKey());</span> |
| <span class="nc" id="L621"> model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());</span> |
| //model.setSignatures(MappingApiToModel.mapApiSignaturesToModelSignatures(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getSignature())); |
| <span class="nc bnc" id="L623" title="All 2 branches missed."> if ("false".equalsIgnoreCase(model.getFromCheck())</span> |
| <span class="nc bnc" id="L624" title="All 2 branches missed."> && "false".equalsIgnoreCase(model.getToCheck())) {</span> |
| <span class="nc" id="L625"> logger.warn("!!!New publisher assertion is both false and false, strange. no need to save it then!");</span> |
| <span class="nc" id="L626"> em.remove(model);</span> |
| } |
| <span class="nc" id="L628"> em.merge(model);</span> |
| } else { |
| <span class="nc" id="L630"> logger.info("Repl CR Publisher Assertion - new PA");</span> |
| |
| <span class="nc" id="L632"> model = new PublisherAssertion();</span> |
| <span class="nc" id="L633"> MappingApiToModel.mapPublisherAssertion(rec.getChangeRecordPublisherAssertion().getPublisherAssertion(), model);</span> |
| <span class="nc" id="L634"> model.setBusinessEntityByFromKey(null);</span> |
| <span class="nc" id="L635"> model.setBusinessEntityByToKey(null);</span> |
| <span class="nc" id="L636"> model.setBusinessEntityByFromKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey()));</span> |
| <span class="nc" id="L637"> model.setBusinessEntityByToKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey()));</span> |
| |
| <span class="nc bnc" id="L639" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {</span> |
| <span class="nc" id="L640"> model.setFromCheck("true");</span> |
| } else { |
| <span class="nc" id="L642"> model.setFromCheck("false");</span> |
| } |
| |
| <span class="nc bnc" id="L645" title="All 2 branches missed."> if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {</span> |
| <span class="nc" id="L646"> model.setToCheck("true");</span> |
| } else { |
| <span class="nc" id="L648"> model.setToCheck("false");</span> |
| } |
| <span class="nc" id="L650"> model.setModified(rec.getChangeRecordPublisherAssertion().getModified().toGregorianCalendar().getTime());</span> |
| <span class="nc" id="L651"> em.persist(model);</span> |
| } |
| } |
| //</editor-fold> |
| |
| <span class="nc bnc" id="L656" title="All 2 branches missed."> if (rec.isAcknowledgementRequested()) {</span> |
| <span class="nc" id="L657"> ChangeRecord posack = new ChangeRecord();</span> |
| <span class="nc" id="L658"> posack.setChangeRecordAcknowledgement(new ChangeRecordAcknowledgement());</span> |
| <span class="nc" id="L659"> posack.getChangeRecordAcknowledgement().setAcknowledgedChange(rec.getChangeID());</span> |
| <span class="nc" id="L660"> posack.setAcknowledgementRequested(false);</span> |
| <span class="nc" id="L661"> ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(posack));</span> |
| } |
| <span class="nc bnc" id="L663" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional() != null) {</span> |
| |
| <span class="nc bnc" id="L665" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {</span> |
| <span class="nc" id="L666"> throw new Exception("Inbound replication data is missiong node id!");</span> |
| } |
| |
| //The operationalInfo element MUST contain the operational information associated with the indicated new data. |
| <span class="nc bnc" id="L670" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo() == null) {</span> |
| <span class="nc" id="L671"> logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");</span> |
| } else { |
| <span class="nc bnc" id="L673" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate() != null) {</span> |
| //fetch the binding template if it exists already |
| //if it exists, |
| // confirm the owning node, it shouldn't be the local node id, if it is, throw |
| // the owning node should be the same as it was before |
| |
| <span class="nc" id="L679"> BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getServiceKey());</span> |
| <span class="nc bnc" id="L680" title="All 2 branches missed."> if (model == null) {</span> |
| <span class="nc" id="L681"> logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");</span> |
| } else { |
| |
| <span class="nc" id="L684"> org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getBindingKey());</span> |
| <span class="nc bnc" id="L685" title="All 2 branches missed."> if (bt != null) {</span> |
| <span class="nc" id="L686"> validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), bt.getNodeId());</span> |
| |
| <span class="nc" id="L688"> em.remove(bt);</span> |
| } |
| <span class="nc" id="L690"> bt = new BindingTemplate();</span> |
| <span class="nc" id="L691"> MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate(), bt, model);</span> |
| <span class="nc" id="L692"> MappingApiToModel.mapOperationalInfo(bt, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span> |
| // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo()); |
| <span class="nc" id="L694"> em.persist(bt);</span> |
| } |
| |
| <span class="nc bnc" id="L697" title="All 2 branches missed."> } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity() != null) {</span> |
| |
| <span class="nc" id="L699"> BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity().getBusinessKey());</span> |
| <span class="nc bnc" id="L700" title="All 2 branches missed."> if (model != null) {</span> |
| <span class="nc" id="L701"> validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span> |
| //TODO revisit access control rules |
| <span class="nc" id="L703"> em.remove(model);</span> |
| } |
| <span class="nc" id="L705"> model = new BusinessEntity();</span> |
| <span class="nc" id="L706"> MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity(), model);</span> |
| // MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo()); |
| |
| <span class="nc" id="L709"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span> |
| <span class="nc" id="L710"> logger.warn("Name size on save is " + model.getBusinessNames().size());</span> |
| <span class="nc" id="L711"> em.persist(model);</span> |
| |
| } |
| <span class="nc bnc" id="L714" title="All 2 branches missed."> if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService() != null) {</span> |
| <span class="nc" id="L715"> BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getBusinessKey());</span> |
| <span class="nc bnc" id="L716" title="All 2 branches missed."> if (find == null) {</span> |
| <span class="nc" id="L717"> logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");</span> |
| } else { |
| |
| <span class="nc" id="L720"> org.apache.juddi.model.BusinessService model = null;</span> |
| <span class="nc" id="L721"> model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getServiceKey());</span> |
| <span class="nc bnc" id="L722" title="All 2 branches missed."> if (model != null) {</span> |
| <span class="nc" id="L723"> validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span> |
| <span class="nc" id="L724"> em.remove(model);</span> |
| } |
| |
| <span class="nc" id="L727"> model = new org.apache.juddi.model.BusinessService();</span> |
| <span class="nc" id="L728"> MappingApiToModel.mapBusinessService(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService(), model, find);</span> |
| <span class="nc" id="L729"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span> |
| <span class="nc" id="L730"> MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span> |
| |
| <span class="nc" id="L732"> em.persist(model);</span> |
| } |
| |
| <span class="nc bnc" id="L735" title="All 2 branches missed."> } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel() != null) {</span> |
| |
| <span class="nc" id="L737"> Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().getTModelKey());</span> |
| <span class="nc bnc" id="L738" title="All 2 branches missed."> if (model != null) {</span> |
| <span class="nc" id="L739"> validateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());</span> |
| <span class="nc" id="L740"> em.remove(model);</span> |
| } |
| <span class="nc" id="L742"> model = new Tmodel();</span> |
| <span class="nc" id="L743"> MappingApiToModel.mapTModel(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel(), model);</span> |
| |
| <span class="nc" id="L745"> MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());</span> |
| |
| <span class="nc" id="L747"> em.persist(model);</span> |
| } |
| |
| } |
| |
| } |
| <span class="nc bnc" id="L753" title="All 2 branches missed."> if (rec.getChangeRecordNull() != null) {</span> |
| //No action required |
| |
| } |
| <span class="nc bnc" id="L757" title="All 2 branches missed."> if (rec.getChangeRecordCorrection() != null) {</span> |
| //TODO implement |
| |
| } |
| <span class="nc bnc" id="L761" title="All 2 branches missed."> if (rec.getChangeRecordConditionFailed() != null) {</span> |
| //TODO implement |
| |
| } |
| <span class="nc" id="L765"> tx.commit();</span> |
| |
| <span class="nc" id="L767"> } catch (Exception drfm) {</span> |
| |
| <span class="nc" id="L769"> logger.warn("Error applying the change record! ", drfm);</span> |
| <span class="nc" id="L770"> StringWriter sw = new StringWriter();</span> |
| <span class="nc" id="L771"> JAXB.marshal(rec, sw);</span> |
| <span class="nc" id="L772"> logger.warn("This is the record that failed to persist: " + sw.toString());</span> |
| <span class="nc bnc" id="L773" title="All 2 branches missed."> if (tx.isActive()) {</span> |
| <span class="nc" id="L774"> tx.rollback();</span> |
| } |
| <span class="nc bnc" id="L776" title="All 2 branches missed."> if (mapChangeRecord != null) {</span> |
| //set the change record's isApplied to false |
| try { |
| <span class="nc" id="L779"> tx = em.getTransaction();</span> |
| <span class="nc" id="L780"> tx.begin();</span> |
| <span class="nc" id="L781"> mapChangeRecord.setIsAppliedLocally(false);</span> |
| <span class="nc" id="L782"> em.merge(mapChangeRecord);</span> |
| <span class="nc" id="L783"> tx.commit();</span> |
| <span class="nc" id="L784"> } catch (Exception e) {</span> |
| <span class="nc" id="L785"> logger.error("error updating change record!!", e);</span> |
| <span class="nc bnc" id="L786" title="All 2 branches missed."> if (tx.isActive()) {</span> |
| <span class="nc" id="L787"> tx.rollback();</span> |
| } |
| <span class="nc" id="L789"> }</span> |
| } else { |
| <span class="nc" id="L791"> logger.fatal("whoa! change record is null when saving a remote change record, this is unexpected and should be reported");</span> |
| } |
| } finally { |
| <span class="nc bnc" id="L794" title="All 8 branches missed."> if (tx.isActive()) {</span> |
| <span class="nc" id="L795"> tx.rollback();</span> |
| } |
| <span class="nc" id="L797"> em.close();</span> |
| <span class="nc" id="L798"> }</span> |
| <span class="nc" id="L799"> }</span> |
| |
| private HighWaterMarkVectorType getLastChangeRecordFrom(String sourcenode) { |
| <span class="fc" id="L802"> HighWaterMarkVectorType ret = new HighWaterMarkVectorType();</span> |
| <span class="fc" id="L803"> ChangeRecordIDType cid = new ChangeRecordIDType();</span> |
| <span class="fc" id="L804"> cid.setNodeID(sourcenode);</span> |
| <span class="fc" id="L805"> cid.setOriginatingUSN(0L);</span> |
| <span class="fc" id="L806"> EntityManager em = PersistenceManager.getEntityManager();</span> |
| <span class="fc" id="L807"> EntityTransaction tx = em.getTransaction();</span> |
| try { |
| <span class="fc" id="L809"> tx.begin();</span> |
| //Long id = 0L; |
| try { |
| <span class="fc" id="L812"> cid.setOriginatingUSN((Long) em.createQuery("select MAX(e.originatingUSN) from ChangeRecord e where e.nodeID = :node")</span> |
| <span class="fc" id="L813"> .setParameter("node", sourcenode)</span> |
| <span class="fc" id="L814"> .getSingleResult());</span> |
| <span class="nc" id="L815"> } catch (Exception ex) {</span> |
| <span class="nc" id="L816"> logger.info("unexpected error searching for last record from " + sourcenode, ex);</span> |
| <span class="fc" id="L817"> }</span> |
| |
| <span class="fc" id="L819"> tx.rollback();</span> |
| |
| <span class="nc" id="L821"> } catch (Exception drfm) {</span> |
| <span class="nc" id="L822"> logger.warn("error caught fetching newest record from node " + sourcenode, drfm);</span> |
| } finally { |
| <span class="pc bpc" id="L824" title="5 of 6 branches missed."> if (tx.isActive()) {</span> |
| <span class="nc" id="L825"> tx.rollback();</span> |
| } |
| <span class="pc" id="L827"> em.close();</span> |
| <span class="pc" id="L828"> }</span> |
| <span class="fc" id="L829"> logger.info("Highest known record for " + sourcenode + " is " + cid.getOriginatingUSN());</span> |
| <span class="fc" id="L830"> ret.getHighWaterMark().add(cid);</span> |
| |
| <span class="fc" id="L832"> return ret;</span> |
| } |
| |
| private void enqueueAllReceivingNodes() { |
| <span class="pc bpc" id="L836" title="1 of 2 branches missed."> if (queue == null) {</span> |
| <span class="nc" id="L837"> queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();</span> |
| } |
| //get the replication config |
| //get everyone we are expecting to receive data from, then enqueue them for pulling |
| <span class="fc" id="L841"> ReplicationConfiguration repcfg = ReplicationNotifier.FetchEdges();</span> |
| <span class="pc bpc" id="L842" title="1 of 2 branches missed."> if (repcfg == null) {</span> |
| <span class="nc" id="L843"> return;</span> |
| } |
| <span class="fc" id="L845"> Set<String> allnodes = new HashSet<String>();</span> |
| <span class="fc bfc" id="L846" title="All 2 branches covered."> for (int i = 0; i < repcfg.getOperator().size(); i++) {</span> |
| <span class="fc" id="L847"> allnodes.add(repcfg.getOperator().get(i).getOperatorNodeID());</span> |
| } |
| <span class="fc" id="L849"> Set<String> receivers = new HashSet<String>();</span> |
| <span class="pc bpc" id="L850" title="1 of 2 branches missed."> if (repcfg.getCommunicationGraph() == null</span> |
| <span class="pc bpc" id="L851" title="1 of 2 branches missed."> || repcfg.getCommunicationGraph().getEdge().isEmpty()) {</span> |
| //no edges or graph defined, default to the operator list |
| <span class="fc bfc" id="L853" title="All 2 branches covered."> for (org.uddi.repl_v3.Operator o : repcfg.getOperator()) {</span> |
| //no need to tell myself about a change at myself |
| <span class="pc bpc" id="L855" title="1 of 2 branches missed."> if (!o.getOperatorNodeID().equalsIgnoreCase(getNode())) {</span> |
| <span class="fc" id="L856"> receivers.add(o.getOperatorNodeID());</span> |
| } |
| <span class="fc" id="L858"> }</span> |
| } else { |
| //repcfg.getCommunicationGraph() |
| <span class="nc" id="L861"> Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();</span> |
| <span class="nc bnc" id="L862" title="All 2 branches missed."> while (iterator.hasNext()) {</span> |
| <span class="nc" id="L863"> Edge next = iterator.next();</span> |
| |
| <span class="nc bnc" id="L865" title="All 2 branches missed."> if (next.getMessageReceiver().equalsIgnoreCase(getNode())) {</span> |
| <span class="nc" id="L866"> receivers.add(next.getMessageSender());</span> |
| } |
| |
| <span class="nc" id="L869"> }</span> |
| |
| } |
| <span class="fc bfc" id="L872" title="All 2 branches covered."> for (String s : receivers) {</span> |
| //this is a list of nodes that this node is expecting updates from |
| //here are we ticking the notification engine to ping the remove service for updates |
| <span class="fc bfc" id="L875" title="All 2 branches covered."> for (String nodeping : allnodes) {</span> |
| <span class="fc" id="L876"> queue.add(new NotifyChangeRecordsAvailable(s, getLastChangeRecordFrom(nodeping)));</span> |
| //for each node we are expecting data from, go fetch it, along the way, we'll request all data for all nodes |
| //that we know about |
| <span class="fc" id="L879"> }</span> |
| |
| <span class="fc" id="L881"> }</span> |
| <span class="fc" id="L882"> }</span> |
| |
| } |
| |
| /** |
| * used to check for alterations on *this node's data from another node, |
| * which isn't allowed |
| * |
| * @param ue |
| * @param node |
| * @throws Exception |
| */ |
| private static void validateNodeIdMisMatches(UddiEntity ue, String node) throws Exception { |
| <span class="nc bnc" id="L895" title="All 2 branches missed."> if (ue == null) {</span> |
| <span class="nc" id="L896"> return;//object doesn't exist</span> |
| } |
| <span class="nc bnc" id="L898" title="All 2 branches missed."> if (ue.getNodeId().equals(node)) {</span> |
| <span class="nc" id="L899"> throw new Exception("Alert! attempt to alter locally owned entity " + ue.getEntityKey() + " owned by " + ue.getAuthorizedName() + "@" + ue.getNodeId());</span> |
| } |
| <span class="nc" id="L901"> }</span> |
| |
| /** |
| * use to validate that changed data maintained ownership, except for |
| * business entities and tmodels since they allow transfer |
| * |
| * @param newNodeId |
| * @param currentOwningNode |
| * @throws Exception |
| */ |
| private void validateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception { |
| <span class="nc bnc" id="L912" title="All 4 branches missed."> if (newNodeId == null || currentOwningNode == null) {</span> |
| <span class="nc" id="L913"> throw new Exception("either the local node ID is null or the inbound replication data's node id is null");</span> |
| } |
| //only time this is allowed is custody transfer |
| <span class="nc bnc" id="L916" title="All 2 branches missed."> if (!newNodeId.equals(currentOwningNode)) {</span> |
| <span class="nc" id="L917"> logger.info("AUDIT, custody transfer from node, " + currentOwningNode + " to " + newNodeId + " current node is " + getNode());</span> |
| //throw new Exception("node id mismatch!"); |
| } |
| |
| //if i already have a record and "own it" and the remote node has a record with the same key, reject the update |
| //1.5.8 |
| /** |
| * Each node has custody of a portion of the aggregate data |
| * managed by the registry of which it is a part. Each datum is |
| * by definition in the custody of exactly one such node. A |
| * datum in this context can be a businessEntity, a |
| * businessService, a bindingTemplate, a tModel, or a |
| * publisherAssertion. Changes to a datum in the registry MUST |
| * originate at the node which is the custodian of the datum. |
| * The registry defines the policy for data custody and, if |
| * allowed, the custodian node for a given datum can be changed; |
| * such custody transfer processes are discussed in Section 5.4 |
| * Custody and Ownership Transfer API. |
| */ |
| //so someone else attempted to update one of my records, reject it |
| <span class="nc bnc" id="L937" title="All 2 branches missed."> if (newNodeId.equals(getNode())) {</span> |
| //throw new Exception("node id mismatch! this node already has a record for key " + newDataOperationalInfo.getEntityKey() + " and I'm the authority for it."); |
| } |
| <span class="nc" id="L940"> }</span> |
| |
| private synchronized UDDIReplicationPortType getReplicationClient(String node) { |
| <span class="fc bfc" id="L943" title="All 2 branches covered."> if (cache.containsKey(node)) {</span> |
| <span class="fc" id="L944"> return cache.get(node);</span> |
| } |
| <span class="fc" id="L946"> UDDIService svc = new UDDIService();</span> |
| <span class="fc" id="L947"> UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();</span> |
| <span class="fc" id="L948"> TransportSecurityHelper.applyTransportSecurity((BindingProvider) replicationClient);</span> |
| |
| <span class="fc" id="L950"> EntityManager em = PersistenceManager.getEntityManager();</span> |
| <span class="fc" id="L951"> EntityTransaction tx = em.getTransaction();</span> |
| try { |
| <span class="fc" id="L953"> tx.begin();</span> |
| <span class="fc" id="L954"> StringBuilder sql = new StringBuilder();</span> |
| <span class="fc" id="L955"> sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc");</span> |
| //sql.toString(); |
| <span class="fc" id="L957"> Query qry = em.createQuery(sql.toString());</span> |
| <span class="fc" id="L958"> qry.setMaxResults(1);</span> |
| |
| <span class="fc" id="L960"> org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();</span> |
| <span class="fc bfc" id="L961" title="All 2 branches covered."> for (Operator o : resultList.getOperator()) {</span> |
| <span class="fc bfc" id="L962" title="All 2 branches covered."> if (o.getOperatorNodeID().equalsIgnoreCase(node)) {</span> |
| <span class="fc" id="L963"> ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());</span> |
| <span class="fc" id="L964"> cache.put(node, replicationClient);</span> |
| <span class="fc" id="L965"> return replicationClient;</span> |
| } |
| <span class="fc" id="L967"> }</span> |
| <span class="fc" id="L968"> tx.rollback();</span> |
| |
| <span class="nc" id="L970"> } catch (Exception ex) {</span> |
| <span class="nc" id="L971"> logger.fatal("Node not found!" + node, ex);</span> |
| } finally { |
| <span class="pc bpc" id="L973" title="6 of 8 branches missed."> if (tx.isActive()) {</span> |
| <span class="pc" id="L974"> tx.rollback();</span> |
| } |
| <span class="pc" id="L976"> em.close();</span> |
| <span class="pc" id="L977"> }</span> |
| //em.close(); |
| <span class="fc" id="L979"> return null;</span> |
| |
| } |
| <span class="fc" id="L982"> private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();</span> |
| |
| /** |
| * @since 3.3 |
| * @param body |
| * @return |
| * @throws DispositionReportFaultMessage |
| */ |
| public String doPing(DoPing body) throws DispositionReportFaultMessage { |
| <span class="fc" id="L991"> long startTime = System.currentTimeMillis();</span> |
| <span class="fc" id="L992"> long procTime = System.currentTimeMillis() - startTime;</span> |
| <span class="fc" id="L993"> serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);</span> |
| |
| <span class="fc" id="L995"> return getNode();</span> |
| |
| } |
| |
| @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE) |
| @WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body") |
| // @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords") |
| @Override |
| public org.uddi.repl_v3.ChangeRecords getChangeRecords( |
| @WebParam(partName = "body", name = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3") org.uddi.repl_v3.GetChangeRecords body |
| ) throws DispositionReportFaultMessage, RemoteException { |
| <span class="nc" id="L1006"> long startTime = System.currentTimeMillis();</span> |
| <span class="nc" id="L1007"> String requestingNode = body.getRequestingNode();</span> |
| <span class="nc" id="L1008"> HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen();</span> |
| <span class="nc" id="L1009"> BigInteger responseLimitCount = body.getResponseLimitCount();</span> |
| <span class="nc" id="L1010"> HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector();</span> |
| |
| <span class="nc" id="L1012"> new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);</span> |
| |
| //TODO should we validate that "requestingNode" is in the replication config? |
| <span class="nc" id="L1015"> List<ChangeRecord> ret = new ArrayList<ChangeRecord>();</span> |
| <span class="nc" id="L1016"> EntityManager em = PersistenceManager.getEntityManager();</span> |
| <span class="nc" id="L1017"> EntityTransaction tx = em.getTransaction();</span> |
| |
| /** |
| * More specifically, the recipient determines the particular |
| * change records that are returned by comparing the originating |
| * USNs in the caller’s high water mark vector with the |
| * originating USNs of each of the changes the recipient has |
| * seen from others or generated by itself. The recipient SHOULD |
| * only return change records that have originating USNs that |
| * are greater than those listed in the changesAlreadySeen |
| * highWaterMarkVector and less than the limit required by |
| * either the responseLimitCount or the responseLimitVector. |
| * |
| * |
| * Part of the message is a high water mark vector that contains |
| * for each node of the registry the originating USN of the most |
| * recent change record that has been successfully processed by |
| * the invocating node |
| */ |
| try { |
| <span class="nc" id="L1037"> int maxrecords = AppConfig.getConfiguration().getInt(Property.JUDDI_REPLICATION_GET_CHANGE_RECORDS_MAX, 100);</span> |
| <span class="nc bnc" id="L1038" title="All 2 branches missed."> if (responseLimitCount != null) {</span> |
| <span class="nc" id="L1039"> maxrecords = responseLimitCount.intValue();</span> |
| } |
| <span class="nc" id="L1041"> tx.begin();</span> |
| <span class="nc" id="L1042"> Long firstrecord = 0L;</span> |
| <span class="nc" id="L1043"> Long lastrecord = null;</span> |
| <span class="nc" id="L1044"> Query createQuery = null;</span> |
| //SELECT t0.id, t0.change_contents, t0.entity_key, t0.appliedlocal, t0.node_id, t0.orginating_usn, t0.record_type FROM j3_chg_record t0 WHERE (t0.id > NULL AND t0.node_id = ?) ORDER BY t0.id ASC |
| <span class="nc bnc" id="L1046" title="All 2 branches missed."> if (changesAlreadySeen != null) {</span> |
| //this is basically a lower limit (i.e. the newest record that was processed by the requestor |
| //therefore we want the oldest record stored locally to return to the requestor for processing |
| <span class="nc bnc" id="L1049" title="All 2 branches missed."> for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {</span> |
| <span class="nc" id="L1050"> firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN();</span> |
| <span class="nc bnc" id="L1051" title="All 2 branches missed."> if (firstrecord == null) {</span> |
| <span class="nc" id="L1052"> firstrecord = 0L;</span> |
| } |
| <span class="nc bnc" id="L1054" title="All 2 branches missed."> if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(getNode())) {</span> |
| //special case, search by database id |
| <span class="nc" id="L1056"> createQuery = em.createQuery("select e from ChangeRecord e where "</span> |
| + "(e.id > :inbound AND e.nodeID = :node) " |
| + "order by e.id ASC"); |
| |
| } else { |
| <span class="nc" id="L1061"> createQuery = em.createQuery("select e from ChangeRecord e where "</span> |
| + "e.originatingUSN > :inbound AND e.nodeID = :node " |
| + "order by e.originatingUSN ASC"); |
| } |
| <span class="nc" id="L1065"> logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);</span> |
| <span class="nc" id="L1066"> logger.info("This node is " + getNode() + ", request is for data originated from " + changesAlreadySeen.getHighWaterMark().get(i).getNodeID() + " and it's being sent back to " + requestingNode);</span> |
| |
| <span class="nc" id="L1068"> createQuery.setMaxResults(maxrecords);</span> |
| <span class="nc" id="L1069"> createQuery.setParameter("inbound", firstrecord);</span> |
| <span class="nc" id="L1070"> createQuery.setParameter("node", changesAlreadySeen.getHighWaterMark().get(i).getNodeID());</span> |
| <span class="nc" id="L1071"> List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();</span> |
| <span class="nc" id="L1072"> logger.info(records.size() + " CR records returned from query");</span> |
| <span class="nc bnc" id="L1073" title="All 2 branches missed."> for (int x = 0; x < records.size(); x++) {</span> |
| <span class="nc" id="L1074"> ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(x));</span> |
| //if (!Excluded(changesAlreadySeen, r)) { |
| <span class="nc" id="L1076"> ret.add(r);</span> |
| //} |
| |
| } |
| } |
| } /*if (responseLimitVector != null) { |
| //using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned. |
| //upper limit basically |
| for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) { |
| //if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) { |
| lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN(); |
| //} |
| } |
| }*/ else { |
| <span class="nc bnc" id="L1090" title="All 2 branches missed."> if (firstrecord == null) {</span> |
| <span class="nc" id="L1091"> firstrecord = 0L;</span> |
| } |
| //assume that they just want records that originated from here? |
| <span class="nc" id="L1094"> logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);</span> |
| <span class="nc" id="L1095"> logger.info("This node is " + getNode() + " requesting node " + requestingNode);</span> |
| |
| <span class="nc bnc" id="L1097" title="All 2 branches missed."> if (lastrecord != null) {</span> |
| <span class="nc" id="L1098"> createQuery = em.createQuery("select e from ChangeRecord e where "</span> |
| + "(e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) " |
| + "order by e.id ASC"); |
| <span class="nc" id="L1101"> createQuery.setParameter("lastrecord", lastrecord);</span> |
| } else { |
| <span class="nc" id="L1103"> createQuery = em.createQuery("select e from ChangeRecord e where "</span> |
| + "(e.id > :inbound AND e.nodeID = :node) " |
| + "order by e.id ASC"); |
| } |
| <span class="nc" id="L1107"> createQuery.setMaxResults(maxrecords);</span> |
| <span class="nc" id="L1108"> createQuery.setParameter("inbound", firstrecord);</span> |
| <span class="nc" id="L1109"> createQuery.setParameter("node", getNode());</span> |
| |
| <span class="nc" id="L1111"> List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();</span> |
| <span class="nc" id="L1112"> logger.info(records.size() + " CR records returned from query");</span> |
| <span class="nc bnc" id="L1113" title="All 2 branches missed."> for (int i = 0; i < records.size(); i++) {</span> |
| <span class="nc" id="L1114"> ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));</span> |
| //if (!Excluded(changesAlreadySeen, r)) { |
| <span class="nc" id="L1116"> ret.add(r);</span> |
| //} |
| |
| } |
| } |
| <span class="nc" id="L1121"> tx.rollback();</span> |
| <span class="nc" id="L1122"> long procTime = System.currentTimeMillis() - startTime;</span> |
| <span class="nc" id="L1123"> serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,</span> |
| QueryStatus.SUCCESS, procTime); |
| |
| <span class="nc" id="L1126"> } catch (Exception ex) {</span> |
| <span class="nc" id="L1127"> logger.fatal("Error, this node is: " + getNode(), ex);</span> |
| <span class="nc" id="L1128"> throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));</span> |
| |
| } finally { |
| <span class="nc bnc" id="L1131" title="All 4 branches missed."> if (tx.isActive()) {</span> |
| <span class="nc" id="L1132"> tx.rollback();</span> |
| } |
| <span class="nc" id="L1134"> em.close();</span> |
| <span class="nc" id="L1135"> }</span> |
| <span class="nc" id="L1136"> logger.info("Change records returned for " + requestingNode + ": " + ret.size());</span> |
| //JAXB.marshal(ret, System.out); |
| <span class="nc" id="L1138"> ChangeRecords x = new ChangeRecords();</span> |
| <span class="nc" id="L1139"> x.getChangeRecord().addAll(ret);</span> |
| //JAXB.marshal(x, System.out); |
| <span class="nc" id="L1141"> return x;</span> |
| } |
| |
| /** |
| * This UDDI API message provides a means to obtain a list of |
| * highWaterMark element containing the highest known USN for all nodes |
| * in the replication graph. If there is no graph, we just return the |
| * local bits |
| * |
| * @return |
| * @throws DispositionReportFaultMessage |
| */ |
| @Override |
| public List<ChangeRecordIDType> getHighWaterMarks() |
| throws DispositionReportFaultMessage { |
| <span class="fc" id="L1156"> long startTime = System.currentTimeMillis();</span> |
| |
| <span class="fc" id="L1158"> List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();</span> |
| |
| //fetch from database the highest known watermark |
| <span class="fc" id="L1161"> ReplicationConfiguration FetchEdges = FetchEdges();</span> |
| |
| <span class="fc" id="L1163"> EntityManager em = PersistenceManager.getEntityManager();</span> |
| <span class="fc" id="L1164"> EntityTransaction tx = em.getTransaction();</span> |
| <span class="fc" id="L1165"> HashMap<String, Long> map = new HashMap<String, Long>();</span> |
| try { |
| <span class="fc" id="L1167"> tx.begin();</span> |
| <span class="pc bpc" id="L1168" title="1 of 2 branches missed."> if (FetchEdges != null) {</span> |
| <span class="fc" id="L1169"> Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();</span> |
| <span class="fc bfc" id="L1170" title="All 2 branches covered."> while (it.hasNext()) {</span> |
| <span class="fc" id="L1171"> String nextNode = it.next();</span> |
| <span class="pc bpc" id="L1172" title="1 of 2 branches missed."> if (!nextNode.equals(getNode())) {</span> |
| <span class="pc bpc" id="L1173" title="1 of 2 branches missed."> if (!map.containsKey(nextNode)) {</span> |
| <span class="fc" id="L1174"> Long id = 0L;</span> |
| try { |
| <span class="nc" id="L1176"> id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();</span> |
| <span class="fc" id="L1177"> } catch (Exception ex) {</span> |
| <span class="fc" id="L1178"> logger.debug(ex);</span> |
| <span class="nc" id="L1179"> }</span> |
| <span class="pc bpc" id="L1180" title="1 of 2 branches missed."> if (id == null) {</span> |
| <span class="nc" id="L1181"> id = 0L;</span> |
| //per the spec |
| } |
| <span class="fc" id="L1184"> map.put(nextNode, id);</span> |
| |
| } |
| } |
| <span class="fc" id="L1188"> }</span> |
| } |
| //dont forget this node |
| <span class="fc" id="L1191"> Query setMaxResults = em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc")</span> |
| <span class="fc" id="L1192"> .setParameter("node", getNode()).setMaxResults(1);</span> |
| <span class="fc" id="L1193"> Long id =null;</span> |
| <span class="pc bpc" id="L1194" title="1 of 2 branches missed."> if (setMaxResults.getResultList().isEmpty()) {</span> |
| //this can happen at or near startup |
| <span class="nc" id="L1196"> id = 0L;</span> |
| } else { |
| <span class="fc" id="L1198"> id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc")</span> |
| <span class="fc" id="L1199"> .setParameter("node", getNode()).setMaxResults(1).getSingleResult();</span> |
| } |
| <span class="fc" id="L1201"> ChangeRecordIDType x = new ChangeRecordIDType();</span> |
| <span class="fc" id="L1202"> x.setNodeID(getNode());</span> |
| <span class="fc" id="L1203"> x.setOriginatingUSN(id);</span> |
| <span class="fc" id="L1204"> ret.add(x);</span> |
| |
| <span class="fc" id="L1206"> tx.rollback();</span> |
| <span class="fc" id="L1207"> long procTime = System.currentTimeMillis() - startTime;</span> |
| <span class="fc" id="L1208"> serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);</span> |
| |
| <span class="nc" id="L1210"> } catch (Exception drfm) {</span> |
| <span class="nc" id="L1211"> logger.fatal("Error, this node is: " + getNode(), drfm);</span> |
| <span class="nc" id="L1212"> throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));</span> |
| |
| } finally { |
| <span class="pc bpc" id="L1215" title="3 of 4 branches missed."> if (tx.isActive()) {</span> |
| <span class="nc" id="L1216"> tx.rollback();</span> |
| } |
| <span class="pc" id="L1218"> em.close();</span> |
| <span class="fc" id="L1219"> }</span> |
| |
| <span class="fc" id="L1221"> Iterator<Map.Entry<String, Long>> iterator = map.entrySet().iterator();</span> |
| <span class="fc bfc" id="L1222" title="All 2 branches covered."> while (iterator.hasNext()) {</span> |
| <span class="fc" id="L1223"> Map.Entry<String, Long> next = iterator.next();</span> |
| <span class="fc" id="L1224"> ret.add(new ChangeRecordIDType(next.getKey(), next.getValue()));</span> |
| <span class="fc" id="L1225"> }</span> |
| <span class="fc" id="L1226"> return ret;</span> |
| } |
| |
| /** |
| * this means that another node has a change and we need to pick up the |
| * change and apply it to our local database. |
| * |
| * @param body |
| * @throws DispositionReportFaultMessage |
| */ |
| @Override |
| public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body) |
| throws DispositionReportFaultMessage { |
| <span class="nc" id="L1239"> long startTime = System.currentTimeMillis();</span> |
| |
| //some other node just told us there's new records available, call |
| //getChangeRecords from the remote node asynch |
| <span class="nc" id="L1243"> new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);</span> |
| |
| <span class="nc" id="L1245"> logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size() + " this node is " + getNode());</span> |
| //if (!queue.contains(body.getNotifyingNode())) { |
| <span class="nc" id="L1247"> queue.add(body);</span> |
| //} |
| <span class="nc" id="L1249"> long procTime = System.currentTimeMillis() - startTime;</span> |
| <span class="nc" id="L1250"> serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,</span> |
| QueryStatus.SUCCESS, procTime); |
| <span class="nc" id="L1252"> }</span> |
| <span class="fc" id="L1253"> private static Queue<NotifyChangeRecordsAvailable> queue = null;</span> |
| |
| /** |
| * transfers custody of an entity from node1/user1 to node2/user2 |
| * |
| * assume this node is node 2. |
| * |
| * user1 on node1 requests a transfer token. node 1 issues the token. |
| * |
| * user1 now has a transfer token for their stuff user now takes the |
| * token to node 2 and calls transferEntities |
| * <img src="http://www.uddi.org/pubs/uddi-v3.0.2-20041019_files/image086.gif"> |
| * |
| * @param body |
| * @throws DispositionReportFaultMessage |
| */ |
| @Override |
| public void transferCustody(TransferCustody body) |
| throws DispositionReportFaultMessage { |
| <span class="nc" id="L1272"> long startTime = System.currentTimeMillis();</span> |
| <span class="nc" id="L1273"> EntityManager em = PersistenceManager.getEntityManager();</span> |
| <span class="nc" id="L1274"> EntityTransaction tx = em.getTransaction();</span> |
| <span class="nc" id="L1275"> logger.info("Inbound transfer request (via replication api, node to node");</span> |
| try { |
| <span class="nc" id="L1277"> tx.begin();</span> |
| //*this node is transfering data to another node |
| //ValidateReplication.unsupportedAPICall(); |
| //a remote node just told me to give up control of some of my entities |
| |
| //EntityTransaction tx = em.getTransaction(); |
| //confirm i have a replication config |
| <span class="nc" id="L1284"> boolean ok = false;</span> |
| <span class="nc" id="L1285"> ReplicationConfiguration FetchEdges = ReplicationNotifier.FetchEdges();</span> |
| <span class="nc bnc" id="L1286" title="All 2 branches missed."> if (FetchEdges != null) {</span> |
| <span class="nc bnc" id="L1287" title="All 2 branches missed."> for (int i = 0; i < FetchEdges.getOperator().size(); i++) {</span> |
| //confirm that the destination node is in the replication config |
| <span class="nc bnc" id="L1289" title="All 2 branches missed."> if (FetchEdges.getOperator().get(i).getOperatorNodeID().equals(body.getTransferOperationalInfo().getNodeID())) {</span> |
| <span class="nc" id="L1290"> ok = true;</span> |
| <span class="nc" id="L1291"> break;</span> |
| } |
| } |
| } |
| <span class="nc bnc" id="L1295" title="All 2 branches missed."> if (!ok) {</span> |
| <span class="nc" id="L1296"> throw new TransferNotAllowedException(new ErrorMessage("E_transferNotAllowedUnknownNode"));</span> |
| } |
| |
| <span class="nc" id="L1299"> new ValidateReplication(null).validateTransfer(em, body);</span> |
| |
| <span class="nc" id="L1301"> TransferEntities te = new TransferEntities();</span> |
| <span class="nc" id="L1302"> te.setKeyBag(body.getKeyBag());</span> |
| <span class="nc" id="L1303"> te.setTransferToken(body.getTransferToken());</span> |
| <span class="nc" id="L1304"> te.setAuthInfo(null);</span> |
| //make the change |
| //enqueue in replication notifier |
| //discard the token |
| <span class="nc" id="L1308"> logger.debug("request validated, processing transfer");</span> |
| <span class="nc" id="L1309"> List<ChangeRecord> executeTransfer = new UDDICustodyTransferImpl().executeTransfer(te, em, body.getTransferOperationalInfo().getAuthorizedName(), body.getTransferOperationalInfo().getNodeID());</span> |
| |
| <span class="nc bnc" id="L1311" title="All 2 branches missed."> for (ChangeRecord c : executeTransfer) {</span> |
| try { |
| <span class="nc" id="L1313"> c.setChangeID(new ChangeRecordIDType());</span> |
| <span class="nc" id="L1314"> c.getChangeID().setNodeID(getNode());</span> |
| <span class="nc" id="L1315"> c.getChangeID().setOriginatingUSN(null);</span> |
| <span class="nc" id="L1316"> ReplicationNotifier.enqueue(MappingApiToModel.mapChangeRecord(c));</span> |
| <span class="nc" id="L1317"> } catch (UnsupportedEncodingException ex) {</span> |
| <span class="nc" id="L1318"> logger.error("", ex);</span> |
| <span class="nc" id="L1319"> }</span> |
| <span class="nc" id="L1320"> }</span> |
| /** |
| * The custodial node must verify that it has granted |
| * permission to transfer the entities identified and |
| * that this permission is still valid. This operation |
| * is comprised of two steps: |
| * |
| * 1. Verification that the transferToken was issued by |
| * it, that it has not expired, that it represents the |
| * authority to transfer no more and no less than those |
| * entities identified by the businessKey and tModelKey |
| * elements and that all these entities are still valid |
| * and not yet transferred. The transferToken is |
| * invalidated if any of these conditions are not met. |
| * |
| * 2. If the conditions above are met, the custodial |
| * node will prevent any further changes to the entities |
| * identified by the businessKey and tModelKey elements |
| * identified. The entity will remain in this state |
| * until the replication stream indicates it has been |
| * successfully processed via the replication stream. |
| * Upon successful verification of the custody transfer |
| * request by the custodial node, an empty message is |
| * returned by it indicating the success of the request |
| * and acknowledging the custody transfer. Following the |
| * issue of the empty message, the custodial node will |
| * submit into the replication stream a |
| * changeRecordNewData providing in the operationalInfo, |
| * the nodeID accepting custody of the datum and the |
| * authorizedName of the publisher accepting ownership. |
| * The acknowledgmentRequested attribute of this change |
| * record MUST be set to "true". |
| * |
| * |
| * |
| * Finally, the custodial node invalidates the |
| * transferToken in order to prevent additional calls of |
| * the transfer_entities API. |
| */ |
| <span class="nc" id="L1359"> tx.commit();</span> |
| <span class="nc" id="L1360"> long procTime = System.currentTimeMillis() - startTime;</span> |
| <span class="nc" id="L1361"> serviceCounter.update(ReplicationQuery.TRANSFER_CUSTODY,</span> |
| QueryStatus.SUCCESS, procTime); |
| <span class="nc" id="L1363"> } catch (DispositionReportFaultMessage d) {</span> |
| <span class="nc" id="L1364"> logger.error("Unable to process node to node custody transfer ", d);</span> |
| <span class="nc" id="L1365"> throw d;</span> |
| } finally { |
| <span class="nc bnc" id="L1367" title="All 8 branches missed."> if (em != null && em.isOpen()) {</span> |
| <span class="nc" id="L1368"> em.close();</span> |
| } |
| <span class="nc bnc" id="L1370" title="All 4 branches missed."> if (tx.isActive()) {</span> |
| <span class="nc" id="L1371"> tx.rollback();</span> |
| } |
| } |
| <span class="nc" id="L1374"> }</span> |
| |
| } |
| </pre><div class="footer"><span class="right">Created with <a href="http://www.jacoco.org/jacoco">JaCoCo</a> 0.7.9.201702052155</span></div></body></html> |