initial commit

This commit is contained in:
djmil 2025-09-23 22:23:47 +02:00
parent 33789128f1
commit 28a7c67aa3
34 changed files with 1526 additions and 4 deletions

2
.gitignore vendored
View File

@ -44,4 +44,4 @@ install_manifest.txt
compile_commands.json
CTestTestfile.cmake
_deps
build/*

20
Dockerfile Normal file
View File

@ -0,0 +1,20 @@
FROM debian:latest
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
git curl ca-certificates \
build-essential \
tar zip unzip pkg-config \
gdb \
cmake \
protobuf-compiler libprotobuf-dev \
&& rm -rf /var/lib/apt/lists/* \
&& mkdir /workspace
#RUN git clone --depth 1 --branch 2024.02.14 https://github.com/Microsoft/vcpkg.git /opt/vcpkg \
# && cd /opt/vcpkg \
# && ./bootstrap-vcpkg.sh \
# && ./vcpkg integrate install \
# && ./vcpkg install catch2
WORKDIR /workspace

View File

@ -1,6 +1,6 @@
MIT NON-AI License
Copyright (c) 2024, Andriy Djmil
Copyright (c) 2025, Andriy Djmil
Permission is hereby granted, free of charge, to any person obtaining a copy of the software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions.

111
README.md
View File

@ -1,3 +1,110 @@
# telemetry
# Telemetry
Simple library for capturing runtime analytic events from embedded device
A simple yet functional library for capturing runtime analytic events from embedded devices.
# Build & run
`Dockerfile` with two helper scripts were added into project's root folder. Buy doing so, two useful goals were achieved:
- **Infrastructure as a code**:
All project dependancies and as well as all installation / configuration steps are easily documented as groups of handy scripts inside `Dockerfile`.
- **Containerization**:
Almost instant ability to jump into app development, testing and/or deployment with zero footprint (pollution) on main (host) PC.
## IDE
[VsCode](https://code.visualstudio.com/) IDE was used for in-container development. There is two different ways how one might approach this task:
- [Attach](https://code.visualstudio.com/docs/devcontainers/attach-container) VsCode to already [[#Build & run|running]] container
- Manually [install](https://github.com/ahmadnassri/docker-vscode-server/blob/master/Dockerfile) VsCode IDE into container and use [X11 window forwarding](https://goteleport.com/blog/x11-forwarding/), to automagically *spawn* VsCode IDE window on host PC
# Design goals
- agnostic to payload format
- adequate runtime overhead
- non intrusive / easy to use
# Protobuf v3
Generally speaking, it is impossible to predict what kind of information would posses the most value in the future. Thus communication protocols tend to evolve with time. Not all customers are willing to update their devices on demand. As a result of such forces of nature, it is unavoidable that even devices of a same model would send Telemetry messages of different format. Endpoin servers (collectors) shall be capable of effective handling of such situation. [Protobuf](https://protobuf.dev/overview/) is well known industry solution for such problems.
A known limitation of `Protobuf` library - is an inability to effectively store multiple message in file in serial aka `one-by-one` fashion. In order to overcome such limitation, our solution makes use of simple Length-Delimited (a simplified version of [TLV](https://en.wikipedia.org/wiki/Type%E2%80%93length%E2%80%93value) format) file encoding. Where essentially first two bytes of each serialized message are used to store message length.
```mermaid
flowchart TB
Length1
Message1
Length2
Message2
Length3
Message3
```
# Architecture
`Datapoint` aka *analytic event* representation - is a handy wrapper / utility class, aimed to ease usage of somewhat bloated autogenerated `protobuf` classes.
```mermaid
flowchart LR
subgraph .proto
AnalyticsEvent --- M{Message}
TemperatureReading --- M
ShutdownReason --- M
etc.. --- M
end
D(Datapoint) -. parse .-> M
M -. make .-> D
```
Than, a `Sink` instance shall be used to establish flow from runtime memory of an captured `Datapoints` into it's serialized form on disk. To `capture()` some `Datapoint`, an instance of `Writer` class shall be used. Each `Writer` instance is linked to it's `Sink`-paren class. `Writer` is a movable class that implements a simple, polymorphic, buffered and thread safe API for `Datapoints` (events) capturing.
```mermaid
flowchart TB
subgraph "runtime (orbiter)"
DP1(Datapoint 1) -.capture.- W1(Writer 1)
DP2(Datapoint 2) -.capture.- W1
DP3(Datapoint 3) -.capture.- W2(Writer 2)
DP4(Datapoint 4) -.capture.- W2
W1 --- S(Sink)
W3(Writer n) --- S
W2 --- S
end
S(Sink) ---> DB[(File)]
DB[(File)] ---> R(Reader)
subgraph "runtime (server)"
R(Reader) -.parse.-> D1(Datapoint 1)
R(Reader) -.parse.-> D2(Datapoint 2)
R(Reader) -.parse.-> D3(Datapoint 3)
R(Reader) -.parse.-> D4(Datapoint 4)
end
```
`Reader` class shall be used in order to deserialize `Datapoints` from file. All `Datapoints` will be read in one-by-one fashion.
# Tests and diskussion
Tests for the project designed in such a way, so they can be used as a case study of an API usage as well as a way to ensure code quality.
## Datapoint
Shows basic API use case. As well as provides a way to reliably discriminate between multiple types of `events`.
## Serial IO
Cowers a case when all events being captured by single `reader` in serial fashion:
> write - write - write - read - read - read
Also shows a way to check if there is some data to read from file.
## Mixed IO
Slightly more complex case, where writing *into* and reading *from* file done in mixed order:
> write - read - write - write - read - read
## Buferization
Writing data to disk (even SSD) is notoriously slow operation. Storing several messages in RAM and then writing all of them on disk as single batch - is a way to speed things up.
> [!NOTE] Trade off
> In case of sudden power los - all cached data (aka not stored to disk) will be irretrievably lost
## Multithreading
Yes, all `Writer` instances are thread safe. Altho, so further improvement can be made here. See comments in `telemetry\sink.cpp Sink::Writer::capture()`.
## Shared
`RAII` and `std::shared_pointer` is exactly that type of magic, that provides ease of mind. No need to worry about dangling pointers, leaky memory and problems alike.

4
docker-build.sh Executable file
View File

@ -0,0 +1,4 @@
#!/usr/bin/env bash
set -euxo pipefail
docker build -t telemetry-ide:latest .

7
docker-run.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -euxo pipefail
docker run \
--name telemetry \
-v $(pwd)/src:/workspace \
-it telemetry-ide:latest

7
src/.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,7 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": []
}

74
src/.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,74 @@
{
"C_Cpp.default.compilerPath": "/usr/bin/g++",
"files.associations": {
"*.tcc": "cpp",
"fstream": "cpp",
"iosfwd": "cpp",
"istream": "cpp",
"limits": "cpp",
"sstream": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"any": "cpp",
"array": "cpp",
"atomic": "cpp",
"bit": "cpp",
"cctype": "cpp",
"chrono": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"compare": "cpp",
"concepts": "cpp",
"condition_variable": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"deque": "cpp",
"forward_list": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"vector": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"iterator": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"ratio": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"utility": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iostream": "cpp",
"mutex": "cpp",
"new": "cpp",
"ostream": "cpp",
"ranges": "cpp",
"stdexcept": "cpp",
"thread": "cpp",
"typeinfo": "cpp",
"variant": "cpp",
"codecvt": "cpp",
"numbers": "cpp",
"semaphore": "cpp",
"*.inc": "cpp",
"csignal": "cpp",
"shared_mutex": "cpp"
}
}

28
src/.vscode/tasks.json vendored Normal file
View File

@ -0,0 +1,28 @@
{
"tasks": [
{
"type": "cppbuild",
"label": "C/C++: g++-12 build active file",
"command": "/usr/bin/g++-12",
"args": [
"-fdiagnostics-color=always",
"-g",
"${file}",
"-o",
"${fileDirname}/${fileBasenameNoExtension}"
],
"options": {
"cwd": "${fileDirname}"
},
"problemMatcher": [
"$gcc"
],
"group": {
"kind": "build",
"isDefault": true
},
"detail": "Task generated by Debugger."
}
],
"version": "2.0.0"
}

66
src/CMakeLists.txt Normal file
View File

@ -0,0 +1,66 @@
# preamble
cmake_minimum_required(VERSION 3.14)
project(telemetry VERSION 0.1)
set(CMAKE_CXX_STANDARD 20)
# add_compile_options(-fdiagnostics-color) # does not goes well with default VsCode console
add_compile_options(-pedantic)
add_compile_options(-Wall)
add_compile_options(-Wextra)
add_compile_options(-Wpedantic)
add_compile_options(-Wno-unused-local-typedefs)
include_directories(${PROJECT_SOURCE_DIR})
## Libraries
add_library(datapoint
datapoint/analytics_event.cpp
datapoint/temperature_reading.cpp
)
add_library(mock_log_sources
demo/mocks/mock_temperature_source.cpp
demo/mocks/mock_analytics_source.cpp
)
add_subdirectory(telemetry)
# Device executable (simulation)
add_executable(device
demo/main.cpp
demo/device.cpp
)
target_link_libraries(device
PRIVATE
mock_log_sources
datapoint
telemetry
)
# Download googletest
include(FetchContent)
FetchContent_Declare(
googletest
URL https://github.com/google/googletest/archive/refs/tags/release-1.12.1.zip
)
# For Windows: Prevent overriding the parent project's compiler/linker settings
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(googletest)
# Add tests
enable_testing()
add_executable(test_temperature_reading
tests/data/test_temperature_reading.cpp
tests/data/test_telemetry.cpp
)
target_link_libraries(test_temperature_reading
GTest::gtest_main
datapoint
telemetry
)
include(GoogleTest)
gtest_discover_tests(test_temperature_reading)

View File

@ -0,0 +1,44 @@
#include <sstream>
#include "analytics_event.hpp"
namespace djm::device {
std::string AnalyticsEvent::fmtDebug() const {
std::stringstream stream;
stream << timestamp.time_since_epoch().count() << " ";
switch (type) {
case Type::AcousticImagingReady:
stream << "AcousticImagingReady";
break;
case Type::SdCardFormatted:
stream << "SdCardFormatted";
break;
case Type::ExceptionThrown:
stream << "ExceptionThrown";
break;
case Type::ImageSaved:
stream << "ImageSaved";
break;
case Type::VideoSaved:
stream << "VideoSaved";
break;
case Type::ImagingFrequencyChanged:
stream << "ImagingFrequencyChanged";
break;
}
if (exceptionWhat) {
stream << " " << *exceptionWhat;
}
if (newFrequency) {
stream << " " << *newFrequency;
}
return stream.str();
}
} // namespace djm::device

View File

@ -0,0 +1,33 @@
#pragma once
#include <chrono>
#include <optional>
#include <string>
namespace djm::device {
struct AnalyticsEvent {
enum class Type {
AcousticImagingReady,
SdCardFormatted,
ExceptionThrown,
ImageSaved,
VideoSaved,
ImagingFrequencyChanged,
};
Type type;
/** sender-side timestamp */
std::chrono::system_clock::time_point timestamp;
/** set if type == ExceptionThrown */
std::optional<std::string> exceptionWhat;
/** set if type == ImagingFrequencyChanged */
std::optional<float> newFrequency;
/** Meant for debugging only: Format this event as a human readable text */
std::string fmtDebug() const;
};
} // namespace djm::device

