[ISSUE-3057][Improve] Improve streampark-common module base on [3.10 Pre-Conditions Checking] (#3420)
* [Improve] rename notNull method in Utils.scala
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index f17465e..faf1fc9 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -134,7 +134,7 @@
val shadedPackage = "org.apache.streampark.shaded"
override def configureByResource(url: URL): Unit = {
- Utils.notNull(url, "URL argument cannot be null")
+ Utils.requireNotNull(url, "URL argument cannot be null")
val path = url.getPath
if (path.endsWith("xml")) {
val configurator = new JoranConfigurator()
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 30d001a..f8e3a21 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -33,14 +33,14 @@
private[this] lazy val OS = System.getProperty("os.name").toLowerCase
- def notNull(obj: Any, message: String): Unit = {
+ def requireNotNull(obj: Any, message: String): Unit = {
if (obj == null) {
throw new NullPointerException(message)
}
}
- def notNull(obj: Any): Unit = {
- notNull(obj, "this argument must not be null")
+ def requireNotNull(obj: Any): Unit = {
+ requireNotNull(obj, "this argument must not be null")
}
def requireNotEmpty(elem: Any): Boolean = {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
index a1c2260..5af4fc0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
@@ -82,7 +82,7 @@
@PostMapping("/update")
@RequiresPermissions("externalLink:update")
public RestResponse update(@Valid ExternalLink externalLink) {
- Utils.notNull(externalLink.getId(), "The link id cannot be null");
+ Utils.requireNotNull(externalLink.getId(), "The link id cannot be null");
externalLinkService.update(externalLink);
return RestResponse.success();
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 294e795..5026ee3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -113,7 +113,7 @@
.forEach(
key -> {
InternalOption config = InternalConfigHolder.getConfig(key);
- Utils.notNull(config);
+ Utils.requireNotNull(config);
InternalConfigHolder.set(config, springEnv.getProperty(key, config.classType()));
});
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 401eb67..8f22ca5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -385,7 +385,7 @@
public void start(Application appParam, boolean auto) throws Exception {
// 1) check application
final Application application = getById(appParam.getId());
- Utils.notNull(application);
+ Utils.requireNotNull(application);
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");
@@ -397,7 +397,7 @@
}
AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
- Utils.notNull(buildPipeline);
+ Utils.requireNotNull(buildPipeline);
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
@@ -625,7 +625,7 @@
switch (application.getDevelopmentMode()) {
case FLINK_SQL:
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
- Utils.notNull(flinkSql);
+ Utils.requireNotNull(flinkSql);
// 1) dist_userJar
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
// 2) appConfig
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index db827a6..3611123 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -206,7 +206,7 @@
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
if (app.isFlinkSqlJobOrPyFlinkJob()) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
- Utils.notNull(flinkSql);
+ Utils.requireNotNull(flinkSql);
app.setDependency(flinkSql.getDependency());
app.setTeamResource(flinkSql.getTeamResource());
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
index 227dab2..1cc365c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
@@ -76,7 +76,7 @@
@Override
public List<ExternalLink> render(Long appId) {
Application app = applicationManageService.getById(appId);
- Utils.notNull(app, "Application doesn't exist");
+ Utils.requireNotNull(app, "Application doesn't exist");
List<ExternalLink> externalLink = this.list();
if (externalLink != null && externalLink.size() > 0) {
// Render the placeholder
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 54c03d5..1ba1781 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -173,11 +173,11 @@
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void rollback(Application application) {
FlinkSql sql = getCandidate(application.getId(), CandidateTypeEnum.HISTORY);
- Utils.notNull(sql);
+ Utils.requireNotNull(sql);
try {
// check and backup current job
FlinkSql effectiveSql = getEffective(application.getId(), false);
- Utils.notNull(effectiveSql);
+ Utils.requireNotNull(effectiveSql);
// rollback history sql
backUpService.rollbackFlinkSql(application, sql);
} catch (Exception e) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index b07f659..55ca1dc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -116,7 +116,7 @@
@Override
public boolean update(Project projectParam) {
Project project = getById(projectParam.getId());
- Utils.notNull(project);
+ Utils.requireNotNull(project);
ApiAlertException.throwIfFalse(
project.getTeamId().equals(projectParam.getTeamId()),
"TeamId can't be changed, update project failed.");
@@ -159,7 +159,7 @@
@Override
public boolean removeById(Long id) {
Project project = getById(id);
- Utils.notNull(project);
+ Utils.requireNotNull(project);
LambdaQueryWrapper<Application> queryWrapper =
new LambdaQueryWrapper<Application>().eq(Application::getProjectId, id);
long count = applicationManageService.count(queryWrapper);
@@ -231,7 +231,7 @@
@Override
public List<String> listModules(Long id) {
Project project = getById(id);
- Utils.notNull(project);
+ Utils.requireNotNull(project);
if (BuildStateEnum.SUCCESSFUL != BuildStateEnum.of(project.getBuildState())
|| !project.getDistHome().exists()) {
@@ -297,7 +297,7 @@
}
List<Map<String, Object>> confList = new ArrayList<>();
File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
- Utils.notNull(files);
+ Utils.requireNotNull(files);
for (File item : files) {
eachFile(item, confList, true);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 55e9a6f..59f8948 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -292,7 +292,7 @@
Map<String, Object> properties = new HashMap<>();
if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) {
- Utils.notNull(
+ Utils.requireNotNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -310,7 +310,7 @@
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) {
- Utils.notNull(
+ Utils.requireNotNull(
cluster,
String.format(
"The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -378,7 +378,7 @@
// At the remote mode, request the flink webui interface to get the savepoint path
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
- Utils.notNull(
+ Utils.requireNotNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
@@ -443,8 +443,8 @@
private void expire(SavePoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
Application application = applicationManageService.getById(entity.getAppId());
- Utils.notNull(flinkEnv);
- Utils.notNull(application);
+ Utils.requireNotNull(flinkEnv);
+ Utils.requireNotNull(application);
int cpThreshold =
tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index b98bdd7..0b70f9d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -184,7 +184,7 @@
nowStep = nowStep.get(nowChar).getNext();
loc += 1;
}
- Utils.notNull(preNode);
+ Utils.requireNotNull(preNode);
preNode.setStop();
preNode.setCount(count);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
index 9cecfb4..65de104 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
@@ -72,8 +72,9 @@
@Override
public IPage<YarnQueue> getPage(YarnQueue yarnQueue, RestRequest request) {
- Utils.notNull(yarnQueue, "Yarn queue query params mustn't be null.");
- Utils.notNull(yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null.");
+ Utils.requireNotNull(yarnQueue, "Yarn queue query params mustn't be null.");
+ Utils.requireNotNull(
+ yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null.");
Page<YarnQueue> page = new Page<>();
page.setCurrent(request.getPageNum());
page.setSize(request.getPageSize());
@@ -88,8 +89,8 @@
@Override
public ResponseResult<String> checkYarnQueue(YarnQueue yarnQueue) {
- Utils.notNull(yarnQueue, "Yarn queue mustn't be empty.");
- Utils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
+ Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be empty.");
+ Utils.requireNotNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
ResponseResult<String> responseResult = new ResponseResult<>();
@@ -206,8 +207,8 @@
@VisibleForTesting
public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) {
- Utils.notNull(yarnQueue, "Yarn queue mustn't be null.");
- Utils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
+ Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be null.");
+ Utils.requireNotNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
YarnQueue queueFromDB = getById(yarnQueue.getId());
ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist.");
return queueFromDB;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index 78e70d2..18380e7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -95,7 +95,7 @@
page.setSize(request.getPageSize());
IPage<User> resPage = this.baseMapper.selectPage(page, user);
- Utils.notNull(resPage);
+ Utils.requireNotNull(resPage);
if (resPage.getTotal() == 0) {
resPage.setRecords(Collections.emptyList());
}
@@ -197,7 +197,7 @@
@Override
public void setLastTeam(Long teamId, Long userId) {
User user = getById(userId);
- Utils.notNull(user);
+ Utils.requireNotNull(user);
user.setLastTeamId(teamId);
this.baseMapper.updateById(user);
}
@@ -205,7 +205,7 @@
@Override
public void clearLastTeam(Long userId, Long teamId) {
User user = getById(userId);
- Utils.notNull(user);
+ Utils.requireNotNull(user);
if (!teamId.equals(user.getLastTeamId())) {
return;
}
diff --git a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
index 5b25372..f1a8b47 100644
--- a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
+++ b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -98,9 +98,9 @@
@Override
public void start() {
- Utils.notNull(jobManagerContainer);
+ Utils.requireNotNull(jobManagerContainer);
jobManagerContainer.start();
- Utils.notNull(taskManagerContainers);
+ Utils.requireNotNull(taskManagerContainers);
for (FlinkContainer taskManagerContainer : taskManagerContainers) {
taskManagerContainer.start();
}
@@ -108,11 +108,11 @@
@Override
public void stop() {
- Utils.notNull(taskManagerContainers);
+ Utils.requireNotNull(taskManagerContainers);
for (FlinkContainer taskManagerContainer : taskManagerContainers) {
taskManagerContainer.stop();
}
- Utils.notNull(jobManagerContainer);
+ Utils.requireNotNull(jobManagerContainer);
jobManagerContainer.stop();
}