| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.system; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import org.apache.samza.startpoint.Startpoint; |
| |
| /** |
| * Helper interface attached to an underlying system to fetch information about |
| * streams, partitions, offsets, etc. This interface is useful for providing |
| * utility methods that Samza needs in order to interact with a system. |
| */ |
| public interface SystemAdmin { |
| |
| /* |
| * Start this SystemAdmin |
| */ |
| default void start() {}; |
| |
| /* |
| * Stop this SystemAdmin |
| */ |
| default void stop() {}; |
| |
| /** |
| * To give the status of current systemAdmin |
| * @return boolean stopped or not; |
| */ |
| default boolean isStopped() { |
| return false; |
| } |
| |
| /** |
| * Fetches the offsets for the messages immediately after the supplied offsets |
| * for a group of SystemStreamPartitions. |
| * |
| * @param offsets |
| * Map from SystemStreamPartition to current offsets. |
| * @return Map from SystemStreamPartition to offsets immediately after the |
| * current offsets. |
| */ |
| Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets); |
| |
| /** |
| * Fetch metadata from a system for a set of streams. |
| * |
| * @param streamNames |
| * The streams to to fetch metadata for. |
| * @return A map from stream name to SystemStreamMetadata for each stream |
| * requested in the parameter set. |
| */ |
| Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames); |
| |
| /** |
| * Fetch metadata from a system for a set of SSPs. |
| * Implementors should override this if there is a more efficient implementation than delegating to |
| * {@link #getSystemStreamMetadata}. |
| * |
| * @param ssps SSPs for which to get metadata |
| * @return A map from SystemStreamPartition to the SystemStreamPartitionMetadata, with an entry for each SSP in |
| * {@code ssps} for which metadata could be found |
| * @throws RuntimeException if there was an error fetching metadata |
| */ |
| default Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata( |
| Set<SystemStreamPartition> ssps) { |
| Set<String> streams = ssps.stream().map(SystemStream::getStream).collect(Collectors.toSet()); |
| Map<String, SystemStreamMetadata> streamToSystemStreamMetadata = getSystemStreamMetadata(streams); |
| Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new HashMap<>(); |
| for (SystemStreamPartition ssp : ssps) { |
| SystemStreamMetadata systemStreamMetadata = streamToSystemStreamMetadata.get(ssp.getStream()); |
| if (systemStreamMetadata != null) { |
| SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = |
| systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition()); |
| if (sspMetadata != null) { |
| sspToSSPMetadata.put(ssp, sspMetadata); |
| } |
| } |
| } |
| return sspToSSPMetadata; |
| } |
| |
| /** |
| * Compare the two offsets. -1, 0, +1 means offset1 < offset2, |
| * offset1 == offset2 and offset1 > offset2 respectively. Return |
| * null if those two offsets are not comparable |
| * |
| * @param offset1 First offset for comparison. |
| * @param offset2 Second offset for comparison. |
| * @return -1 if offset1 < offset2; 0 if offset1 == offset2; 1 if offset1 > offset2. Null if not comparable |
| */ |
| Integer offsetComparator(String offset1, String offset2); |
| |
| /** |
| * Create a stream described by the spec. |
| * |
| * @param streamSpec The spec, or blueprint from which the physical stream will be created on the system. |
| * @return {@code true} if the stream was actually created and not pre-existing. |
| * {@code false} if the stream was pre-existing. |
| * A RuntimeException will be thrown if creation fails. |
| */ |
| default boolean createStream(StreamSpec streamSpec) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Validates the stream described by the streamSpec on the system. |
| * A {@link StreamValidationException} should be thrown for any validation error. |
| * |
| * @param streamSpec The spec, or blueprint for the physical stream on the system. |
| * @throws StreamValidationException if validation fails. |
| */ |
| default void validateStream(StreamSpec streamSpec) throws StreamValidationException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Clear the entire stream described by the spec. |
| * @param streamSpec The spec for the physical stream on the system. |
| * @return {@code true} if the stream was successfully cleared. |
| * {@code false} if clearing stream failed. |
| */ |
| default boolean clearStream(StreamSpec streamSpec) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Delete records up to (and including) the provided ssp offsets for all system stream partitions specified in the map |
| * |
| * @param offsets a map from system stream partition to offset |
| */ |
| default void deleteMessages(Map<SystemStreamPartition, String> offsets) { |
| |
| } |
| |
| /** |
| * Get partitions counts only. Should be more efficient then getSystemStreamMetadata, but if not implemented |
| * revert to getSystemStreamMetadata. |
| * @param streamNames set of streams to query. |
| * @param cacheTTL cacheTTL to use if caching the values. |
| * @return A map from stream name to SystemStreamMetadata for each stream |
| * requested in the parameter set. |
| */ |
| default Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) { |
| return getSystemStreamMetadata(streamNames); |
| } |
| |
| /** |
| * Fetch the set of all available streams |
| * @return The set of all available SystemStreams. |
| */ |
| default Set<SystemStream> getAllSystemStreams() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Resolves the startpoint to a system specific offset. |
| * @param startpoint represents the startpoint. |
| * @param systemStreamPartition represents the system stream partition. |
| * @return the resolved offset. |
| */ |
| default String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) { |
| throw new UnsupportedOperationException(); |
| } |
| } |