blob: 1fbd06092d0eaf3665479df96f76ed5fe15cfdba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.nexmark.queries;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionBid;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.SellerPrice;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Duration;
/**
* Query 6, 'Average Selling Price by Seller'. Select the average selling price over the last 10
* closed auctions by the same seller. In CQL syntax:
*
* <pre>{@code
* SELECT Istream(AVG(Q.final), Q.seller)
* FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
* FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
* WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
* GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
* GROUP BY Q.seller;
* }</pre>
*
* <p>We are a little more exact with selecting winning bids: see {@link WinningBids}.
*/
public class Query6 extends NexmarkQueryTransform<SellerPrice> {
private final NexmarkConfiguration configuration;
public Query6(NexmarkConfiguration configuration) {
super("Query6");
this.configuration = configuration;
}
/**
* Combiner to keep track of up to {@code maxNumBids} of the most recent wining bids and calculate
* their average selling price.
*/
private static class MovingMeanSellingPrice extends Combine.CombineFn<Bid, List<Bid>, Long> {
private final int maxNumBids;
public MovingMeanSellingPrice(int maxNumBids) {
this.maxNumBids = maxNumBids;
}
@Override
public List<Bid> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<Bid> addInput(List<Bid> accumulator, Bid input) {
accumulator.add(input);
accumulator.sort(Bid.ASCENDING_TIME_THEN_PRICE);
if (accumulator.size() > maxNumBids) {
accumulator.remove(0);
}
return accumulator;
}
@Override
public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
List<Bid> result = new ArrayList<>();
for (List<Bid> accumulator : accumulators) {
result.addAll(accumulator);
}
result.sort(Bid.ASCENDING_TIME_THEN_PRICE);
if (result.size() > maxNumBids) {
result = Lists.newArrayList(result.listIterator(result.size() - maxNumBids));
}
return result;
}
@Override
public Long extractOutput(List<Bid> accumulator) {
if (accumulator.isEmpty()) {
return 0L;
}
long sumOfPrice = 0;
for (Bid bid : accumulator) {
sumOfPrice += bid.price;
}
return Math.round((double) sumOfPrice / accumulator.size());
}
}
@Override
public PCollection<SellerPrice> expand(PCollection<Event> events) {
return events
.apply(Filter.by(new AuctionOrBid()))
// Find the winning bid for each closed auction.
.apply(new WinningBids(name + ".WinningBids", configuration))
// Key the winning bid by the seller id.
.apply(
name + ".Rekey",
ParDo.of(
new DoFn<AuctionBid, KV<Long, Bid>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Auction auction = c.element().auction;
Bid bid = c.element().bid;
c.output(KV.of(auction.seller, bid));
}
}))
// Re-window to update on every wining bid.
.apply(
Window.<KV<Long, Bid>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))
// Find the average of last 10 winning bids for each seller.
.apply(Combine.perKey(new MovingMeanSellingPrice(10)))
// Project into our datatype.
.apply(
name + ".Select",
ParDo.of(
new DoFn<KV<Long, Long>, SellerPrice>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new SellerPrice(c.element().getKey(), c.element().getValue()));
}
}));
}
}