
DECODE_CAN_MESSAGE

Decode CAN messages using the provided database.

Params:
dbc : Stateful
    The database to use for decoding the messages.
messages : Stateful
    The messages to decode. Must be an iterable of can.Message objects.
ignore_undefined_id_error : bool
    If True, log decoding errors (such as an undefined ID) and skip the affected message instead of raising. Default is False.

Returns:
DataFrame : DataFrame
    DataFrame containing the decoded messages, one row per message.
Python Code
from flojoy import flojoy, Stateful, DataFrame
import pandas as pd
import logging
from cantools.database import namedsignalvalue


@flojoy(deps={"python-can": "4.3.1", "cantools": "39.4.2"})
def DECODE_CAN_MESSAGE(
    dbc: Stateful,
    messages: Stateful,
    ignore_undefined_id_error: bool = False,
) -> DataFrame:
    """Decode CAN messages.

    Decode CAN messages using the provided database.

    Parameters
    ----------
    dbc : Stateful
        The database to use for decoding the messages.
    messages : Stateful
        The messages to decode. Must be an iterable of can.Message objects.
    ignore_undefined_id_error : bool
        If True, log decoding errors (such as an undefined ID) and skip the
        affected message instead of raising. Default is False.

    Returns
    -------
    DataFrame : DataFrame
        DataFrame containing the decoded messages, one row per message.
    """

    db = dbc.obj
    messages = messages.obj
    decoded_messages = []

    for message in messages:
        try:
            decoded_message = db.decode_message(message.arbitration_id, message.data)
            # SG_ (signal) is not JSON encodable, so we need to convert it to its representation
            decoded_message = {
                key: value.name
                if isinstance(value, namedsignalvalue.NamedSignalValue)
                else value
                for key, value in decoded_message.items()
            }
            decoded_message["timestamp"] = message.timestamp
            decoded_messages.append(decoded_message)
        except Exception as err:
            logging.error(f"Error decoding message: {err}")
            if ignore_undefined_id_error:
                continue
            raise Exception(f"Error decoding message: {err}")

    return DataFrame(df=pd.DataFrame(decoded_messages))
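
The decode loop above can be tried without any CAN hardware. The following is a minimal sketch of the same logic using only the standard library; `StubDatabase`, `NamedValue`, `Frame`, and `decode_all` are hypothetical stand-ins (for the database held in `dbc.obj`, cantools' `NamedSignalValue`, `can.Message`, and the block itself), and the signal layout is invented for illustration.

```python
import logging
from collections import namedtuple

# Hypothetical stand-in for can.Message: only the three attributes the block reads.
Frame = namedtuple("Frame", "arbitration_id data timestamp")


class NamedValue:
    """Hypothetical stand-in for cantools' NamedSignalValue (exposes .name)."""

    def __init__(self, value, name):
        self.value = value
        self.name = name


class StubDatabase:
    """Hypothetical stand-in for the database object; knows only ID 0x100."""

    def decode_message(self, frame_id, data):
        if frame_id != 0x100:
            raise KeyError(frame_id)
        # Invented layout: byte 0 is a temperature, byte 1 an enumerated state.
        state = NamedValue(data[1], "ON" if data[1] else "OFF")
        return {"temperature": data[0], "state": state}


def decode_all(db, frames, ignore_undefined_id_error=False):
    """Mirror the block's loop: decode, flatten named values, add the timestamp."""
    rows = []
    for frame in frames:
        try:
            decoded = db.decode_message(frame.arbitration_id, frame.data)
            # Replace named values with their plain-string name.
            row = {
                key: value.name if isinstance(value, NamedValue) else value
                for key, value in decoded.items()
            }
            row["timestamp"] = frame.timestamp
            rows.append(row)
        except Exception as err:
            logging.error(f"Error decoding message: {err}")
            if ignore_undefined_id_error:
                continue  # skip the frame that could not be decoded
            raise Exception(f"Error decoding message: {err}")
    return rows


frames = [
    Frame(0x100, bytes([21, 1]), 0.0),  # known ID
    Frame(0x7FF, bytes([0, 0]), 0.1),   # ID missing from the stub database
]
rows = decode_all(StubDatabase(), frames, ignore_undefined_id_error=True)
# rows == [{"temperature": 21, "state": "ON", "timestamp": 0.0}]
```

With `ignore_undefined_id_error=False`, the second frame raises instead of being skipped. Note that, as in the block, the flag applies to any exception raised while decoding, not only to unknown IDs.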


Example


This application demonstrates how to use multiple CAN blocks to connect to a PEAK-USB device and read messages from it. The PEAK-USB device is a USB-to-CAN interface that connects a computer to a CAN network. Here it captures the messages of one particular sensor by filtering them at the kernel level, which reduces the load on the CPU, and the app saves those messages to a local log file.
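
To make the filtering idea concrete, here is a minimal pure-Python sketch of the ID/mask rule that python-can documents for its `can_filters` option: a frame is accepted when `received_id & can_mask == can_id & can_mask`. Whether the PEAK blocks configure exactly this rule is an assumption, the sensor ID `0x123` is hypothetical, and in the real application the driver applies the filter rather than Python code.

```python
def matches(received_id: int, can_id: int, can_mask: int) -> bool:
    """Return True when the received ID equals can_id on every masked bit."""
    return (received_id & can_mask) == (can_id & can_mask)


def passes(received_id: int, filters: list) -> bool:
    """A frame is accepted if at least one filter matches it."""
    return any(matches(received_id, f["can_id"], f["can_mask"]) for f in filters)


# Hypothetical filter: accept only the standard (11-bit) ID 0x123.
sensor_filter = {"can_id": 0x123, "can_mask": 0x7FF, "extended": False}

# Hypothetical filter: ignore the low 4 bits, accepting IDs 0x120 to 0x12F.
range_filter = {"can_id": 0x120, "can_mask": 0x7F0, "extended": False}

incoming_ids = [0x123, 0x124, 0x200, 0x123]
exact = [i for i in incoming_ids if passes(i, [sensor_filter])]
ranged = [i for i in incoming_ids if passes(i, [range_filter])]
# exact  == [0x123, 0x123]
# ranged == [0x123, 0x124, 0x123]
```

A full mask (`0x7FF` for 11-bit IDs) selects a single ID, while clearing mask bits widens the filter to a block of IDs.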

Once the app is done, the generated logs are exported to an S3 bucket to keep a record of the sensor's data for further analysis.

To replicate this application, you must connect the PEAK-USB to your computer and install the required driver (refer to the PEAK_CONNECT blocks for more information on the necessary driver for your platform). Then, simply specify the PEAK-USB channel in the required blocks, and this Flojoy application will log the messages received by the device!

Detecting channels

A valuable block is PEAK_DETECT_AVAILABLE_DEVICE, which displays the devices connected to your computer that use the PCAN interface.