blob: 1133f6920ff8b7c4ff9a14313fb26733688c930a [file] [log] [blame]
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"><html xmlns="http://www.w3.org/1999/xhtml" lang="en"><head><meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/><link rel="stylesheet" href="../jacoco-resources/report.css" type="text/css"/><link rel="shortcut icon" href="../jacoco-resources/report.gif" type="image/gif"/><title>ReplicationNotifier.java</title><link rel="stylesheet" href="../jacoco-resources/prettify.css" type="text/css"/><script type="text/javascript" src="../jacoco-resources/prettify.js"></script></head><body onload="window['PR_TAB_WIDTH']=4;prettyPrint()"><div class="breadcrumb" id="breadcrumb"><span class="info"><a href="../jacoco-sessions.html" class="el_session">Sessions</a></span><a href="../index.html" class="el_report">jUDDI Core - OpenJPA</a> &gt; <a href="index.source.html" class="el_package">org.apache.juddi.replication</a> &gt; <span class="el_source">ReplicationNotifier.java</span></div><h1>ReplicationNotifier.java</h1><pre class="source lang-java linenums">/*
* Copyright 2001-2008 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.juddi.replication;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.xml.bind.JAXB;
import javax.xml.ws.BindingProvider;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.juddi.api_v3.Node;
import org.apache.juddi.config.AppConfig;
import org.apache.juddi.config.PersistenceManager;
import org.apache.juddi.config.Property;
import org.apache.juddi.mapping.MappingApiToModel;
import org.apache.juddi.mapping.MappingModelToApi;
import org.apache.juddi.model.ChangeRecord;
import org.apache.juddi.model.ReplicationConfiguration;
import org.apache.juddi.v3.client.UDDIService;
import org.apache.juddi.v3.client.cryptor.TransportSecurityHelper;
import org.uddi.repl_v3.ChangeRecordIDType;
import org.uddi.repl_v3.CommunicationGraph.Edge;
import org.uddi.repl_v3.HighWaterMarkVectorType;
import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
import org.uddi.repl_v3.Operator;
import org.uddi.v3_service.UDDIReplicationPortType;
/**
* Handles when local records have been changed, change journal storage and
* notifications to all remote replication nodes that something has been
* altered.
*
* @author &lt;a href=&quot;mailto:alexoree@apache.org&quot;&gt;Alex O'Ree&lt;/a&gt;
*
*/
public class ReplicationNotifier extends TimerTask {
// Commons-logging logger for this class.
<span class="fc" id="L66"> private static Log log = LogFactory.getLog(ReplicationNotifier.class);</span>
// Timer that drives this task; created in init() and stopped in cancel().
<span class="fc" id="L67"> private Timer timer = null;</span>
// Delay before the first run, handed to Timer.scheduleAtFixedRate (milliseconds); init() overwrites it from configuration.
<span class="fc" id="L68"> private long startBuffer = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default </span>
// Period between runs, handed to Timer.scheduleAtFixedRate (milliseconds); init() overwrites it from configuration.
<span class="fc" id="L69"> private long interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default</span>
// Id of this node; assigned in init() from Property.JUDDI_NODE_ID.
<span class="fc" id="L70"> private static String node = null;</span>
// Service object from which SendNotifications obtains the replication port.
<span class="fc" id="L71"> private static UDDIService uddiService = new UDDIService();</span>
/**
* Creates the notifier and immediately calls init(), which schedules this
* task on a newly created daemon timer.
*
* @throws ConfigurationException propagated from init(), which reads the
* jUDDI configuration through AppConfig
*/
public ReplicationNotifier() throws ConfigurationException {
<span class="fc" id="L79"> super();</span>
<span class="fc" id="L80"> init();</span>
<span class="fc" id="L81"> }</span>
/**
* Creates a daemon timer, reads the start delay and the run interval from
* configuration (both default to 5000), schedules this task at a fixed
* rate, makes sure the local change queue exists and caches the id of
* this node (default UNDEFINED_NODE_NAME).
*
* @throws ConfigurationException declared for the AppConfig.getConfiguration() calls
*/
private synchronized void init() throws ConfigurationException {
<span class="fc" id="L83"> timer = new Timer(true);</span>
<span class="fc" id="L84"> startBuffer=AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000l);</span>
<span class="fc" id="L85"> interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000l);</span>
// the task is scheduled before the queue check below; run() performs the same null check itself
<span class="fc" id="L86"> timer.scheduleAtFixedRate(this, startBuffer, interval);</span>
<span class="pc bpc" id="L87" title="1 of 2 branches missed."> if (queue == null) {</span>
// NOTE(review): raw type here, while enqueue() creates the same queue with its element type
<span class="nc" id="L88"> queue = new ConcurrentLinkedQueue();</span>
}
<span class="fc" id="L90"> node = AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ID, &quot;UNDEFINED_NODE_NAME&quot;);</span>
<span class="fc" id="L91"> }</span>
/**
* Stops the timer created in init() and then cancels this task.
*
* @return the value returned by TimerTask.cancel()
*/
@Override
public boolean cancel() {
<span class="fc" id="L95"> timer.cancel();</span>
//TODO notify other nodes that i'm going down
<span class="fc" id="L97"> return super.cancel();</span>
}
/**
* Adds a locally originated change record to the local change queue,
* creating the queue on first use. The queue is drained by run(), which
* hands each record to ProcessChangeRecord.
*
* @param change the model change record to queue
*/
public synchronized static void enqueue(org.apache.juddi.model.ChangeRecord change) {
<span class="fc bfc" id="L102" title="All 2 branches covered."> if (queue == null) {</span>
<span class="fc" id="L103"> queue = new ConcurrentLinkedQueue&lt;org.apache.juddi.model.ChangeRecord&gt;();</span>
}
<span class="fc" id="L105"> queue.add(change);</span>
<span class="fc" id="L106"> }</span>
/**
* Adds a change record (API type) to the retransmit queue, creating the
* queue on first use. run() drains this queue and calls SendNotifications
* with isRetrans set to true for each entry.
*
* @param change the API change record whose notification should be sent again
*/
public synchronized static void EnqueueRetransmit(org.uddi.repl_v3.ChangeRecord change) {
<span class="nc bnc" id="L109" title="All 2 branches missed."> if (queue2 == null) {</span>
<span class="nc" id="L110"> queue2 = new ConcurrentLinkedQueue&lt;org.uddi.repl_v3.ChangeRecord&gt;();</span>
}
<span class="nc" id="L112"> queue2.add(change);</span>
<span class="nc" id="L113"> }</span>
// Locally originated change records waiting for run(); filled by enqueue().
static Queue&lt;org.apache.juddi.model.ChangeRecord&gt; queue;
// Change records waiting to be announced again by run(); filled by EnqueueRetransmit().
static Queue&lt;org.uddi.repl_v3.ChangeRecord&gt; queue2;
/**
* Persists a locally originated change record and then announces it to the
* other replication nodes through SendNotifications. This is for locally
* originated changes only, see
* {@link org.apache.juddi.api.impl.UDDIReplicationImpl.PullTimerTask#PersistChangeRecord PersistChangeRecord}
* for how remote changes are processed.
*
* @param j the change record taken from the local queue; the original note
* states it must be one of the UDDI save APIs
*
*/
protected void ProcessChangeRecord(org.apache.juddi.model.ChangeRecord j) {
//store and convert the changes to database model
//TODO need a switch to send the notification without persisting the record
//this is to support multihop notifications
<span class="fc" id="L129"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="fc" id="L130"> EntityTransaction tx = null;</span>
try {
<span class="fc" id="L132"> tx = em.getTransaction();</span>
<span class="fc" id="L133"> tx.begin();</span>
<span class="fc" id="L134"> j.setIsAppliedLocally(true);</span>
// persist first, then copy the record id into the originating USN and merge that update
// (presumably the id is assigned during persist - TODO confirm)
<span class="fc" id="L135"> em.persist(j);</span>
<span class="fc" id="L136"> j.setOriginatingUSN(j.getId());</span>
<span class="fc" id="L137"> em.merge(j);</span>
<span class="fc" id="L138"> log.info(&quot;CR saved locally, it was from &quot; + j.getNodeID()</span>
<span class="fc" id="L139"> + &quot; USN:&quot; + j.getOriginatingUSN()</span>
<span class="fc" id="L140"> + &quot; Type:&quot; + j.getRecordType().name()</span>
<span class="fc" id="L141"> + &quot; Key:&quot; + j.getEntityKey()</span>
<span class="fc" id="L142"> + &quot; Local id:&quot; + j.getId());</span>
<span class="fc" id="L143"> tx.commit();</span>
<span class="nc" id="L144"> } catch (Exception ex) {</span>
// failure path: log, roll back if the transaction is still active, then dump the record as XML to standard out
<span class="nc" id="L145"> log.fatal(&quot;unable to store local change record locally!!&quot;, ex);</span>
<span class="nc bnc" id="L146" title="All 4 branches missed."> if (tx != null &amp;&amp; tx.isActive()) {</span>
<span class="nc" id="L147"> tx.rollback();</span>
}
<span class="nc" id="L149"> JAXB.marshal(MappingModelToApi.mapChangeRecord(j), System.out);</span>
} finally {
<span class="pc" id="L151"> em.close();</span>
<span class="pc" id="L152"> }</span>
// NOTE(review): the exception above is not rethrown, so the lines below also run after a failed save;
// SendNotifications returns without sending when the id or the origin node is null
<span class="fc" id="L154"> log.debug(&quot;ChangeRecord: &quot; + j.getId() + &quot;,&quot; + j.getEntityKey() + &quot;,&quot; + j.getNodeID() + &quot;,&quot; + j.getOriginatingUSN() + &quot;,&quot; + j.getRecordType().toString());</span>
<span class="fc" id="L155"> SendNotifications(j.getId(), j.getNodeID(), false);</span>
<span class="fc" id="L157"> }</span>
/**
* Sends a notifyChangeRecordsAvailable message for one change record to
* every node this node should notify, based on the replication
* configuration returned by FetchEdges(). Destinations come either from
* the operator list (plain URLs) or from the directed communication graph
* (a primary URL plus alternates that are tried when the primary fails).
*
* @param id originating USN of the change record; placed in the high water mark of the request
* @param origin_node id of the node where the change originated; that node is not notified
* @param isRetrans true when called by run() for the retransmit queue, false for a new local change
*/
private void SendNotifications(Long id, String origin_node, boolean isRetrans) {
<span class="fc" id="L161"> org.uddi.repl_v3.ReplicationConfiguration repcfg = FetchEdges();</span>
<span class="pc bpc" id="L163" title="1 of 2 branches missed."> if (repcfg == null) {</span>
<span class="nc" id="L164"> log.debug(&quot;No replication configuration is defined!&quot;);</span>
<span class="nc" id="L165"> return;</span>
}
<span class="pc bpc" id="L168" title="2 of 4 branches missed."> if (id == null || origin_node == null) {</span>
<span class="nc" id="L169"> log.fatal(&quot;Either the id is null or the origin_node is null. I can't send out this alert!!&quot;);</span>
//throw new Exception(node);
<span class="nc" id="L171"> return;</span>
}
<span class="fc" id="L174"> Set&lt;Object&gt; destinationUrls = new HashSet&lt;Object&gt;();</span>
/**
* In the absence of a communicationGraph element from the
* Replication Configuration Structure (although it's mandatory
* in the xsd), all nodes listed in the node element MAY send
* any and all messages to any other node of the registry.
*/
// NOTE(review): AND binds tighter than OR in the condition below, so a missing graph always takes
// this branch, while an empty edge list takes it only when isRetrans is false
<span class="pc bpc" id="L182" title="1 of 2 branches missed."> if (repcfg.getCommunicationGraph() == null</span>
<span class="pc bpc" id="L183" title="2 of 4 branches missed."> || repcfg.getCommunicationGraph().getEdge().isEmpty() &amp;&amp; !isRetrans) {</span>
//no edges or graph defined, default to the operator list
//retransmission only applies to non-directed-edge replication, thus the extra check
<span class="fc bfc" id="L186" title="All 2 branches covered."> for (Operator o : repcfg.getOperator()) {</span>
//no need to tell myself about a change at myself or the origin
<span class="pc bpc" id="L188" title="3 of 4 branches missed."> if (!o.getOperatorNodeID().equalsIgnoreCase(node) &amp;&amp; !o.getOperatorNodeID().equalsIgnoreCase(origin_node)) {</span>
<span class="nc" id="L189"> destinationUrls.add(o.getSoapReplicationURL());</span>
}
<span class="fc" id="L191"> }</span>
} else {
//this is for directed graph replication
//find all nodes that i need to notify
<span class="nc" id="L195"> Iterator&lt;Edge&gt; iterator = repcfg.getCommunicationGraph().getEdge().iterator();</span>
<span class="nc bnc" id="L196" title="All 2 branches missed."> while (iterator.hasNext()) {</span>
<span class="nc" id="L197"> Edge next = iterator.next();</span>
<span class="nc bnc" id="L199" title="All 2 branches missed."> if (next.getMessageSender().equalsIgnoreCase(node)) {</span>
//this is my server and i need to transmit the notification to
<span class="nc" id="L202"> String messageReceiver = next.getMessageReceiver();</span>
<span class="nc" id="L203"> PrimaryAlternate container = new PrimaryAlternate();</span>
//pointless to send a notification to myself or the origin
<span class="nc bnc" id="L205" title="All 4 branches missed."> if (!messageReceiver.equalsIgnoreCase(node) &amp;&amp; !messageReceiver.equalsIgnoreCase(origin_node)) {</span>
//look up the endpoint urls
// the receiver id is matched against the operator list to find its replication URL
<span class="nc bnc" id="L207" title="All 2 branches missed."> for (int x = 0; x &lt; repcfg.getOperator().size(); x++) {</span>
<span class="nc bnc" id="L208" title="All 2 branches missed."> if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(messageReceiver)) {</span>
<span class="nc" id="L209"> container.primaryUrl = repcfg.getOperator().get(x).getSoapReplicationURL();</span>
}
}
// same lookup for every alternate receiver listed on this edge
<span class="nc bnc" id="L212" title="All 2 branches missed."> for (int y = 0; y &lt; next.getMessageReceiverAlternate().size(); y++) {</span>
<span class="nc bnc" id="L213" title="All 2 branches missed."> for (int x = 0; x &lt; repcfg.getOperator().size(); x++) {</span>
<span class="nc bnc" id="L214" title="All 2 branches missed."> if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(next.getMessageReceiverAlternate().get(y))) {</span>
<span class="nc" id="L215"> container.alternateUrls.add(repcfg.getOperator().get(x).getSoapReplicationURL());</span>
}
}
}
}
// primaryUrl stays null when the receiver was skipped above or has no matching operator entry
<span class="nc bnc" id="L220" title="All 2 branches missed."> if (container.primaryUrl != null) {</span>
<span class="nc" id="L221"> destinationUrls.add(container);</span>
} else {
<span class="nc" id="L223"> log.warn(&quot;Unable to find primary url for directed edge graph replication from this node &quot; + node + &quot; to &quot;</span>
<span class="nc" id="L224"> + &quot;destination node &quot; + next.getMessageReceiver() + &quot; it will be ignored!&quot;);</span>
}
}
<span class="nc" id="L229"> }</span>
}
<span class="pc bpc" id="L233" title="1 of 2 branches missed."> if (destinationUrls.isEmpty()) {</span>
<span class="fc" id="L234"> log.debug(&quot;Something is bizarre with the replication config. I should have had at least one node to notify, but I have none!&quot;);</span>
<span class="fc" id="L235"> return;</span>
}
// a single port is reused for all destinations; SendNotification sets its endpoint address per call
<span class="nc" id="L237"> UDDIReplicationPortType x = uddiService.getUDDIReplicationPort();</span>
<span class="nc" id="L238"> TransportSecurityHelper.applyTransportSecurity((BindingProvider) x);</span>
<span class="nc bnc" id="L240" title="All 2 branches missed."> for (Object s : destinationUrls) {</span>
<span class="nc" id="L242"> NotifyChangeRecordsAvailable req = new NotifyChangeRecordsAvailable();</span>
<span class="nc" id="L244"> req.setNotifyingNode(node);</span>
<span class="nc" id="L245"> HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();</span>
<span class="nc" id="L247"> highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(origin_node, id));</span>
<span class="nc" id="L248"> req.setChangesAvailable(highWaterMarkVectorType);</span>
// a String entry comes from the operator list, a PrimaryAlternate entry from the directed graph
<span class="nc bnc" id="L250" title="All 2 branches missed."> if (s instanceof String) {</span>
<span class="nc" id="L251"> SendNotification(x, (String) s, req);</span>
<span class="nc bnc" id="L252" title="All 2 branches missed."> } else if (s instanceof PrimaryAlternate) {</span>
//more complex directed graph stuff
<span class="nc" id="L254"> PrimaryAlternate pa = (PrimaryAlternate) s;</span>
<span class="nc bnc" id="L255" title="All 2 branches missed."> if (!SendNotification(x, pa.primaryUrl, req)) {</span>
<span class="nc bnc" id="L256" title="All 2 branches missed."> for (String url : pa.alternateUrls) {</span>
<span class="nc bnc" id="L257" title="All 2 branches missed."> if (SendNotification(x, url, req)) {</span>
<span class="nc" id="L258"> break;</span>
}
//no need to continue to additional alternates
<span class="nc" id="L261"> }</span>
} else {
//primary url succeeded, no further action required
}
}
//TODO the spec talks about control messages, should we even support it? seems pointless
<span class="nc" id="L269"> }</span>
<span class="nc" id="L271"> }</span>
/**
* Points the given port at one endpoint and sends the notification to it.
* Any exception from the call is logged (warn with the message, debug with
* the stack trace) and reported through the return value instead of being
* rethrown.
*
* @param x the replication port; its request context endpoint address is overwritten with s
* @param s the endpoint URL to notify
* @param req the notification request to send
* @return true if the call completed without an exception, false otherwise
*/
private boolean SendNotification(UDDIReplicationPortType x, String s, NotifyChangeRecordsAvailable req) {
<span class="nc" id="L282"> ((BindingProvider) x).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, s);</span>
try {
<span class="nc" id="L284"> x.notifyChangeRecordsAvailable(req);</span>
<span class="nc" id="L285"> log.info(&quot;Successfully sent change record available message to &quot; + s + &quot; this node: &quot; + node);</span>
<span class="nc" id="L286"> return true;</span>
<span class="nc" id="L287"> } catch (Exception ex) {</span>
<span class="nc" id="L288"> log.warn(&quot;Unable to send change notification to &quot; + s + &quot; this node: &quot; + node + &quot; reason: &quot; + ex.getMessage());</span>
<span class="nc" id="L289"> log.debug(&quot;Unable to send change notification to &quot; + s, ex);</span>
}
<span class="nc" id="L291"> return false;</span>
}
/**
* Destination built by SendNotifications for directed graph replication:
* one primary replication URL plus the alternates to try when sending to
* the primary fails.
*/
<span class="nc" id="L294"> private static class PrimaryAlternate {</span>
// replication URL of the message receiver; null until a matching operator is found
<span class="nc" id="L296"> String primaryUrl = null;</span>
// replication URLs of the alternate receivers, in the order they were found
<span class="nc" id="L297"> List&lt;String&gt; alternateUrls = new ArrayList&lt;String&gt;();</span>
}
/**
* Timer callback. Makes sure both queues exist, then drains the local
* change queue through ProcessChangeRecord and afterwards drains the
* retransmit queue through SendNotifications with isRetrans set to true.
*/
public synchronized void run() {
<span class="fc" id="L301"> log.debug(&quot;Replication thread triggered&quot;);</span>
<span class="pc bpc" id="L302" title="1 of 2 branches missed."> if (queue == null) {</span>
<span class="nc" id="L303"> queue = new ConcurrentLinkedQueue();</span>
}
<span class="fc bfc" id="L305" title="All 2 branches covered."> if (queue2 == null) {</span>
<span class="fc" id="L306"> queue2 = new ConcurrentLinkedQueue();</span>
}
//TODO revisit this
<span class="fc bfc" id="L309" title="All 2 branches covered."> if (!queue.isEmpty()) {</span>
<span class="fc" id="L310"> log.info(&quot;Replication, Notifying nodes of new change records. &quot; + queue.size() + &quot; queued&quot;);</span>
}
//TODO check for replication config changes
<span class="fc bfc" id="L314" title="All 2 branches covered."> while (!queue.isEmpty()) {</span>
//for each change at this node
<span class="fc" id="L317"> ChangeRecord j = queue.poll();</span>
<span class="fc" id="L318"> ProcessChangeRecord(j);</span>
<span class="fc" id="L320"> }</span>
<span class="pc bpc" id="L322" title="1 of 2 branches missed."> while (!queue2.isEmpty()) {</span>
//for each change record queued for retransmission
<span class="nc" id="L325"> org.uddi.repl_v3.ChangeRecord j = queue2.poll();</span>
// the mapped model is only used for the log line below; the notification itself uses the ids from j
<span class="nc" id="L327"> ChangeRecord model = new ChangeRecord();</span>
try {
<span class="nc" id="L329"> model = MappingApiToModel.mapChangeRecord(j);</span>
<span class="nc" id="L330"> } catch (UnsupportedEncodingException ex) {</span>
<span class="nc" id="L331"> Logger.getLogger(ReplicationNotifier.class.getName()).log(Level.SEVERE, null, ex);</span>
<span class="nc" id="L332"> }</span>
// NOTE(review): after a failed mapping, model is the empty record created above; if its record type
// is null the name() call below would throw and end this run - TODO confirm
<span class="nc" id="L333"> log.info(&quot;retransmitting CR notificationm entity owner: &quot; + j.getChangeID().getNodeID() + &quot; CR: &quot; + j.getChangeID().getOriginatingUSN() + &quot; key:&quot; + model.getEntityKey() + &quot; &quot; + model.getRecordType().name() + &quot; accepted locally:&quot; + model.getIsAppliedLocally());</span>
<span class="nc" id="L334"> SendNotifications(j.getChangeID().getOriginatingUSN(), j.getChangeID().getNodeID(), true);</span>
<span class="nc" id="L336"> }</span>
<span class="fc" id="L337"> }</span>
/**
* Loads the stored replication configuration with the highest serial
* number (query ordered by serialNumber descending, limited to one row)
* and maps it to the API type.
*
* @return the mapped configuration, or null when the lookup throws; the
* catch block treats every exception as no config being available and
* does not log it
*/
public static org.uddi.repl_v3.ReplicationConfiguration FetchEdges() {
<span class="fc" id="L347"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="fc" id="L348"> EntityTransaction tx = null;</span>
<span class="fc" id="L349"> org.uddi.repl_v3.ReplicationConfiguration item = new org.uddi.repl_v3.ReplicationConfiguration();</span>
try {
<span class="fc" id="L351"> tx = em.getTransaction();</span>
<span class="fc" id="L352"> tx.begin();</span>
<span class="fc" id="L353"> Query q = em.createQuery(&quot;SELECT item FROM ReplicationConfiguration item order by item.serialNumber DESC&quot;);</span>
<span class="fc" id="L354"> q.setMaxResults(1);</span>
// per the JPA API, getSingleResult throws NoResultException when nothing is stored, which lands in the catch below
<span class="fc" id="L355"> ReplicationConfiguration results = (ReplicationConfiguration) q.getSingleResult();</span>
// ReplicationConfiguration find = em.find(ReplicationConfiguration.class, null);
<span class="pc bpc" id="L357" title="1 of 2 branches missed."> if (results != null) {</span>
<span class="fc" id="L358"> MappingModelToApi.mapReplicationConfiguration(results, item);</span>
}
<span class="fc" id="L361"> tx.commit();</span>
<span class="fc" id="L362"> return item;</span>
<span class="nc" id="L363"> } catch (Exception ex) {</span>
//log.error(&quot;error&quot;, ex);
//no config available
<span class="nc bnc" id="L366" title="All 4 branches missed."> if (tx != null &amp;&amp; tx.isActive()) {</span>
<span class="nc" id="L367"> tx.rollback();</span>
}
} finally {
<span class="pc" id="L370"> em.close();</span>
<span class="nc" id="L371"> }</span>
<span class="nc" id="L372"> return null;</span>
}
/**
* Looks up a stored node by its id and maps it to the API type. This
* private method is not called anywhere in this class.
*
* @param messageSender the node id used as the primary key for the lookup
* @return the mapped node; an empty Node when no stored node has that id;
* null when the lookup throws (the exception is logged)
*/
@Deprecated
private Node getNode(String messageSender) {
<span class="nc" id="L377"> EntityManager em = PersistenceManager.getEntityManager();</span>
<span class="nc" id="L378"> EntityTransaction tx = null;</span>
try {
<span class="nc" id="L380"> tx = em.getTransaction();</span>
<span class="nc" id="L381"> tx.begin();</span>
<span class="nc" id="L382"> Node api = new Node();</span>
<span class="nc" id="L383"> org.apache.juddi.model.Node find = em.find(org.apache.juddi.model.Node.class, messageSender);</span>
<span class="nc bnc" id="L384" title="All 2 branches missed."> if (find != null) {</span>
<span class="nc" id="L385"> MappingModelToApi.mapNode(find, api);</span>
}
<span class="nc" id="L387"> tx.commit();</span>
<span class="nc" id="L388"> return api;</span>
<span class="nc" id="L389"> } catch (Exception ex) {</span>
<span class="nc" id="L390"> log.error(&quot;error&quot;, ex);</span>
<span class="nc bnc" id="L391" title="All 4 branches missed."> if (tx != null &amp;&amp; tx.isActive()) {</span>
<span class="nc" id="L392"> tx.rollback();</span>
}
} finally {
<span class="nc" id="L395"> em.close();</span>
<span class="nc" id="L396"> }</span>
<span class="nc" id="L397"> return null;</span>
}
}
</pre><div class="footer"><span class="right">Created with <a href="http://www.jacoco.org/jacoco">JaCoCo</a> 0.7.9.201702052155</span></div></body></html>