You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

507 lines
20 KiB

from dataclasses import dataclass
import re
from typing import List, Optional, Dict, DefaultDict, Tuple
import time
from enum import Enum
import logging
from collections import defaultdict
from parser.topology_parser import TopoMappingParser
from parser.transceiver_config_parser import TransceiverConfigParser
from toolbox.opt_reg_access_tool import OptRegAccessTool
from gpu.biren.exp_util import DevWhiteRiverExp
from parser.ibias_rssi_map_parser import IbiasRssiMapParser
from toolbox.regs_save_load_tool import LaneRegInfo
from typing import NamedTuple, Optional
from gpu.biren.entity_info import SYNC_RESET_RTMR
import threading
# Lane-routing permutations keyed by route name.
# Each value lists eight 1-based positions; get_retimer_inv() uses entry i to
# pick which MZM-direction flag character ends up at position i of the
# re-ordered flag string (mzm_inv_flag[route_list[i] - 1]).
# The numeric keys carry the same permutations (and the same trailing "c0 xx"
# tags) as the "786-1-onocN" entries.
# NOTE(review): the numeric keys have no matching entry in the retimer
# inversion table used by get_retimer_inv*(), so those routes raise KeyError
# there -- confirm whether they are meant to be aliases.
ROUTE_TABLE = {
    "786-1-oneta": [1, 2, 3, 4, 5, 6, 7, 8],
    "786-1-onetb": [1, 2, 3, 4, 5, 6, 7, 8],
    "786-1-onoc1": [2, 1, 4, 3, 6, 5, 8, 7],  # c0 90
    "786-1-onoc2": [4, 3, 2, 1, 8, 7, 6, 5],  # c0 98
    "786-1-onoc3": [8, 7, 6, 5, 4, 3, 2, 1],  # c0 a0
    "786-1-onoc4": [3, 4, 1, 2, 7, 8, 5, 6],  # c0 a8
    "786-1-onoc5": [6, 5, 8, 7, 2, 1, 4, 3],  # c0 b0
    "786-1-onoc6": [7, 8, 5, 6, 3, 4, 1, 2],  # c0 b8
    "786-1-onoc7": [5, 6, 7, 8, 1, 2, 3, 4],  # c0 c0
    "21436587": [2, 1, 4, 3, 6, 5, 8, 7],  # c0 90
    "14235867": [4, 3, 2, 1, 8, 7, 6, 5],  # c0 98
    "18273645": [8, 7, 6, 5, 4, 3, 2, 1],  # c0 a0
    "13245768": [3, 4, 1, 2, 7, 8, 5, 6],  # c0 a8
    "16253847": [6, 5, 8, 7, 2, 1, 4, 3],  # c0 b0
    "17283546": [7, 8, 5, 6, 3, 4, 1, 2],  # c0 b8
    "15263748": [5, 6, 7, 8, 1, 2, 3, 4],  # c0 c0
}
@dataclass
class RetimerPrbsResult:
    """One per-lane row of a retimer ``prbs check`` report.

    Instances are produced by ``PrbsTool.parse_prbs_check_log`` from lines of
    the form ``RTMR<id> PRBS A<lane>: <rx>, <lock>, <errors>, <ber>, <status>``.
    """

    rtmr_id: int     # number following "RTMR" in the report line
    rtmr_lane: int   # two-digit lane number following "A" (0..15 are kept)
    locked: bool     # True when the line says "LOCKED" (False for "UNLOCK")
    rx_valid: bool   # True when the line says "RX-VALID" (False for "RX-INVAL")
    err_count: int   # integer error counter reported for the lane
    ber: float       # bit error rate value reported for the lane
    result: bool     # True when the trailing status word is "SUCCESS"
