from dataclasses import dataclass import re from typing import List, Optional, Dict, DefaultDict, Tuple import time from enum import Enum import logging from collections import defaultdict from parser.topology_parser import TopoMappingParser from parser.transceiver_config_parser import TransceiverConfigParser from toolbox.opt_reg_access_tool import OptRegAccessTool from gpu.biren.exp_util import DevWhiteRiverExp from parser.ibias_rssi_map_parser import IbiasRssiMapParser from regs_save_load_tool import LaneRegInfo from typing import NamedTuple, Optional logger = logging.getLogger(__name__) # -------------------------------------------------- # Eye Issue Definition (Corrected Logic) # -------------------------------------------------- class OptReg(Enum): MGC = '' class EyeIssue(Enum): EYE_NORMAL = 0 EYE_TOO_SMALL = 1 # 垂直开口太小: sum(|top|, |down|) < 30 EYE_TOO_LARGE = 2 EYE_TOO_DOWN = 3 # 眼图偏低: top < 15 EYE_TOO_UP = 4 # 眼图偏高: |down| < 15 (i.e., down > -15) EYE_TOO_BAD = 5 @dataclass class EyeData: rt_phys_index: int = -1 phys_lane_index: int = -1 rt_index: int = -1 lane_index: int = -1 down: float = -1 # mV (usually negative) top: float = -1 # mV (usually positive) left: float = -1 # UI right: float = -1 # UI issue: EyeIssue = EyeIssue.EYE_NORMAL def __str__(self): return (f"RTMR{self.rt_index} EYE_A{self.lane_index:02d}: " f"({self.down:.1f}, {self.top:.1f}) mV | " f"({self.left:.2f}, {self.right:.2f}) UI") @property def vertical_amplitude(self) -> float: return abs(self.down) + abs(self.top) @property def horizontal_amplitude(self) -> float: return abs(self.left) + abs(self.right) @property def quality_score(self) -> float: return self.vertical_amplitude def determine_issue(self) -> None: if self.vertical_amplitude < 65: self.issue = EyeIssue.EYE_TOO_SMALL elif self.vertical_amplitude > 220: self.issue = EyeIssue.EYE_TOO_LARGE elif self.top < 15: self.issue = EyeIssue.EYE_TOO_DOWN elif self.down > -15: # 因为 down 是负值,> -15 表示离 0 太近(太高) self.issue = EyeIssue.EYE_TOO_UP else: self.issue = EyeIssue.EYE_NORMAL class TuneViewResult(NamedTuple): rt_index: int lane_index: int remote: LaneRegInfo local: LaneRegInfo # -------------------------------------------------- # 🔧 EQ Tuning Tool # -------------------------------------------------- class EqTuneTool: def __init__(self, local_bmc: DevWhiteRiverExp, remote_bmc: DevWhiteRiverExp, local_reg_tool: OptRegAccessTool, remote_reg_tool: OptRegAccessTool, ibias_rssi_map: IbiasRssiMapParser, topo_map: TopoMappingParser, route_name: str): self.local_bmc = local_bmc self.remote_bmc = remote_bmc self.local_reg_tool = local_reg_tool self.remote_reg_tool = remote_reg_tool self.ibias_rssi_map = ibias_rssi_map self.route_name = route_name self.topo_map = topo_map self.topo_map.parse() def eq_auto_tune(self, exp_id: int): cmd = 'ver' raw_output = self.local_bmc.CmdVendorCommand(exp_id, cmd) # logging.info(raw_output) logger.info(f"Starting auto tune for exp id: {exp_id}") # retimers = [1, 2, 3, 4] retimers = [3] for retimer_index in retimers: bad_eyes = self.get_issue_eye_diagram(exp_id, retimer_index) time.sleep(1) if bad_eyes: self.process_worst_eyes(exp_id, retimer_index, bad_eyes) else: logger.info(f"No eye issues found on RTMR{retimer_index}") def tune_view(self, exp_id: int, rt_index: int, lane_index: int) -> Optional[TuneViewResult]: #11,12, 21,22, 31,32, 41,42 success, rt_index, lane_index = self.verify_rt_index_lane_index(rt_index, lane_index) if success == False: return remote_slot = self.topo_map.get_remote_slot_by_retimer(rt_index, lane_index, self.route_name) local_slot = self.topo_map.get_local_slot_by_retimer(rt_index, lane_index, self.route_name) logging.info(f'remote_slot: {remote_slot}, local_slot: {local_slot}') remote_slot_id = remote_slot[0] remote_lane_id = remote_slot[1] local_slot_id = local_slot[0] local_lane_id = local_slot[1] logger.info(f"Processing tune view for RTMR{rt_index} lane {lane_index}: " f"remote_slot=({remote_slot_id},{remote_lane_id}), " f"local_slot=({local_slot_id},{local_lane_id})") raw_opcurrent_value = self.remote_reg_tool.read_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id) raw_high_freq_value = self.remote_reg_tool.read_high_freq_reg(exp_id, remote_slot_id, remote_lane_id) raw_low_freq_value = self.remote_reg_tool.read_low_freq_reg(exp_id, remote_slot_id, remote_lane_id) raw_tia_peak_value = self.local_reg_tool.read_tia_peak_reg(exp_id, local_slot_id, local_lane_id) raw_mgc_value = self.local_reg_tool.read_mgc_reg(exp_id, local_slot_id, local_lane_id) ibias_value = self.remote_reg_tool.read_ibias_reg(exp_id, remote_slot_id, remote_lane_id) ibias_lane = self.remote_reg_tool.get_ibias_by_logic_lane(remote_lane_id) rssi_lanes = self.ibias_rssi_map.get_rssi_lane(self.route_name, ibias_lane) if rssi_lanes is None: logging.error('RSSI lanes is none') return rssi1_value = self.local_reg_tool.read_rssi_reg(exp_id, local_slot_id, rssi_lanes[0]) rssi2_value = self.local_reg_tool.read_rssi_reg(exp_id, local_slot_id, rssi_lanes[1]) # logging.info(f'RSSI values: rssi1={rssi1_value}, rssi2={rssi2_value}') logging.info(f'') logging.info(f'{self.remote_reg_tool.host} ---------- ibias: {ibias_value}, rssi1={rssi1_value}, rssi2={rssi2_value}, op:{raw_opcurrent_value}, high_freq:{raw_high_freq_value}, low_freq:{raw_low_freq_value}, mgc:{raw_mgc_value}, tia_peak: {raw_tia_peak_value}') remote_info = LaneRegInfo() remote_info.logic_lane = remote_lane_id remote_info.slot_id = remote_slot_id remote_info.ibias = ibias_value remote_info.opcurrent = raw_opcurrent_value remote_info.lowfreq_eq = raw_low_freq_value remote_info.highfreq_eq = raw_high_freq_value local_info = LaneRegInfo() local_info.logic_lane = local_lane_id local_info.slot_id = local_slot_id local_info.mgc = raw_mgc_value local_info.tia_peak = raw_tia_peak_value local_info.rssi1 = rssi1_value local_info.rssi2 = rssi2_value return TuneViewResult( rt_index=rt_index, lane_index=lane_index, remote=remote_info, local=local_info ) def tune_write(self, exp_id: int, rt_index: int, lane_index: int, reg_name: str, reg_value: int): success, rt_index, lane_index = self.verify_rt_index_lane_index(rt_index, lane_index) if success == False: return remote_slot = self.topo_map.get_remote_slot_by_retimer(rt_index, lane_index, self.route_name) local_slot = self.topo_map.get_local_slot_by_retimer(rt_index, lane_index, self.route_name) logging.info(f'remote_slot: {remote_slot}, local_slot: {local_slot}') remote_slot_id = remote_slot[0] remote_lane_id = remote_slot[1] local_slot_id = local_slot[0] local_lane_id = local_slot[1] logger.info(f"Processing tune write for RTMR{rt_index} lane {lane_index}: " f"remote_slot=({remote_slot_id},{remote_lane_id}), " f"local_slot=({local_slot_id},{local_lane_id})") logging.info(f'-----------------------reg_name: {reg_name}, reg_value: {reg_value}') if reg_name == 'opcurrent': self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, reg_value) elif reg_name == 'highfreq_eq': self.remote_reg_tool.write_high_freq_reg(exp_id, remote_slot_id, remote_lane_id, reg_value) elif reg_name == 'lowfreq_eq': self.remote_reg_tool.write_low_freq_reg(exp_id, remote_slot_id, remote_lane_id, reg_value) elif reg_name == 'ibias': self.remote_reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, reg_value) elif reg_name == 'tia_peak': self.local_reg_tool.write_tia_peak_reg(exp_id, local_slot_id, local_lane_id, reg_value) elif reg_name == 'mgc': self.local_reg_tool.write_mgc_reg(exp_id, local_slot_id, local_lane_id, reg_value) self.local_reg_tool.write_confirm_reg(exp_id, local_slot_id) self.tune_view(exp_id, rt_index, lane_index) def eye_monitor(self, host:str, exp_id: int, route_name: str): retimers = [1, 2, 3, 4] all_bad_eyes: List['EyeData'] = [] logging.info(f"🔍 Starting eye diagram check for EXP {exp_id}") for retimer_index in retimers: try: # 假设 get_issue_eye_diagram 返回 List[EyeData] bad_eyes: List[EyeData] = self.get_issue_eye_diagram(exp_id, retimer_index, 3) if bad_eyes: logging.warning(f"⚠️ Found {len(bad_eyes)} bad eye(s) on Retimer (logical) {retimer_index}") all_bad_eyes.extend(bad_eyes) except Exception as e: logging.error(f"❌ Failed to check Retimer {retimer_index}: {e}") # === 汇总打印所有异常眼图(包含全部排查所需字段)=== if all_bad_eyes: logging.error("🚨==========================================================================") logging.error(" BAD EYE DIAGRAMS DETECTED ") logging.error(" Please replace or debug the modules below ") logging.error("🚨==========================================================================") logging.error(f"{'Host':^16} {'Slot':^6} {'Lane':^8} {'RT_ID':^6} {'RT_Lane':^9} " f"{'Down(mV)':^8} {'Top(mV)':^9} {'Left(UI)':^8} {'Right(UI)':^9} {'Issue':^18}") logging.error("-" * 100) for idx, eye in enumerate(all_bad_eyes, 1): remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index, route_name) local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index, route_name) local_slot_id = local_slot[0] local_lane_id = local_slot[1] logging.error( f"{host:^16} " f"{local_slot_id:^6} " f"{local_lane_id:^8} " f"{eye.rt_phys_index:^6} " f"{eye.phys_lane_index:^9} " f"{eye.down:8.2f} " f"{eye.top:7.2f} " f"{eye.left:7.3f} " f"{eye.right:8.3f} " f"{EyeIssue(eye.issue).name:18}" ) logging.error("💡 Tip: Check cabling, retimer settings, or optical module health.") else: logging.info("✅ All eye diagrams are within acceptable range. No issues detected.") def verify_rt_index_lane_index(self, rt_index: int, lane_index: int) -> Tuple[bool, int, int]: if rt_index in [1, 2, 3, 4]: logging.info(f'use retimer phys index and lane, rt_index: {rt_index}, rt_lane: {lane_index}') if 0 <= lane_index <= 7: new_rt_index = rt_index * 10 + 1 # 1->11, 2->21, etc. return (True, new_rt_index, lane_index) elif 8 <= lane_index <= 15: new_rt_index = rt_index * 10 + 2 # 1->12, 2->22, etc. new_lane_index = lane_index - 8 return (True, new_rt_index, new_lane_index) else: logging.error('Invalid lane_index: must be in [0, 15]') return (False, 0, 0) elif rt_index in [11, 12, 21, 22, 31, 32, 41, 42]: logging.info(f'use retimer logic index and lane, rt_index: {rt_index}, rt_lane: {lane_index}') if lane_index > 7: logging.error('For logic rt_index, lane_index must be in [0, 7]') return (False, 0, 0) return (True, rt_index, lane_index) else: logging.error(f'Invalid rt_index: {rt_index}') return (False, 0, 0) def process_worst_eyes_new(self, exp_id: int, retimer_index: int, bad_eyes: List[EyeData]) -> None: logger.warning(f"Detected eye issues on RTMR{retimer_index}:") for eye in bad_eyes: logger.warning( f"RTMR {eye.rt_index} Lane {eye.lane_index:02d}: {eye.issue.name} " f"(Vertical: {eye.vertical_amplitude:.1f}mV, Top={eye.top:.1f}, Down={eye.down:.1f})" ) for eye in bad_eyes: logger.warning( f"Starting Process RTMR {eye.rt_index} Lane {eye.lane_index:02d}: {eye.issue.name} " f"(Vertical: {eye.vertical_amplitude:.1f}mV, Top={eye.top:.1f}, Down={eye.down:.1f})" ) tmp_eye = eye result = self.tune_view(exp_id, eye.rt_index, eye.lane_index) remote_reg_info = result.remote local_reg_info = result.local if tmp_eye.issue == EyeIssue.EYE_TOO_SMALL: if remote_reg_info.lowfreq_eq <= 150: max_retry_cout = 2 try_count = 0 while tmp_eye.issue != EyeIssue.EYE_NORMAL and try_count < max_retry_cout: logging.info(f'------try count: {try_count}') if tmp_eye.issue == EyeIssue.EYE_TOO_SMALL: self.process_small_eye(exp_id, retimer_index, eye) if tmp_eye.issue == EyeIssue.EYE_TOO_DOWN: self.process_down_eye(exp_id, retimer_index, eye) elif tmp_eye.issue == EyeIssue.EYE_TOO_UP: self.process_up_eye(exp_id, retimer_index, eye) # if tmp_eye.issue == EyeIssue.EYE_TOO_LARGE: # logging.info(f"Eye issue: {eye.issue.name}") try_count +=1 break def process_worst_eyes(self, exp_id: int, retimer_index: int, bad_eyes: List[EyeData]) -> None: logger.warning(f"Detected eye issues on RTMR{retimer_index}:") for eye in bad_eyes: logger.warning( f"RTMR {eye.rt_index} Lane {eye.lane_index:02d}: {eye.issue.name} " f"(Vertical: {eye.vertical_amplitude:.1f}mV, Top={eye.top:.1f}, Down={eye.down:.1f})" ) for eye in bad_eyes: logger.warning( f"Starting Process RTMR {eye.rt_index} Lane {eye.lane_index:02d}: {eye.issue.name} " f"(Vertical: {eye.vertical_amplitude:.1f}mV, Top={eye.top:.1f}, Down={eye.down:.1f})" ) tmp_eye = eye max_retry_cout = 2 try_count = 0 while tmp_eye.issue != EyeIssue.EYE_NORMAL and try_count < max_retry_cout: logging.info(f'------try count: {try_count}') if tmp_eye.issue == EyeIssue.EYE_TOO_SMALL: self.process_small_eye(exp_id, retimer_index, eye) if tmp_eye.issue == EyeIssue.EYE_TOO_DOWN: self.process_down_eye(exp_id, retimer_index, eye) elif tmp_eye.issue == EyeIssue.EYE_TOO_UP: self.process_up_eye(exp_id, retimer_index, eye) # if tmp_eye.issue == EyeIssue.EYE_TOO_LARGE: # logging.info(f"Eye issue: {eye.issue.name}") try_count +=1 break # # if tmp_eye.issue == EyeIssue.EYE_TOO_LARGE: # # 如果rssi很大,那适当减少,减少ibias # # slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index) # # if slot is None: # # break # # slot_id = slot[0] # # lane_id = slot[1] # # ibias_value = self.remote_reg_tool. def process_small_eye(self, exp_id: int, retimer_index: int, eye: EyeData): logger.info(f"Starting small eye issue processing for RTMR{retimer_index} lane {eye.lane_index}, adjusting ibias") # 获取slot信息 logging.info(f'--------- rt_index: {eye.rt_index}, lane_index: {eye.lane_index}, route_name: {self.route_name}') remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) if remote_slot is None or local_slot is None: logger.error(f"Invalid slot for RTMR{retimer_index} lane {eye.lane_index}") return eye remote_slot_id = remote_slot[0] remote_lane_id = remote_slot[1] local_slot_id = local_slot[0] local_lane_id = local_slot[1] logger.info(f"Processing small eye for RTMR{eye.rt_index} lane {eye.lane_index}: " f"remote_slot=({remote_slot_id},{remote_lane_id}), " f"local_slot=({local_slot_id},{local_lane_id})") # 调节ibias logger.info(f"Adjusting ibias for RTMR{eye.rt_index} lane {eye.lane_index}") eye = self.adjust_ibias_for_small_eye(exp_id, eye, remote_slot_id, remote_lane_id, local_slot_id, local_lane_id) # 如果依然很小,调节opcurrent if eye.issue == EyeIssue.EYE_TOO_SMALL: logger.info(f"Eye still too small for RTMR{retimer_index} lane {eye.lane_index}, adjusting opcurrent") eye = self.adjust_opcurrent_for_small_down_eye(exp_id, eye, remote_slot_id, remote_lane_id) # eye = self.adjust_opcurrent_for_up_eye(exp_id, eye, remote_slot_id, remote_lane_id) # 如果依然很小,调节mgc if eye.issue == EyeIssue.EYE_TOO_SMALL: logger.info(f"Eye still too small for RTMR{retimer_index} lane {eye.lane_index}, adjusting mgc") eye = self.adjust_mgc_for_small_eye(exp_id, eye, local_slot_id, local_lane_id) logger.info(f"Completed small eye processing for RTMR{retimer_index} lane {eye.lane_index}, " f"final issue: {eye.issue.name}") return eye def process_down_eye(self, exp_id: int, retimer_index: int, eye: EyeData): logging.info(f"Starting down eye issue processing for RTMR{retimer_index} lane {eye.lane_index}") # 获取slot信息 remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) if remote_slot is None or local_slot is None: logger.error(f"Invalid slot for RTMR{retimer_index} lane {eye.lane_index}") return eye remote_slot_id = remote_slot[0] remote_lane_id = remote_slot[1] local_slot_id = local_slot[0] local_lane_id = local_slot[1] logger.info(f"Processing down eye for RTMR{eye.rt_index} lane {eye.lane_index}: " f"remote_slot=({remote_slot_id},{remote_lane_id}), " f"local_slot=({local_slot_id},{local_lane_id})") # 先调节tia_peak(值调小) logger.info(f"Adjusting tia_peak for RTMR{eye.rt_index} lane {eye.lane_index}") eye = self.adjust_tia_peak_for_down_eye(exp_id, eye, local_slot_id, local_lane_id) # 如果依然偏下,调节high_freq(值调小) if eye.issue == EyeIssue.EYE_TOO_DOWN: logger.info(f"Eye still too down for RTMR{retimer_index} lane {eye.lane_index}, adjusting high_freq") eye = self.adjust_high_freq_for_down_eye(exp_id, eye, remote_slot_id, remote_lane_id) # 如果依然偏下,调节opcurrent if eye.issue == EyeIssue.EYE_TOO_DOWN: logger.info(f"Eye still too down for RTMR{retimer_index} lane {eye.lane_index}, adjusting opcurrent") eye = self.adjust_opcurrent_for_small_down_eye(exp_id, eye, remote_slot_id, remote_lane_id) logger.info(f"Completed down eye processing for RTMR{retimer_index} lane {eye.lane_index}, " f"final issue: {eye.issue.name}") return eye def process_up_eye(self, exp_id: int, retimer_index: int, eye: EyeData): logging.info(f"Starting up eye issue processing for RTMR{retimer_index} lane {eye.lane_index}") # 获取slot信息 remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) if remote_slot is None or local_slot is None: logger.error(f"Invalid slot for RTMR{retimer_index} lane {eye.lane_index}") return eye remote_slot_id = remote_slot[0] remote_lane_id = remote_slot[1] local_slot_id = local_slot[0] local_lane_id = local_slot[1] logger.info(f"Processing up eye for RTMR{eye.rt_index} lane {eye.lane_index}: " f"remote_slot=({remote_slot_id},{remote_lane_id}), " f"local_slot=({local_slot_id},{local_lane_id})") # 先调节tia_peak(值调大) logger.info(f"Adjusting tia_peak for RTMR{eye.rt_index} lane {eye.lane_index}") eye = self.adjust_tia_peak_for_up_eye(exp_id, eye, local_slot_id, local_lane_id) # 如果依然偏高,调节high_freq(值调大) if eye.issue == EyeIssue.EYE_TOO_UP: logger.info(f"Eye still too up for RTMR{retimer_index} lane {eye.lane_index}, adjusting high_freq") eye = self.adjust_high_freq_for_up_eye(exp_id, eye, remote_slot_id, remote_lane_id) # 如果依然偏高,调节opcurrent(值调小) if eye.issue == EyeIssue.EYE_TOO_UP: logger.info(f"Eye still too up for RTMR{retimer_index} lane {eye.lane_index}, adjusting opcurrent") eye = self.adjust_opcurrent_for_up_eye(exp_id, eye, remote_slot_id, remote_lane_id) logger.info(f"Completed up eye processing for RTMR{retimer_index} lane {eye.lane_index}, " f"final issue: {eye.issue.name}") return eye def adjust_ibias_for_small_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int, local_slot_id: int, local_lane_id: int) -> EyeData: ibias_range = [2000, 3200] rssi_range = [5000, 10000] ibias_value = self.remote_reg_tool.read_ibias_reg(exp_id, remote_slot_id, remote_lane_id) logging.info(f'Initial ibias_value: {ibias_value}') # if ibias_value == 0: # logging.warning(f'read ibias reg again') # ibias_value = self.remote_reg_tool.read_ibias_reg(exp_id, remote_slot_id, remote_lane_id) # logging.info(f'Initial ibias_value: {ibias_value}') while ibias_value < ibias_range[1]: # 根据当前值确定步长 step = 300 if ibias_value < 2500 else 150 ibias_value += step self.remote_reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, ibias_value) logging.info(f'Adjusted ibias to {ibias_value} with step {step}') time.sleep(5) ibias_lane = self.remote_reg_tool.get_ibias_by_logic_lane(remote_lane_id) rssi_lanes = self.ibias_rssi_map.get_rssi_lane(self.route_name, ibias_lane) if rssi_lanes is None: logging.error('RSSI lanes is none') break rssi1_value = self.local_reg_tool.read_rssi_reg(exp_id, local_slot_id, rssi_lanes[0]) rssi2_value = self.local_reg_tool.read_rssi_reg(exp_id, local_slot_id, rssi_lanes[1]) logging.info(f'RSSI values: rssi1={rssi1_value}, rssi2={rssi2_value}') is_margin = False if rssi1_value > rssi_range[1] or rssi2_value > rssi_range[1]: # 回退一步 self.remote_reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, ibias_value - step) logging.info(f'RSSI margin exceeded, rollback ibias to {ibias_value - step}') is_margin = True eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) logging.info(f'After ibias adjustment, eye issue: {eye.issue.name}') if eye.issue != EyeIssue.EYE_TOO_SMALL or is_margin: break logging.info(f'Final ibias adjustment result: ibias={ibias_value}, eye issue={eye.issue.name}') return eye def adjust_opcurrent_for_small_down_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int) -> EyeData: raw_eye_issue = eye.issue raw_opcurrent_value = self.remote_reg_tool.read_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id) logging.info(f'Initial opcurrent_value: {raw_opcurrent_value}') op_range = [100, 190] opcurrent_value = raw_opcurrent_value while opcurrent_value < op_range[1]: step = 10 opcurrent_value += step if opcurrent_value > op_range[1]: opcurrent_value = op_range[1] self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, opcurrent_value) logging.info(f'Adjusted opcurrent to {opcurrent_value}') time.sleep(5) eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) logging.info(f'After opcurrent adjustment, eye issue: {eye.issue.name}') if eye.issue != raw_eye_issue or opcurrent_value == op_range[1]: break # 如果没有改善,恢复原始值 # if eye.issue == raw_eye_issue: # self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, raw_opcurrent_value) # logging.info(f'No improvement, rollback opcurrent to {raw_opcurrent_value}') # else: # logging.info(f'opcurrent adjustment successful, final value: {opcurrent_value}') return eye def adjust_mgc_for_small_eye(self, exp_id: int, eye: EyeData, local_slot_id: int, local_lane_id: int) -> EyeData: raw_mgc_value = self.local_reg_tool.read_mgc_reg(exp_id, local_slot_id, local_lane_id) logging.info(f'Initial mgc_value: {raw_mgc_value}') mgc_range = [raw_mgc_value, raw_mgc_value + 10] mgc_value = raw_mgc_value while mgc_value < mgc_range[1]: step = 2 mgc_value += step self.local_reg_tool.write_mgc_reg(exp_id, local_slot_id, local_lane_id, mgc_value) logging.info(f'Adjusted mgc to {mgc_value}') time.sleep(5) eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) logging.info(f'After mgc adjustment, eye issue: {eye.issue.name}') if eye.issue != EyeIssue.EYE_TOO_SMALL: logging.info(f'mgc adjustment successful, final value: {mgc_value}') return eye # 如果没有改善,恢复原始值 # self.local_reg_tool.write_mgc_reg(exp_id, local_slot_id, local_lane_id, raw_mgc_value) # logging.info(f'No improvement, rollback mgc to {raw_mgc_value}') return eye def adjust_tia_peak_for_down_eye(self, exp_id: int, eye: EyeData, local_slot_id: int, local_lane_id: int) -> EyeData: raw_tia_peak_value = self.local_reg_tool.read_tia_peak_reg(exp_id, local_slot_id, local_lane_id) logging.info(f'Initial tia_peak_value: {raw_tia_peak_value}') tia_peak_range = [0, 200] tia_peak_value = raw_tia_peak_value while tia_peak_value > tia_peak_range[0]: step = 20 tia_peak_value -= step if tia_peak_value < tia_peak_range[0]: tia_peak_value = tia_peak_range[0] self.local_reg_tool.write_tia_peak_reg(exp_id, local_slot_id, local_lane_id, tia_peak_value) logging.info(f'Adjusted tia_peak to {tia_peak_value}') eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) logging.info(f'After tia_peak adjustment, eye issue: {eye.issue.name}') if eye.issue != EyeIssue.EYE_TOO_DOWN or tia_peak_value == tia_peak_range[0]: break logging.info(f'Final tia_peak adjustment result: tia_peak={tia_peak_value}, eye issue={eye.issue.name}') return eye def adjust_high_freq_for_down_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int) -> EyeData: raw_high_freq_value = self.remote_reg_tool.read_high_freq_reg(exp_id, remote_slot_id, remote_lane_id) logging.info(f'Initial high_freq_value: {raw_high_freq_value}') high_freq_range = [0, 150] high_freq_value = raw_high_freq_value while high_freq_value > high_freq_range[0]: step = 20 high_freq_value -= step if high_freq_value < high_freq_range[0]: high_freq_value = high_freq_range[0] self.remote_reg_tool.write_high_freq_reg(exp_id, remote_slot_id, remote_lane_id, high_freq_value) logging.info(f'Adjusted high_freq to {high_freq_value}') eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) logging.info(f'After high_freq adjustment, eye issue: {eye.issue.name}') if eye.issue != EyeIssue.EYE_TOO_DOWN or high_freq_value == high_freq_range[0]: break logging.info(f'Final high_freq adjustment result: high_freq={high_freq_value}, eye issue={eye.issue.name}') return eye def adjust_tia_peak_for_up_eye(self, exp_id: int, eye: EyeData, local_slot_id: int, local_lane_id: int) -> EyeData: raw_tia_peak_value = self.local_reg_tool.read_tia_peak_reg(exp_id, local_slot_id, local_lane_id) logging.info(f'Initial tia_peak_value: {raw_tia_peak_value}') tia_peak_range = [0, 200] max_value = tia_peak_range[1] tia_peak_value = raw_tia_peak_value while tia_peak_value < max_value: step = 20 tia_peak_value += step if tia_peak_value > max_value: tia_peak_value = max_value self.local_reg_tool.write_tia_peak_reg(exp_id, local_slot_id, local_lane_id, tia_peak_value) logging.info(f'Adjusted tia_peak to {tia_peak_value}') eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) logging.info(f'After tia_peak adjustment, eye issue: {eye.issue.name}') if eye.issue != EyeIssue.EYE_TOO_UP or tia_peak_value == max_value: break logging.info(f'Final tia_peak adjustment result: tia_peak={tia_peak_value}, eye issue={eye.issue.name}') return eye def adjust_high_freq_for_up_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int) -> EyeData: raw_high_freq_value = self.remote_reg_tool.read_high_freq_reg(exp_id, remote_slot_id, remote_lane_id) logging.info(f'Initial high_freq_value: {raw_high_freq_value}') high_freq_value = raw_high_freq_value high_freq_range = [0, 150] while high_freq_value < high_freq_range[1]: step = 20 high_freq_value += step if high_freq_value > high_freq_range[1]: high_freq_value = high_freq_range[1] self.remote_reg_tool.write_high_freq_reg(exp_id, remote_slot_id, remote_lane_id, high_freq_value) logging.info(f'Adjusted high_freq to {high_freq_value}') eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) logging.info(f'After high_freq adjustment, eye issue: {eye.issue.name}') if eye.issue != EyeIssue.EYE_TOO_UP or high_freq_value == high_freq_range[1]: break logging.info(f'Final high_freq adjustment result: high_freq={high_freq_value}, eye issue={eye.issue.name}') return eye def adjust_opcurrent_for_up_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int) -> EyeData: raw_eye_issue = eye.issue raw_opcurrent_value = self.remote_reg_tool.read_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id) logging.info(f'Initial opcurrent_value: {raw_opcurrent_value}') op_range = [0, 190] opcurrent_value = raw_opcurrent_value while opcurrent_value > op_range[0]: step = 10 opcurrent_value -= step if opcurrent_value < op_range[0]: opcurrent_value = op_range[0] self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, opcurrent_value) logging.info(f'Adjusted opcurrent to {opcurrent_value}') eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) logging.info(f'After opcurrent adjustment, eye issue: {eye.issue.name}') if eye.issue != raw_eye_issue or opcurrent_value == op_range[0]: break # 如果没有改善,恢复原始值 if eye.issue == raw_eye_issue: self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, raw_opcurrent_value) logging.info(f'No improvement, rollback opcurrent to {raw_opcurrent_value}') else: logging.info(f'opcurrent adjustment successful, final value: {opcurrent_value}') return eye def get_issue_eye_diagram(self, exp_id: int, retimer_index: int, attempts: int = 1) -> List[EyeData]: all_measurements: DefaultDict[int, List[EyeData]] = defaultdict(list) for attempt in range(1, attempts + 1): logger.info(f"RTMR{retimer_index} measurement attempt {attempt}/{attempts}") eye_data_list = self.get_eye_diagram(exp_id, retimer_index) if not eye_data_list: logger.error(f"Attempt {attempt} failed for RTMR{retimer_index}") continue for eye_data in eye_data_list: all_measurements[eye_data.lane_index].append(eye_data) if not all_measurements: logger.error(f"No valid measurements for RTMR{retimer_index}") return [] worst_eyes: List[EyeData] = [] for lane_idx, measurements in all_measurements.items(): logging.debug(f'---------- lan_inx: {lane_idx}, measurements: {measurements}') worst_eye = self.determine_issue(measurements) worst_eyes.append(worst_eye) bad_eyes = [eye for eye in worst_eyes if eye.issue != EyeIssue.EYE_NORMAL] return bad_eyes def determine_issue(self, measurements: List[EyeData]): # 优先级: 小,大,上,下, # 假设不会有偏光导致眼图小的情况,一定是先解决眼图小和大的问题,再解决眼图上和下的问题 worst_eye = min(measurements, key=lambda x: x.quality_score) if worst_eye.issue in [EyeIssue.EYE_NORMAL, EyeIssue.EYE_TOO_LARGE]: # 如果是眼最小的是正常的或者偏大的, 那就返回最大的 worst_eye = max(measurements, key=lambda x: x.quality_score) return worst_eye def get_worst_eye_diagram_per_lane(self, exp_id: int, retimer_phys_index: int, phys_lane: int, attempts: int = 3) -> EyeData: measurements: List[EyeData] = [] for attempt in range(1, attempts + 1): try: logger.info(f"Attempt {attempt}/{attempts} for RTMR{retimer_phys_index} Lane{phys_lane}") eye_data = self.get_eye_diagram_per_lane(exp_id, retimer_phys_index, phys_lane) if eye_data.rt_phys_index != -1: measurements.append(eye_data) logger.info(f"Measurement {attempt}: {eye_data}") # 如果发现眼图有问题,直接返回,不再继续查询 if eye_data.issue != EyeIssue.EYE_NORMAL: logger.info(f"Eye issue detected: {eye_data.issue.name}, returning immediately") return eye_data else: logger.warning(f"Invalid measurement in attempt {attempt}") except Exception as e: logger.error(f"Attempt {attempt} failed: {str(e)}") # 如果所有尝试都是正常眼图,则进行综合判断 if not measurements: logger.error(f"No valid measurements obtained after {attempts} attempts") return EyeData() # 返回默认的空数据 # 找出质量最差的眼图 worst_eye = self.determine_issue(measurements) logger.warning(f"Worst eye diagram for RTMR{retimer_phys_index} Lane{phys_lane}: " f"Quality score: {worst_eye.quality_score:.2f}") return worst_eye def get_eye_diagram_per_lane(self, exp_id: int, retimer_phys_index: int, phys_lane: int, attempts: int = 3) -> EyeData: measurements: List[EyeData] = [] for attempt in range(1, attempts + 1): try: logger.info(f"Attempt {attempt}/{attempts} for RTMR{retimer_phys_index} Lane{phys_lane}") eye_data = self.get_eye_diagram_per_lane_1_count(exp_id, retimer_phys_index, phys_lane) if eye_data.rt_phys_index != -1: measurements.append(eye_data) logger.info(f"Measurement {attempt}: {eye_data}") # 如果发现眼图有问题,直接返回,不再继续查询 if eye_data.issue != EyeIssue.EYE_NORMAL: logger.info(f"Eye issue detected: {eye_data.issue.name}, returning immediately") return eye_data else: logger.warning(f"Invalid measurement in attempt {attempt}") except Exception as e: logger.error(f"Attempt {attempt} failed: {str(e)}") # 如果所有尝试都是正常眼图,则进行综合判断 if not measurements: logger.error(f"No valid measurements obtained after {attempts} attempts") return EyeData() # 返回默认的空数据 # 找出质量最差的眼图 worst_eye = self.determine_issue(measurements) logger.warning(f"Worst eye diagram for RTMR{retimer_phys_index} Lane{phys_lane}: " f"Quality score: {worst_eye.quality_score:.2f}") return worst_eye def get_eye_diagram_per_lane_1_count(self, exp_id: int, retimer_phys_index: int, phys_lane: int) -> EyeData: eye_data = EyeData() cmd = f"rtmr {retimer_phys_index} eye 0 a {phys_lane}" try: raw_output = self.local_bmc.CmdVendorCommand(exp_id, cmd) # logging.info(f'raw_output: {raw_output}') if raw_output == '': logger.error(f"No data returned from command: {cmd}") return eye_data eye_datas = self.parse_eye_data(raw_output) return eye_datas[0] except Exception as e: logger.error(f"Error executing command '{cmd}': {e}") return eye_data def get_eye_diagram(self, exp_id: int, retimer_index: int) -> List[EyeData]: cmd = f"rtmr {retimer_index} eye 0 a" try: raw_output = self.local_bmc.CmdVendorCommand(exp_id, cmd) if not raw_output: logger.error(f"No data returned from command: {cmd}") return [] return self.parse_eye_data(raw_output) except Exception as e: logger.error(f"Error executing command '{cmd}': {e}") return [] def parse_eye_data(self, text: str) -> List[EyeData]: pattern = ( r"RTMR(\d+)\s+EYE_A(\d+):\s*" r"\((-?\d+\.?\d*),\s*(-?\d+\.?\d*)\)\s*mV\s*\|\s*" r"\((-?\d+\.?\d*),\s*(-?\d+\.?\d*)\)\s*UI" ) regex = re.compile(pattern) results = [] for line_num, line in enumerate(text.strip().splitlines(), 1): line = line.strip() if not line or "error" in line.lower(): continue match = regex.search(line) # 使用 search 允许前后有额外字符 if not match: logger.debug(f"Line {line_num}: Skipped (no match) - {line}") continue try: rtmr_prefix = match.group(1) eye_index = int(match.group(2)) down = float(match.group(3)) top = float(match.group(4)) left = float(match.group(5)) right = float(match.group(6)) # 计算真实 retimer_index 和 lane_index rt_phys_index = int(f"{rtmr_prefix}") phys_lane_index = eye_index if eye_index < 8: rt_index = int(f"{rtmr_prefix}1") lane_index = eye_index else: rt_index = int(f"{rtmr_prefix}2") lane_index = eye_index - 8 eye_data = EyeData( rt_phys_index=rt_phys_index, phys_lane_index=phys_lane_index, rt_index=rt_index, lane_index=lane_index, down=down, top=top, left=left, right=right ) eye_data.determine_issue() # 初始化问题类型 results.append(eye_data) except Exception as e: logger.error(f"Failed to parse line {line_num}: {line} | Error: {e}") continue logger.info(f"Parsed {len(results)} valid eye diagrams.") return results