This tutorial provides examples and a guide for using the Samza asynchronous API and multithreading.
If your job involves synchronous or blocking IO, you can simply configure Samza's built-in thread pool to run your tasks in parallel. In the following example, SyncRestTask uses the Jersey client to make REST calls in each process().
{% highlight java %}
public class SyncRestTask implements StreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // Blocking REST call; the calling thread waits for the response
    Response response = target.request().get();
    System.out.println("Response status code " + response.getStatus() + " received.");
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
{% endhighlight %}
By default, Samza runs this task sequentially in a single thread. The following configures a thread pool of size 16 to run the tasks in parallel:
{% highlight jproperties %}
job.container.thread.pool.size=16
{% endhighlight %}
NOTE: The thread pool will be used to run all the synchronous operations of a task, including StreamTask.process(), WindowableTask.window(), and, internally, Task.commit(). This maximizes parallelism across tasks and reduces blocking time. When running tasks on multiple threads, Samza still guarantees in-order processing of the messages within a task by default.
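To build intuition for this dispatch model, here is a rough, pure-JDK sketch (not Samza's actual container code) of how per-message blocking work for many tasks can be fanned out onto a shared fixed-size pool; the class and method names are illustrative only:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolSketch {
    // Dispatch `messages` units of blocking work onto a pool of `poolSize`
    // threads and wait for all of them, roughly how a container-wide pool
    // lets blocking process() calls for different tasks overlap.
    static int processAll(int poolSize, int messages) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            pool.submit(() -> {
                // stand-in for a blocking call such as target.request().get()
                done.countDown();
            });
        }
        done.await();       // block until every message has been processed
        pool.shutdown();
        return messages;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processAll(16, 100) + " messages processed");
    }
}
```

With a pool of 16 threads, up to 16 blocking calls are in flight at once instead of one.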
If your job process is asynchronous, e.g. it makes non-blocking remote IO calls, the AsyncStreamTask interface provides support for it. In the following example, AsyncRestTask makes an asynchronous REST call and triggers the callback once it completes.
{% highlight java %}
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        System.out.println("Response status code " + response.getStatus() + " received.");
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
{% endhighlight %}
In the above example, processing is not complete when processAsync() returns. In the callback thread of the Jersey client, we trigger the TaskCallback to indicate that processing is done. To make sure the callback is triggered within a certain time interval, e.g. 5 seconds, you can configure the following property:
{% highlight jproperties %}
task.callback.timeout.ms=5000
{% endhighlight %}
NOTE: Samza also guarantees in-order processing of the messages within an AsyncStreamTask by default, meaning the next processAsync() of a task won't be called until the previous processAsync() callback has been triggered.
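Many async clients return a CompletableFuture instead of accepting a callback. A common way to drive the task callback from such a future is to register a whenComplete handler. The sketch below demonstrates the pattern against a minimal stand-in interface (the real TaskCallback lives in Samza; its complete() and failure(Throwable) methods have the same shape):

```java
import java.util.concurrent.CompletableFuture;

public class CallbackBridge {
    // Minimal stand-in for Samza's TaskCallback, for illustration only.
    interface Callback {
        void complete();
        void failure(Throwable t);
    }

    // Signal the callback exactly once, from whichever thread completes the future.
    static void bridge(CompletableFuture<?> future, Callback callback) {
        future.whenComplete((result, error) -> {
            if (error != null) {
                callback.failure(error);
            } else {
                callback.complete();
            }
        });
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        // The future is already done, so the handler runs immediately
        bridge(CompletableFuture.completedFuture("ok"), new Callback() {
            public void complete() { log.append("complete"); }
            public void failure(Throwable t) { log.append("failure"); }
        });
        System.out.println(log);
    }
}
```

Inside processAsync() you would pass the real TaskCallback instead of the stand-in, keeping the error branch so failures are reported rather than silently timing out.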
If your processing logic is asynchronous, e.g. it makes non-blocking remote calls, you can implement it using AsyncFlatMapFunction. The following example illustrates an application that processes Wikipedia feed updates, invokes a remote service to standardize the updates, and sends the standardized events to an output stream.
{% highlight java %}
public class WikipediaAsyncStandardizer implements StreamApplication {
  @Override
  public void describe(StreamApplicationDescriptor appDescriptor) {
    // Define a SystemDescriptor for Wikipedia data
    WikipediaSystemDescriptor wikipediaSystemDescriptor =
        new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);

    // Define an InputDescriptor for consuming Wikipedia data
    WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
        .getInputDescriptor("en-wikipedia")
        .withChannel("#en.wikipedia");

    // Define an OutputDescriptor for producing standardized Wikipedia data
    WikipediaOutputDescriptor wikipediaOutputDescriptor = wikipediaSystemDescriptor
        .getOutputDescriptor("en-wikipedia-standardized")
        .withChannel("#en.wikipedia.standardized");

    appDescriptor.getInputStream(wikipediaInputDescriptor)
        .filter(WikipediaFeedEvent::isUpdate)
        .flatMapAsync(new AsyncStandardizerFunction())
        .sendTo(appDescriptor.getOutputStream(wikipediaOutputDescriptor));
  }

  static class AsyncStandardizerFunction
      implements AsyncFlatMapFunction<WikipediaFeedEvent, StandardizedWikipediaFeedEvent> {
    private transient Client client;

    @Override
    public void init(Context context) {
      client = ClientBuilder.newClient(context.getJobContext().getConfig().get("standardizer.uri"));
    }

    @Override
    public CompletionStage<Collection<StandardizedWikipediaFeedEvent>> apply(WikipediaFeedEvent wikipediaFeedEvent) {
      Request<StandardizerRequest> standardizerRequest = buildStandardizedRequest(wikipediaFeedEvent);
      CompletableFuture<StandardizerResponse> standardizerResponse = client.sendRequest(standardizerRequest);

      return standardizerResponse
          .thenApply(response -> extractStandardizedWikipediaFeedEvent(response));
    }

    @Override
    public void close() {
      client.close();
    }
  }
}
{% endhighlight %}
In the above example, the results from the AsyncStandardizerFunction are propagated to the downstream operator once the future completes. There is an overall timeout for each message to be processed, which you can tune using:
{% highlight jproperties %}
task.callback.timeout.ms
{% endhighlight %}
If the IO library accepts callbacks instead of returning a future, the callback can be adapted to a future in the following way:
{% highlight java %}
@Override
public CompletionStage<Collection<StandardizedWikipediaFeedEvent>> apply(WikipediaFeedEvent wikipediaFeedEvent) {
  Request<StandardizerRequest> standardizationRequest = buildStandardizedRequest(wikipediaFeedEvent);
  CompletableFuture<Collection<StandardizedWikipediaFeedEvent>> standardizedFuture = new CompletableFuture<>();

  client.async().get(standardizationRequest, new InvocationCallback<StandardizerResponse>() {
    @Override
    public void completed(StandardizerResponse response) {
      standardizedFuture.complete(extractStandardizedWikipediaFeedEvent(response));
    }

    @Override
    public void failed(Throwable throwable) {
      standardizedFuture.completeExceptionally(throwable);
    }
  });

  return standardizedFuture;
}
{% endhighlight %}
In all cases above, Samza supports in-order processing by default. You can enable further parallelism by allowing a task to process multiple outstanding messages at the same time. The following config allows a task to process at most 4 outstanding messages in parallel:
{% highlight jproperties %}
task.max.concurrency=4
{% endhighlight %}
NOTE: In the case of AsyncStreamTask, processAsync() is still invoked in the order of message arrival, but completions can happen out of order. In the case of StreamTask and high-level API applications with task.max.concurrency > 1, delivery can be out of order. This option should NOT be used when strict ordering of the output is required.
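To see why completions can reorder once multiple messages are in flight, consider this small, pure-JDK demonstration (not Samza code; the names are illustrative): two "messages" are started in arrival order on a pool, but the first is deliberately made to finish after the second.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OutOfOrderDemo {
    // Start msg-1 then msg-2 concurrently; msg-1 waits until msg-2 has
    // completed, so the completion order is the reverse of the start order.
    static List<String> run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> completions = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch msg2Done = new CountDownLatch(1);

        Future<?> m1 = pool.submit(() -> {
            try { msg2Done.await(); } catch (InterruptedException ignored) { }
            completions.add("msg-1");   // completes second, despite starting first
        });
        Future<?> m2 = pool.submit(() -> {
            completions.add("msg-2");   // completes first
            msg2Done.countDown();
        });

        m1.get();
        m2.get();
        pool.shutdown();
        return completions;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

With task.max.concurrency=1 this situation cannot arise, because the second message is not dispatched until the first has completed.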
In any of the scenarios, Samza guarantees the following semantics: