blob: fe0f3f5314ef226c3b48a5aa84ff9136a5e47567 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Net;
using System.Threading;
using System.IO;
using System.Collections.Generic;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ.Transport.Discovery.Http
{
/// <summary>
/// Discovery agent that finds services by issuing an HTTP GET against a
/// registry URI and treating each non-empty line of the response body as
/// one live service. Polling can be paused and resumed via
/// <see cref="Suspend"/> and <see cref="Resume"/>.
/// </summary>
public class HttpDiscoveryAgent : AbstractDiscoveryAgent, ISuspendable
{
    /// <summary>
    /// Polling state shared between callers of Suspend()/Resume() and
    /// DoDiscovery(). Always read and written while holding updateMutex.
    /// </summary>
    private enum UpdateState
    {
        // Set by Suspend(): DoDiscovery() keeps waiting while in this state.
        SUSPENDED,
        // Set by Resume(): the next DoDiscovery() pass skips its wait.
        RESUMING,
        // Normal operation: DoDiscovery() waits one keep-alive interval.
        RESUMED
    }

    public const string DEFAULT_DISCOVERY_URI_STRING = "http://localhost:8080/discovery-registry";
    public const string DEFAULT_GROUP = "default";

    // Guards 'state' and is the monitor that DoDiscovery() waits on.
    private readonly object updateMutex = new object();
    private UpdateState state = UpdateState.RESUMED;

    // Default polling interval in milliseconds (10 seconds).
    private const int DEFAULT_UPDATE_INTERVAL = 10 * 1000;
    private long keepAliveInterval = DEFAULT_UPDATE_INTERVAL;

    /// <summary>
    /// Creates an agent whose Group is initialised to <see cref="DEFAULT_GROUP"/>.
    /// </summary>
    public HttpDiscoveryAgent()
    {
        this.Group = DEFAULT_GROUP;
    }

    #region Property setters and getters

    /// <summary>
    /// Time, in milliseconds, that DoDiscovery() waits between lookups.
    /// Three times this value is sent to the registry as the "freshness"
    /// query parameter.
    /// </summary>
    public override long KeepAliveInterval
    {
        get { return this.keepAliveInterval; }
        set { this.keepAliveInterval = value; }
    }

    #endregion

    /// <summary>
    /// Returns "HttpDiscoveryAgent-", followed by "advertise:" and the
    /// SelfService value when SelfService is not null.
    /// </summary>
    public override String ToString()
    {
        return "HttpDiscoveryAgent-" + (SelfService != null ? "advertise:" + SelfService : "");
    }

    /// <summary>
    /// Falls back to the default registry URI (with the Group appended as a
    /// path segment) when no DiscoveryURI was configured or its host is the
    /// literal "default".
    /// </summary>
    protected override void DoStartAgent()
    {
        if (DiscoveryURI == null || DiscoveryURI.Host.Equals("default"))
        {
            DiscoveryURI = new Uri(DEFAULT_DISCOVERY_URI_STRING + "/" + Group);
        }

        if (Tracer.IsDebugEnabled)
        {
            Tracer.DebugFormat("http agent started with discoveryURI = {0}", DiscoveryURI);
        }
    }

    /// <summary>
    /// Wakes any thread blocked in DoDiscovery() so that it can notice the
    /// agent has been stopped.
    /// </summary>
    protected override void DoStopAgent()
    {
        // Ensure the worker thread exits its wait so it can detect the stopped event.
        Resume();
    }

    /// <summary>
    /// Marks the agent as suspended. A DoDiscovery() call that is waiting,
    /// or that starts waiting afterwards, keeps waiting until Resume() is
    /// called or 'started' becomes false.
    /// </summary>
    public void Suspend()
    {
        lock (updateMutex)
        {
            this.state = UpdateState.SUSPENDED;
        }
    }

    /// <summary>
    /// Marks the agent as resuming and pulses the monitor so that a thread
    /// waiting in DoDiscovery() wakes up immediately.
    /// </summary>
    public void Resume()
    {
        lock (updateMutex)
        {
            this.state = UpdateState.RESUMING;
            Monitor.Pulse(updateMutex);
        }
    }

    /// <summary>
    /// Runs one discovery pass: performs a lookup, then waits on the monitor
    /// for up to KeepAliveInterval milliseconds. While the state is SUSPENDED
    /// and the agent is still started, the wait is repeated. If the state is
    /// RESUMING on entry to the loop, it is advanced to RESUMED and the wait
    /// is skipped, so this pass returns straight after its lookup.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably called in a loop by the base class worker
    /// thread; confirm against AbstractDiscoveryAgent.
    /// </remarks>
    protected override void DoDiscovery()
    {
        DoUpdate();

        lock (updateMutex)
        {
            do
            {
                if (state == UpdateState.RESUMING)
                {
                    state = UpdateState.RESUMED;
                }
                else
                {
                    // Releases updateMutex while waiting; returns on Pulse or timeout.
                    Monitor.Wait(updateMutex, TimeSpan.FromMilliseconds(KeepAliveInterval));
                }
            }
            while (state == UpdateState.SUSPENDED && started.Value);
        }
    }

    /// <summary>
    /// Queries the registry and reports every returned service through
    /// ProcessLiveService while holding discoveredServicesLock.
    /// </summary>
    private void DoUpdate()
    {
        List<string> activeServices = DoLookup(KeepAliveInterval * 3);

        // If there was an error talking to the central server, activeServices is null.
        if (activeServices == null)
        {
            return;
        }

        lock (discoveredServicesLock)
        {
            foreach (String service in activeServices)
            {
                Tracer.DebugFormat("Http discovery found live service: {0}", service);
                ProcessLiveService("", service);
            }
        }
    }

    /// <summary>
    /// Issues an HTTP GET to DiscoveryURI with a "freshness" query parameter
    /// and parses the response body into a list of service entries.
    /// </summary>
    /// <param name="freshness">
    /// Value appended verbatim as the "freshness" query parameter.
    /// </param>
    /// <returns>
    /// The distinct, trimmed, non-empty lines of the response in the order
    /// they first appear; or null when the response is null or the request
    /// throws (the failure is logged as a warning, not propagated).
    /// </returns>
    private List<string> DoLookup(long freshness)
    {
        String url = DiscoveryURI + "?freshness=" + freshness;

        try
        {
            string response;

            // WebClient is IDisposable; this method runs on every polling
            // cycle, so dispose each instance deterministically.
            using (WebClient client = new WebClient())
            {
                response = client.DownloadString(url);
            }

            if (response == null)
            {
                Tracer.DebugFormat("GET to {0} failed to retrieve any services.", url);
                return null;
            }

            Tracer.DebugFormat("GET to {0} got a {1}", url, response);

            List<string> rc = new List<string>();
            using (StringReader reader = new StringReader(response))
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    line = line.Trim();

                    // Skip blank lines and entries already collected.
                    if (line.Length != 0 && !rc.Contains(line))
                    {
                        rc.Add(line);
                    }
                }
            }

            return rc;
        }
        catch (Exception e)
        {
            // Best effort: a failed poll is logged and reported as "no result".
            Tracer.WarnFormat("GET to {0} failed with: {1}", url, e.Message);
            return null;
        }
    }

    /// <summary>
    /// Intentionally a no-op: this agent does not advertise itself.
    /// </summary>
    protected override void DoAdvertizeSelf()
    {
        // TODO - Don't need this yet unless we want to do some testing.
    }
}
}