blob: 3cf33cf25c6c66fc43aea4a700a0aeccacd54ebf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.store;
import static org.apache.gora.examples.WebPageDataCreator.ANCHORS;
import static org.apache.gora.examples.WebPageDataCreator.CONTENTS;
import static org.apache.gora.examples.WebPageDataCreator.LINKS;
import static org.apache.gora.examples.WebPageDataCreator.SORTED_URLS;
import static org.apache.gora.examples.WebPageDataCreator.URLS;
import static org.apache.gora.examples.WebPageDataCreator.URL_INDEXES;
import static org.apache.gora.examples.WebPageDataCreator.createWebPageData;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import org.apache.avro.Schema.Field;
import org.apache.avro.util.Utf8;
import org.apache.gora.examples.WebPageDataCreator;
import org.apache.gora.examples.generated.Employee;
import org.apache.gora.examples.generated.Metadata;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.BeanFactoryImpl;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.util.AvroUtils;
import org.apache.gora.util.ByteUtils;
import org.apache.gora.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test utilities for DataStores. This utility class provides everything
* necessary for convenience tests in {@link DataStoreTestBase} to execute cleanly.
* The tests begin in a fairly trivial fashion getting progressively
* more complex as we begin testing some more advanced features within the
* Gora API. In addition to this class, the first place to look for API
* functionality is the examples directories under various Gora modules.
* All the modules have a <gora-module>/src/examples/ directory under
* which some example classes can be found. Especially, there are some
* classes that are used for tests under <gora-core>/src/examples/
*/
public class DataStoreTestUtil {
/** Logger shared by the static test helpers in this class. */
private static final Logger LOG = LoggerFactory.getLogger(DataStoreTestUtil.class);
/** Number of milliseconds in a 365-day year; used to derive dates of birth relative to the current time. */
public static final long YEAR_IN_MS = 365L * 24L * 60L * 60L * 1000L;
/** Offset into the sorted URL array used to pick key-range boundaries in the delete-by-query tests. */
private static final int NUM_KEYS = 4;
/**
 * Verifies that {@link DataStore#newPersistent()} returns distinct, non-null
 * objects whose class matches the store's persistent class.
 *
 * @param dataStore the store under test
 * @throws Exception if the store fails while creating a persistent object
 */
public static <K, T extends Persistent> void testNewPersistent(
    DataStore<K,T> dataStore) throws Exception {
  T obj1 = dataStore.newPersistent();
  T obj2 = dataStore.newPersistent();
  // Null checks come first so that a null instance is reported as an
  // assertion failure rather than a NullPointerException from getClass().
  assertNotNull(obj1);
  assertNotNull(obj2);
  assertEquals(dataStore.getPersistentClass(),
      obj1.getClass());
  assertFalse( obj1 == obj2 );
}
/**
 * Builds an in-memory {@link Employee} named "Random Joe" whose date of birth
 * is twenty {@link #YEAR_IN_MS} periods before the current time.
 *
 * @return a populated employee that has not been written to any store
 * @throws Exception declared for caller compatibility
 */
public static <K> Employee createEmployee() throws Exception {
  final Employee joe = Employee.newBuilder().build();
  final long twentyYearsAgo = System.currentTimeMillis() - 20L * YEAR_IN_MS;
  joe.setName(new Utf8("Random Joe"));
  joe.setDateOfBirth(twentyYearsAgo);
  joe.setSalary(100000);
  joe.setSsn(new Utf8("101010101010"));
  return joe;
}
/**
 * Builds a minimal {@link WebPage} with a URL, a small byte content, an empty
 * parsed-content list and a freshly built {@link Metadata}.
 *
 * @return a populated web page that has not been written to any store
 */
private static <K> WebPage createWebPage() {
  final WebPage page = WebPage.newBuilder().build();
  page.setUrl(new Utf8("url.."));
  final byte[] rawContent = "test content".getBytes(Charset.defaultCharset());
  page.setContent(ByteBuffer.wrap(rawContent));
  page.setParsedContent(new ArrayList<CharSequence>());
  final Metadata emptyMetadata = Metadata.newBuilder().build();
  page.setMetadata(emptyMetadata);
  return page;
}
/**
 * Builds an in-memory {@link Employee} named "Random boss" whose date of birth
 * is twenty-two {@link #YEAR_IN_MS} periods before the current time.
 *
 * @return a populated employee that has not been written to any store
 * @throws Exception declared for caller compatibility
 */
public static <K> Employee createBoss()
    throws Exception {
  final Employee boss = Employee.newBuilder().build();
  final long twentyTwoYearsAgo = System.currentTimeMillis() - 22L * YEAR_IN_MS;
  boss.setName(new Utf8("Random boss"));
  boss.setDateOfBirth(twentyTwoYearsAgo);
  boss.setSalary(1000000);
  boss.setSsn(new Utf8("202020202020"));
  return boss;
}
/**
 * Puts an employee without calling {@code createSchema()} first; the put is
 * expected to complete without throwing.
 *
 * @param dataStore the store under test
 * @throws Exception if the put fails
 */
public static void testAutoCreateSchema(DataStore<String,Employee> dataStore)
    throws Exception {
  final Employee employee = createEmployee();
  // Must not throw even though no schema was created explicitly.
  dataStore.put("foo", employee);
}
/**
 * Calls {@code createSchema()} twice in a row; the second call, made when the
 * schema has already been created, must not throw.
 *
 * @param dataStore the store under test
 * @throws Exception if either schema creation fails
 */
public static void testCreateEmployeeSchema(DataStore<String, Employee> dataStore)
    throws Exception {
  for (int attempt = 0; attempt < 2; attempt++) {
    // The repeated call should not throw an exception.
    dataStore.createSchema();
  }
}
/**
 * Loads the sample web page data, truncates the schema, and checks that an
 * unrestricted query no longer returns anything.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testTruncateSchema(DataStore<String, WebPage> dataStore)
    throws Exception {
  dataStore.createSchema();
  createWebPageData(dataStore);
  dataStore.truncateSchema();
  final Query<String, WebPage> everything = dataStore.newQuery();
  assertEmptyResults(everything);
}
/**
 * Loads the sample web page data, deletes and re-creates the schema, and
 * checks that an unrestricted query returns nothing afterwards.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testDeleteSchema(DataStore<String, WebPage> dataStore)
    throws Exception {
  dataStore.createSchema();
  createWebPageData(dataStore);
  // Drop the schema, then bring back an empty one to query against.
  dataStore.deleteSchema();
  dataStore.createSchema();
  final Query<String, WebPage> everything = dataStore.newQuery();
  assertEmptyResults(everything);
}
/**
 * Checks that {@code schemaExists()} reports {@code true} after the schema is
 * created and {@code false} after it is deleted.
 *
 * @param dataStore the store under test
 * @throws Exception if any schema operation fails
 */
public static<K, T extends Persistent> void testSchemaExists(
    DataStore<K, T> dataStore) throws Exception {
  dataStore.createSchema();
  final boolean presentAfterCreate = dataStore.schemaExists();
  assertTrue(presentAfterCreate);

  dataStore.deleteSchema();
  final boolean presentAfterDelete = dataStore.schemaExists();
  assertFalse(presentAfterDelete);
}
/**
 * Writes a single employee keyed by its SSN, reads it back requesting every
 * schema field, and compares the two objects.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetEmployee(DataStore<String, Employee> dataStore)
    throws Exception {
  dataStore.createSchema();
  final Employee written = createEmployee();
  final String key = written.getSsn().toString();
  dataStore.put(key, written);
  dataStore.flush();

  final String[] allFields = AvroUtils.getSchemaFieldNames(Employee.SCHEMA$);
  final Employee readBack = dataStore.get(key, allFields);
  assertEqualEmployeeObjects(written, readBack);
}
/**
 * Writes an employee whose boss field holds another {@link Employee}, reads
 * it back with every schema field, and compares the two objects.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetEmployeeRecursive(DataStore<String, Employee> dataStore)
    throws Exception {
  final Employee written = createEmployee();
  written.setBoss(createBoss());
  final String key = written.getSsn().toString();
  dataStore.put(key, written);
  dataStore.flush();

  final String[] allFields = AvroUtils.getSchemaFieldNames(Employee.SCHEMA$);
  final Employee readBack = dataStore.get(key, allFields);
  assertEqualEmployeeObjects(written, readBack);
}
/**
 * Writes an employee with a two-level boss chain (employee, boss, boss of the
 * boss), reads it back with every schema field, and compares the two objects.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetEmployeeDoubleRecursive(DataStore<String, Employee> dataStore)
    throws Exception {
  Employee employee = DataStoreTestUtil.createEmployee();
  Employee boss = DataStoreTestUtil.createBoss();
  Employee uberBoss = DataStoreTestUtil.createBoss();
  // The name deliberately starts with a non-ASCII character (U+00DC, capital
  // U with diaeresis). It is written as a unicode escape so the value does
  // not depend on the encoding used to read this source file.
  uberBoss.setName(new Utf8("\u00DCberboss"));
  boss.setBoss(uberBoss);
  employee.setBoss(boss);
  String ssn = employee.getSsn().toString();
  dataStore.put(ssn, employee);
  dataStore.flush();
  Employee after = dataStore.get(ssn, AvroUtils.getSchemaFieldNames(Employee.SCHEMA$));
  assertEqualEmployeeObjects(employee, after);
}
/**
 * Writes an employee that embeds a {@link WebPage} (which in turn embeds a
 * {@link Metadata}), reads it back with every schema field, and compares both
 * the employee and the nested web page.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetEmployeeNested(DataStore<String, Employee> dataStore)
    throws Exception {
  final Employee written = createEmployee();

  // Build the nested record through the bean factory rather than the builder.
  final WebPage nestedPage = new BeanFactoryImpl<>(String.class, WebPage.class).newPersistent();
  nestedPage.setUrl(new Utf8("url.."));
  final byte[] rawContent = "test content".getBytes(Charset.defaultCharset());
  nestedPage.setContent(ByteBuffer.wrap(rawContent));
  nestedPage.setParsedContent(new ArrayList<CharSequence>());
  final Metadata nestedMetadata = new BeanFactoryImpl<>(String.class, Metadata.class).newPersistent();
  nestedPage.setMetadata(nestedMetadata);
  written.setWebpage(nestedPage);

  final String key = written.getSsn().toString();
  dataStore.put(key, written);
  dataStore.flush();

  final String[] allFields = AvroUtils.getSchemaFieldNames(Employee.SCHEMA$);
  final Employee readBack = dataStore.get(key, allFields);
  assertEqualEmployeeObjects(written, readBack);
  assertEqualWebPageObjects(nestedPage, readBack.getWebpage());
}
/**
 * Writes an employee whose boss field holds a string value instead of an
 * {@link Employee}, reads it back with every schema field, and checks that
 * the string survived the round trip.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetEmployee3UnionField(DataStore<String, Employee> dataStore)
    throws Exception {
  final String bossName = "Real boss";
  final Employee written = createEmployee();
  written.setBoss(new Utf8(bossName));
  final String key = written.getSsn().toString();
  dataStore.put(key, written);
  dataStore.flush();

  final String[] allFields = AvroUtils.getSchemaFieldNames(Employee.SCHEMA$);
  final Employee readBack = dataStore.get(key, allFields);
  assertEqualEmployeeObjects(written, readBack);
  assertEquals(bossName, readBack.getBoss().toString());
}
/**
 * Looks up a key that is not expected to exist and checks that the store
 * answers with {@code null}.
 *
 * @param dataStore the store under test
 * @throws Exception if the lookup fails
 */
public static void testGetEmployeeNonExisting(DataStore<String, Employee> dataStore)
    throws Exception {
  final String missingKey = "_NON_EXISTING_SSN_FOR_EMPLOYEE_";
  final Employee found = dataStore.get(missingKey);
  assertNull(found);
}
/**
 * Writes a fully populated employee (including nested web page and boss) and
 * then reads it back once for every non-empty subset of its persistent field
 * names, checking that each read matches an expected object holding only the
 * requested fields.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetEmployeeWithFields(DataStore<String, Employee> dataStore)
    throws Exception {
  final Employee written = createEmployee();
  written.setWebpage(createWebPage());
  written.setBoss(createBoss());
  final String key = written.getSsn().toString();
  dataStore.put(key, written);
  dataStore.flush();

  final String[] fieldNames = AvroUtils.getPersistentFieldNames(written);
  for (Set<String> requested : StringUtils.powerset(fieldNames)) {
    if (!requested.isEmpty()) {
      final String[] requestedArray = requested.toArray(new String[requested.size()]);
      final Employee readBack = dataStore.get(key, requestedArray);

      // Copy only the requested fields from the written object.
      final Employee expected = Employee.newBuilder().build();
      for (String fieldName : requested) {
        final int position = expected.getSchema().getField(fieldName).pos();
        expected.put(position, written.get(position));
      }
      assertEqualEmployeeObjects(expected, readBack);
    }
  }
}
/**
 * Compares an {@link Employee} as it was put with the instance obtained back
 * from the datastore, one field at a time.
 * The name, dateOfBirth, ssn and salary fields are always compared. The boss
 * and webpage fields are compared only when they are set on the expected
 * object: a {@link Utf8} boss is compared by its string value, any other boss
 * is treated as a nested {@link Employee} and compared recursively.
 * We explicitly get values from each field as this makes it easier to debug
 * if tests go wrong.
 * @param employee the expected employee (the object that was put)
 * @param after the employee read back from the datastore
 */
private static void assertEqualEmployeeObjects(Employee employee, Employee after) {
  // Fail with a readable message instead of a NullPointerException below.
  assertNotNull("Employee read back from the datastore should not be null", after);
  //check name field
  CharSequence beforeName = employee.getName();
  CharSequence afterName = after.getName();
  assertEquals(beforeName, afterName);
  //check dateOfBirth field
  Long beforeDOB = employee.getDateOfBirth();
  Long afterDOB = after.getDateOfBirth();
  assertEquals(beforeDOB, afterDOB);
  //check ssn field
  CharSequence beforeSsn = employee.getSsn();
  CharSequence afterSsn = after.getSsn();
  assertEquals(beforeSsn, afterSsn);
  //check salary field
  Integer beforeSalary = employee.getSalary();
  Integer afterSalary = after.getSalary();
  assertEquals(beforeSalary, afterSalary);
  //check boss field
  Object beforeBoss = employee.getBoss();
  if (beforeBoss != null) {
    Object afterBoss = after.getBoss();
    assertNotNull("Boss field was set before the put but is null after the get", afterBoss);
    if (beforeBoss instanceof Utf8) {
      assertEquals("Boss String field values in UNION should be the same",
          beforeBoss.toString(), afterBoss.toString());
    } else {
      assertEqualEmployeeObjects((Employee) beforeBoss, (Employee) afterBoss);
    }
  }
  //check webpage field
  WebPage beforeWebPage = employee.getWebpage();
  if (beforeWebPage != null) {
    WebPage afterWebPage = after.getWebpage();
    assertNotNull("Webpage field was set before the put but is null after the get", afterWebPage);
    assertEqualWebPageObjects(beforeWebPage, afterWebPage);
  }
}
/**
 * Picks through the fields of a {@link org.apache.gora.examples.generated.WebPage}
 * as it was put and the instance read back, comparing field values.
 * The url, content, parsedContent and outlinks fields are always compared;
 * the metadata field is compared only when it is set on the expected page.
 * @param beforeWebPage the expected web page (the object that was put)
 * @param afterWebPage the web page read back from the datastore
 */
private static void assertEqualWebPageObjects(WebPage beforeWebPage, WebPage afterWebPage) {
  // Fail with a readable message instead of a NullPointerException below.
  assertNotNull("WebPage read back from the datastore should not be null", afterWebPage);
  //check url field
  CharSequence beforeUrl = beforeWebPage.getUrl();
  CharSequence afterUrl = afterWebPage.getUrl();
  assertEquals(beforeUrl, afterUrl);
  //check content field
  ByteBuffer beforeContent = beforeWebPage.getContent();
  ByteBuffer afterContent = afterWebPage.getContent();
  assertEquals(beforeContent, afterContent);
  //check parsedContent field
  List<CharSequence> beforeParsedContent =
      beforeWebPage.getParsedContent();
  List<CharSequence> afterParsedContent =
      afterWebPage.getParsedContent();
  assertEquals(beforeParsedContent, afterParsedContent);
  //check outlinks field
  Map<CharSequence, CharSequence> beforeOutlinks =
      beforeWebPage.getOutlinks();
  Map<CharSequence, CharSequence> afterOutlinks =
      afterWebPage.getOutlinks();
  assertEquals(beforeOutlinks, afterOutlinks);
  //check metadata field
  // Guard on the very value that is dereferenced below, via the typed
  // accessor, instead of a positional index into the record.
  Metadata beforeMetadata = beforeWebPage.getMetadata();
  if (beforeMetadata != null) {
    Metadata afterMetadata = afterWebPage.getMetadata();
    assertNotNull("Metadata field was set before the put but is null after the get", afterMetadata);
    assertEqualMetadataObjects(beforeMetadata, afterMetadata);
  }
}
/**
 * Compares the version and data fields of a
 * {@link org.apache.gora.examples.generated.Metadata} as it was put with the
 * instance read back from the datastore.
 * @param beforeMetadata the expected metadata (the object that was put)
 * @param afterMetadata the metadata read back from the datastore
 */
private static void assertEqualMetadataObjects(Metadata beforeMetadata, Metadata afterMetadata) {
  // version: compared as primitive ints
  final int expectedVersion = beforeMetadata.getVersion();
  final int actualVersion = afterMetadata.getVersion();
  assertEquals(expectedVersion, actualVersion);

  // data: compared with Map equality
  final Map<CharSequence, CharSequence> expectedData = beforeMetadata.getData();
  final Map<CharSequence, CharSequence> actualData = afterMetadata.getData();
  assertEquals(expectedData, actualData);
}
/**
 * Creates the schema and returns a freshly built sample employee.
 * NOTE(review): despite the name, nothing is put into the store here; confirm
 * whether callers are expected to perform the put themselves.
 *
 * @param dataStore the store whose schema is created
 * @return the sample employee from {@link #createEmployee()}
 * @throws Exception if schema creation fails
 */
public static Employee testPutEmployee(DataStore<String, Employee> dataStore)
    throws Exception {
  dataStore.createSchema();
  return createEmployee();
}
/**
 * Exercises puts that carry no changes: an employee is written, read back and
 * put again unmodified; then a completely empty persistent object is put
 * under a second key, and that key is expected to read back as {@code null}.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testEmptyUpdateEmployee(DataStore<String, Employee> dataStore)
    throws Exception {
  dataStore.createSchema();
  final long baseSsn = 1234567890L;
  final String firstKey = Long.toString(baseSsn);
  final long now = System.currentTimeMillis();

  // Step 1: write a populated employee.
  final Employee populated = dataStore.newPersistent();
  populated.setName(new Utf8("John Doe"));
  populated.setDateOfBirth(now - 20L * YEAR_IN_MS);
  populated.setSalary(100000);
  populated.setSsn(new Utf8(firstKey));
  dataStore.put(populated.getSsn().toString(), populated);
  dataStore.flush();

  // Step 2: read it back and put it again without touching any field.
  final Employee reloaded = dataStore.get(firstKey);
  dataStore.put(firstKey, reloaded);
  dataStore.flush();

  // Step 3: put an object with no fields set under the next key.
  final Employee blank = dataStore.newPersistent();
  dataStore.put(Long.toString(baseSsn + 1), blank);
  dataStore.flush();

  final Employee blankReadBack = dataStore.get(Long.toString(baseSsn + 1));
  assertNull(blankReadBack);
}
/**
 * Writes five {@link org.apache.gora.examples.generated.Employee} objects
 * with consecutive SSN keys and flushes them. The employee stored under the
 * first key is then overwritten with a new name, date of birth and salary,
 * flushed again, and read back to check that the new values are returned.
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testUpdateEmployee(DataStore<String, Employee> dataStore)
    throws Exception {
  dataStore.createSchema();
  final long baseSsn = 1234567890L;
  final long now = System.currentTimeMillis();

  // Initial population: keys baseSsn .. baseSsn + 4.
  int index = 0;
  while (index < 5) {
    final Employee initial = Employee.newBuilder().build();
    initial.setName(new Utf8("John Doe " + index));
    initial.setDateOfBirth(now - 20L * YEAR_IN_MS);
    initial.setSalary(100000);
    initial.setSsn(new Utf8(Long.toString(baseSsn + index)));
    dataStore.put(initial.getSsn().toString(), initial);
    index++;
  }
  dataStore.flush();

  // Overwrite only the first employee.
  final Employee replacement = Employee.newBuilder().build();
  replacement.setName(new Utf8("John Doe " + 5));
  replacement.setDateOfBirth(now - 18L * YEAR_IN_MS);
  replacement.setSalary(120000);
  replacement.setSsn(new Utf8(Long.toString(baseSsn)));
  dataStore.put(replacement.getSsn().toString(), replacement);
  dataStore.flush();

  // The first key must now return the replacement values.
  final String firstKey = Long.toString(baseSsn);
  final Employee reloaded = dataStore.get(firstKey);
  assertEquals(now - 18L * YEAR_IN_MS, reloaded.getDateOfBirth().longValue());
  assertEquals("John Doe " + 5, reloaded.getName().toString());
  assertEquals(120000, reloaded.getSalary().intValue());
}
/**
 * Writes seven {@link org.apache.gora.examples.generated.WebPage} objects,
 * each with five 'parsedContent' entries, and flushes them. Every page is
 * then read back, given a 'content' value and five more 'parsedContent'
 * entries, and put again. Finally each page is read once more and the
 * content plus all ten parsed-content entries are checked, in order.
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testUpdateWebPagePutToArray(DataStore<String, WebPage> dataStore)
    throws Exception {
  dataStore.createSchema();
  final String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
      "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
  final String contentPrefix = "content";
  final String parsedPrefix = "parsedContent";

  // Phase 1: create the pages with entries 0..4.
  for (int page = 0; page < urls.length; page++) {
    final WebPage created = WebPage.newBuilder().build();
    created.setUrl(new Utf8(urls[page]));
    for (int entry = 0; entry < 5; entry++) {
      created.getParsedContent().add(new Utf8(parsedPrefix + page + "," + entry));
    }
    dataStore.put(created.getUrl().toString(), created);
  }
  dataStore.flush();

  // Phase 2: set the content and append entries 5..9.
  for (int page = 0; page < urls.length; page++) {
    final WebPage loaded = dataStore.get(urls[page]);
    loaded.setContent(ByteBuffer.wrap(ByteUtils.toBytes(contentPrefix + page)));
    for (int entry = 5; entry < 10; entry++) {
      loaded.getParsedContent().add(new Utf8(parsedPrefix + page + "," + entry));
    }
    dataStore.put(loaded.getUrl().toString(), loaded);
  }
  dataStore.flush();

  // Phase 3: verify the content and the order of all ten entries.
  for (int page = 0; page < urls.length; page++) {
    final WebPage verified = dataStore.get(urls[page]);
    final String storedContent = ByteUtils.toString( toByteArray(verified.getContent()) );
    assertEquals(contentPrefix + page, storedContent);
    assertEquals(10, verified.getParsedContent().size());
    int position = 0;
    for (CharSequence stored : verified.getParsedContent()) {
      assertEquals(parsedPrefix + page + "," + position, stored.toString());
      position++;
    }
  }
}
/**
 * Writes seven web pages whose 'outlinks' map (a map field that is not
 * nullable) holds one entry per even-indexed URL, then reads each page back
 * and checks that the map has exactly that many entries.
 * <p>
 * A follow-up phase that cleared the map and re-populated it with the
 * odd-indexed entries (including a double put of the same entries), as well
 * as per-entry value assertions, was disabled in the original test; only the
 * entry count is verified here. TODO: re-enable those checks once the reason
 * they were disabled is understood.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testUpdateWebPagePutToNotNullableMap(DataStore<String, WebPage> dataStore)
    throws Exception {
  dataStore.createSchema();
  String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
      "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
  String anchor = "anchor";
  // putting evens
  for (String url : urls) {
    WebPage webPage = WebPage.newBuilder().build();
    webPage.setUrl(new Utf8(url));
    for (int j = 0; j < urls.length; j += 2) {
      webPage.getOutlinks().put(new Utf8(anchor + j), new Utf8(urls[j]));
    }
    dataStore.put(webPage.getUrl().toString(), webPage);
  }
  dataStore.flush();
  for (String url : urls) {
    WebPage webPage = dataStore.get(url);
    // Diagnostic output goes through the logger, once per page, rather than
    // to stdout on every inner-loop iteration.
    LOG.debug("=====>{}", webPage);
    // Number of even-indexed anchors that were put for each page.
    int count = 0;
    for (int j = 0; j < urls.length; j += 2) {
      count++;
    }
    assertEquals(count, webPage.getOutlinks().size());
  }
}
/**
 * Exercises puts into the nullable 'headers' map field. Each of seven pages
 * is first written with the even-indexed header entries; every page is then
 * read back, given a brand-new map holding only the odd-indexed entries, and
 * put again. The final read must contain exactly the odd-indexed entries.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testUpdateWebPagePutToNullableMap(DataStore<String, WebPage> dataStore)
    throws Exception {
  dataStore.createSchema();
  final String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
      "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
  final String keyPrefix = "header";
  final String[] headerValues = { "firstHeader", "secondHeader", "thirdHeader",
      "fourthHeader", "fifthHeader", "sixthHeader" };

  // Phase 1: even-indexed entries. 'headers' defaults to null, so a map has
  // to be installed before anything can be put into it.
  for (int u = 0; u < urls.length; u++) {
    final WebPage created = WebPage.newBuilder().build();
    created.setUrl(new Utf8(urls[u]));
    created.setHeaders(new HashMap<CharSequence, CharSequence>());
    int even = 0;
    while (even < headerValues.length) {
      created.getHeaders().put(new Utf8(keyPrefix + even), new Utf8(headerValues[even]));
      even += 2;
    }
    dataStore.put(created.getUrl().toString(), created);
  }
  dataStore.flush();

  // Phase 2: replace the map with one holding the odd-indexed entries.
  // The map is swapped out instead of cleared.
  // TODO clear method does not work
  for (int u = 0; u < urls.length; u++) {
    final WebPage loaded = dataStore.get(urls[u]);
    loaded.setHeaders(new HashMap<CharSequence, CharSequence>());
    int odd = 1;
    while (odd < headerValues.length) {
      loaded.getHeaders().put(new Utf8(keyPrefix + odd), new Utf8(headerValues[odd]));
      odd += 2;
    }
    dataStore.put(loaded.getUrl().toString(), loaded);
  }
  dataStore.flush();

  // Phase 3: only the odd-indexed entries may remain.
  for (int u = 0; u < urls.length; u++) {
    final WebPage verified = dataStore.get(urls[u]);
    int expectedEntries = 0;
    int odd = 1;
    while (odd < headerValues.length) {
      final CharSequence stored = verified.getHeaders().get(new Utf8(keyPrefix + odd));
      assertNotNull(stored);
      assertEquals(headerValues[odd], stored.toString());
      expectedEntries++;
      odd += 2;
    }
    assertEquals(expectedEntries, verified.getHeaders().size());
  }
}
/**
 * Exercises removal of individual map entries. Each of seven pages is written
 * with one 'outlinks' entry per URL; the odd-indexed entries are then removed
 * and the page put again. The final read must no longer contain the removed
 * entries and must hold exactly the remaining number of entries.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testUpdateWebPageRemoveMapEntry(DataStore<String, WebPage> dataStore)
    throws Exception {
  dataStore.createSchema();
  final String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
      "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
  final String anchorPrefix = "anchor";

  // Phase 1: one outlink per URL on every page.
  for (int u = 0; u < urls.length; u++) {
    final WebPage created = WebPage.newBuilder().build();
    created.setUrl(new Utf8(urls[u]));
    for (int link = 0; link < urls.length; link++) {
      created.getOutlinks().put(new Utf8(anchorPrefix + link), new Utf8(urls[link]));
    }
    dataStore.put(created.getUrl().toString(), created);
  }
  dataStore.flush();

  // Phase 2: remove the odd-indexed entries.
  for (int u = 0; u < urls.length; u++) {
    final WebPage loaded = dataStore.get(urls[u]);
    int odd = 1;
    while (odd < urls.length) {
      loaded.getOutlinks().remove(new Utf8(anchorPrefix + odd));
      odd += 2;
    }
    dataStore.put(loaded.getUrl().toString(), loaded);
  }
  dataStore.flush();

  // Phase 3: removed entries are gone and the size shrank accordingly.
  for (int u = 0; u < urls.length; u++) {
    int removed = 0;
    final WebPage verified = dataStore.get(urls[u]);
    int odd = 1;
    while (odd < urls.length) {
      final CharSequence gone = verified.getOutlinks().get(new Utf8(anchorPrefix + odd));
      assertNull(gone);
      removed++;
      odd += 2;
    }
    assertEquals(urls.length - removed, verified.getOutlinks().size());
  }
}
/**
 * Exercises removal of a whole nullable field. Each of seven pages is written
 * with a populated 'headers' map; the field is then set to {@code null} and
 * the page put again. The final read must return {@code null} for 'headers'.
 *
 * @param dataStore the store under test
 * @throws Exception if any store operation fails
 */
public static void testUpdateWebPageRemoveField(DataStore<String, WebPage> dataStore)
    throws Exception {
  dataStore.createSchema();
  final String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
      "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g" };
  final String keyPrefix = "header";
  final String[] headerValues = { "firstHeader", "secondHeader", "thirdHeader",
      "fourthHeader", "fifthHeader", "sixthHeader" };

