blob: fea54ce08156760b8c10d53a93cf646faa3068f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.zk;
import com.google.common.base.Strings;
/**
* The following ZK hierarchy is maintained for Standalone jobs:
* <pre>
* - /
* |- groupId/
* |- JobModelGeneration/
* |- activeJobModelVersion (data contains the most recent active job model version)
* |- jobModelVersion (data contains the version)
* |- jobModelUpgradeBarrier/ (contains barrier related data)
* |- jobModels/
* |- 1 (contains job model version 1 as data)
* |- 2
* |- processors/
* |- 00000001
* |- 00000002
* |- ...
* </pre>
* Note: ZK Node levels without an ending forward slash ('/') represent a leaf node and non-leaf node, otherwise.
*
* This class provides helper methods to easily generate/parse the path in the ZK hierarchy.
*/
public class ZkKeyBuilder {
static final String PROCESSORS_PATH = "processors";
static final String JOBMODEL_GENERATION_PATH = "jobModelGeneration";
static final String JOB_MODEL_UPGRADE_BARRIER_PATH = "jobModelUpgradeBarrier";
private static final String TASK_LOCALITY_PATH = "taskLocality";
/**
* Prefix generated to uniquely identify a particular deployment of a job.
* TODO: For now, it looks like $jobName-$jobId. We need to add a unique deployment/attempt identifier as well.
*/
private final String pathPrefix;
public ZkKeyBuilder(String pathPrefix) {
if (pathPrefix != null && !pathPrefix.trim().isEmpty()) {
this.pathPrefix = pathPrefix.trim();
} else {
throw new IllegalArgumentException("Zk PathPrefix cannot be null or empty!");
}
}
public String getRootPath() {
return "/" + pathPrefix;
}
String getProcessorsPath() {
return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
}
/**
* Static method that helps parse the processorId substring from the ZK path
*
* Processor ID is prefixed by "processor-" and is an leaf node in ZK tree. Hence, this pattern is used to extract
* the processorId.
*
* @param path Full ZK path of a registered processor
* @return String representing the processor ID
*/
static String parseIdFromPath(String path) {
if (!Strings.isNullOrEmpty(path))
return path.substring(path.lastIndexOf("/") + 1);
return null;
}
String getJobModelVersionPath() {
return String.format("%s/%s/jobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH);
}
/**
* Denotes the path where the most recent active job model version is stored. The version of the job model is the
* most recent agreed upon version by the quorum. It differs from the <i>jobModelVersion</i> path which may
* have a newer version of job model published by the leader in during rebalance before consensus is achieved.
* @return the path where most recent active job model is stored
*/
String getActiveJobModelVersionPath() {
return String.format("%s/%s/activeJobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH);
}
String getJobModelPathPrefix() {
return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH);
}
String getJobModelPath(String jobModelVersion) {
return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
}
String getJobModelVersionBarrierPrefix() {
return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER_PATH);
}
String getTaskLocalityPath() {
return String.format("%s/%s", getRootPath(), TASK_LOCALITY_PATH);
}
}