我在rk3588芯片上通过spi接口挂载了两路tcan4550芯片
这是相应的dts
&spi0 {
status = "okay";
pinctrl-names = "default";
pinctrl-0 = <&spi0m3_cs0 &spi0m3_pins>;
num-cs = <1>;
max-freq = <18000000>;
tcan4x5x0: tcan4x5x@0{
compatible = "ti,tcan4x5x";
status = "okay";
reg = <0>;
#address-cells = <1>;
#size-cells = <1>;
spi-max-frequency = <18000000>;
pinctrl-0 = <&can0_int>;
interrupt-parent = <&gpio3>;
interrupts = <RK_PB6 IRQ_TYPE_LEVEL_LOW>;
bosch,mram-cfg = <0x0 0 0 16 0 0 1 1>;
reset-gpios = <&gpio4 RK_PC3 GPIO_ACTIVE_HIGH>;
spi-msb-first;
};
};
&spi3 {
status = "okay";
pinctrl-names = "default";
pinctrl-0 = <&spi3m3_cs0 &spi3m3_pins>;
num-cs = <1>;
max-freq = <18000000>;
tcan4x5x3: tcan4x5x@0{
compatible = "ti,tcan4x5x";
status = "okay";
reg = <0>;
#address-cells = <1>;
#size-cells = <1>;
pinctrl-0 = <&can1_int>;
interrupt-parent = <&gpio3>;
interrupts = <RK_PB7 IRQ_TYPE_LEVEL_LOW>;
spi-max-frequency = <18000000>;
bosch,mram-cfg = <0x0 0 0 16 0 0 1 1>;
reset-gpios = <&gpio3 RK_PA6 GPIO_ACTIVE_HIGH>;
spi-msb-first;
};
};
import socket
import struct
import can
import time
import threading
import os
# CAN 接口配置
CAN_TX_INTERFACE = 'can0' # 根据实际接口更改
CAN_RX_INTERFACE = 'can1' # 根据实际接口更改
CAN_BITRATE = 500000
CANFD_BITRATE = 5000000
DATA_LEN = 64
DATA_RLEN = DATA_LEN + 8
# 全局变量
running = True
total_sent = 0
total_received = 0
# 发送线程
def send_thread():
global total_sent
can_id = 0x123
can_dlc = DATA_LEN
flags = 0
os.system(f"sudo ip link set {CAN_TX_INTERFACE} down")
os.system(f"sudo ip link set {CAN_TX_INTERFACE} up type can bitrate {CAN_BITRATE} dbitrate {CANFD_BITRATE} fd on")
os.system(f"sudo ifconfig {CAN_TX_INTERFACE} txqueuelen 1000")
can_socket = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
can_socket.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FD_FRAMES, 1)
can_socket.bind((CAN_TX_INTERFACE,))
data=bytearray(range(can_dlc))
can_fd_frame = struct.pack("=IB3x64s", can_id, can_dlc, data)
while running:
can_socket.send(can_fd_frame)
total_sent += 1
time.sleep(0.0001)
can_socket.close()
# 接收线程
def receive_thread():
global total_received
os.system(f"sudo ip link set {CAN_RX_INTERFACE} down")
os.system(f"sudo ip link set {CAN_RX_INTERFACE} up type can bitrate {CAN_BITRATE} dbitrate {CANFD_BITRATE} fd on")
os.system(f"sudo ifconfig {CAN_RX_INTERFACE} txqueuelen 1000")
can_socket = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
can_socket.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FD_FRAMES, 1)
can_socket.bind((CAN_RX_INTERFACE,))
while running:
frame, addr = can_socket.recvfrom(DATA_RLEN)
can_id, can_dlc, data = struct.unpack("=IB3x64s", frame)
if data:
total_received += 1
# print(f"Recv: {message}")
can_socket.close()
# 计算和显示每秒的带宽和丢包率
def calculate_thread():
global total_sent, total_received
while running:
time.sleep(1)
sent_in_sec = total_sent
received_in_sec = total_received
lost_in_sec = sent_in_sec - received_in_sec
loss_rate = (lost_in_sec / sent_in_sec * 100) if sent_in_sec else 0
total_data = received_in_sec * DATA_LEN * 8
bandwidth = total_data / 1_000_000 # 转换为 Mbps
print(f"Sent: {sent_in_sec}, Received: {received_in_sec}, Lost: {lost_in_sec}, Loss Rate: {loss_rate:.2f}%, Bandwidth: {bandwidth:.2f} Mbps")
total_sent = 0 # 重置计数器
total_received = 0
# 启动测试
def start_test():
threads = []
threads.append(threading.Thread(target=send_thread))
threads.append(threading.Thread(target=receive_thread))
threads.append(threading.Thread(target=calculate_thread))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
try:
start_test()
except KeyboardInterrupt:
running = False
print("Test stopped.")


