blob: a1d417a65268fe88b8fc1183db19136cb8b2e1f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.execution;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.*;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamManager {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);
private final SystemAdmins systemAdmins;
public StreamManager(Config config) {
this(new SystemAdmins(config, StreamManager.class.getSimpleName()));
}
@VisibleForTesting
StreamManager(SystemAdmins systemAdmins) {
this.systemAdmins = systemAdmins;
}
public void createStreams(List<StreamSpec> streams) {
Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
streams.forEach(streamSpec ->
streamsGroupedBySystem.put(streamSpec.getSystemName(), streamSpec));
for (Map.Entry<String, Collection<StreamSpec>> entry : streamsGroupedBySystem.asMap().entrySet()) {
String systemName = entry.getKey();
SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName);
for (StreamSpec stream : entry.getValue()) {
LOGGER.info("Creating stream {} with partitions {} on system {}",
new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
systemAdmin.createStream(stream);
}
}
}
public void start() {
this.systemAdmins.start();
}
public void stop() {
this.systemAdmins.stop();
}
Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> streamNames) {
Map<String, Integer> streamToPartitionCount = new HashMap<>();
SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName);
if (systemAdmin == null) {
throw new SamzaException(String.format("System %s does not exist.", systemName));
}
// retrieve the metadata for the streams in this system
Map<String, SystemStreamMetadata> streamToMetadata = systemAdmin.getSystemStreamMetadata(streamNames);
// set the partitions of a stream to its StreamEdge
streamToMetadata.forEach((stream, data) ->
streamToPartitionCount.put(stream, data.getSystemStreamPartitionMetadata().size()));
return streamToPartitionCount;
}
/**
* This is a best-effort approach to clear the internal streams from previous run, including intermediate streams,
* checkpoint stream and changelog streams.
* For batch processing, we always clean up the previous internal streams and create a new set for each run.
* @param prevConfig config of the previous run
*/
public void clearStreamsFromPreviousRun(Config prevConfig) {
try {
ApplicationConfig appConfig = new ApplicationConfig(prevConfig);
LOGGER.info("run.id from previous run is {}", appConfig.getRunId());
StreamConfig streamConfig = new StreamConfig(prevConfig);
//Find all intermediate streams and clean up
Set<StreamSpec> intStreams = streamConfig.getStreamIds().stream()
.filter(streamConfig::getIsIntermediateStream)
.map(id -> new StreamSpec(id, streamConfig.getPhysicalName(id), streamConfig.getSystem(id)))
.collect(Collectors.toSet());
intStreams.forEach(stream -> {
LOGGER.info("Clear intermediate stream {} in system {}", stream.getPhysicalName(), stream.getSystemName());
systemAdmins.getSystemAdmin(stream.getSystemName()).clearStream(stream);
});
//Find checkpoint stream and clean up
TaskConfig taskConfig = new TaskConfig(prevConfig);
taskConfig.getCheckpointManager(new MetricsRegistryMap()).ifPresent(CheckpointManager::clearCheckpoints);
//Find changelog streams and remove them
StorageConfig storageConfig = new StorageConfig(prevConfig);
for (String store : storageConfig.getStoreNames()) {
String changelog = storageConfig.getChangelogStream(store).orElse(null);
if (changelog != null) {
LOGGER.info("Clear store {} changelog {}", store, changelog);
SystemStream systemStream = StreamUtil.getSystemStreamFromNames(changelog);
StreamSpec spec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), 1);
systemAdmins.getSystemAdmin(spec.getSystemName()).clearStream(spec);
}
}
} catch (Exception e) {
// For batch, we always create a new set of internal streams (checkpoint, changelog and intermediate) with unique
// id. So if clearStream doesn't work, it won't affect the correctness of the results.
// We log a warning here and rely on retention to clean up the streams later.
LOGGER.warn("Fail to clear internal streams from previous run. Please clean up manually.", e);
}
}
/**
* Create a unique stream name if it's batch mode and has a valid run.id.
* @param stream physical name of the stream
* @param config {@link Config} object
* @return stream name created
*/
public static String createUniqueNameForBatch(String stream, Config config) {
ApplicationConfig appConfig = new ApplicationConfig(config);
if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH && appConfig.getRunId() != null) {
return stream + "-" + appConfig.getRunId();
} else {
return stream;
}
}
}