MQTT
Author: Lawrence Chan
CAN-to-InfluxDB Telemetry Pipeline via MQTT
Overview
This system reads data from a CAN bus, sends it through MQTT, verifies the packet, and stores it in InfluxDB.
Workflow
-
CAN Bus Data Collection
- Reads telemetry values such as motor speed, torque, voltage, current, temperatures, and fault states.
- Packages data into a
TelemetryDatastructure.
-
MQTT Transmission
- Publishes
TelemetryDatato the"telemetry"topic on the MQTT broker. - Ensures reliable delivery using QoS
AtLeastOnce.
- Publishes
-
MQTT Listener Verification
- Subscribes to the
"telemetry"topic. - Converts incoming MQTT payload bytes to UTF-8 strings.
- Deserializes JSON into
TelemetryData. - Compares received data against expected telemetry for verification.
- Returns
trueif verification succeeds.
- Subscribes to the
-
InfluxDB Storage
- On successful verification, telemetry is stored in InfluxDB.
- Each parameter (e.g., motor speed, voltage, fault flags) is stored as a separate measurement or field for time-series tracking.
Key Features
- Real-Time Monitoring: Data flows from CAN bus → MQTT → verification → InfluxDB.
- Error Handling: Logs failed subscriptions, parsing errors, or deserialization issues.
- Test Coverage: Includes asynchronous tests to ensure listener correctly receives and verifies messages.
- Extensible: Can add new telemetry fields or modify verification rules without disrupting the pipeline.
MQTT Listener Verification Summary
Function: verify_mqtt_listener
- Purpose: Verifies that an MQTT listener receives a specific telemetry message.
- Parameters:
telemetry_struct: TelemetryData– the expected telemetry data to verify against. - Returns:
bool–trueif the message is received and matches, otherwisefalse.
Workflow
-
Setup MQTT Client
- Creates an
AsyncClientwith a 5-second keep-alive. - Connects to
MQTT_HOSTandMQTT_PORT.
- Creates an
-
Subscribe to Topic
- Subscribes to
"telemetry"topic with QoSAtLeastOnce. - Logs an error and returns
falseif subscription fails.
- Subscribes to
-
Listen for Messages
- Polls the event loop for incoming packets.
- Timeout: 1 second.
- On
Publishpackets:- Converts payload bytes to a UTF-8 string.
- Deserializes JSON into
TelemetryData. - Returns
trueif it matchestelemetry_struct.
-
Error Handling
- Logs conversion or deserialization errors.
- Returns
falseif no matching message is received or timeout occurs.
Test: test_verify_mqtt_listener
- Purpose: Ensures
verify_mqtt_listenercorrectly detects the telemetry message.
Steps
-
Prepare Test Data
- Creates
TelemetryDatawith default values and states.
- Creates
-
Run Listener
- Initializes a Tokio runtime.
- Spawns
verify_mqtt_listenerasynchronously.
-
Send Test Message
- Sleeps for 500 ms to allow the listener to start.
- Sends the test telemetry message via
send_message.
-
Validate
- Waits for listener result.
- Asserts that the listener received the expected message.
- Logs failure if the message was not received.
Key Points
- Uses asynchronous MQTT client (
rumqttc::AsyncClient). - Handles message parsing, deserialization, and matching safely.
- Includes robust error logging for debugging.
- Test ensures integration works in a runtime environment.