blob: 929ff09531d7185b8527da1d2ac13960552f8ead [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.streams.fullcontact
import java.io.InputStream
import java.io.OutputStream
import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream, PrintStream}
import java.util.Scanner
import java.util.concurrent.Callable
import com.google.common.base.Preconditions
import com.typesafe.config.Config
import org.apache.juneau.json.JsonParser
import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.fullcontact.FullContactSocialGraph.FullContactSocialGraphStats
import org.apache.streams.fullcontact.FullContactSocialGraph.typesafe
import org.apache.streams.fullcontact.pojo.PersonSummary
import org.apache.streams.fullcontact.util.FullContactUtils
import scala.io.Source
import scala.util.Failure
import scala.util.Success
import scala.util.Try
/**
* Produce an activity streams 2.0 social graph from full contact response data file.
*/
object FullContactSocialGraph {
lazy val typesafe: Config = StreamsConfigurator.getConfig.getConfig("org.apache.streams.fullcontact.FullContactSocialGraph")
/**
* To use from command line:
*
* <p/>
* java -cp streams-dist-jar-with-dependencies.jar -Dconfig.file=application.conf org.apache.streams.fullcontact.SocialGraphCli
*
* <p/>
* Input stream should contain a series of json-serialized `PersonSummary` objects.
*
* <p/>
* Output stream will contain a TTL-serialized social graph.
*
* <p/>
* Input to the process is:
* A file if application.conf contains an 'input' key
* A file if -Dinput= is specified
* stdin otherwise
*
* Output from the process is:
* A file if application.conf contains an 'input' key
* A file if -Doutput= is specified
* stdout otherwise
*
* @link org.apache.streams.fullcontact.FullContactSocialGraph
* @throws Exception Exception
*/
@throws[Exception]
final def main(args: Array[String]): Unit = {
val inputStream: InputStream = if (typesafe.hasPath("input")) {
new FileInputStream(new File(typesafe.getString("input")))
} else System.in
val outputStream: OutputStream = if (typesafe.hasPath("output")) {
new FileOutputStream(new File(typesafe.getString("output")))
} else System.out
val job = Try(new FullContactSocialGraph(inputStream, outputStream))
job match {
case Success(_) => return job.get
case Failure(t : Throwable) => throw new Exception(t)
}
}
case class FullContactSocialGraphStats(
inputLines : Int,
personSummaries : Int,
allOrganizations : Int,
allInterestItems : Int,
uniqueInterests : Int,
topicHierarchy : Int,
allProfiles : Int,
allProfileRelationships : Int,
allEmploymentItems : Int,
uniqueEmployers : Int,
allUrlRelationships : Int,
allImageRelationships : Int
)
}
class FullContactSocialGraph(in: InputStream, out: OutputStream ) extends Callable[FullContactSocialGraphStats] {
val inputStream: BufferedInputStream = new BufferedInputStream(in)
val outputStream: PrintStream = new PrintStream(out)
override def call() : FullContactSocialGraphStats = {
val input = Source.fromInputStream (inputStream)
val inputLines = input.getLines()
// sequence of all PersonSummary
val personSummaries = inputLines.map (JsonParser.DEFAULT.parse (_, classOf[PersonSummary] ) ).toSeq
// PersonSummary derived sequences
val allOrganizations = FullContactUtils.allOrganizationItems (personSummaries.toIterator).toSeq
val allInterestItems = FullContactUtils.allInterestItems (personSummaries.toIterator).toSeq
val uniqueInterests = FullContactUtils.uniqueInterests (allInterestItems.toIterator).toSeq
val topicHierarchy = FullContactUtils.topicHierarchy (allInterestItems.toIterator).toSeq
val allProfiles = FullContactUtils.allProfiles (personSummaries.toIterator).seq
val allProfileRelationships = FullContactUtils.allProfileRelationships (personSummaries.toIterator).seq
val allEmploymentItems = FullContactUtils.allEmploymentItems (personSummaries.toIterator).seq
val uniqueEmployers = FullContactUtils.uniqueEmployers (allEmploymentItems).seq
val allUrlRelationships = FullContactUtils.allUrlRelationships (personSummaries.toIterator).seq
val allImageRelationships = FullContactUtils.allImageRelationships (personSummaries.toIterator).seq
personSummaries.flatMap (FullContactUtils.safe_personSummaryAsTurtle).foreach (outputStream.println (_) )
allOrganizations.flatMap (FullContactUtils.safe_organizationAsTurtle).foreach (outputStream.println (_) )
uniqueInterests.flatMap (FullContactUtils.safe_interestTopicAsTurtle).foreach (outputStream.println (_) )
topicHierarchy.map (FullContactUtils.topicRelationshipAsTurtle).foreach (outputStream.println (_) )
allUrlRelationships.flatMap (FullContactUtils.safe_urlRelationshipAsTurtle).foreach (outputStream.println (_) )
allImageRelationships.flatMap (FullContactUtils.safe_imageRelationshipAsTurtle).foreach (outputStream.println (_) )
allProfiles.flatMap (FullContactUtils.safe_profileAsTurtle).foreach (outputStream.println (_) )
allProfileRelationships.flatMap (FullContactUtils.safe_personProfileRelationshipAsTurtle).foreach (outputStream.println (_) )
FullContactSocialGraphStats(
inputLines = inputLines.size,
personSummaries = personSummaries.size,
allOrganizations = allOrganizations.size,
allInterestItems = allInterestItems.size,
uniqueInterests = uniqueInterests.size,
topicHierarchy = topicHierarchy.size,
allProfiles = allProfiles.size,
allProfileRelationships = allProfileRelationships.size,
allEmploymentItems = allEmploymentItems.size,
uniqueEmployers = uniqueEmployers.size,
allUrlRelationships = allUrlRelationships.size,
allImageRelationships = allImageRelationships.size
)
}
}