This thread has been locked.

If you have a related question, please click the "Ask a related question" button in the top right corner. The newly created question will be automatically linked to this question.

TCAN4550: 在linux平台测试canfd(5Mbps带宽下)收发速率,只有0.17Mbps实际有效带宽

Part Number: TCAN4550

我在rk3588芯片上通过spi接口挂载了两路tcan4550芯片

这是相应的dts

&spi0 {
	status = "okay";
	pinctrl-names = "default";
	pinctrl-0 = <&spi0m3_cs0 &spi0m3_pins>;
	num-cs = <1>;
	max-freq = <18000000>;
	tcan4x5x0: tcan4x5x@0{
		compatible = "ti,tcan4x5x";
		status = "okay";
		reg = <0>;
		#address-cells = <1>;
		#size-cells = <1>;
		spi-max-frequency = <18000000>;
		pinctrl-0 = <&can0_int>;
		interrupt-parent = <&gpio3>;
		interrupts = <RK_PB6 IRQ_TYPE_LEVEL_LOW>;
		bosch,mram-cfg = <0x0 0 0 16 0 0 1 1>;
		reset-gpios = <&gpio4 RK_PC3 GPIO_ACTIVE_HIGH>;
		spi-msb-first;
	};
	

};

&spi3 {
	status = "okay";
	pinctrl-names = "default";
	pinctrl-0 = <&spi3m3_cs0 &spi3m3_pins>;
	num-cs = <1>;
	max-freq = <18000000>;

	tcan4x5x3: tcan4x5x@0{
		compatible = "ti,tcan4x5x";
		status = "okay";
		reg = <0>;
		#address-cells = <1>;
		#size-cells = <1>;
		pinctrl-0 = <&can1_int>;
		interrupt-parent = <&gpio3>;
		interrupts = <RK_PB7 IRQ_TYPE_LEVEL_LOW>;
		spi-max-frequency = <18000000>;
		bosch,mram-cfg = <0x0 0 0 16 0 0 1 1>;
		reset-gpios = <&gpio3 RK_PA6 GPIO_ACTIVE_HIGH>;
		spi-msb-first;
	};
};

这是收发测试脚本canfd-test.py
import socket
import struct
import can
import time
import threading
import os

# CAN 接口配置
CAN_TX_INTERFACE = 'can0'  # 根据实际接口更改
CAN_RX_INTERFACE = 'can1'  # 根据实际接口更改
CAN_BITRATE = 500000
CANFD_BITRATE = 5000000
DATA_LEN = 64
DATA_RLEN = DATA_LEN + 8
# 全局变量
running = True
total_sent = 0
total_received = 0

# 发送线程
def send_thread():
    global total_sent
    can_id = 0x123
    can_dlc = DATA_LEN
    flags = 0
    os.system(f"sudo ip link set {CAN_TX_INTERFACE} down")
    os.system(f"sudo ip link set {CAN_TX_INTERFACE} up type can bitrate {CAN_BITRATE} dbitrate {CANFD_BITRATE} fd on")
    os.system(f"sudo ifconfig {CAN_TX_INTERFACE} txqueuelen 1000")
    can_socket = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    can_socket.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FD_FRAMES, 1)
    can_socket.bind((CAN_TX_INTERFACE,))
    data=bytearray(range(can_dlc))
    can_fd_frame = struct.pack("=IB3x64s", can_id, can_dlc, data)
    while running:
        can_socket.send(can_fd_frame)
        total_sent += 1
        time.sleep(0.0001)
    can_socket.close()

# 接收线程
def receive_thread():
    global total_received
    os.system(f"sudo ip link set {CAN_RX_INTERFACE} down")
    os.system(f"sudo ip link set {CAN_RX_INTERFACE} up type can bitrate {CAN_BITRATE} dbitrate {CANFD_BITRATE} fd on")
    os.system(f"sudo ifconfig {CAN_RX_INTERFACE} txqueuelen 1000")
    can_socket = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    can_socket.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FD_FRAMES, 1)
    can_socket.bind((CAN_RX_INTERFACE,))
    while running:
        frame, addr = can_socket.recvfrom(DATA_RLEN)
        can_id, can_dlc, data = struct.unpack("=IB3x64s", frame)
        if data:
            total_received += 1
            # print(f"Recv: {message}")
    can_socket.close()

# 计算和显示每秒的带宽和丢包率
def calculate_thread():
    global total_sent, total_received
    while running:
        time.sleep(1)
        sent_in_sec = total_sent
        received_in_sec = total_received
        lost_in_sec = sent_in_sec - received_in_sec
        loss_rate = (lost_in_sec / sent_in_sec * 100) if sent_in_sec else 0
        total_data = received_in_sec * DATA_LEN * 8
        bandwidth = total_data / 1_000_000  # 转换为 Mbps
        print(f"Sent: {sent_in_sec}, Received: {received_in_sec}, Lost: {lost_in_sec}, Loss Rate: {loss_rate:.2f}%, Bandwidth: {bandwidth:.2f} Mbps")
        total_sent = 0  # 重置计数器
        total_received = 0

# 启动测试
def start_test():
    threads = []
    threads.append(threading.Thread(target=send_thread))
    threads.append(threading.Thread(target=receive_thread))
    threads.append(threading.Thread(target=calculate_thread))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

if __name__ == "__main__":
    try:
        start_test()
    except KeyboardInterrupt:
        running = False
        print("Test stopped.")


测试结果如下: