blob: 1cd345e0c4d201da73fc5774a94c735321697ef3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.startpoint.Startpoint;
/**
* Helper interface attached to an underlying system to fetch information about
* streams, partitions, offsets, etc. This interface is useful for providing
* utility methods that Samza needs in order to interact with a system.
*/
public interface SystemAdmin {
/*
* Start this SystemAdmin
*/
default void start() {};
/*
* Stop this SystemAdmin
*/
default void stop() {};
/**
* Fetches the offsets for the messages immediately after the supplied offsets
* for a group of SystemStreamPartitions.
*
* @param offsets
* Map from SystemStreamPartition to current offsets.
* @return Map from SystemStreamPartition to offsets immediately after the
* current offsets.
*/
Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets);
/**
* Fetch metadata from a system for a set of streams.
*
* @param streamNames
* The streams to to fetch metadata for.
* @return A map from stream name to SystemStreamMetadata for each stream
* requested in the parameter set.
*/
Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
/**
* Fetch metadata from a system for a set of SSPs.
* Implementors should override this if there is a more efficient implementation than delegating to
* {@link #getSystemStreamMetadata}.
*
* @param ssps SSPs for which to get metadata
* @return A map from SystemStreamPartition to the SystemStreamPartitionMetadata, with an entry for each SSP in
* {@code ssps} for which metadata could be found
* @throws RuntimeException if there was an error fetching metadata
*/
default Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata(
Set<SystemStreamPartition> ssps) {
Set<String> streams = ssps.stream().map(SystemStream::getStream).collect(Collectors.toSet());
Map<String, SystemStreamMetadata> streamToSystemStreamMetadata = getSystemStreamMetadata(streams);
Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new HashMap<>();
for (SystemStreamPartition ssp : ssps) {
SystemStreamMetadata systemStreamMetadata = streamToSystemStreamMetadata.get(ssp.getStream());
if (systemStreamMetadata != null) {
SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
if (sspMetadata != null) {
sspToSSPMetadata.put(ssp, sspMetadata);
}
}
}
return sspToSSPMetadata;
}
/**
* Compare the two offsets. -1, 0, +1 means offset1 &lt; offset2,
* offset1 == offset2 and offset1 &gt; offset2 respectively. Return
* null if those two offsets are not comparable
*
* @param offset1 First offset for comparison.
* @param offset2 Second offset for comparison.
* @return -1 if offset1 &lt; offset2; 0 if offset1 == offset2; 1 if offset1 &gt; offset2. Null if not comparable
*/
Integer offsetComparator(String offset1, String offset2);
/**
* Create a stream described by the spec.
*
* @param streamSpec The spec, or blueprint from which the physical stream will be created on the system.
* @return {@code true} if the stream was actually created and not pre-existing.
* {@code false} if the stream was pre-existing.
* A RuntimeException will be thrown if creation fails.
*/
default boolean createStream(StreamSpec streamSpec) {
throw new UnsupportedOperationException();
}
/**
* Validates the stream described by the streamSpec on the system.
* A {@link StreamValidationException} should be thrown for any validation error.
*
* @param streamSpec The spec, or blueprint for the physical stream on the system.
* @throws StreamValidationException if validation fails.
*/
default void validateStream(StreamSpec streamSpec) throws StreamValidationException {
throw new UnsupportedOperationException();
}
/**
* Clear the entire stream described by the spec.
* @param streamSpec The spec for the physical stream on the system.
* @return {@code true} if the stream was successfully cleared.
* {@code false} if clearing stream failed.
*/
default boolean clearStream(StreamSpec streamSpec) {
throw new UnsupportedOperationException();
}
/**
* Delete records up to (and including) the provided ssp offsets for all system stream partitions specified in the map
*
* @param offsets a map from system stream partition to offset
*/
default void deleteMessages(Map<SystemStreamPartition, String> offsets) {
}
/**
* Get partitions counts only. Should be more efficient then getSystemStreamMetadata, but if not implemented
* revert to getSystemStreamMetadata.
* @param streamNames set of streams to query.
* @param cacheTTL cacheTTL to use if caching the values.
* @return A map from stream name to SystemStreamMetadata for each stream
* requested in the parameter set.
*/
default Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
return getSystemStreamMetadata(streamNames);
}
/**
* Fetch the set of all available streams
* @return The set of all available SystemStreams.
*/
default Set<SystemStream> getAllSystemStreams() {
throw new UnsupportedOperationException();
}
/**
* Resolves the startpoint to a system specific offset.
* @param startpoint represents the startpoint.
* @param systemStreamPartition represents the system stream partition.
* @return the resolved offset.
*/
default String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
throw new UnsupportedOperationException();
}
}