blob: 58a0cb8a5e225be64f85443206ee45a043225ea5 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.trident.state-test
(:use [clojure test])
(:require [org.apache.storm [testing :as t]])
(:import [org.apache.storm.trident.operation.builtin Count])
(:import [org.apache.storm.trident.state OpaqueValue])
(:import [org.apache.storm.trident.state CombinerValueUpdater])
(:import [org.apache.storm.trident.topology.state TransactionalState TestTransactionalState])
(:import [org.apache.storm.trident.state.map TransactionalMap OpaqueMap])
(:import [org.apache.storm.trident.testing MemoryBackingMap MemoryMapState])
(:import [org.apache.storm.utils ZookeeperAuthInfo])
(:import [org.apache.curator.framework CuratorFramework])
(:import [org.apache.curator.framework.api CreateBuilder ProtectACLCreateModePathAndBytesable])
(:import [org.apache.zookeeper CreateMode ZooDefs ZooDefs$Ids])
(:import [org.mockito Matchers Mockito])
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:use [org.apache.storm.trident testing])
(:use [org.apache.storm config util]))
(defn single-remove
  "Calls `multiRemove` on `map` with a batch containing only `key`
  (wrapped as a one-element key list)."
  [map key]
  (.multiRemove map [[key]]))
(defn single-put
  "Calls `multiPut` on `map` with a batch containing only `key`
  (wrapped as a one-element key list) paired with `val`."
  [map key val]
  (.multiPut map [[key]] [val]))
(defn single-get
  "Calls `multiGet` on `map` with a batch containing only `key`
  (wrapped as a one-element key list) and returns the first result."
  [map key]
  (first (.multiGet map [[key]])))
(defn single-update
  "Calls `multiUpdate` on `map` for the single `key`, using a
  `CombinerValueUpdater` built from a `Count` combiner and `amt`,
  and returns the first element of the result."
  [map key amt]
  (let [updater (CombinerValueUpdater. (Count.) amt)]
    (first (.multiUpdate map [[key]] [updater]))))
(deftest test-opaque-value
  ;; Builds an OpaqueValue with txid 8, current value "v1" and previous value
  ;; "v0", then checks reads at several txids and the results of .update.
  (let [opqval (OpaqueValue. 8 "v1" "v0")
        upval0 (.update opqval 8 "v2")
        upval1 (.update opqval 9 "v2")]
    ;; reads with no txid or a later txid see the current value
    (is (= "v1" (.get opqval nil)))
    (is (= "v1" (.get opqval 100)))
    (is (= "v1" (.get opqval 9)))
    ;; a read at the stored txid sees the previous value
    (is (= "v0" (.get opqval 8)))
    ;; a read at an earlier txid is expected to throw
    (is (thrown? Exception (.get opqval 7)))
    (is (= "v0" (.getPrev opqval)))
    (is (= "v1" (.getCurr opqval)))
    ;; update with current
    (is (= "v0" (.getPrev upval0)))
    (is (= "v2" (.getCurr upval0)))
    ;; wrapped in `is` so the identity check is actually asserted
    (is (not (identical? opqval upval0)))
    ;; update
    (is (= "v1" (.getPrev upval1)))
    (is (= "v2" (.getCurr upval1)))
    (is (not (identical? opqval upval1)))))
(deftest test-opaque-map
  ;; Drives an OpaqueMap over an in-memory backing map through three
  ;; beginCommit/commit cycles (txids 1, 1 again, then 2).
  (let [state (OpaqueMap/build (MemoryBackingMap.))]
    ;; first attempt at txid 1
    (.beginCommit state 1)
    (is (nil? (single-get state "a")))
    ;; intra-batch caching: updates within the batch are visible to reads
    (is (= 1 (single-update state "a" 1)))
    (is (= 1 (single-get state "a")))
    (is (= 3 (single-update state "a" 2)))
    (is (= 3 (single-get state "a")))
    (.commit state 1)

    ;; txid 1 again: the test expects the key to read as absent
    (.beginCommit state 1)
    (is (nil? (single-get state "a")))
    (is (= 2 (single-update state "a" 2)))
    (.commit state 1)

    ;; txid 2 continues from the value committed by the second txid-1 batch
    (.beginCommit state 2)
    (is (= 2 (single-get state "a")))
    (is (= 5 (single-update state "a" 3)))
    (is (= 6 (single-update state "a" 1)))
    (.commit state 2)))
