| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| package org.apache.plc4x; |
| |
| import org.apache.calcite.schema.Table; |
| import org.apache.calcite.schema.impl.AbstractSchema; |
| import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; |
| import org.apache.plc4x.java.scraper.ResultHandler; |
| import org.apache.plc4x.java.scraper.Scraper; |
| import org.apache.plc4x.java.scraper.ScraperImpl; |
| import org.apache.plc4x.java.scraper.config.JobConfiguration; |
| import org.apache.plc4x.java.scraper.config.ScraperConfiguration; |
| import org.apache.plc4x.java.scraper.exception.ScraperException; |
| |
| import java.time.Instant; |
| import java.util.Map; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.stream.Collectors; |
| |
| public class Plc4xSchema extends AbstractSchema { |
| |
| protected final ScraperConfiguration configuration; |
| protected final Scraper scraper; |
| protected final QueueHandler handler; |
| protected final Map<String, BlockingQueue<Record>> queues; |
| protected final Map<String, Table> tableMap; |
| |
| public Plc4xSchema(ScraperConfiguration configuration, long tableCutoff) throws ScraperException { |
| this.configuration = configuration; |
| this.handler = new QueueHandler(); |
| this.scraper = new ScraperImpl(configuration, handler); |
| this.queues = configuration.getJobConfigurations().stream() |
| .collect(Collectors.toMap( |
| JobConfiguration::getName, |
| conf -> new ArrayBlockingQueue<Record>(1000) |
| )); |
| // Create the tables |
| this.tableMap = configuration.getJobConfigurations().stream() |
| .collect(Collectors.toMap( |
| JobConfiguration::getName, |
| conf -> defineTable(queues.get(conf.getName()), conf, tableCutoff) |
| )); |
| // Start the scraper |
| this.scraper.start(); |
| } |
| |
| Table defineTable(BlockingQueue<Record> queue, JobConfiguration configuration, Long limit) { |
| if (limit <= 0) { |
| return new Plc4xStreamTable(queue, configuration); |
| } else { |
| return new Plc4xTable(queue, configuration, limit); |
| } |
| } |
| |
| @Override |
| protected Map<String, Table> getTableMap() { |
| // Return a map of all jobs |
| return this.tableMap; |
| } |
| |
| public static class Record { |
| |
| public final Instant timestamp; |
| public final String source; |
| public final Map<String, Object> values; |
| |
| public Record(Instant timestamp, String source, Map<String, Object> values) { |
| this.timestamp = timestamp; |
| this.source = source; |
| this.values = values; |
| } |
| } |
| |
| class QueueHandler implements ResultHandler { |
| |
| @Override |
| public void handle(String job, String alias, Map<String, Object> results) { |
| try { |
| Record record = new Record(Instant.now(), alias, results); |
| queues.get(job).put(record); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new PlcRuntimeException("Handling got interrupted", e); |
| } |
| } |
| |
| } |
| } |