Hadoop requires a client that can be used to interact remotely with the services provided by Hadoop cluster. This will also be true when using the Apache Knox Gateway to provide perimeter security and centralized access for these services. The two primary existing clients for Hadoop are the CLI (i.e. Command Line Interface, hadoop) and HUE (i.e. Hadoop User Environment). For several reasons however, neither of these clients can currently be used to access Hadoop services via the Apache Knox Gateway.
This led to thinking about a very simple client that could help people use and evaluate the gateway. The list below outlines the general requirements for such a client.
The result is a very simple DSL (Domain Specific Language) of sorts that is used via Groovy scripts. Here is an example of a command that copies a file from the local file system to HDFS.
Note: The variables session, localFile and remoteFile are assumed to be defined.
Hdfs.put( session ).file( localFile ).to( remoteFile ).now()
This work is very early in development but is also very useful in its current state. We are very interested in receiving feedback about how to improve this feature and the DSL in particular.
A note of thanks to REST-assured which provides a Fluent interface style DSL for testing REST services. It served as the initial inspiration for the creation of this DSL.
This document assumes a few things about your environment in order to simplify the examples.
The DSL requires a shell to interpret the Groovy script. The shell can either be used interactively or to execute a script file. To simplify use, the distribution contains an embedded version of the Groovy shell.
The shell can be run interactively. Use the command exit
to exit.
java -jar bin/shell.jar
When running interactively it may be helpful to reduce some of the output generated by the shell console. Use the following command in the interactive shell to reduce that output. This only needs to be done once as these preferences are persisted.
set verbosity QUIET set show-last-result false
Also when running interactively use the exit
command to terminate the shell. Using ^C
to exit can sometimes leaves the parent shell in a problematic state.
The shell can also be used to execute a script by passing a single filename argument.
java -jar bin/shell.jar samples/ExampleWebHdfsPutGetFile.groovy
Once the shell can be launched the DSL can be used to interact with the gateway and Hadoop. Below is a very simple example of an interactive shell session to upload a file to HDFS.
java -jar bin/shell.jar knox:000> session = Hadoop.login( "https://localhost:8443/gateway/sandbox", "guest", "guest-password" ) knox:000> Hdfs.put( session ).file( "README" ).to( "/tmp/example/README" ).now()
The knox:000>
in the example above is the prompt from the embedded Groovy console. If you output doesn't look like this you may need to set the verbosity and show-last-result preferences as described above in the Usage section.
If you relieve an error HTTP/1.1 403 Forbidden
it may be because that file already exists. Try deleting it with the following command and then try again.
knox:000> Hdfs.rm(session).file("/tmp/example/README").now()
Without using some other tool to browse HDFS it is hard to tell that that this command did anything. Execute this to get a bit more feedback.
knox:000> println "Status=" + Hdfs.put( session ).file( "README" ).to( "/tmp/example/README2" ).now().statusCode Status=201
Notice that a different filename is used for the destination. Without this an error would have resulted. Of course the DSL also provides a command to list the contents of a directory.
knox:000> println Hdfs.ls( session ).dir( "/tmp/example" ).now().string {"FileStatuses":{"FileStatus":[{"accessTime":1363711366977,"blockSize":134217728,"group":"hdfs","length":19395,"modificationTime":1363711366977,"owner":"guest","pathSuffix":"README","permission":"644","replication":1,"type":"FILE"},{"accessTime":1363711375617,"blockSize":134217728,"group":"hdfs","length":19395,"modificationTime":1363711375617,"owner":"guest","pathSuffix":"README2","permission":"644","replication":1,"type":"FILE"}]}}
It is a design decision of the DSL to not provide type safe classes for various request and response payloads. Doing so would provide an undesirable coupling between the DSL and the service implementation. It also would make adding new commands much more difficult. See the Groovy section below for a variety capabilities and tools for working with JSON and XML to make this easy. The example below shows the use of JsonSlurper and GPath to extract content from a JSON response.
knox:000> import groovy.json.JsonSlurper knox:000> text = Hdfs.ls( session ).dir( "/tmp/example" ).now().string knox:000> json = (new JsonSlurper()).parseText( text ) knox:000> println json.FileStatuses.FileStatus.pathSuffix [README, README2]
In the future, “built-in” methods to slurp JSON and XML may be added to make this a bit easier. This would allow for this type if single line interaction.
println Hdfs.ls(session).dir("/tmp").now().json().FileStatuses.FileStatus.pathSuffix
Shell session should always be ended with shutting down the session. The examples above do not touch on it but the DSL supports the simple execution of commands asynchronously. The shutdown command attempts to ensures that all asynchronous commands have completed before existing the shell.
knox:000> session.shutdown() knox:000> exit
All of the commands above could have been combined into a script file and executed as a single line.
java -jar bin/shell.jar samples/ExampleWebHdfsPutGet.groovy
This would be the content of that script.
import org.apache.hadoop.gateway.shell.Hadoop import org.apache.hadoop.gateway.shell.hdfs.Hdfs import groovy.json.JsonSlurper gateway = "https://localhost:8443/gateway/sandbox" username = "guest" password = "guest-password" dataFile = "README" session = Hadoop.login( gateway, username, password ) Hdfs.rm( session ).file( "/tmp/example" ).recursive().now() Hdfs.put( session ).file( dataFile ).to( "/tmp/example/README" ).now() text = Hdfs.ls( session ).dir( "/tmp/example" ).now().string json = (new JsonSlurper()).parseText( text ) println json.FileStatuses.FileStatus.pathSuffix session.shutdown() exit
Notice the Hdfs.rm
command. This is included simply to ensure that the script can be rerun. Without this an error would result the second time it is run.
The DSL supports the ability to invoke commands asynchronously via the later() invocation method. The object returned from the later() method is a java.util.concurrent.Future parametrized with the response type of the command. This is an example of how to asynchronously put a file to HDFS.
future = Hdfs.put(session).file("README").to("tmp/example/README").later() println future.get().statusCode
The future.get() method will block until the asynchronous command is complete. To illustrate the usefulness of this however multiple concurrent commands are required.
readmeFuture = Hdfs.put(session).file("README").to("tmp/example/README").later() licenseFuture = Hdfs.put(session).file("LICENSE").to("tmp/example/LICENSE").later() session.waitFor( readmeFuture, licenseFuture ) println readmeFuture.get().statusCode println licenseFuture.get().statusCode
The session.waitFor() method will wait for one or more asynchronous commands to complete.
Futures alone only provide asynchronous invocation of the command. What if some processing should also occur asynchronously once the command is complete. Support for this is provided by closures. Closures are blocks of code that are passed into the later() invocation method. In Groovy these are contained within {} immediately after a method. These blocks of code are executed once the asynchronous command is complete.
Hdfs.put(session).file("README").to("tmp/example/README").later(){ println it.statusCode }
In this example the put() command is executed on a separate thread and once complete the println it.statusCode
block is executed on that thread. The it variable is automatically populated by Groovy and is a reference to the result that is returned from the future or now() method. The future example above can be rewritten to illustrate the use of closures.
readmeFuture = Hdfs.put(session).file("README").to("tmp/example/README").later() { println it.statusCode } licenseFuture = Hdfs.put(session).file("LICENSE").to("tmp/example/LICENSE").later() { println it.statusCode } session.waitFor( readmeFuture, licenseFuture )
Again, the session.waitFor() method will wait for one or more asynchronous commands to complete.
In order to understand the DSL there are three primary constructs that need to be understood.
This construct encapsulates the client side session state that will be shared between all command invocations. In particular it will simplify the management of any tokens that need to be presented with each command invocation. It also manages a thread pool that is used by all asynchronous commands which is why it is important to call one of the shutdown methods.
The syntax associated with this is expected to change we expect that credentials will not need to be provided to the gateway. Rather it is expected that some form of access token will be used to initialize the session.
Services are the primary extension point for adding new suites of commands. The current built in examples are: Hdfs, Job and Workflow. The desire for extensibility is the reason for the slightly awkward Hdfs.ls(session) syntax. Certainly something more like session.hdfs().ls()
would have been preferred but this would prevent adding new commands easily. At a minimum it would result in extension commands with a different syntax from the “built-in” commands.
The service objects essentially function as a factory for a suite of commands.
Commands provide the behavior of the DSL. They typically follow a Fluent interface style in order to allow for single line commands. There are really three parts to each command: Request, Invocation, Response
The request is populated by all of the methods following the “verb” method and the “invoke” method. For example in Hdfs.rm(session).ls(dir).now()
the request is populated between the “verb” method rm()
and the “invoke” method now()
.
The invocation method controls how the request is invoked. Currently supported synchronous and asynchronous invocation. The now() method executes the request and returns the result immediately. The later() method submits the request to be executed later and returns a future from which the result can be retrieved. In addition later() invocation method can optionally be provided a closure to execute when the request is complete. See the Futures and Closures sections below for additional detail and examples.
The response contains the results of the invocation of the request. In most cases the response is a thin wrapper over the HTTP response. In fact many commands will share a single BasicResponse type that only provides a few simple methods.
public int getStatusCode() public long getContentLength() public String getContentType() public String getContentEncoding() public InputStream getStream() public String getString() public byte[] getBytes() public void close();
Thanks to Groovy these methods can be accessed as attributes. In the some of the examples the staticCode was retrieved for example.
println Hdfs.put(session).rm(dir).now().statusCode
Groovy will invoke the getStatusCode method to retrieve the statusCode attribute.
The three methods getStream(), getBytes() and getString deserve special attention. Care must be taken that the HTTP body is fully read once and only once. Therefore one of these methods (and only one) must be called once and only once. Calling one of these more than once will cause an error. Failing to call one of these methods once will result in lingering open HTTP connections. The close() method may be used if the caller is not interested in reading the result body. Most commands that do not expect a response body will call close implicitly. If the body is retrieved via getBytes() or getString(), the close() method need not be called. When using getStream(), care must be taken to consume the entire body otherwise lingering open HTTP connections will result. The close() method may be called after reading the body partially to discard the remainder of the body.
The built-in supported client DLS for each Hadoop service can be found in the #[Service Details] section.
Extensibility is a key design goal of the KnoxShell and client DSL. There are two ways to provide extended functionality for use with the shell. The first is to simply create Groovy scripts that use the DSL to perform a useful task. The second is to add new services and commands. In order to add new service and commands new classes must be written in either Groovy or Java and added to the classpath of the shell. Fortunately there is a very simple way to add classes and JARs to the shell classpath. The first time the shell is executed it will create a configuration file in the same directory as the JAR with the same base name and a .cfg
extension.
bin/shell.jar bin/shell.cfg
That file contains both the main class for the shell as well as a definition of the classpath. Currently that file will by default contain the following.
main.class=org.apache.hadoop.gateway.shell.Shell class.path=../lib; ../lib/*.jar; ../ext; ../ext/*.jar
Therefore to extend the shell you should copy any new service and command class either to the ext
directory or if they are packaged within a JAR copy the JAR to the ext
directory. The lib
directory is reserved for JARs that may be delivered with the product.
Below are samples for the service and command classes that would need to be written to add new commands to the shell. These happen to be Groovy source files but could with very minor changes be Java files. The easiest way to add these to the shell is to compile them directory into the ext
directory. Note: This command depends upon having the Groovy compiler installed and available on the execution path.
groovy -d ext -cp bin/shell.jar samples/SampleService.groovy \ samples/SampleSimpleCommand.groovy samples/SampleComplexCommand.groovy
These source files are available in the samples directory of the distribution but these are included here for convenience.
import org.apache.hadoop.gateway.shell.Hadoop class SampleService { static String PATH = "/webhdfs/v1" static SimpleCommand simple( Hadoop session ) { return new SimpleCommand( session ) } static ComplexCommand.Request complex( Hadoop session ) { return new ComplexCommand.Request( session ) } }
import org.apache.hadoop.gateway.shell.AbstractRequest import org.apache.hadoop.gateway.shell.BasicResponse import org.apache.hadoop.gateway.shell.Hadoop import org.apache.http.client.methods.HttpGet import org.apache.http.client.utils.URIBuilder import java.util.concurrent.Callable class SimpleCommand extends AbstractRequest<BasicResponse> { SimpleCommand( Hadoop session ) { super( session ) } private String param SimpleCommand param( String param ) { this.param = param return this } @Override protected Callable<BasicResponse> callable() { return new Callable<BasicResponse>() { @Override BasicResponse call() { URIBuilder uri = uri( SampleService.PATH, param ) addQueryParam( uri, "op", "LISTSTATUS" ) HttpGet get = new HttpGet( uri.build() ) return new BasicResponse( execute( get ) ) } } } }
import com.jayway.jsonpath.JsonPath import org.apache.hadoop.gateway.shell.AbstractRequest import org.apache.hadoop.gateway.shell.BasicResponse import org.apache.hadoop.gateway.shell.Hadoop import org.apache.http.HttpResponse import org.apache.http.client.methods.HttpGet import org.apache.http.client.utils.URIBuilder import java.util.concurrent.Callable class ComplexCommand { static class Request extends AbstractRequest<Response> { Request( Hadoop session ) { super( session ) } private String param; Request param( String param ) { this.param = param; return this; } @Override protected Callable<Response> callable() { return new Callable<Response>() { @Override Response call() { URIBuilder uri = uri( SampleService.PATH, param ) addQueryParam( uri, "op", "LISTSTATUS" ) HttpGet get = new HttpGet( uri.build() ) return new Response( execute( get ) ) } } } } static class Response extends BasicResponse { Response(HttpResponse response) { super(response) } public List<String> getNames() { return JsonPath.read( string, "\$.FileStatuses.FileStatus[*].pathSuffix" ) } } }
The shell included in the distribution is basically an unmodified packaging of the Groovy shell. The distribution does however provide a wrapper that makes it very easy to setup the class path for the shell. In fact the JARs required to execute the DSL are included on the class path by default. Therefore these command are functionally equivalent if you have Groovy [installed][15]. See below for a description of what is required for JARs required by the DSL from lib
and dep
directories.
java -jar bin/shell.jar samples/ExampleWebHdfsPutGet.groovy groovy -classpath {JARs required by the DSL from lib and dep} samples/ExampleWebHdfsPutGet.groovy
The interactive shell isn't exactly equivalent. However the only difference is that the shell.jar automatically executes some additional imports that are useful for the KnoxShell client DSL. So these two sets of commands should be functionality equivalent. However there is currently a class loading issue that prevents the groovysh command from working properly.
java -jar bin/shell.jar groovysh -classpath {JARs required by the DSL from lib and dep} import org.apache.hadoop.gateway.shell.Hadoop import org.apache.hadoop.gateway.shell.hdfs.Hdfs import org.apache.hadoop.gateway.shell.job.Job import org.apache.hadoop.gateway.shell.workflow.Workflow import java.util.concurrent.TimeUnit
Alternatively, you can use the Groovy Console which does not appear to have the same class loading issue.
groovyConsole -classpath {JARs required by the DSL from lib and dep} import org.apache.hadoop.gateway.shell.Hadoop import org.apache.hadoop.gateway.shell.hdfs.Hdfs import org.apache.hadoop.gateway.shell.job.Job import org.apache.hadoop.gateway.shell.workflow.Workflow import java.util.concurrent.TimeUnit
The JARs currently required by the client DSL are
lib/gateway-shell-${gateway-version}.jar dep/httpclient-4.2.3.jar dep/httpcore-4.2.2.jar dep/commons-lang3-3.1.jar dep/commons-codec-1.7.jar
So on Linux/MacOS you would need this command
groovy -cp lib/gateway-shell-0.2.0-SNAPSHOT.jar:dep/httpclient-4.2.3.jar:dep/httpcore-4.2.2.jar:dep/commons-lang3-3.1.jar:dep/commons-codec-1.7.jar samples/ExampleWebHdfsPutGet.groovy
and on Windows you would need this command
groovy -cp lib/gateway-shell-0.2.0-SNAPSHOT.jar;dep/httpclient-4.2.3.jar;dep/httpcore-4.2.2.jar;dep/commons-lang3-3.1.jar;dep/commons-codec-1.7.jar samples/ExampleWebHdfsPutGet.groovy
The exact list of required JARs is likely to change from release to release so it is recommended that you utilize the wrapper bin/shell.jar
.
In addition because the DSL can be used via standard Groovy, the Groovy integrations in many popular IDEs (e.g. IntelliJ , Eclipse) can also be used. This makes it particularly nice to develop and execute scripts to interact with Hadoop. The code-completion features in modern IDEs in particular provides immense value. All that is required is to add the shell-0.2.0.jar to the projects class path.
There are a variety of Groovy tools that make it very easy to work with the standard interchange formats (i.e. JSON and XML). In Groovy the creation of XML or JSON is typically done via a “builder” and parsing done via a “slurper”. In addition once JSON or XML is “slurped” the GPath, an XPath like feature build into Groovy can be used to access data.