| # |
| # Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| #agent1 on node1 |
| agent1.channels = ch1 |
| agent1.sources = netcatSource |
| agent1.sinks = dt |
| |
| # channels |
| agent1.channels.ch1.type = file |
| agent1.channels.ch1.capacity = 10000000 |
| agent1.channels.ch1.transactionCapacity = 10000 |
| agent1.channels.ch1.maxFileSize = 67108864 |
| |
| agent1.sources.netcatSource.type = exec |
| agent1.sources.netcatSource.channels = ch1 |
| agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1 |
| # Pick and Reorder the columns we need from a larger record for efficiency |
| agent1.sources.netcatSource.interceptors = columnchooser |
| agent1.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringInterceptor$Builder |
| agent1.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2 |
| agent1.sources.netcatSource.interceptors.columnchooser.dstSeparator = 1 |
| agent1.sources.netcatSource.interceptors.columnchooser.columns = 0 43 62 69 68 139 190 70 71 52 75 37 39 42 191 138 |
| |
| agent2.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder |
| agent2.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2 |
| agent2.sources.netcatSource.interceptors.columnchooser.columnsFormatter = {0}\u0001{43}\u0001{62}\u0001{69}\u0001{68}\u0001{139}\u0001{190}\u0001{70}\u0001{71}\u0001{52}\u0001{75}\u0001{37}\u0001{39}\u0001{42}\u0001{191}\u0001{138}\u0001 |
| |
| # index -- description -- type if different |
| # 0 Slice guid; // long |
| # 43 public long time // yyyy-MM-dd HH:mm:ss |
| # 62 public long adv_id; |
| # 69 public int cmp_type; // string |
| # 68 public long cmp_id; |
| # 139 public long line_id; |
| # 190 public long bslice_id; |
| # 70 public long ao_id; |
| # 71 public long creative_id; |
| # 52 public long algo_id; |
| # 75 public int device_model_id; // string |
| # 37 public long impressions; |
| # 39 public long clicks; |
| # 42 public double spend; |
| # 191 public double bonus_spend; |
| # 138 public double spend_local; |
| # |
| |
| # first sink - dt |
| agent1.sinks.dt.id = CEVL00P |
| agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink |
| agent1.sinks.dt.hostname = localhost |
| agent1.sinks.dt.port = 8080 |
| agent1.sinks.dt.sleepMillis = 7 |
| agent1.sinks.dt.throughputAdjustmentFactor = 2 |
| agent1.sinks.dt.maximumEventsPerTransaction = 5000 |
| agent1.sinks.dt.minimumEventsPerTransaction = 1 |
| |
| # Ensure that we do not lose the data handed over to us by flume. |
| agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage |
| agent1.sinks.dt.storage.restore = false |
| agent1.sinks.dt.storage.baseDir = /tmp/flume101 |
| agent1.sinks.dt.channel = ch1 |
| |
| # Ensure that we are able to detect flume sinks (and failures) automatically. |
| agent1.sinks.dt.discovery = com.datatorrent.flume.discovery.ZKAssistedDiscovery |
| agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181 |
| agent1.sinks.dt.discovery.basePath = /HelloDT |
| agent1.sinks.dt.discovery.connectionTimeoutMillis = 1000 |
| agent1.sinks.dt.discovery.connectionRetryCount = 10 |
| agent1.sinks.dt.discovery.connectionRetrySleepMillis = 500 |
| |