This thread has been locked.
If you have a related question, please click the "Ask a related question" button in the top right corner. The newly created question will be automatically linked to this question.
我在 RK3588 芯片上通过 SPI 接口挂载了两路 TCAN4550 芯片。
这是相应的dts
/*
 * Enables SPI0 and SPI3 and attaches one "ti,tcan4x5x" CAN controller to each
 * bus. Both controller nodes use the same settings; only the interrupt pin,
 * the reset GPIO and the pinctrl group differ.
 */
&spi0 {
	status = "okay";
	pinctrl-names = "default";
	pinctrl-0 = <&spi0m3_cs0 &spi0m3_pins>;
	num-cs = <1>;
	max-freq = <18000000>;

	tcan4x5x0: tcan4x5x@0{
		compatible = "ti,tcan4x5x";
		status = "okay";
		reg = <0>;	/* presumably the chip-select index on this bus (num-cs is 1) */
		#address-cells = <1>;
		#size-cells = <1>;
		spi-max-frequency = <18000000>;
		/*
		 * NOTE(review): pinctrl-0 is given here without a matching
		 * pinctrl-names property in this node - confirm that the
		 * can0_int pin state is actually applied.
		 */
		pinctrl-0 = <&can0_int>;
		/* Interrupt line: GPIO3 PB6, level-triggered, active low. */
		interrupt-parent = <&gpio3>;
		interrupts = <RK_PB6 IRQ_TYPE_LEVEL_LOW>;
		/*
		 * NOTE(review): per the bosch,m_can binding the cells are
		 * presumably <offset sidf xidf rxf0 rxf1 rxb txe txb>, i.e.
		 * 16 RX FIFO0 elements, 1 TX event element and 1 TX buffer -
		 * verify; a single TX buffer may limit transmit throughput.
		 */
		bosch,mram-cfg = <0x0 0 0 16 0 0 1 1>;
		reset-gpios = <&gpio4 RK_PC3 GPIO_ACTIVE_HIGH>;
		spi-msb-first;
		/*
		 * NOTE(review): no clocks/clock-names property is present -
		 * confirm whether the driver version in use requires a CAN
		 * clock ("cclk") to be described.
		 */
	};
};

&spi3 {
	status = "okay";
	pinctrl-names = "default";
	pinctrl-0 = <&spi3m3_cs0 &spi3m3_pins>;
	num-cs = <1>;
	max-freq = <18000000>;

	tcan4x5x3: tcan4x5x@0{
		compatible = "ti,tcan4x5x";
		status = "okay";
		reg = <0>;	/* presumably the chip-select index on this bus (num-cs is 1) */
		#address-cells = <1>;
		#size-cells = <1>;
		/*
		 * NOTE(review): pinctrl-0 is given here without a matching
		 * pinctrl-names property in this node - confirm that the
		 * can1_int pin state is actually applied.
		 */
		pinctrl-0 = <&can1_int>;
		/* Interrupt line: GPIO3 PB7, level-triggered, active low. */
		interrupt-parent = <&gpio3>;
		interrupts = <RK_PB7 IRQ_TYPE_LEVEL_LOW>;
		spi-max-frequency = <18000000>;
		/* Same message-RAM layout as the controller on SPI0. */
		bosch,mram-cfg = <0x0 0 0 16 0 0 1 1>;
		reset-gpios = <&gpio3 RK_PA6 GPIO_ACTIVE_HIGH>;
		spi-msb-first;
	};
};
"""CAN FD throughput / frame-loss test between two SocketCAN interfaces.

Frames are transmitted on ``CAN_TX_INTERFACE`` and counted on
``CAN_RX_INTERFACE``. Once per second the script prints how many frames were
queued for transmission, how many were received, the difference between the
two, and the received payload bandwidth.

Run it directly; stop it with Ctrl-C.
"""
import errno
import os
import socket
import struct
import subprocess
import threading
import time

try:
    # python-can is not used by this script (raw SocketCAN sockets are used
    # instead); the import is kept, but optional, so the script also runs on
    # systems where the package is not installed.
    import can  # noqa: F401
except ImportError:
    can = None

