blob: 6893322bbde2d62fb233e9ecc96dfa2ad536fab4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.lite.internal.storage;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.exception.RegExceptionHandler;
import java.util.LinkedList;
import java.util.List;
/**
 * Job node storage.
 *
 * <p>Reads and writes the registry center nodes that belong to a single job.</p>
 */
public final class JobNodeStorage {
    
    private final CoordinatorRegistryCenter regCenter;
    
    private final String jobName;
    
    private final JobNodePath jobNodePath;
    
    /**
     * Create the storage for one job.
     *
     * @param regCenter registry center every operation is delegated to
     * @param jobName name of the job; also used to build the job root path {@code "/" + jobName}
     */
    public JobNodeStorage(final CoordinatorRegistryCenter regCenter, final String jobName) {
        this.regCenter = regCenter;
        this.jobName = jobName;
        this.jobNodePath = new JobNodePath(jobName);
    }
    
    /**
     * Check whether the given job node exists in the registry center.
     *
     * @param node node name, resolved to a full path through {@link JobNodePath}
     * @return {@code true} if the registry center reports the node as existing
     */
    public boolean isJobNodeExisted(final String node) {
        String fullPath = toFullPath(node);
        return regCenter.isExisted(fullPath);
    }
    
    /**
     * Check whether the job root node ({@code "/" + jobName}) exists in the registry center.
     *
     * @return {@code true} if the registry center reports the root node as existing
     */
    public boolean isJobRootNodeExisted() {
        String rootPath = getRootPath();
        return regCenter.isExisted(rootPath);
    }
    
    /**
     * Read the data of a job node through the registry center's {@code get} operation.
     *
     * @param node node name
     * @return data of the job node
     */
    public String getJobNodeData(final String node) {
        String fullPath = toFullPath(node);
        return regCenter.get(fullPath);
    }
    
    /**
     * Read the data of a job node through the registry center's {@code getDirectly} operation.
     *
     * @param node node name
     * @return data of the job node
     */
    public String getJobNodeDataDirectly(final String node) {
        String fullPath = toFullPath(node);
        return regCenter.getDirectly(fullPath);
    }
    
    /**
     * List the keys of the children of a job node.
     *
     * @param node node name
     * @return children keys as returned by the registry center
     */
    public List<String> getJobNodeChildrenKeys(final String node) {
        String fullPath = toFullPath(node);
        return regCenter.getChildrenKeys(fullPath);
    }
    
    /**
     * Read the data stored on the job root node ({@code "/" + jobName}).
     *
     * @return data of the job root node
     */
    public String getJobRootNodeData() {
        String rootPath = getRootPath();
        return regCenter.get(rootPath);
    }
    
    /**
     * Create an empty job node unless it already exists.
     *
     * <p>Nothing is created when the job root node is missing, which means the job is shutdown.</p>
     *
     * @param node node name
     */
    public void createJobNodeIfNeeded(final String node) {
        // The root check comes first: a missing root short-circuits before the node itself is queried.
        if (!isJobRootNodeExisted()) {
            return;
        }
        if (isJobNodeExisted(node)) {
            return;
        }
        regCenter.persist(toFullPath(node), "");
    }
    
    /**
     * Remove a job node, but only when it currently exists.
     *
     * @param node node name
     */
    public void removeJobNodeIfExisted(final String node) {
        if (!isJobNodeExisted(node)) {
            return;
        }
        regCenter.remove(toFullPath(node));
    }
    
    /**
     * Persist a value on a job node.
     *
     * @param node node name
     * @param value data of the job node; stored as {@code value.toString()}
     */
    public void fillJobNode(final String node, final Object value) {
        String fullPath = toFullPath(node);
        String data = value.toString();
        regCenter.persist(fullPath, data);
    }
    
    /**
     * Persist a value on a job node using the registry center's ephemeral persistence.
     *
     * @param node node name
     * @param value data of the job node; stored as {@code value.toString()}
     */
    public void fillEphemeralJobNode(final String node, final Object value) {
        String fullPath = toFullPath(node);
        String data = value.toString();
        regCenter.persistEphemeral(fullPath, data);
    }
    
    /**
     * Update the value of a job node through the registry center's {@code update} operation.
     *
     * @param node node name
     * @param value data of the job node; stored as {@code value.toString()}
     */
    public void updateJobNode(final String node, final Object value) {
        String fullPath = toFullPath(node);
        String data = value.toString();
        regCenter.update(fullPath, data);
    }
    
