blob: 15d06c9ba27aa78870e6c4a167826b98364f8b95 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.streams.fullcontact
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.PrintStream
import java.util.Scanner
import com.typesafe.config.Config
import org.apache.streams.config.ComponentConfigurator
import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.core.StreamsDatum
import org.apache.streams.core.StreamsProcessor
import org.apache.streams.fullcontact.api.EnrichPersonRequest
import org.apache.streams.fullcontact.config.FullContactConfiguration
import org.apache.streams.fullcontact.pojo.PersonSummary
import scala.collection.JavaConversions._
import scala.io.Source
import scala.util.Failure
import scala.util.Success
import scala.util.Try
/**
 * Command-line entry point and iterator helpers that call FullContact person enrichment
 * on a series of requests.
 */
object PersonEnrichmentProcessor {

  /**
   * This component's section of the Streams configuration, read from the path
   * `org.apache.streams.fullcontact.PersonEnrichmentProcessor`.
   *
   * When that section is absent, an empty config is used instead of failing with
   * `ConfigException.Missing`. This lets [[main]] fall back to stdin/stdout as documented.
   */
  lazy val typesafe: Config = {
    val root = StreamsConfigurator.getConfig
    val path = "org.apache.streams.fullcontact.PersonEnrichmentProcessor"
    if (root.hasPath(path)) root.getConfig(path) else com.typesafe.config.ConfigFactory.empty()
  }

  /**
   * Enriches persons from the command line.
   *
   * Supply (at least) the following required configuration in application.conf:
   * {{{
   * org.apache.streams.fullcontact.config.FullContactConfiguration.token = ""
   * }}}
   *
   * Launch syntax:
   * {{{
   * java -cp streams-dist-jar-with-dependencies.jar -Dconfig.file=./application.conf org.apache.streams.fullcontact.PersonEnrichmentProcessor
   * }}}
   *
   * Input (one request per line, JSON lines, decoded as UTF-8) is read from:
   *  - the file named by the `input` key of this component's configuration section, if present
   *  - stdin otherwise
   *
   * Output (one result per line, encoded as UTF-8) is written to:
   *  - the file named by the `output` key of this component's configuration section, if present
   *  - stdout otherwise
   *
   * Both keys are read relative to `org.apache.streams.fullcontact.PersonEnrichmentProcessor`.
   * NOTE(review): a bare `-Dinput=` / `-Doutput=` system property presumably does not reach
   * this nested section; the full path is likely required — confirm against StreamsConfigurator.
   *
   * Each line is wrapped in a [[StreamsDatum]] and run through [[streamDatums]] with its default
   * processor. Every resulting document is written as one line.
   *
   * @see [[org.apache.streams.fullcontact.api.EnrichPersonRequest]]
   * @param args ignored; all settings come from configuration
   * @throws Exception if configuration, I/O, or processing of an input line fails
   */
  @throws[Exception]
  final def main(args: Array[String]): Unit = {
    val inputStream: java.io.InputStream =
      if (typesafe.hasPath("input")) {
        new BufferedInputStream(new FileInputStream(new File(typesafe.getString("input"))))
      } else System.in
    // Keep the raw sink un-wrapped here; buffering and the single PrintStream layer are added below.
    val outputStream: java.io.OutputStream =
      if (typesafe.hasPath("output")) {
        new FileOutputStream(new File(typesafe.getString("output")))
      } else System.out
    // JSON text is UTF-8; do not depend on the platform default charset.
    val input = Source.fromInputStream(inputStream, "UTF-8")
    val outStream = new PrintStream(new BufferedOutputStream(outputStream), false, "UTF-8")
    try {
      val inputDatums = input.getLines().map(line => new StreamsDatum(line))
      val outputLines = streamDatums(inputDatums).map(_.getDocument.asInstanceOf[String])
      outputLines.foreach(line => outStream.println(line))
      outStream.flush()
    } finally {
      // Runs even when processing throws, so buffered output is flushed and both ends are released.
      outStream.close()
      input.close()
    }
  }

  /**
   * Lazily enriches each request with `processor.enrichPerson`.
   *
   * @param iter requests to enrich; consumed lazily as the result is iterated
   * @param processor enrichment client; defaults to `FullContact.getInstance()`
   * @return one [[PersonSummary]] per request, in input order
   */
  def stream( iter : Iterator[EnrichPersonRequest] )
            ( implicit processor : PersonEnrichment = FullContact.getInstance() ) : Iterator[PersonSummary] = {
    iter.map( item => processor.enrichPerson(item) )
  }

