From 28a7c67aa370c045dd13a10ae4769d1742ee7e6d Mon Sep 17 00:00:00 2001 From: djmil Date: Tue, 23 Sep 2025 22:23:47 +0200 Subject: [PATCH] initial commit --- .gitignore | 2 +- Dockerfile | 20 ++ LICENSE | 2 +- README.md | 111 ++++++++- docker-build.sh | 4 + docker-run.sh | 7 + src/.vscode/launch.json | 7 + src/.vscode/settings.json | 74 ++++++ src/.vscode/tasks.json | 28 +++ src/CMakeLists.txt | 66 +++++ src/datapoint/analytics_event.cpp | 44 ++++ src/datapoint/analytics_event.hpp | 33 +++ src/datapoint/temperature_reading.cpp | 39 +++ src/datapoint/temperature_reading.hpp | 20 ++ src/demo/device.cpp | 63 +++++ src/demo/device.hpp | 23 ++ src/demo/interfaces/analytics_source.hpp | 24 ++ src/demo/interfaces/temperature_source.hpp | 19 ++ src/demo/main.cpp | 33 +++ src/demo/mocks/mock_analytics_source.cpp | 94 ++++++++ src/demo/mocks/mock_analytics_source.hpp | 22 ++ src/demo/mocks/mock_temperature_source.cpp | 35 +++ src/demo/mocks/mock_temperature_source.hpp | 21 ++ src/telemetry/CMakeLists.txt | 29 +++ src/telemetry/datapoint.hpp | 18 ++ src/telemetry/message.cpp | 197 +++++++++++++++ src/telemetry/message.hpp | 12 + src/telemetry/message.proto | 37 +++ src/telemetry/reader.cpp | 46 ++++ src/telemetry/reader.hpp | 21 ++ src/telemetry/sink.cpp | 73 ++++++ src/telemetry/sink.hpp | 35 +++ src/tests/data/test_telemetry.cpp | 253 ++++++++++++++++++++ src/tests/data/test_temperature_reading.cpp | 18 ++ 34 files changed, 1526 insertions(+), 4 deletions(-) create mode 100644 Dockerfile create mode 100755 docker-build.sh create mode 100755 docker-run.sh create mode 100644 src/.vscode/launch.json create mode 100644 src/.vscode/settings.json create mode 100644 src/.vscode/tasks.json create mode 100644 src/CMakeLists.txt create mode 100644 src/datapoint/analytics_event.cpp create mode 100644 src/datapoint/analytics_event.hpp create mode 100644 src/datapoint/temperature_reading.cpp create mode 100644 src/datapoint/temperature_reading.hpp create mode 100644 src/demo/device.cpp create mode 100644 src/demo/device.hpp create mode 100644 src/demo/interfaces/analytics_source.hpp create mode 100644 src/demo/interfaces/temperature_source.hpp create mode 100644 src/demo/main.cpp create mode 100644 src/demo/mocks/mock_analytics_source.cpp create mode 100644 src/demo/mocks/mock_analytics_source.hpp create mode 100644 src/demo/mocks/mock_temperature_source.cpp create mode 100644 src/demo/mocks/mock_temperature_source.hpp create mode 100644 src/telemetry/CMakeLists.txt create mode 100644 src/telemetry/datapoint.hpp create mode 100644 src/telemetry/message.cpp create mode 100644 src/telemetry/message.hpp create mode 100644 src/telemetry/message.proto create mode 100644 src/telemetry/reader.cpp create mode 100644 src/telemetry/reader.hpp create mode 100644 src/telemetry/sink.cpp create mode 100644 src/telemetry/sink.hpp create mode 100644 src/tests/data/test_telemetry.cpp create mode 100644 src/tests/data/test_temperature_reading.cpp diff --git a/.gitignore b/.gitignore index 12404dd..4f0c465 100644 --- a/.gitignore +++ b/.gitignore @@ -44,4 +44,4 @@ install_manifest.txt compile_commands.json CTestTestfile.cmake _deps - +build/* diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1044fcb --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM debian:latest + +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + git curl ca-certificates \ + build-essential \ + tar zip unzip pkg-config \ + gdb \ + cmake \ + protobuf-compiler libprotobuf-dev \ + && rm -rf /var/lib/apt/lists/* \ + && mkdir /workspace + +#RUN git clone --depth 1 --branch 2024.02.14 https://github.com/Microsoft/vcpkg.git /opt/vcpkg \ +# && cd /opt/vcpkg \ +# && ./bootstrap-vcpkg.sh \ +# && ./vcpkg integrate install \ +# && ./vcpkg install catch2 + +WORKDIR /workspace \ No newline at end of file diff --git a/LICENSE b/LICENSE index 20b7c3d..71556e8 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT NON-AI License -Copyright (c) 2024, Andriy Djmil +Copyright (c) 2025, Andriy Djmil Permission is hereby granted, free of charge, to any person obtaining a copy of the software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions. diff --git a/README.md b/README.md index 320b487..10c60de 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,110 @@ -# telemetry +# Telemetry -Simple library for capturing runtime analytic events from embedded device \ No newline at end of file +A simple yet functional library for capturing runtime analytic events from embedded devices. + +# Build & run + +`Dockerfile` with two helper scripts were added into project's root folder. Buy doing so, two useful goals were achieved: +- **Infrastructure as a code**: + All project dependancies and as well as all installation / configuration steps are easily documented as groups of handy scripts inside `Dockerfile`. +- **Containerization**: + Almost instant ability to jump into app development, testing and/or deployment with zero footprint (pollution) on main (host) PC. + +## IDE + +[VsCode](https://code.visualstudio.com/) IDE was used for in-container development. There is two different ways how one might approach this task: +- [Attach](https://code.visualstudio.com/docs/devcontainers/attach-container) VsCode to already [[#Build & run|running]] container +- Manually [install](https://github.com/ahmadnassri/docker-vscode-server/blob/master/Dockerfile) VsCode IDE into container and use [X11 window forwarding](https://goteleport.com/blog/x11-forwarding/), to automagically *spawn* VsCode IDE window on host PC + +# Design goals + +- agnostic to payload format +- adequate runtime overhead +- non intrusive / easy to use + +# Protobuf v3 + +Generally speaking, it is impossible to predict what kind of information would posses the most value in the future. Thus communication protocols tend to evolve with time. Not all customers are willing to update their devices on demand. As a result of such forces of nature, it is unavoidable that even devices of a same model would send Telemetry messages of different format. Endpoin servers (collectors) shall be capable of effective handling of such situation. [Protobuf](https://protobuf.dev/overview/) is well known industry solution for such problems. + +A known limitation of `Protobuf` library - is an inability to effectively store multiple message in file in serial aka `one-by-one` fashion. In order to overcome such limitation, our solution makes use of simple Length-Delimited (a simplified version of [TLV](https://en.wikipedia.org/wiki/Type%E2%80%93length%E2%80%93value) format) file encoding. Where essentially first two bytes of each serialized message are used to store message length. + +```mermaid +flowchart TB + Length1 + Message1 + Length2 + Message2 + Length3 + Message3 +``` + +# Architecture + +`Datapoint` aka *analytic event* representation - is a handy wrapper / utility class, aimed to ease usage of somewhat bloated autogenerated `protobuf` classes. + +```mermaid +flowchart LR + subgraph .proto + AnalyticsEvent --- M{Message} + TemperatureReading --- M + ShutdownReason --- M + etc.. --- M + end + + D(Datapoint) -. parse .-> M + M -. make .-> D +``` + +Than, a `Sink` instance shall be used to establish flow from runtime memory of an captured `Datapoints` into it's serialized form on disk. To `capture()` some `Datapoint`, an instance of `Writer` class shall be used. Each `Writer` instance is linked to it's `Sink`-paren class. `Writer` is a movable class that implements a simple, polymorphic, buffered and thread safe API for `Datapoints` (events) capturing. + +```mermaid +flowchart TB + subgraph "runtime (orbiter)" + DP1(Datapoint 1) -.capture.- W1(Writer 1) + DP2(Datapoint 2) -.capture.- W1 + DP3(Datapoint 3) -.capture.- W2(Writer 2) + DP4(Datapoint 4) -.capture.- W2 + W1 --- S(Sink) + W3(Writer n) --- S + W2 --- S + + end + S(Sink) ---> DB[(File)] + DB[(File)] ---> R(Reader) + subgraph "runtime (server)" + R(Reader) -.parse.-> D1(Datapoint 1) + R(Reader) -.parse.-> D2(Datapoint 2) + R(Reader) -.parse.-> D3(Datapoint 3) + R(Reader) -.parse.-> D4(Datapoint 4) + end +``` +`Reader` class shall be used in order to deserialize `Datapoints` from file. All `Datapoints` will be read in one-by-one fashion. + +# Tests and diskussion + +Tests for the project designed in such a way, so they can be used as a case study of an API usage as well as a way to ensure code quality. + +## Datapoint +Shows basic API use case. As well as provides a way to reliably discriminate between multiple types of `events`. + +## Serial IO +Cowers a case when all events being captured by single `reader` in serial fashion: +> write - write - write - read - read - read + +Also shows a way to check if there is some data to read from file. + +## Mixed IO +Slightly more complex case, where writing *into* and reading *from* file done in mixed order: +> write - read - write - write - read - read + +## Buferization +Writing data to disk (even SSD) is notoriously slow operation. Storing several messages in RAM and then writing all of them on disk as single batch - is a way to speed things up. + +> [!NOTE] Trade off +> In case of sudden power los - all cached data (aka not stored to disk) will be irretrievably lost + +## Multithreading +Yes, all `Writer` instances are thread safe. Altho, so further improvement can be made here. See comments in `telemetry\sink.cpp Sink::Writer::capture()`. + +## Shared +`RAII` and `std::shared_pointer` is exactly that type of magic, that provides ease of mind. No need to worry about dangling pointers, leaky memory and problems alike. \ No newline at end of file diff --git a/docker-build.sh b/docker-build.sh new file mode 100755 index 0000000..bc3c246 --- /dev/null +++ b/docker-build.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +set -euxo pipefail + +docker build -t telemetry-ide:latest . \ No newline at end of file diff --git a/docker-run.sh b/docker-run.sh new file mode 100755 index 0000000..32b6362 --- /dev/null +++ b/docker-run.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euxo pipefail + +docker run \ + --name telemetry \ + -v $(pwd)/src:/workspace \ + -it telemetry-ide:latest \ No newline at end of file diff --git a/src/.vscode/launch.json b/src/.vscode/launch.json new file mode 100644 index 0000000..5c7247b --- /dev/null +++ b/src/.vscode/launch.json @@ -0,0 +1,7 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [] +} \ No newline at end of file diff --git a/src/.vscode/settings.json b/src/.vscode/settings.json new file mode 100644 index 0000000..ddbb281 --- /dev/null +++ b/src/.vscode/settings.json @@ -0,0 +1,74 @@ +{ + "C_Cpp.default.compilerPath": "/usr/bin/g++", + "files.associations": { + "*.tcc": "cpp", + "fstream": "cpp", + "iosfwd": "cpp", + "istream": "cpp", + "limits": "cpp", + "sstream": "cpp", + "stop_token": "cpp", + "streambuf": "cpp", + "any": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "cctype": "cpp", + "chrono": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "compare": "cpp", + "concepts": "cpp", + "condition_variable": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "forward_list": "cpp", + "list": "cpp", + "map": "cpp", + "set": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "vector": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "iterator": "cpp", + "memory": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "optional": "cpp", + "random": "cpp", + "ratio": "cpp", + "string": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iostream": "cpp", + "mutex": "cpp", + "new": "cpp", + "ostream": "cpp", + "ranges": "cpp", + "stdexcept": "cpp", + "thread": "cpp", + "typeinfo": "cpp", + "variant": "cpp", + "codecvt": "cpp", + "numbers": "cpp", + "semaphore": "cpp", + "*.inc": "cpp", + "csignal": "cpp", + "shared_mutex": "cpp" + } +} \ No newline at end of file diff --git a/src/.vscode/tasks.json b/src/.vscode/tasks.json new file mode 100644 index 0000000..54c0acc --- /dev/null +++ b/src/.vscode/tasks.json @@ -0,0 +1,28 @@ +{ + "tasks": [ + { + "type": "cppbuild", + "label": "C/C++: g++-12 build active file", + "command": "/usr/bin/g++-12", + "args": [ + "-fdiagnostics-color=always", + "-g", + "${file}", + "-o", + "${fileDirname}/${fileBasenameNoExtension}" + ], + "options": { + "cwd": "${fileDirname}" + }, + "problemMatcher": [ + "$gcc" + ], + "group": { + "kind": "build", + "isDefault": true + }, + "detail": "Task generated by Debugger." + } + ], + "version": "2.0.0" +} \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..2ec111f --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,66 @@ +# preamble +cmake_minimum_required(VERSION 3.14) + +project(telemetry VERSION 0.1) + +set(CMAKE_CXX_STANDARD 20) +# add_compile_options(-fdiagnostics-color) # does not goes well with default VsCode console +add_compile_options(-pedantic) +add_compile_options(-Wall) +add_compile_options(-Wextra) +add_compile_options(-Wpedantic) +add_compile_options(-Wno-unused-local-typedefs) + +include_directories(${PROJECT_SOURCE_DIR}) + +## Libraries +add_library(datapoint + datapoint/analytics_event.cpp + datapoint/temperature_reading.cpp +) + +add_library(mock_log_sources + demo/mocks/mock_temperature_source.cpp + demo/mocks/mock_analytics_source.cpp +) + +add_subdirectory(telemetry) + +# Device executable (simulation) +add_executable(device + demo/main.cpp + demo/device.cpp +) + +target_link_libraries(device + PRIVATE + mock_log_sources + datapoint + telemetry +) + +# Download googletest +include(FetchContent) +FetchContent_Declare( + googletest + URL https://github.com/google/googletest/archive/refs/tags/release-1.12.1.zip +) +# For Windows: Prevent overriding the parent project's compiler/linker settings +set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) +FetchContent_MakeAvailable(googletest) + +# Add tests +enable_testing() + +add_executable(test_temperature_reading + tests/data/test_temperature_reading.cpp + tests/data/test_telemetry.cpp +) +target_link_libraries(test_temperature_reading + GTest::gtest_main + datapoint + telemetry +) + +include(GoogleTest) +gtest_discover_tests(test_temperature_reading) diff --git a/src/datapoint/analytics_event.cpp b/src/datapoint/analytics_event.cpp new file mode 100644 index 0000000..78a6562 --- /dev/null +++ b/src/datapoint/analytics_event.cpp @@ -0,0 +1,44 @@ +#include + +#include "analytics_event.hpp" + +namespace djm::device { + +std::string AnalyticsEvent::fmtDebug() const { + std::stringstream stream; + + stream << timestamp.time_since_epoch().count() << " "; + + switch (type) { + case Type::AcousticImagingReady: + stream << "AcousticImagingReady"; + break; + case Type::SdCardFormatted: + stream << "SdCardFormatted"; + break; + case Type::ExceptionThrown: + stream << "ExceptionThrown"; + break; + case Type::ImageSaved: + stream << "ImageSaved"; + break; + case Type::VideoSaved: + stream << "VideoSaved"; + break; + case Type::ImagingFrequencyChanged: + stream << "ImagingFrequencyChanged"; + break; + } + + if (exceptionWhat) { + stream << " " << *exceptionWhat; + } + + if (newFrequency) { + stream << " " << *newFrequency; + } + + return stream.str(); +} + +} // namespace djm::device diff --git a/src/datapoint/analytics_event.hpp b/src/datapoint/analytics_event.hpp new file mode 100644 index 0000000..75a6779 --- /dev/null +++ b/src/datapoint/analytics_event.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +namespace djm::device { + +struct AnalyticsEvent { + enum class Type { + AcousticImagingReady, + SdCardFormatted, + ExceptionThrown, + ImageSaved, + VideoSaved, + ImagingFrequencyChanged, + }; + Type type; + + /** sender-side timestamp */ + std::chrono::system_clock::time_point timestamp; + + /** set if type == ExceptionThrown */ + std::optional exceptionWhat; + + /** set if type == ImagingFrequencyChanged */ + std::optional newFrequency; + + /** Meant for debugging only: Format this event as a human readable text */ + std::string fmtDebug() const; +}; + +} // namespace djm::device diff --git a/src/datapoint/temperature_reading.cpp b/src/datapoint/temperature_reading.cpp new file mode 100644 index 0000000..fd16fa0 --- /dev/null +++ b/src/datapoint/temperature_reading.cpp @@ -0,0 +1,39 @@ +#include + +#include "temperature_reading.hpp" + +namespace djm::device { + +std::string TemperatureReading::fmtDebug() const { + std::stringstream stream; + bool needspace = false; + if (cpu) { + needspace = true; + stream << "cpu: " << *cpu; + } + if (mainboard) { + if (needspace) { + stream << " "; + } + needspace = true; + stream << "mainboard: " << *mainboard; + } + + if (mics) { + if (needspace) { + stream << " "; + } + needspace = true; + stream << "mics: " << *mics; + } + + if (opticalCamera) { + if (needspace) { + stream << " "; + } + stream << "opticalCamera: " << *opticalCamera; + } + return stream.str(); +} + +} // namespace djm::device diff --git a/src/datapoint/temperature_reading.hpp b/src/datapoint/temperature_reading.hpp new file mode 100644 index 0000000..20e3f61 --- /dev/null +++ b/src/datapoint/temperature_reading.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include +#include +#include + +namespace djm::device { + +struct TemperatureReading { + /* All temperatures are read as degree Celsius */ + std::optional cpu; + std::optional mainboard; + std::optional mics; + std::optional opticalCamera; + + /* Meant for debugging only: Format this reading as a human readable text */ + std::string fmtDebug() const; +}; + +} // namespace djm::device diff --git a/src/demo/device.cpp b/src/demo/device.cpp new file mode 100644 index 0000000..c80e31e --- /dev/null +++ b/src/demo/device.cpp @@ -0,0 +1,63 @@ +#include +#include +#include +#include + +#include "device.hpp" + +volatile std::sig_atomic_t gSignum; + +void signal_callback_handler(int signum) { + gSignum = signum; +} + +namespace djm::device { + +Device::Device(std::unique_ptr analyticsSource_, + std::unique_ptr temperatureSource_) + : analyticsSource(std::move(analyticsSource_)), + temperatureSource(std::move(temperatureSource_)) {} + +int Device::run(telemetry::Sink::Writer&& writer) +{ + using std::literals::chrono_literals::operator""ms; + + static bool signal_callback_handler_installed = false; + if (!signal_callback_handler_installed) { + std::signal(SIGTERM, signal_callback_handler); // systemd shutdown request + std::signal(SIGINT, signal_callback_handler); // ctrl-c + } + + while (!gSignum) { + auto start = std::chrono::system_clock::now(); + + auto tempReading = temperatureSource->read(100ms); + if (tempReading) { + std::cout << "read temperatures: " << tempReading->fmtDebug() + << std::endl; + writer.capture(tempReading.value()); + } else { + std::cout << "failed to read temperature sensors" << std::endl; + } + + auto analyticsEvent = analyticsSource->await(250ms); + if (analyticsEvent) { + std::cout << "got analytics event: " << analyticsEvent->fmtDebug() + << std::endl; + writer.capture(analyticsEvent.value()); + } else { + std::cout << "no analytics event available" << std::endl; + } + + auto delta = std::chrono::system_clock::now() - start; + std::this_thread::sleep_for(1000ms - delta); + } + + std::string shutdown_reason("gracefull shutdown"); + std::cout < + +#include "interfaces/analytics_source.hpp" +#include "interfaces/temperature_source.hpp" +#include "telemetry/sink.hpp" + +namespace djm::device { + +class Device { + public: + Device(std::unique_ptr analyticsSource_, std::unique_ptr temperatureSource_); + + /* will collect data forever */ + int run(telemetry::Sink::Writer &&writer); + +private: + std::unique_ptr analyticsSource; + std::unique_ptr temperatureSource; +}; + +} // namespace djm::device diff --git a/src/demo/interfaces/analytics_source.hpp b/src/demo/interfaces/analytics_source.hpp new file mode 100644 index 0000000..1a3a78c --- /dev/null +++ b/src/demo/interfaces/analytics_source.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include +#include +#include + +#include "datapoint/analytics_event.hpp" + +namespace djm::device { + +class AnalyticsSource { + public: + virtual ~AnalyticsSource() noexcept = default; + + /** + * Yields an analytics event if any is available, otherwise blocks until + * `timeout` expires and returns Nothing. Multiple analytics event may queue + * in the background. + */ + virtual std::optional await( + std::chrono::milliseconds timeout) = 0; +}; + +} // namespace djm::device diff --git a/src/demo/interfaces/temperature_source.hpp b/src/demo/interfaces/temperature_source.hpp new file mode 100644 index 0000000..ab51959 --- /dev/null +++ b/src/demo/interfaces/temperature_source.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include +#include + +#include "datapoint/temperature_reading.hpp" + +namespace djm::device { + +class TemperatureSource { + public: + virtual ~TemperatureSource() noexcept = default; + + /** blocking, can be called whenever a reading is desired */ + virtual std::optional read( + std::chrono::milliseconds timeout) = 0; +}; + +} // namespace djm::device diff --git a/src/demo/main.cpp b/src/demo/main.cpp new file mode 100644 index 0000000..0148823 --- /dev/null +++ b/src/demo/main.cpp @@ -0,0 +1,33 @@ +#include +#include +#include + +#include "demo/mocks/mock_analytics_source.hpp" +#include "demo/mocks/mock_temperature_source.hpp" +#include "device.hpp" +#include "telemetry/sink.hpp" + +int main(int /* unused */, char** /* unused */) { + using namespace djm; + + std::cout << "Creating Device instance" << std::endl; + size_t rngSeed = 12345; + + auto tempsensor = std::make_unique(rngSeed); + auto analyticsSource = std::make_unique(rngSeed); + + device::Device device( + std::move(analyticsSource), + std::move(tempsensor) + ); + + auto telemetrySink = telemetry::Sink( + std::filesystem::path("telemetry.ldp") + ); + + const auto res = device.run( + telemetrySink.makeWriter() + ); + + return res; +} \ No newline at end of file diff --git a/src/demo/mocks/mock_analytics_source.cpp b/src/demo/mocks/mock_analytics_source.cpp new file mode 100644 index 0000000..2a1314e --- /dev/null +++ b/src/demo/mocks/mock_analytics_source.cpp @@ -0,0 +1,94 @@ +#include +#include +#include +#include + +#include "mock_analytics_source.hpp" +#include "datapoint/analytics_event.hpp" + +namespace djm::device { + +MockAnalyticsSource::MockAnalyticsSource(size_t seed) + : hasAnnouncedReady(false), exceptionThrown(false), generator(seed) {} + +std::optional MockAnalyticsSource::await( + std::chrono::milliseconds timeout) { + if (exceptionThrown) { + // it's broken + return std::nullopt; + } + + // the longer we wait, the more likely an event comes in + auto eventProb = std::min(0.8, timeout.count() / 1000.0); + if (std::bernoulli_distribution(eventProb)(generator)) { + // block for the full duration + std::this_thread::sleep_for(timeout); + return std::nullopt; + } + + if (!hasAnnouncedReady) { + // always start with an "AcousticImagingReady" event + hasAnnouncedReady = true; + + return AnalyticsEvent{ + .type = AnalyticsEvent::Type::AcousticImagingReady, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = std::nullopt, + .newFrequency = std::nullopt, + }; + } + + /* + otherwise we sample the following events: + SdCardFormatted: 8% + ImageSaved: 30% + VideoSaved: 30% + ImagingFrequencyChanged: 30% + ExceptionThrown: 2% + */ + std::discrete_distribution<> typeDist({8, 30, 30, 30, 2}); + switch (typeDist(generator)) { + case 0: + return AnalyticsEvent{ + .type = AnalyticsEvent::Type::SdCardFormatted, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = std::nullopt, + .newFrequency = std::nullopt, + }; + case 2: + return AnalyticsEvent{ + .type = AnalyticsEvent::Type::ImageSaved, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = std::nullopt, + .newFrequency = std::nullopt, + }; + case 3: + return AnalyticsEvent{ + .type = AnalyticsEvent::Type::VideoSaved, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = std::nullopt, + .newFrequency = std::nullopt, + }; + case 4: { + float newFreq = + 31'000 + std::uniform_int_distribution<>(-16'000, 16'000)(generator); + return AnalyticsEvent{ + .type = AnalyticsEvent::Type::ImagingFrequencyChanged, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = std::nullopt, + .newFrequency = newFreq, + }; + } + case 5: + exceptionThrown = true; + return AnalyticsEvent{ + .type = AnalyticsEvent::Type::ExceptionThrown, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = "lp0 on fire", + .newFrequency = std::nullopt, + }; + default: + return std::nullopt; + } +} +} // namespace djm::device diff --git a/src/demo/mocks/mock_analytics_source.hpp b/src/demo/mocks/mock_analytics_source.hpp new file mode 100644 index 0000000..90f0955 --- /dev/null +++ b/src/demo/mocks/mock_analytics_source.hpp @@ -0,0 +1,22 @@ +#pragma once +#include +#include + +#include "demo/interfaces/analytics_source.hpp" + +namespace djm::device { + +class MockAnalyticsSource final : public AnalyticsSource { + public: + explicit MockAnalyticsSource(size_t seed); + ~MockAnalyticsSource() noexcept final = default; + + std::optional await(std::chrono::milliseconds timeout) final; + + private: + bool hasAnnouncedReady; + bool exceptionThrown; + std::default_random_engine generator; +}; + +} // namespace djm::device diff --git a/src/demo/mocks/mock_temperature_source.cpp b/src/demo/mocks/mock_temperature_source.cpp new file mode 100644 index 0000000..b52e7f5 --- /dev/null +++ b/src/demo/mocks/mock_temperature_source.cpp @@ -0,0 +1,35 @@ +#include +#include + +#include "mock_temperature_source.hpp" + +namespace djm::device { + +MockTemperatureSource::MockTemperatureSource(size_t seed) : generator(seed) {} + +std::optional MockTemperatureSource::read( + std::chrono::milliseconds timeout) { + if (std::bernoulli_distribution(0.01)(generator)) { + // reading temperature failed, yikes + return std::nullopt; + } + + // determine random time until read occurs + auto millisLag = std::uniform_int_distribution<>(0, 50)(generator); + if (millisLag > timeout.count()) { + return std::nullopt; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(millisLag)); + + std::normal_distribution<> normalDist(0.0, 3.5); + + return TemperatureReading{ + 56.7 + normalDist(generator), + 45.6 + normalDist(generator), + 34.5 + normalDist(generator), + 32.1 + normalDist(generator), + }; +} + +} // namespace djm::device diff --git a/src/demo/mocks/mock_temperature_source.hpp b/src/demo/mocks/mock_temperature_source.hpp new file mode 100644 index 0000000..9ca179c --- /dev/null +++ b/src/demo/mocks/mock_temperature_source.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +#include "demo/interfaces/temperature_source.hpp" + +namespace djm::device { + +class MockTemperatureSource final : public TemperatureSource { + public: + explicit MockTemperatureSource(size_t seed); + ~MockTemperatureSource() noexcept final = default; + std::optional read( + std::chrono::milliseconds timeout) final; + + private: + std::default_random_engine generator; +}; + +} // namespace djm::device diff --git a/src/telemetry/CMakeLists.txt b/src/telemetry/CMakeLists.txt new file mode 100644 index 0000000..66004c2 --- /dev/null +++ b/src/telemetry/CMakeLists.txt @@ -0,0 +1,29 @@ +project(telemetry VERSION 0.1) + +# +# Protobuf https://cmake.org/cmake/help/latest/module/FindProtobuf.html +# see Dockerfile for installation example +# +find_package(Protobuf REQUIRED) +include_directories(${Protobuf_INCLUDE_DIRS}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) +protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS message.proto) + + +add_library(${PROJECT_NAME} + message.cpp + sink.cpp + reader.cpp + ${PROTO_SRCS} + ${PROTO_HDRS} +) + +target_include_directories(${PROJECT_NAME} + PUBLIC + ${PROJECT_SOURCE_DIR}/telemetry/sink.hpp + ${PROJECT_SOURCE_DIR}/telemetry/datapoint.hpp +) + +target_link_libraries(${PROJECT_NAME} + ${Protobuf_LIBRARIES} +) \ No newline at end of file diff --git a/src/telemetry/datapoint.hpp b/src/telemetry/datapoint.hpp new file mode 100644 index 0000000..c910cf0 --- /dev/null +++ b/src/telemetry/datapoint.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +#include "datapoint/analytics_event.hpp" +#include "datapoint/temperature_reading.hpp" + +namespace djm::telemetry { + + using Datapoint = std::variant< + std::monostate, + device::AnalyticsEvent, + device::TemperatureReading, + std::string + >; + +} \ No newline at end of file diff --git a/src/telemetry/message.cpp b/src/telemetry/message.cpp new file mode 100644 index 0000000..78cc9fe --- /dev/null +++ b/src/telemetry/message.cpp @@ -0,0 +1,197 @@ +// uncomment to disable assert() +// #define NDEBUG +#include + +#include "message.hpp" +#include "message.pb.h" + +#define assertm(exp, msg) assert(((void)msg, exp)) // Use (void) to silence unused warnings. + +namespace djm::telemetry { + + void verify_protobuf_version() { + static bool verifyed = false; + if (!verifyed) { + // Verify that the version of the protobuf library that we linked with + // is compatible with the version of the headers we compiled against. + GOOGLE_PROTOBUF_VERIFY_VERSION; + verifyed = true; + } + } + + /* + * AnalyticsEvent + */ + Message makeMessage(const device::AnalyticsEvent &analyticsEvent) { + Message msg; + auto msgAnalyticsEvent = msg.mutable_analytics_event(); + + using AeType = djm::device::AnalyticsEvent::Type; + switch (analyticsEvent.type) { + case AeType::AcousticImagingReady: + msgAnalyticsEvent->set_type(AnalyticsEvent_Type_AcousticImagingReady); + break; + case AeType::SdCardFormatted: + msgAnalyticsEvent->set_type(AnalyticsEvent_Type_SdCardFormatted); + break; + case AeType::ExceptionThrown: + msgAnalyticsEvent->set_type(AnalyticsEvent_Type_ExceptionThrown); + break; + case AeType::ImageSaved: + msgAnalyticsEvent->set_type(AnalyticsEvent_Type_ImageSaved); + break; + case AeType::VideoSaved: + msgAnalyticsEvent->set_type(AnalyticsEvent_Type_VideoSaved); + break; + case AeType::ImagingFrequencyChanged: + msgAnalyticsEvent->set_type(AnalyticsEvent_Type_ImagingFrequencyChanged); + break; + default: + assert("Unknown djm::device::AnalyticsEvent::Type"); + break; + } + + auto sec = std::chrono::duration_cast (analyticsEvent.timestamp.time_since_epoch()); + auto nanosec = std::chrono::duration_cast(analyticsEvent.timestamp.time_since_epoch()); + msgAnalyticsEvent->mutable_timestamp()->set_seconds(sec.count()); + msgAnalyticsEvent->mutable_timestamp()->set_nanos((nanosec - sec).count()); + + if (analyticsEvent.exceptionWhat.has_value()) + msgAnalyticsEvent->set_exceptionwhat(analyticsEvent.exceptionWhat.value()); + + if (analyticsEvent.newFrequency.has_value()) + msgAnalyticsEvent->set_newfrequency(analyticsEvent.newFrequency.value()); + + return msg; + } + + Datapoint makeAnalyticsEvent(const Message &message) + { + assertm(message.has_analytics_event(), "AnalyticsEvent message expected"); + + auto ae = message.analytics_event(); + using namespace std::chrono; + + auto getDpType = [](AnalyticsEvent_Type type) { + using AeType = djm::device::AnalyticsEvent::Type; + + switch (type) { + case AnalyticsEvent_Type_AcousticImagingReady: + return AeType::AcousticImagingReady; + case AnalyticsEvent_Type_SdCardFormatted: + return AeType::SdCardFormatted; + case AnalyticsEvent_Type_ExceptionThrown: + return AeType::ExceptionThrown; + case AnalyticsEvent_Type_ImageSaved: + return AeType::ImageSaved; + case AnalyticsEvent_Type_VideoSaved: + return AeType::VideoSaved; + case AnalyticsEvent_Type_ImagingFrequencyChanged: + return AeType::ImagingFrequencyChanged; + default: + assert("Unknown djm::telemetry::AnalyticsEvent::Type"); + return (AeType)0xDeadBeaf; // TODO: uninitialized default state was not defined + } + }; + + device::AnalyticsEvent dp{ + .type = getDpType(ae.type()), + .timestamp = system_clock::time_point(seconds(ae.timestamp().seconds()) + nanoseconds(ae.timestamp().nanos())), + .exceptionWhat = ae.has_exceptionwhat() ? std::optional(ae.exceptionwhat()) : std::nullopt, + .newFrequency = ae.has_newfrequency() ? std::optional(ae.newfrequency()) : std::nullopt + }; + + return Datapoint(dp); + } + + /* + * TemperatureRrading + */ + Message makeMessage(const device::TemperatureReading &temperatureReading) { + Message msg; + auto msgTemperatureReading = msg.mutable_temperature_reading(); + + if (temperatureReading.cpu.has_value()) + msgTemperatureReading->set_cpu(temperatureReading.cpu.value()); + if (temperatureReading.mainboard.has_value()) + msgTemperatureReading->set_mainboard(temperatureReading.mainboard.value()); + if (temperatureReading.mics.has_value()) + msgTemperatureReading->set_mics(temperatureReading.mics.value()); + if (temperatureReading.opticalCamera.has_value()) + msgTemperatureReading->set_opticalcamera(temperatureReading.opticalCamera.value()); + + return msg; + } + + Datapoint makeTemperatureReading(const Message &message) + { + assertm(message.has_temperature_reading(), "TemperatureReading message expected"); + + auto tp = message.temperature_reading(); + + return Datapoint(device::TemperatureReading { + .cpu = tp.has_cpu() ? std::optional(tp.cpu()) : std::nullopt, + .mainboard = tp.has_mainboard() ? std::optional(tp.mainboard()) : std::nullopt, + .mics = tp.has_mics() ? std::optional(tp.mics()) : std::nullopt, + .opticalCamera = tp.has_opticalcamera() ? std::optional(tp.opticalcamera()) : std::nullopt, + }); + } + + /* + * ShutdownReason + */ + Message makeMessage(const std::string &shutdownReason) { + Message msg; + msg.set_shutdown_reason(shutdownReason); + return msg; + } + + Datapoint makeShutdownReason(const Message &message) + { + assertm(message.has_shutdown_reason(), "ShutdownReason message expected"); + return Datapoint(std::string(message.shutdown_reason())); + } + + /* + * Core API + */ + Message makeMessage(const Datapoint &datapoint) + { + verify_protobuf_version(); + + if (std::holds_alternative(datapoint)) { + std::cout<<"AnalyticsEvent"<(datapoint)); + } else + if (std::holds_alternative(datapoint)) { + std::cout<<"TemperatureReading"<(datapoint)); + } else + if (std::holds_alternative(datapoint)) { + std::cout<<"shutdown reason"<(datapoint)); + } else { + std::cout <<"[telemetry] unknown Datapoint type" < + +#include "reader.hpp" +#include "message.hpp" +#include "message.pb.h" + +#include + +namespace djm::telemetry { + + Reader::Reader(const std::filesystem::path& storageName) + { + fileBuf.open(storageName, std::ios::in | std::ios::binary); + + std::cout <<"[telemetry] reader( path: " <(message_sz); // TODO: recycle tmpBuf between calls + + fileBuf.sgetn(&tmpBuf[0], message_sz); + message.ParseFromArray(&tmpBuf[0], message_sz); + + return parseMessage(message); + } + + bool Reader::hasData() + { + auto streamsz = fileBuf.in_avail(); + return streamsz >= 2; // sizeof (uint16_t) + } + +} \ No newline at end of file diff --git a/src/telemetry/reader.hpp b/src/telemetry/reader.hpp new file mode 100644 index 0000000..d221764 --- /dev/null +++ b/src/telemetry/reader.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +#include "datapoint.hpp" + +namespace djm::telemetry { + + struct Reader { + Reader(const std::filesystem::path& storage); + ~Reader(); + + Datapoint parseNext(); + bool hasData(); + + private: + std::filebuf fileBuf; // TODO: use unique_ptr to get rid of destructor + }; + +} \ No newline at end of file diff --git a/src/telemetry/sink.cpp b/src/telemetry/sink.cpp new file mode 100644 index 0000000..30e4762 --- /dev/null +++ b/src/telemetry/sink.cpp @@ -0,0 +1,73 @@ +#include + +// uncomment to disable assert() +// #define NDEBUG +#include +#define assertm(exp, msg) assert(((void)msg, exp)) // Use (void) to silence unused warnings. + +#include "sink.hpp" +#include "message.hpp" +#include "message.pb.h" + +namespace djm::telemetry { + + Sink::Sink(const std::filesystem::path& storageName) + : fileBufMutex{std::make_shared()} + { + auto newFb = new std::filebuf{}; + if (!newFb) + throw std::runtime_error("Not enought memmory"); + + auto delFb = [](std::filebuf *fb) { + fb->close(); + delete fb; + }; + + fileBuf = std::shared_ptr(newFb, delFb); + + // https://cplusplus.com/reference/ostream/ostream/ostream/ + fileBuf->open(storageName, std::ios::out | /* std::ios::app | */ std::ios::binary); + + std::cout <<"[telemetry] sink( path: " <(fileBuf.get())} + , CACHE_SZ{cache_sz} + { + + } + + void Sink::Writer::capture(const Datapoint &datapoint) + { + auto message = makeMessage(datapoint); + auto message_sz = message.ByteSizeLong(); + + std::lock_guard guard(*fileBufMutex.get()); + + if (message_sz > UINT16_MAX) { + assert("Serialized message is too looong"); + return; // Will be droped in production + } + + auto os = outStream.get(); + *os <<(uint8_t)(message_sz & 0xFF) <<(uint8_t)(message_sz >>8 & 0xFF); + + message.SerializeToOstream(os); + + if (++msg_cntr >= CACHE_SZ) { + // TODO: loack_guard should be place here + // TODO: replace ostream with locally allocated buffer + msg_cntr = 0; + outStream->flush(); + } + } + +} \ No newline at end of file diff --git a/src/telemetry/sink.hpp b/src/telemetry/sink.hpp new file mode 100644 index 0000000..aa0a534 --- /dev/null +++ b/src/telemetry/sink.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "datapoint.hpp" + +namespace djm::telemetry { + + struct Sink { + struct Writer { + Writer(const Sink &sink, unsigned cache_sz); + void capture(const Datapoint &datapoint); + + private: + std::shared_ptr fileBuf; + std::shared_ptr fileBufMutex; + + std::unique_ptr outStream; + const unsigned CACHE_SZ; + unsigned msg_cntr = 0; + }; + + Sink(const std::filesystem::path& storage); + Writer makeWriter(unsigned cache_sz =0); + + private: + std::shared_ptr fileBuf; + std::shared_ptr fileBufMutex; + }; + +} \ No newline at end of file diff --git a/src/tests/data/test_telemetry.cpp b/src/tests/data/test_telemetry.cpp new file mode 100644 index 0000000..85214c9 --- /dev/null +++ b/src/tests/data/test_telemetry.cpp @@ -0,0 +1,253 @@ +#include +#include +#include +#include + +#include "datapoint/temperature_reading.hpp" +#include "datapoint/analytics_event.hpp" +#include "telemetry/sink.hpp" +#include "telemetry/reader.hpp" + +/* + * Length Delimitted Protobuf + */ +const auto storage = std::filesystem::path("telemetry.test.ldp"); + +namespace djm { + const device::TemperatureReading tr1{ + .cpu = 12.3, + .mainboard = std::nullopt, + .mics = 45.6, + .opticalCamera = 67.8, + }; + + const device::TemperatureReading tr2{ + .cpu = 9000.3, + .mainboard = 77.33, + .mics = 45.6, + .opticalCamera = std::nullopt, + }; + + const device::AnalyticsEvent ae1{ + .type = device::AnalyticsEvent::Type::VideoSaved, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = std::nullopt, + .newFrequency = std::nullopt, + }; + + const device::AnalyticsEvent ae2{ + .type = device::AnalyticsEvent::Type::ExceptionThrown, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = "Test exception message", + .newFrequency = std::nullopt, + }; + + const device::AnalyticsEvent ae3{ + .type = device::AnalyticsEvent::Type::ImagingFrequencyChanged, + .timestamp = std::chrono::system_clock::now(), + .exceptionWhat = std::nullopt, + .newFrequency = 33.333, + }; +} + +/////////////////////////////////////////////////////////////////////////////// + +TEST(Telemetry, datapoint) { + using namespace djm; + + auto sink = telemetry::Sink(storage); + auto writer = sink.makeWriter(/* no bufferization */); + auto reader = telemetry::Reader(storage); + + // Capture and Parse + writer.capture(device::TemperatureReading{/* empty defaults */}); + const auto datapoint = reader.parseNext(); + + // Analyze datapoint + ASSERT_EQ(std::holds_alternative (datapoint), false); + ASSERT_ANY_THROW( std::get (datapoint)); + + ASSERT_EQ(std::holds_alternative(datapoint), true); + ASSERT_NO_THROW( std::get(datapoint)); + + ASSERT_EQ(device::TemperatureReading{}.fmtDebug(), std::get(datapoint).fmtDebug()); +} + +/////////////////////////////////////////////////////////////////////////////// + +TEST(Telemetry, serialIo) { + using namespace djm; + + const std::string shutdown_reason("end of testcase"); + + // Serialize + auto sink = telemetry::Sink(storage); + auto writer = sink.makeWriter(); + writer.capture(tr1); + writer.capture(tr2); + writer.capture(tr2); // <<-- store same event twice + writer.capture(ae1); + writer.capture(ae2); + writer.capture(ae3); + writer.capture(shutdown_reason); + + // Deserialize + auto reader = telemetry::Reader(storage); + + ASSERT_EQ(reader.hasData(), true); + auto tr1_deserialized = std::get(reader.parseNext()); + auto tr2_deserialized = std::get(reader.parseNext()); + auto tr3_deserialized = std::get(reader.parseNext()); + auto ae1_deserialized = std::get (reader.parseNext()); + auto ae2_deserialized = std::get (reader.parseNext()); + auto ae3_deserialized = std::get (reader.parseNext()); + auto rsn_deserialized = std::get (reader.parseNext()); + ASSERT_EQ(reader.hasData(), false); + + ASSERT_EQ(tr1.fmtDebug(), tr1_deserialized.fmtDebug()); + ASSERT_NE(tr1.fmtDebug(), tr2_deserialized.fmtDebug()); // first != second + ASSERT_EQ(tr2.fmtDebug(), tr2_deserialized.fmtDebug()); + ASSERT_EQ(tr2.fmtDebug(), tr3_deserialized.fmtDebug()); + + ASSERT_EQ(ae1.fmtDebug(), ae1_deserialized.fmtDebug()); + ASSERT_EQ(ae2.fmtDebug(), ae2_deserialized.fmtDebug()); + ASSERT_EQ(ae3.fmtDebug(), ae3_deserialized.fmtDebug()); + + ASSERT_EQ(shutdown_reason, rsn_deserialized); +} + +/////////////////////////////////////////////////////////////////////////////// + +TEST(Telemetry, mixedIo) { + using namespace djm; + + auto sink = telemetry::Sink(storage); + auto writer = sink.makeWriter(/* no bufferization */); + auto reader = telemetry::Reader(storage); + + /* + * first message + */ + ASSERT_EQ(reader.hasData(), false); + writer.capture(tr1); // <<-- write + ASSERT_EQ(reader.hasData(), true); + + auto tr1_deserialized = std::get(reader.parseNext()); + ASSERT_EQ(tr1.fmtDebug(), tr1_deserialized.fmtDebug()); + + /* + * second message + */ + ASSERT_EQ(reader.hasData(), false); + writer.capture(ae1); // <<-- write + ASSERT_EQ(reader.hasData(), true); + + auto ae1_deserialized = std::get(reader.parseNext()); + ASSERT_EQ(ae1.fmtDebug(), ae1_deserialized.fmtDebug()); +} + +/////////////////////////////////////////////////////////////////////////////// + +TEST(Telemetry, buferization) { + using namespace djm; + + constexpr auto CACHE_SZ = 5; // amount of buffered (cached) messages + constexpr auto REMINDER = 3; // must be smaller than CACHE_SZ + + { + // Capture events + auto sink = telemetry::Sink(storage); + auto writer = sink.makeWriter(CACHE_SZ); + + for (int i=0; i < CACHE_SZ + REMINDER; i++) + writer.capture(tr1); + + + // Read events + auto reader = telemetry::Reader(storage); + int cntr = 0; + while (reader.hasData()) { + reader.parseNext(); + cntr++; + } + + // Only first %CACHE_SZ% messages expected to be stored on disk, + // remaining %REMINDER% amount of messages - only caputred (aka) buffered + ASSERT_EQ(cntr, CACHE_SZ); + } // <-- RAII shall trigger flash remaning data + + auto reader = telemetry::Reader(storage); + int cntr = 0; + while (reader.hasData()) { + reader.parseNext(); + cntr++; + } + + ASSERT_EQ(cntr, CACHE_SZ + REMINDER); +} + +/////////////////////////////////////////////////////////////////////////////// + +TEST(Telemetry, multithreading) { + using namespace djm; + + auto sink = telemetry::Sink(storage); + constexpr auto MSG_AMOUNT = 100; + + auto eventProducer = [](telemetry::Sink::Writer &&writer, const device::AnalyticsEvent& event, int cntr) { + while (cntr-->0) + writer.capture(event); + }; + + std::thread ep1(eventProducer, sink.makeWriter(), ae1, MSG_AMOUNT); + std::thread ep2(eventProducer, sink.makeWriter(), ae2, MSG_AMOUNT); + ep1.join(); + ep2.join(); + + auto reader = telemetry::Reader(storage); + int parsed_cntr = 0; + while (reader.hasData()) { + if (std::holds_alternative (reader.parseNext())) + parsed_cntr++; + } + + ASSERT_EQ(parsed_cntr, MSG_AMOUNT *2); +} + +/////////////////////////////////////////////////////////////////////////////// + +TEST(Telemetry, shared) { + using namespace djm; + + auto getWriter = []() { + auto sink = telemetry::Sink(storage); + return sink.makeWriter(); // <<-- oh no! sink goes out of scope + }; + + auto writer = getWriter(); + writer.capture(tr1); + writer.capture(ae1); + + auto reader = telemetry::Reader(storage); + auto tr1_deserialized = std::get(reader.parseNext()); + auto ae1_deserialized = std::get (reader.parseNext()); + + ASSERT_EQ(tr1.fmtDebug(), tr1_deserialized.fmtDebug()); + ASSERT_EQ(ae1.fmtDebug(), ae1_deserialized.fmtDebug()); +} + +/////////////////////////////////////////////////////////////////////////////// + +TEST(Telemetry, unknownMessageType) { + using namespace djm; + + auto sink = telemetry::Sink(storage); + auto writer = sink.makeWriter(/* no bufferization */); + auto reader = telemetry::Reader(storage); + + ASSERT_EQ(reader.hasData(), false); + writer.capture(telemetry::Datapoint(/*empty aka default aka unknown*/)); + ASSERT_EQ(reader.hasData(), true); + + ASSERT_NO_THROW(std::get(reader.parseNext())); +} \ No newline at end of file diff --git a/src/tests/data/test_temperature_reading.cpp b/src/tests/data/test_temperature_reading.cpp new file mode 100644 index 0000000..dc199b8 --- /dev/null +++ b/src/tests/data/test_temperature_reading.cpp @@ -0,0 +1,18 @@ +#include +#include +#include + +#include "datapoint/temperature_reading.hpp" + +TEST(TemperatureReading, fmtDebug) { + using djm::device::TemperatureReading; + + TemperatureReading event{ + .cpu = 12.3, + .mainboard = std::nullopt, + .mics = 45.6, + .opticalCamera = 67.8, + }; + + ASSERT_EQ(event.fmtDebug(), "cpu: 12.3 mics: 45.6 opticalCamera: 67.8"); +}