blob: 052a818f69b1c42e86920ad5e10adca69bba13cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.manager.service.heartbeat;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.heartbeat.GroupHeartbeat;
import org.apache.inlong.common.heartbeat.StreamHeartbeat;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity;
import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity;
import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.heartbeat.ComponentHeartbeatResponse;
import org.apache.inlong.manager.pojo.heartbeat.GroupHeartbeatResponse;
import org.apache.inlong.manager.pojo.heartbeat.HeartbeatPageRequest;
import org.apache.inlong.manager.pojo.heartbeat.HeartbeatQueryRequest;
import org.apache.inlong.manager.pojo.heartbeat.HeartbeatReportRequest;
import org.apache.inlong.manager.pojo.heartbeat.StreamHeartbeatResponse;
import org.apache.inlong.manager.service.core.HeartbeatService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* Heartbeat service layer implementation
*/
@Service
@Slf4j
public class HeartbeatServiceImpl implements HeartbeatService {
private static final Gson GSON = new Gson();
@Autowired
@Lazy
private HeartbeatManager heartbeatManager;
@Autowired
private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
@Autowired
private GroupHeartbeatEntityMapper groupHeartbeatMapper;
@Autowired
private StreamHeartbeatEntityMapper streamHeartbeatMapper;
@Override
public Boolean reportHeartbeat(HeartbeatReportRequest request) {
if (request == null || StringUtils.isBlank(request.getComponentType())) {
log.warn("request is null or component null, just return");
return false;
}
if (log.isDebugEnabled()) {
log.debug("received heartbeat: " + request);
}
heartbeatManager.reportHeartbeat(request);
ComponentTypeEnum componentType = ComponentTypeEnum.forType(request.getComponentType());
switch (componentType) {
case Sort:
case DataProxy:
case Agent:
case Cache:
case SDK:
return updateHeartbeatOpt(request);
default:
log.error("Unsupported componentType={} for Inlong", componentType);
return false;
}
}
@Override
public ComponentHeartbeatResponse getComponentHeartbeat(HeartbeatQueryRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
String component = request.getComponent();
Preconditions.expectNotBlank(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY);
Preconditions.expectNotBlank(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY);
ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
case Agent:
case Cache:
case SDK:
ComponentHeartbeatEntity res = componentHeartbeatMapper.selectByKey(component, request.getInstance());
return CommonBeanUtils.copyProperties(res, ComponentHeartbeatResponse::new);
default:
throw new BusinessException("Unsupported component type for " + component);
}
}
@Override
public GroupHeartbeatResponse getGroupHeartbeat(HeartbeatQueryRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
String component = request.getComponent();
Preconditions.expectNotBlank(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY);
Preconditions.expectNotBlank(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY);
Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
case Agent:
case Cache:
case SDK:
GroupHeartbeatEntity result = groupHeartbeatMapper.selectByKey(component, request.getInstance(),
request.getInlongGroupId());
return CommonBeanUtils.copyProperties(result, GroupHeartbeatResponse::new);
default:
throw new BusinessException("Unsupported component type for " + component);
}
}
@Override
public StreamHeartbeatResponse getStreamHeartbeat(HeartbeatQueryRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
String component = request.getComponent();
Preconditions.expectNotBlank(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY);
Preconditions.expectNotBlank(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY);
Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
Preconditions.expectNotBlank(request.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY);
ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
case Agent:
case Cache:
case SDK:
StreamHeartbeatEntity result = streamHeartbeatMapper.selectByKey(component, request.getInstance(),
request.getInlongGroupId(), request.getInlongStreamId());
return CommonBeanUtils.copyProperties(result, StreamHeartbeatResponse::new);
default:
throw new BusinessException("Unsupported component type for " + component);
}
}
@Override
public PageResult<ComponentHeartbeatResponse> listComponentHeartbeat(HeartbeatPageRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
String component = request.getComponent();
Preconditions.expectNotBlank(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY);
ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
case Agent:
case Cache:
case SDK:
return listComponentHeartbeatOpt(request);
default:
throw new BusinessException("Unsupported component type for " + component);
}
}
@Override
public PageResult<GroupHeartbeatResponse> listGroupHeartbeat(HeartbeatPageRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
String component = request.getComponent();
Preconditions.expectNotBlank(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY);
ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
case Agent:
case Cache:
case SDK:
return listGroupHeartbeatOpt(request);
default:
throw new BusinessException("Unsupported component type for " + component);
}
}
@Override
public PageResult<StreamHeartbeatResponse> listStreamHeartbeat(HeartbeatPageRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
String component = request.getComponent();
Preconditions.expectNotBlank(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY);
Preconditions.expectNotBlank(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY);
ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
case Agent:
case Cache:
case SDK:
return listStreamHeartbeatOpt(request);
default:
throw new BusinessException("Unsupported component type for " + component);
}
}
/**
* Default implementation for updating heartbeat
*/
private Boolean updateHeartbeatOpt(HeartbeatReportRequest request) {
if (log.isDebugEnabled()) {
log.debug("heartbeat request json = {}", GSON.toJson(request));
}
String component = request.getComponentType();
String instanceIp = request.getIp();
Long reportTime = request.getReportTime();
// Add component heartbeats
ComponentHeartbeatEntity entity = new ComponentHeartbeatEntity();
entity.setComponent(component);
entity.setInstance(instanceIp);
entity.setReportTime(reportTime);
componentHeartbeatMapper.insertOrUpdateByKey(entity);
// Add group heartbeats
List<GroupHeartbeat> groupHeartbeats = request.getGroupHeartbeats();
if (CollectionUtils.isNotEmpty(groupHeartbeats)) {
groupHeartbeatMapper.insertOrUpdateAll(component, instanceIp, reportTime, groupHeartbeats);
}
// Add stream heartbeats
List<StreamHeartbeat> streamHeartbeats = request.getStreamHeartbeats();
if (CollectionUtils.isNotEmpty(streamHeartbeats)) {
streamHeartbeatMapper.insertOrUpdateAll(component, instanceIp, reportTime, streamHeartbeats);
}
return true;
}
private PageResult<ComponentHeartbeatResponse> listComponentHeartbeatOpt(HeartbeatPageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<ComponentHeartbeatEntity> entityPage =
(Page<ComponentHeartbeatEntity>) componentHeartbeatMapper.selectByCondition(request);
List<ComponentHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
ComponentHeartbeatResponse::new);
return new PageResult<>(responseList, entityPage.getTotal());
}
private PageResult<GroupHeartbeatResponse> listGroupHeartbeatOpt(HeartbeatPageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<GroupHeartbeatEntity> entityPage = (Page<GroupHeartbeatEntity>) groupHeartbeatMapper.selectByCondition(
request);
List<GroupHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
GroupHeartbeatResponse::new);
return new PageResult<>(responseList,
entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
}
private PageResult<StreamHeartbeatResponse> listStreamHeartbeatOpt(HeartbeatPageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<StreamHeartbeatEntity> entityPage =
(Page<StreamHeartbeatEntity>) streamHeartbeatMapper.selectByCondition(request);
List<StreamHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
StreamHeartbeatResponse::new);
return new PageResult<>(responseList, entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
}
}