blob: 680f64c7dfe0ebebc7880ca836b6a442d6ed7459 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.nexmark.queries.sql;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.AuctionCount;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.Event.Type;
import org.apache.beam.sdk.nexmark.model.sql.SelectEvent;
import org.apache.beam.sdk.nexmark.queries.NexmarkQueryTransform;
import org.apache.beam.sdk.nexmark.queries.NexmarkQueryUtil;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
/**
* Query 5, 'Hot Items'. Which auctions have seen the most bids in the last hour (updated every
* minute). In CQL syntax:
*
* <pre>{@code
* SELECT Rstream(auction)
* FROM (SELECT B1.auction, count(*) AS num
* FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
* GROUP BY B1.auction)
* WHERE num >= ALL (SELECT count(*)
* FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
* GROUP BY B2.auction);
* }</pre>
*
* <p>To make things a bit more dynamic and easier to test we use much shorter windows, and we'll
* also preserve the bid counts.
*/
public class SqlQuery5 extends NexmarkQueryTransform<AuctionCount> {
private static final String QUERY_TEMPLATE =
Joiner.on("\n\t")
.join(
" SELECT AuctionBids.auction, AuctionBids.num",
" FROM (",
" SELECT",
" B1.auction,",
" count(*) AS num,",
" HOP_START(B1.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d' SECOND) AS starttime",
" FROM Bid B1 ",
" GROUP BY ",
" B1.auction,",
" HOP(B1.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d' SECOND)",
" ) AS AuctionBids",
" JOIN (",
" SELECT ",
" max(CountBids.num) AS maxnum, ",
" CountBids.starttime",
" FROM (",
" SELECT",
" count(*) AS num,",
" HOP_START(B2.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d' SECOND) AS starttime",
" FROM Bid B2 ",
" GROUP BY ",
" B2.auction, ",
" HOP(B2.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d' SECOND)",
" ) AS CountBids",
" GROUP BY CountBids.starttime",
" ) AS MaxBids ",
" ON AuctionBids.starttime = MaxBids.starttime AND AuctionBids.num >= MaxBids.maxnum ");
private final PTransform<PInput, PCollection<Row>> query;
public SqlQuery5(NexmarkConfiguration configuration) {
super("SqlQuery5");
String queryString =
String.format(QUERY_TEMPLATE, configuration.windowPeriodSec, configuration.windowSizeSec);
query = SqlTransform.query(queryString);
}
@Override
public PCollection<AuctionCount> expand(PCollection<Event> allEvents) {
PCollection<Row> bids =
allEvents
.apply(Filter.by(NexmarkQueryUtil.IS_BID))
.apply(getName() + ".SelectEvent", new SelectEvent(Type.BID));
return PCollectionTuple.of(new TupleTag<>("Bid"), bids)
.apply(query)
.apply(Convert.fromRows(AuctionCount.class));
}
}