blob: f7404e35cb042c316bbfb27e462c381293dd5bb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zeppelin.display.*;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Exercises angular-object synchronisation between a {@link RemoteAngularObjectRegistry}
 * held by the test and the registry reported by {@link MockInterpreterAngular}, which is
 * driven through a {@link RemoteInterpreter}.
 *
 * <p>The test class registers itself as the {@link AngularObjectRegistryListener} of the local
 * registry and counts the callbacks it receives. The counters are reset in {@link #setUp()};
 * none of the current tests asserts on them.
 */
public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
  private static final String INTERPRETER_SCRIPT =
      System.getProperty("os.name").startsWith("Windows") ?
          "../bin/interpreter.cmd" :
          "../bin/interpreter.sh";

  /** Pause after each interpret call so the event poller can deliver pending events. */
  private static final long EVENT_POLL_WAIT_MS = 500;

  private InterpreterGroup intpGroup;
  private HashMap<String, String> env;
  private RemoteInterpreter intp;
  private InterpreterContext context;
  private RemoteAngularObjectRegistry localRegistry;

  // Listener callback counters, reset before every test.
  private AtomicInteger onAdd;
  private AtomicInteger onUpdate;
  private AtomicInteger onRemove;

  /**
   * Builds a fresh interpreter group, local registry, remote interpreter and interpreter
   * context, then opens the interpreter.
   *
   * @throws Exception if any part of the fixture cannot be created or opened
   */
  @Before
  public void setUp() throws Exception {
    onAdd = new AtomicInteger(0);
    onUpdate = new AtomicInteger(0);
    onRemove = new AtomicInteger(0);

    intpGroup = new InterpreterGroup("intpId");
    localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
    intpGroup.setAngularObjectRegistry(localRegistry);

    env = new HashMap<>();
    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());

    Properties p = new Properties();

    intp = new RemoteInterpreter(
        p,
        "note",
        MockInterpreterAngular.class.getName(),
        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
        "fake",
        "fakeRepo",
        env,
        10 * 1000, // NOTE(review): presumably a timeout in milliseconds - confirm against the constructor
        null,
        null,
        "anonymous",
        false
    );

    intpGroup.put("note", new LinkedList<Interpreter>());
    intpGroup.get("note").add(intp);
    intp.setInterpreterGroup(intpGroup);

    context = new InterpreterContext(
        "note",
        "id",
        null,
        "title",
        "text",
        new AuthenticationInfo(),
        new HashMap<String, Object>(),
        new GUI(),
        new AngularObjectRegistry(intpGroup.getId(), null),
        new LocalResourcePool("pool1"),
        new LinkedList<InterpreterContextRunner>(), null);

    intp.open();
  }

  /**
   * Closes the interpreter and then the interpreter group.
   *
   * <p>The group is closed in a {@code finally} block so it is released even when closing the
   * interpreter throws. Both references are null-checked because JUnit still invokes
   * {@code @After} methods when {@code @Before} failed part-way through.
   *
   * @throws Exception if closing the interpreter or the group fails
   */
  @After
  public void tearDown() throws Exception {
    try {
      if (intp != null) {
        intp.close();
      }
    } finally {
      if (intpGroup != null) {
        intpGroup.close();
      }
    }
  }

  /**
   * Adds, updates and removes an angular object from the interpreter side and checks after
   * each step both the interpreter's reported state and the content of the local registry.
   */
  @Test
  public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
    String[] result = interpretAndAwaitEvents("get");
    assertEquals("0", result[0]); // size of registry
    assertEquals("0", result[1]); // num watcher called

    // create object
    result = interpretAndAwaitEvents("add n1 v1");
    assertEquals("1", result[0]); // size of registry
    assertEquals("0", result[1]); // num watcher called
    assertEquals("v1", localRegistry.get("n1", "note", null).get());

    // update object
    result = interpretAndAwaitEvents("update n1 v11");
    assertEquals("1", result[0]); // size of registry
    assertEquals("1", result[1]); // num watcher called
    assertEquals("v11", localRegistry.get("n1", "note", null).get());

    // remove object
    result = interpretAndAwaitEvents("remove n1");
    assertEquals("0", result[0]); // size of registry
    assertEquals("1", result[1]); // num watcher called
    assertEquals(null, localRegistry.get("n1", "note", null));
  }

  /**
   * Tests that removing an angular object on the server side propagates to the interpreter
   * process's registry. This will happen when a notebook is removed.
   */
  @Test
  public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
    String[] result = interpretAndAwaitEvents("get");
    assertEquals("0", result[0]); // size of registry

    // create object
    result = interpretAndAwaitEvents("add n1 v1");
    assertEquals("1", result[0]); // size of registry
    assertEquals("v1", localRegistry.get("n1", "note", null).get());

    // remove object in local registry.
    localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
    result = interpretAndAwaitEvents("get");
    assertEquals("0", result[0]); // size of registry
  }

  /**
   * Tests that adding an angular object on the server side propagates to the interpreter
   * process's registry. This will happen when the Zeppelin server loads a notebook and
   * restores the object into the registry.
   */
  @Test
  public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
    String[] result = interpretAndAwaitEvents("get");
    assertEquals("0", result[0]); // size of registry

    // create object
    localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);

    // get from remote registry
    result = interpretAndAwaitEvents("get");
    assertEquals("1", result[0]); // size of registry
  }

  /**
   * Runs one command through the remote interpreter, waits for the event poller, and returns
   * the first result message split on single spaces.
   *
   * <p>The tests read index 0 as the registry size and index 1 as the number of watcher calls.
   *
   * @param command text passed to {@code intp.interpret}
   * @return the space-separated tokens of the first result message
   * @throws InterruptedException if the wait is interrupted
   */
  private String[] interpretAndAwaitEvents(String command) throws InterruptedException {
    InterpreterResult ret = intp.interpret(command, context);
    Thread.sleep(EVENT_POLL_WAIT_MS); // waitFor eventpoller pool event
    return ret.message().get(0).getData().split(" ");
  }

  /** Counts an add notification from the local registry. */
  @Override
  public void onAdd(String interpreterGroupId, AngularObject object) {
    onAdd.incrementAndGet();
  }

  /** Counts an update notification from the local registry. */
  @Override
  public void onUpdate(String interpreterGroupId, AngularObject object) {
    onUpdate.incrementAndGet();
  }

  /** Counts a remove notification from the local registry. */
  @Override
  public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
    onRemove.incrementAndGet();
  }
}