blob: c892c533dac4957533f79c5984909cce327edc73 [file] [log] [blame]
#
# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# agent1 on node1
# Declares the components of agent1: one channel (ch1), one source
# (netcatSource) and one sink (dt). Their settings follow below.
agent1.channels = ch1
agent1.sources = netcatSource
agent1.sinks = dt
# channels
# ch1 is a Flume file channel. Per the Flume User Guide, capacity and
# transactionCapacity are event counts and maxFileSize is in bytes
# (67108864 = 64 * 1024 * 1024).
# NOTE(review): checkpointDir and dataDirs are not set in this file, so the
# channel presumably falls back to Flume's default directories -- confirm.
agent1.channels.ch1.type = file
agent1.channels.ch1.capacity = 10000000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.maxFileSize = 67108864
# source
# Despite its name, netcatSource is an exec source: it runs the command below
# and feeds events into channel ch1.
# NOTE(review): the command and its data file are relative paths
# (src/test/...), so the agent presumably has to be started from the project
# root. The meaning of the trailing arguments "10000 1" is defined by the
# subcat_periodically script, which is not visible here.
agent1.sources.netcatSource.type = exec
agent1.sources.netcatSource.channels = ch1
agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1
# Pick and reorder the columns we need from a larger record for efficiency.
# A single interceptor, columnchooser, is attached to the source. "columns"
# lists the source column indexes in the order they should be emitted.
# NOTE(review): srcSeparator/dstSeparator look like numeric character codes
# (2 and 1) rather than literal characters -- confirm against the interceptor.
agent1.sources.netcatSource.interceptors = columnchooser
agent1.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringInterceptor$Builder
agent1.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2
agent1.sources.netcatSource.interceptors.columnchooser.dstSeparator = 1
agent1.sources.netcatSource.interceptors.columnchooser.columns = 0 43 62 69 68 139 190 70 71 52 75 37 39 42 191 138
# NOTE(review): the next three keys are prefixed with agent2, which declares
# no sources, channels or sinks in this file, so they do not affect agent1.
# They look like an alternative setup using ColumnFilteringFormattingInterceptor:
# columnsFormatter names the same 16 column indexes in the same order as
# "columns" above, each followed by \u0001. Confirm whether this is an
# intentional example; switching the prefix to agent1 would override the
# interceptor type configured above.
agent2.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder
agent2.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2
agent2.sources.netcatSource.interceptors.columnchooser.columnsFormatter = {0}\u0001{43}\u0001{62}\u0001{69}\u0001{68}\u0001{139}\u0001{190}\u0001{70}\u0001{71}\u0001{52}\u0001{75}\u0001{37}\u0001{39}\u0001{42}\u0001{191}\u0001{138}\u0001
# Legend for the columns selected above, in the same order:
# index -- description -- type if different
# 0 Slice guid; // long
# 43 public long time // yyyy-MM-dd HH:mm:ss
# 62 public long adv_id;
# 69 public int cmp_type; // string
# 68 public long cmp_id;
# 139 public long line_id;
# 190 public long bslice_id;
# 70 public long ao_id;
# 71 public long creative_id;
# 52 public long algo_id;
# 75 public int device_model_id; // string
# 37 public long impressions;
# 39 public long clicks;
# 42 public double spend;
# 191 public double bonus_spend;
# 138 public double spend_local;
#
# first sink - dt
# dt is a DataTorrent DTFlumeSink that drains channel ch1 (see the
# "channel" key below).
# NOTE(review): hostname/port are presumably the address the sink listens on
# for the downstream DataTorrent application -- confirm against DTFlumeSink.
# NOTE(review): sleepMillis, throughputAdjustmentFactor and the
# minimum/maximumEventsPerTransaction keys are DTFlumeSink-specific tuning
# knobs; their exact semantics are not visible in this file.
agent1.sinks.dt.id = CEVL00P
agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
agent1.sinks.dt.hostname = localhost
agent1.sinks.dt.port = 8080
agent1.sinks.dt.sleepMillis = 7
agent1.sinks.dt.throughputAdjustmentFactor = 2
agent1.sinks.dt.maximumEventsPerTransaction = 5000
agent1.sinks.dt.minimumEventsPerTransaction = 1
# Ensure that we do not lose the data handed over to us by flume.
# The sink is given an HDFSStorage implementation rooted at baseDir.
# NOTE(review): restore is false and baseDir is under /tmp; confirm that this
# is intended (e.g. for tests only) given the durability goal stated above.
agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
agent1.sinks.dt.storage.restore = false
agent1.sinks.dt.storage.baseDir = /tmp/flume101
agent1.sinks.dt.channel = ch1
# Ensure that we are able to detect flume sinks (and failures) automatically.
# Discovery uses ZKAssistedDiscovery with a ZooKeeper connection string of
# 127.0.0.1:2181 and /HelloDT as the base path. The timeout and sleep values
# are in milliseconds, as their key names indicate.
agent1.sinks.dt.discovery = com.datatorrent.flume.discovery.ZKAssistedDiscovery
agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181
agent1.sinks.dt.discovery.basePath = /HelloDT
agent1.sinks.dt.discovery.connectionTimeoutMillis = 1000
agent1.sinks.dt.discovery.connectionRetryCount = 10
agent1.sinks.dt.discovery.connectionRetrySleepMillis = 500