blob: 484a4123e8fb438c5f97791d6988e96231a4feb3 [file] [log] [blame]
/**
* @file GetGPS.cpp
* GetGPS class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <time.h>
#include <sstream>
#include <stdio.h>
#include <string>
#include <iostream>
#include <dirent.h>
#include <limits.h>
#include <unistd.h>
#include <regex>
#include "GetGPS.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/PropertyValidation.h"
#define policy_t gps_policy_t
#include <libgpsmm.h>
#undef policy_t
#define policy_t ambiguous use gps_policy_t
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
const std::string GetGPS::ProcessorName("GetGPS");
core::Relationship GetGPS::Success("success", "All files are routed to success");
core::Property GetGPS::GPSDHost(core::PropertyBuilder::createProperty("GPSD Host")->withDescription("The host running the GPSD daemon")->withDefaultValue<std::string>("localhost")->build());
core::Property GetGPS::GPSDPort(
core::PropertyBuilder::createProperty("GPSD Port")->withDescription("The GPSD daemon port")->withDefaultValue<int64_t>(2947, core::StandardValidators::PORT_VALIDATOR())->build());
core::Property GetGPS::GPSDWaitTime(
core::PropertyBuilder::createProperty("GPSD Wait Time")->withDescription("Timeout value for waiting for data from the GPSD instance")->withDefaultValue<uint64_t>(50000000)->build());
void GetGPS::initialize() {
//! Set the supported properties
std::set<core::Property> properties;
properties.insert(GPSDHost);
properties.insert(GPSDPort);
properties.insert(GPSDWaitTime);
setSupportedProperties(properties);
//! Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
}
void GetGPS::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
std::string value;
if (context->getProperty(GPSDHost.getName(), value)) {
gpsdHost_ = value;
}
if (context->getProperty(GPSDPort.getName(), value)) {
gpsdPort_ = value;
}
if (context->getProperty(GPSDWaitTime.getName(), value)) {
core::Property::StringToInt(value, gpsdWaitTime_);
}
logger_->log_trace("GPSD client scheduled");
}
void GetGPS::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
try {
gpsmm gps_rec(gpsdHost_.c_str(), gpsdPort_.c_str());
if (gps_rec.stream(WATCH_ENABLE | WATCH_JSON) == NULL) {
logger_->log_error("No GPSD running.");
return;
}
while (isRunning()) {
struct gps_data_t* gpsdata;
if (!gps_rec.waiting(gpsdWaitTime_))
continue;
if ((gpsdata = gps_rec.read()) == NULL) {
logger_->log_error("Read error");
return;
} else {
if (gpsdata->status > 0) {
if (gpsdata->fix.longitude != gpsdata->fix.longitude || gpsdata->fix.altitude != gpsdata->fix.altitude) {
logger_->log_info("No GPS fix.");
continue;
}
logger_->log_debug("Longitude: %lf\nLatitude: %lf\nAltitude: %lf\nAccuracy: %lf\n\n", gpsdata->fix.latitude, gpsdata->fix.longitude, gpsdata->fix.altitude,
(gpsdata->fix.epx > gpsdata->fix.epy) ? gpsdata->fix.epx : gpsdata->fix.epy);
std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (flowFile == nullptr)
return;
flowFile->addAttribute("gps_mode", std::to_string(gpsdata->fix.mode));
flowFile->addAttribute("gps_ept", std::to_string(gpsdata->fix.ept));
flowFile->addAttribute("gps_latitude", std::to_string(gpsdata->fix.latitude));
flowFile->addAttribute("gps_epy", std::to_string(gpsdata->fix.epy));
flowFile->addAttribute("gps_longitude", std::to_string(gpsdata->fix.longitude));
flowFile->addAttribute("gps_epx", std::to_string(gpsdata->fix.epx));
flowFile->addAttribute("gps_altitude", std::to_string(gpsdata->fix.altitude));
flowFile->addAttribute("gps_epv", std::to_string(gpsdata->fix.epv));
flowFile->addAttribute("gps_track", std::to_string(gpsdata->fix.track));
flowFile->addAttribute("gps_epd", std::to_string(gpsdata->fix.epd));
flowFile->addAttribute("gps_speed", std::to_string(gpsdata->fix.speed));
flowFile->addAttribute("gps_eps", std::to_string(gpsdata->fix.eps));
flowFile->addAttribute("gps_climb", std::to_string(gpsdata->fix.climb));
flowFile->addAttribute("gps_epc", std::to_string(gpsdata->fix.epc));
// Calculated Accuracy value
flowFile->addAttribute("gps_accuracy", std::to_string((gpsdata->fix.epx > gpsdata->fix.epy) ? gpsdata->fix.epx : gpsdata->fix.epy));
session->transfer(flowFile, Success);
//Break the for(;;) waiting loop
break;
} else {
logger_->log_info("Satellite lock has not yet been acquired");
}
}
}
} catch (std::exception &exception) {
logger_->log_error("GetGPS Caught Exception %s", exception.what());
throw;
}
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */