blob: 219847c1acb882a6c387888e44b3cd5069632a25 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"name": "import_movielens",
"source": [
"name": "movies",
"inputs": [],
"type": "file",
"options": {
"paths": "file://[=WORKING_DIR]/movielens/input/movies.csv",
"format": "csv",
"columns": "movieId,title,genres"
"name": "ratings",
"inputs": [],
"type": "file",
"options": {
"paths": "file://[=WORKING_DIR]/movielens/input/ratings.csv",
"format": "csv",
"columns": "userId,movieId,rating,timestamp"
"name": "tags",
"inputs": [],
"type": "file",
"options": {
"paths": "file://[=WORKING_DIR]/movielens/input/tags.csv",
"format": "csv",
"columns": "userId,movieId,tag,timestamp"
"process": [
"name": "vertex_movie",
"inputs": [
"type": "sql",
"options": {
"sql": "SELECT \n(unix_timestamp() * 1000) as timestamp, \n'v' as elem, \nCAST(movieId AS INT) AS id, \n'movielens' as service, \n'Movie' as column, \nto_json(\nnamed_struct(\n 'title', title, \n 'genres', genres\n)\n) as props \nFROM movies \nWHERE movieId != 'movieId'"
"name": "edge_rated",
"inputs": [
"type": "sql",
"options": {
"sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS INT) as `from`, \nCAST(movieId AS INT) as to, \n'rated' as label, \nto_json(\nnamed_struct(\n 'score', CAST(rating as float)\n)\n) as props \nFROM ratings \nWHERE userId != 'userId'"
"name": "edge_tagged",
"inputs": [
"type": "sql",
"options": {
"sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS INT) as `from`, \nCAST(movieId AS INT) as to, \n'tagged' as label, \nto_json(\nnamed_struct('tag', tag)\n) as props \nFROM tags \nWHERE userId != 'userId'"
"name": "edges",
"inputs": [
"type": "sql",
"options": {
"sql": "SELECT * FROM edge_rated UNION SELECT * FROM edge_tagged"
"name": "build_als_input",
"inputs": [
"type": "sql",
"options": {
"sql": "SELECT \n`from` as userId, `to` as movieId, 1.0 as rating FROM edge_rated"
"name": "factorize_rating",
"inputs": [
"type": "custom",
"options": {
"class": "org.apache.s2graph.s2jobs.task.custom.process.ALSModelProcess",
"rank": "10",
"maxIter": "5",
"regParam": "0.01",
"userCol": "userId",
"itemCol": "movieId",
"ratingCol": "rating"
"sink": [
"name": "vertex_sink",
"inputs": [
"type": "s2graph",
"options": {
"index.provider.lucene.fsType": "file",
"index.provider.base.dir": "/tmp/lucene",
"db.default.url": "jdbc:h2:tcp://localhost/./var/metastore;MODE=MYSQL",
"s2.spark.sql.streaming.sink.grouped.size": "10",
"s2.spark.sql.streaming.sink.wait.time": "10",
"cache.ttl.seconds": "600",
"name": "edge_sink",
"inputs": [
"type": "s2graph",
"options": {
"db.default.url": "jdbc:h2:tcp://localhost/./var/metastore;MODE=MYSQL",
"s2.spark.sql.streaming.sink.grouped.size": "10",
"s2.spark.sql.streaming.sink.wait.time": "10",
"cache.ttl.seconds": "600",
"name": "als_sink",
"inputs": [
"type": "file",
"options": {
"path": "/tmp/als_item",
"format": "json"
"name": "annoy_index_build",
"inputs": [
"type": "custom",
"options": {
"class": "org.apache.s2graph.s2jobs.task.custom.sink.AnnoyIndexBuildSink",
"itemFactors": "/tmp/itemFactors",
"path": "/tmp/annoy_result"