From ff3abe3f5c872a89283d1b162e789d0dfb591d75 Mon Sep 17 00:00:00 2001 From: xz_ocs Date: Mon, 6 Oct 2025 02:47:34 +0000 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20'ocs/note.md'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ocs/note.md | 1161 ++++++++++++++++++++------------------------------- 1 file changed, 459 insertions(+), 702 deletions(-) diff --git a/ocs/note.md b/ocs/note.md index 397ee2d..628826f 100644 --- a/ocs/note.md +++ b/ocs/note.md @@ -76,687 +76,494 @@ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_lin scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.156:/usr/bin/ - - -from dataclasses import dataclass -import re -from typing import List, Optional, Dict, DefaultDict -import time -from enum import Enum -import logging -from collections import defaultdict -from parser.topology_parser import TopoMappingParser -from parser.transceiver_config_parser import TransceiverConfigParser -from toolbox.opt_reg_access_tool import OptRegAccessTool +import logging, time, os, json +from typing import Dict, List, Any, Optional from gpu.biren.exp_util import DevWhiteRiverExp -from parser.ibias_rssi_map_parser import IbiasRssiMapParser - -logger = logging.getLogger(__name__) - -# -------------------------------------------------- -# Eye Issue Definition (Corrected Logic) -# -------------------------------------------------- - -class OptReg(Enum): - MGC = '' - -class EyeIssue(Enum): - EYE_NORMAL = 0 - EYE_TOO_SMALL = 1 # 垂直开口太小: sum(|top|, |down|) < 30 - EYE_TOO_LARGE = 2 - EYE_TOO_DOWN = 3 # 眼图偏低: top < 15 - EYE_TOO_UP = 4 # 眼图偏高: |down| < 15 (i.e., down > -15) - -@dataclass -class EyeData: - rt_phys_index: int = -1 - phys_lane_index: int = -1 - rt_index: int = -1 - lane_index: int = -1 - down: float = -1 # mV (usually negative) - top: float = -1 # mV (usually positive) - left: float = -1 # UI - right: float = -1 # UI - issue: EyeIssue = EyeIssue.EYE_NORMAL - - def __str__(self): - return (f"RTMR{self.rt_index} EYE_A{self.lane_index:02d}: " - f"({self.down:.1f}, {self.top:.1f}) mV | " - f"({self.left:.2f}, {self.right:.2f}) UI") - - @property - def vertical_amplitude(self) -> float: - return abs(self.down) + abs(self.top) - - @property - def horizontal_amplitude(self) -> float: - return abs(self.left) + abs(self.right) - - @property - def quality_score(self) -> float: - return self.vertical_amplitude - - def determine_issue(self) -> None: - if self.vertical_amplitude < 30: - self.issue = EyeIssue.EYE_TOO_SMALL - elif self.vertical_amplitude > 220: - self.issue = EyeIssue.EYE_TOO_LARGE - elif self.top < 15: - self.issue = EyeIssue.EYE_TOO_DOWN - elif self.down > -15: # 因为 down 是负值,> -15 表示离 0 太近(太高) - self.issue = EyeIssue.EYE_TOO_UP - else: - self.issue = EyeIssue.EYE_NORMAL - - -# -------------------------------------------------- -# 🔧 EQ Tuning Tool -# -------------------------------------------------- - -class EqTuneTool: - def __init__(self, local_bmc: DevWhiteRiverExp, remote_bmc: DevWhiteRiverExp, - local_reg_tool: OptRegAccessTool, remote_reg_tool: OptRegAccessTool, - ibias_rssi_map: IbiasRssiMapParser, route_name: str): - self.local_bmc = local_bmc - self.remote_bmc = remote_bmc - self.local_reg_tool = local_reg_tool - self.remote_reg_tool = remote_reg_tool - self.ibias_rssi_map = ibias_rssi_map - self.route_name = route_name - self.topo_map = TopoMappingParser("./main_data/topo_mapping.yaml") - self.topo_map.parse() - - def eq_auto_tune(self, exp_id: int): - - cmd = 'ver' - raw_output = self.local_bmc.CmdVendorCommand(exp_id, cmd) - logging.info(raw_output) - - raw_tia_peak_value = self.local_reg_tool.read_tia_peak_reg(exp_id, 2, 0) - logging.info(f'Test Initial tia_peak_value: {raw_tia_peak_value}') - - logger.info(f"Starting auto tune for exp id: {exp_id}") - # retimers = [1, 2, 3, 4] - retimers = [3] - for retimer_index in retimers: - bad_eyes = self.get_issue_eye_diagram(exp_id, retimer_index) - time.sleep(1) - if bad_eyes: - self.process_worst_eyes(exp_id, retimer_index, bad_eyes) - else: - logger.info(f"No eye issues found on RTMR{retimer_index}") +from toolbox.opt_reg_access_tool import OptRegAccessTool - def process_worst_eyes(self, exp_id: int, retimer_index: int, bad_eyes: List[EyeData]) -> None: - logger.warning(f"Detected eye issues on RTMR{retimer_index}:") +from parser.transceiver_config_parser import TransceiverConfigParser, RegisterInfo - for eye in bad_eyes: - logger.warning( - f"RTMR {eye.rt_index} Lane {eye.lane_index:02d}: {eye.issue.name} " - f"(Vertical: {eye.vertical_amplitude:.1f}mV, Top={eye.top:.1f}, Down={eye.down:.1f})" - ) - - for eye in bad_eyes: - logger.warning( - f"Starting Process RTMR {eye.rt_index} Lane {eye.lane_index:02d}: {eye.issue.name} " - f"(Vertical: {eye.vertical_amplitude:.1f}mV, Top={eye.top:.1f}, Down={eye.down:.1f})" +class MgcTuningTool: + """ + MGC自动校准工具类 + """ + def __init__(self, host:str, reg_table: TransceiverConfigParser, reg_tool: OptRegAccessTool, + bmc: DevWhiteRiverExp): + self.host = host + self.reg_table = reg_table + self.bmc = bmc + self.target_vpeak = 0 + self.reg_tool = reg_tool + self.target_vpeaks_file = "target_vpeaks.json" + self.mode = "onoc1" # 默认mode + + # 暂时只考虑了onoc模式 + # onet模式暂时手动传 + + def auto_tune(self, exp_id: int, slot_id: int, lane_list: List[int]) -> bool: + # target_vpeaks = self.calc_target_vpeak(exp_id, slot_id) + target_vpeaks = self.calc_target_vpeak_new(exp_id, slot_id, lane_list) + # 开始匹配目标target_vpeaks + logging.info("----------step 7: find optimal MGC") + self.match_optimal_mgc_new(exp_id, slot_id, lane_list, target_vpeaks) + return True + + def manual_tune(self, exp_id: int, slot_id: int) -> bool: + target_vpeaks = self.load_target_vpeaks_from_json(exp_id, slot_id) + if target_vpeaks is None: + logging.error("从文件加载target_vpeaks失败") + return False + self.match_optimal_mgc_new2(exp_id, slot_id, target_vpeaks) + return True + + def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]: + target_vpeaks : Dict[int, int] = {} + logging.info(f"-------slot {slot_id}") + + # 切换成mgc + logging.info("----------step 1: disable agc") + self.disable_agc(exp_id, slot_id) + + # 关闭RF + logging.info("----------step 2: toogle RF off") + if not self.toogle_rf(exp_id, slot_id, lane_list, "off"): + logging.error(f"slot {slot_id}: toogle RF off fail") + return target_vpeaks + + # 读取base_line_vpeaks + logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") + base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) + logging.info(f"base_line_vpeaks: {base_line_vpeaks}") + self.toogle_rf(exp_id, slot_id, lane_list, "on") + + self._write_mgc_all_lanes(exp_id, slot_id, lane_list, 255) + + max_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) + + for lane, base_vpeak in base_line_vpeaks.items(): + vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (16 / 29)) + target_vpeaks[lane] = vpkdelta + base_vpeak + + logging.info(f"target_vpeaks: {target_vpeaks}") + + if target_vpeaks: + self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) + + # 开启RF + logging.info("----------step 4: enable RF") + # time.sleep(5) + + return target_vpeaks + + def calc_target_vpeak(self, exp_id: int, lane_list: List[int], slot_id: int) -> Dict[int, int]: + + target_vpeaks : Dict[int, int] = {} + logging.info(f"-------slot {slot_id}") + + # 切换成mgc + logging.info("----------step 1: disable agc") + self.disable_agc(exp_id, slot_id) + + # 关闭RF + logging.info("----------step 2: toogle RF off") + if not self.toogle_rf(exp_id, slot_id, lane_list, "off"): + logging.error(f"slot {slot_id}: toogle RF off fail") + return target_vpeaks + + # 读取base_line_vpeaks + logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") + base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) + logging.info(f"base_line_vpeaks: {base_line_vpeaks}") + target_vpeaks = self.calculate_target_vpeak(base_line_vpeaks) + logging.info(f"target_vpeaks: {target_vpeaks}") + + if target_vpeaks: + self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) + + # 开启RF + logging.info("----------step 4: enable RF") + # time.sleep(5) + self.toogle_rf(exp_id, slot_id, lane_list, "on") + + return target_vpeaks + + def disable_agc(self, exp_id: int, slot_id: int): + logging.debug("----------disable_agc, step 1") + agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en") + if not agc_enable_regs: + logging.error("not find AGC_ENABLE register") + return False + + logging.debug("----------disable_agc, step 2") + success = True + for reg in agc_enable_regs: + logging.debug(f"----------disable_agc, step 2: reg: {reg}") + result = self.bmc.SetOpticalModuleRegs( + exp_id=exp_id, + slot_id=slot_id, + bank=reg.bank, + page=reg.page, + reg_offset=reg.offset, + size=reg.size, + hex_str="0000" ) - tmp_eye = eye - max_retry_cout = 2 - try_count = 0 - while tmp_eye.issue != EyeIssue.EYE_NORMAL and try_count < max_retry_cout: - logging.info(f'------try count: {try_count}') - if tmp_eye.issue == EyeIssue.EYE_TOO_SMALL: - self.process_small_eye(exp_id, retimer_index, eye) + logging.debug(f"----------disable_agc, step 3: result: {result}") + if not result: + logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}") + success = False - if tmp_eye.issue == EyeIssue.EYE_TOO_DOWN: - self.process_down_eye(exp_id, retimer_index, eye) - elif tmp_eye.issue == EyeIssue.EYE_TOO_UP: - self.process_up_eye(exp_id, retimer_index, eye) - - # if tmp_eye.issue == EyeIssue.EYE_TOO_LARGE: - # logging.info(f"Eye issue: {eye.issue.name}") - - try_count +=1 - - # # if tmp_eye.issue == EyeIssue.EYE_TOO_LARGE: - # # 如果rssi很大,那适当减少,减少ibias - # # slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index) - # # if slot is None: - # # break - # # slot_id = slot[0] - # # lane_id = slot[1] - # # ibias_value = self.remote_reg_tool. - - def process_small_eye(self, exp_id: int, retimer_index: int, eye: EyeData): - logger.info(f"Starting small eye issue processing for RTMR{retimer_index} lane {eye.lane_index}, adjusting ibias") - - # 获取slot信息 - remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index) - local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index) - if remote_slot is None or local_slot is None: - logger.error(f"Invalid slot for RTMR{retimer_index} lane {eye.lane_index}") - return eye - - remote_slot_id = remote_slot[0] - remote_lane_id = remote_slot[1] - local_slot_id = local_slot[0] - local_lane_id = local_slot[1] + return success - logger.info(f"Processing small eye for RTMR{eye.rt_index} lane {eye.lane_index}: " - f"remote_slot=({remote_slot_id},{remote_lane_id}), " - f"local_slot=({local_slot_id},{local_lane_id})") + def toogle_rf(self, exp_id: int, slot_id: int, lane_list: List[int], switch: str = "off") -> bool: + ret = False + logging.info(f"---------- toogle_rf to {switch}") - # 调节ibias - logger.info(f"Adjusting ibias for RTMR{eye.rt_index} lane {eye.lane_index}") - eye = self.adjust_ibias_for_small_eye(exp_id, eye, remote_slot_id, remote_lane_id, local_slot_id, local_lane_id) - - # 如果依然很小,调节opcurrent - if eye.issue == EyeIssue.EYE_TOO_SMALL: - logger.info(f"Eye still too small for RTMR{retimer_index} lane {eye.lane_index}, adjusting opcurrent") - eye = self.adjust_opcurrent_for_small_down_eye(exp_id, eye, remote_slot_id, remote_lane_id) - # eye = self.adjust_opcurrent_for_up_eye(exp_id, eye, remote_slot_id, remote_lane_id) - - # 如果依然很小,调节mgc - if eye.issue == EyeIssue.EYE_TOO_SMALL: - logger.info(f"Eye still too small for RTMR{retimer_index} lane {eye.lane_index}, adjusting mgc") - eye = self.adjust_mgc_for_small_eye(exp_id, eye, local_slot_id, local_lane_id) - - logger.info(f"Completed small eye processing for RTMR{retimer_index} lane {eye.lane_index}, " - f"final issue: {eye.issue.name}") - return eye - - def process_down_eye(self, exp_id: int, retimer_index: int, eye: EyeData): - logging.info(f"Starting down eye issue processing for RTMR{retimer_index} lane {eye.lane_index}") - - # 获取slot信息 - remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index) - local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index) - if remote_slot is None or local_slot is None: - logger.error(f"Invalid slot for RTMR{retimer_index} lane {eye.lane_index}") - return eye - - remote_slot_id = remote_slot[0] - remote_lane_id = remote_slot[1] - local_slot_id = local_slot[0] - local_lane_id = local_slot[1] - - logger.info(f"Processing down eye for RTMR{eye.rt_index} lane {eye.lane_index}: " - f"remote_slot=({remote_slot_id},{remote_lane_id}), " - f"local_slot=({local_slot_id},{local_lane_id})") + for lane in lane_list: + if switch == "off": + ret = self.reg_tool.write_tia_stage2_reg(exp_id, slot_id, lane, 0) + if ret == False: + logging.error(f"---------- toogle_rf to {switch} failed") + else: + stage_recommended_value = 144 + ret = self.reg_tool.write_tia_stage2_reg(exp_id, slot_id, lane, stage_recommended_value) + if ret == False: + logging.error(f"---------- toogle_rf to {switch} failed") - # 先调节tia_peak(值调小) - logger.info(f"Adjusting tia_peak for RTMR{eye.rt_index} lane {eye.lane_index}") - eye = self.adjust_tia_peak_for_down_eye(exp_id, eye, local_slot_id, local_lane_id) - - # 如果依然偏下,调节high_freq(值调小) - if eye.issue == EyeIssue.EYE_TOO_DOWN: - logger.info(f"Eye still too down for RTMR{retimer_index} lane {eye.lane_index}, adjusting high_freq") - eye = self.adjust_high_freq_for_down_eye(exp_id, eye, remote_slot_id, remote_lane_id) - - # 如果依然偏下,调节opcurrent - if eye.issue == EyeIssue.EYE_TOO_DOWN: - logger.info(f"Eye still too down for RTMR{retimer_index} lane {eye.lane_index}, adjusting opcurrent") - eye = self.adjust_opcurrent_for_small_down_eye(exp_id, eye, remote_slot_id, remote_lane_id) - - logger.info(f"Completed down eye processing for RTMR{retimer_index} lane {eye.lane_index}, " - f"final issue: {eye.issue.name}") - return eye + return ret + def calculate_target_vpeak(self, base_line_vpeaks: Dict[int, int]) -> Dict[int, int]: + # base_line值加22 + target_vpeaks:Dict[int, int] = {} + for lane, vpeak in base_line_vpeaks.items(): + logging.info(f"lane={lane}, vpeak={vpeak}") + target_vpeaks[lane] = vpeak + 18 + logging.info(f"target_vpeaks={target_vpeaks}") + return target_vpeaks + + def match_optimal_mgc_new2(self, exp_id: int, slot_id: int, target_vpeaks: Optional[Dict[int, int]]) -> bool: + mgc_regs = self.reg_table.get_registers_by_name("mgc") + vpeak_regs = self.reg_table.get_registers_by_name("vpeak") + if mgc_regs is None or vpeak_regs is None: + logging.error("not find MGC regs") + return False + + success = True + matched_results = {} + unmatched_lanes = [] + target_mgcs = {} + + for reg in mgc_regs: + # logging.info('------------------step 1') + mgc_value = self.bmc.GetOpticalModuleRegs( + exp_id=exp_id, + slot_id=slot_id, + bank=reg.bank, + page=reg.page, + reg_offset=reg.offset, + size=1 + ) + + # logging.info(f'------------------step 2: {mgc_value}') + mgc_value = int(mgc_value, 16) + for vpeak_reg in vpeak_regs: + if vpeak_reg.lane == reg.lane: + vpeak_value = self.bmc.GetOpticalModuleRegs( + exp_id=exp_id, + slot_id=slot_id, + bank=vpeak_reg.bank, + page=vpeak_reg.page, + reg_offset=vpeak_reg.offset, + size=2 + ) + vpeak_value = int(vpeak_value, 16) + # logging.info(f'------------------step 3: {vpeak_value}') + logging.info(f'mgc_value: {mgc_value}, target_vpeak: {target_vpeaks[vpeak_reg.lane]}, vpeak_value: {vpeak_value}') + target_mgc = (target_vpeaks[vpeak_reg.lane] - vpeak_value) * 1.5 + mgc_value + logging.info(f'target_mgc: {target_mgc}') + target_mgcs[reg.lane] = target_mgc + break - def process_up_eye(self, exp_id: int, retimer_index: int, eye: EyeData): - logging.info(f"Starting up eye issue processing for RTMR{retimer_index} lane {eye.lane_index}") - - # 获取slot信息 - remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index) - local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index) - if remote_slot is None or local_slot is None: - logger.error(f"Invalid slot for RTMR{retimer_index} lane {eye.lane_index}") - return eye - - remote_slot_id = remote_slot[0] - remote_lane_id = remote_slot[1] - local_slot_id = local_slot[0] - local_lane_id = local_slot[1] + logging.info(f'----------------Target MGC: {target_mgcs}') - logger.info(f"Processing up eye for RTMR{eye.rt_index} lane {eye.lane_index}: " - f"remote_slot=({remote_slot_id},{remote_lane_id}), " - f"local_slot=({local_slot_id},{local_lane_id})") + for reg in mgc_regs: + logging.info(f"mgc regs size: {len(mgc_regs)}") - # 先调节tia_peak(值调大) - logger.info(f"Adjusting tia_peak for RTMR{eye.rt_index} lane {eye.lane_index}") - eye = self.adjust_tia_peak_for_up_eye(exp_id, eye, local_slot_id, local_lane_id) - - # 如果依然偏高,调节high_freq(值调大) - if eye.issue == EyeIssue.EYE_TOO_UP: - logger.info(f"Eye still too up for RTMR{retimer_index} lane {eye.lane_index}, adjusting high_freq") - eye = self.adjust_high_freq_for_up_eye(exp_id, eye, remote_slot_id, remote_lane_id) - - # 如果依然偏高,调节opcurrent(值调小) - if eye.issue == EyeIssue.EYE_TOO_UP: - logger.info(f"Eye still too up for RTMR{retimer_index} lane {eye.lane_index}, adjusting opcurrent") - eye = self.adjust_opcurrent_for_up_eye(exp_id, eye, remote_slot_id, remote_lane_id) - - logger.info(f"Completed up eye processing for RTMR{retimer_index} lane {eye.lane_index}, " - f"final issue: {eye.issue.name}") - return eye - - def adjust_ibias_for_small_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int, - local_slot_id: int, local_lane_id: int) -> EyeData: - ibias_range = [2000, 3200] - rssi_range = [5000, 10000] - ibias_value = self.remote_reg_tool.read_ibias_reg(exp_id, remote_slot_id, remote_lane_id) - logging.info(f'Initial ibias_value: {ibias_value}') - if ibias_value == 0: - logging.warning(f'read ibias reg again') - ibias_value = self.remote_reg_tool.read_ibias_reg(exp_id, remote_slot_id, remote_lane_id) - logging.info(f'Initial ibias_value: {ibias_value}') - - while ibias_value < ibias_range[1]: - # 根据当前值确定步长 - step = 300 if ibias_value < 2500 else 150 - ibias_value += step - self.remote_reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, ibias_value) - logging.info(f'Adjusted ibias to {ibias_value} with step {step}') - - ibias_lane = self.remote_reg_tool.get_ibias_by_logic_lane(remote_lane_id) - rssi_lanes = self.ibias_rssi_map.get_rssi_lane(self.route_name, ibias_lane) - if rssi_lanes is None: - logging.error('RSSI lanes is none') - break + min_val = int(target_mgcs[reg.lane] - 10) + if min_val > 255: + min_val = 255 + max_val = int(target_mgcs[reg.lane] + 10) + if max_val > 255: + logging.info(f'max_val is greater than 255, setting to 255') + max_val = 255 + logging.info(f"lane: {reg.lane}, min mgc : {min_val}, max_mgc: {max_val}, target vpeak: {target_vpeaks[reg.lane]}") + step = reg.step - rssi1_value = self.local_reg_tool.read_rssi_reg(exp_id, local_slot_id, rssi_lanes[0]) - rssi2_value = self.local_reg_tool.read_rssi_reg(exp_id, local_slot_id, rssi_lanes[1]) - logging.info(f'RSSI values: rssi1={rssi1_value}, rssi2={rssi2_value}') - is_margin = False - if rssi1_value > rssi_range[1] or rssi2_value > rssi_range[1]: - # 回退一步 - self.remote_reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, ibias_value - step) - logging.info(f'RSSI margin exceeded, rollback ibias to {ibias_value - step}') - is_margin = True + search_values = list(range(max_val, min_val - 1, -step)) + logging.info(f"Search order for lane {reg.lane}: {search_values}") - eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) - logging.info(f'After ibias adjustment, eye issue: {eye.issue.name}') - if eye.issue != EyeIssue.EYE_TOO_SMALL or is_margin: - break - - logging.info(f'Final ibias adjustment result: ibias={ibias_value}, eye issue={eye.issue.name}') - return eye - - def adjust_opcurrent_for_small_down_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int) -> EyeData: - raw_eye_issue = eye.issue - raw_opcurrent_value = self.remote_reg_tool.read_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id) - logging.info(f'Initial opcurrent_value: {raw_opcurrent_value}') - op_range = [100, 190] - opcurrent_value = raw_opcurrent_value - - while opcurrent_value < op_range[1]: - step = 10 - opcurrent_value += step - if opcurrent_value > op_range[1]: - opcurrent_value = op_range[1] - - self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, opcurrent_value) - logging.info(f'Adjusted opcurrent to {opcurrent_value}') + is_match = False + matched_mgc = None + matched_vpeak = None - eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) - logging.info(f'After opcurrent adjustment, eye issue: {eye.issue.name}') - - if eye.issue != raw_eye_issue or opcurrent_value == op_range[1]: - break - - # 如果没有改善,恢复原始值 - if eye.issue == raw_eye_issue: - self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, raw_opcurrent_value) - logging.info(f'No improvement, rollback opcurrent to {raw_opcurrent_value}') - else: - logging.info(f'opcurrent adjustment successful, final value: {opcurrent_value}') - - return eye + for mgc in search_values: + result = self.bmc.SetOpticalModuleRegs( + exp_id=exp_id, + slot_id=slot_id, + bank=reg.bank, + page=reg.page, + reg_offset=reg.offset, + size=1, + hex_str=f"{mgc:02X}" + ) + time.sleep(0.05) + + # 新固件特殊逻辑 + result = self.bmc.SetOpticalModuleRegs( + exp_id=exp_id, + slot_id=slot_id, + bank= 0, + page=0xd0, + reg_offset=0x88, + size=1, + hex_str="01" + ) + time.sleep(0.05) - def adjust_mgc_for_small_eye(self, exp_id: int, eye: EyeData, local_slot_id: int, local_lane_id: int) -> EyeData: - raw_mgc_value = self.local_reg_tool.read_mgc_reg(exp_id, local_slot_id, local_lane_id) - logging.info(f'Initial mgc_value: {raw_mgc_value}') - mgc_range = [raw_mgc_value, raw_mgc_value + 10] - mgc_value = raw_mgc_value - - while mgc_value < mgc_range[1]: - step = 2 - mgc_value += step - self.local_reg_tool.write_mgc_reg(exp_id, local_slot_id, local_lane_id, mgc_value) - logging.info(f'Adjusted mgc to {mgc_value}') - - eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) - logging.info(f'After mgc adjustment, eye issue: {eye.issue.name}') - - if eye.issue != EyeIssue.EYE_TOO_SMALL: - logging.info(f'mgc adjustment successful, final value: {mgc_value}') - return eye - - # 如果没有改善,恢复原始值 - self.local_reg_tool.write_mgc_reg(exp_id, local_slot_id, local_lane_id, raw_mgc_value) - logging.info(f'No improvement, rollback mgc to {raw_mgc_value}') - return eye - - def adjust_tia_peak_for_down_eye(self, exp_id: int, eye: EyeData, local_slot_id: int, local_lane_id: int) -> EyeData: - raw_tia_peak_value = self.local_reg_tool.read_tia_peak_reg(exp_id, local_slot_id, local_lane_id) - logging.info(f'Initial tia_peak_value: {raw_tia_peak_value}') - tia_peak_range = [0, 200] - tia_peak_value = raw_tia_peak_value - - while tia_peak_value > tia_peak_range[0]: - step = 20 - tia_peak_value -= step - if tia_peak_value < tia_peak_range[0]: - tia_peak_value = tia_peak_range[0] + + for vpeak_reg in vpeak_regs: + if vpeak_reg.lane == reg.lane: + value = self.bmc.GetOpticalModuleRegs( + exp_id=exp_id, + slot_id=slot_id, + bank=vpeak_reg.bank, + page=vpeak_reg.page, + reg_offset=vpeak_reg.offset, + size=2 + ) + value = int(value, 16) + logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}") + if value < (target_vpeaks[reg.lane] + 1) and value > (target_vpeaks[reg.lane] - 2): + logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ") + is_match = True + matched_mgc = mgc + matched_vpeak = value + break + if is_match: + break + time.sleep(0.03) - self.local_reg_tool.write_tia_peak_reg(exp_id, local_slot_id, local_lane_id, tia_peak_value) - logging.info(f'Adjusted tia_peak to {tia_peak_value}') + if is_match and matched_mgc is not None and matched_vpeak is not None: + if str(exp_id) not in matched_results: + matched_results[str(exp_id)] = {} + if str(slot_id) not in matched_results[str(exp_id)]: + matched_results[str(exp_id)][str(slot_id)] = {} + if self.mode not in matched_results[str(exp_id)][str(slot_id)]: + matched_results[str(exp_id)][str(slot_id)][self.mode] = {} + + matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = { + "mgc": matched_mgc, + "target_vpeak": target_vpeaks[reg.lane], + "actual_vpeak": matched_vpeak + } + else: + unmatched_lanes.append(reg.lane) + + if matched_results: + self.save_mgc_results_to_json(exp_id, slot_id, matched_results) - eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) - logging.info(f'After tia_peak adjustment, eye issue: {eye.issue.name}') + if unmatched_lanes: + logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") - if eye.issue != EyeIssue.EYE_TOO_DOWN or tia_peak_value == tia_peak_range[0]: - break - - logging.info(f'Final tia_peak adjustment result: tia_peak={tia_peak_value}, eye issue={eye.issue.name}') - return eye - - def adjust_high_freq_for_down_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int) -> EyeData: - raw_high_freq_value = self.remote_reg_tool.read_high_freq_reg(exp_id, remote_slot_id, remote_lane_id) - logging.info(f'Initial high_freq_value: {raw_high_freq_value}') - high_freq_range = [0, 150] - high_freq_value = raw_high_freq_value + return success + + def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[int, int]) -> bool: + matched_results = {} + unmatched_lanes = [] + + for lane_id in lane_list: + + is_match = False + matched_mgc = None + matched_vpeak = None - while high_freq_value > high_freq_range[0]: - step = 20 - high_freq_value -= step - if high_freq_value < high_freq_range[0]: - high_freq_value = high_freq_range[0] - - self.remote_reg_tool.write_high_freq_reg(exp_id, remote_slot_id, remote_lane_id, high_freq_value) - logging.info(f'Adjusted high_freq to {high_freq_value}') + reg = self.reg_table.get_register_by_logic_lane('mgc', lane_id) + if reg is None or reg.valid_range is None: + logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc') + return False - eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) - logging.info(f'After high_freq adjustment, eye issue: {eye.issue.name}') + min_val = reg.valid_range[0] + max_val = reg.valid_range[1] + step = reg.step if reg.step is not None else 1 + + wt_mgc = int((min_val + max_val) / 2) + target_vpeak = target_vpeaks[lane_id] # example: 100 + target_vpeak_range = [target_vpeak -1, target_vpeak + 1] # 99,101 + while True: + ret = self.reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc) + if ret != 0: + logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc') + return False + + ret = self.reg_tool.write_confirm_reg(exp_id, slot_id) + if ret != 0: + logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm') + return False + + time.sleep(0.03) + current_vpeak = self.reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id) + + logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}") + + if current_vpeak > target_vpeak_range[1]: + wt_mgc -= step + + if current_vpeak < target_vpeak_range[0]: + wt_mgc += step + + if current_vpeak >= target_vpeak_range[0] and current_vpeak <= target_vpeak_range[1]: + is_match = True + matched_mgc = wt_mgc + matched_vpeak = current_vpeak + break - if eye.issue != EyeIssue.EYE_TOO_DOWN or high_freq_value == high_freq_range[0]: - break - - logging.info(f'Final high_freq adjustment result: high_freq={high_freq_value}, eye issue={eye.issue.name}') - return eye - - def adjust_tia_peak_for_up_eye(self, exp_id: int, eye: EyeData, local_slot_id: int, local_lane_id: int) -> EyeData: - raw_tia_peak_value = self.local_reg_tool.read_tia_peak_reg(exp_id, local_slot_id, local_lane_id) - logging.info(f'Initial tia_peak_value: {raw_tia_peak_value}') - tia_peak_range = [0, 200] - max_value = tia_peak_range[1] - tia_peak_value = raw_tia_peak_value + if is_match: + logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") + if str(exp_id) not in matched_results: + matched_results[str(exp_id)] = {} + if str(slot_id) not in matched_results[str(exp_id)]: + matched_results[str(exp_id)][str(slot_id)] = {} + if self.mode not in matched_results[str(exp_id)][str(slot_id)]: + matched_results[str(exp_id)][str(slot_id)][self.mode] = {} + + matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = { + "mgc": matched_mgc, + "target_vpeak": target_vpeaks[lane_id], + "actual_vpeak": matched_vpeak + } + else: + unmatched_lanes.append(lane_id) - while tia_peak_value < max_value: - step = 20 - tia_peak_value += step - if tia_peak_value > max_value: - tia_peak_value = max_value + if matched_results: + self.save_mgc_results_to_json(exp_id, slot_id, matched_results) - self.local_reg_tool.write_tia_peak_reg(exp_id, local_slot_id, local_lane_id, tia_peak_value) - logging.info(f'Adjusted tia_peak to {tia_peak_value}') + if unmatched_lanes: + logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") + return False - eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) - logging.info(f'After tia_peak adjustment, eye issue: {eye.issue.name}') + return True + + def _read_vpeak_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]: + vpeak_values = {} + for lane in lane_list: + vpeak_int = self.reg_tool.read_tia_peak_reg(exp_id, slot_id, lane) + vpeak_values[lane] = vpeak_int + + return vpeak_values + + def _write_mgc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], mgc: int) -> bool: + for lane_id in lane_list: + ret = self.reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, mgc) + if ret == False: + logging.error(f'write mgc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, mgc:{mgc}') + return False - if eye.issue != EyeIssue.EYE_TOO_UP or tia_peak_value == max_value: - break + ret = self.reg_tool.write_confirm_reg(exp_id, slot_id) + if ret == False: + logging.error(f'write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}') + return False - logging.info(f'Final tia_peak adjustment result: tia_peak={tia_peak_value}, eye issue={eye.issue.name}') - return eye - - def adjust_high_freq_for_up_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int) -> EyeData: - raw_high_freq_value = self.remote_reg_tool.read_high_freq_reg(exp_id, remote_slot_id, remote_lane_id) - logging.info(f'Initial high_freq_value: {raw_high_freq_value}') - high_freq_value = raw_high_freq_value - high_freq_range = [0, 150] - while high_freq_value < high_freq_range[1]: - step = 20 - high_freq_value += step - if high_freq_value > high_freq_range[1]: - high_freq_value = high_freq_range[1] - - self.remote_reg_tool.write_high_freq_reg(exp_id, remote_slot_id, remote_lane_id, high_freq_value) - logging.info(f'Adjusted high_freq to {high_freq_value}') + return True + + def _generate_host_filename(self) -> str: + safe_host = self.host.replace(':', '_').replace('/', '_').replace('\\', '_') + return f"target_vpeaks_{safe_host}.json" + + def _generate_mgc_host_filename(self) -> str: + safe_host = self.host.replace(':', '_').replace('/', '_').replace('\\', '_') + return f"mgc_{safe_host}.json" + + def save_target_vpeaks_to_json(self, exp_id: int, slot_id: int, target_vpeaks: Dict[int, int]) -> bool: + try: + filename = self._generate_host_filename() + os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) - eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) - logging.info(f'After high_freq adjustment, eye issue: {eye.issue.name}') + if os.path.exists(filename): + with open(filename, 'r') as f: + data = json.load(f) + else: + data = {} - if eye.issue != EyeIssue.EYE_TOO_UP or high_freq_value == high_freq_range[1]: - break - - logging.info(f'Final high_freq adjustment result: high_freq={high_freq_value}, eye issue={eye.issue.name}') - return eye - - def adjust_opcurrent_for_up_eye(self, exp_id: int, eye: EyeData, remote_slot_id: int, remote_lane_id: int) -> EyeData: - raw_eye_issue = eye.issue - raw_opcurrent_value = self.remote_reg_tool.read_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id) - logging.info(f'Initial opcurrent_value: {raw_opcurrent_value}') - op_range = [0, 190] - opcurrent_value = raw_opcurrent_value - - while opcurrent_value > op_range[0]: - step = 10 - opcurrent_value -= step - if opcurrent_value < op_range[0]: - opcurrent_value = op_range[0] + if str(exp_id) not in data: + data[str(exp_id)] = {} + if str(slot_id) not in data[str(exp_id)]: + data[str(exp_id)][str(slot_id)] = {} - self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, opcurrent_value) - logging.info(f'Adjusted opcurrent to {opcurrent_value}') + data[str(exp_id)][str(slot_id)][self.mode] = {str(k): v for k, v in target_vpeaks.items()} - eye = self.get_eye_diagram_per_lane(exp_id, eye.rt_phys_index, eye.phys_lane_index) - logging.info(f'After opcurrent adjustment, eye issue: {eye.issue.name}') - - if eye.issue != raw_eye_issue or opcurrent_value == op_range[0]: - break - - # 如果没有改善,恢复原始值 - if eye.issue == raw_eye_issue: - self.remote_reg_tool.write_opcurrent_reg(exp_id, remote_slot_id, remote_lane_id, raw_opcurrent_value) - logging.info(f'No improvement, rollback opcurrent to {raw_opcurrent_value}') - else: - logging.info(f'opcurrent adjustment successful, final value: {opcurrent_value}') - - return eye - - def get_issue_eye_diagram(self, exp_id: int, retimer_index: int, attempts: int = 1) -> List[EyeData]: - all_measurements: DefaultDict[int, List[EyeData]] = defaultdict(list) - - for attempt in range(1, attempts + 1): - logger.info(f"RTMR{retimer_index} measurement attempt {attempt}/{attempts}") - eye_data_list = self.get_eye_diagram(exp_id, retimer_index) - - if not eye_data_list: - logger.error(f"Attempt {attempt} failed for RTMR{retimer_index}") - continue - - for eye_data in eye_data_list: - all_measurements[eye_data.lane_index].append(eye_data) - - if not all_measurements: - logger.error(f"No valid measurements for RTMR{retimer_index}") - return [] - - worst_eyes: List[EyeData] = [] - for lane_idx, measurements in all_measurements.items(): - logging.debug(f'---------- lan_inx: {lane_idx}, measurements: {measurements}') - worst_eye = self.determine_issue(measurements) - worst_eyes.append(worst_eye) + with open(filename, 'w') as f: + json.dump(data, f, indent=4) + + logging.info(f"target_vpeaks saved to {filename}, path: {exp_id}/{slot_id}/{self.mode}") + return True + except Exception as e: + logging.error(f"Failed to save target_vpeaks to JSON file: {e}") + return False - bad_eyes = [eye for eye in worst_eyes if eye.issue != EyeIssue.EYE_NORMAL] - return bad_eyes - - def determine_issue(self, measurements: List[EyeData]): - # 优先级: 小,大,上,下, - # 假设不会有偏光导致眼图小的情况,一定是先解决眼图小和大的问题,再解决眼图上和下的问题 - worst_eye = min(measurements, key=lambda x: x.quality_score) - if worst_eye.issue in [EyeIssue.EYE_NORMAL, EyeIssue.EYE_TOO_LARGE]: - # 如果是眼最小的是正常的或者偏大的, 那就返回最大的 - worst_eye = max(measurements, key=lambda x: x.quality_score) - - return worst_eye - - def get_worst_eye_diagram_per_lane(self, exp_id: int, retimer_phys_index: int, - phys_lane: int, attempts: int = 3) -> EyeData: - - measurements: List[EyeData] = [] - - for attempt in range(1, attempts + 1): - try: - logger.info(f"Attempt {attempt}/{attempts} for RTMR{retimer_phys_index} Lane{phys_lane}") - eye_data = self.get_eye_diagram_per_lane(exp_id, retimer_phys_index, phys_lane) + def load_target_vpeaks_from_json(self, exp_id: int, slot_id: int) -> Optional[Dict[int, int]]: + try: + filename = self._generate_host_filename() + if not os.path.exists(filename): + logging.error(f"File {filename} does not exist") + return None - if eye_data.rt_phys_index != -1: - measurements.append(eye_data) - logger.info(f"Measurement {attempt}: {eye_data}") - - # 如果发现眼图有问题,直接返回,不再继续查询 - if eye_data.issue != EyeIssue.EYE_NORMAL: - logger.info(f"Eye issue detected: {eye_data.issue.name}, returning immediately") - return eye_data - else: - logger.warning(f"Invalid measurement in attempt {attempt}") - - except Exception as e: - logger.error(f"Attempt {attempt} failed: {str(e)}") - - # 如果所有尝试都是正常眼图,则进行综合判断 - if not measurements: - logger.error(f"No valid measurements obtained after {attempts} attempts") - return EyeData() # 返回默认的空数据 - - # 找出质量最差的眼图 - worst_eye = self.determine_issue(measurements) - logger.warning(f"Worst eye diagram for RTMR{retimer_phys_index} Lane{phys_lane}: " - f"Quality score: {worst_eye.quality_score:.2f}") - - return worst_eye - def get_eye_diagram_per_lane(self, exp_id: int, retimer_phys_index: int, - phys_lane: int, attempts: int = 3) -> EyeData: - measurements: List[EyeData] = [] - - for attempt in range(1, attempts + 1): - try: - logger.info(f"Attempt {attempt}/{attempts} for RTMR{retimer_phys_index} Lane{phys_lane}") - eye_data = self.get_eye_diagram_per_lane_1_count(exp_id, retimer_phys_index, phys_lane) + with open(filename, 'r') as f: + data = json.load(f) - if eye_data.rt_phys_index != -1: - measurements.append(eye_data) - logger.info(f"Measurement {attempt}: {eye_data}") - - # 如果发现眼图有问题,直接返回,不再继续查询 - if eye_data.issue != EyeIssue.EYE_NORMAL: - logger.info(f"Eye issue detected: {eye_data.issue.name}, returning immediately") - return eye_data - else: - logger.warning(f"Invalid measurement in attempt {attempt}") - - except Exception as e: - logger.error(f"Attempt {attempt} failed: {str(e)}") - - # 如果所有尝试都是正常眼图,则进行综合判断 - if not measurements: - logger.error(f"No valid measurements obtained after {attempts} attempts") - return EyeData() # 返回默认的空数据 - - # 找出质量最差的眼图 - worst_eye = self.determine_issue(measurements) - logger.warning(f"Worst eye diagram for RTMR{retimer_phys_index} Lane{phys_lane}: " - f"Quality score: {worst_eye.quality_score:.2f}") - - return worst_eye - - def get_eye_diagram_per_lane_1_count(self, exp_id: int, retimer_phys_index: int, phys_lane: int) -> EyeData: - eye_data = EyeData() - cmd = f"rtmr {retimer_phys_index} eye 0 a {phys_lane}" - try: - raw_output = self.local_bmc.CmdVendorCommand(exp_id, cmd) - # logging.info(f'raw_output: {raw_output}') - if raw_output == '': - logger.error(f"No data returned from command: {cmd}") - return eye_data - eye_datas = self.parse_eye_data(raw_output) - return eye_datas[0] + if (str(exp_id) not in data or + str(slot_id) not in data[str(exp_id)] or + self.mode not in data[str(exp_id)][str(slot_id)]): + logging.error(f"Path {exp_id}/{slot_id}/{self.mode} not found in {filename}") + return None + + target_vpeaks_data = data[str(exp_id)][str(slot_id)][self.mode] + target_vpeaks = {int(k): v for k, v in target_vpeaks_data.items()} + + logging.info(f"target_vpeaks loaded successfully from {filename}, path: {exp_id}/{slot_id}/{self.mode}") + return target_vpeaks except Exception as e: - logger.error(f"Error executing command '{cmd}': {e}") - return eye_data - - def get_eye_diagram(self, exp_id: int, retimer_index: int) -> List[EyeData]: - cmd = f"rtmr {retimer_index} eye 0 a" + logging.error(f"Failed to load target_vpeaks from JSON file: {e}") + return None + + def save_mgc_results_to_json(self, exp_id: int, slot_id: int, matched_results: Dict[str, Any]) -> bool: try: - raw_output = self.local_bmc.CmdVendorCommand(exp_id, cmd) - if not raw_output: - logger.error(f"No data returned from command: {cmd}") - return [] - return self.parse_eye_data(raw_output) + filename = self._generate_mgc_host_filename() + os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) + + if os.path.exists(filename): + with open(filename, 'r') as f: + data = json.load(f) + else: + data = {} + + for exp_key, exp_data in matched_results.items(): + if exp_key not in data: + data[exp_key] = {} + for slot_key, slot_data in exp_data.items(): + if slot_key not in data[exp_key]: + data[exp_key][slot_key] = {} + for mode_key, mode_data in slot_data.items(): + if mode_key not in data[exp_key][slot_key]: + data[exp_key][slot_key][mode_key] = {} + data[exp_key][slot_key][mode_key].update(mode_data) + + with open(filename, 'w') as f: + json.dump(data, f, indent=4) + + logging.info(f"MGC matching results saved to {filename}") + return True except Exception as e: - logger.error(f"Error executing command '{cmd}': {e}") - return [] - - def parse_eye_data(self, text: str) -> List[EyeData]: - - pattern = ( - r"RTMR(\d+)\s+EYE_A(\d+):\s*" - r"\((-?\d+\.?\d*),\s*(-?\d+\.?\d*)\)\s*mV\s*\|\s*" - r"\((-?\d+\.?\d*),\s*(-?\d+\.?\d*)\)\s*UI" - ) - regex = re.compile(pattern) - results = [] - - for line_num, line in enumerate(text.strip().splitlines(), 1): - line = line.strip() - if not line or "error" in line.lower(): - continue - - match = regex.search(line) # 使用 search 允许前后有额外字符 - if not match: - logger.debug(f"Line {line_num}: Skipped (no match) - {line}") - continue - - try: - rtmr_prefix = match.group(1) - eye_index = int(match.group(2)) - down = float(match.group(3)) - top = float(match.group(4)) - left = float(match.group(5)) - right = float(match.group(6)) - - # 计算真实 retimer_index 和 lane_index - rt_phys_index = int(f"{rtmr_prefix}") - phys_lane_index = eye_index - if eye_index < 8: - rt_index = int(f"{rtmr_prefix}1") - lane_index = eye_index - else: - rt_index = int(f"{rtmr_prefix}2") - lane_index = eye_index - 8 - - eye_data = EyeData( - rt_phys_index=rt_phys_index, - phys_lane_index=phys_lane_index, - rt_index=rt_index, - lane_index=lane_index, - down=down, - top=top, - left=left, - right=right - ) - eye_data.determine_issue() # 初始化问题类型 - results.append(eye_data) - - except Exception as e: - logger.error(f"Failed to parse line {line_num}: {line} | Error: {e}") - continue - - logger.info(f"Parsed {len(results)} valid eye diagrams.") - return results - - + logging.error(f"Failed to save MGC results to JSON file: {e}") + return False @@ -769,64 +576,6 @@ class EqTuneTool: - - -# ocsdiag.py -def process_host(host, reg_table_file, exp_list, slot_list, lane_list, cmd, param, reg_wt_value, sl_file): - logging.info(f'-----------process: {host}') - - for exp_id in exp_list: - exp_id = int(exp_id) - - exp_util = SmbusHttpUtil(f"https://{host}", "root", "0penBmc", host) - - exp_util.lock(exp_id) - logging.info(f'----------process_host {host} lock') - - vuart_util = VuartUtil(host) - bmc = DevWhiteRiverExp(exp_util, vuart_util) - - route_info = bmc.GetOpticalRouteStatus(exp_id, 0) - logging.info(f'--------GetOpticalRouteStatus route_info:{route_info}') - yaml_route_map = { - '1111111111111111': '786-1-oneta', - '1212121212121212': '786-1-onetb', - '0201040306050807': '786-1-onoc1', - '0403020108070605': '786-1-onoc2', - '0807060504030201': '786-1-onoc3', - '0304010207080506': '786-1-onoc4', - '0605080702010403': '786-1-onoc5', - '0708050603040102': '786-1-onoc6', - '0506070801020304': '786-1-onoc7', - } - - reg_table_file = f'main_data/{yaml_route_map[route_info]}.yaml' - route_name = yaml_route_map[route_info] - logging.info(f'input reg_table_file is empty, use default config file:{reg_table_file}, route_name:{route_name}') - - reg_access_tool = OptRegAccessTool(host, reg_table_file, bmc) - logging.info(f'init reg_access_tool with reg_table_file:{reg_table_file}, route_name:{route_name}') - - remote_bmc = None - remote_exp_util = exp_util - remote_host = host - remote_reg_access_tool = reg_access_tool - if cmd in ['ibias-auto-tune', 'eq-auto-tune'] and param != '': - logging.info(f'------- remote_host: {param}') - remote_host = param - remote_exp_util = SmbusHttpUtil(f"https://{remote_host}", "root", "0penBmc", remote_host) - remote_bmc = DevWhiteRiverExp(remote_exp_util, vuart_util) - remote_exp_util.lock(exp_id) - logging.info(f'----------process dst host {remote_host} lock') - - remote_reg_access_tool = OptRegAccessTool(remote_host, reg_table_file, remote_bmc) - - if cmd in ['eq-auto-tune']: - slot_list = [0,1,2,3,4,5,6,7] - - -# opt.py - import logging, math, sys from typing import List from gpu.biren.exp_util import DevWhiteRiverExp @@ -895,6 +644,12 @@ class OptRegAccessTool: def write_tia_stage2_reg(self, exp_id: int, slot_id: int, lane: int, wt_value: int) -> bool: return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'tia_stage2') + + def write_confirm_reg(self, exp_id: int, slot_id: int) -> bool: + return self.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 0xd0, 0x88, 1, "01") + + def read_vpeak_reg(self, exp_id: int, slot_id: int, lane: int) -> int: + return self.read_opt_reg(exp_id, slot_id, lane, 'vpeak') def read_opt_reg(self, exp_id: int, slot_id: int, lane: int, reg_name: str): @@ -1160,3 +915,5 @@ class OptRegAccessTool: # reg_offset=reg.offset, size=size, # hex_str=write_value) # # logging.info(f'exp_id:{exp_id}, slot_id: {slot_id}, bank:{reg.bank}, page:{reg.page}, offset:{reg.offset}, value:{value_hex_str}, status: {ret}') + +