telemetry/tests/data/test_telemetry.cpp
2025-09-24 19:52:20 +00:00

253 lines
7.6 KiB
C++

#include <chrono>
#include <filesystem>
#include <iostream>
#include <optional>
#include <string>
#include <thread>
#include <variant>

#include <gtest/gtest.h>

#include "datapoint/temperature_reading.hpp"
#include "datapoint/analytics_event.hpp"
#include "telemetry/sink.hpp"
#include "telemetry/reader.hpp"
/*
* Length Delimited Protobuf
*/
// Relative path of the on-disk file that every test below writes through a
// telemetry::Sink and reads back through a telemetry::Reader.
const auto storage = std::filesystem::path("telemetry.test.ldp");
// Shared, immutable sample datapoints used as fixtures by the tests below.
namespace djm {
// Temperature reading with every field set except `mainboard`.
const device::TemperatureReading tr1{
.cpu = 12.3,
.mainboard = std::nullopt,
.mics = 45.6,
.opticalCamera = 67.8,
};
// Temperature reading that differs from tr1 in `cpu`, `mainboard` and
// `opticalCamera` (the latter is left empty); `mics` is the same value.
const device::TemperatureReading tr2{
.cpu = 9000.3,
.mainboard = 77.33,
.mics = 45.6,
.opticalCamera = std::nullopt,
};
// VideoSaved event without any optional payload.
// The timestamp is taken once, when this namespace-scope constant is initialized.
const device::AnalyticsEvent ae1{
.type = device::AnalyticsEvent::Type::VideoSaved,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = std::nullopt,
};
// ExceptionThrown event carrying only the `exceptionWhat` message.
const device::AnalyticsEvent ae2{
.type = device::AnalyticsEvent::Type::ExceptionThrown,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = "Test exception message",
.newFrequency = std::nullopt,
};
// ImagingFrequencyChanged event carrying only the `newFrequency` value.
const device::AnalyticsEvent ae3{
.type = device::AnalyticsEvent::Type::ImagingFrequencyChanged,
.timestamp = std::chrono::system_clock::now(),
.exceptionWhat = std::nullopt,
.newFrequency = 33.333,
};
} // namespace djm
///////////////////////////////////////////////////////////////////////////////
// Captures one default-constructed TemperatureReading, parses it back and
// verifies that the parsed variant holds exactly that alternative.
TEST(Telemetry, datapoint) {
    using namespace djm;

    auto sink   = telemetry::Sink(storage);
    auto writer = sink.makeWriter(/* no bufferization */);
    auto reader = telemetry::Reader(storage);

    // Capture and parse
    writer.capture(device::TemperatureReading{/* empty defaults */});
    const auto datapoint = reader.parseNext();

    // The wrong alternative must be reported as absent ...
    ASSERT_FALSE(std::holds_alternative<device::AnalyticsEvent>(datapoint));
    // ... and accessing it must fail with the exception std::get is specified
    // to throw, not merely with "some" exception.
    ASSERT_THROW(std::get<device::AnalyticsEvent>(datapoint), std::bad_variant_access);

    // The expected alternative must be present and accessible.
    ASSERT_TRUE(std::holds_alternative<device::TemperatureReading>(datapoint));
    ASSERT_NO_THROW(std::get<device::TemperatureReading>(datapoint));

    // Content check via the debug representation of a default reading.
    ASSERT_EQ(device::TemperatureReading{}.fmtDebug(),
              std::get<device::TemperatureReading>(datapoint).fmtDebug());
}
///////////////////////////////////////////////////////////////////////////////
// Writes a fixed sequence of datapoints (three temperature readings, three
// analytics events and one string), then reads everything back in the same
// order and compares debug representations.
TEST(Telemetry, serialIo) {
    using namespace djm;

    const std::string shutdownReason = "end of testcase";

    // --- Serialize -------------------------------------------------------
    auto sink   = telemetry::Sink(storage);
    auto writer = sink.makeWriter();
    writer.capture(tr1);
    writer.capture(tr2);
    writer.capture(tr2);  // the same reading is stored twice on purpose
    writer.capture(ae1);
    writer.capture(ae2);
    writer.capture(ae3);
    writer.capture(shutdownReason);

    // --- Deserialize -----------------------------------------------------
    auto reader = telemetry::Reader(storage);
    ASSERT_TRUE(reader.hasData());

    // std::get throws if a datapoint comes back as an unexpected alternative,
    // which fails the test as well.
    const auto reading1 = std::get<device::TemperatureReading>(reader.parseNext());
    const auto reading2 = std::get<device::TemperatureReading>(reader.parseNext());
    const auto reading3 = std::get<device::TemperatureReading>(reader.parseNext());
    const auto event1   = std::get<device::AnalyticsEvent>(reader.parseNext());
    const auto event2   = std::get<device::AnalyticsEvent>(reader.parseNext());
    const auto event3   = std::get<device::AnalyticsEvent>(reader.parseNext());
    const auto reason   = std::get<std::string>(reader.parseNext());

    // Everything written has been consumed.
    ASSERT_FALSE(reader.hasData());

    // --- Compare ---------------------------------------------------------
    ASSERT_EQ(tr1.fmtDebug(), reading1.fmtDebug());
    ASSERT_NE(tr1.fmtDebug(), reading2.fmtDebug());  // first != second
    ASSERT_EQ(tr2.fmtDebug(), reading2.fmtDebug());
    ASSERT_EQ(tr2.fmtDebug(), reading3.fmtDebug());
    ASSERT_EQ(ae1.fmtDebug(), event1.fmtDebug());
    ASSERT_EQ(ae2.fmtDebug(), event2.fmtDebug());
    ASSERT_EQ(ae3.fmtDebug(), event3.fmtDebug());
    ASSERT_EQ(shutdownReason, reason);
}
///////////////////////////////////////////////////////////////////////////////
// Interleaves writes and reads on the same storage file: the reader must
// report no data before each write and exactly the written datapoint after it.
TEST(Telemetry, mixedIo) {
    using namespace djm;

    auto sink   = telemetry::Sink(storage);
    auto writer = sink.makeWriter(/* no bufferization */);
    auto reader = telemetry::Reader(storage);

    // First message: a temperature reading.
    ASSERT_FALSE(reader.hasData());
    writer.capture(tr1);  // <<-- write
    ASSERT_TRUE(reader.hasData());
    const auto parsedReading = std::get<device::TemperatureReading>(reader.parseNext());
    ASSERT_EQ(tr1.fmtDebug(), parsedReading.fmtDebug());

    // Second message: an analytics event.
    ASSERT_FALSE(reader.hasData());
    writer.capture(ae1);  // <<-- write
    ASSERT_TRUE(reader.hasData());
    const auto parsedEvent = std::get<device::AnalyticsEvent>(reader.parseNext());
    ASSERT_EQ(ae1.fmtDebug(), parsedEvent.fmtDebug());
}
///////////////////////////////////////////////////////////////////////////////
// Checks a writer created with a cache size: while the writer is alive only
// full batches of CACHE_SZ messages are expected in the storage file; the
// remaining messages are expected to appear once sink and writer are destroyed.
TEST(Telemetry, buferization) {
    using namespace djm;

    constexpr auto CACHE_SZ  = 5;  // amount of buffered (cached) messages
    constexpr auto REMAINDER = 3;  // extra messages captured on top of one full cache
    // With REMAINDER >= CACHE_SZ a second full batch would be captured and the
    // expectation inside the scope below would no longer hold.
    static_assert(REMAINDER < CACHE_SZ, "REMAINDER must be smaller than CACHE_SZ");

    // Opens a fresh reader on the storage file and counts the datapoints it
    // can parse until the reader reports no more data.
    const auto countStored = [] {
        auto reader = telemetry::Reader(storage);
        int count = 0;
        while (reader.hasData()) {
            reader.parseNext();
            ++count;
        }
        return count;
    };

    {
        // Capture events
        auto sink   = telemetry::Sink(storage);
        auto writer = sink.makeWriter(CACHE_SZ);
        for (int i = 0; i < CACHE_SZ + REMAINDER; ++i) {
            writer.capture(tr1);
        }

        // Only the first CACHE_SZ messages are expected to be stored on disk;
        // the remaining REMAINDER messages are only captured (i.e. buffered).
        ASSERT_EQ(countStored(), CACHE_SZ);
    }  // <-- leaving the scope destroys writer and sink, which shall flush the remaining data

    ASSERT_EQ(countStored(), CACHE_SZ + REMAINDER);
}
///////////////////////////////////////////////////////////////////////////////
// Two threads, each with its own writer obtained from the same sink, capture
// MSG_AMOUNT analytics events apiece; afterwards the reader must yield
// 2 * MSG_AMOUNT datapoints holding an AnalyticsEvent.
TEST(Telemetry, multithreading) {
    using namespace djm;

    constexpr auto MSG_AMOUNT = 100;
    auto sink = telemetry::Sink(storage);

    // Thread body: captures `event` through `writer` `cntr` times.
    auto produceEvents = [](telemetry::Sink::Writer &&writer, const device::AnalyticsEvent& event, int cntr) {
        for (; cntr > 0; --cntr) {
            writer.capture(event);
        }
    };

    std::thread firstProducer(produceEvents, sink.makeWriter(), ae1, MSG_AMOUNT);
    std::thread secondProducer(produceEvents, sink.makeWriter(), ae2, MSG_AMOUNT);
    firstProducer.join();
    secondProducer.join();

    // Count only datapoints that parse as analytics events.
    auto reader = telemetry::Reader(storage);
    int analyticsEvents = 0;
    while (reader.hasData()) {
        const auto parsed = reader.parseNext();
        if (std::holds_alternative<device::AnalyticsEvent>(parsed)) {
            ++analyticsEvents;
        }
    }
    ASSERT_EQ(analyticsEvents, MSG_AMOUNT * 2);
}
///////////////////////////////////////////////////////////////////////////////
// Verifies that a writer stays usable after the Sink object it was created
// from has gone out of scope.
TEST(Telemetry, shared) {
    using namespace djm;

    // The sink is a local of the lambda, so it is destroyed when the lambda
    // returns; only the writer escapes.
    auto makeDetachedWriter = []() {
        auto localSink = telemetry::Sink(storage);
        return localSink.makeWriter();  // <<-- localSink goes out of scope here
    };

    auto writer = makeDetachedWriter();
    writer.capture(tr1);
    writer.capture(ae1);

    // Both datapoints must be readable, in the order they were captured.
    auto reader = telemetry::Reader(storage);
    const auto parsedReading = std::get<device::TemperatureReading>(reader.parseNext());
    const auto parsedEvent   = std::get<device::AnalyticsEvent>(reader.parseNext());

    ASSERT_EQ(tr1.fmtDebug(), parsedReading.fmtDebug());
    ASSERT_EQ(ae1.fmtDebug(), parsedEvent.fmtDebug());
}
///////////////////////////////////////////////////////////////////////////////
// Captures a default-constructed Datapoint and expects the reader to hand it
// back as the std::monostate alternative.
TEST(Telemetry, unknownMessageType) {
    using namespace djm;

    auto sink   = telemetry::Sink(storage);
    auto writer = sink.makeWriter(/* no bufferization */);
    auto reader = telemetry::Reader(storage);

    // Nothing has been written yet.
    ASSERT_FALSE(reader.hasData());

    writer.capture(telemetry::Datapoint(/*empty aka default aka unknown*/));

    // The datapoint must be visible to the reader and parse as std::monostate;
    // std::get would throw for any other alternative.
    ASSERT_TRUE(reader.hasData());
    ASSERT_NO_THROW(std::get<std::monostate>(reader.parseNext()));
}