  // Phase 1: write every page with all header entries.
  for (int u = 0; u < urls.length; u++) {
    final WebPage created = WebPage.newBuilder().build();
    created.setUrl(new Utf8(urls[u]));
    created.setHeaders(new HashMap<CharSequence, CharSequence>());
    int h = 0;
    while (h < headerValues.length) {
      created.getHeaders().put(new Utf8(keyPrefix + h), new Utf8(headerValues[h]));
      h++;
    }
    dataStore.put(created.getUrl().toString(), created);
  }
  dataStore.flush();

  // Phase 2: null out the nullable map field.
  for (int u = 0; u < urls.length; u++) {
    final WebPage loaded = dataStore.get(urls[u]);
    loaded.setHeaders(null);
    dataStore.put(loaded.getUrl().toString(), loaded);
  }
  dataStore.flush();

  // Phase 3: the field must read back as null.
  for (int u = 0; u < urls.length; u++) {
    final WebPage verified = dataStore.get(urls[u]);
    assertNull(verified.getHeaders());
  }
}
/**
 * Checks a {@link WebPage} read from a store against the sample data at
 * index {@code i} of the {@code WebPageDataCreator} arrays: the URL, the
 * optional content and its parsed tokens, and the outlinks.
 *
 * @param page the page read from the store; must not be null
 * @param i index into {@code URLS}, {@code CONTENTS}, {@code LINKS} and
 *          {@code ANCHORS}
 * @throws Exception declared for caller compatibility
 */
