blob: fe031954567ed98da59f1c380641d51b75f4b100 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package main
import (
"math"
"github.com/sirupsen/logrus"
"vermeer/apps/compute"
"vermeer/apps/options"
"vermeer/apps/serialize"
)
func init() {
_ = AlgoMaker
logrus.Infof("pagerank plugin init")
}
// AlgoMaker is the package-level maker instance for the pagerank algorithm.
// NOTE(review): presumably looked up by name by the plugin loader — confirm
// before renaming.
var AlgoMaker PageRankMaker
// PageRankMaker creates the worker and master computers for PageRank and
// describes the algorithm (name and required graph data). It carries no state.
type PageRankMaker struct{}
// CreateWorkerComputer returns a new zero-valued PageRankWorker; its buffers
// are allocated later by Init.
func (prm *PageRankMaker) CreateWorkerComputer() compute.WorkerComputer {
	worker := new(PageRankWorker)
	return worker
}
// CreateMasterComputer returns a new zero-valued PageRankMaster; its
// threshold is read later by Init.
func (prm *PageRankMaker) CreateMasterComputer() compute.MasterComputer {
	master := new(PageRankMaster)
	return master
}
// Name returns the algorithm identifier "pagerank".
func (prm *PageRankMaker) Name() string {
	const algoName = "pagerank"
	return algoName
}
// DataNeeded lists the additional graph data this algorithm requests. The
// worker reads vertex out-degrees via GetOutDegree, presumably made available
// by the "use_out_degree" option.
func (prm *PageRankMaker) DataNeeded() []string {
	needed := []string{"use_out_degree"}
	return needed
}
// PageRankWorker holds the per-worker state of the PageRank computation.
// The rank slices are sized to the total vertex count and indexed by vertex id.
type PageRankWorker struct {
	compute.WorkerComputerBase

	// damping is the damping factor d, read from the "pagerank.damping" parameter.
	damping serialize.SFloat32

	// initialRank is (1 - damping) / N, the constant term added to every new rank.
	initialRank serialize.SFloat32

	// danglingSumWorker is published as the "dangling_sum" value in AfterStep.
	// NOTE(review): it is not assigned anywhere in this file, so it is always 0.
	danglingSumWorker serialize.SFloat32

	// danglingSum is the sum of previous-step ranks of vertices with out-degree 0,
	// recomputed in BeforeStep.
	danglingSum serialize.SFloat32

	// preDangling is damping / N; multiplied by danglingSum, it adds an equal
	// share of the dangling mass to every vertex.
	preDangling serialize.SFloat32

	// diffSum holds one |rank change| accumulator per parallel partition (pID),
	// sized by WContext.Parallel; summed into "diff_sum" in AfterStep.
	diffSum []serialize.SFloat32

	// newValues holds the ranks computed during the current step.
	newValues []serialize.SFloat32

	// oldValues holds the ranks from the previous step (initially 1/N).
	oldValues []serialize.SFloat32
}
// VertexValue exposes the current-step rank of vertex i as a serializable
// value. The returned pointer aliases the element of newValues, so writes
// through it change the worker's state.
func (pgw *PageRankWorker) VertexValue(i uint32) serialize.MarshalAble {
	rank := &pgw.newValues[i]
	return rank
}
// Init prepares the worker for the first superstep. It does the following:
//   - allocates the rank buffers (one slot per vertex) and the per-partition
//     diff accumulators (one slot per WContext.Parallel);
//   - seeds every vertex with the uniform rank 1/N;
//   - derives initialRank and preDangling from "pagerank.damping";
//   - registers the "dangling_sum" and "diff_sum" aggregated values, both
//     initialised to 0.
//
// It always returns nil.
func (pgw *PageRankWorker) Init() error {
	vertexCount := pgw.WContext.GraphData.Vertex.TotalVertexCount()
	n := serialize.SFloat32(vertexCount)

	pgw.newValues = make([]serialize.SFloat32, vertexCount)
	pgw.oldValues = make([]serialize.SFloat32, vertexCount)
	pgw.diffSum = make([]serialize.SFloat32, pgw.WContext.Parallel)

	uniform := 1.0 / n
	for idx := range pgw.oldValues {
		pgw.oldValues[idx] = uniform
	}

	d := options.GetFloat(pgw.WContext.Params, "pagerank.damping")
	pgw.damping = serialize.SFloat32(d)
	pgw.initialRank = (1.0 - pgw.damping) / n
	pgw.preDangling = pgw.damping / n

	// Each value is created and then zeroed, in the same order as before.
	for _, name := range []string{"dangling_sum", "diff_sum"} {
		pgw.WContext.CreateValue(name, compute.ValueTypeFloat32, compute.CValueActionAggregate)
		pgw.WContext.SetValue(name, serialize.SFloat32(0))
	}
	return nil
}
// BeforeStep resets the per-partition diff accumulators and recomputes
// danglingSum: the total previous-step rank held by vertices that have no
// out-edges. It scans every vertex, not only those owned by this worker.
func (pgw *PageRankWorker) BeforeStep() {
	for p := 0; p < len(pgw.diffSum); p++ {
		pgw.diffSum[p] = 0
	}

	var sum serialize.SFloat32
	total := pgw.WContext.GraphData.Vertex.TotalVertexCount()
	for v := uint32(0); v < total; v++ {
		if pgw.WContext.GraphData.Edges.GetOutDegree(v) != 0 {
			continue
		}
		sum += pgw.oldValues[v]
	}
	pgw.danglingSum = sum
}
// Compute recomputes the rank of one vertex for the current step:
//
//	rank = (1-d)/N + d * sum(oldRank[u] / outDeg[u] for u in in-neighbours)
//	       + d/N * danglingSum
//
// The new value is stored in newValues[vertexID]. |rank change| is
// accumulated into diffSum[pID] for the convergence check. pID selects this
// call's partition slot.
func (pgw *PageRankWorker) Compute(vertexID uint32, pID int) {
	// In-edges are looked up by the worker-local index (vertexID minus
	// VertIDStart). Rank slices and out-degrees are indexed by vertexID / nID
	// directly.
	vertIdx := vertexID - pgw.WContext.GraphData.VertIDStart
	var contribution serialize.SFloat32
	for _, nID := range pgw.WContext.GraphData.Edges.GetInEdges(vertIdx) {
		out := pgw.WContext.GraphData.Edges.GetOutDegree(uint32(nID))
		contribution += pgw.oldValues[nID] / serialize.SFloat32(out)
	}
	newRank := pgw.initialRank + pgw.damping*contribution + pgw.preDangling*pgw.danglingSum

	// Measure convergence against the previous step's rank (oldValues). In
	// the first step newValues is still all zeros, so diffing against it
	// reported the full rank as "change" instead of the distance from 1/N.
	// From step 2 on, AfterStep has copied newValues into oldValues, so the
	// two are equal there.
	pgw.diffSum[pID] += serialize.SFloat32(math.Abs(float64(newRank - pgw.oldValues[vertexID])))
	pgw.newValues[vertexID] = newRank
}
// AfterStep publishes this worker's aggregated values for the step. It then
// promotes the freshly computed ranks to be the input of the next step.
func (pgw *PageRankWorker) AfterStep() {
	// NOTE(review): danglingSumWorker is not assigned anywhere in this file,
	// so this always publishes 0. The dangling mass is instead recomputed
	// locally in BeforeStep. Confirm whether anything reads "dangling_sum".
	pgw.WContext.SetValue("dangling_sum", pgw.danglingSumWorker)

	var diffSum serialize.SFloat32
	for _, partial := range pgw.diffSum {
		diffSum += partial
	}
	pgw.WContext.SetValue("diff_sum", diffSum)

	// Both slices are allocated with the same length in Init, so this copies
	// every element, exactly like the former index loop.
	copy(pgw.oldValues, pgw.newValues)
}
// OutputValueType reports the value type of the ranks this algorithm emits.
func (pgw *PageRankWorker) OutputValueType() string {
	const valueType = "FLOAT"
	return valueType
}
// PageRankMaster checks the aggregated "diff_sum" against a threshold to
// decide whether PageRank should keep iterating.
type PageRankMaster struct {
	compute.MasterComputerBase
	// diffThreshold is read from "pagerank.diff_threshold". Compute returns
	// false (presumably "stop iterating") once "diff_sum" falls below it.
	diffThreshold serialize.SFloat32
}
// Init reads the convergence threshold from the "pagerank.diff_threshold"
// parameter. It always returns nil.
func (pgm *PageRankMaster) Init() error {
	raw := options.GetFloat(pgm.MContext.Params, "pagerank.diff_threshold")
	pgm.diffThreshold = serialize.SFloat32(raw)
	return nil
}
// Compute logs the aggregated "diff_sum" and reports whether another step
// should run. It returns false once the sum is below diffThreshold, and true
// otherwise. It panics if "diff_sum" is not a serialize.SFloat32, which the
// workers register in their Init.
func (pgm *PageRankMaster) Compute() bool {
	diffSum := pgm.MContext.GetValue("diff_sum").(serialize.SFloat32)
	logrus.Infof("different sum: %f, threshold: %f", diffSum, pgm.diffThreshold)
	// Negated "<" rather than ">=" so a NaN sum keeps the original behaviour
	// (continue iterating). The extracted diffSum is reused instead of
	// looking the value up and asserting its type a second time.
	return !(diffSum < pgm.diffThreshold)
}