class PrbsTool:
    """Run retimer PRBS tests through BMC vendor commands.

    The tool holds a local and a remote BMC handle plus the matching optical
    register tools. When both BMC handles compare equal, only one side is
    driven (the "remote" handle for TX, the "local" handle for RX).
    """

    # Slots whose TX is gated while the PRBS generators are (re)configured.
    _ALL_SLOTS = tuple(range(8))

    # Base inversion words per route: four 16-bit words per route name.
    # Stored as tuples so a per-call copy is always taken before the MZM
    # direction bits are XOR-ed in.
    _RETIMER_INV_TABLE = {
        '786-1-oneta': (
            0b0010000000000000,
            0b0000000000010000,
            0b0101000011110000,
            0b1010000011110000),
        '786-1-onetb': (
            0b0010000000000000,
            0b0000000000010000,
            0b0101000011110000,
            0b1010000011110000),
        '786-1-onoc1': (
            0b1000000010000000,
            0b1110000000100000,
            0b1111000001110000,
            0b0100000011000000),
        '786-1-onoc2': (
            0b0000000000100000,
            0b0000000000010000,
            0b0101000011110000,
            0b0111000000100000),
        '786-1-onoc3': (
            0b0000000000000000,
            0b0000000000110000,
            0b1110000010010000,
            0b1100000001000000),
        '786-1-onoc4': (
            0b1000000010000000,
            0b0011000011110000,
            0b1101000001010000,
            0b0100000011000000),
        '786-1-onoc5': (
            0b1110000000110000,
            0b1000000010010000,
            0b1101000001110000,
            0b0110000011000000),
        '786-1-onoc6': (
            0b0011000011100000,
            0b1000000010010000,
            0b1101000001110000,
            0b0100000011100000),
        '786-1-onoc7': (
            0b0000000000000000,
            0b0010000000010000,
            0b0011000001000000,
            0b1100000001000000),
    }

    # For flag-string position p: (index of the inversion word to touch,
    # bit offset added to the slot id, label written to the log).
    _INV_FLAG_TARGETS = (
        (0, 8, 'rtmr 12'),  # inv_flag[0]
        (2, 8, 'rtmr 32'),  # inv_flag[1]
        (2, 0, 'rtmr 31'),  # inv_flag[2]
        (0, 0, 'rtmr 11'),  # inv_flag[3]
        (1, 8, 'rtmr 22'),  # inv_flag[4]
        (3, 8, 'rtmr 42'),  # inv_flag[5]
        (3, 0, 'rtmr 41'),  # inv_flag[6]
        (1, 0, 'rtmr 21'),  # inv_flag[7]
    )

    # Matches only the "Axx" lane rows of a `prbs check` report; "Bxx" rows
    # and any other text do not match and are skipped by the parser.
    _PRBS_LINE_RE = re.compile(
        r"RTMR(\d+)\s+PRBS\s+A(\d{2}):\s+(RX-VALID|RX-INVAL),\s+(LOCKED|UNLOCK),\s+(\d+),\s+([+-]?\d*\.?\d+(?:[eE][+-]?\d+)?),\s+(\*?)([A-Z]+)"
    )

    def __init__(self, local_bmc: DevWhiteRiverExp, remote_bmc: DevWhiteRiverExp,
                 local_reg_tool: OptRegAccessTool, remote_reg_tool: OptRegAccessTool, route_name: str):
        """Store the BMC handles, register tools and the route name.

        Args:
            local_bmc: BMC handle for the local side.
            remote_bmc: BMC handle for the remote side (may equal local_bmc).
            local_reg_tool: register access tool for the local side.
            remote_reg_tool: register access tool for the remote side.
            route_name: key into ROUTE_TABLE / the retimer inversion table.
        """
        self.local_bmc = local_bmc
        self.remote_bmc = remote_bmc
        self.local_reg_tool = local_reg_tool
        self.remote_reg_tool = remote_reg_tool
        self.route_name = route_name

    def ResetRetimerAndEnablePrbs(self, exp_id: int, rtmr_list: List[int], preset: int = 9):
        """Reset the listed retimers, wait one second, then start PRBS.

        Args:
            exp_id: expander id forwarded to every BMC / register call.
            rtmr_list: retimer ids to reset and configure.
            preset: value placed after "p" in the `prbs tx` command.
        """
        # Start of the PRBS test sequence.
        logging.info(f"----------step 1: reset retimer, rtmr: {rtmr_list}")
        self.ResetRetimer(exp_id, rtmr_list)
        time.sleep(1)
        logging.info("----------step 2: enable prbs")
        self.EnableTxPrbs(exp_id, rtmr_list, preset)

    def ResetRetimer(self, exp_id: int, rtmr_list: List[int]):
        """Send SYNC_RESET_RTMR for every retimer on the remote BMC, then on
        the local BMC when it is a different handle.

        Returns:
            True when no SyncCmd06 call returned None, otherwise False.
        """
        # NOTE(review): the local loop is assumed to run after (not nested in)
        # the remote loop -- the original indentation was lost.
        targets = [self.remote_bmc]
        if self.remote_bmc != self.local_bmc:
            targets.append(self.local_bmc)

        isSuccess = True
        for bmc in targets:
            for rtmr in rtmr_list:
                if bmc.SyncCmd06(exp_id, SYNC_RESET_RTMR, rtmr, 0) is None:
                    isSuccess = False
        logging.info(f'retimer reset: {isSuccess}')
        return isSuccess

    def EnableTxPrbsDebug(self, exp_id: int, rtmr_list: List[int], preset: int = 9):
        """Debug variant of EnableTxPrbs.

        Uses get_retimer_inv_new() for the inversion words, sends the same
        words to both sides, and uses "gen1" in the `prbs tx` command.

        Returns:
            True when every `prbs tx` command was issued without an exception,
            False otherwise.
        """
        logging.info("EnablePrbs step 1")
        # Fetch the inversion configuration.
        inversion = self.get_retimer_inv_new(exp_id, list(self._ALL_SLOTS), False, self.route_name)
        return self._run_tx_prbs(exp_id, rtmr_list, preset, 1, inversion, inversion)

    def EnableTxPrbs(self, exp_id: int, rtmr_list: List[int], preset: int = 9):
        """Configure PRBS TX ("gen5", prbs31) on every listed retimer and arm RX.

        The i-th retimer in rtmr_list is given the i-th inversion word, so the
        list must not be longer than the inversion list (four words).

        Returns:
            True when every `prbs tx` command was issued without an exception,
            False otherwise.
        """
        logging.info("EnablePrbs step 1")
        # Fetch the inversion configuration for both sides.
        local_inv = self.get_retimer_inv(exp_id, list(self._ALL_SLOTS), False, self.route_name)
        remote_inv = self.get_retimer_inv(exp_id, list(self._ALL_SLOTS), True, self.route_name)
        logging.info(f'-------- local_inv: {local_inv}, remote_inv: {remote_inv}')
        # Each BMC is programmed with the words derived from the *other*
        # side's MZM direction (presumably so TX polarity matches what the
        # peer expects -- TODO confirm).
        return self._run_tx_prbs(exp_id, rtmr_list, preset, 5, remote_inv, local_inv)

    def _run_tx_prbs(self, exp_id: int, rtmr_list: List[int], preset: int, gen: int,
                     local_bmc_inv: List[int], remote_bmc_inv: List[int]) -> bool:
        """Disable TX, program the PRBS generators in parallel, re-enable TX, arm RX.

        Args:
            exp_id: expander id forwarded to every call.
            rtmr_list: retimer ids; entry i uses inversion word i.
            preset: value placed after "p" in the `prbs tx` command.
            gen: value placed after "gen" in the `prbs tx` command.
            local_bmc_inv: inversion words sent through the local BMC.
            remote_bmc_inv: inversion words sent through the remote BMC.

        Returns:
            False when a TX worker raised (the error is logged), else True.
        """
        two_sided = self.remote_bmc != self.local_bmc

        for slot in self._ALL_SLOTS:
            self.local_reg_tool.tx_disable(exp_id, slot)
            if two_sided:
                self.remote_reg_tool.tx_disable(exp_id, slot)

        failed_sides = []

        def send_all(side, bmc, inv_words):
            # An exception raised inside a thread never reaches the caller,
            # so record it here to let the method report the failure.
            try:
                for i, rtmr in enumerate(rtmr_list):
                    self.prbs_tx(bmc, exp_id, rtmr, gen, preset, 31, inv_words[i], s1=0)
            except Exception:
                logging.exception(f'prbs tx failed on {side} bmc')
                failed_sides.append(side)

        def task_local():
            # With a single BMC the remote task already covers it.
            if two_sided:
                send_all('local', self.local_bmc, local_bmc_inv)

        def task_remote():
            send_all('remote', self.remote_bmc, remote_bmc_inv)

        thread_local = threading.Thread(target=task_local, name=f"PRBS-TX-Local-{exp_id}")
        thread_remote = threading.Thread(target=task_remote, name=f"PRBS-TX-Remote-{exp_id}")
        thread_local.start()
        thread_remote.start()
        thread_local.join()
        thread_remote.join()

        # TX is re-enabled even after a failure so it is never left gated.
        for slot in self._ALL_SLOTS:
            self.local_reg_tool.tx_enable(exp_id, slot)
            if two_sided:
                self.remote_reg_tool.tx_enable(exp_id, slot)

        for rtmr in rtmr_list:
            logging.info(f'-------prbs_rx {exp_id}, {rtmr}')
            self.prbs_rx(self.local_bmc, exp_id, rtmr)
        if two_sided:
            for rtmr in rtmr_list:
                logging.info(f'-------prbs_rx {exp_id}, {rtmr}')
                self.prbs_rx(self.remote_bmc, exp_id, rtmr)
        return not failed_sides

    def prbs_check(self, exp_id: int, rtmr_list: List[int], reset: bool = False) -> List[RetimerPrbsResult]:
        """Run `prbs check` on each retimer via the local BMC and collect the rows.

        Args:
            exp_id: expander id forwarded to the BMC.
            rtmr_list: retimer ids to check.
            reset: when True, "reset" is appended to the command and the
                result table is not printed.

        Returns:
            The parsed lane results of all retimers, in rtmr_list order.
        """
        # NOTE(review): assumes only the print is guarded by `reset` and the
        # 0.5 s pause happens once per retimer -- indentation was lost.
        prbs_results: List[RetimerPrbsResult] = []
        for rtmr in rtmr_list:
            report = self.prbs_check_(exp_id, rtmr, reset)
            lane_results = self.parse_prbs_check_log(report)
            if not reset:
                self.print_prbs_check_results(lane_results)
            prbs_results.extend(lane_results)
            time.sleep(0.5)
        return prbs_results

    @classmethod
    def _apply_inv_flags(cls, retimer_inv: List[int], inv_flag: str, slot_id: int, log) -> None:
        """XOR the slot's bit into the inversion words selected by inv_flag.

        Positions are visited from 7 down to 0 so the log lines appear in the
        order 21, 41, 42, 22, 11, 31, 32, 12. `log` is the logging function
        used for the per-flag message.
        """
        for pos in range(7, -1, -1):
            if inv_flag[pos] != '1':
                continue
            word_idx, bit_offset, label = cls._INV_FLAG_TARGETS[pos]
            log(f'------- {label}')
            retimer_inv[word_idx] ^= 1 << (slot_id + bit_offset)

    def get_retimer_inv_new(self, exp_id: int, slot_list: List[int], is_remote: bool, route):
        """Debug variant of get_retimer_inv(): no route re-ordering of the flags.

        For every slot the MZM direction register is read (from the remote
        tool when the two register tools differ, otherwise from the local
        tool) and its bits are XOR-ed into a copy of the route's base words.

        Args:
            exp_id: expander id forwarded to the register tools.
            slot_list: slot ids to read.
            is_remote: only used in log messages here.
            route: route name; KeyError if unknown.

        Returns:
            A new list with the four adjusted inversion words.
        """
        retimer_inv = list(self._RETIMER_INV_TABLE[route])
        route_list = ROUTE_TABLE[route]
        binary_str = ' '.join(f'{x:016b}' for x in retimer_inv)
        logging.info(f'--------remote: {is_remote}, retimer_inv(add mzm_direction before): {binary_str}')
        logging.info(f'----------remote: {is_remote}, route_list: {route_list}')
        for slot_id in slot_list:
            logging.debug(f'slot_id: {slot_id}')
            if self.remote_reg_tool != self.local_reg_tool:
                # onet
                remote_mzm = int(self.remote_reg_tool.read_mzm_direction(exp_id, slot_id), 16)
                local_mzm = int(self.local_reg_tool.read_mzm_direction(exp_id, slot_id), 16)
                # NOTE(review): the last log line says "XOR" but the value
                # used is the remote reading alone; local_mzm is only logged.
                mzm = remote_mzm
                logging.info(f"Remote MZM: {remote_mzm:08b}, {remote_mzm:02X}")
                logging.info(f"Local MZM: {local_mzm:08b}, {local_mzm:02X}")
                logging.info(f"XOR MZM: {mzm:08b}")
            else:
                # onoc
                mzm = int(self.local_reg_tool.read_mzm_direction(exp_id, slot_id), 16)
            mzm_inv_flag = bin(mzm)[2:].zfill(8)
            # Flags are used in register order here (no ROUTE_TABLE shuffle).
            inv_flag = mzm_inv_flag
            logging.info(f'-------- remote: {is_remote}, slot_id: {slot_id}, mzm: {mzm:02X}, mzm_inv_flag: {mzm_inv_flag}, adjust slot after: {inv_flag}')
            self._apply_inv_flags(retimer_inv, inv_flag, slot_id, logging.info)
            # NOTE(review): assumed to be a per-slot dump (inside the loop).
            logging.debug(f'---retimer_inv: {retimer_inv}')
        binary_str = ' '.join(f'{x:016b}' for x in retimer_inv)
        logging.info(f'--------remote: {is_remote}, retimer_inv(manual before): {binary_str}')
        return retimer_inv

    def get_retimer_inv(self, exp_id: int, slot_list: List[int], is_remote: bool, route):
        """Build the four retimer inversion words for a route.

        For every slot the MZM direction register is read from the remote or
        local register tool (per is_remote), its 8 flag characters are
        re-ordered through ROUTE_TABLE[route], and each set flag toggles the
        slot's bit in one of the four words.

        Args:
            exp_id: expander id forwarded to the register tool.
            slot_list: slot ids to read.
            is_remote: read from the remote register tool when True.
            route: route name; KeyError if unknown.

        Returns:
            A new list with the four adjusted inversion words.
        """
        retimer_inv = list(self._RETIMER_INV_TABLE[route])
        route_list = ROUTE_TABLE[route]
        reg_tool = self.remote_reg_tool if is_remote else self.local_reg_tool
        binary_str = ' '.join(f'{x:016b}' for x in retimer_inv)
        logging.info(f'--------remote: {is_remote}, retimer_inv(add mzm_direction before): {binary_str}')
        for slot_id in slot_list:
            logging.debug(f'slot_id: {slot_id}')
            mzm = int(reg_tool.read_mzm_direction(exp_id, slot_id), 16)
            mzm_inv_flag = bin(mzm)[2:].zfill(8)
            inv_flag = ''
            for i in range(8):
                src = route_list[i] - 1  # ROUTE_TABLE entries are 1-based
                inv_flag += mzm_inv_flag[src]
                logging.debug(f'remote: {is_remote}, index: {src}, value: {mzm_inv_flag[src]}, inv_flag:{inv_flag}')
            logging.info(f'--------remote: {is_remote}, slot_id: {slot_id}, mzm: {mzm:02X}, mzm_inv_flag: {mzm_inv_flag}, adjust slot after: {inv_flag}')
            self._apply_inv_flags(retimer_inv, inv_flag, slot_id, logging.debug)
        binary_str = ' '.join(f'{x:016b}' for x in retimer_inv)
        logging.info(f'--------remote: {is_remote}, retimer_inv(manual after): {binary_str}')
        return retimer_inv

    def prbs_tx(self, bmc, exp_id: int, rtmr_id: int, gen: int = 5, p: int = 8, prbs: int = 31, s0: int = 0x0, s1: int = 0x0):
        """Send `rtmr <id> prbs tx gen<gen> p<p> prbs<prbs> <s0> <s1>` to bmc.

        s0 and s1 are rendered with hex().
        """
        cmd_ = f'rtmr {rtmr_id} prbs tx gen{gen} p{p} prbs{prbs} {hex(s0)} {hex(s1)}'
        logging.info(cmd_)
        bmc.CmdVendorCommand(exp_id, cmd_)

    def prbs_rx(self, bmc, exp_id: int, rtmr_id: int):
        """Send `rtmr <id> prbs rx` to bmc."""
        cmd_ = f'rtmr {rtmr_id} prbs rx'
        logging.info(f'---cmd: {cmd_}')
        bmc.CmdVendorCommand(exp_id, cmd_)

    def prbs_check_(self, exp_id: int, rtmr_id: int, reset: bool = False):
        """Send `rtmr <id> prbs check [reset]` to the local BMC and return its reply."""
        cmd_ = f'rtmr {rtmr_id} prbs check {"reset" if reset else ""}'
        logging.info(f'---------:{cmd_}')
        r = self.local_bmc.CmdVendorCommand(exp_id, cmd_, False)
        return r

    def parse_prbs_check_log(self, log_content: str) -> List[RetimerPrbsResult]:
        """Parse a `prbs check` report into per-lane results.

        Only rows for lanes A00..A15 are kept; "Bxx" rows and any line that
        does not match the expected layout are silently skipped.

        Args:
            log_content: raw report text; None or empty yields an empty list.

        Returns:
            One RetimerPrbsResult per accepted row, in report order.
        """
        if not log_content:
            # The vendor command may yield nothing; treat it as "no rows".
            return []

        results = []
        for raw_line in log_content.strip().splitlines():
            match = self._PRBS_LINE_RE.match(raw_line.strip())
            if match is None:
                continue
            lane_num = int(match.group(2))  # A00 -> 0, A01 -> 1, ..., A15 -> 15
            if lane_num > 15:
                continue
            results.append(
                RetimerPrbsResult(
                    rtmr_id=int(match.group(1)),
                    rtmr_lane=lane_num,
                    locked=(match.group(4) == "LOCKED"),
                    rx_valid=(match.group(3) == "RX-VALID"),
                    err_count=int(match.group(5)),
                    ber=float(match.group(6)),
                    # Group 7 is an optional "*" marker; group 8 is the status word.
                    result=(match.group(8).upper() == "SUCCESS"),
                )
            )
        return results

    def print_prbs_check_results(self, results: List[RetimerPrbsResult]):
        """Print the results as a centred, column-aligned table on stdout."""
        # One width list drives header and rows so the columns always line up.
        widths = (6, 6, 8, 10, 10, 12, 8)

        def format_row(cells):
            return ' '.join(f'{str(cell):^{width}}' for cell, width in zip(cells, widths))

        header = format_row(('RT_ID', 'Lane', 'Locked', 'RX_Valid', 'err_count', 'ber', 'Result'))
        print(header)
        print("-" * len(header))
        for r in results:
            print(format_row((
                r.rtmr_id,
                r.rtmr_lane,
                'LOCKED' if r.locked else 'UNLOCK',
                'VALID' if r.rx_valid else 'ERR',
                r.err_count,
                r.ber,
                'PASS' if r.result else 'FAILED',
            )))