View File

@ -0,0 +1,39 @@
#include <sstream>
#include "temperature_reading.hpp"
namespace djm::device {
std::string TemperatureReading::fmtDebug() const {
std::stringstream stream;
bool needspace = false;
if (cpu) {
needspace = true;
stream << "cpu: " << *cpu;
}
if (mainboard) {
if (needspace) {
stream << " ";
}
needspace = true;
stream << "mainboard: " << *mainboard;
}
if (mics) {
if (needspace) {
stream << " ";
}
needspace = true;
stream << "mics: " << *mics;
}
if (opticalCamera) {
if (needspace) {
stream << " ";
}
stream << "opticalCamera: " << *opticalCamera;
}
return stream.str();
}
} // namespace djm::device

View File

@ -0,0 +1,20 @@
#pragma once
#include <chrono>
#include <optional>
#include <string>
namespace djm::device {
struct TemperatureReading {
/* All temperatures are read as degree Celsius */
std::optional<float> cpu;
std::optional<float> mainboard;
std::optional<float> mics;
std::optional<float> opticalCamera;
/* Meant for debugging only: Format this reading as a human readable text */
std::string fmtDebug() const;
};
} // namespace djm::device

63
src/demo/device.cpp Normal file
View File

@ -0,0 +1,63 @@
#include <cstdlib>
#include <iostream>
#include <thread>
#include <csignal>
#include "device.hpp"
volatile std::sig_atomic_t gSignum;
void signal_callback_handler(int signum) {
gSignum = signum;
}
namespace djm::device {
Device::Device(std::unique_ptr<AnalyticsSource> analyticsSource_,
std::unique_ptr<TemperatureSource> temperatureSource_)
: analyticsSource(std::move(analyticsSource_)),
temperatureSource(std::move(temperatureSource_)) {}
int Device::run(telemetry::Sink::Writer&& writer)
{
using std::literals::chrono_literals::operator""ms;
static bool signal_callback_handler_installed = false;
if (!signal_callback_handler_installed) {
std::signal(SIGTERM, signal_callback_handler); // systemd shutdown request
std::signal(SIGINT, signal_callback_handler); // ctrl-c
}
while (!gSignum) {
auto start = std::chrono::system_clock::now();
auto tempReading = temperatureSource->read(100ms);
if (tempReading) {
std::cout << "read temperatures: " << tempReading->fmtDebug()
<< std::endl;
writer.capture(tempReading.value());
} else {
std::cout << "failed to read temperature sensors" << std::endl;
}
auto analyticsEvent = analyticsSource->await(250ms);
if (analyticsEvent) {
std::cout << "got analytics event: " << analyticsEvent->fmtDebug()
<< std::endl;
writer.capture(analyticsEvent.value());
} else {
std::cout << "no analytics event available" << std::endl;
}
auto delta = std::chrono::system_clock::now() - start;
std::this_thread::sleep_for(1000ms - delta);
}
std::string shutdown_reason("gracefull shutdown");
std::cout <<std::endl <<"[device] shutdown_reason: " <<shutdown_reason <<std::endl;
writer.capture(shutdown_reason);
return EXIT_SUCCESS;
}
} // namespace djm::device

