/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.tuscany.sca.binding.notification; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.net.URI; | |
import java.net.URL; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import javax.servlet.ServletInputStream; | |
import javax.servlet.ServletOutputStream; | |
import org.apache.tuscany.sca.binding.notification.encoding.Broker; | |
import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference; | |
import org.apache.tuscany.sca.binding.notification.encoding.BrokerID; | |
import org.apache.tuscany.sca.binding.notification.encoding.BrokerProducerReference; | |
import org.apache.tuscany.sca.binding.notification.encoding.Brokers; | |
import org.apache.tuscany.sca.binding.notification.encoding.Constants; | |
import org.apache.tuscany.sca.binding.notification.encoding.EncodingException; | |
import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject; | |
import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry; | |
import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils; | |
import org.apache.tuscany.sca.binding.notification.encoding.EndConsumers; | |
import org.apache.tuscany.sca.binding.notification.encoding.EndProducers; | |
import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress; | |
import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; | |
import org.apache.tuscany.sca.binding.notification.encoding.EndpointReferenceWrapper; | |
import org.apache.tuscany.sca.binding.notification.encoding.NeighborBrokerConsumers; | |
import org.apache.tuscany.sca.binding.notification.encoding.Neighbors; | |
import org.apache.tuscany.sca.binding.notification.encoding.NewBroker; | |
import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerAck; | |
import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerResponse; | |
import org.apache.tuscany.sca.binding.notification.encoding.NewConsumer; | |
import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerResponse; | |
import org.apache.tuscany.sca.binding.notification.encoding.NewProducer; | |
import org.apache.tuscany.sca.binding.notification.encoding.NewProducerResponse; | |
import org.apache.tuscany.sca.binding.notification.encoding.RemoveBroker; | |
import org.apache.tuscany.sca.binding.notification.encoding.RemovedBroker; | |
import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnection; | |
import org.apache.tuscany.sca.binding.notification.util.IOUtils; | |
import org.apache.tuscany.sca.binding.notification.util.NotificationServlet; | |
import org.apache.tuscany.sca.binding.notification.util.URIUtil; | |
import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException; | |
import org.apache.tuscany.sca.binding.notification.util.IOUtils.ReadableContinuation; | |
import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable; | |
import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler; | |
import org.apache.tuscany.sca.host.http.ServletHost; | |
/**
 * A notification type manager (NTM) serves as a registry of producers and consumers, or of
 * brokers, for any given notification type. This class implements an interface that allows a
 * reference provider (a producer), a service provider (a consumer), or both (a broker, via the
 * provider factory), to access the NTM for its notification type locally, regardless of whether
 * the NTM itself resides locally or remotely.
 * <p>
 * At a given host there is only one reference provider and/or one service provider for any given
 * notification type. So, if the NTM for a notification type resides locally, then it is invoked
 * exclusively by either a reference provider (newProducer), a service provider (newConsumer), or
 * a provider factory (newBroker). And since these invocations occur while the providers are being
 * created, all three of consumerList, producerList and brokerList must be null when these
 * invocations occur.
 *
 * @version $Rev$ $Date$
 */