(deftest test-transactional-map
  ;; Drives a TransactionalMap over an in-memory backing map through three
  ;; beginCommit/commit cycles (txids 1, 1 again, then 2).
  (let [state (TransactionalMap/build (MemoryBackingMap.))]
    ;; first attempt at txid 1
    (.beginCommit state 1)
    (is (nil? (single-get state "a")))
    ;; intra-batch caching: the second update builds on the first
    (is (= 1 (single-update state "a" 1)))
    (is (= 3 (single-update state "a" 2)))
    (.commit state 1)

    ;; txid 1 again
    (.beginCommit state 1)
    (is (= 3 (single-get state "a")))
    ;; intra-batch caching has no effect when the txid equals the previous
    ;; commit's txid: updates leave the value at 3
    (is (= 3 (single-update state "a" 1)))
    (is (= 3 (single-update state "a" 2)))
    (.commit state 1)

    ;; txid 2 applies updates on top of the committed value
    (.beginCommit state 2)
    (is (= 3 (single-get state "a")))
    (is (= 6 (single-update state "a" 3)))
    (is (= 7 (single-update state "a" 1)))
    (.commit state 2)))
(deftest test-create-node-acl
  (testing "Creates ZooKeeper nodes with the correct ACLs"
    (let [curator-mock   (Mockito/mock CuratorFramework)
          create-builder (Mockito/mock CreateBuilder)
          acl-builder    (Mockito/mock ProtectACLCreateModePathAndBytesable)
          wanted-acls    ZooDefs$Ids/CREATOR_ALL_ACL]
      ;; stub the builder chain the mocked curator hands out
      (-> (Mockito/when (.create curator-mock))
          (.thenReturn create-builder))
      (-> (Mockito/when (.creatingParentsIfNeeded create-builder))
          (.thenReturn acl-builder))
      (-> (Mockito/when (.withMode acl-builder (Matchers/isA CreateMode)))
          (.thenReturn acl-builder))
      (-> (Mockito/when (.withACL acl-builder (Mockito/anyList)))
          (.thenReturn acl-builder))
      (TestTransactionalState/createNode curator-mock "" (byte-array 0) wanted-acls nil)
      ;; a failed verification is caught and returned, so the nil? check fails
      (let [verify-outcome (try
                             (.withACL (Mockito/verify acl-builder) wanted-acls)
                             (catch MockitoAssertionError err
                               err))]
        (is (nil? verify-outcome))))))
(deftest test-memory-map-state-remove
  ;; Checks key removal on a MemoryMapState across commits, including a
  ;; second batch that reuses txid 2.
  (let [state (MemoryMapState. (uuid))]
    ;; txid 1: seed two keys
    (.beginCommit state 1)
    (single-put state "a" 1)
    (single-put state "b" 2)
    (.commit state 1)

    ;; txid 2: removing "a" is visible within the batch; "b" is untouched
    (.beginCommit state 2)
    (single-remove state "a")
    (is (nil? (single-get state "a")))
    (is (= 2 (single-get state "b")))
    (.commit state 2)

    ;; txid 2 again: the test expects "a" to read as 1 before it is removed again
    (.beginCommit state 2)
    (is (= 1 (single-get state "a")))
    (is (= 2 (single-get state "b")))
    (single-remove state "a")
    (.commit state 2)

    ;; txid 3: the removal is now observed
    (.beginCommit state 3)
    (is (nil? (single-get state "a")))
    (is (= 2 (single-get state "b")))
    (.commit state 3)))