blob: 4663bca9c53d7f057bfc6ce98c1d4e3509e35b7d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// yatzy is an implementation of https://en.wikipedia.org/wiki/Yatzy that shows
// that pipeline construction is normal Go code. It can even be
// non-deterministic and produce different pipelines on each invocation.
package main
// beam-playground:
// name: Yatzy
// description: An examples shows that pipeline construction is normal Go code.
// It can even be non-deterministic and produce different pipelines on each invocation.
// multifile: false
// context_line: 133
// categories:
// - IO
// - Side Input
// complexity: ADVANCED
// tags:
// - pipeline
// - random
// - numbers
import (
"context"
"flag"
"fmt"
"math/rand"
"sort"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
var (
real = flag.Int("real_dice", 20, "Actual dice to use (cropped to formal).")
dice = flag.Int("dice", 6, "Formal dice to use.")
)
func init() {
register.Function1x1(incFn)
register.Function7x0(evalFn)
register.DoFn1x1[int, int](&minFn{})
}
// roll is a composite PTransform for a construction-time dice roll. The value
// is encoded in the shape of the pipeline, which will produce a single
// element of that value. The shape is as follows:
//
// 0 -> \x.x+1 -> \x.x+1 -> (N times) -> \x.min(x, 6)
//
// The single output will be a number between 1 and 6.
func roll(ctx context.Context, s beam.Scope) beam.PCollection {
num := rand.Intn(*real) + 1
log.Debugf(ctx, "Lucky number %v!", num)
s = s.Scope(fmt.Sprintf("roll[%v]", num))
col := beam.Create(s, 0)
for i := 0; i < num; i++ {
col = beam.ParDo(s, incFn, col)
}
return beam.ParDo(s, minFn{Num: *dice}, col)
}
// minFn is a DoFn that computes outputs the minimum of a fixed value
// and each incoming value.
type minFn struct {
Num int `json:"num"`
}
func (m minFn) ProcessElement(num int) int {
if m.Num < num {
return m.Num
}
return num
}
// incFn is a DoFn that increments each value by 1.
func incFn(num int) int {
return num + 1
}
func eq(n int, other ...int) bool {
for _, num := range other {
if num != n {
return false
}
}
return true
}
// evalFn is a DoFn that takes 5 dice rolls as singleton side inputs and
// evaluates them. It is triggered by an impulse, whose value is ignored.
// It does not output any value, but simply logs the result.
func evalFn(ctx context.Context, _ []byte, a, b, c, d, e int) {
r := []int{a, b, c, d, e}
sort.Ints(r)
log.Infof(ctx, "Roll: %v", r)
switch {
case eq(r[0], r[1], r[2], r[3], r[4]):
log.Info(ctx, "Yatzy!")
case eq(r[0], r[1], r[2], r[3]) || eq(r[1], r[2], r[3], r[4]):
log.Info(ctx, "Four of a kind!")
case eq(r[0], r[1], r[2]) && r[3] == r[4], r[0] == r[1] && eq(r[2], r[3], r[4]):
log.Info(ctx, "Full house!")
case r[0] == 1 && r[1] == 2 && r[2] == 3 && r[3] == 4 && r[4] == 5:
log.Info(ctx, "Small straight!")
case r[0] == 2 && r[1] == 3 && r[2] == 4 && r[3] == 5 && r[4] == 6:
log.Info(ctx, "Big straight!")
default:
log.Info(ctx, "Sorry, try again.")
}
}
func main() {
flag.Parse()
beam.Init()
rand.Seed(time.Now().UnixNano())
ctx := context.Background()
log.Info(ctx, "Running yatzy")
// Construct a construction-time-randomized pipeline.
p := beam.NewPipeline()
s := p.Root()
beam.ParDo0(s, evalFn, beam.Impulse(s),
beam.SideInput{Input: roll(ctx, s)},
beam.SideInput{Input: roll(ctx, s)},
beam.SideInput{Input: roll(ctx, s)},
beam.SideInput{Input: roll(ctx, s)},
beam.SideInput{Input: roll(ctx, s)},
)
if err := beamx.Run(context.Background(), p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}