# CAN interface configuration
CAN_TX_INTERFACE = 'can0'   # change to match the actual interface
CAN_RX_INTERFACE = 'can1'   # change to match the actual interface
CAN_BITRATE = 500000        # nominal (arbitration phase) bit rate
CANFD_BITRATE = 5000000     # data phase bit rate
DATA_LEN = 64               # payload bytes per frame
DATA_RLEN = DATA_LEN + 8    # 72 bytes: size of a full CAN FD frame record

TEST_CAN_ID = 0x123
TX_QUEUE_LEN = 1000
SEND_INTERVAL = 0.0001      # pause between two transmitted frames, seconds
SOCKET_TIMEOUT = 0.5        # lets blocked workers re-check ``running``

# Flag bit of ``struct canfd_frame`` requesting bit-rate switching.
CANFD_BRS = 0x01
# Without BRS the payload is sent at CAN_BITRATE and CANFD_BITRATE is never
# used on the wire. Set this to 0 to test without bit-rate switching.
CAN_TX_FLAGS = CANFD_BRS

# struct canfd_frame: can_id (u32), len (u8), flags (u8), 2 reserved bytes,
# 64 data bytes.
CANFD_FRAME = struct.Struct("=IBB2x64s")

# Global state
running = True
total_sent = 0       # frames handed to the TX socket; written by send_thread only
total_received = 0   # CAN FD frames read from the RX socket; written by receive_thread only

_configured_interfaces = set()
_configure_lock = threading.Lock()


def build_fd_frame(can_id, data, flags=0):
    """Return the 72-byte SocketCAN CAN FD frame record for *data*.

    Args:
        can_id: CAN identifier, including any SocketCAN flag bits.
        data: payload, at most 64 bytes; shorter payloads are zero padded.
        flags: CAN FD flags byte (for example ``CANFD_BRS``).

    Raises:
        ValueError: if *data* is longer than 64 bytes.
    """
    payload = bytes(data)
    if len(payload) > 64:
        raise ValueError(f"CAN FD payload too long: {len(payload)} bytes")
    return CANFD_FRAME.pack(can_id, len(payload), flags, payload)


def parse_fd_frame(frame):
    """Decode a CAN FD frame record.

    Returns:
        ``(can_id, length, flags, data)`` with *data* trimmed to *length*
        bytes, or ``None`` when *frame* does not have the CAN FD record size
        (for example a 16-byte classic CAN frame).
    """
    if len(frame) != CANFD_FRAME.size:
        return None
    can_id, length, flags, data = CANFD_FRAME.unpack(frame)
    return can_id, length, flags, data[:length]


def interval_stats(sent, received, elapsed=1.0, payload_len=DATA_LEN):
    """Compute the statistics for one reporting interval.

    Args:
        sent: frames sent during the interval.
        received: frames received during the interval.
        elapsed: length of the interval in seconds.
        payload_len: payload bytes carried by each frame.

    Returns:
        ``(lost, loss_rate_percent, bandwidth_mbps)``. *lost* is the plain
        difference ``sent - received`` and can be negative for one interval
        when frames queued earlier arrive later.
    """
    lost = sent - received
    loss_rate = 100.0 * lost / sent if sent else 0.0
    if elapsed > 0:
        bandwidth = received * payload_len * 8 / elapsed / 1_000_000  # Mbps
    else:
        bandwidth = 0.0
    return lost, loss_rate, bandwidth


def interface_commands(interface):
    """Return the argv lists that (re)configure *interface* for CAN FD."""
    return [
        ["sudo", "ip", "link", "set", interface, "down"],
        ["sudo", "ip", "link", "set", interface, "up", "type", "can",
         "bitrate", str(CAN_BITRATE), "dbitrate", str(CANFD_BITRATE),
         "fd", "on"],
        # "ip link" instead of "ifconfig": same effect, no net-tools needed.
        ["sudo", "ip", "link", "set", interface, "txqueuelen",
         str(TX_QUEUE_LEN)],
    ]