  /**
   * Lazily runs each datum through `processor.process` and flattens the resulting lists.
   *
   * @param iter datums to process; consumed lazily as the result is iterated
   * @param processor defaults to a new [[PersonEnrichmentProcessor]] built from detected configuration
   * @return every datum emitted by the processor, in input order
   */
  def streamDatums( iter : Iterator[StreamsDatum] )
                  ( implicit processor : PersonEnrichmentProcessor = new PersonEnrichmentProcessor() ) : Iterator[StreamsDatum] = {
    // process returns a java.util.List; the JavaConversions import makes it usable with flatMap.
    iter.flatMap( item => processor.process(item) )
  }

  /**
   * Function view of [[stream]].
   *
   * Each invocation calls `stream` with its default `PersonEnrichment`, because no implicit
   * instance is in scope here.
   */
  def processor : Iterator[EnrichPersonRequest] => Iterator[PersonSummary] = {
    stream(_)
  }
}
/**
 * [[StreamsProcessor]] that enriches each incoming datum with FullContact person data.
 *
 * Accepts two kinds of document:
 *  - an [[EnrichPersonRequest]]
 *  - a String that `personEnrichment.parser` can parse into one
 *
 * Emits at most one datum. Its document is the [[PersonSummary]] serialized by
 * `personEnrichment.serializer`.
 *
 * @param config FullContact configuration; defaults to the one detected by [[ComponentConfigurator]]
 */
class PersonEnrichmentProcessor(config : FullContactConfiguration = new ComponentConfigurator[FullContactConfiguration](classOf[FullContactConfiguration]).detectConfiguration())
  extends StreamsProcessor with Serializable {

  /** FullContact client; re-created by [[prepare]] and set to null by [[cleanUp]]. */
  var personEnrichment = FullContact.getInstance(config)

  /**
   * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the StreamsDatums that will
   * be passed to every downstream operation that reads from this processor.
   *
   * A failed enrichment call, or one that returns null, yields an empty list rather than an error.
   *
   * @param entry StreamsDatum to be processed
   * @return resulting StreamDatums from processing. Should never be null or contain null object. Empty list OK.
   * @throws IllegalArgumentException if the document is neither an EnrichPersonRequest nor a String (including null)
   */
  override def process(entry: StreamsDatum): java.util.List[StreamsDatum] = {
    // Parsing happens outside the Try, so an input that cannot be parsed propagates to the caller.
    val request: EnrichPersonRequest = entry.getDocument match {
      case req: EnrichPersonRequest => req
      case json: String => personEnrichment.parser.parse(json, classOf[EnrichPersonRequest])
      case _ => throw new IllegalArgumentException("invalid input type")
    }
    Try(personEnrichment.enrichPerson(request)) match {
      case Success(summary: PersonSummary) =>
        java.util.Collections.singletonList(new StreamsDatum(personEnrichment.serializer.serialize(summary)))
      // Failure, or Success(null): a typed pattern never matches null, so this is the fallback.
      // Best-effort by design; the error is dropped.
      case _ =>
        java.util.Collections.emptyList[StreamsDatum]()
    }
  }

  /**
   * Each operation must publish an identifier.
   */
  override def getId: String = "PersonEnrichmentProcessor"

  /**
   * This method will be called after initialization/serialization. Initialize any non-serializable objects here.
   *
   * Re-creates `personEnrichment`. If `configurationObject` is a FullContactConfiguration, that object is used.
   * Any other value (null, a Map, Properties, ...) falls back to the constructor `config`. Without this fallback,
   * such values would fail with a ClassCastException.
   * NOTE(review): referencing `config` here keeps it as a field of this Serializable class; assumes
   * FullContactConfiguration is Serializable — confirm.
   *
   * @param configurationObject Any object to help initialize the operation. ie. Map, JobContext, Properties, etc. The type
   *                            will be based on where the operation is being run (ie. hadoop, storm, locally, etc.)
   */
  override def prepare(configurationObject: Any): Unit = {
    val effectiveConfig = configurationObject match {
      case fullContactConfig: FullContactConfiguration => fullContactConfig
      case _ => config
    }
    personEnrichment = FullContact.getInstance(effectiveConfig)
  }

  /**
   * No guarantee that this method will ever be called. But upon shutdown of the stream, an attempt to call this method
   * will be made.
   * Use this method to terminate connections, etc.
   *
   * Closes the client's `restClient` and clears `personEnrichment`. Calling it again is a no-op.
   * NOTE(review): if FullContact.getInstance returns a shared instance, this closes the shared client — confirm.
   */
  override def cleanUp(): Unit = {
    val current = personEnrichment
    // Clear the field first, so a failing close() still leaves the processor cleaned up.
    personEnrichment = null
    if (current != null) current.restClient.close()
  }
}