23
src/demo/device.hpp Normal file
View File

@ -0,0 +1,23 @@
#pragma once
#include <memory>
#include "interfaces/analytics_source.hpp"
#include "interfaces/temperature_source.hpp"
#include "telemetry/sink.hpp"
namespace djm::device {
class Device {
public:
Device(std::unique_ptr<AnalyticsSource> analyticsSource_, std::unique_ptr<TemperatureSource> temperatureSource_);
/* will collect data forever */
int run(telemetry::Sink::Writer &&writer);
private:
std::unique_ptr<AnalyticsSource> analyticsSource;
std::unique_ptr<TemperatureSource> temperatureSource;
};
} // namespace djm::device

View File

@ -0,0 +1,24 @@
#pragma once
#include <chrono>
#include <optional>
#include <string>
#include "datapoint/analytics_event.hpp"
namespace djm::device {
class AnalyticsSource {
public:
virtual ~AnalyticsSource() noexcept = default;
/**
* Yields an analytics event if any is available, otherwise blocks until
* `timeout` expires and returns Nothing. Multiple analytics event may queue
* in the background.
*/
virtual std::optional<AnalyticsEvent> await(
std::chrono::milliseconds timeout) = 0;
};
} // namespace djm::device

View File

@ -0,0 +1,19 @@
#pragma once
#include <chrono>
#include <optional>
#include "datapoint/temperature_reading.hpp"
namespace djm::device {
class TemperatureSource {
public:
virtual ~TemperatureSource() noexcept = default;
/** blocking, can be called whenever a reading is desired */
virtual std::optional<TemperatureReading> read(
std::chrono::milliseconds timeout) = 0;
};
} // namespace djm::device

33
src/demo/main.cpp Normal file
View File

