/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pirk.responder.wideskies.storm;

import java.net.URI;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.schema.data.DataSchemaLoader;
import org.apache.pirk.schema.query.QuerySchemaLoader;
import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.serialization.LocalFileSystemStore;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility class for the Storm implementation of Wideskies
 */
public class StormUtils
{
  private static final Logger logger = LoggerFactory.getLogger(StormUtils.class);

  /**
   * Reads a serialized {@link Query} object from the given query file.
   *
   * @param useHdfs whether to read the query file from HDFS rather than from the local filesystem
   * @param hdfsUri the URI of the HDFS instance holding the query file; only used when useHdfs is true
   * @param queryFile the path to the serialized Query object
   * @return the deserialized Query
   */
public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile)
{
Query query = null;
try
{
if (useHdfs)
{
FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
logger.info("reading query file from hdfs: " + queryFile);
query = (new HadoopFileSystemStore(fs)).recall(queryFile, Query.class);
}
else
{
logger.info("reading local query file from " + queryFile);
query = (new LocalFileSystemStore()).recall(queryFile, Query.class);
}
} catch (Exception e)
{
logger.error("Unable to initalize query info.", e);
throw new RuntimeException(e);
}
return query;
}
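
  /*
   * Usage sketch (illustrative only, not part of the original API surface; the
   * HDFS URI and file paths below are hypothetical placeholders):
   *
   *   Query hdfsQuery = StormUtils.getQuery(true, "hdfs://namenode:8020", "/pirk/query/query-file");
   *   Query localQuery = StormUtils.getQuery(false, null, "/tmp/pirk/query-file");
   */
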
  /**
   * Reads and returns a deserialized {@link Query} object, using the settings in the given configuration map.
   *
   * @param map the topology configuration map; must contain values for {@code StormConstants.USE_HDFS}, {@code StormConstants.HDFS_URI_KEY}, and
   *          {@code StormConstants.QUERY_FILE_KEY}
   * @return the deserialized Query, or null if the query could not be read
   */
public static Query prepareQuery(Map map)
{
Query query = null;
boolean useHdfs = (boolean) map.get(StormConstants.USE_HDFS);
String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
String queryFile = (String) map.get(StormConstants.QUERY_FILE_KEY);
try
{
query = StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
} catch (Exception e)
{
logger.warn("Unable to initialize query info.", e);
}
return query;
}
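
  /*
   * Illustrative sketch of how a bolt might obtain the query from its prepare()
   * method. The map keys are the StormConstants entries read by prepareQuery
   * above; the file path shown is a hypothetical placeholder:
   *
   *   Map<String,Object> conf = new HashMap<>();
   *   conf.put(StormConstants.USE_HDFS, false);
   *   conf.put(StormConstants.HDFS_URI_KEY, "");
   *   conf.put(StormConstants.QUERY_FILE_KEY, "/tmp/pirk/query-file");
   *   Query query = StormUtils.prepareQuery(conf);
   */
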
  /**
   * Initializes the data and query schemas. The conf map must contain values for {@code StormConstants.USE_HDFS}, {@code StormConstants.HDFS_URI_KEY},
   * {@code StormConstants.DSCHEMA_KEY}, and {@code StormConstants.QSCHEMA_KEY}.
   *
   * @param conf the topology configuration map
   * @param id an identifier for the calling component; not used by the current implementation
   */
public static void initializeSchemas(Map conf, String id)
{
SystemConfiguration.setProperty("data.schemas", (String) conf.get(StormConstants.DSCHEMA_KEY));
SystemConfiguration.setProperty("query.schemas", (String) conf.get(StormConstants.QSCHEMA_KEY));
try
{
boolean hdfs = (boolean) conf.get(StormConstants.USE_HDFS);
if (hdfs)
{
String hdfsUri = (String) conf.get(StormConstants.HDFS_URI_KEY);
FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
DataSchemaLoader.initialize(true, fs);
QuerySchemaLoader.initialize(true, fs);
}
else
{
DataSchemaLoader.initialize();
QuerySchemaLoader.initialize();
}
} catch (Exception e)
{
logger.error("Failed to initialize schema files.", e);
throw new RuntimeException(e);
}
}
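
  /*
   * Illustrative sketch of the conf entries initializeSchemas expects; the
   * schema file paths and the component id are hypothetical placeholders:
   *
   *   conf.put(StormConstants.USE_HDFS, false);
   *   conf.put(StormConstants.DSCHEMA_KEY, "/tmp/pirk/dataSchema.xml");
   *   conf.put(StormConstants.QSCHEMA_KEY, "/tmp/pirk/querySchema.xml");
   *   StormUtils.initializeSchemas(conf, "responder-bolt");
   */
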
  /**
   * Returns true if the given tuple is a Storm system tick tuple.
   */
  protected static boolean isTickTuple(Tuple tuple)
  {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
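
  /*
   * Illustrative sketch: a bolt in this package could use isTickTuple (note it
   * is protected, so callers must share this package) in execute() to separate
   * periodic tick tuples from data tuples; flushBatch() is a hypothetical method:
   *
   *   public void execute(Tuple tuple)
   *   {
   *     if (StormUtils.isTickTuple(tuple))
   *     {
   *       flushBatch();
   *       return;
   *     }
   *     // process a data tuple...
   *   }
   */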
}