Browse Source

添加 'ocs/prbs_tool.py'

master
xz_ocs 2 months ago
parent
commit
50767fef9d
  1. 507
      ocs/prbs_tool.py

507
ocs/prbs_tool.py

@ -0,0 +1,507 @@ @@ -0,0 +1,507 @@
from dataclasses import dataclass
import re
from typing import List, Optional, Dict, DefaultDict, Tuple
import time
from enum import Enum
import logging
from collections import defaultdict
from parser.topology_parser import TopoMappingParser
from parser.transceiver_config_parser import TransceiverConfigParser
from toolbox.opt_reg_access_tool import OptRegAccessTool
from gpu.biren.exp_util import DevWhiteRiverExp
from parser.ibias_rssi_map_parser import IbiasRssiMapParser
from toolbox.regs_save_load_tool import LaneRegInfo
from typing import NamedTuple, Optional
from gpu.biren.entity_info import SYNC_RESET_RTMR
import threading
ROUTE_TABLE = {
"786-1-oneta": [1, 2, 3, 4, 5, 6, 7, 8],
"786-1-onetb": [1, 2, 3, 4, 5, 6, 7, 8],
"786-1-onoc1": [2, 1, 4, 3, 6, 5, 8, 7], # c0 90
"786-1-onoc2": [4, 3, 2, 1, 8, 7, 6, 5], # c0 98
"786-1-onoc3": [8, 7, 6, 5, 4, 3, 2, 1], # c0 a0
"786-1-onoc4": [3, 4, 1, 2, 7, 8, 5, 6], # c0 a8
"786-1-onoc5": [6, 5, 8, 7, 2, 1, 4, 3], # c0 b0
"786-1-onoc6": [7, 8, 5, 6, 3, 4, 1, 2], # c0 b8
"786-1-onoc7": [5, 6, 7, 8, 1, 2, 3, 4], # c0 c0
"21436587": [2, 1, 4, 3, 6, 5, 8, 7], # c0 90
"14235867": [4, 3, 2, 1, 8, 7, 6, 5], # c0 98
"18273645": [8, 7, 6, 5, 4, 3, 2, 1], # c0 a0
"13245768": [3, 4, 1, 2, 7, 8, 5, 6], # c0 a8
"16253847": [6, 5, 8, 7, 2, 1, 4, 3], # c0 b0
"17283546": [7, 8, 5, 6, 3, 4, 1, 2], # c0 b8
"15263748": [5, 6, 7, 8, 1, 2, 3, 4] # c0 c0
}
@dataclass
class RetimerPrbsResult:
rtmr_id: int
rtmr_lane: int
locked: bool
rx_valid: bool
err_count: int
ber: float
result: bool
class PrbsTool:
def __init__(self, local_bmc: DevWhiteRiverExp, remote_bmc: DevWhiteRiverExp,
local_reg_tool: OptRegAccessTool, remote_reg_tool: OptRegAccessTool, route_name: str):
self.local_bmc = local_bmc
self.remote_bmc = remote_bmc
self.local_reg_tool = local_reg_tool
self.remote_reg_tool = remote_reg_tool
self.route_name = route_name
def ResetRetimerAndEnablePrbs(self, exp_id: int, rtmr_list: List[int], preset = 9):
# 开始prbs测试
logging.info(f"----------step 1: reset retimer, rtmr: {rtmr_list}")
self.ResetRetimer(exp_id, rtmr_list)
time.sleep(1)
logging.info("----------step 2: enable prbs")
self.EnableTxPrbs(exp_id, rtmr_list, preset)
def ResetRetimer(self, exp_id: int, rtmr_list: List[int]):
isSuccess = True
for rtmr in rtmr_list:
if self.remote_bmc.SyncCmd06(exp_id, SYNC_RESET_RTMR, rtmr, 0) is None: isSuccess = False
if self.remote_bmc != self.local_bmc:
for rtmr in rtmr_list:
if self.local_bmc.SyncCmd06(exp_id, SYNC_RESET_RTMR, rtmr, 0) is None: isSuccess = False
logging.info(f'retimer reset: {isSuccess}')
return isSuccess
def EnableTxPrbsDebug(self, exp_id: int, rtmr_list: List[int], preset = 9):
logging.info("EnablePrbs step 1")
# 获取 inversion 配置
inversion = self.get_retimer_inv_new(exp_id, [0,1,2,3,4,5,6,7], False, self.route_name)
# return
for slot in [0,1,2,3,4,5,6,7]:
self.local_reg_tool.tx_disable(exp_id, slot)
if self.remote_bmc != self.local_bmc:
self.remote_reg_tool.tx_disable(exp_id, slot)
def task_local():
if self.remote_bmc != self.local_bmc:
for i, rtmr in enumerate(rtmr_list):
self.prbs_tx(self.local_bmc, exp_id, rtmr, 1, preset, 31, inversion[i], s1=0)
def task_remote():
for i, rtmr in enumerate(rtmr_list):
self.prbs_tx(self.remote_bmc, exp_id, rtmr, 1, preset, 31, inversion[i], s1=0)
thread_local = threading.Thread(target=task_local, name=f"PRBS-TX-Local-{exp_id}")
thread_remote = threading.Thread(target=task_remote, name=f"PRBS-TX-Remote-{exp_id}")
thread_local.start()
thread_remote.start()
thread_local.join()
thread_remote.join()
for slot in [0,1,2,3,4,5,6,7]:
self.local_reg_tool.tx_enable(exp_id, slot)
if self.remote_bmc != self.local_bmc:
self.remote_reg_tool.tx_enable(exp_id, slot)
for i, rtmr in enumerate(rtmr_list):
logging.info(f'-------prbs_rx {exp_id}, {rtmr}')
self.prbs_rx(self.local_bmc, exp_id, rtmr)
if self.remote_bmc != self.local_bmc:
for i, rtmr in enumerate(rtmr_list):
logging.info(f'-------prbs_rx {exp_id}, {rtmr}')
self.prbs_rx(self.remote_bmc, exp_id, rtmr)
return True
def EnableTxPrbs(self, exp_id: int, rtmr_list: List[int], preset = 9):
logging.info("EnablePrbs step 1")
# 获取 inversion 配置
local_inv = self.get_retimer_inv(exp_id, [0,1,2,3,4,5,6,7], False, self.route_name)
remote_inv = self.get_retimer_inv(exp_id, [0,1,2,3,4,5,6,7], True, self.route_name)
logging.info(f'-------- local_inv: {local_inv}, remote_inv: {remote_inv}')
for slot in [0,1,2,3,4,5,6,7]:
self.local_reg_tool.tx_disable(exp_id, slot)
if self.remote_bmc != self.local_bmc:
self.remote_reg_tool.tx_disable(exp_id, slot)
def task_local():
if self.remote_bmc != self.local_bmc:
for i, rtmr in enumerate(rtmr_list):
self.prbs_tx(self.local_bmc, exp_id, rtmr, 5, preset, 31, remote_inv[i], s1=0)
def task_remote():
for i, rtmr in enumerate(rtmr_list):
self.prbs_tx(self.remote_bmc, exp_id, rtmr, 5, preset, 31, local_inv[i], s1=0)
thread_local = threading.Thread(target=task_local, name=f"PRBS-TX-Local-{exp_id}")
thread_remote = threading.Thread(target=task_remote, name=f"PRBS-TX-Remote-{exp_id}")
thread_local.start()
thread_remote.start()
thread_local.join()
thread_remote.join()
for slot in [0,1,2,3,4,5,6,7]:
self.local_reg_tool.tx_enable(exp_id, slot)
if self.remote_bmc != self.local_bmc:
self.remote_reg_tool.tx_enable(exp_id, slot)
for i, rtmr in enumerate(rtmr_list):
logging.info(f'-------prbs_rx {exp_id}, {rtmr}')
self.prbs_rx(self.local_bmc, exp_id, rtmr)
if self.remote_bmc != self.local_bmc:
for i, rtmr in enumerate(rtmr_list):
logging.info(f'-------prbs_rx {exp_id}, {rtmr}')
self.prbs_rx(self.remote_bmc, exp_id, rtmr)
return True
def prbs_check(self, exp_id: int, rtmr_list: List[int], reset:bool = False) -> List[RetimerPrbsResult]:
msg = ''
# logging.info(f'{self}.ver: {ver}')
t_start = time.perf_counter()
prbs_results: List[RetimerPrbsResult] = []
for i, rtmr in enumerate(rtmr_list):
msg = self.prbs_check_(exp_id, rtmr, reset)
prbs_result = self.parse_prbs_check_log(msg)
if reset == False:
self.print_prbs_check_results(prbs_result)
prbs_results.extend(prbs_result)
time.sleep(0.5)
return prbs_results
def get_retimer_inv_new(self, exp_id: int, slot_list: List[int], is_remote: bool, route):
# assert len(slot_list) == 8
retimer_inv_table = {
'786-1-oneta' : [
0b0010000000000000,
0b0000000000010000,
0b0101000011110000,
0b1010000011110000],
'786-1-onetb' : [
0b0010000000000000,
0b0000000000010000,
0b0101000011110000,
0b1010000011110000],
'786-1-onoc1' : [
0b1000000010000000,
0b1110000000100000,
0b1111000001110000,
0b0100000011000000],
'786-1-onoc2' : [
0b0000000000100000,
0b0000000000010000,
0b0101000011110000,
0b0111000000100000],
'786-1-onoc3' : [
0b0000000000000000,
0b0000000000110000,
0b1110000010010000,
0b1100000001000000],
'786-1-onoc4' : [
0b1000000010000000,
0b0011000011110000,
0b1101000001010000,
0b0100000011000000],
'786-1-onoc5' : [
0b1110000000110000,
0b1000000010010000,
0b1101000001110000,
0b0110000011000000],
'786-1-onoc6' : [
0b0011000011100000,
0b1000000010010000,
0b1101000001110000,
0b0100000011100000],
'786-1-onoc7' : [
0b0000000000000000,
0b0010000000010000,
0b0011000001000000,
0b1100000001000000]
}
retimer_inv = retimer_inv_table[route]
route_list = ROUTE_TABLE[route]
binary_str = ' '.join(f'{x:016b}' for x in retimer_inv)
logging.info(f'--------remote: {is_remote}, retimer_inv(add mzm_direction before): {binary_str}')
logging.info(f'----------remote: {is_remote}, route_list: {route_list}')
for slot_id in slot_list:
logging.debug(f'slot_id: {slot_id}')
if self.remote_reg_tool != self.local_reg_tool:
# onet
remote_mzm = int(self.remote_reg_tool.read_mzm_direction(exp_id, slot_id), 16)
local_mzm = int(self.local_reg_tool.read_mzm_direction(exp_id, slot_id), 16)
reversed_local = int(f'{local_mzm & 0xFF:08b}'[::-1], 2)
mzm = remote_mzm
logging.info(f"Remote MZM: {remote_mzm:08b}, {remote_mzm:02X}")
logging.info(f"Local MZM: {local_mzm:08b}, {local_mzm:02X}")
logging.info(f"XOR MZM: {mzm:08b}")
else:
#onoc
mzm_str = self.local_reg_tool.read_mzm_direction(exp_id, slot_id)
mzm = int(mzm_str, 16)
# mzm = 0
mzm_inv_flag = bin(mzm)[2:].zfill(8)
inv_flag = ''
# for i in range(0, 8):
# inv_flag += mzm_inv_flag[route_list[i] - 1]
# logging.info(f'remote: {is_remote}, index: {route_list[i] - 1}, value: {mzm_inv_flag[route_list[i] - 1]}, inv_flag:{inv_flag}')
inv_flag = mzm_inv_flag
logging.info(f'-------- remote: {is_remote}, slot_id: {slot_id}, mzm: {mzm:02X}, mzm_inv_flag: {mzm_inv_flag}, adjust slot after: {inv_flag}')
# rtmr 21
if inv_flag[7] == '1':
logging.info(f'------- rtmr 21')
mask_ = 1 << (slot_id)
retimer_inv[1] ^= mask_
# rtmr 41
if inv_flag[6] == '1':
logging.info(f'------- rtmr 41')
mask_ = 1 << (slot_id)
retimer_inv[3] ^= mask_
# rtmr 42
if inv_flag[5] == '1':
logging.info(f'------- rtmr 42')
mask_ = 1 << (slot_id + 8)
retimer_inv[3] ^= mask_
# rtmr 22
if inv_flag[4] == '1':
logging.info(f'------- rtmr 22')
mask_ = 1 << (slot_id + 8)
retimer_inv[1] ^= mask_
# rtmr 11
if inv_flag[3] == '1':
logging.info(f'------- rtmr 11')
mask_ = 1 << (slot_id)
retimer_inv[0] ^= mask_
# rtmr 31
if inv_flag[2] == '1':
logging.info(f'------- rtmr 31')
mask_ = 1 << (slot_id)
retimer_inv[2] ^= mask_
# rtmr 32
if inv_flag[1] == '1':
logging.info(f'------- rtmr 32')
mask_ = 1 << (slot_id + 8)
retimer_inv[2] ^= mask_
# rtmr 12
if inv_flag[0] == '1':
logging.info(f'------- rtmr 12')
mask_ = 1 << (slot_id + 8)
retimer_inv[0] ^= mask_
logging.debug(f'---retimer_inv: {retimer_inv}')
binary_str = ' '.join(f'{x:016b}' for x in retimer_inv)
logging.info(f'--------remote: {is_remote}, retimer_inv(manual before): {binary_str}')
return retimer_inv
def get_retimer_inv(self, exp_id: int, slot_list: List[int], is_remote: bool, route):
retimer_inv_table = {
'786-1-oneta' : [
0b0010000000000000,
0b0000000000010000,
0b0101000011110000,
0b1010000011110000],
'786-1-onetb' : [
0b0010000000000000,
0b0000000000010000,
0b0101000011110000,
0b1010000011110000],
'786-1-onoc1' : [
0b1000000010000000,
0b1110000000100000,
0b1111000001110000,
0b0100000011000000],
'786-1-onoc2' : [
0b0000000000100000,
0b0000000000010000,
0b0101000011110000,
0b0111000000100000],
'786-1-onoc3' : [
0b0000000000000000,
0b0000000000110000,
0b1110000010010000,
0b1100000001000000],
'786-1-onoc4' : [
0b1000000010000000,
0b0011000011110000,
0b1101000001010000,
0b0100000011000000],
'786-1-onoc5' : [
0b1110000000110000,
0b1000000010010000,
0b1101000001110000,
0b0110000011000000],
'786-1-onoc6' : [
0b0011000011100000,
0b1000000010010000,
0b1101000001110000,
0b0100000011100000],
'786-1-onoc7' : [
0b0000000000000000,
0b0010000000010000,
0b0011000001000000,
0b1100000001000000]
}
retimer_inv = retimer_inv_table[route]
route_list = ROUTE_TABLE[route]
binary_str = ' '.join(f'{x:016b}' for x in retimer_inv)
logging.info(f'--------remote: {is_remote}, retimer_inv(add mzm_direction before): {binary_str}')
for slot_id in slot_list:
logging.debug(f'slot_id: {slot_id}')
if is_remote:
mzm_str = self.remote_reg_tool.read_mzm_direction(exp_id, slot_id)
else:
mzm_str = self.local_reg_tool.read_mzm_direction(exp_id, slot_id)
mzm = int(mzm_str, 16)
# mzm = 0
mzm_inv_flag = bin(mzm)[2:].zfill(8)
inv_flag = ''
for i in range(0, 8):
inv_flag += mzm_inv_flag[route_list[i] - 1]
logging.debug(f'remote: {is_remote}, index: {route_list[i] - 1}, value: {mzm_inv_flag[route_list[i] - 1]}, inv_flag:{inv_flag}')
logging.info(f'--------remote: {is_remote}, slot_id: {slot_id}, mzm: {mzm:02X}, mzm_inv_flag: {mzm_inv_flag}, adjust slot after: {inv_flag}')
# rtmr 21
if inv_flag[7] == '1':
logging.debug(f'------- rtmr 21')
mask_ = 1 << (slot_id)
retimer_inv[1] ^= mask_
# rtmr 41
if inv_flag[6] == '1':
logging.debug(f'------- rtmr 41')
mask_ = 1 << (slot_id)
retimer_inv[3] ^= mask_
# rtmr 42
if inv_flag[5] == '1':
logging.debug(f'------- rtmr 42')
mask_ = 1 << (slot_id + 8)
retimer_inv[3] ^= mask_
# rtmr 22
if inv_flag[4] == '1':
logging.debug(f'------- rtmr 22')
mask_ = 1 << (slot_id + 8)
retimer_inv[1] ^= mask_
# rtmr 11
if inv_flag[3] == '1':
logging.debug(f'------- rtmr 11')
mask_ = 1 << (slot_id)
retimer_inv[0] ^= mask_
# rtmr 31
if inv_flag[2] == '1':
logging.debug(f'------- rtmr 31')
mask_ = 1 << (slot_id)
retimer_inv[2] ^= mask_
# rtmr 32
if inv_flag[1] == '1':
logging.debug(f'------- rtmr 32')
mask_ = 1 << (slot_id + 8)
retimer_inv[2] ^= mask_
# rtmr 12
if inv_flag[0] == '1':
logging.debug(f'------- rtmr 12')
mask_ = 1 << (slot_id + 8)
retimer_inv[0] ^= mask_
binary_str = ' '.join(f'{x:016b}' for x in retimer_inv)
logging.info(f'--------remote: {is_remote}, retimer_inv(manual after): {binary_str}')
return retimer_inv
def prbs_tx(self, bmc, exp_id: int, rtmr_id: int, gen:int = 5, p:int = 8, prbs:int = 31, s0:int = 0x0, s1:int = 0x0):
# cmd_ = f'rtmr {rtmr_id} prbs tx gen{gen} p{p} prbs{prbs} {hex(s0)} {hex(s1)} no no'
cmd_ = f'rtmr {rtmr_id} prbs tx gen{gen} p{p} prbs{prbs} {hex(s0)} {hex(s1)}'
logging.info(cmd_)
bmc.CmdVendorCommand(exp_id, cmd_)
def prbs_rx(self, bmc, exp_id: int, rtmr_id: int):
cmd_ = f'rtmr {rtmr_id} prbs rx'
logging.info(f'---cmd: {cmd_}')
bmc.CmdVendorCommand(exp_id, cmd_)
def prbs_check_(self, exp_id: int, rtmr_id: int, reset: bool = False):
cmd_ = f'rtmr {rtmr_id} prbs check {"reset" if reset else ""}'
logging.info(f'---------:{cmd_}')
r = self.local_bmc.CmdVendorCommand(exp_id, cmd_, False)
return r
def parse_prbs_check_log(self, log_content: str) -> List[RetimerPrbsResult]:
results = []
# 正则:只匹配 A00-A15,并忽略 Bxx 行
pattern = r"RTMR(\d+)\s+PRBS\s+A(\d{2}):\s+(RX-VALID|RX-INVAL),\s+(LOCKED|UNLOCK),\s+(\d+),\s+([+-]?\d*\.?\d+(?:[eE][+-]?\d+)?),\s+(\*?)([A-Z]+)"
for line in log_content.strip().splitlines():
line = line.strip()
if not line:
continue
match = re.match(pattern, line)
if not match:
# 如果是 Bxx 行或其他不匹配行,跳过(静默忽略)
continue
rt_id = int(match.group(1))
lane_num = int(match.group(2)) # A00 -> 0, A01 -> 1, ..., A15 -> 15
# 只保留 lane 0 ~ 15
if lane_num < 0 or lane_num > 15:
continue
rx_valid_str = match.group(3)
locked_str = match.group(4)
err_count = int(match.group(5))
ber = float(match.group(6))
status_type = match.group(8).upper() # SUCCESS or FAIL
# 转换为布尔值
rx_valid = (rx_valid_str == "RX-VALID")
locked = (locked_str == "LOCKED")
result = (status_type == "SUCCESS")
results.append(
RetimerPrbsResult(
rtmr_id=rt_id,
rtmr_lane=lane_num, # 直接使用 0~15 的整数
locked=locked,
rx_valid=rx_valid,
err_count=err_count,
ber=ber,
result=result
)
)
return results
def print_prbs_check_results(self, results: List[RetimerPrbsResult]):
print(f"{'RT_ID':^6} {'Lane':^6} {'Locked':^8} {'RX_Valid':^10} "
f"{'err_count':^6} {'ber':^6} {'Result':^8}")
print("-" * 60)
for r in results:
print(f"{r.rtmr_id:^6} {r.rtmr_lane:^6} "
f"{'LOCKED' if r.locked else 'UNLOCK'}{'':^6} "
f"{'VALID' if r.rx_valid else 'ERR'}{'':^8} "
f"{r.err_count:^6} {r.ber:^6} "
f"{'PASS' if r.result else 'FAILED':^8}")
Loading…
Cancel
Save