blob: 7c79b398cd601c02f4a2f00122a197aa353c31d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.service.client.impl;
import com.sun.jersey.api.client.WebResource;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceClientException;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.EagleServiceSingleEntityQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class EagleServiceClientImpl extends EagleServiceBaseClient {
private static final Logger LOG = LoggerFactory.getLogger(EagleServiceClientImpl.class);
private static final String SERVICE_HOST_KEY = "service.host";
private static final String SERVICE_PORT_KEY = "service.port";
private static final String SERVICE_USERNAME_KEY = "service.username";
private static final String SERVICE_PASSWORD_KEY = "service.password";
public EagleServiceClientImpl(String host, int port) {
super(host, port);
}
@Deprecated
public EagleServiceClientImpl(EagleServiceConnector connector) {
this(connector.getEagleServiceHost(), connector.getEagleServicePort(), connector.getUsername(), connector.getPassword());
}
public EagleServiceClientImpl(Config config) {
super(
config.hasPath(SERVICE_HOST_KEY) ? config.getString(SERVICE_HOST_KEY) : "localhost",
tryGetPortFromConfig(config),
config.hasPath(SERVICE_USERNAME_KEY) ? config.getString(SERVICE_USERNAME_KEY) : null,
config.hasPath(SERVICE_PASSWORD_KEY) ? config.getString(SERVICE_PASSWORD_KEY) : null
);
}
/**
* Try to get eagle service port from config by key: service.port no matter STRING or INT.
*/
private static int tryGetPortFromConfig(Config config) {
if (config.hasPath(SERVICE_PORT_KEY)) {
try {
return config.getInt(SERVICE_PORT_KEY);
} catch (ConfigException.WrongType wrongType) {
String portStr = config.getString(SERVICE_PORT_KEY);
if (StringUtils.isNotBlank(portStr)) {
return Integer.valueOf(portStr);
}
}
}
return 9090;
}
public EagleServiceClientImpl(String host, int port, String username, String password) {
super(host, port, username, password);
}
public EagleServiceClientImpl(String host, int port, String basePath, String username, String password) {
super(host, port, basePath, username, password);
}
private String getWholePath(String urlString) {
return getBaseEndpoint() + urlString;
}
@Override
public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> create(List<E> entities, String serviceName) throws IOException, EagleServiceClientException {
checkNotNull(serviceName, "serviceName");
checkNotNull(entities, "entities");
final GenericServiceAPIResponseEntity<String> response;
response = postEntitiesWithService(GENERIC_ENTITY_PATH, entities, serviceName);
if (!response.isSuccess()) {
LOG.error("Failed to create entities for service: " + serviceName);
}
return response;
}
@Override
public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> create(List<E> entities) throws IOException, EagleServiceClientException {
checkNotNull(entities, "entities");
Map<String, List<E>> serviceEntityMap = groupEntitiesByService(entities);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating entities for " + serviceEntityMap.keySet().size() + " services");
}
List<String> createdKeys = new LinkedList<String>();
for (Map.Entry<String, List<E>> entry : serviceEntityMap.entrySet()) {
GenericServiceAPIResponseEntity<String> response = create(entry.getValue(), entry.getKey());
if (!response.isSuccess()) {
throw new IOException("Service side exception: " + response.getException());
} else if (response.getObj() != null) {
createdKeys.addAll(response.getObj());
}
}
GenericServiceAPIResponseEntity<String> entity = new GenericServiceAPIResponseEntity<String>();
entity.setObj(createdKeys);
entity.setSuccess(true);
return entity;
}
@Override
public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> delete(List<E> entities) throws IOException, EagleServiceClientException {
checkNotNull(entities, "entities");
Map<String, List<E>> serviceEntityMap = groupEntitiesByService(entities);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating entities for " + serviceEntityMap.keySet().size() + " services");
}
List<String> deletedKeys = new LinkedList<String>();
for (Map.Entry<String, List<E>> entry : serviceEntityMap.entrySet()) {
GenericServiceAPIResponseEntity<String> response = delete(entry.getValue(), entry.getKey());
if (!response.isSuccess()) {
LOG.error("Got service exception: " + response.getException());
throw new IOException(response.getException());
} else if (response.getObj() != null) {
deletedKeys.addAll(response.getObj());
}
}
GenericServiceAPIResponseEntity<String> entity = new GenericServiceAPIResponseEntity<String>();
entity.setObj(deletedKeys);
entity.setSuccess(true);
return entity;
}
@SuppressWarnings("unchecked")
@Override
public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> delete(List<E> entities, String serviceName) throws IOException, EagleServiceClientException {
checkNotNull(entities, "entities");
checkNotNull(serviceName, "serviceName");
return postEntitiesWithService(GENERIC_ENTITY_DELETE_PATH, entities, serviceName);
}
@SuppressWarnings("unchecked")
@Override
public GenericServiceAPIResponseEntity<String> delete(EagleServiceSingleEntityQueryRequest request) throws IOException, EagleServiceClientException {
String queryString = request.getQueryParameterString();
StringBuilder sb = new StringBuilder();
sb.append(GENERIC_ENTITY_PATH);
sb.append("?");
sb.append(queryString);
final String urlString = sb.toString();
if (!this.silence) {
LOG.info("Going to delete by querying service: " + getWholePath(urlString));
}
WebResource r = getWebResource(urlString);
return putAuthHeaderIfNeeded(r.accept(DEFAULT_MEDIA_TYPE)
.header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE))
.delete(GenericServiceAPIResponseEntity.class);
}
@SuppressWarnings("unchecked")
@Override
public GenericServiceAPIResponseEntity<String> deleteById(List<String> ids, String serviceName) throws EagleServiceClientException, IOException {
checkNotNull(serviceName, "serviceName");
checkNotNull(ids, "ids");
final String json = marshall(ids);
final WebResource r = getWebResource(GENERIC_ENTITY_DELETE_PATH);
return putAuthHeaderIfNeeded(r.queryParam(SERVICE_NAME, serviceName)
.queryParam(DELETE_BY_ID, "true")
.accept(DEFAULT_MEDIA_TYPE))
.header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE)
.post(GenericServiceAPIResponseEntity.class, json);
}
@Override
public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> update(List<E> entities) throws IOException, EagleServiceClientException {
checkNotNull(entities, "entities");
Map<String, List<E>> serviceEntityMap = groupEntitiesByService(entities);
if (LOG.isDebugEnabled()) {
LOG.debug("Updating entities for " + serviceEntityMap.keySet().size() + " services");
}
List<String> createdKeys = new LinkedList<String>();
for (Map.Entry<String, List<E>> entry : serviceEntityMap.entrySet()) {
GenericServiceAPIResponseEntity<String> response = update(entry.getValue(), entry.getKey());
if (!response.isSuccess()) {
throw new IOException("Got service exception when updating service " + entry.getKey() + " : " + response.getException());
} else {
if (response.getObj() != null) {
createdKeys.addAll(response.getObj());
}
}
}
GenericServiceAPIResponseEntity<String> entity = new GenericServiceAPIResponseEntity<String>();
entity.setObj(createdKeys);
entity.setSuccess(true);
return entity;
}
@Override
public <E extends TaggedLogAPIEntity> GenericServiceAPIResponseEntity<String> update(List<E> entities, String serviceName) throws IOException, EagleServiceClientException {
checkNotNull(entities, "entities");
checkNotNull(serviceName, "serviceName");
return putEntitiesWithService(GENERIC_ENTITY_PATH, entities, serviceName);
}
@Override
@SuppressWarnings("unchecked")
public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
String queryString = request.getQueryParameterString();
StringBuilder sb = new StringBuilder();
sb.append(GENERIC_ENTITY_PATH);
sb.append("?");
sb.append(queryString);
final String urlString = sb.toString();
if (!this.silence) {
LOG.info("Going to query service: " + getWholePath(urlString));
}
WebResource r = getWebResource(urlString);
return putAuthHeaderIfNeeded(r.accept(DEFAULT_MEDIA_TYPE))
.header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE)
.get(GenericServiceAPIResponseEntity.class);
}
}