public static void assertWebPage(WebPage page, int i) throws Exception{
  assertNotNull(page);
  assertEquals(URLS[i], page.getUrl().toString());
  // 'content' is optional
  if (page.getContent() != null) {
    assertNotNull("page has content but no content is expected, i=" + i, CONTENTS[i]);
    // The page's bytes are the actual value, CONTENTS[i] the expected one.
    assertTrue("content error: actual=" + new String( toByteArray(page.getContent()), Charset.defaultCharset() ) +
        " expected=" + CONTENTS[i] + " i=" + i
        , Arrays.equals( toByteArray(page.getContent() )
        , CONTENTS[i].getBytes(Charset.defaultCharset())));
    List<CharSequence> parsedContent = page.getParsedContent();
    assertNotNull(parsedContent);
    assertTrue(parsedContent.size() > 0);
    int j=0;
    String[] tokens = CONTENTS[i].split(" ");
    for(CharSequence token : parsedContent) {
      // Report surplus parsed tokens as an assertion failure instead of an
      // ArrayIndexOutOfBoundsException.
      assertTrue("more parsed tokens than expected, i=" + i, j < tokens.length);
      assertEquals(tokens[j++], token.toString());
    }
  } else {
    // when page.getContent() is null
    assertTrue(CONTENTS[i] == null) ;
    List<CharSequence> parsedContent = page.getParsedContent();
    assertNotNull(parsedContent);
    assertTrue(parsedContent.size() == 0);
  }
  if(LINKS[i].length > 0) {
    assertNotNull(page.getOutlinks());
    assertTrue(page.getOutlinks().size() > 0);
    for(int k=0; k<LINKS[i].length; k++) {
      CharSequence anchorText = page.getOutlinks().get(new Utf8(URLS[LINKS[i][k]]));
      assertNotNull("missing outlink for " + URLS[LINKS[i][k]] + " i=" + i, anchorText);
      assertEquals(ANCHORS[i][k], anchorText.toString());
    }
  } else {
    assertTrue(page.getOutlinks() == null || page.getOutlinks().isEmpty());
  }
}
/**
 * Loads the sample web page data and fetches every sample URL with the given
 * field selection, checking each returned page with {@code assertWebPage}.
 *
 * @param store the store under test
 * @param fields the field names passed to {@code get}; may be {@code null}
 * @throws Exception if any store operation fails
 */
