from dataclasses import dataclass import re from typing import List, Optional, Dict, DefaultDict, Tuple import time from enum import Enum import logging from collections import defaultdict from parser.topology_parser import TopoMappingParser from parser.transceiver_config_parser import TransceiverConfigParser from toolbox.opt_reg_access_tool import OptRegAccessTool from gpu.biren.exp_util import DevWhiteRiverExp from parser.ibias_rssi_map_parser import IbiasRssiMapParser from toolbox.regs_save_load_tool import LaneRegInfo from typing import NamedTuple, Optional from gpu.biren.entity_info import SYNC_RESET_RTMR import threading ROUTE_TABLE = { "786-1-oneta": [1, 2, 3, 4, 5, 6, 7, 8], "786-1-onetb": [1, 2, 3, 4, 5, 6, 7, 8], "786-1-onoc1": [2, 1, 4, 3, 6, 5, 8, 7], # c0 90 "786-1-onoc2": [4, 3, 2, 1, 8, 7, 6, 5], # c0 98 "786-1-onoc3": [8, 7, 6, 5, 4, 3, 2, 1], # c0 a0 "786-1-onoc4": [3, 4, 1, 2, 7, 8, 5, 6], # c0 a8 "786-1-onoc5": [6, 5, 8, 7, 2, 1, 4, 3], # c0 b0 "786-1-onoc6": [7, 8, 5, 6, 3, 4, 1, 2], # c0 b8 "786-1-onoc7": [5, 6, 7, 8, 1, 2, 3, 4], # c0 c0 "21436587": [2, 1, 4, 3, 6, 5, 8, 7], # c0 90 "14235867": [4, 3, 2, 1, 8, 7, 6, 5], # c0 98 "18273645": [8, 7, 6, 5, 4, 3, 2, 1], # c0 a0 "13245768": [3, 4, 1, 2, 7, 8, 5, 6], # c0 a8 "16253847": [6, 5, 8, 7, 2, 1, 4, 3], # c0 b0 "17283546": [7, 8, 5, 6, 3, 4, 1, 2], # c0 b8 "15263748": [5, 6, 7, 8, 1, 2, 3, 4] # c0 c0 } @dataclass class RetimerPrbsResult: rtmr_id: int rtmr_lane: int locked: bool rx_valid: bool err_count: int ber: float result: bool class PrbsTool: def __init__(self, local_bmc: DevWhiteRiverExp, remote_bmc: DevWhiteRiverExp, local_reg_tool: OptRegAccessTool, remote_reg_tool: OptRegAccessTool, route_name: str): self.local_bmc = local_bmc self.remote_bmc = remote_bmc self.local_reg_tool = local_reg_tool self.remote_reg_tool = remote_reg_tool self.route_name = route_name def ResetRetimerAndEnablePrbs(self, exp_id: int, rtmr_list: List[int], preset = 9): # 开始prbs测试 logging.info(f"----------step 1: reset retimer, rtmr: {rtmr_list}") self.ResetRetimer(exp_id, rtmr_list) time.sleep(1) logging.info("----------step 2: enable prbs") self.EnableTxPrbs(exp_id, rtmr_list, preset) def ResetRetimer(self, exp_id: int, rtmr_list: List[int]): isSuccess = True for rtmr in rtmr_list: if self.remote_bmc.SyncCmd06(exp_id, SYNC_RESET_RTMR, rtmr, 0) is None: isSuccess = False if self.remote_bmc != self.local_bmc: for rtmr in rtmr_list: if self.local_bmc.SyncCmd06(exp_id, SYNC_RESET_RTMR, rtmr, 0) is None: isSuccess = False logging.info(f'retimer reset: {isSuccess}') return isSuccess def EnableTxPrbsDebug(self, exp_id: int, rtmr_list: List[int], preset = 9): logging.info("EnablePrbs step 1") # 获取 inversion 配置 inversion = self.get_retimer_inv_new(exp_id, [0,1,2,3,4,5,6,7], False, self.route_name) # return for slot in [0,1,2,3,4,5,6,7]: self.local_reg_tool.tx_disable(exp_id, slot) if self.remote_bmc != self.local_bmc: self.remote_reg_tool.tx_disable(exp_id, slot) def task_local(): if self.remote_bmc != self.local_bmc: for i, rtmr in enumerate(rtmr_list): self.prbs_tx(self.local_bmc, exp_id, rtmr, 1, preset, 31, inversion[i], s1=0) def task_remote(): for i, rtmr in enumerate(rtmr_list): self.prbs_tx(self.remote_bmc, exp_id, rtmr, 1, preset, 31, inversion[i], s1=0) thread_local = threading.Thread(target=task_local, name=f"PRBS-TX-Local-{exp_id}") thread_remote = threading.Thread(target=task_remote, name=f"PRBS-TX-Remote-{exp_id}") thread_local.start() thread_remote.start() thread_local.join() thread_remote.join() for slot in [0,1,2,3,4,5,6,7]: self.local_reg_tool.tx_enable(exp_id, slot) if self.remote_bmc != self.local_bmc: self.remote_reg_tool.tx_enable(exp_id, slot) for i, rtmr in enumerate(rtmr_list): logging.info(f'-------prbs_rx {exp_id}, {rtmr}') self.prbs_rx(self.local_bmc, exp_id, rtmr) if self.remote_bmc != self.local_bmc: for i, rtmr in enumerate(rtmr_list): logging.info(f'-------prbs_rx {exp_id}, {rtmr}') self.prbs_rx(self.remote_bmc, exp_id, rtmr) return True def EnableTxPrbs(self, exp_id: int, rtmr_list: List[int], preset = 9): logging.info("EnablePrbs step 1") # 获取 inversion 配置 local_inv = self.get_retimer_inv(exp_id, [0,1,2,3,4,5,6,7], False, self.route_name) remote_inv = self.get_retimer_inv(exp_id, [0,1,2,3,4,5,6,7], True, self.route_name) logging.info(f'-------- local_inv: {local_inv}, remote_inv: {remote_inv}') for slot in [0,1,2,3,4,5,6,7]: self.local_reg_tool.tx_disable(exp_id, slot) if self.remote_bmc != self.local_bmc: self.remote_reg_tool.tx_disable(exp_id, slot) def task_local(): if self.remote_bmc != self.local_bmc: for i, rtmr in enumerate(rtmr_list): self.prbs_tx(self.local_bmc, exp_id, rtmr, 5, preset, 31, remote_inv[i], s1=0) def task_remote(): for i, rtmr in enumerate(rtmr_list): self.prbs_tx(self.remote_bmc, exp_id, rtmr, 5, preset, 31, local_inv[i], s1=0) thread_local = threading.Thread(target=task_local, name=f"PRBS-TX-Local-{exp_id}") thread_remote = threading.Thread(target=task_remote, name=f"PRBS-TX-Remote-{exp_id}") thread_local.start() thread_remote.start() thread_local.join() thread_remote.join() for slot in [0,1,2,3,4,5,6,7]: self.local_reg_tool.tx_enable(exp_id, slot) if self.remote_bmc != self.local_bmc: self.remote_reg_tool.tx_enable(exp_id, slot) for i, rtmr in enumerate(rtmr_list): logging.info(f'-------prbs_rx {exp_id}, {rtmr}') self.prbs_rx(self.local_bmc, exp_id, rtmr) if self.remote_bmc != self.local_bmc: for i, rtmr in enumerate(rtmr_list): logging.info(f'-------prbs_rx {exp_id}, {rtmr}') self.prbs_rx(self.remote_bmc, exp_id, rtmr) return True def prbs_check(self, exp_id: int, rtmr_list: List[int], reset:bool = False) -> List[RetimerPrbsResult]: msg = '' # logging.info(f'{self}.ver: {ver}') t_start = time.perf_counter() prbs_results: List[RetimerPrbsResult] = [] for i, rtmr in enumerate(rtmr_list): msg = self.prbs_check_(exp_id, rtmr, reset) prbs_result = self.parse_prbs_check_log(msg) if reset == False: self.print_prbs_check_results(prbs_result) prbs_results.extend(prbs_result) time.sleep(0.5) return prbs_results def get_retimer_inv_new(self, exp_id: int, slot_list: List[int], is_remote: bool, route): # assert len(slot_list) == 8 retimer_inv_table = { '786-1-oneta' : [ 0b0010000000000000, 0b0000000000010000, 0b0101000011110000, 0b1010000011110000], '786-1-onetb' : [ 0b0010000000000000, 0b0000000000010000, 0b0101000011110000, 0b1010000011110000], '786-1-onoc1' : [ 0b1000000010000000, 0b1110000000100000, 0b1111000001110000, 0b0100000011000000], '786-1-onoc2' : [ 0b0000000000100000, 0b0000000000010000, 0b0101000011110000, 0b0111000000100000], '786-1-onoc3' : [ 0b0000000000000000, 0b0000000000110000, 0b1110000010010000, 0b1100000001000000], '786-1-onoc4' : [ 0b1000000010000000, 0b0011000011110000, 0b1101000001010000, 0b0100000011000000], '786-1-onoc5' : [ 0b1110000000110000, 0b1000000010010000, 0b1101000001110000, 0b0110000011000000], '786-1-onoc6' : [ 0b0011000011100000, 0b1000000010010000, 0b1101000001110000, 0b0100000011100000], '786-1-onoc7' : [ 0b0000000000000000, 0b0010000000010000, 0b0011000001000000, 0b1100000001000000] } retimer_inv = retimer_inv_table[route] route_list = ROUTE_TABLE[route] binary_str = ' '.join(f'{x:016b}' for x in retimer_inv) logging.info(f'--------remote: {is_remote}, retimer_inv(add mzm_direction before): {binary_str}') logging.info(f'----------remote: {is_remote}, route_list: {route_list}') for slot_id in slot_list: logging.debug(f'slot_id: {slot_id}') if self.remote_reg_tool != self.local_reg_tool: # onet remote_mzm = int(self.remote_reg_tool.read_mzm_direction(exp_id, slot_id), 16) local_mzm = int(self.local_reg_tool.read_mzm_direction(exp_id, slot_id), 16) reversed_local = int(f'{local_mzm & 0xFF:08b}'[::-1], 2) mzm = remote_mzm logging.info(f"Remote MZM: {remote_mzm:08b}, {remote_mzm:02X}") logging.info(f"Local MZM: {local_mzm:08b}, {local_mzm:02X}") logging.info(f"XOR MZM: {mzm:08b}") else: #onoc mzm_str = self.local_reg_tool.read_mzm_direction(exp_id, slot_id) mzm = int(mzm_str, 16) # mzm = 0 mzm_inv_flag = bin(mzm)[2:].zfill(8) inv_flag = '' # for i in range(0, 8): # inv_flag += mzm_inv_flag[route_list[i] - 1] # logging.info(f'remote: {is_remote}, index: {route_list[i] - 1}, value: {mzm_inv_flag[route_list[i] - 1]}, inv_flag:{inv_flag}') inv_flag = mzm_inv_flag logging.info(f'-------- remote: {is_remote}, slot_id: {slot_id}, mzm: {mzm:02X}, mzm_inv_flag: {mzm_inv_flag}, adjust slot after: {inv_flag}') # rtmr 21 if inv_flag[7] == '1': logging.info(f'------- rtmr 21') mask_ = 1 << (slot_id) retimer_inv[1] ^= mask_ # rtmr 41 if inv_flag[6] == '1': logging.info(f'------- rtmr 41') mask_ = 1 << (slot_id) retimer_inv[3] ^= mask_ # rtmr 42 if inv_flag[5] == '1': logging.info(f'------- rtmr 42') mask_ = 1 << (slot_id + 8) retimer_inv[3] ^= mask_ # rtmr 22 if inv_flag[4] == '1': logging.info(f'------- rtmr 22') mask_ = 1 << (slot_id + 8) retimer_inv[1] ^= mask_ # rtmr 11 if inv_flag[3] == '1': logging.info(f'------- rtmr 11') mask_ = 1 << (slot_id) retimer_inv[0] ^= mask_ # rtmr 31 if inv_flag[2] == '1': logging.info(f'------- rtmr 31') mask_ = 1 << (slot_id) retimer_inv[2] ^= mask_ # rtmr 32 if inv_flag[1] == '1': logging.info(f'------- rtmr 32') mask_ = 1 << (slot_id + 8) retimer_inv[2] ^= mask_ # rtmr 12 if inv_flag[0] == '1': logging.info(f'------- rtmr 12') mask_ = 1 << (slot_id + 8) retimer_inv[0] ^= mask_ logging.debug(f'---retimer_inv: {retimer_inv}') binary_str = ' '.join(f'{x:016b}' for x in retimer_inv) logging.info(f'--------remote: {is_remote}, retimer_inv(manual before): {binary_str}') return retimer_inv def get_retimer_inv(self, exp_id: int, slot_list: List[int], is_remote: bool, route): retimer_inv_table = { '786-1-oneta' : [ 0b0010000000000000, 0b0000000000010000, 0b0101000011110000, 0b1010000011110000], '786-1-onetb' : [ 0b0010000000000000, 0b0000000000010000, 0b0101000011110000, 0b1010000011110000], '786-1-onoc1' : [ 0b1000000010000000, 0b1110000000100000, 0b1111000001110000, 0b0100000011000000], '786-1-onoc2' : [ 0b0000000000100000, 0b0000000000010000, 0b0101000011110000, 0b0111000000100000], '786-1-onoc3' : [ 0b0000000000000000, 0b0000000000110000, 0b1110000010010000, 0b1100000001000000], '786-1-onoc4' : [ 0b1000000010000000, 0b0011000011110000, 0b1101000001010000, 0b0100000011000000], '786-1-onoc5' : [ 0b1110000000110000, 0b1000000010010000, 0b1101000001110000, 0b0110000011000000], '786-1-onoc6' : [ 0b0011000011100000, 0b1000000010010000, 0b1101000001110000, 0b0100000011100000], '786-1-onoc7' : [ 0b0000000000000000, 0b0010000000010000, 0b0011000001000000, 0b1100000001000000] } retimer_inv = retimer_inv_table[route] route_list = ROUTE_TABLE[route] binary_str = ' '.join(f'{x:016b}' for x in retimer_inv) logging.info(f'--------remote: {is_remote}, retimer_inv(add mzm_direction before): {binary_str}') for slot_id in slot_list: logging.debug(f'slot_id: {slot_id}') if is_remote: mzm_str = self.remote_reg_tool.read_mzm_direction(exp_id, slot_id) else: mzm_str = self.local_reg_tool.read_mzm_direction(exp_id, slot_id) mzm = int(mzm_str, 16) # mzm = 0 mzm_inv_flag = bin(mzm)[2:].zfill(8) inv_flag = '' for i in range(0, 8): inv_flag += mzm_inv_flag[route_list[i] - 1] logging.debug(f'remote: {is_remote}, index: {route_list[i] - 1}, value: {mzm_inv_flag[route_list[i] - 1]}, inv_flag:{inv_flag}') logging.info(f'--------remote: {is_remote}, slot_id: {slot_id}, mzm: {mzm:02X}, mzm_inv_flag: {mzm_inv_flag}, adjust slot after: {inv_flag}') # rtmr 21 if inv_flag[7] == '1': logging.debug(f'------- rtmr 21') mask_ = 1 << (slot_id) retimer_inv[1] ^= mask_ # rtmr 41 if inv_flag[6] == '1': logging.debug(f'------- rtmr 41') mask_ = 1 << (slot_id) retimer_inv[3] ^= mask_ # rtmr 42 if inv_flag[5] == '1': logging.debug(f'------- rtmr 42') mask_ = 1 << (slot_id + 8) retimer_inv[3] ^= mask_ # rtmr 22 if inv_flag[4] == '1': logging.debug(f'------- rtmr 22') mask_ = 1 << (slot_id + 8) retimer_inv[1] ^= mask_ # rtmr 11 if inv_flag[3] == '1': logging.debug(f'------- rtmr 11') mask_ = 1 << (slot_id) retimer_inv[0] ^= mask_ # rtmr 31 if inv_flag[2] == '1': logging.debug(f'------- rtmr 31') mask_ = 1 << (slot_id) retimer_inv[2] ^= mask_ # rtmr 32 if inv_flag[1] == '1': logging.debug(f'------- rtmr 32') mask_ = 1 << (slot_id + 8) retimer_inv[2] ^= mask_ # rtmr 12 if inv_flag[0] == '1': logging.debug(f'------- rtmr 12') mask_ = 1 << (slot_id + 8) retimer_inv[0] ^= mask_ binary_str = ' '.join(f'{x:016b}' for x in retimer_inv) logging.info(f'--------remote: {is_remote}, retimer_inv(manual after): {binary_str}') return retimer_inv def prbs_tx(self, bmc, exp_id: int, rtmr_id: int, gen:int = 5, p:int = 8, prbs:int = 31, s0:int = 0x0, s1:int = 0x0): # cmd_ = f'rtmr {rtmr_id} prbs tx gen{gen} p{p} prbs{prbs} {hex(s0)} {hex(s1)} no no' cmd_ = f'rtmr {rtmr_id} prbs tx gen{gen} p{p} prbs{prbs} {hex(s0)} {hex(s1)}' logging.info(cmd_) bmc.CmdVendorCommand(exp_id, cmd_) def prbs_rx(self, bmc, exp_id: int, rtmr_id: int): cmd_ = f'rtmr {rtmr_id} prbs rx' logging.info(f'---cmd: {cmd_}') bmc.CmdVendorCommand(exp_id, cmd_) def prbs_check_(self, exp_id: int, rtmr_id: int, reset: bool = False): cmd_ = f'rtmr {rtmr_id} prbs check {"reset" if reset else ""}' logging.info(f'---------:{cmd_}') r = self.local_bmc.CmdVendorCommand(exp_id, cmd_, False) return r def parse_prbs_check_log(self, log_content: str) -> List[RetimerPrbsResult]: results = [] # 正则:只匹配 A00-A15,并忽略 Bxx 行 pattern = r"RTMR(\d+)\s+PRBS\s+A(\d{2}):\s+(RX-VALID|RX-INVAL),\s+(LOCKED|UNLOCK),\s+(\d+),\s+([+-]?\d*\.?\d+(?:[eE][+-]?\d+)?),\s+(\*?)([A-Z]+)" for line in log_content.strip().splitlines(): line = line.strip() if not line: continue match = re.match(pattern, line) if not match: # 如果是 Bxx 行或其他不匹配行,跳过(静默忽略) continue rt_id = int(match.group(1)) lane_num = int(match.group(2)) # A00 -> 0, A01 -> 1, ..., A15 -> 15 # 只保留 lane 0 ~ 15 if lane_num < 0 or lane_num > 15: continue rx_valid_str = match.group(3) locked_str = match.group(4) err_count = int(match.group(5)) ber = float(match.group(6)) status_type = match.group(8).upper() # SUCCESS or FAIL # 转换为布尔值 rx_valid = (rx_valid_str == "RX-VALID") locked = (locked_str == "LOCKED") result = (status_type == "SUCCESS") results.append( RetimerPrbsResult( rtmr_id=rt_id, rtmr_lane=lane_num, # 直接使用 0~15 的整数 locked=locked, rx_valid=rx_valid, err_count=err_count, ber=ber, result=result ) ) return results def print_prbs_check_results(self, results: List[RetimerPrbsResult]): print(f"{'RT_ID':^6} {'Lane':^6} {'Locked':^8} {'RX_Valid':^10} " f"{'err_count':^6} {'ber':^6} {'Result':^8}") print("-" * 60) for r in results: print(f"{r.rtmr_id:^6} {r.rtmr_lane:^6} " f"{'LOCKED' if r.locked else 'UNLOCK'}{'':^6} " f"{'VALID' if r.rx_valid else 'ERR'}{'':^8} " f"{r.err_count:^6} {r.ber:^6} " f"{'PASS' if r.result else 'FAILED':^8}")