blob: 6a713258ca28a4a3878174714c194b984d429cc2 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.streamscope;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* A registry for Stream "oscilloscope" {@link StreamScope} instances.
* <P>
* The registry contains a collection of StreamScope instances
* that are registered by one or more names.
* </P><P>
* The names are: by a TStream {@link org.apache.edgent.topology.TStream#alias(String) alias} or
* by a stream's (output port's) unique identifier.
* Static methods are provided for composing these names and extracting
* the alias/identifier from generated names.
* </P>
* See {@code org.apache.edgent.providers.development.DevelopmentProvider}
*/
public class StreamScopeRegistry {
private final Map<String, StreamScope<?>> byNameMap = new HashMap<>();
private final Map<StreamScope<?>, List<String>> byStreamScopeMap = new HashMap<>();
private static final String JOB_OPLET_FMT = "j[%s].op[%s]";
private static final String STREAMID_FMT = JOB_OPLET_FMT+".o[%d]";
private static final String ID_PREFIX = "id.";
private static final String ALIAS_PREFIX = "alias.";
public StreamScopeRegistry() {
}
/**
* Make a streamId for the specified stream.
* @param jobId the job id (e.g., "JOB_0")
* @param opletId the oplet id (e.g., "OP_2")
* @param oport the oplet output port index (0-based)
* @return the streamId
*/
public static String mkStreamId(String jobId, String opletId, int oport) {
Objects.requireNonNull(jobId, "jobId");
Objects.requireNonNull(opletId, "opletId");
if (oport < 0)
throw new IllegalArgumentException("oport");
return String.format(STREAMID_FMT, jobId, opletId, oport);
}
/** create a prefix of a streamId based name
* @param jobId the job id (e.g., "JOB_0")
* @param opletId the oplet id (e.g., "OP_2")
* @return the value
*/
static String mkStreamIdNamePrefix(String jobId, String opletId) {
return String.format(ID_PREFIX+JOB_OPLET_FMT, jobId, opletId);
}
/** create a registration name for a stream alias
* @param alias the alias
* @return the value
*/
public static String nameForStreamAlias(String alias) {
Objects.requireNonNull(alias, "alias");
return ALIAS_PREFIX+alias;
}
/** Create a registration name for a stream id.
* @param streamId the stream id
* @return the value
* @see #mkStreamId(String, String, int)
*/
public static String nameForStreamId(String streamId) {
Objects.requireNonNull(streamId, "id");
return ID_PREFIX+streamId;
}
/** Extract the stream alias from a name
* @param name the name
* @return null if {@code name} is not from nameByStreamAlias()
*/
public static String streamAliasFromName(String name) {
Objects.requireNonNull(name, "name");
if (!name.startsWith(ALIAS_PREFIX))
return null;
return name.substring(ALIAS_PREFIX.length());
}
/** Extract the streamId from the name.
* @param name the name
* @return null if {@code name} is not from nameByStreamId()
*/
public static String streamIdFromName(String name) {
Objects.requireNonNull(name, "name");
if (!name.startsWith(ID_PREFIX))
return null;
return name.substring(ID_PREFIX.length());
}
/** Register a StreamScope by {@code name}
* <P>
* A single StreamScope can be registered with multiple names.
* </P>
* @param name name to register with
* @param streamScope the StreamScope
* @throws IllegalStateException if a registration already exists for {@code name}
* @see #nameForStreamId(String)
* @see #nameForStreamAlias(String)
*/
public synchronized void register(String name, StreamScope<?> streamScope) {
if (byNameMap.containsKey(name))
throw new IllegalStateException("StreamScope already registered by name "+name);
byNameMap.put(name, streamScope);
List<String> names = byStreamScopeMap.get(streamScope);
if (names == null) {
names = new ArrayList<>(2);
byStreamScopeMap.put(streamScope, names);
}
names.add(name);
}
/**
* Lookup a StreamScope
* @param name a StreamScope is registration name
* @return the StreamScope. null if name is not registered.
* @see #nameForStreamId(String)
* @see #nameForStreamAlias(String)
*/
public synchronized StreamScope<?> lookup(String name) {
return byNameMap.get(name);
}
/**
* Get the registered names.
* @return unmodifiable collection of the name.
* The set is backed by the registry so the set may change.
*/
public synchronized Set<String> getNames() {
return Collections.unmodifiableSet(byNameMap.keySet());
}
/** Get registered StreamScopes and the name(s) each is registered with.
* The map is backed by the registry so its contents may change.
* @return the map
*/
public synchronized Map<StreamScope<?>, List<String>> getStreamScopes() {
return Collections.unmodifiableMap(byStreamScopeMap);
}
/** remove the specific name registration. Other registration of the same StreamScope may still exist.
* no-op if name is not registered.
* @param name the name to unregister
* @see #unregister(StreamScope)
*/
public synchronized void unregister(String name) {
StreamScope<?> streamScope = byNameMap.remove(name);
if (streamScope == null)
return;
List<String> names = byStreamScopeMap.get(streamScope);
names.remove(name);
if (names.isEmpty())
byStreamScopeMap.remove(streamScope);
}
/** remove all name registrations of the StreamScope.
* no-op if no registrations for the StreamScope
* @param streamScope the StreamScope to unregister
*/
public synchronized void unregister(StreamScope<?> streamScope) {
List<String> names = byStreamScopeMap.get(streamScope);
if (names == null)
return;
names = new ArrayList<>(names);
for (String name : names)
unregister(name);
}
/** remove all name registrations of the StreamScopes for the specified oplet.
* no-op if no registrations for the oplet
* @param jobId the job id (e.g., "JOB_0")
* @param opletId the oplet id (e.g., "OP_2")
*/
synchronized void unregister(String jobId, String opletId) {
String prefix = mkStreamIdNamePrefix(jobId, opletId);
List<StreamScope<?>> toUnregister = new ArrayList<>();
for (String name : getNames()) {
if (name.startsWith(prefix)) {
toUnregister.add(lookup(name));
}
}
for (StreamScope<?> streamScope : toUnregister) {
unregister(streamScope);
}
}
}