| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.coordinator.stream; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.samza.Partition; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; |
| import org.apache.samza.coordinator.stream.messages.SetConfig; |
| import org.apache.samza.metrics.MetricsRegistry; |
| import org.apache.samza.serializers.JsonSerde; |
| import org.apache.samza.serializers.Serde; |
| import org.apache.samza.system.IncomingMessageEnvelope; |
| import org.apache.samza.system.SystemAdmin; |
| import org.apache.samza.system.SystemConsumer; |
| import org.apache.samza.system.SystemFactory; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.system.SystemStreamMetadata; |
| import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; |
| import org.apache.samza.system.SystemStreamPartition; |
| import org.apache.samza.system.SystemStreamPartitionIterator; |
| import org.apache.samza.util.CoordinatorStreamUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A wrapper around a SystemConsumer that provides helpful methods for dealing |
| * with the coordinator stream. |
| */ |
| public class CoordinatorStreamSystemConsumer { |
| private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemConsumer.class); |
| |
| private final Serde<List<?>> keySerde; |
| private final Serde<Map<String, Object>> messageSerde; |
| private final SystemStreamPartition coordinatorSystemStreamPartition; |
| private final SystemConsumer systemConsumer; |
| private final SystemAdmin systemAdmin; |
| private final Map<String, String> configMap; |
| private volatile boolean isStarted; |
| private volatile boolean isBootstrapped; |
| private final Object bootstrapLock = new Object(); |
| private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet(); |
| |
| public CoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) { |
| SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config); |
| SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config); |
| SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config, this.getClass().getSimpleName()); |
| SystemConsumer systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem(), config, registry, this.getClass().getSimpleName()); |
| |
| this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0)); |
| this.systemConsumer = systemConsumer; |
| this.systemAdmin = systemAdmin; |
| this.configMap = new HashMap<>(); |
| this.isBootstrapped = false; |
| this.keySerde = new JsonSerde<>(); |
| this.messageSerde = new JsonSerde<>(); |
| } |
| |
| // Used only for test |
| public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) { |
| this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0)); |
| this.systemConsumer = systemConsumer; |
| this.systemAdmin = systemAdmin; |
| this.configMap = new HashMap<>(); |
| this.isBootstrapped = false; |
| this.keySerde = new JsonSerde<>(); |
| this.messageSerde = new JsonSerde<>(); |
| } |
| |
| /** |
| * Retrieves the oldest offset in the coordinator stream, and registers the |
| * coordinator stream with the SystemConsumer using the earliest offset. |
| */ |
| public void register() { |
| if (isStarted) { |
| log.info("Coordinator stream partition {} has already been registered. Skipping.", coordinatorSystemStreamPartition); |
| return; |
| } |
| log.debug("Attempting to register: {}", coordinatorSystemStreamPartition); |
| Set<String> streamNames = new HashSet<String>(); |
| String streamName = coordinatorSystemStreamPartition.getStream(); |
| streamNames.add(streamName); |
| Map<String, SystemStreamMetadata> systemStreamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames); |
| log.info(String.format("Got metadata %s", systemStreamMetadataMap.toString())); |
| |
| if (systemStreamMetadataMap == null) { |
| throw new SamzaException("Received a null systemStreamMetadataMap from the systemAdmin. This is illegal."); |
| } |
| |
| SystemStreamMetadata systemStreamMetadata = systemStreamMetadataMap.get(streamName); |
| |
| if (systemStreamMetadata == null) { |
| throw new SamzaException("Expected " + streamName + " to be in system stream metadata."); |
| } |
| |
| SystemStreamPartitionMetadata systemStreamPartitionMetadata = systemStreamMetadata.getSystemStreamPartitionMetadata().get(coordinatorSystemStreamPartition.getPartition()); |
| |
| if (systemStreamPartitionMetadata == null) { |
| throw new SamzaException("Expected metadata for " + coordinatorSystemStreamPartition + " to exist."); |
| } |
| |
| String startingOffset = systemStreamPartitionMetadata.getOldestOffset(); |
| log.debug("Registering {} with offset {}", coordinatorSystemStreamPartition, startingOffset); |
| systemConsumer.register(coordinatorSystemStreamPartition, startingOffset); |
| } |
| |
| /** |
| * Starts the underlying SystemConsumer. |
| */ |
| public void start() { |
| if (isStarted) { |
| log.info("Coordinator stream consumer already started"); |
| return; |
| } |
| log.info("Starting coordinator stream system consumer."); |
| systemConsumer.start(); |
| systemAdmin.start(); |
| isStarted = true; |
| } |
| |
| /** |
| * Stops the underlying SystemConsumer. |
| */ |
| public void stop() { |
| log.info("Stopping coordinator stream system consumer."); |
| systemConsumer.stop(); |
| systemAdmin.stop(); |
| isStarted = false; |
| } |
| |
| /** |
| * Read all messages from the earliest offset, all the way to the latest. |
| * Currently, this method only pays attention to config messages. |
| */ |
| public void bootstrap() { |
| synchronized (bootstrapLock) { |
| // Make a copy so readers aren't affected while we modify the set. |
| final LinkedHashSet<CoordinatorStreamMessage> bootstrappedMessages = new LinkedHashSet<>(bootstrappedStreamSet); |
| |
| log.info("Bootstrapping configuration from coordinator stream."); |
| SystemStreamPartitionIterator iterator = |
| new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition); |
| |
| try { |
| while (iterator.hasNext()) { |
| IncomingMessageEnvelope envelope = iterator.next(); |
| Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray(); |
| Map<String, Object> valueMap = null; |
| if (envelope.getMessage() != null) { |
| valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage()); |
| } |
| CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap); |
| log.debug("Received coordinator stream message: {}", coordinatorStreamMessage); |
| // Remove any existing entry. Set.add() does not add if the element already exists. |
| if (bootstrappedMessages.remove(coordinatorStreamMessage)) { |
| log.debug("Removed duplicate message: {}", coordinatorStreamMessage); |
| } |
| bootstrappedMessages.add(coordinatorStreamMessage); |
| if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) { |
| String configKey = coordinatorStreamMessage.getKey(); |
| if (coordinatorStreamMessage.isDelete()) { |
| configMap.remove(configKey); |
| } else { |
| String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue(); |
| configMap.put(configKey, configValue); |
| } |
| } |
| } |
| |
| bootstrappedStreamSet = Collections.unmodifiableSet(bootstrappedMessages); |
| log.debug("Bootstrapped configuration: {}", configMap); |
| isBootstrapped = true; |
| } catch (Exception e) { |
| throw new SamzaException(e); |
| } |
| } |
| } |
| |
| /** |
| * Returns the set of bootstrapped {@link CoordinatorStreamMessage}s |
| * @param type The type of {@link CoordinatorStreamMessage}s to return. |
| * @return The bootstrapped {@link CoordinatorStreamMessage}s |
| */ |
| public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) { |
| log.debug("Bootstrapping coordinator stream for messages of type {}", type); |
| bootstrap(); |
| LinkedHashSet<CoordinatorStreamMessage> bootstrappedStream = new LinkedHashSet<CoordinatorStreamMessage>(); |
| for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) { |
| log.trace("Considering message: {}", coordinatorStreamMessage); |
| if (type.equalsIgnoreCase(coordinatorStreamMessage.getType())) { |
| log.trace("Adding message: {}", coordinatorStreamMessage); |
| bootstrappedStream.add(coordinatorStreamMessage); |
| } |
| } |
| return bootstrappedStream; |
| } |
| |
| /** |
| * @return The bootstrapped configuration that's been read after bootstrap has |
| * been invoked. |
| */ |
| public Config getConfig() { |
| if (isBootstrapped) { |
| return new MapConfig(configMap); |
| } else { |
| throw new SamzaException("Must call bootstrap before retrieving config."); |
| } |
| } |
| |
| /** |
| * Gets an iterator on the coordinator stream, starting from the starting offset the consumer was registered with. |
| * |
| * @return an iterator on the coordinator stream pointing to the starting offset the consumer was registered with. |
| */ |
| public SystemStreamPartitionIterator getStartIterator() { |
| return new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition); |
| } |
| |
| /** |
| * returns all unread messages after an iterator on the stream |
| * |
| * @param iterator the iterator pointing to an offset in the coordinator stream. All unread messages after this iterator are returned |
| * @return a set of unread messages after a given iterator |
| */ |
| public Set<CoordinatorStreamMessage> getUnreadMessages(SystemStreamPartitionIterator iterator) { |
| return getUnreadMessages(iterator, null); |
| } |
| |
| /** |
| * returns all unread messages of a specific type, after an iterator on the stream |
| * |
| * @param iterator the iterator pointing to an offset in the coordinator stream. All unread messages after this iterator are returned |
| * @param type the type of the messages to be returned |
| * @return a set of unread messages of a given type, after a given iterator |
| */ |
| public Set<CoordinatorStreamMessage> getUnreadMessages(SystemStreamPartitionIterator iterator, String type) { |
| LinkedHashSet<CoordinatorStreamMessage> messages = new LinkedHashSet<CoordinatorStreamMessage>(); |
| while (iterator.hasNext()) { |
| IncomingMessageEnvelope envelope = iterator.next(); |
| Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray(); |
| Map<String, Object> valueMap = null; |
| if (envelope.getMessage() != null) { |
| valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage()); |
| } |
| CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap); |
| if (type == null || type.equals(coordinatorStreamMessage.getType())) { |
| messages.add(coordinatorStreamMessage); |
| } |
| } |
| return messages; |
| } |
| |
| /** |
| * Checks whether or not there are any messages after a given iterator on the coordinator stream |
| * |
| * @param iterator The iterator to check if there are any new messages after this point |
| * @return True if there are new messages after the iterator, false otherwise |
| */ |
| public boolean hasNewMessages(SystemStreamPartitionIterator iterator) { |
| if (iterator == null) { |
| return false; |
| } |
| return iterator.hasNext(); |
| } |
| |
| @VisibleForTesting |
| boolean isStarted() { |
| return isStarted; |
| } |
| } |