blob: 04805669e92d9d06bbdb67af17d4ca44abfac553 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This Python file uses the following encoding: utf-8
import storm
from random import choice
from time import sleep
from uuid import uuid4
words = [u"nathan", u"mike", u"jackson", u"golda", u"bertels人"]
class TesterSpout(storm.Spout):
def initialize(self, conf, context):
storm.log('spout initializing')
self.pending = {}
self.count = 0
def nextTuple(self):
if self.count < 2:
word = choice(words)
id = str(uuid4())
self.pending[id] = word
storm.rpcMetrics("my-custom-shellspout-metric", 1)
self.count = self.count + 1
storm.log("TesterSpout update my-custom-shellspout-metric "+str(self.count))
storm.emit([word], id=id)
def ack(self, id):
del self.pending[id]
def fail(self, id):
storm.log("emitting " + self.pending[id] + " on fail")
storm.emit([self.pending[id]], id=id)
TesterSpout().run()