SET_CAN_BUS_FILTER

Attach a message filter to a CAN bus connection.

Message filtering can be set up for each bus. A received message matches a filter when:

<received_can_id> & can_mask == can_id & can_mask

All messages that match at least one filter are returned. Where the interface supports it, filtering is carried out in the hardware or kernel layer, not in Python.

A connection to the device is required. Use a CAN_CONNECT block to connect to a CAN device.

Params:
CAN_address : str
    The CAN bus address to attach the filter to.
can_id : int
    The CAN ID to match against.
can_mask : int
    The bit mask applied to both the received ID and can_id before they are compared.
extended : bool
    Only messages where <received_is_extended> == extended are matched.

Returns:
out : DataContainer
    Optional: None
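The matching rule can be illustrated in plain Python. This is a minimal sketch, not the python-can implementation: `matches_filter` and `matches_any` are hypothetical helper names, and the filter dictionaries simply mirror the `can_id`, `can_mask`, and `extended` keys that this block passes to `set_filters`.

```python
from typing import Iterable, Mapping


def matches_filter(received_id: int, is_extended: bool, flt: Mapping) -> bool:
    """Return True if a received frame passes one filter (hypothetical helper)."""
    # If the filter specifies a frame type, it must agree with the frame.
    if "extended" in flt and flt["extended"] != is_extended:
        return False
    # Only the bits set in can_mask take part in the comparison.
    return (received_id & flt["can_mask"]) == (flt["can_id"] & flt["can_mask"])


def matches_any(received_id: int, is_extended: bool, filters: Iterable[Mapping]) -> bool:
    """A frame is kept when it matches at least one filter."""
    return any(matches_filter(received_id, is_extended, f) for f in filters)


filters = [
    {"can_id": 0x123, "can_mask": 0x7FF, "extended": False},  # exactly 0x123
    {"can_id": 0x200, "can_mask": 0x7F0, "extended": False},  # 0x200 to 0x20F
]

print(matches_any(0x123, False, filters))  # True: exact match on the first filter
print(matches_any(0x205, False, filters))  # True: the mask ignores the low 4 bits
print(matches_any(0x124, False, filters))  # False: matches neither filter
print(matches_any(0x123, True, filters))   # False: frame type differs
```

Bits that are cleared in `can_mask` are ignored during the comparison, so a narrower mask accepts a wider range of IDs.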
Python Code
from flojoy import flojoy, DeviceConnectionManager, DataContainer
from typing import Optional
import can


@flojoy(deps={"python-can": "4.3.1"})
def SET_CAN_BUS_FILTER(
    CAN_address: str,
    can_id: int = 0x0000001,
    can_mask: int = 0xFFFFFFFF,
    extended: bool = False,
    default: Optional[DataContainer] = None,
) -> Optional[DataContainer]:
    """Attach a message filter to a CAN bus connection.

    Message filtering can be set up for each bus.
    A received message matches a filter when: <received_can_id> & can_mask == can_id & can_mask
    All messages that match at least one filter are returned.

    Where the interface supports it, this is carried out in the hardware or kernel layer - not in Python.
    A connection to the device is required. Use a CAN_CONNECT block to connect to a CAN device.

    Parameters
    ----------
    CAN_address : str
        The CAN bus address to attach the filter to.
    can_id : int
        The CAN ID to match against.
    can_mask : int
        The bit mask applied to both the received ID and can_id before they are compared.
    extended : bool
        Only messages where <received_is_extended> == extended are matched.

    Returns
    -------
    DataContainer
        Optional: None
    """
    connection: can.interface.Bus = DeviceConnectionManager.get_connection(
        CAN_address
    ).get_handle()

    connection.set_filters(
        [{"can_id": can_id, "can_mask": can_mask, "extended": extended}]
    )

    return None
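When picking `can_id` and `can_mask` values for this block, it can help to check which IDs a given pair accepts. The following is a rough sketch that assumes standard 11-bit identifiers; `accepted_ids` is a hypothetical helper written for illustration and is not part of Flojoy or python-can.

```python
from typing import List

STANDARD_ID_BITS = 11  # standard CAN frames carry 11-bit identifiers


def accepted_ids(can_id: int, can_mask: int, id_bits: int = STANDARD_ID_BITS) -> List[int]:
    """List every identifier that satisfies received & can_mask == can_id & can_mask."""
    return [
        received
        for received in range(1 << id_bits)
        if (received & can_mask) == (can_id & can_mask)
    ]


# The block's defaults: every mask bit is set, so only ID 0x001 is accepted.
print([hex(i) for i in accepted_ids(0x0000001, 0xFFFFFFFF)])  # ['0x1']

# Clearing the low four mask bits accepts a block of 16 consecutive IDs.
ids = accepted_ids(0x100, 0x7F0)
print(hex(ids[0]), hex(ids[-1]), len(ids))  # 0x100 0x10f 16

# A mask of 0 ignores every bit, so all 2048 standard IDs pass.
print(len(accepted_ids(0x000, 0x000)))  # 2048
```

Enumerating the IDs this way is only practical for small identifier spaces; it is meant as a quick sanity check before wiring the values into the block.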

Example

This application demonstrates how to use multiple CAN blocks to connect to a PEAK-USB device and read messages from it. The PEAK-USB device is a USB-to-CAN interface that connects a computer to a CAN network. Here it captures the messages of a particular sensor by filtering them directly at the kernel level, which reduces the load on the CPU, and saves those messages to a local log file.

Once the app is done, the generated logs are exported to an S3 bucket to keep a record of the sensor’s data for further analysis.

To replicate this application, you must connect the PEAK-USB to your computer and install the required driver (refer to the PEAK_CONNECT blocks for more information on the necessary driver for your platform). Then, simply specify the PEAK-USB channel in the required blocks, and this Flojoy application will log the messages received by the device!

Detecting channels

A useful block is PEAK_DETECT_AVAILABLE_DEVICE. It lists the devices connected to your computer that use the PCAN interface.