This tutorial provides examples and a guide to using Samza's asynchronous API and multithreading.
If your job's processing involves synchronous (blocking) IO, you can simply configure Samza's built-in thread pool to run your tasks in parallel. In the following example, SyncRestTask uses the Jersey client to make a REST call in each process() invocation.
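{% highlight java %}
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class SyncRestTask implements StreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // Blocks the calling thread until the REST call returns.
    Response response = target.request().get();
    System.out.println("Response status code " + response.getStatus() + " received.");
    response.close(); // release the underlying connection
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
{% endhighlight %}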
By default, Samza runs this task sequentially in a single thread. The following configuration sets up a thread pool of size 16 to run the tasks in parallel:
{% highlight jproperties %}
job.container.thread.pool.size=16
{% endhighlight %}
NOTE: The thread pool runs all synchronous operations of a task, including StreamTask.process(), WindowableTask.window(), and, internally, Task.commit(). This maximizes parallelism across tasks and reduces blocking time. When tasks run on multiple threads, Samza still guarantees in-order processing of the messages within each task by default.
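The per-task ordering guarantee can be pictured with plain `java.util.concurrent`. The sketch below is a hypothetical illustration, not Samza's implementation: `PerTaskOrderingSketch`, `submit()`, `demo()`, and the task names are invented for this example. Each task's work items are chained on a `CompletableFuture`, so a task's next message starts only after its previous one finishes, while different tasks share a 16-thread pool (mirroring `job.container.thread.pool.size=16`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration (not Samza code): messages of one task run one at a
// time, in order, while different tasks share a single thread pool.
public class PerTaskOrderingSketch {
    private final ExecutorService pool;
    // Last scheduled piece of work for each task; new work is chained after it.
    private final Map<String, CompletableFuture<Void>> tails = new HashMap<>();

    public PerTaskOrderingSketch(int poolSize) {
        this.pool = Executors.newFixedThreadPool(poolSize);
    }

    // Runs 'work' on the pool only after the task's previously submitted work has finished.
    public synchronized CompletableFuture<Void> submit(String taskName, Runnable work) {
        CompletableFuture<Void> tail =
            tails.getOrDefault(taskName, CompletableFuture.completedFuture(null));
        CompletableFuture<Void> next = tail.thenRunAsync(work, pool);
        tails.put(taskName, next);
        return next;
    }

    public void shutdown() {
        pool.shutdown();
    }

    // Submits 'messagesPerTask' offsets for two tasks and records the order each task saw them.
    public static Map<String, List<Integer>> demo(int messagesPerTask) {
        PerTaskOrderingSketch scheduler = new PerTaskOrderingSketch(16);
        Map<String, List<Integer>> seen = new HashMap<>();
        List<CompletableFuture<Void>> all = new ArrayList<>();
        for (String task : new String[] {"task-0", "task-1"}) {
            List<Integer> order = Collections.synchronizedList(new ArrayList<>());
            seen.put(task, order);
            for (int i = 0; i < messagesPerTask; i++) {
                final int offset = i;
                all.add(scheduler.submit(task, () -> order.add(offset)));
            }
        }
        CompletableFuture.allOf(all.toArray(new CompletableFuture[0])).join();
        scheduler.shutdown();
        return seen;
    }
}
```

Calling `demo(100)` returns, for each task, the offsets in the order they were processed: 0 through 99 for both tasks, even though the two tasks ran concurrently. One caveat of this simple chaining is that if a work item throws, every later item for the same task is skipped.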
If your job's processing is asynchronous, e.g. it makes non-blocking remote IO calls, the AsyncStreamTask interface supports it. In the following example, AsyncRestTask makes an asynchronous REST call and triggers the task callback once the call completes.
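{% highlight java %}
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    // Returns immediately; Jersey invokes the InvocationCallback on its own thread.
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        System.out.println("Response status code " + response.getStatus() + " received.");
        response.close();
        callback.complete(); // tell Samza this message is done
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable); // report the error to Samza
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
{% endhighlight %}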
In the example above, processing is not complete when processAsync() returns. Instead, the Jersey client invokes the InvocationCallback on its own thread, and that callback triggers the TaskCallback to signal that the message has been processed. To ensure the callback is triggered within a certain interval, e.g. 5 seconds, configure the following property:
{% highlight jproperties %}
task.callback.timeout.ms=5000
{% endhighlight %}
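To make the timeout idea concrete, here is a stdlib-only sketch of a callback with a deadline. It does not use Samza: `CallbackTimeoutSketch` and `TimedCallback` are hypothetical stand-ins for `TaskCallback`, and the sketch only shows that whichever of `complete()`, `failure()`, or the timer fires first decides the outcome. How Samza itself reacts when `task.callback.timeout.ms` expires is not modeled here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch (not Samza code) of a callback that must be triggered
// within a deadline, similar in spirit to task.callback.timeout.ms.
public class CallbackTimeoutSketch {

    // Stand-in for a task callback: the first of complete(), failure() or the
    // timeout timer decides the outcome; any later call is ignored.
    public static final class TimedCallback {
        private final CompletableFuture<String> outcome = new CompletableFuture<>();

        public TimedCallback(ScheduledExecutorService timer, long timeoutMs) {
            timer.schedule(
                () -> failure(new TimeoutException("not triggered within " + timeoutMs + " ms")),
                timeoutMs, TimeUnit.MILLISECONDS);
        }

        public void complete() {
            outcome.complete("completed");
        }

        public void failure(Throwable t) {
            outcome.complete("failed: " + t.getClass().getSimpleName());
        }

        // Blocks until the callback has been triggered or has timed out.
        public String await() {
            return outcome.join();
        }
    }

    public static String[] demo(long timeoutMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            TimedCallback fast = new TimedCallback(timer, timeoutMs);
            TimedCallback lost = new TimedCallback(timer, timeoutMs);
            fast.complete(); // e.g. the REST call finished in time
            // 'lost' is never triggered, so its timer fails it after timeoutMs
            return new String[] {fast.await(), lost.await()};
        } finally {
            timer.shutdownNow(); // cancels the now-irrelevant timer for 'fast'
        }
    }
}
```

`demo(100)` returns `completed` for the callback that was triggered immediately and `failed: TimeoutException` for the one that never was. `CompletableFuture.complete()` only succeeds once, which is what turns any later call into a no-op.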
NOTE: By default, Samza also guarantees in-order processing of the messages within an AsyncStreamTask: the next processAsync() call for a task is not made until the callback of the previous processAsync() has been triggered.
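The in-order rule for an AsyncStreamTask amounts to dispatching the next message only from inside the previous message's callback. The following sketch shows that pattern in plain Java; `InOrderAsyncSketch`, its `processAsync()` helper, and the random short delays are hypothetical stand-ins for an async task and its IO, not Samza code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not Samza code): the next message is dispatched only
// from inside the completion callback of the previous one.
public class InOrderAsyncSketch {
    private final List<Integer> messages;
    private final ScheduledExecutorService io = Executors.newScheduledThreadPool(4);
    private final List<Integer> completionOrder = Collections.synchronizedList(new ArrayList<>());
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();
    private final CountDownLatch allDone;

    public InOrderAsyncSketch(List<Integer> messages) {
        this.messages = messages;
        this.allDone = new CountDownLatch(messages.size());
    }

    // Simulated non-blocking call: returns at once and runs 'callback' on an IO
    // thread after a short random delay.
    private void processAsync(int msg, Runnable callback) {
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        long delayMs = ThreadLocalRandom.current().nextInt(1, 10);
        io.schedule(() -> {
            inFlight.decrementAndGet();
            callback.run();
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    // Dispatches message 'index'; its callback records it and dispatches index + 1.
    private void dispatch(int index) {
        if (index >= messages.size()) {
            return;
        }
        int msg = messages.get(index);
        processAsync(msg, () -> {
            completionOrder.add(msg);
            allDone.countDown();
            dispatch(index + 1);
        });
    }

    // Processes all messages and returns them in completion order.
    public List<Integer> run() {
        dispatch(0);
        try {
            allDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return new ArrayList<>(completionOrder);
    }

    public int maxInFlight() {
        return maxInFlight.get();
    }
}
```

Even though the simulated IO completes on a pool of four threads after random delays, `run()` returns the messages in their input order and `maxInFlight()` stays at 1, because nothing new is dispatched until a callback fires.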
In both cases above, Samza processes messages in order by default. It also supports further parallelism by allowing a task to have multiple outstanding messages in flight at once. The following configuration allows a task to process at most 4 outstanding messages in parallel:
{% highlight jproperties %}
task.max.concurrency=4
{% endhighlight %}
NOTE: For an AsyncStreamTask, processAsync() is still invoked in the order in which messages arrive, but the completions can occur out of order. For a StreamTask running on the thread pool, process() calls can run out of order because they are dispatched to different threads. This option should NOT be used when strict ordering of the output is required.
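A bounded number of outstanding messages can be modeled with a `Semaphore` whose permit count plays the role of `task.max.concurrency`. In this hypothetical sketch (`MaxConcurrencySketch` and its `Result` type are invented for illustration, and the IO is simulated with random delays), messages are invoked in arrival order, at most `maxConcurrency` of them are in flight, and each completion releases a permit, so completions may come back in a different order than the invocations.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not Samza code): at most 'maxConcurrency' messages are
// outstanding at once; invocations follow arrival order, completions may not.
public class MaxConcurrencySketch {

    public static final class Result {
        public final List<Integer> invocationOrder;
        public final List<Integer> completionOrder;
        public final int maxInFlight;

        Result(List<Integer> invocationOrder, List<Integer> completionOrder, int maxInFlight) {
            this.invocationOrder = invocationOrder;
            this.completionOrder = completionOrder;
            this.maxInFlight = maxInFlight;
        }
    }

    public static Result run(List<Integer> messages, int maxConcurrency) {
        Semaphore permits = new Semaphore(maxConcurrency);
        ScheduledExecutorService io = Executors.newScheduledThreadPool(maxConcurrency);
        List<Integer> invoked = new ArrayList<>(); // written only by this dispatching thread
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        CountDownLatch allDone = new CountDownLatch(messages.size());
        try {
            for (int msg : messages) {
                permits.acquire(); // waits while maxConcurrency messages are outstanding
                invoked.add(msg);
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                long delayMs = ThreadLocalRandom.current().nextInt(1, 20);
                // Simulated async IO whose completion acts as the callback.
                io.schedule(() -> {
                    inFlight.decrementAndGet();
                    completed.add(msg);
                    allDone.countDown();
                    permits.release(); // frees a slot for the next message
                }, delayMs, TimeUnit.MILLISECONDS);
            }
            allDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return new Result(invoked, new ArrayList<>(completed), maxInFlight.get());
    }
}
```

With `run(messages, 4)`, `invocationOrder` always matches the input and `maxInFlight` never exceeds 4, while `completionOrder` will often differ from the input from run to run. That difference is why the option is unsuitable when the output must be strictly ordered.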
In all of these scenarios, Samza guarantees the following semantics: