// blob: 54d4f23b6f07ab5dd971b551c52108100ea7772f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
 * Sort operate listener that deletes the Flink sort job of an inlong stream.
 * <p>
 * The listener is only accepted for {@link StreamResourceProcessForm} processes whose operate type is
 * {@link GroupOperateType#DELETE}. When triggered, it merges the ext params of the group and the stream,
 * reads the sort job id and sort URL from them, and asks {@link FlinkOperation} to delete that job.
 */
@Slf4j
public class DeleteStreamListener implements SortOperateListener {

    /**
     * @return the task event this listener is bound to, always {@link TaskEvent#COMPLETE}
     */
    @Override
    public TaskEvent event() {
        return TaskEvent.COMPLETE;
    }

    /**
     * Decides whether this listener should be added for the given workflow context.
     *
     * @param context workflow context holding the process form
     * @return {@code true} only when the form is a {@link StreamResourceProcessForm} with operate type DELETE
     */
    @Override
    public boolean accept(WorkflowContext context) {
        ProcessForm processForm = context.getProcessForm();
        String groupId = processForm.getInlongGroupId();
        if (!(processForm instanceof StreamResourceProcessForm)) {
            log.info("not add delete stream listener, not StreamResourceProcessForm for groupId={}", groupId);
            return false;
        }

        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
        if (streamProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
            log.info("not add delete stream listener, as the operate was not DELETE for groupId={} streamId={}",
                    groupId, streamId);
            return false;
        }

        log.info("add delete stream listener for groupId={} streamId={}", groupId, streamId);
        return true;
    }

    /**
     * Deletes the sort job recorded in the ext params of the stream (or its group).
     *
     * @param context workflow context whose process form is a {@link StreamResourceProcessForm}
     * @return success when the job was deleted; a failed result when the job id is blank or the delete failed
     * @throws Exception if the sort properties cannot be parsed as JSON, or if polling the job status
     *         after a failed delete throws
     */
    @Override
    public ListenerResult listen(WorkflowContext context) throws Exception {
        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) context.getProcessForm();
        InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
        InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();

        Map<String, String> kvConf = collectExtParams(groupInfo, streamInfo);

        final String groupId = streamInfo.getInlongGroupId();
        final String streamId = streamInfo.getInlongStreamId();

        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
        if (StringUtils.isBlank(jobId)) {
            String message = String.format("sort job id is empty for groupId=%s streamId=%s", groupId, streamId);
            return ListenerResult.fail(message);
        }

        FlinkInfo flinkInfo = new FlinkInfo();
        flinkInfo.setJobId(jobId);
        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
        flinkInfo.setEndpoint(sortUrl);

        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
        try {
            flinkOperation.delete(flinkInfo);
            log.info("job delete success for jobId={}", jobId);
            return ListenerResult.success();
        } catch (Exception e) {
            flinkInfo.setException(true);
            flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
            // NOTE(review): pollJobStatus is defined outside this file; if it throws, that exception
            // propagates out of listen() and the failed result below is never returned - confirm intended.
            flinkOperation.pollJobStatus(flinkInfo);

            String message = String.format("delete sort failed for groupId=%s streamId=%s", groupId, streamId);
            log.error(message, e);
            // separate the context from the cause so the two do not run together in the result
            return ListenerResult.fail(message + ": " + e.getMessage());
        }
    }

    /**
     * Merges the ext params of the group and the stream into a single key-value map.
     * <p>
     * Stream entries are applied after group entries, so they win on duplicate keys. If the merged map
     * contains {@link InlongConstants#SORT_PROPERTIES}, its JSON value is expanded and applied last.
     * A missing (null) ext list is treated as empty.
     *
     * @param groupInfo group whose ext list is applied first
     * @param streamInfo stream whose ext list is applied second
     * @return the merged, mutable key-value map
     * @throws Exception if the sort properties value is not valid JSON
     */
    private static Map<String, String> collectExtParams(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo)
            throws Exception {
        List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
        log.info("inlong group: {} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
        List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
        log.info("inlong stream: {} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);

        Map<String, String> kvConf = new HashMap<>();
        if (groupExtList != null) {
            for (InlongGroupExtInfo groupExtInfo : groupExtList) {
                kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue());
            }
        }
        if (streamExtList != null) {
            for (InlongStreamExtInfo streamExtInfo : streamExtList) {
                kvConf.put(streamExtInfo.getKeyName(), streamExtInfo.getKeyValue());
            }
        }

        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
        if (StringUtils.isNotEmpty(sortExt)) {
            Map<String, String> sortProperties = JsonUtils.OBJECT_MAPPER.convertValue(
                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                    });
            // a JSON literal "null" converts to a null map; skip it instead of failing in putAll
            if (sortProperties != null) {
                kvConf.putAll(sortProperties);
            }
        }
        return kvConf;
    }
}