def configure_interface(interface):
    """Bring *interface* up in CAN FD mode, once per process.

    Raises:
        subprocess.CalledProcessError: if one of the commands fails.
    """
    with _configure_lock:
        if interface in _configured_interfaces:
            return
        for command in interface_commands(interface):
            subprocess.run(command, check=True)
        _configured_interfaces.add(interface)


def open_fd_socket(interface):
    """Open a raw CAN socket with CAN FD enabled, bound to *interface*.

    The socket has a timeout so that blocking calls return regularly and the
    worker loops can notice that ``running`` was cleared.
    """
    can_socket = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    try:
        can_socket.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FD_FRAMES, 1)
        can_socket.settimeout(SOCKET_TIMEOUT)
        can_socket.bind((interface,))
    except Exception:
        can_socket.close()
        raise
    return can_socket


# Sender thread
def send_thread():
    """Transmit the test frame on CAN_TX_INTERFACE until ``running`` is cleared."""
    global total_sent
    configure_interface(CAN_TX_INTERFACE)
    can_fd_frame = build_fd_frame(
        TEST_CAN_ID, bytes(range(DATA_LEN)), CAN_TX_FLAGS)
    with open_fd_socket(CAN_TX_INTERFACE) as can_socket:
        while running:
            try:
                can_socket.send(can_fd_frame)
            except socket.timeout:
                # Socket buffer stayed full (bus stalled); re-check ``running``.
                continue
            except OSError as exc:
                if exc.errno != errno.ENOBUFS:
                    raise
                # Device TX queue is full: back off and retry, do not count.
                time.sleep(SEND_INTERVAL)
                continue
            total_sent += 1
            time.sleep(SEND_INTERVAL)


# Receiver thread
def receive_thread():
    """Count CAN FD frames arriving on CAN_RX_INTERFACE until stopped."""
    global total_received
    configure_interface(CAN_RX_INTERFACE)
    with open_fd_socket(CAN_RX_INTERFACE) as can_socket:
        while running:
            try:
                frame = can_socket.recv(CANFD_FRAME.size)
            except socket.timeout:
                continue
            # Frames that are not CAN FD sized (classic CAN) are ignored.
            if parse_fd_frame(frame) is not None:
                total_received += 1


# Compute and print the per-second bandwidth and loss rate
def calculate_thread():
    """Print per-interval statistics roughly once per second.

    The counters are only read here, never reset: each report is the
    difference to the previous snapshot, so no increment made by the worker
    threads can be lost.
    """
    last_sent = total_sent
    last_received = total_received
    last_time = time.monotonic()
    while running:
        time.sleep(1)
        now = time.monotonic()
        sent_now = total_sent
        received_now = total_received
        sent_in_sec = sent_now - last_sent
        received_in_sec = received_now - last_received
        lost_in_sec, loss_rate, bandwidth = interval_stats(
            sent_in_sec, received_in_sec, now - last_time)
        print(f"Sent: {sent_in_sec}, Received: {received_in_sec}, Lost: {lost_in_sec}, Loss Rate: {loss_rate:.2f}%, Bandwidth: {bandwidth:.2f} Mbps")
        last_sent, last_received, last_time = sent_now, received_now, now


def _stop_on_exit(worker):
    """Wrap *worker* so that its exit, normal or by exception, stops the test."""
    def run():
        global running
        try:
            worker()
        finally:
            running = False
    return run


# Start the test
def start_test():
    """Configure both interfaces, run the three worker threads, wait for them.

    Both interfaces are brought up before any thread starts so that the
    sender never transmits while the receiving interface is still down.
    ``running`` is cleared when this function is left, including on
    KeyboardInterrupt, and the workers are joined before it returns.
    """
    global running
    configure_interface(CAN_TX_INTERFACE)
    configure_interface(CAN_RX_INTERFACE)
    threads = [
        threading.Thread(target=_stop_on_exit(worker))
        for worker in (send_thread, receive_thread, calculate_thread)
    ]
    for thread in threads:
        thread.start()
    try:
        for thread in threads:
            thread.join()
    finally:
        running = False
        for thread in threads:
            thread.join()


if __name__ == "__main__":
    try:
        start_test()
    except KeyboardInterrupt:
        running = False
        print("Test stopped.")