```
commit    111cbd44ee509f48a59c611a62e64c81f5a83220
author    Alex Glikson <GLIKSON@il.ibm.com>  Tue Aug 09 00:00:31 2016 +0300
committer glikson <glikson@il.ibm.com>       Sun Sep 11 11:36:46 2016 -0400
tree      00dd12125e36392244e1353dd8f8d7df90e01a57
parent    ea2c9ead88471232c23bf108164d36b9208e5173
```
Update README.md
Matos demonstrates a Bluemix-based serverless implementation of a simple pipeline (hosted on OpenWhisk) that reads messages from a Message Hub topic and archives them in batches into an Object Storage folder.
The serverless architecture introduces multiple advantages. First, by leveraging OpenWhisk, and given the persistent nature of Message Hub, it is possible to perform the archiving in batches and pay only for the short periods of execution time (typically seconds) of each batch. Moreover, the architecture can seamlessly accommodate spikes in load, due to the inherent elasticity of OpenWhisk. The combination of the two can dramatically reduce the overall cost and increase the elasticity of the solution.
Matos is implemented in Java.
Disclaimer: notice that this implementation is for education/demonstration purposes ONLY. It does not address many important requirements, such as high availability, consistency, and delivery guarantees. Moreover, there are several known cases in which the program does not function properly (e.g., illegal parameters, invocation 'corner cases', etc.).
Matos was inspired by Secor.
The heart of Matos is an OpenWhisk action called `batch`, which copies a batch of messages from a Message Hub topic into an Object Storage folder. The action can be invoked with an explicit range of Kafka message offsets to archive; otherwise, it archives all the messages pending since the last invocation (enabling, for example, periodic time-based invocation). Each batch is saved in a separate file within the specified folder, using a naming convention that includes the current timestamp and the range of offsets. In addition, there are two helper actions: `load`, to produce a given number of test messages into Message Hub, and `monitor`, to retrieve the current offsets (latest and last committed) for a given topic and consumer ID.
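The offset-range copy at the core of `batch` can be sketched with the standard Kafka consumer API. This is an illustrative sketch only, not the actual Matos source: `copyRange` and the `archive` step are hypothetical names, and `kafkaProps` is assumed to carry the broker address, credentials, group ID, and deserializer settings.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch of the 'batch' copy loop (not the actual Matos code).
public class BatchSketch {
    static void copyRange(Properties kafkaProps, String topic, int partition,
                          long startOffset, long endOffset) {
        TopicPartition tp = new TopicPartition(topic, partition);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            // Manual assignment (no group rebalancing) and explicit seek
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, startOffset);
            StringBuilder batch = new StringBuilder();
            long next = startOffset;
            while (next < endOffset) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> r : records) {
                    if (r.offset() >= endOffset) break;   // ignore records past the range
                    batch.append(r.value()).append('\n');
                    next = r.offset() + 1;
                }
            }
            // Here the batch would be uploaded to Object Storage as a single file,
            // named with the current timestamp and the [startOffset..endOffset] range.
            consumer.commitSync();  // record progress for the next invocation
        }
    }
}
```

Note that manual `assign`/`seek` (rather than `subscribe`) fits the batch-with-explicit-offsets model, since no consumer-group rebalancing is wanted.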
The actions receive arguments either via a configuration file (in JSON format, packaged with the action's jar file) or via the regular action parameter mechanisms (explicit or package-bound). The parameters are:

- `kafkaBroker` - the address of the Message Hub broker to connect to, as it appears in VCAP_SERVICES (e.g., `kafka01-prod01.messagehub.services.us-south.bluemix.net:9093`)
- `kafkaApiKey` (*) - the API key of the Message Hub instance that you want Matos to work with, as it appears in VCAP_SERVICES (typically 48 characters)
- `kafkaTopic` - the name of the topic in the above instance (e.g., `matos-topic`)
- `kafkaPartition` - the partition number within the topic (typically 0, unless the topic has multiple partitions)
- `kafkaConsumerId` - a string identifying this Matos instance (e.g., `matos-id`)
- `kafkaStartOffset` - only for the `batch` action ("-1" means that the last committed offset will be used)
- `kafkaEndOffset` - only for the `batch` action ("-1" means that the latest available offset will be used)
- `kafkaNumRecords` - only for `load` (e.g., "1000")
- `swiftAuthUrl` - the auth URL of the Object Storage service, as it appears in VCAP_SERVICES, with the proper suffix (e.g., `https://identity.open.softlayer.com/v3/auth/tokens`)
- `swiftRegion` - the region of the Object Storage service, as it appears in VCAP_SERVICES (e.g., `dallas`)
- `swiftTenantId` (*) - the tenant ID of the Object Storage service instance you want to work with, as it appears in VCAP_SERVICES (typically 32 characters)
- `swiftUserId` (*) - the user ID of the Object Storage service instance, as it appears in VCAP_SERVICES (typically 32 characters)
- `swiftPassword` (*) - the password for the Object Storage service instance, as it appears in VCAP_SERVICES (typically 16 characters; notice that it may contain special characters, so make sure to escape them with `\`)
- `swiftContainer` - the name of the folder within the Object Storage service instance (e.g., `matos-folder`)

See `resources/matos.json` for an example. Notice that, for security reasons, parameters marked with (*) would typically not be specified in the configuration file, but rather passed as runtime parameters of the OpenWhisk package binding (as demonstrated below).
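For illustration, a configuration file along these lines might look as follows (the values are placeholders based on the examples above; see the actual `resources/matos.json` for the authoritative format, and note that the (*) parameters are intentionally omitted):

```json
{
    "kafkaBroker": "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafkaTopic": "matos-topic",
    "kafkaPartition": "0",
    "kafkaConsumerId": "matos-id",
    "swiftAuthUrl": "https://identity.open.softlayer.com/v3/auth/tokens",
    "swiftRegion": "dallas",
    "swiftContainer": "matos-folder"
}
```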
For convenience, the actions can also be invoked locally as regular Java programs (in which case the config file is passed as the first argument, followed by the relevant parameters marked above with (*); just run them without parameters to see the exact usage). When invoked locally, the `monitor` program runs continuously, retrieving and displaying offsets every 5 seconds (you can interrupt it with Ctrl-C).
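For example, a local run of `monitor` might look roughly like this (assuming the jar names produced by `rejar.sh`, with `<API_KEY>` standing in for your actual Message Hub API key; run the program without parameters for the exact argument order):

```
:~/matos$ java -jar run/matos-monitor.jar resources/matos.json <API_KEY>
```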
- Message Hub: you will need to create a topic (e.g., `matos-topic`, typically with a single partition), to decide on a consumer ID (an arbitrary string, e.g., `matos-id`), and to locate (in VCAP_SERVICES) the API key and the address of the broker.
- Object Storage: you will need to create a container (e.g., `matos-folder`) and to locate (in VCAP_SERVICES) the auth_url, region, projectId, userId, and password. The IDs are 32 characters long, and the password is 16 characters. Notice that the password may include special characters, which you may need to escape with `\` when specifying it in the config file and/or on the command line.

```
:~$ git clone <URL_OF_THIS_REPOSITORY.git>
:~$ cd matos
:~/matos$ mkdir run; mkdir tmp
:~/matos$ vi resources/matos.json
```
Make sure your `matos.json` contains the proper values for `kafkaBroker`, `kafkaTopic`, `kafkaPartition`, `swiftAuthUrl`, `swiftRegion`, `swiftContainer`.
```
:~/matos$ ./rejar.sh
```
```
:~/matos$ wsk package create matos
:~/matos$ wsk action create matos/load run/matos-load.jar
:~/matos$ wsk action create matos/monitor run/matos-monitor.jar
:~/matos$ wsk action create matos/batch run/matos-batch.jar
:~/matos$ wsk package bind matos mymatos --param kafkaApiKey <API_KEY> --param swiftTenantId <TENANT_ID> --param swiftUserId <USER_ID> --param swiftPassword <PASSWORD>
```
```
:~/matos$ ./rejar.sh
:~/matos$ ./rewhisk.sh
```
In order to work with the project in Eclipse, run `gradle eclipse` to download dependencies and to configure the build path properly.
Notice that if you want to change some of the OpenWhisk parameters associated with `mymatos`, you currently need to specify all of them in `wsk package update mymatos ...`.
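For example, to change just the password, you would still repeat all four (*) parameters (the angle-bracketed values are placeholders):

```
:~/matos$ wsk package update mymatos --param kafkaApiKey <API_KEY> --param swiftTenantId <TENANT_ID> --param swiftUserId <USER_ID> --param swiftPassword <NEW_PASSWORD>
```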
Run the following command in a separate window to continuously observe OpenWhisk logs:
```
:~/matos$ wsk activation poll
```
Load some messages into Message Hub (make sure you invoke the actions via the `mymatos` package binding, where you defined all the credentials):
```
:~/matos$ wsk action invoke mymatos/load --blocking --result
{ "last": "1000" }
:~/matos$ wsk action invoke mymatos/load --blocking --result
{ "last": "2000" }
```
Archive some messages to Object Storage (explicitly specifying offsets):
```
:~/matos$ wsk action invoke mymatos/batch --blocking --result --param kafkaStartOffset 0 --param kafkaEndOffset 1000
{ "last": "[0..1000]" }
```
Load some more messages:
```
:~/matos$ wsk action invoke mymatos/load --blocking --result --param kafkaNumRecords 2000
{ "last": "4000" }
```
Check offsets:
```
:~/matos$ wsk action invoke mymatos/monitor --blocking --result
{ "committed": "1000", "last": "4000" }
```
Archive all the pending messages to Object Storage:
```
:~/matos$ wsk action invoke mymatos/batch --blocking --result
{ "last": "[1000..4000]" }
```
Load more messages:
```
:~/matos$ wsk action invoke mymatos/load --blocking --result --param kafkaNumRecords 3000
{ "last": "7000" }
:~/matos$ wsk action invoke mymatos/load --blocking --result --param kafkaNumRecords 3000
{ "last": "10000" }
```
Archive all the pending messages to Object Storage:
```
:~/matos$ wsk action invoke mymatos/batch --blocking --result
{ "last": "[4000..10000]" }
```
Notice that if your Kafka topic has multiple partitions, make sure to specify the right partition when invoking the `batch` or `monitor` actions (0 in the default configuration file).
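For example, to check the offsets of partition 1 instead of the default (a hypothetical invocation, using the `kafkaPartition` parameter listed above):

```
:~/matos$ wsk action invoke mymatos/monitor --blocking --result --param kafkaPartition 1
```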
At any point, you can access the Object Storage service instance to observe the files created for each batch.
Refer to `DESIGN.md` for more details on the design.