public class NotificationTypeManagerImpl implements NotificationTypeManager { | |
private static final String ntmPathBase = "/ntm"; | |
private ServletHost servletHost; | |
private EncodingRegistry encodingRegistry; | |
private Map<URI, NotificationTypeManagerHandler> ntmHandlers; | |
/**
 * Creates an unconfigured manager. The servlet host and encoding registry are
 * supplied afterwards through their setters.
 */
public NotificationTypeManagerImpl() {
    super();
}
public void setServletHost(ServletHost servletHost) { | |
this.servletHost = servletHost; | |
} | |
public void setEncodingRegistry(EncodingRegistry encodingRegistry) { | |
this.encodingRegistry = encodingRegistry; | |
} | |
public void init() { | |
ntmHandlers = new HashMap<URI, NotificationTypeManagerHandler>(); | |
} | |
public String newConsumer(URI notificationType, URL consumerUrl, URL remoteNtmUrl, List<URL> producerListResult) { | |
if (ntmUrlIsRemote(consumerUrl, remoteNtmUrl)) { | |
try { | |
WriteableEPW wEPW = new WriteableEPW(new NewConsumer(), consumerUrl); | |
InputStreamDecoder isd = new InputStreamDecoder(); | |
NewConsumerResponse ncr = | |
(NewConsumerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_CONSUMER_OP, wEPW, isd); | |
String sequenceType = ncr.getSequenceType(); | |
if (Constants.EndProducers.equals(sequenceType) || Constants.BrokerProducers.equals(sequenceType)) { | |
for (EndpointReference epr : ncr.getReferenceSequence()) { | |
producerListResult.add(epr.getEndpointAddress().getAddress()); | |
} | |
} | |
return sequenceType; | |
} catch(Exception e) { | |
e.printStackTrace(); | |
throw new RuntimeException(e); | |
} | |
} else { | |
NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); | |
if (ntmHandler != null) { | |
throw new RuntimeException("Trying to deploy local consumer with existing local producer, consumer or broker"); | |
} | |
createNtmHandler(consumerUrl.getAuthority(), notificationType, consumerUrl, null, null); | |
return Constants.NoProducers; | |
} | |
} | |
private void createNtmHandler(String ntmUriAuthority, URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) { | |
String ntmUri = "http://" + ntmUriAuthority + ntmPathBase + URIUtil.getPath(notificationType); | |
NotificationTypeManagerHandler ntmh = new NotificationTypeManagerHandler(notificationType, consumerUrl, producerUrl, broker); | |
ntmHandlers.put(notificationType, ntmh); | |
servletHost.addServletMapping(ntmUri, new NotificationServlet(ntmh)); | |
} | |
public String newProducer(URI notificationType, URL producerUrl, URL remoteNtmUrl, List<URL> consumerListResult) { | |
if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) { | |
try { | |
WriteableEPW wEPW = new WriteableEPW(new NewProducer(), producerUrl); | |
InputStreamDecoder isd = new InputStreamDecoder(); | |
NewProducerResponse npr = | |
(NewProducerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_PRODUCER_OP, wEPW, isd); | |
String sequenceType = npr.getSequenceType(); | |
if (Constants.EndConsumers.equals(sequenceType) || Constants.BrokerConsumers.equals(sequenceType)) { | |
for (EndpointReference epr : npr.getReferenceSequence()) { | |
consumerListResult.add(epr.getEndpointAddress().getAddress()); | |
} | |
} | |
return sequenceType; | |
} catch(Exception e) { | |
e.printStackTrace(); | |
throw new RuntimeException(e); | |
} | |
} else { | |
NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); | |
if (ntmHandler != null) { | |
throw new RuntimeException("Trying to deploy local producer with existing local producer, consumer or broker"); | |
} | |
createNtmHandler(producerUrl.getAuthority(), notificationType, null, producerUrl, null); | |
return Constants.NoConsumers; | |
} | |
} | |
public boolean newBroker(URI notificationType, | |
URL consumerUrl, | |
URL producerUrl, | |
String brokerID, | |
URL remoteNtmUrl, | |
List<EndpointReference> consumerListResult, | |
List<EndpointReference> producerListResult) { | |
String ntmUriAuthority = producerUrl.getAuthority(); | |
if (!ntmUriAuthority.equals(consumerUrl.getAuthority())) { | |
throw new RuntimeException("Producer url and consumer url do not match"); | |
} | |
if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) { | |
try { | |
WriteableNewBroker wnb = new WriteableNewBroker(consumerUrl, producerUrl, brokerID); | |
InputStreamDecoder isd = new InputStreamDecoder(); | |
NewBrokerResponse nbr = | |
(NewBrokerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_OP, wnb, isd); | |
if (nbr.isFirstBroker()) { | |
if (nbr.getEndConsumers().getSequenceType().equals(Constants.EndConsumers)) { | |
for (EndpointReference epr : nbr.getEndConsumers().getReferenceSequence()) { | |
consumerListResult.add(epr); | |
} | |
} | |
if (nbr.getEndProducers().getSequenceType().equals(Constants.EndProducers)) { | |
for (EndpointReference epr : nbr.getEndProducers().getReferenceSequence()) { | |
producerListResult.add(epr); | |
} | |
} | |
} else { | |
for (Broker broker : nbr.getBrokers().getBrokerSequence()) { | |
consumerListResult.add(broker.getBrokerConsumerReference().getReference()); | |
producerListResult.add(broker.getBrokerProducerReference().getReference()); | |
} | |
} | |
return nbr.isFirstBroker(); | |
} catch(Exception e) { | |
e.printStackTrace(); | |
throw new RuntimeException(e); | |
} | |
} else { | |
NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); | |
if (ntmHandler != null) { | |
throw new RuntimeException("Trying to deploy local broker with existing local producer, consumer or broker"); | |
} | |
BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, brokerID); | |
createNtmHandler(ntmUriAuthority, notificationType, null, null, broker); | |
return true; | |
} | |
} | |
private boolean ntmUrlIsRemote(URL localUrl, URL ntmUrl) { | |
if (ntmUrl == null) { | |
return false; | |
} | |
if (localUrl.getPort() != ntmUrl.getPort()) { | |
return true; | |
} | |
String remoteNtmUrlAuthority = ntmUrl.getAuthority(); | |
if (remoteNtmUrlAuthority.indexOf("localhost") >= 0) { | |
return false; | |
} | |
return !localUrl.getAuthority().equals(remoteNtmUrlAuthority); | |
} | |
public void newBrokerAck(URL remoteNtmUrl) { | |
try { | |
IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_ACK_OP, new WriteableNewBrokerAck(), null); | |
} catch(Exception e) { | |
e.printStackTrace(); | |
throw new RuntimeException(e); | |
} | |
} | |
public void removeBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs, URL remoteNtmUrl) { | |
WriteableRemoveBroker wrb = new WriteableRemoveBroker(brokerConsumerEpr, neighborBrokerConsumerEprs); | |
try { | |
IOUtils.sendHttpRequest(remoteNtmUrl, Constants.REMOVE_BROKER_OP, wrb, null); | |
} catch(Exception e) { | |
e.printStackTrace(); | |
throw new RuntimeException(e); | |
} | |
} | |
private class NotificationTypeManagerHandler implements NotificationServletStreamHandler { | |
private URI notificationType; | |
List<URL> consumerList; | |
List<URL> producerList; | |
List<BrokerStruct> brokerList; | |
private NotificationTypeLock notificationTypeLock; | |
private BrokerStruct pendingBroker; | |
public NotificationTypeManagerHandler(URI notificationType) { | |
this.notificationType = notificationType; | |
this.notificationTypeLock = new NotificationTypeLock(); | |
this.pendingBroker = null; | |
} | |
public NotificationTypeManagerHandler(URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) { | |
this(notificationType); | |
if (consumerUrl != null) { | |
addConsumer(consumerUrl); | |
} else if (producerUrl != null) { | |
addProducer(producerUrl); | |
} else if (broker != null) { | |
addBroker(broker); | |
} | |
} | |
private void addConsumer(URL consumerUrl) { | |
if (consumerList == null) { | |
consumerList = new ArrayList<URL>(); | |
} | |
consumerList.add(consumerUrl); | |
} | |
private void addProducer(URL producerUrl) { | |
if (producerList == null) { | |
producerList = new ArrayList<URL>(); | |
} | |
producerList.add(producerUrl); | |
} | |
private void addBroker(BrokerStruct broker) { | |
if (brokerList == null) { | |
brokerList = new ArrayList<BrokerStruct>(); | |
} | |
brokerList.add(broker); | |
} | |
public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) { | |
String opHeader = headers.get(IOUtils.Notification_Operation); | |
EncodingObject eo = null; | |
try { | |
eo = EncodingUtils.decodeFromStream(encodingRegistry, istream); | |
} catch(EncodingException e) { | |
throw new RuntimeException(e); | |
} | |
if (Constants.NEW_CONSUMER_OP.equals(opHeader)) { | |
handleNewConsumer((NewConsumer)eo, ostream); | |
} else if(Constants.NEW_PRODUCER_OP.equals(opHeader)) { | |
handleNewProducer((NewProducer)eo, ostream); | |
} else if(Constants.NEW_BROKER_OP.equals(opHeader)) { | |
handleNewBroker((NewBroker)eo, ostream); | |
} else if (Constants.NEW_BROKER_ACK_OP.equals(opHeader)) { | |
handleNewBrokerAck(); | |
} else if (Constants.REMOVE_BROKER_OP.equals(opHeader)) { | |
handleRemoveBroker((RemoveBroker)eo); | |
} | |
} | |
private void handleNewConsumer(NewConsumer nc, ServletOutputStream ostream) { | |
synchronized(notificationTypeLock) { | |
if (notificationTypeLock.isLocked) { | |
try { notificationTypeLock.wait(); } catch(InterruptedException e) {} | |
} | |
URL consumerUrl = nc.getReference().getEndpointAddress().getAddress(); | |
if (brokerList == null) { | |
addConsumer(consumerUrl); | |
} | |
NewConsumerResponse ncr = new NewConsumerResponse(); | |
if (producerList != null) { | |
ncr.setSequenceType(Constants.EndProducers); | |
for (URL producerUrl : producerList) { | |
ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null)); | |
} | |
} else if(brokerList != null) { | |
ncr.setSequenceType(Constants.BrokerProducers); | |
for (BrokerStruct broker : brokerList) { | |
ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.producerUrl, null)); | |
} | |
} else { | |
ncr.setSequenceType(Constants.NoProducers); | |
} | |
try { | |
EncodingUtils.encodeToStream(encodingRegistry, ncr, ostream); | |
} catch(IOUtilsException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
private void handleNewProducer(NewProducer np, ServletOutputStream ostream) { | |
synchronized(notificationTypeLock) { | |
if (notificationTypeLock.isLocked) { | |
try { notificationTypeLock.wait(); } catch(InterruptedException e) {} | |
} | |
URL producerUrl = np.getReference().getEndpointAddress().getAddress(); | |
if (brokerList == null) { | |
addProducer(producerUrl); | |
} | |
NewProducerResponse npr = new NewProducerResponse(); | |
if (consumerList != null) { | |
npr.setSequenceType(Constants.EndConsumers); | |
for (URL consumerUrl : consumerList) { | |
npr.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null)); | |
} | |
} else if(brokerList != null) { | |
npr.setSequenceType(Constants.BrokerConsumers); | |
for (BrokerStruct broker : brokerList) { | |
npr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.consumerUrl, null)); | |
} | |
} else { | |
npr.setSequenceType(Constants.NoConsumers); | |
} | |
try { | |
EncodingUtils.encodeToStream(encodingRegistry, npr, ostream); | |
} catch(IOUtilsException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
private void handleNewBroker(NewBroker nb, ServletOutputStream ostream) { | |
synchronized(notificationTypeLock) { | |
if (notificationTypeLock.isLocked) { | |
try { notificationTypeLock.wait(); } catch(InterruptedException e) {} | |
} | |
NewBrokerResponse nbr = new NewBrokerResponse(); | |
if (consumerList != null || producerList != null || brokerList == null) { | |
nbr.setFirstBroker(true); | |
EndConsumers endConsumers = new EndConsumers(); | |
if (consumerList != null) { | |
endConsumers.setSequenceType(Constants.EndConsumers); | |
for (URL consumerUrl : consumerList) { | |
endConsumers.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null)); | |
} | |
} else { | |
endConsumers.setSequenceType(Constants.NoConsumers); | |
} | |
nbr.setEndConsumers(endConsumers); | |
EndProducers endProducers = new EndProducers(); | |
if (producerList != null) { | |
endProducers.setSequenceType(Constants.EndProducers); | |
for (URL producerUrl : producerList) { | |
endProducers.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null)); | |
} | |
} else { | |
endProducers.setSequenceType(Constants.NoProducers); | |
} | |
nbr.setEndProducers(endProducers); | |
} else { | |
nbr.setFirstBroker(false); | |
Brokers brokers = new Brokers(); | |
for (BrokerStruct brokerStruct : brokerList) { | |
Broker brokerElt = new Broker(); | |
BrokerConsumerReference bcr = new BrokerConsumerReference(); | |
bcr.setReference(EncodingUtils.createEndpointReference(brokerStruct.consumerUrl, brokerStruct.brokerID)); | |
brokerElt.setBrokerConsumerReference(bcr); | |
BrokerProducerReference bpr = new BrokerProducerReference(); | |
bpr.setReference(EncodingUtils.createEndpointReference(brokerStruct.producerUrl, brokerStruct.brokerID)); | |
brokerElt.setBrokerProducerReference(bpr); | |
brokers.addBrokerToSequence(brokerElt); | |
} | |
nbr.setBrokers(brokers); | |
} | |
EndpointReference consumerEPR = nb.getBrokerConsumerReference().getReference(); | |
URL consumerUrl = consumerEPR.getEndpointAddress().getAddress(); | |
BrokerID consumerBrokerID = consumerEPR.getReferenceProperties().getProperty(BrokerID.class); | |
EndpointReference producerEPR = nb.getBrokerProducerReference().getReference(); | |
URL producerUrl = producerEPR.getEndpointAddress().getAddress(); | |
BrokerID producerBrokerID = producerEPR.getReferenceProperties().getProperty(BrokerID.class); | |
if (consumerBrokerID == null || | |
producerBrokerID == null || | |
!consumerBrokerID.getID().equals(producerBrokerID.getID())) { | |
throw new RuntimeException("Producer and consumer broker ids do not match"); | |
} | |
// only add broker if consumerList == null && producerList == null | |
// otherwise, make it a pending broker and wait for ack | |
// TODO block for a configurable amount of time | |
BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, consumerBrokerID.getID()); | |
if (consumerList == null && producerList == null) { | |
addBroker(broker); | |
} else { | |
pendingBroker = broker; | |
notificationTypeLock.isLocked = true; | |
} | |
try { | |
EncodingUtils.encodeToStream(encodingRegistry, nbr, ostream); | |
} catch(IOUtilsException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
private void handleNewBrokerAck() { | |
synchronized(notificationTypeLock) { | |
if (!notificationTypeLock.isLocked) { | |
notificationTypeLock.notifyAll(); | |
throw new RuntimeException("Notification type should be locked"); | |
} | |
if (brokerList != null) { | |
notificationTypeLock.isLocked = false; | |
notificationTypeLock.notifyAll(); | |
throw new RuntimeException("Can't add pending broker to non-empty broker list"); | |
} | |
if (pendingBroker == null) { | |
notificationTypeLock.isLocked = false; | |
notificationTypeLock.notifyAll(); | |
throw new RuntimeException("Missing pending broker"); | |
} | |
addBroker(pendingBroker); | |
consumerList = null; | |
producerList = null; | |
pendingBroker = null; | |
notificationTypeLock.isLocked = false; | |
notificationTypeLock.notifyAll(); | |
} | |
} | |
private void handleRemoveBroker(RemoveBroker rb) { | |
synchronized(notificationTypeLock) { | |
if (notificationTypeLock.isLocked) { | |
try { notificationTypeLock.wait(); } catch(InterruptedException e) {} | |
} | |
if (brokerList == null) { | |
throw new RuntimeException("No broker to remove for [" + notificationType + "]"); | |
} | |
NeighborBrokerConsumers nbcs = rb.getNeighborBrokerConsumers(); | |
EndpointReference rbEpr = rb.getBrokerConsumerReference().getReference(); | |
if (nbcs != null && nbcs.getReferenceSequence() != null) { | |
List<Broker> neighborBrokers = new ArrayList<Broker>(); | |
for (EndpointReference neighborBrokerConsumerEpr : nbcs.getReferenceSequence()) { | |
BrokerStruct neighborBrokerStruct = null; | |
URL neighborBrokerConsumerEprUrl = neighborBrokerConsumerEpr.getEndpointAddress().getAddress(); | |
for (BrokerStruct brokerStruct : brokerList) { | |
if (brokerStruct.consumerUrl.equals(neighborBrokerConsumerEprUrl)) { | |
neighborBrokerStruct = brokerStruct; | |
break; | |
} | |
} | |
if (neighborBrokerStruct == null) { | |
throw new RuntimeException("Can't find neighbor broker for consumer EPR [" + | |
neighborBrokerConsumerEprUrl + "]"); | |
} | |
BrokerConsumerReference bcr = new BrokerConsumerReference(); | |
bcr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.consumerUrl, neighborBrokerStruct.brokerID)); | |
BrokerProducerReference bpr = new BrokerProducerReference(); | |
bpr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.producerUrl, neighborBrokerStruct.brokerID)); | |
Broker neighborBroker = new Broker(); | |
neighborBroker.setBrokerConsumerReference(bcr); | |
neighborBroker.setBrokerProducerReference(bpr); | |
neighborBrokers.add(neighborBroker); | |
} | |
int lastIndex = neighborBrokers.size() - 1; | |
for (int index = lastIndex; index >= 0; index--) { | |
List<Broker> writeableNeighborBrokers = ((index > 0) ? neighborBrokers.subList(0, index) : null); | |
WriteableReplaceBrokerConnection wrbc = new WriteableReplaceBrokerConnection(rbEpr, writeableNeighborBrokers); | |
URL targetUrl = | |
neighborBrokers.get(index).getBrokerProducerReference().getReference().getEndpointAddress().getAddress(); | |
try { | |
IOUtils.sendHttpRequest(targetUrl, Constants.REPLACE_BROKER_CONNECTION_OP, wrbc, null); | |
} catch(Exception e) { | |
e.printStackTrace(); | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
BrokerStruct removedBrokerStruct = null; | |
URL rbEprUrl = rbEpr.getEndpointAddress().getAddress(); | |
for (BrokerStruct brokerSruct : brokerList) { | |
if (brokerSruct.consumerUrl.equals(rbEprUrl)) { | |
removedBrokerStruct = brokerSruct; | |
break; | |
} | |
} | |
if (removedBrokerStruct == null) { | |
throw new RuntimeException("Can't find broker to remove for EPR [" + rbEprUrl + "]"); | |
} | |
if(!brokerList.remove(removedBrokerStruct)) { | |
throw new RuntimeException("Broker was not removed"); | |
} | |
} | |
} | |
} | |
/**
 * Monitor object for one notification type. Handlers synchronize on an instance
 * and use {@code isLocked} to signal that a broker is pending acknowledgement.
 */