private static void testGetWebPage(DataStore<String, WebPage> store, String[] fields)
    throws Exception {
  createWebPageData(store);
  int index = 0;
  for (String url : URLS) {
    final WebPage fetched = store.get(url, fields);
    assertWebPage(fetched, index);
    index++;
  }
}
/**
 * Runs the get-by-key check while explicitly requesting every field of the
 * {@link WebPage} schema.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetWebPage(DataStore<String, WebPage> store) throws Exception {
  final String[] everyField = getFields(WebPage.SCHEMA$.getFields());
  testGetWebPage(store, everyField);
}
/**
 * Runs the get-by-key check with a {@code null} field selection.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetWebPageDefaultFields(DataStore<String, WebPage> store)
    throws Exception {
  final String[] noExplicitFields = null;
  testGetWebPage(store, noExplicitFields);
}
/**
 * Loads the sample web page data and, for every sample URL, runs a query
 * restricted to that single key, expecting exactly one matching page.
 *
 * @param store the store under test
 * @param fields the field names set on each query; may be {@code null}
 * @throws Exception if any store operation fails
 */
private static void testQueryWebPageSingleKey(DataStore<String, WebPage> store
    , String[] fields) throws Exception {
  createWebPageData(store);
  for(int i=0; i<URLS.length; i++) {
    Query<String, WebPage> query = store.newQuery();
    query.setFields(fields);
    query.setKey(URLS[i]);
    Result<String, WebPage> result = query.execute();
    // Close every result, even when an assertion fails, so that repeated
    // queries do not leak whatever the result holds open.
    try {
      assertTrue(result.next());
      WebPage page = result.get();
      assertWebPage(page, i);
      assertFalse(result.next());
    } finally {
      result.close();
    }
  }
}
/**
 * Runs the single-key query check while explicitly requesting every field of
 * the {@link WebPage} schema.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testQueryWebPageSingleKey(DataStore<String, WebPage> store)
    throws Exception {
  final String[] everyField = getFields(WebPage.SCHEMA$.getFields());
  testQueryWebPageSingleKey(store, everyField);
}
/**
 * Runs the single-key query check with a {@code null} field selection.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testQueryWebPageSingleKeyDefaultFields(
    DataStore<String, WebPage> store) throws Exception {
  final String[] noExplicitFields = null;
  testQueryWebPageSingleKey(store, noExplicitFields);
}
/**
 * Loads the sample web page data and runs key-range queries over the sorted
 * URLs, checking every returned page and the number of results.
 * When both flags are set, every (start, end) pair with start &lt;= end is
 * tried; when a flag is not set, the corresponding bound is left open and
 * only one value is tried along that dimension.
 *
 * @param store the store under test
 * @param setStartKeys whether a start key is set on the queries
 * @param setEndKeys whether an end key is set on the queries
 * @throws Exception if any store operation fails
 */
