blob: 4696375342a92fbe8d98c31947562dc48e739421 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.manager.client;
import com.google.common.collect.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.autopush.AutoPushSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.shiro.util.Assert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Test class for auto push to hive.
*/
@Disabled
class AutoPush2HiveExample extends BaseExample {
@Test
void testCreateGroupForHive() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setWriteTimeout(10);
configuration.setReadTimeout(10);
configuration.setConnectTimeout(10);
configuration.setTimeUnit(TimeUnit.SECONDS);
configuration.setAuthentication(super.getInlongAuth());
InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
InlongGroupInfo groupInfo = super.createGroupInfo();
try {
InlongGroup group = inlongClient.forGroup(groupInfo);
InlongStreamInfo streamInfo = createStreamInfo();
InlongStreamBuilder streamBuilder = group.createStream(streamInfo);
streamBuilder.fields(createStreamFields());
streamBuilder.source(createAutoPushSource());
streamBuilder.sink(createHiveSink());
streamBuilder.initOrUpdate();
// start group
InlongGroupContext inlongGroupContext = group.init();
Assert.notNull(inlongGroupContext);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
void testStopGroup() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setWriteTimeout(10);
configuration.setReadTimeout(10);
configuration.setConnectTimeout(10);
configuration.setTimeUnit(TimeUnit.SECONDS);
configuration.setAuthentication(super.getInlongAuth());
InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
InlongGroupInfo groupInfo = createGroupInfo();
try {
InlongGroup group = inlongClient.forGroup(groupInfo);
InlongGroupContext groupContext = group.delete();
Assert.notNull(groupContext);
} catch (Exception e) {
e.printStackTrace();
}
}
private AutoPushSource createAutoPushSource() {
AutoPushSource autoPushSource = new AutoPushSource();
autoPushSource.setDataProxyGroup("{Dataproxy.group}");
return autoPushSource;
}
private List<StreamField> createStreamFields() {
List<StreamField> streamFieldList = Lists.newArrayList();
streamFieldList.add(new StreamField(0, FieldType.STRING.toString(), "name", null, null));
streamFieldList.add(new StreamField(1, FieldType.INT.toString(), "age", null, null));
return streamFieldList;
}
}