class NotificationTypeLock {
    /** {@code true} while a pending broker awaits its acknowledgement; starts out {@code false}. */
    public boolean isLocked = false;
}
class WriteableEPW implements Writeable { | |
private EndpointReferenceWrapper epw; | |
public WriteableEPW(EndpointReferenceWrapper epw, URL url) { | |
EndpointAddress epa = new EndpointAddress(); | |
epa.setAddress(url); | |
EndpointReference epr = new EndpointReference(); | |
epr.setEndpointAddress(epa); | |
epw.setReference(epr); | |
this.epw = epw; | |
} | |
public void write(OutputStream os) throws IOUtilsException { | |
EncodingUtils.encodeToStream(encodingRegistry, epw, os); | |
} | |
} | |
class InputStreamDecoder implements ReadableContinuation { | |
public Object read(InputStream istream) throws IOUtilsException { | |
try { | |
return EncodingUtils.decodeFromStream(encodingRegistry, istream); | |
} catch(EncodingException e) { | |
throw new IOUtilsException(e); | |
} | |
} | |
} | |
/**
 * Plain holder for the data describing one broker: its consumer URL, its
 * producer URL and its identifier.
 */
class BrokerStruct {

    /** Consumer endpoint of the broker. */
    public URL consumerUrl;
    /** Producer endpoint of the broker. */
    public URL producerUrl;
    /** Identifier of the broker. */
    public String brokerID;