public static void testQueryWebPageKeyRange(DataStore<String, WebPage> store,
    boolean setStartKeys, boolean setEndKeys)
    throws Exception {
  createWebPageData(store);
  //create sorted set of urls
  List<String> sortedUrls = new ArrayList<>();
  Collections.addAll(sortedUrls, URLS);
  Collections.sort(sortedUrls);
  //try all ranges
  for(int i=0; i<sortedUrls.size(); i++) {
    for(int j=i; j<sortedUrls.size(); j++) {
      Query<String, WebPage> query = store.newQuery();
      if(setStartKeys) {
        query.setStartKey(sortedUrls.get(i));
      }
      if(setEndKeys) {
        query.setEndKey(sortedUrls.get(j));
      }
      Result<String, WebPage> result = query.execute();
      int r=0;
      // Many queries are executed here; close each result, even when an
      // assertion fails, so none of them is leaked.
      try {
        while(result.next()) {
          WebPage page = result.get();
          assertWebPage(page, URL_INDEXES.get(page.getUrl().toString()));
          r++;
        }
      } finally {
        result.close();
      }
      // Both bounds are inclusive; an unset bound spans to the end of the list.
      int expectedLength = (setEndKeys ? j+1: sortedUrls.size()) -
          (setStartKeys ? i: 0);
      assertEquals(expectedLength, r);
      if(!setEndKeys) {
        break;
      }
    }
    if(!setStartKeys) {
      break;
    }
  }
}
/**
 * Runs the key-range query check with neither a start key nor an end key set.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testQueryWebPages(DataStore<String, WebPage> store)
    throws Exception {
  final boolean withStartKey = false;
  final boolean withEndKey = false;
  testQueryWebPageKeyRange(store, withStartKey, withEndKey);
}
/**
 * Runs the key-range query check with only the start key set.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testQueryWebPageStartKey(DataStore<String, WebPage> store)
    throws Exception {
  final boolean withStartKey = true;
  final boolean withEndKey = false;
  testQueryWebPageKeyRange(store, withStartKey, withEndKey);
}
/**
 * Runs the key-range query check with only the end key set.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testQueryWebPageEndKey(DataStore<String, WebPage> store)
    throws Exception {
  final boolean withStartKey = false;
  final boolean withEndKey = true;
  testQueryWebPageKeyRange(store, withStartKey, withEndKey);
}
/**
 * Runs the key-range query check with both the start key and the end key set.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testQueryWebPageKeyRange(DataStore<String, WebPage> store)
    throws Exception {
  final boolean withStartKey = true;
  final boolean withEndKey = true;
  testQueryWebPageKeyRange(store, withStartKey, withEndKey);
}
/**
 * Loads the sample web page data and checks that a key range and a single key
 * which are both expected to match nothing produce empty results.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testQueryWebPageEmptyResults(DataStore<String, WebPage> store)
    throws Exception {
  createWebPageData(store);

  // A range expected to contain no keys.
  final Query<String, WebPage> emptyRange = store.newQuery();
  emptyRange.setStartKey("aa");
  emptyRange.setEndKey("ab");
  assertEmptyResults(emptyRange);

  // A single key expected to be absent.
  final Query<String, WebPage> missingKey = store.newQuery();
  missingKey.setKey("aa");
  assertEmptyResults(missingKey);
}
/**
 * Asserts that executing the query yields no results at all.
 *
 * @param query the query to execute
 * @throws Exception if executing or iterating the query fails
 */
