blob: 3e6180974c60f9ecd6e6eebb4de8eae7b2e9751c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.stream.twitter;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.junit.WireMockClassRule;
import com.twitter.hbc.core.HttpHosts;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import twitter4j.Status;
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
/**
* Test for {@link TwitterStreamer}. Tests Public Status streaming API https://dev.twitter.com/streaming/public.
*/
public class IgniteTwitterStreamerTest extends GridCommonAbstractTest {
/** Cache entries count. */
private static final int CACHE_ENTRY_COUNT = 100;
/** Mocked api in embedded server. */
private static final String MOCK_TWEET_PATH = "/tweet/mock";
/** Sample tweet. */
private static final String tweet = "{\"id\":647375831971590144,\"text\":\"sample tweet to test streamer\"}\n";
/** Constructor. */
public IgniteTwitterStreamerTest() {
super(true);
}
/**
* See <a href="http://wiremock.org/docs/junit-rule/">The JUnit 4.x Rule</a>.
*/
@ClassRule
public static WireMockClassRule wireMockClsRule = new WireMockClassRule(WireMockConfiguration.DYNAMIC_PORT);
/** Embedded mock HTTP server's for Twitter API rule. */
@Rule
public WireMockClassRule wireMockRule = wireMockClsRule;
/** Embedded mock HTTP server for Twitter API. */
public final WireMockServer mockSrv = new WireMockServer(); //Starts server on 8080 port.
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 10_000;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName).setIncludeEventTypes(EventType.EVTS_ALL);
}
/** */
private void init() {
grid().getOrCreateCache(defaultCacheConfiguration());
mockSrv.start();
stubFor(get(urlMatching("/1.1" + MOCK_TWEET_PATH + ".*")).willReturn(aResponse().
withHeader("Content-Type", "text/plain").withBody(tweet.length() + "\n" + tweet)));
}
/** */
private void cleanup() {
stopAllGrids();
mockSrv.stop();
}
/**
* @throws Exception Test exception.
*/
@Test
public void testStatusesFilterEndpointOAuth1() throws Exception {
init();
try (IgniteDataStreamer<Long, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
TwitterStreamerImpl streamer = newStreamerInstance(dataStreamer);
Map<String, String> params = new HashMap<>();
params.put("track", "apache, twitter");
params.put("follow", "3004445758");// @ApacheIgnite id.
streamer.setApiParams(params);
streamer.setEndpointUrl(MOCK_TWEET_PATH);
streamer.setHosts(new HttpHosts("http://localhost:8080"));
streamer.setThreadsCount(8);
executeStreamer(streamer);
} finally {
cleanup();
}
}
/**
* @param streamer Twitter streamer.
* @throws InterruptedException Test exception.
* @throws TwitterException Test exception.
*/
private void executeStreamer(TwitterStreamer streamer) throws InterruptedException, TwitterException {
// Checking streaming.
CacheListener lsnr = subscribeToPutEvents();
streamer.start();
try {
streamer.start();
fail("Successful start of already started Twitter Streamer");
}
catch (IgniteException ignored) {
// No-op.
}
CountDownLatch latch = lsnr.getLatch();
// Enough tweets was handled in 10 seconds. Limited by test's timeout.
latch.await();
unsubscribeToPutEvents(lsnr);
streamer.stop();
try {
streamer.stop();
fail("Successful stop of already stopped Twitter Streamer");
}
catch (IgniteException ignored) {
// No-op.
}
// Checking cache content after streaming finished.
Status status = TwitterObjectFactory.createStatus(tweet);
IgniteCache<Long, String> cache = grid().cache(DEFAULT_CACHE_NAME);
String cachedVal = cache.get(status.getId());
// Tweet successfully put to cache.
assertTrue(cachedVal != null && cachedVal.equals(status.getText()));
// Same tweets does not produce duplicate entries.
assertEquals(1, cache.size());
}
/**
* @return Cache listener.
*/
private CacheListener subscribeToPutEvents() {
Ignite ignite = grid();
// Listen to cache PUT events and expect as many as messages as test data items.
CacheListener lsnr = new CacheListener();
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).localListen(lsnr, EVT_CACHE_OBJECT_PUT);
return lsnr;
}
/**
* @param lsnr Cache listener.
*/
private void unsubscribeToPutEvents(CacheListener lsnr) {
Ignite ignite = grid();
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).stopLocalListen(lsnr, EVT_CACHE_OBJECT_PUT);
}
/**
* @param dataStreamer Ignite Data Streamer.
* @return Twitter Streamer.
*/
private TwitterStreamerImpl newStreamerInstance(IgniteDataStreamer<Long, String> dataStreamer) {
OAuthSettings oAuthSettings = new OAuthSettings("<dummy>", "<dummy>", "<dummy>", "<dummy>");
TwitterStreamerImpl streamer = new TwitterStreamerImpl(oAuthSettings);
streamer.setIgnite(grid());
streamer.setStreamer(dataStreamer);
dataStreamer.allowOverwrite(true);
dataStreamer.autoFlushFrequency(10);
return streamer;
}
/**
* Listener.
*/
private static class CacheListener implements IgnitePredicate<CacheEvent> {
/** */
private final CountDownLatch latch = new CountDownLatch(CACHE_ENTRY_COUNT);
/**
* @return Latch.
*/
public CountDownLatch getLatch() {
return latch;
}
/**
* @param evt Cache Event.
* @return {@code true}.
*/
@Override public boolean apply(CacheEvent evt) {
latch.countDown();
return true;
}
}
}