    /**
     * Replace the data of a job node.
     *
     * <p>Issues the same {@code persist} call as {@link #fillJobNode(String, Object)}.</p>
     *
     * @param node node name
     * @param value replacement data; stored as {@code value.toString()}
     */
    public void replaceJobNode(final String node, final Object value) {
        String fullPath = toFullPath(node);
        String data = value.toString();
        regCenter.persist(fullPath, data);
    }
    
    /**
     * Replace the data of the job root node ({@code "/" + jobName}).
     *
     * @param value replacement data; stored as {@code value.toString()}
     */
    public void replaceJobRootNode(final Object value) {
        String rootPath = getRootPath();
        String data = value.toString();
        regCenter.persist(rootPath, data);
    }
    
    /**
     * Run the operations produced by the callback as one Curator transaction.
     *
     * <p>A check operation on {@code "/"} is always placed in front of the callback's operations.
     * Any exception raised while building or submitting the transaction is handed to
     * {@link RegExceptionHandler#handleException(Exception)}.</p>
     *
     * @param callback supplier of the operations to run in the transaction
     */
    public void executeInTransaction(final TransactionExecutionCallback callback) {
        try {
            CuratorFramework curator = getClient();
            TransactionOp opBuilder = curator.transactionOp();
            List<CuratorOp> pendingOps = new LinkedList<>();
            pendingOps.add(opBuilder.check().forPath("/"));
            pendingOps.addAll(callback.createCuratorOperators(opBuilder));
            curator.transaction().forOperations(pendingOps);
        //CHECKSTYLE:OFF
        } catch (final Exception cause) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(cause);
        }
    }
    
    /**
     * Run the callback after acquiring a leader latch.
     *
     * <p>A {@link LeaderLatch} is started on the latch node, the calling thread waits on
     * {@code await()}, and then the callback is executed. The latch is closed by the
     * try-with-resources statement when the block is left.</p>
     *
     * @param latchNode node used for the leader latch
     * @param callback work to execute once {@code await()} has returned
     */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        // The latch is constructed in the resource clause so that construction failures reach the catch below.
        try (LeaderLatch leaderLatch = new LeaderLatch(getClient(), toFullPath(latchNode))) {
            leaderLatch.start();
            leaderLatch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception cause) {
        //CHECKSTYLE:ON
            handleException(cause);
        }
    }
    
    /**
     * Translate a failure from {@link #executeInLeader(String, LeaderExecutionCallback)}.
     *
     * <p>An interruption only restores the thread's interrupt flag; every other exception is
     * rethrown wrapped in a {@link JobSystemException}.</p>
     *
     * @param ex exception to translate
     */
    private void handleException(final Exception ex) {
        if (!(ex instanceof InterruptedException)) {
            throw new JobSystemException(ex);
        }
        Thread.currentThread().interrupt();
    }
    
    /**
     * Register a connection state listener on the underlying Curator client.
     *
     * @param listener connection state listener
     */
    public void addConnectionStateListener(final ConnectionStateListener listener) {
        CuratorFramework curator = getClient();
        curator.getConnectionStateListenable().addListener(listener);
    }
    
    /**
     * Register a data listener on the raw cache the registry center returns for the job root path.
     *
     * @param listener data listener
     */
    public void addDataListener(final CuratorCacheListener listener) {
        String rootPath = getRootPath();
        CuratorCache jobCache = (CuratorCache) regCenter.getRawCache(rootPath);
        jobCache.listenable().addListener(listener);
    }
    
    /**
     * Ask the registry center for its time, using the job node {@code systemTime/current}.
     *
     * @return registry center time
     */
    public long getRegistryCenterTime() {
        String timeNodePath = toFullPath("systemTime/current");
        return regCenter.getRegistryCenterTime(timeNodePath);
    }
    
    /**
     * Obtain the registry center's raw client as a Curator client.
     *
     * @return raw client cast to {@link CuratorFramework}
     */
    private CuratorFramework getClient() {
        Object rawClient = regCenter.getRawClient();
        return (CuratorFramework) rawClient;
    }
    
    /**
     * Resolve a node name to its full path for this job.
     *
     * @param node node name
     * @return full path produced by {@link JobNodePath}
     */
    private String toFullPath(final String node) {
        return jobNodePath.getFullPath(node);
    }
    
    /**
     * Build the path of the job root node.
     *
     * @return {@code "/" + jobName}
     */
    private String getRootPath() {
        return "/" + jobName;
    }
}