public static<K,T extends Persistent> void assertEmptyResults(Query<K, T> query)
    throws Exception {
  final long expectedNone = 0L;
  assertNumResults(query, expectedNone);
}
/**
 * Executes the query, counts the results by iterating to the end, and asserts
 * that the count equals {@code numResults}.
 *
 * @param query the query to execute
 * @param numResults the expected number of results
 * @throws Exception if executing, iterating or closing the result fails
 */
public static<K,T extends Persistent> void assertNumResults(Query<K, T>query
    , long numResults) throws Exception {
  Result<K, T> result = query.execute();
  int actualNumResults = 0;
  // Close in a finally block so the result is released even if next() throws.
  try {
    while(result.next()) {
      actualNumResults++;
    }
  } finally {
    result.close();
  }
  assertEquals(numResults, actualNumResults);
}
/**
 * Loads the sample web page data and runs the partition check against an
 * unrestricted query.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testGetPartitions(DataStore<String, WebPage> store)
    throws Exception {
  createWebPageData(store);
  final Query<String, WebPage> everything = store.newQuery();
  testGetPartitions(store, everything);
}
/**
 * Asks the store for the partitions of the given query, checks that the list
 * is non-null, non-empty and free of null elements, and then compares the
 * partitioned results with the unpartitioned ones.
 *
 * @param store the store under test
 * @param query the query to partition
 * @throws Exception if any store operation fails
 */
public static void testGetPartitions(DataStore<String, WebPage> store
    , Query<String, WebPage> query) throws Exception {
  final List<PartitionQuery<String, WebPage>> parts = store.getPartitions(query);
  assertNotNull(parts);
  assertTrue(parts.size() > 0);
  for (int p = 0; p < parts.size(); p++) {
    assertNotNull(parts.get(p));
  }
  assertPartitions(store, query, parts);
}
/**
 * Checks that executing the partition queries one by one yields the same
 * records as executing the original query: the same number of results, unique
 * keys, and for every key an equal hash code of the returned object.
 *
 * @param store the store under test
 * @param query the original, unpartitioned query
 * @param partitions the partitions obtained for {@code query}
 * @throws Exception if any store operation fails
 */