    /**
     * Stores the three values as given; no validation is performed.
     */
    public BrokerStruct(URL consumerUrl, URL producerUrl, String brokerID) {
        super();
        this.brokerID = brokerID;
        this.producerUrl = producerUrl;
        this.consumerUrl = consumerUrl;
    }
}
class WriteableNewBroker implements Writeable { | |
private NewBroker newBroker; | |
public WriteableNewBroker(URL consumerUrl, URL producerUrl, String brokerID) { | |
newBroker = new NewBroker(); | |
BrokerConsumerReference bcr = new BrokerConsumerReference(); | |
bcr.setReference(EncodingUtils.createEndpointReference(consumerUrl, brokerID)); | |
newBroker.setBrokerConsumerReference(bcr); | |
BrokerProducerReference bpr = new BrokerProducerReference(); | |
bpr.setReference(EncodingUtils.createEndpointReference(producerUrl, brokerID)); | |
newBroker.setBrokerProducerReference(bpr); | |
} | |
public void write(OutputStream os) throws IOUtilsException { | |
EncodingUtils.encodeToStream(encodingRegistry, newBroker, os); | |
} | |
} | |
class WriteableNewBrokerAck implements Writeable { | |
public void write(OutputStream os) throws IOUtilsException { | |
EncodingUtils.encodeToStream(encodingRegistry, new NewBrokerAck(), os); | |
} | |
} | |
class WriteableRemoveBroker implements Writeable { | |
private RemoveBroker removeBroker; | |
public WriteableRemoveBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs) { | |
removeBroker = new RemoveBroker(); | |
BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference(); | |
brokerConsumerReference.setReference(brokerConsumerEpr); | |
removeBroker.setBrokerConsumerReference(brokerConsumerReference); | |
if (neighborBrokerConsumerEprs != null) { | |
NeighborBrokerConsumers neighborBrokerConsumers = new NeighborBrokerConsumers(); | |
neighborBrokerConsumers.setReferenceSequence(neighborBrokerConsumerEprs); | |
neighborBrokerConsumers.setSequenceType(Constants.BrokerConsumers); | |
removeBroker.setNeighborBrokerConsumers(neighborBrokerConsumers); | |
} | |
} | |
public void write(OutputStream os) throws IOUtilsException { | |
EncodingUtils.encodeToStream(encodingRegistry, removeBroker, os); | |
} | |
} | |
class WriteableReplaceBrokerConnection implements Writeable { | |
private ReplaceBrokerConnection replaceBrokerConnection; | |
public WriteableReplaceBrokerConnection(EndpointReference removedBrokerEpr, List<Broker> brokerSequence) { | |
replaceBrokerConnection = new ReplaceBrokerConnection(); | |
RemovedBroker removedBroker = new RemovedBroker(); | |
removedBroker.setReference(removedBrokerEpr); | |
replaceBrokerConnection.setRemovedBroker(removedBroker); | |
if (brokerSequence != null) { | |
Neighbors neighbors = new Neighbors(); | |
neighbors.setBrokerSequence(brokerSequence); | |
replaceBrokerConnection.setNeighbors(neighbors); | |
} | |
} | |
public void write(OutputStream os) throws IOUtilsException { | |
EncodingUtils.encodeToStream(encodingRegistry, replaceBrokerConnection, os); | |
} | |
} | |
} |