@ -0,0 +1,33 @@
#include <cstdlib>
#include <iostream>
#include <memory>
#include "demo/mocks/mock_analytics_source.hpp"
#include "demo/mocks/mock_temperature_source.hpp"
#include "device.hpp"
#include "telemetry/sink.hpp"
int main(int /* unused */, char** /* unused */) {
using namespace djm;
std::cout << "Creating Device instance" << std::endl;
size_t rngSeed = 12345;
auto tempsensor = std::make_unique<device::MockTemperatureSource>(rngSeed);
auto analyticsSource = std::make_unique<device::MockAnalyticsSource>(rngSeed);
device::Device device(
std::move(analyticsSource),
std::move(tempsensor)
);
auto telemetrySink = telemetry::Sink(
std::filesystem::path("telemetry.ldp")
);
const auto res = device.run(
telemetrySink.makeWriter()
);
return res;
}

View File

@ -0,0 +1,94 @@
#include <chrono>
#include <optional>
#include <random>
#include <thread>
#include "mock_analytics_source.hpp"
#include "datapoint/analytics_event.hpp"
namespace djm::device {
MockAnalyticsSource::MockAnalyticsSource(size_t seed)
: hasAnnouncedReady(false), exceptionThrown(false), generator(seed) {}
std::optional<AnalyticsEvent> MockAnalyticsSource::await(
std::chrono::milliseconds timeout) {
if (exceptionThrown) {
// it's broken
return std::nullopt;
}
// the longer we wait, the more likely an event comes in
auto eventProb = std::min(0.8, timeout.count() / 1000.0);
if (std::bernoulli_distribution(eventProb)(generator)) {
// block for the full duration
std::this_thread::sleep_for(timeout);
return std::nullopt;
}
if (!hasAnnouncedReady) {
// always start with an "AcousticImagingReady" event
hasAnnouncedReady = true;
return AnalyticsEvent{
.type = AnalyticsEvent::Type::AcousticImagingReady,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = std::nullopt,
};
}
/*
otherwise we sample the following events:
SdCardFormatted: 8%
ImageSaved: 30%
VideoSaved: 30%
ImagingFrequencyChanged: 30%
ExceptionThrown: 2%
*/
std::discrete_distribution<> typeDist({8, 30, 30, 30, 2});
switch (typeDist(generator)) {
case 0:
return AnalyticsEvent{
.type = AnalyticsEvent::Type::SdCardFormatted,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = std::nullopt,
};
case 2:
return AnalyticsEvent{
.type = AnalyticsEvent::Type::ImageSaved,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = std::nullopt,
};
case 3:
return AnalyticsEvent{
.type = AnalyticsEvent::Type::VideoSaved,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = std::nullopt,
};
case 4: {
float newFreq =
31'000 + std::uniform_int_distribution<>(-16'000, 16'000)(generator);
return AnalyticsEvent{
.type = AnalyticsEvent::Type::ImagingFrequencyChanged,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = newFreq,
};
}
case 5:
exceptionThrown = true;
return AnalyticsEvent{
.type = AnalyticsEvent::Type::ExceptionThrown,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = "lp0 on fire",
.newFrequency = std::nullopt,
};
default:
return std::nullopt;
}
}
} // namespace djm::device

View File

@ -0,0 +1,22 @@
#pragma once
#include <optional>
#include <random>
#include "demo/interfaces/analytics_source.hpp"
namespace djm::device {
class MockAnalyticsSource final : public AnalyticsSource {
public:
explicit MockAnalyticsSource(size_t seed);
~MockAnalyticsSource() noexcept final = default;
std::optional<AnalyticsEvent> await(std::chrono::milliseconds timeout) final;
private:
bool hasAnnouncedReady;
bool exceptionThrown;
std::default_random_engine generator;
};
} // namespace djm::device

View File

@ -0,0 +1,35 @@
#include <random>
#include <thread>
#include "mock_temperature_source.hpp"
namespace djm::device {
MockTemperatureSource::MockTemperatureSource(size_t seed) : generator(seed) {}
std::optional<TemperatureReading> MockTemperatureSource::read(
std::chrono::milliseconds timeout) {
if (std::bernoulli_distribution(0.01)(generator)) {
// reading temperature failed, yikes
return std::nullopt;
}
// determine random time until read occurs
auto millisLag = std::uniform_int_distribution<>(0, 50)(generator);
if (millisLag > timeout.count()) {
return std::nullopt;
}
std::this_thread::sleep_for(std::chrono::milliseconds(millisLag));
std::normal_distribution<> normalDist(0.0, 3.5);
return TemperatureReading{
56.7 + normalDist(generator),
45.6 + normalDist(generator),
34.5 + normalDist(generator),
32.1 + normalDist(generator),
};
}
} // namespace djm::device

View File

@ -0,0 +1,21 @@
#pragma once
#include <optional>
#include <random>
#include "demo/interfaces/temperature_source.hpp"
namespace djm::device {
class MockTemperatureSource final : public TemperatureSource {
public:
explicit MockTemperatureSource(size_t seed);
~MockTemperatureSource() noexcept final = default;
std::optional<TemperatureReading> read(
std::chrono::milliseconds timeout) final;
private:
std::default_random_engine generator;
};
} // namespace djm::device

View File