public static void assertPartitions(DataStore<String, WebPage> store,
    Query<String, WebPage> query, List<PartitionQuery<String,WebPage>> partitions)
    throws Exception {
  int count = 0, partitionsCount = 0;
  Map<String, Integer> results = new HashMap<>();
  Map<String, Integer> partitionResults = new HashMap<>();
  //execute query and count results
  Result<String, WebPage> result = store.execute(query);
  assertNotNull(result);
  // Close in a finally block so a failing assertion does not leak the result.
  try {
    while(result.next()) {
      assertNotNull(result.getKey());
      assertNotNull(result.get());
      results.put(result.getKey(), result.get().hashCode()); //keys are not reused, so this is safe
      count++;
    }
  } finally {
    result.close();
  }
  assertTrue(count > 0); //assert that results is not empty
  assertEquals(count, results.size()); //assert that keys are unique
  for(PartitionQuery<String, WebPage> partition:partitions) {
    assertNotNull(partition);
    result = store.execute(partition);
    assertNotNull(result);
    try {
      while(result.next()) {
        assertNotNull(result.getKey());
        assertNotNull(result.get());
        partitionResults.put(result.getKey(), result.get().hashCode());
        partitionsCount++;
      }
    } finally {
      result.close();
    }
    assertEquals(partitionsCount, partitionResults.size()); //assert that keys are unique
  }
  assertTrue(partitionsCount > 0);
  assertEquals(count, partitionsCount);
  // Every record of the plain query must also come out of some partition.
  for(Map.Entry<String, Integer> r : results.entrySet()) {
    Integer p = partitionResults.get(r.getKey());
    assertNotNull(p);
    assertEquals(r.getValue(), p);
  }
}
/**
 * Loads the sample web page data and deletes the pages one key at a time,
 * checking after every delete that the key is gone and that all remaining
 * records are still returned by an unrestricted query.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testDelete(DataStore<String, WebPage> store) throws Exception {
  createWebPageData(store);
  int remaining = URLS.length;
  for (int i = 0; i < URLS.length; i++) {
    final String url = URLS[i];
    assertTrue(store.delete(url));
    store.flush();
    // The deleted record must be gone ...
    assertNull(store.get(url));
    // ... while every other record is still there.
    remaining--;
    assertNumResults(store.newQuery(), remaining);
  }
}
/**
 * Exercises {@code deleteByQuery} in four scenarios, reloading the sample web
 * page data before each one: an unrestricted query, a query listing every
 * schema field, a query with the key range "a".."z", and a query with only an
 * end key (which is expected to remove {@code NUM_KEYS + 1} records). The
 * schema is truncated at the end.
 *
 * @param store the store under test
 * @throws Exception if any store operation fails
 */
public static void testDeleteByQuery(DataStore<String, WebPage> store)
    throws Exception {
  // Scenario 1: delete everything with an unrestricted query.
  createWebPageData(store);
  final Query<String, WebPage> unrestricted = store.newQuery();
  assertNumResults(store.newQuery(), URLS.length);
  store.deleteByQuery(unrestricted);
  store.flush();
  assertEmptyResults(store.newQuery());

  // Scenario 2: delete everything, naming all schema fields explicitly.
  createWebPageData(store);
  final Query<String, WebPage> allFields = store.newQuery();
  allFields.setFields(AvroUtils.getSchemaFieldNames(WebPage.SCHEMA$));
  assertNumResults(store.newQuery(), URLS.length);
  store.deleteByQuery(allFields);
  store.flush();
  assertEmptyResults(store.newQuery());

  // Scenario 3: delete everything through a key range.
  createWebPageData(store);
  final Query<String, WebPage> wholeRange = store.newQuery();
  wholeRange.setKeyRange("a", "z"); //all start with "http://"
  assertNumResults(store.newQuery(), URLS.length);
  store.deleteByQuery(wholeRange);
  store.flush();
  assertEmptyResults(store.newQuery());

  // Scenario 4: delete only the keys up to and including SORTED_URLS[NUM_KEYS].
  createWebPageData(store);
  final Query<String, WebPage> upToEndKey = store.newQuery();
  upToEndKey.setEndKey(SORTED_URLS[NUM_KEYS]);
  assertNumResults(store.newQuery(), URLS.length);
  store.deleteByQuery(upToEndKey);
  store.flush();
  assertNumResults(store.newQuery(), URLS.length - (NUM_KEYS+1));

  store.truncateSchema();
}
/**
 * Verifies that {@code deleteByQuery} with a field selection removes only the
 * selected fields and keeps the records themselves.
 *
 * <p>Test 5 deletes the fields {@code outlinks}, {@code parsedContent} and
 * {@code content} from every record and checks that all records still exist,
 * that their url is intact and that the outlinks map and parsed-content list
 * are empty.
 *
 * <p>Test 6 deletes only the {@code url} field for keys between
 * {@code SORTED_URLS[NUM_KEYS]} and
 * {@code SORTED_URLS[SORTED_URLS.length - NUM_KEYS]}. Records outside that
 * range must be unchanged; records inside it must have a null url while their
 * other fields are still populated.
 *
 * @param store the data store under test
 * @throws Exception if a store operation or an assertion helper fails
 */
public static void testDeleteByQueryFields(DataStore<String, WebPage> store)
    throws Exception {
  Query<String, WebPage> query;

  //test 5 - delete all with some fields
  WebPageDataCreator.createWebPageData(store);
  query = store.newQuery();
  query.setFields("outlinks", "parsedContent", "content");

  assertNumResults(store.newQuery(), URLS.length);
  // Issued three times on purpose: the original author observed that HBase
  // sometimes does not apply a delete.
  store.deleteByQuery(query);
  store.deleteByQuery(query);
  store.deleteByQuery(query);
  store.flush();

  // only fields were deleted, so the number of records is unchanged
  assertNumResults(store.newQuery(), URLS.length);

  //assert that data is deleted
  for (String sortedUrl : SORTED_URLS) {
    WebPage page = store.get(sortedUrl);
    assertNotNull(page);
    assertNotNull(page.getUrl());
    assertEquals(sortedUrl, page.getUrl().toString());
    assertEquals("Map of Outlinks should have a size of '0' as the deleteByQuery "
        + "not only removes the data but also the data structure.", 0, page.getOutlinks().size());
    assertEquals(0, page.getParsedContent().size());
    // A non-null content is tolerated here and only logged for inspection.
    if (page.getContent() != null) {
      LOG.info("url:" + page.getUrl().toString());
      LOG.info("limit:" + page.getContent().limit());
    }
  }

  //test 6 - delete some with some fields
  WebPageDataCreator.createWebPageData(store);
  query = store.newQuery();
  query.setFields("url");
  String startKey = SORTED_URLS[NUM_KEYS];
  String endKey = SORTED_URLS[SORTED_URLS.length - NUM_KEYS];
  query.setStartKey(startKey);
  query.setEndKey(endKey);

  assertNumResults(store.newQuery(), URLS.length);
  // Issued three times on purpose, see the note in test 5.
  store.deleteByQuery(query);
  store.deleteByQuery(query);
  store.deleteByQuery(query);
  // Flush once, before any verification, instead of on every loop iteration.
  store.flush();

  assertNumResults(store.newQuery(), URLS.length);

  //assert that data is deleted
  for (int i = 0; i < URLS.length; i++) {
    WebPage page = store.get(URLS[i]);
    assertNotNull(page);
    // The end key is inclusive (testDeleteByQuery expects NUM_KEYS + 1
    // deletions for an end key at index NUM_KEYS), so only keys strictly
    // greater than endKey are outside the deleted range.
    boolean outsideRange = URLS[i].compareTo(startKey) < 0 || URLS[i].compareTo(endKey) > 0;
    if (outsideRange) {
      //not deleted
      assertWebPage(page, i);
    } else {
      //deleted: the url is gone, the remaining fields are still populated
      assertNull(page.getUrl());
      assertNotNull(page.getOutlinks());
      assertNotNull(page.getParsedContent());
      assertNotNull(page.getContent());
      assertTrue(page.getOutlinks().size() > 0);
      assertTrue(page.getParsedContent().size() > 0);
    }
  }
}
/**
 * Stores a {@code WebPage} that carries a nested {@code Metadata} record
 * (version 1, data entry "foo" -> "baz") under the key "foo.com:http/",
 * reads it back and checks that the nested record was persisted.
 *
 * @param store the data store under test
 * @throws Exception if a store operation fails
 */
public static void testPutNested(DataStore<String, WebPage> store)
    throws Exception {
  String revUrl = "foo.com:http/";
  String url = "http://foo.com/";

  store.createSchema();
  WebPage page = WebPage.newBuilder().build();
  Metadata metadata = Metadata.newBuilder().build();
  metadata.setVersion(1);
  metadata.getData().put(new Utf8("foo"), new Utf8("baz"));

  page.setMetadata(metadata);
  page.setUrl(new Utf8(url));

  store.put(revUrl, page);
  store.flush();

  page = store.get(revUrl);
  // Fail with an assertion rather than a NullPointerException when the
  // record could not be read back.
  assertNotNull(page);
  metadata = page.getMetadata();
  assertNotNull(metadata);
  assertEquals(1, metadata.getVersion().intValue());
  assertEquals(new Utf8("baz"), metadata.getData().get(new Utf8("foo")));
}
/**
 * Creates the schema, stores a {@code WebPage} whose parsed content is the
 * token list "example", "content", "in", "example.com" under the key
 * "com.example/http", and closes the store. The method itself performs no
 * assertions.
 *
 * @param store the data store under test; it is closed by this method
 * @throws Exception if a store operation fails
 */
public static void testPutArray(DataStore<String, WebPage> store)
    throws Exception {
  store.createSchema();

  WebPage webPage = WebPage.newBuilder().build();
  // start from an empty list and append through the getter
  webPage.setParsedContent(new ArrayList<CharSequence>());
  for (String word : new String[] {"example", "content", "in", "example.com"}) {
    webPage.getParsedContent().add(new Utf8(word));
  }

  store.put("com.example/http", webPage);
  store.close();
}
/**
 * Creates the schema, stores a {@code WebPage} with url "http://example.com"
 * and a byte content under the key "com.example/http", and closes the store.
 *
 * @param store the data store under test; it is closed by this method
 * @return the bytes that were wrapped and stored as the page content
 * @throws Exception if a store operation fails
 */
public static byte[] testPutBytes(DataStore<String, WebPage> store)
    throws Exception {
  store.createSchema();

  byte[] payload = "example content in example.com".getBytes(Charset.defaultCharset());

  WebPage webPage = WebPage.newBuilder().build();
  webPage.setUrl(new Utf8("http://example.com"));
  webPage.setContent(ByteBuffer.wrap(payload));

  store.put("com.example/http", webPage);
  store.close();

  return payload;
}
/**
 * Creates the schema, stores a {@code WebPage} with url "http://example.com"
 * and three outlink puts under the key "com.example/http", and closes the
 * store. The key "http://example3.com" is put twice, first with "anchor3"
 * and then with "anchor4". The method itself performs no assertions.
 *
 * @param store the data store under test; it is closed by this method
 * @throws Exception if a store operation fails
 */
public static void testPutMap(DataStore<String, WebPage> store)
    throws Exception {
  store.createSchema();

  WebPage webPage = WebPage.newBuilder().build();
  webPage.setUrl(new Utf8("http://example.com"));

  // {target url, anchor text}, applied in this order
  String[][] outlinks = {
      {"http://example2.com", "anchor2"},
      {"http://example3.com", "anchor3"},
      {"http://example3.com", "anchor4"},
  };
  for (String[] link : outlinks) {
    webPage.getOutlinks().put(new Utf8(link[0]), new Utf8(link[1]));
  }

  store.put("com.example/http", webPage);
  store.close();
}
/**
 * Stores a {@code WebPage} that has one entry in its byte-data map and one
 * entry in its string-data map, keyed by the page url, and checks that the
 * record can be read back.
 *
 * @param store the data store under test
 */
public static void testPutMixedMapTypes(DataStore<String, WebPage> store) {
  WebPage mixed = createWebPage();

  ByteBuffer byteValue = ByteBuffer.wrap(ByteUtils.toBytes("hello map"));
  mixed.getByteData().put(new Utf8("byteData"), byteValue);
  mixed.getStringData().put(new Utf8("stringData"), "hello map");

  store.createSchema();
  String putKey = mixed.getUrl().toString();
  store.put(putKey, mixed);
  store.flush();

  String getKey = mixed.getUrl().toString();
  assertNotNull(store.get(getKey));
}
/**
 * Copies the bytes between the buffer's current position and its limit into
 * a new array. The given buffer's position and limit are left unchanged.
 *
 * @param buffer the buffer to copy from
 * @return a new array holding the buffer's remaining bytes
 */
private static byte[] toByteArray(ByteBuffer buffer) {
  // Read through a duplicate so the caller's position is not advanced.
  ByteBuffer view = buffer.duplicate();
  byte[] copy = new byte[view.remaining()];
  view.get(copy);
  return copy;
}
/**
 * Returns the names of the given schema fields, in their original order,
 * leaving out any field whose name equals
 * {@code Persistent.DIRTY_BYTES_FIELD_NAME} (compared case-insensitively).
 *
 * @param schemaFields the schema fields to read the names from
 * @return the names of all remaining fields
 */
public static String[] getFields(List<Field> schemaFields) {
  List<String> names = new ArrayList<>(schemaFields.size());
  for (Field candidate : schemaFields) {
    String name = candidate.name();
    if (Persistent.DIRTY_BYTES_FIELD_NAME.equalsIgnoreCase(name)) {
      continue;
    }
    names.add(name);
  }
  return names.toArray(new String[names.size()]);
}
}