@ -0,0 +1,29 @@
project(telemetry VERSION 0.1)
#
# Protobuf https://cmake.org/cmake/help/latest/module/FindProtobuf.html
# see Dockerfile for installation example
#
find_package(Protobuf REQUIRED)
include_directories(${Protobuf_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS message.proto)
add_library(${PROJECT_NAME}
message.cpp
sink.cpp
reader.cpp
${PROTO_SRCS}
${PROTO_HDRS}
)
target_include_directories(${PROJECT_NAME}
PUBLIC
${PROJECT_SOURCE_DIR}/telemetry/sink.hpp
${PROJECT_SOURCE_DIR}/telemetry/datapoint.hpp
)
target_link_libraries(${PROJECT_NAME}
${Protobuf_LIBRARIES}
)

View File

@ -0,0 +1,18 @@
#pragma once
#include <variant>
#include <string>
#include "datapoint/analytics_event.hpp"
#include "datapoint/temperature_reading.hpp"
namespace djm::telemetry {
using Datapoint = std::variant<
std::monostate,
device::AnalyticsEvent,
device::TemperatureReading,
std::string
>;
}

197
src/telemetry/message.cpp Normal file
View File

@ -0,0 +1,197 @@
// uncomment to disable assert()
// #define NDEBUG
#include <cassert>
#include "message.hpp"
#include "message.pb.h"
#define assertm(exp, msg) assert(((void)msg, exp)) // Use (void) to silence unused warnings.
namespace djm::telemetry {
void verify_protobuf_version() {
static bool verifyed = false;
if (!verifyed) {
// Verify that the version of the protobuf library that we linked with
// is compatible with the version of the headers we compiled against.
GOOGLE_PROTOBUF_VERIFY_VERSION;
verifyed = true;
}
}
/*
* AnalyticsEvent
*/
Message makeMessage(const device::AnalyticsEvent &analyticsEvent) {
Message msg;
auto msgAnalyticsEvent = msg.mutable_analytics_event();
using AeType = djm::device::AnalyticsEvent::Type;
switch (analyticsEvent.type) {
case AeType::AcousticImagingReady:
msgAnalyticsEvent->set_type(AnalyticsEvent_Type_AcousticImagingReady);
break;
case AeType::SdCardFormatted:
msgAnalyticsEvent->set_type(AnalyticsEvent_Type_SdCardFormatted);
break;
case AeType::ExceptionThrown:
msgAnalyticsEvent->set_type(AnalyticsEvent_Type_ExceptionThrown);
break;
case AeType::ImageSaved:
msgAnalyticsEvent->set_type(AnalyticsEvent_Type_ImageSaved);
break;
case AeType::VideoSaved:
msgAnalyticsEvent->set_type(AnalyticsEvent_Type_VideoSaved);
break;
case AeType::ImagingFrequencyChanged:
msgAnalyticsEvent->set_type(AnalyticsEvent_Type_ImagingFrequencyChanged);
break;
default:
assert("Unknown djm::device::AnalyticsEvent::Type");
break;
}
auto sec = std::chrono::duration_cast<std::chrono::seconds> (analyticsEvent.timestamp.time_since_epoch());
auto nanosec = std::chrono::duration_cast<std::chrono::nanoseconds>(analyticsEvent.timestamp.time_since_epoch());
msgAnalyticsEvent->mutable_timestamp()->set_seconds(sec.count());
msgAnalyticsEvent->mutable_timestamp()->set_nanos((nanosec - sec).count());
if (analyticsEvent.exceptionWhat.has_value())
msgAnalyticsEvent->set_exceptionwhat(analyticsEvent.exceptionWhat.value());
if (analyticsEvent.newFrequency.has_value())
msgAnalyticsEvent->set_newfrequency(analyticsEvent.newFrequency.value());
return msg;
}
Datapoint makeAnalyticsEvent(const Message &message)
{
assertm(message.has_analytics_event(), "AnalyticsEvent message expected");
auto ae = message.analytics_event();
using namespace std::chrono;
auto getDpType = [](AnalyticsEvent_Type type) {
using AeType = djm::device::AnalyticsEvent::Type;
switch (type) {
case AnalyticsEvent_Type_AcousticImagingReady:
return AeType::AcousticImagingReady;
case AnalyticsEvent_Type_SdCardFormatted:
return AeType::SdCardFormatted;
case AnalyticsEvent_Type_ExceptionThrown:
return AeType::ExceptionThrown;
case AnalyticsEvent_Type_ImageSaved:
return AeType::ImageSaved;
case AnalyticsEvent_Type_VideoSaved:
return AeType::VideoSaved;
case AnalyticsEvent_Type_ImagingFrequencyChanged:
return AeType::ImagingFrequencyChanged;
default:
assert("Unknown djm::telemetry::AnalyticsEvent::Type");
return (AeType)0xDeadBeaf; // TODO: uninitialized default state was not defined
}
};
device::AnalyticsEvent dp{
.type = getDpType(ae.type()),
.timestamp = system_clock::time_point(seconds(ae.timestamp().seconds()) + nanoseconds(ae.timestamp().nanos())),
.exceptionWhat = ae.has_exceptionwhat() ? std::optional(ae.exceptionwhat()) : std::nullopt,
.newFrequency = ae.has_newfrequency() ? std::optional(ae.newfrequency()) : std::nullopt
};
return Datapoint(dp);
}
/*
* TemperatureRrading
*/
Message makeMessage(const device::TemperatureReading &temperatureReading) {
Message msg;
auto msgTemperatureReading = msg.mutable_temperature_reading();
if (temperatureReading.cpu.has_value())
msgTemperatureReading->set_cpu(temperatureReading.cpu.value());
if (temperatureReading.mainboard.has_value())
msgTemperatureReading->set_mainboard(temperatureReading.mainboard.value());
if (temperatureReading.mics.has_value())
msgTemperatureReading->set_mics(temperatureReading.mics.value());
if (temperatureReading.opticalCamera.has_value())
msgTemperatureReading->set_opticalcamera(temperatureReading.opticalCamera.value());
return msg;
}
Datapoint makeTemperatureReading(const Message &message)
{
assertm(message.has_temperature_reading(), "TemperatureReading message expected");
auto tp = message.temperature_reading();
return Datapoint(device::TemperatureReading {
.cpu = tp.has_cpu() ? std::optional(tp.cpu()) : std::nullopt,
.mainboard = tp.has_mainboard() ? std::optional(tp.mainboard()) : std::nullopt,
.mics = tp.has_mics() ? std::optional(tp.mics()) : std::nullopt,
.opticalCamera = tp.has_opticalcamera() ? std::optional(tp.opticalcamera()) : std::nullopt,
});
}
/*
* ShutdownReason
*/
Message makeMessage(const std::string &shutdownReason) {
Message msg;
msg.set_shutdown_reason(shutdownReason);
return msg;
}
Datapoint makeShutdownReason(const Message &message)
{
assertm(message.has_shutdown_reason(), "ShutdownReason message expected");
return Datapoint(std::string(message.shutdown_reason()));
}
/*
* Core API
*/
Message makeMessage(const Datapoint &datapoint)
{
verify_protobuf_version();
if (std::holds_alternative<device::AnalyticsEvent>(datapoint)) {
std::cout<<"AnalyticsEvent"<<std::endl;
return makeMessage(std::get<device::AnalyticsEvent>(datapoint));
} else
if (std::holds_alternative<device::TemperatureReading>(datapoint)) {
std::cout<<"TemperatureReading"<<std::endl;
return makeMessage(std::get<device::TemperatureReading>(datapoint));
} else
if (std::holds_alternative<std::string>(datapoint)) {
std::cout<<"shutdown reason"<<std::endl;
return makeMessage(std::get<std::string>(datapoint));
} else {
std::cout <<"[telemetry] unknown Datapoint type" <<std::endl;
return Message();
}
}
Datapoint parseMessage(const Message &message)
{
verify_protobuf_version();
if (message.has_analytics_event()) {
return makeAnalyticsEvent(message);
} else
if (message.has_temperature_reading()) {
return makeTemperatureReading(message);
} else
if (message.has_shutdown_reason()) {
return makeShutdownReason(message);
} else {
std::cout <<"[telemetry] unknown Message type" <<std::endl;
return Datapoint();
}
}
}

12
src/telemetry/message.hpp Normal file
View File

@ -0,0 +1,12 @@
#pragma once
#include "datapoint.hpp"
namespace djm::telemetry {
class Message; // proto3 forward declaration
Message makeMessage(const Datapoint &datapoint);
Datapoint parseMessage(const Message &message);
}

View File

@ -0,0 +1,37 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
package djm.telemetry;
message TemperatureReading {
optional float cpu = 1;
optional float mainboard = 2;
optional float mics = 3;
optional float opticalCamera = 4;
}
message AnalyticsEvent {
enum Type {
AcousticImagingReady = 0;
SdCardFormatted = 1;
ExceptionThrown = 2;
ImageSaved = 3;
VideoSaved = 4;
ImagingFrequencyChanged = 5;
}
Type type = 1;
google.protobuf.Timestamp timestamp = 2;
optional string exceptionWhat = 100;
optional float newFrequency = 101;
}
message Message {
oneof datapoint_types {
AnalyticsEvent analytics_event = 1;
TemperatureReading temperature_reading = 2;
string shutdown_reason = 3;
}
}

46
src/telemetry/reader.cpp Normal file
View File

@ -0,0 +1,46 @@
#include <cstdint>
#include "reader.hpp"
#include "message.hpp"
#include "message.pb.h"
#include <vector>
namespace djm::telemetry {
Reader::Reader(const std::filesystem::path& storageName)
{
fileBuf.open(storageName, std::ios::in | std::ios::binary);
std::cout <<"[telemetry] reader( path: " <<storageName.filename() <<" )" <<std::endl;
}
Reader::~Reader() {
fileBuf.close();
}
Datapoint Reader::parseNext()
{
Message message;
uint16_t message_sz;
uint8_t p1, p2;
fileBuf.sgetn((char*)&p1, 1);
fileBuf.sgetn((char*)&p2, 1);
message_sz = p2<<8 | p1;
auto tmpBuf = std::vector<char>(message_sz); // TODO: recycle tmpBuf between calls
fileBuf.sgetn(&tmpBuf[0], message_sz);
message.ParseFromArray(&tmpBuf[0], message_sz);
return parseMessage(message);
}
bool Reader::hasData()
{
auto streamsz = fileBuf.in_avail();
return streamsz >= 2; // sizeof (uint16_t)
}
}

21
src/telemetry/reader.hpp Normal file
View File

@ -0,0 +1,21 @@
#pragma once
#include <filesystem>
#include <fstream>
#include "datapoint.hpp"
namespace djm::telemetry {
struct Reader {
Reader(const std::filesystem::path& storage);
~Reader();
Datapoint parseNext();
bool hasData();
private:
std::filebuf fileBuf; // TODO: use unique_ptr to get rid of destructor
};
}

73
src/telemetry/sink.cpp Normal file
View File

@ -0,0 +1,73 @@
#include <cstdint>
// uncomment to disable assert()
// #define NDEBUG
#include <cassert>
#define assertm(exp, msg) assert(((void)msg, exp)) // Use (void) to silence unused warnings.
#include "sink.hpp"
#include "message.hpp"
#include "message.pb.h"
namespace djm::telemetry {
Sink::Sink(const std::filesystem::path& storageName)
: fileBufMutex{std::make_shared<std::mutex>()}
{
auto newFb = new std::filebuf{};
if (!newFb)
throw std::runtime_error("Not enought memmory");
auto delFb = [](std::filebuf *fb) {
fb->close();
delete fb;
};
fileBuf = std::shared_ptr<std::filebuf>(newFb, delFb);
// https://cplusplus.com/reference/ostream/ostream/ostream/
fileBuf->open(storageName, std::ios::out | /* std::ios::app | */ std::ios::binary);
std::cout <<"[telemetry] sink( path: " <<storageName.filename() <<" )" <<std::endl;
}
Sink::Writer Sink::makeWriter(unsigned cache_sz)
{
return Sink::Writer(*this, cache_sz);
}
Sink::Writer::Writer(const Sink &sink, unsigned cache_sz)
: fileBuf{sink.fileBuf}
, fileBufMutex{sink.fileBufMutex}
, outStream{std::make_unique<std::ostream>(fileBuf.get())}
, CACHE_SZ{cache_sz}
{
}
void Sink::Writer::capture(const Datapoint &datapoint)
{
auto message = makeMessage(datapoint);
auto message_sz = message.ByteSizeLong();
std::lock_guard<std::mutex> guard(*fileBufMutex.get());
if (message_sz > UINT16_MAX) {
assert("Serialized message is too looong");
return; // Will be droped in production
}
auto os = outStream.get();
*os <<(uint8_t)(message_sz & 0xFF) <<(uint8_t)(message_sz >>8 & 0xFF);
message.SerializeToOstream(os);
if (++msg_cntr >= CACHE_SZ) {
// TODO: loack_guard should be place here
// TODO: replace ostream with locally allocated buffer
msg_cntr = 0;
outStream->flush();
}
}
}

35
src/telemetry/sink.hpp Normal file
View File

@ -0,0 +1,35 @@
#pragma once
#include <filesystem>
#include <fstream>
#include <iostream>
#include <memory>
#include <mutex>
#include "datapoint.hpp"
namespace djm::telemetry {
struct Sink {
struct Writer {
Writer(const Sink &sink, unsigned cache_sz);
void capture(const Datapoint &datapoint);
private:
std::shared_ptr<std::filebuf> fileBuf;
std::shared_ptr<std::mutex> fileBufMutex;
std::unique_ptr<std::ostream> outStream;
const unsigned CACHE_SZ;
unsigned msg_cntr = 0;
};
Sink(const std::filesystem::path& storage);
Writer makeWriter(unsigned cache_sz =0);
private:
std::shared_ptr<std::filebuf> fileBuf;
std::shared_ptr<std::mutex> fileBufMutex;
};
}

View File

@ -0,0 +1,253 @@
#include <gtest/gtest.h>
#include <chrono>
#include <optional>
#include <iostream>
#include "datapoint/temperature_reading.hpp"
#include "datapoint/analytics_event.hpp"
#include "telemetry/sink.hpp"
#include "telemetry/reader.hpp"
/*
* Length Delimitted Protobuf
*/
const auto storage = std::filesystem::path("telemetry.test.ldp");
namespace djm {
const device::TemperatureReading tr1{
.cpu = 12.3,
.mainboard = std::nullopt,
.mics = 45.6,
.opticalCamera = 67.8,
};
const device::TemperatureReading tr2{
.cpu = 9000.3,
.mainboard = 77.33,
.mics = 45.6,
.opticalCamera = std::nullopt,
};
const device::AnalyticsEvent ae1{
.type = device::AnalyticsEvent::Type::VideoSaved,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = std::nullopt,
};
const device::AnalyticsEvent ae2{
.type = device::AnalyticsEvent::Type::ExceptionThrown,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = "Test exception message",
.newFrequency = std::nullopt,
};
const device::AnalyticsEvent ae3{
.type = device::AnalyticsEvent::Type::ImagingFrequencyChanged,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = 33.333,
};
}
///////////////////////////////////////////////////////////////////////////////
TEST(Telemetry, datapoint) {
using namespace djm;
auto sink = telemetry::Sink(storage);
auto writer = sink.makeWriter(/* no bufferization */);
auto reader = telemetry::Reader(storage);
// Capture and Parse
writer.capture(device::TemperatureReading{/* empty defaults */});
const auto datapoint = reader.parseNext();
// Analyze datapoint
ASSERT_EQ(std::holds_alternative<device::AnalyticsEvent> (datapoint), false);
ASSERT_ANY_THROW( std::get<device::AnalyticsEvent> (datapoint));
ASSERT_EQ(std::holds_alternative<device::TemperatureReading>(datapoint), true);
ASSERT_NO_THROW( std::get<device::TemperatureReading>(datapoint));
ASSERT_EQ(device::TemperatureReading{}.fmtDebug(), std::get<device::TemperatureReading>(datapoint).fmtDebug());
}
///////////////////////////////////////////////////////////////////////////////
TEST(Telemetry, serialIo) {
using namespace djm;
const std::string shutdown_reason("end of testcase");
// Serialize
auto sink = telemetry::Sink(storage);
auto writer = sink.makeWriter();
writer.capture(tr1);
writer.capture(tr2);
writer.capture(tr2); // <<-- store same event twice
writer.capture(ae1);
writer.capture(ae2);
writer.capture(ae3);
writer.capture(shutdown_reason);
// Deserialize
auto reader = telemetry::Reader(storage);
ASSERT_EQ(reader.hasData(), true);
auto tr1_deserialized = std::get<device::TemperatureReading>(reader.parseNext());
auto tr2_deserialized = std::get<device::TemperatureReading>(reader.parseNext());
auto tr3_deserialized = std::get<device::TemperatureReading>(reader.parseNext());
auto ae1_deserialized = std::get<device::AnalyticsEvent> (reader.parseNext());
auto ae2_deserialized = std::get<device::AnalyticsEvent> (reader.parseNext());
auto ae3_deserialized = std::get<device::AnalyticsEvent> (reader.parseNext());
auto rsn_deserialized = std::get<std::string> (reader.parseNext());
ASSERT_EQ(reader.hasData(), false);
ASSERT_EQ(tr1.fmtDebug(), tr1_deserialized.fmtDebug());
ASSERT_NE(tr1.fmtDebug(), tr2_deserialized.fmtDebug()); // first != second
ASSERT_EQ(tr2.fmtDebug(), tr2_deserialized.fmtDebug());
ASSERT_EQ(tr2.fmtDebug(), tr3_deserialized.fmtDebug());
ASSERT_EQ(ae1.fmtDebug(), ae1_deserialized.fmtDebug());
ASSERT_EQ(ae2.fmtDebug(), ae2_deserialized.fmtDebug());
ASSERT_EQ(ae3.fmtDebug(), ae3_deserialized.fmtDebug());
ASSERT_EQ(shutdown_reason, rsn_deserialized);
}
///////////////////////////////////////////////////////////////////////////////
TEST(Telemetry, mixedIo) {
using namespace djm;
auto sink = telemetry::Sink(storage);
auto writer = sink.makeWriter(/* no bufferization */);
auto reader = telemetry::Reader(storage);
/*
* first message
*/
ASSERT_EQ(reader.hasData(), false);
writer.capture(tr1); // <<-- write
ASSERT_EQ(reader.hasData(), true);
auto tr1_deserialized = std::get<device::TemperatureReading>(reader.parseNext());
ASSERT_EQ(tr1.fmtDebug(), tr1_deserialized.fmtDebug());
/*
* second message
*/
ASSERT_EQ(reader.hasData(), false);
writer.capture(ae1); // <<-- write
ASSERT_EQ(reader.hasData(), true);
auto ae1_deserialized = std::get<device::AnalyticsEvent>(reader.parseNext());
ASSERT_EQ(ae1.fmtDebug(), ae1_deserialized.fmtDebug());
}
///////////////////////////////////////////////////////////////////////////////
TEST(Telemetry, buferization) {
using namespace djm;
constexpr auto CACHE_SZ = 5; // amount of buffered (cached) messages
constexpr auto REMINDER = 3; // must be smaller than CACHE_SZ
{
// Capture events
auto sink = telemetry::Sink(storage);
auto writer = sink.makeWriter(CACHE_SZ);
for (int i=0; i < CACHE_SZ + REMINDER; i++)
writer.capture(tr1);
// Read events
auto reader = telemetry::Reader(storage);
int cntr = 0;
while (reader.hasData()) {
reader.parseNext();
cntr++;
}
// Only first %CACHE_SZ% messages expected to be stored on disk,
// remaining %REMINDER% amount of messages - only caputred (aka) buffered
ASSERT_EQ(cntr, CACHE_SZ);
} // <-- RAII shall trigger flash remaning data
auto reader = telemetry::Reader(storage);
int cntr = 0;
while (reader.hasData()) {
reader.parseNext();
cntr++;
}
ASSERT_EQ(cntr, CACHE_SZ + REMINDER);
}
///////////////////////////////////////////////////////////////////////////////
TEST(Telemetry, multithreading) {
using namespace djm;
auto sink = telemetry::Sink(storage);
constexpr auto MSG_AMOUNT = 100;
auto eventProducer = [](telemetry::Sink::Writer &&writer, const device::AnalyticsEvent& event, int cntr) {
while (cntr-->0)
writer.capture(event);
};
std::thread ep1(eventProducer, sink.makeWriter(), ae1, MSG_AMOUNT);
std::thread ep2(eventProducer, sink.makeWriter(), ae2, MSG_AMOUNT);
ep1.join();
ep2.join();
auto reader = telemetry::Reader(storage);
int parsed_cntr = 0;
while (reader.hasData()) {
if (std::holds_alternative<device::AnalyticsEvent> (reader.parseNext()))
parsed_cntr++;
}
ASSERT_EQ(parsed_cntr, MSG_AMOUNT *2);
}
///////////////////////////////////////////////////////////////////////////////
TEST(Telemetry, shared) {
using namespace djm;
auto getWriter = []() {
auto sink = telemetry::Sink(storage);
return sink.makeWriter(); // <<-- oh no! sink goes out of scope
};
auto writer = getWriter();
writer.capture(tr1);
writer.capture(ae1);
auto reader = telemetry::Reader(storage);
auto tr1_deserialized = std::get<device::TemperatureReading>(reader.parseNext());
auto ae1_deserialized = std::get<device::AnalyticsEvent> (reader.parseNext());
ASSERT_EQ(tr1.fmtDebug(), tr1_deserialized.fmtDebug());
ASSERT_EQ(ae1.fmtDebug(), ae1_deserialized.fmtDebug());
}
///////////////////////////////////////////////////////////////////////////////
TEST(Telemetry, unknownMessageType) {
using namespace djm;
auto sink = telemetry::Sink(storage);
auto writer = sink.makeWriter(/* no bufferization */);
auto reader = telemetry::Reader(storage);
ASSERT_EQ(reader.hasData(), false);
writer.capture(telemetry::Datapoint(/*empty aka default aka unknown*/));
ASSERT_EQ(reader.hasData(), true);
ASSERT_NO_THROW(std::get<std::monostate>(reader.parseNext()));
}

View File

@ -0,0 +1,18 @@
#include <gtest/gtest.h>
#include <chrono>
#include <optional>
#include "datapoint/temperature_reading.hpp"
TEST(TemperatureReading, fmtDebug) {
using djm::device::TemperatureReading;
TemperatureReading event{
.cpu = 12.3,
.mainboard = std::nullopt,
.mics = 45.6,
.opticalCamera = 67.8,
};
ASSERT_EQ(event.fmtDebug(), "cpu: 12.3 mics: 45.6 opticalCamera: 67.8");
}