# uncompyle6 version 3.9.3 # Python bytecode version base 3.6 (3379) # Decompiled from: Python 3.6.8 (default, May 21 2025, 11:12:56) # [GCC 8.5.0 20210514 (Anolis 8.5.0-10.0.1)] # Embedded file name: /xz/gyou/nexusbench/toolbox/mgc_tuning_tool.py # Compiled at: 2025-12-25 14:52:30 # Size of source mod 2**32: 78005 bytes from dataclasses import dataclass, field import logging, time, os, json, sys from typing import Dict, List, Any, Optional, Tuple import matplotlib.pyplot as plt from gpu.biren.exp_util import DevWhiteRiverExp from toolbox.opt_reg_access_tool import OptRegAccessTool from toolbox.prbs_tool import PrbsTool, RetimerPrbsResult from parser.topology_parser import TopoMappingParser from parser.transceiver_config_parser import TransceiverConfigParser, RegisterInfo from toolbox.bathtub_curve import BathtubCurve @dataclass class LaneErrInfo: host: str = "" exp: int = -1 route: str = "" slot_id: int = -1 logic_lane: int = -1 rt_phys_id: int = -1 rt_phys_lane: int = -1 peer_slot_id: int = -1 peer_ibias_lane: int = -1 peer_logic_lane: int = -1 peer_rt_phys_id: int = -1 peer_rt_phys_lane: int = -1 preset: int = -1 mgc: int = -1 mgc_err_map: Dict[int, int] = field(default_factory=dict) lowfreq_err_map: Dict[int, int] = field(default_factory=dict) tmp_err_map: Dict[int, int] = field(default_factory=dict) vpeak: int = -1 tia_peak: int = -1 ibias: int = -1 ipcurrent: int = -1 opcurrent: int = -1 lowfreq_eq: int = -1 highfreq_eq: int = -1 vgc_set: int = -1 drv_vpeak: int = -1 rssi1: int = -1 rssi2: int = -1 tmp_raw_value: int = -1 ctle: int = -1 class MgcTuningTool: def __init__(self, host:str, reg_table: TransceiverConfigParser, reg_tool: OptRegAccessTool, remote_reg_tool: OptRegAccessTool, bmc: DevWhiteRiverExp, topo_map: TopoMappingParser, prbs_tool: PrbsTool, route_name: str): self.host = host self.reg_table = reg_table self.bmc = bmc self.target_vpeak = 0 self.local_reg_tool = reg_tool self.remote_reg_tool = remote_reg_tool self.target_vpeaks_file = 
"target_vpeaks.json" self.mode = route_name self.topo_map = topo_map self.prbs_tool = prbs_tool self.route_name = route_name self.is_onet = self.remote_reg_tool != self.local_reg_tool def debug_calc_target_vpeak_new(self, exp_id: int, slot_id: int) -> Optional[Dict[(int, int)]]: target_vpeaks = {} logging.info(f"-------slot {slot_id}") self.local_reg_tool.enable_tia_agc(exp_id, slot_id) time.sleep(0.05) logging.info("----------step 2: toggle RF off") if not self.toogle_rf(exp_id, slot_id, "off"): logging.error(f"slot {slot_id}: toggle RF off fail") return target_vpeaks else: self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0) time.sleep(0.5) logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") base_line_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id) logging.info(f"base_line_vpeaks: {base_line_vpeaks}") self.toogle_rf(exp_id, slot_id, "on") self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255) time.sleep(0.5) max_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id) logging.info(f"max_vpeaks: {max_vpeaks}") percent_target_vpeaks = {} for lane, base_vpeak in base_line_vpeaks.items(): numerator = 19 vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29)) percent_target_vpeaks[lane] = vpkdelta + base_vpeak diff = max_vpeaks[lane] - base_line_vpeaks[lane] if diff < 5: logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error") sys.exit(-1) logging.info(f"percent_target_vpeaks: {percent_target_vpeaks}") diff_target_vpeaks = {} for lane, base_vpeak in base_line_vpeaks.items(): diff = max_vpeaks[lane] - base_line_vpeaks[lane] if diff <= 22: logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error") diff_target_vpeaks[lane] = diff - 2 + base_vpeak else: diff_target_vpeaks[lane] = 21 + base_vpeak target_vpeaks = diff_target_vpeaks logging.info(f"diff_target_vpeaks: {target_vpeaks}") 
logging.info("\n" + "=" * 90) logging.info(f'{"SLOT":<6} {"LANE":<6} {"BASE":<8} {"MAX":<8} {"RANGE":<8} {"PERCENT_TGT":<12} {"DIFF_TGT":<12} {"PCT_OFF":<10} {"DIFF_OFF":<10}') logging.info("=" * 90) for lane in sorted(base_line_vpeaks.keys()): base = base_line_vpeaks[lane] mx = max_vpeaks[lane] rng = mx - base pct_tgt = percent_target_vpeaks.get(lane, -1) diff_tgt = diff_target_vpeaks.get(lane, -1) pct_off = pct_tgt - base diff_off = diff_tgt - base logging.info(f"{slot_id:<6} {lane:<6} {base:<8} {mx:<8} {rng:<8} {pct_tgt:<12} {diff_tgt:<12} {pct_off:<10} {diff_off:<10}") logging.info("=" * 90 + "\n") self.local_reg_tool.disable_tia_agc(exp_id, slot_id) time.sleep(0.05) try: agc_start, agc_end, agc_step = (80, 180, 2) agc_values = list(range(agc_start, agc_end + 1, agc_step)) all_vpeaks_scan = {lane: [] for lane in base_line_vpeaks.keys()} logging.info(f"Scanning AGC from {agc_start} to {agc_end} step {agc_step}...") for agc in agc_values: self.local_reg_tool.write_tmp_mgc_reg_all_lane(exp_id, slot_id, agc) # self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, agc) time.sleep(0.1) vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id) for lane in base_line_vpeaks.keys(): all_vpeaks_scan[lane].append(vpeaks.get(lane, 0)) plt.figure(figsize=(20, 8)) for lane in sorted(base_line_vpeaks.keys()): plt.plot(agc_values, (all_vpeaks_scan[lane]), marker="o", label=f"lane {lane}") for lane in base_line_vpeaks.keys(): plt.scatter([80], [base_line_vpeaks[lane]], color="blue") plt.scatter([180], [max_vpeaks[lane]], color="red") plt.title(f"Vpeak vs AGC (exp={exp_id}, slot={slot_id})") plt.xlabel("AGC Value") plt.ylabel("Vpeak") plt.legend() plt.grid(True) plt.tight_layout() plot_dir = "agc_vpeak_plots" os.makedirs(plot_dir, exist_ok=True) plot_path = os.path.join(plot_dir, f"{self.local_reg_tool.host}_agc_vpeak_exp{exp_id}_slot{slot_id}.png") plt.savefig(plot_path) logging.info(f"AGC-Vpeak curve saved to: {plot_path}") plt.close() except Exception as e: 
logging.warning(f"Failed to generate AGC scan plot: {e}") if target_vpeaks: self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) return target_vpeaks def auto_tune(self, exp_id: int, slot_id: int, lane_list: List[int]) -> bool: lane_list = [ 7, 6, 5, 4, 3, 2, 1, 0] # self.debug_calc_target_vpeak_new(exp_id, slot_id) target_vpeaks = self.calc_target_vpeak_new(exp_id, slot_id, lane_list) if target_vpeaks is None: return False time.sleep(0.1) logging.info("----------step 7: find optimal MGC") self.match_optimal_mgc_new(exp_id, slot_id, lane_list, target_vpeaks) return True def auto_tia_peak_tune(self, exp_id: int, retimers): tune_value_list = [0, 30, 60, 90, 120, 150, 180, 210, 240] self.general_auto_tune_value(exp_id, retimers, "tia_peak", tune_value_list, self.local_reg_tool) def auto_highfreq_tune(self, exp_id: int, retimers): tune_value_list = [0, 20, 40, 60, 80, 100, 120, 140, 160, 180, 200, 220, 240] self.general_auto_tune_value(exp_id, retimers, "highfreq_eq", tune_value_list, self.remote_reg_tool) def rt_ibias(self, exp_id: int, retimers): tune_value_list = [ 1647, 3647] self.ibias_auto_tune_value_new(exp_id, retimers, "ibias_runtime", tune_value_list, self.remote_reg_tool, False) def auto_ibias_tune2(self, exp_id: int, retimers): tune_value_list = [ 1647, 1847, 2047, 2247, 2447, 2647, 2847, 3047, 3247, 3447, 3647] self.ibias_auto_tune_value(exp_id, retimers, "ibias_runtime", tune_value_list, self.remote_reg_tool, False) def auto_opcurrent_tune(self, exp_id: int, retimers): tune_value_list = [130, 140, 150, 160, 120, 110, 90, 80, 100, 170, 180] self.general_auto_tune_value(exp_id, retimers, "opcurrent", tune_value_list, self.remote_reg_tool) def auto_opcurrent_tune_test(self, exp_id: int, retimers): tune_value_list = [110, 130, 150, 170, 90, 70, 50, 190, 210, 230] self.general_auto_tune_value(exp_id, retimers, "opcurrent", tune_value_list, self.remote_reg_tool) def auto_lowfreq_tune(self, exp_id: int, retimers): tune_value_list = [ 120, 150, 180, 
210, 240, 90, 60, 30, 0] self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", tune_value_list, self.remote_reg_tool) def auto_ctle_tune(self, exp_id: int, retimers): tune_value_list = [ 0, 2, 4, 6, 8, 10, 12, 14, 16, 18] self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", tune_value_list, self.remote_reg_tool) def auto_tune2(self, exp_id: int, retimers) -> bool: self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) logging.info("Starting auto_tune2...") lane_data_list: List[LaneErrInfo] = [] # MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8, 10, -10, 12, -12, 14, -14] # MGC_OFFSETS = [0, 2] MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8, 10, -10] for slot_id in [0, 1, 2, 3, 4, 5, 6, 7]: for lane in [0, 1, 2, 3, 4, 5, 6, 7]: current_mgc = self.local_reg_tool.read_mgc_reg(exp_id, slot_id, lane) current_mgc = int(current_mgc) mgc_points = [current_mgc + offset for offset in MGC_OFFSETS] mgc_err_map = {mgc: -1 for mgc in mgc_points} success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) if success == False: logging.error(f'get_phys_rtmr_by_slot failed, result: {success}') return False lane_info = LaneErrInfo( slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, mgc_err_map=mgc_err_map, mgc= current_mgc ) lane_data_list.append(lane_info) # logging.info( # f"Initialized: Slot {slot_id}, Lane {lane}, " # f"Base MGC={current_mgc}, Target MGCs={mgc_points}, ERRs={[-1.0]*5}" # ) # logging.info(lane_data_list) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) all_unlocked = all(not prbs.locked for prbs in prbs_results) if all_unlocked: logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. 
Aborting auto_tune2.") return False time.sleep(2) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False) # return True # logging.info(f'-------------- prbs_results: {prbs_results}') for prbs in prbs_results: local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) if not local_slot: logging.error(f'local slot is none!') return False slot_id, lane_id = local_slot # logging.info(f'------------------ slot: {slot_id}, lane_id: {lane_id}') for info in lane_data_list: if info.slot_id == slot_id and info.logic_lane == lane_id: if prbs.locked: info.mgc_err_map[info.mgc] = prbs.err_count else: info.mgc_err_map[info.mgc] = 65535 # logging.info(f'prbs.locked: {prbs.locked}, prbs.err_count: {prbs.err_count}') break # return True for offset in [offset for offset in MGC_OFFSETS if offset != 0]: for info in lane_data_list: new_mgc = info.mgc + offset self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc) time.sleep(0.05) # for slot_id in [0,1,2,3,4,5,6,7]: # self.local_reg_tool.write_confirm_reg(exp_id, slot_id) # time.sleep(0.05) time.sleep(0.5) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) time.sleep(2) # 测 PRBS prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) is_need_prbs_reset = False # 更新结果:当前 MGC 是 info.mgc + offset for prbs in prbs_results: local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) if not local_slot: logging.error(f'local slot is none!') return False slot_id, lane_id = local_slot for info in lane_data_list: if info.slot_id == slot_id and info.logic_lane == lane_id: target_mgc = info.mgc + offset if prbs.locked: info.mgc_err_map[target_mgc] = prbs.err_count else: info.mgc_err_map[target_mgc] = 69999 is_need_prbs_reset = True # logging.info(f'------update mgc err, {target_mgc}: {info.mgc_err_map[target_mgc]}') break if is_need_prbs_reset: self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) 
self.print_err_matrix(lane_data_list, MGC_OFFSETS) # 参数复原 # logging.info(f'------------len: {len(lane_data_list)}') for info in lane_data_list: new_mgc = info.mgc self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc) time.sleep(0.05) # for slot_id in [0,1,2,3,4,5,6,7]: # self.local_reg_tool.write_confirm_reg(exp_id, slot_id) # time.sleep(0.05) # return True # Step 4: 调用简单调优,返回与 lane_data_list 一一对应的推荐 offset 列表 recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1) tuning_success = True for info, recommended_mgc in zip(lane_data_list, recommended_mgcs): slot_id = info.slot_id lane_id = info.logic_lane current_mgc = info.mgc target_mgc = recommended_mgc diff_mgc = target_mgc - current_mgc # 日志输出 if diff_mgc != 0: logging.info( f"Slot {slot_id}, Lane {lane_id}: MGC adjusted from {current_mgc} " f"→ {target_mgc} (offset={diff_mgc:.1f})" ) # logging.info(f"Applying MGC reg write: exp_id={exp_id}, slot={slot_id}, lane={lane_id}, mgc={target_mgc}") # self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, target_mgc) # time.sleep(0.05) # self.local_reg_tool.write_confirm_reg(exp_id, slot_id) # time.sleep(0.05) # else: # logging.info( # f"Slot {slot_id}, Lane {lane_id}: MGC {current_mgc} is optimal. No change." # ) tuning_success = True if bad_lanes: tuning_success = False for slot, lane in bad_lanes: logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). 
Signal integrity issue?") return True def low_ibias_tune2(self, exp_id: int, retimers, reg_name: str, value_offset_list: Optional[List[int]]=None, value_list: Optional[List[int]]=None) -> bool: logging.info("Starting auto_tune2...") lane_data_list = [] MGC_OFFSETS = [ 0, 2, -2, 4, -4, 6, -6, 8, -8] for slot_id in (0, 1, 2, 3, 4, 5, 6, 7): for lane in (0, 1, 2, 3, 4, 5, 6, 7): if value_offset_list is not None: current_value = self.local_reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name) current_value = int(current_value) value_points = [current_value + offset for offset in value_offset_list] tmp_err_map = {value: -1 for value in value_points} success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) if success == False: logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") return False lane_info = LaneErrInfo(slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, mgc_err_map=tmp_err_map, mgc=current_value) lane_data_list.append(lane_info) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) all_unlocked = all(not prbs.locked for prbs in prbs_results) if all_unlocked: logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. 
Aborting auto_tune2.") return False else: time.sleep(2) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False) for prbs in prbs_results: local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) if not local_slot: logging.error("local slot is none!") return False slot_id, lane_id = local_slot for info in lane_data_list: if info.slot_id == slot_id: if info.logic_lane == lane_id: if prbs.locked: info.tmp_err_map[info.tmp_value] = prbs.err_count else: info.tmp_err_map[info.tmp_value] = 65535 break for offset in [offset for offset in MGC_OFFSETS if offset != 0]: for info in lane_data_list: new_mgc = info.mgc + offset self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc) for slot_id in (0, 1, 2, 3, 4, 5, 6, 7): self.local_reg_tool.write_confirm_reg(exp_id, slot_id) time.sleep(0.5) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) time.sleep(2) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) for prbs in prbs_results: local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) if not local_slot: logging.error("local slot is none!") return False slot_id, lane_id = local_slot for info in lane_data_list: if info.slot_id == slot_id: if info.logic_lane == lane_id: target_mgc = info.mgc + offset if prbs.locked: info.mgc_err_map[target_mgc] = prbs.err_count else: info.mgc_err_map[target_mgc] = 65535 break self.print_err_matrix(lane_data_list, MGC_OFFSETS) for info in lane_data_list: new_mgc = info.mgc self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc) recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1) tuning_success = True for info, recommended_mgc in zip(lane_data_list, recommended_mgcs): slot_id = info.slot_id lane_id = info.logic_lane current_mgc = info.mgc target_mgc = recommended_mgc diff_mgc = target_mgc - current_mgc if diff_mgc != 0: logging.info(f"Slot 
{slot_id}, Lane {lane_id}: MGC adjusted from {current_mgc} → {target_mgc} (offset={diff_mgc:.1f})") logging.info(f"Applying MGC reg write: exp_id={exp_id}, slot={slot_id}, lane={lane_id}, mgc={target_mgc}") self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, target_mgc) self.local_reg_tool.write_confirm_reg(exp_id, slot_id) tuning_success = True if bad_lanes: tuning_success = False for slot, lane in bad_lanes: logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?") return True def general_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: logging.info("Starting auto_tune2...") lane_data_list = [] lane_list = [ 0, 1, 2, 3, 4, 5, 6, 7] slot_list = [0, 1, 2, 3, 4, 5, 6, 7] if reg_name in ('ibias', 'opcurrent'): reg_tool = self.remote_reg_tool else: reg_tool = self.local_reg_tool for rt_phys_id in retimers: for rt_phys_lane in range(0, 16): if reg_tool == self.local_reg_tool: slot = self.topo_map.get_local_slot_by_retimer(rt_phys_id, rt_phys_lane, self.route_name) else: slot = self.topo_map.get_remote_slot_by_retimer(rt_phys_id, rt_phys_lane, self.route_name) if not slot: logging.error("local slot is none!") return False slot_id, lane_id = slot raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane_id, reg_name) tmp_err_map = {value: -1 for value in tune_value_list} lane_info = LaneErrInfo(host=(reg_tool.host), exp=exp_id, slot_id=slot_id, logic_lane=lane_id, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, tmp_err_map=tmp_err_map, tmp_raw_value=raw_value) lane_data_list.append(lane_info) time.sleep(0.05) is_need_reset_prbs = True for value in tune_value_list: for info in lane_data_list: reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name) time.sleep(0.05) time.sleep(1) if is_need_reset_prbs: logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") 
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) time.sleep(2) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) is_need_reset_prbs = False for prbs in prbs_results: if reg_tool == self.local_reg_tool: slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) else: slot = self.topo_map.get_remote_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) if not slot: logging.error("local slot is none!") return False slot_id, lane_id = slot for info in lane_data_list: if info.slot_id == slot_id: if info.logic_lane == lane_id: if prbs.locked: info.tmp_err_map[value] = prbs.err_count elif value in info.tmp_err_map: info.tmp_err_map[value] = 69999 else: logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") is_need_reset_prbs = True break logging.info(lane_data_list) self.print_general_err_matrix(lane_data_list, tune_value_list) time.sleep(0.05) for info in lane_data_list: reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name) time.sleep(0.05) recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list) for info, recommended_value in zip(lane_data_list, recommended_value): diff_value = recommended_value - info.tmp_raw_value if diff_value != 0: logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}") if is_overwrite: reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name) if bad_lanes: for slot, lane in bad_lanes: logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). 
Bad Modules") return True def ibias_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: logging.info("Starting ibias_auto_tune_value_new...") local_data_list: List[LaneErrInfo] = [] remote_data_list: List[LaneErrInfo] = [] local_slot_list = list(range(8)) local_lane_list = list(range(8)) remote_slot_list = list(range(8)) remote_lane_list = list(range(8)) # 存储结构:{slot_id: ibias_dict} remote_raw_ibias: Dict[int, Dict] = {} local_raw_ibias: Dict[int, Dict] = {} # init remote(ibias) for slot_id in remote_slot_list: self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01") remote_raw_ibias = self.remote_reg_tool.read_ibias_reg_all_lane(exp_id, slot_id) time.sleep(0.05) if self.is_onet: for slot_id in local_slot_list: self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01") local_raw_ibias = self.local_reg_tool.read_ibias_reg_all_lane(exp_id, slot_id) time.sleep(0.05) # init remote (ibias) for slot_id in local_slot_list: for lane in local_lane_list: peer_info = self.topo_map.get_remote_retimer_and_slot_by_slot(slot_id, lane, self.route_name) if not peer_info: logging.error(f"peer_info is None for slot {slot_id} lane {lane}") return False remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = peer_info tmp_err_map = {value: -1 for value in tune_value_list} result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) if result: success, rt_phys_id, rt_phys_lane = result if success == False: logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") return False lane_info = LaneErrInfo(host=(self.local_reg_tool.host), exp=exp_id, route=(self.route_name.split("-")[-1]), slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, peer_slot_id=remote_slot_id, peer_logic_lane=remote_lane_id, peer_rt_phys_id=remote_rt_logic_id, 
peer_rt_phys_lane=remote_rt_logic_lane, tmp_err_map=tmp_err_map) local_data_list.append(lane_info) # init local (ibias) if self.is_onet: for slot_id in remote_slot_list: for lane in remote_lane_list: peer_info = self.topo_map.get_remote_retimer_and_slot_by_slot(slot_id, lane, self.route_name) if not peer_info: logging.error(f"peer_info is None for slot {slot_id} lane {lane}") return False local_rt_logic_id, local_rt_logic_lane, local_slot_id, local_lane_id = peer_info tmp_err_map = {value: -1 for value in tune_value_list} result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) if result: success, rt_phys_id, rt_phys_lane = result if success == False: logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") return False lane_info = LaneErrInfo(host=(self.remote_reg_tool.host), exp=exp_id, route=(self.route_name.split("-")[-1]), slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, peer_slot_id=local_slot_id, peer_logic_lane=local_lane_id, peer_rt_phys_id=local_rt_logic_id, peer_rt_phys_lane=local_rt_logic_lane, tmp_err_map=tmp_err_map) remote_data_list.append(lane_info) time.sleep(0.05) is_need_reset_prbs = True for value in tune_value_list: for info in local_data_list: self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value) time.sleep(0.05) if self.is_onet: for info in remote_data_list: self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value) time.sleep(0.05) time.sleep(1) if is_need_reset_prbs: logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) prbs_results = self.prbs_tool.prbs_check_bidirection(exp_id, retimers, True) time.sleep(2) prbs_results = self.prbs_tool.prbs_check_bidirection(exp_id, retimers) # logging.info(f'--------- prbs_results: {prbs_results}') is_need_reset_prbs = False for prbs in prbs_results: for info in local_data_list: if info.host == prbs.host and 
info.rt_phys_id == prbs.rtmr_id \ and info.rt_phys_lane == prbs.rtmr_lane: if prbs.locked: info.tmp_err_map[value] = prbs.err_count elif value in info.tmp_err_map: info.tmp_err_map[value] = 69999 else: logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") is_need_reset_prbs = True break for info in remote_data_list: if info.host == prbs.host and info.rt_phys_id == prbs.rtmr_id \ and info.rt_phys_lane == prbs.rtmr_lane: if prbs.locked: info.tmp_err_map[value] = prbs.err_count elif value in info.tmp_err_map: info.tmp_err_map[value] = 69999 else: logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") is_need_reset_prbs = True break self.print_general_err_matrix(local_data_list, tune_value_list) self.print_general_err_matrix(remote_data_list, tune_value_list) time.sleep(0.05) for slot_id in remote_slot_list: self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=remote_raw_ibias[slot_id]) time.sleep(0.05) self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00") time.sleep(0.05) if self.is_onet: for slot_id in local_slot_list: self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=remote_raw_ibias[slot_id]) time.sleep(0.05) self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00") time.sleep(0.05) recommended_value, bad_lanes = self.simple_value_tuner(local_data_list) reg_name = "ibias" for info, recommended_value in zip(local_data_list, recommended_value): diff_value = recommended_value - info.tmp_raw_value if diff_value != 0: remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name) if not remote_info: logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}") return False remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: 
{remote_slot_id}, {remote_lane_id}, adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") if is_overwrite: reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, recommended_value) time.sleep(0.05) if bad_lanes: for slot, lane in bad_lanes: logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules") return True def ibias_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: logging.info("Starting auto_tune2...") lane_data_list = [] rx_slot_list = [0, 1, 2, 3, 4, 5, 6, 7] tx_slot_list = [0, 1, 2, 3, 4, 5, 6, 7] for slot_id in rx_slot_list: reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01") time.sleep(0.05) rx_lane_list = [0, 1, 2, 3, 4, 5, 6, 7] for rx_slot_id in rx_slot_list: for rx_lane in rx_lane_list: remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(rx_slot_id, rx_lane, self.route_name) if not remote_info: logging.error(f"remote_info is None for slot {rx_slot_id} lane {rx_lane}") return False remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info raw_value = reg_tool.read_opt_reg(exp_id, remote_slot_id, remote_lane_id, "ibias") tmp_err_map = {value: -1 for value in tune_value_list} result = self.topo_map.get_phys_rtmr_by_slot(rx_slot_id, rx_lane, self.route_name) if result: success, rt_phys_id, rt_phys_lane = result if success == False: logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") return False lane_info = LaneErrInfo(host=(self.local_reg_tool.host), exp=exp_id, route=(self.route_name.split("-")[-1]), slot_id=rx_slot_id, logic_lane=rx_lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, tmp_err_map=tmp_err_map, tmp_raw_value=raw_value) lane_data_list.append(lane_info) time.sleep(0.05) is_need_reset_prbs = True for value in tune_value_list: for info in lane_data_list: remote_info = 
self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name) if not remote_info: logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}") return False remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info reg_tool.write_opt_reg(exp_id, remote_slot_id, remote_lane_id, value, "ibias_runtime") time.sleep(0.05) time.sleep(1) if is_need_reset_prbs: logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) time.sleep(2) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) # logging.info(f'--------- prbs_results: {prbs_results}') is_need_reset_prbs = False for prbs in prbs_results: for info in lane_data_list: if info.rt_phys_id == prbs.rtmr_id: if info.rt_phys_lane == prbs.rtmr_lane: if prbs.locked: info.tmp_err_map[value] = prbs.err_count elif value in info.tmp_err_map: info.tmp_err_map[value] = 69999 else: logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") is_need_reset_prbs = True break # logging.info(lane_data_list) self.print_general_err_matrix(lane_data_list, tune_value_list) time.sleep(0.05) for info in lane_data_list: reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, "ibias_runtime") time.sleep(0.05) recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list) reg_name = "ibias" for info, recommended_value in zip(lane_data_list, recommended_value): diff_value = recommended_value - info.tmp_raw_value if diff_value != 0: remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name) if not remote_info: logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}") return False remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info logging.info(f"Rx Slot: 
{info.slot_id},{info.logic_lane}, Tx Slot: {remote_slot_id}, {remote_lane_id}, adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") if is_overwrite: reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, recommended_value) time.sleep(0.05) if bad_lanes: for slot, lane in bad_lanes: logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules") return True def general_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: logging.info("Starting auto_tune2...") lane_data_list = [] lane_list = [ 0, 1, 2, 3, 4, 5, 6, 7] for slot_id in (0, 1, 2, 3, 4, 5, 6, 7): for lane in lane_list: raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name) tmp_err_map = {value: -1 for value in tune_value_list} result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) if result: success, rt_phys_id, rt_phys_lane = result if success == False: logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") return False lane_info = LaneErrInfo(host=(self.local_reg_tool.host), exp=exp_id, slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, tmp_err_map=tmp_err_map, tmp_raw_value=raw_value) lane_data_list.append(lane_info) time.sleep(0.05) is_need_reset_prbs = True for value in tune_value_list: for info in lane_data_list: reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name) time.sleep(0.05) time.sleep(1) if is_need_reset_prbs: logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) time.sleep(2) prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) is_need_reset_prbs = False for prbs in prbs_results: if reg_tool == self.local_reg_tool: slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, 
prbs.rtmr_lane, self.route_name) else: slot = self.topo_map.get_remote_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) if not slot: logging.error("local slot is none!") return False slot_id, lane_id = slot for info in lane_data_list: if info.slot_id == slot_id: if info.logic_lane == lane_id: if prbs.locked: info.tmp_err_map[value] = prbs.err_count elif value in info.tmp_err_map: info.tmp_err_map[value] = 69999 else: logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") is_need_reset_prbs = True break logging.info(lane_data_list) self.print_general_err_matrix(lane_data_list, tune_value_list) time.sleep(0.05) for info in lane_data_list: reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name) time.sleep(0.05) recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list) for info, recommended_value in zip(lane_data_list, recommended_value): diff_value = recommended_value - info.tmp_raw_value if diff_value != 0: logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}") if is_overwrite: reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name) if bad_lanes: for slot, lane in bad_lanes: logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). 
def simple_value_tuner(self, lane_data_list: "List[LaneErrInfo]", bad_threshold=1, min_good_count=5):
    """Recommend one register value per lane from its swept error map.

    For every lane the "clean" sweep values are those whose measured error
    count lies in ``[0, bad_threshold)``.  The recommendation is the integer
    midpoint between the smallest and largest clean value; when no value is
    clean the lane keeps its original ``tmp_raw_value``.

    Negative error counts are NOT treated as clean: ``tmp_err_map`` is
    pre-filled with ``-1`` before a sweep, so a negative entry means the
    point was never measured (the error matrix printer renders such entries
    as the worst value for the same reason).

    Args:
        lane_data_list: Lane records exposing ``tmp_err_map``,
            ``tmp_raw_value``, ``slot_id`` and ``logic_lane``.
        bad_threshold: Error counts at or above this value are bad.
        min_good_count: A lane with this many clean points or fewer is
            reported as bad.

    Returns:
        ``(recommendations, bad_lanes)`` where ``recommendations`` is aligned
        with ``lane_data_list`` and ``bad_lanes`` is a list of
        ``(slot_id, logic_lane)`` tuples.
    """
    recommendations = []
    bad_lanes = []
    for info in lane_data_list:
        # Only genuinely measured, below-threshold points count as clean.
        clean_values = [
            value
            for value, err in info.tmp_err_map.items()
            if 0 <= err < bad_threshold
        ]
        if clean_values:
            recommended_value = int((min(clean_values) + max(clean_values)) / 2)
        else:
            # Nothing usable was measured: leave the register where it was.
            recommended_value = info.tmp_raw_value
        if len(clean_values) <= min_good_count:
            bad_lanes.append((info.slot_id, info.logic_lane))
        recommendations.append(recommended_value)
    return (recommendations, bad_lanes)
def verify_rt_index_lane_index(self, rt_index: int, lane_index: int) -> Tuple[bool, int, int]:
    """Validate a retimer/lane pair and normalise it to a logic index.

    Two input forms are accepted:

    * ``rt_index`` in 1..4 with ``lane_index`` in 0..15.  Lanes 0-7 map to
      ``rt_index * 10 + 1`` with the lane unchanged; lanes 8-15 map to
      ``rt_index * 10 + 2`` with the lane reduced by 8.
    * ``rt_index`` already in logic form (11, 12, 21, 22, 31, 32, 41, 42)
      with ``lane_index`` in 0..7, returned unchanged.

    Returns:
        ``(True, logic_rt_index, logic_lane)`` on success, otherwise
        ``(False, 0, 0)`` after logging the reason.
    """
    if rt_index in (1, 2, 3, 4):
        if 0 <= lane_index <= 7:
            return (True, rt_index * 10 + 1, lane_index)
        if 8 <= lane_index <= 15:
            return (True, rt_index * 10 + 2, lane_index - 8)
        logging.error("Invalid lane_index: must be in [0, 15]")
        return (False, 0, 0)

    if rt_index in (11, 12, 21, 22, 31, 32, 41, 42):
        logging.info(f"use retimer logic index and lane, rt_index: {rt_index}, rt_lane: {lane_index}")
        # Check both bounds: the previous `> 7` test let negative lanes through.
        if not 0 <= lane_index <= 7:
            logging.error("For logic rt_index, lane_index must be in [0, 7]")
            return (False, 0, 0)
        return (True, rt_index, lane_index)

    logging.error(f"Invalid rt_index: {rt_index}")
    return (False, 0, 0)
def calc_target_vpeak(self, exp_id: int, lane_list: List[int], slot_id: int) -> Dict[int, int]:
    """Measure baseline vpeaks with RF off and derive per-lane targets.

    Steps: disable AGC, switch RF off, read the baseline vpeak of every lane
    in ``lane_list``, convert the baselines with ``calculate_target_vpeak``,
    persist non-empty targets via ``save_target_vpeaks_to_json`` and switch
    RF back on.

    Args:
        exp_id: Expander id passed through to the register helpers.
        lane_list: Lanes whose baseline vpeak is read.
        slot_id: Optical module slot.

    Returns:
        Mapping of lane to target vpeak; an empty dict when RF could not be
        switched off.
    """
    target_vpeaks = {}
    logging.info(f"-------slot {slot_id}")
    logging.info("----------step 1: disable agc")
    self.disable_agc(exp_id, slot_id)

    logging.info("----------step 2: toogle RF off")
    # toogle_rf takes (exp_id, slot_id, switch); it acts on all lanes of the
    # slot, so lane_list must not be passed (doing so raised TypeError).
    if not self.toogle_rf(exp_id, slot_id, "off"):
        logging.error(f"slot {slot_id}: toogle RF off fail")
        return target_vpeaks

    logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
    base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list)
    logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
    target_vpeaks = self.calculate_target_vpeak(base_line_vpeaks)
    logging.info(f"target_vpeaks: {target_vpeaks}")
    if target_vpeaks:
        self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)

    logging.info("----------step 4: enable RF")
    self.toogle_rf(exp_id, slot_id, "on")
    return target_vpeaks
def match_optimal_mgc_new2(self, exp_id: int, slot_id: int, target_vpeaks: Optional[Dict[int, int]]) -> bool:
    """Search, per lane, for the MGC value whose vpeak reading hits the target.

    Phase 1 reads the current MGC and vpeak of each lane and estimates a
    target MGC as ``(target_vpeak - vpeak) * 1.5 + mgc``.  Phase 2 sweeps the
    MGC register downwards through ``target_mgc + 10 .. target_mgc - 10``
    (clamped to the one-byte range 0..255), and after each write reads vpeak
    back.  A lane matches when ``target - 2 < vpeak < target + 1``.

    Matches are collected as
    ``{exp_id: {slot_id: {mode: {lane: {...}}}}}`` (all keys are strings) and
    handed to ``save_mgc_results_to_json``.

    Args:
        exp_id: Expander id passed to the BMC register calls.
        slot_id: Optical module slot.
        target_vpeaks: Target vpeak per lane.

    Returns:
        ``False`` when the register tables or the targets are missing.
        Otherwise ``True`` -- also when some lanes stay unmatched (they are
        only logged), as before.
    """
    mgc_regs = self.reg_table.get_registers_by_name("mgc")
    vpeak_regs = self.reg_table.get_registers_by_name("vpeak")
    if mgc_regs is None or vpeak_regs is None:
        logging.error("not find MGC regs")
        return False
    if not target_vpeaks:
        logging.error("target_vpeaks is empty")
        return False

    # The first vpeak register listed for a lane is the one that is read.
    # NOTE(review): assumes one vpeak register per lane -- confirm.
    vpeak_reg_by_lane = {}
    for vpeak_reg in vpeak_regs:
        vpeak_reg_by_lane.setdefault(vpeak_reg.lane, vpeak_reg)

    def read_reg(reg, size):
        raw = self.bmc.GetOpticalModuleRegs(exp_id=exp_id, slot_id=slot_id, bank=reg.bank, page=reg.page, reg_offset=reg.offset, size=size)
        return int(raw, 16)

    # Phase 1: estimate a starting MGC for every lane that has both a vpeak
    # register and a target.
    target_mgcs = {}
    for reg in mgc_regs:
        mgc_value = read_reg(reg, 1)
        vpeak_reg = vpeak_reg_by_lane.get(reg.lane)
        if vpeak_reg is None or reg.lane not in target_vpeaks:
            continue
        vpeak_value = read_reg(vpeak_reg, 2)
        logging.info(f"mgc_value: {mgc_value}, target_vpeak: {target_vpeaks[reg.lane]}, vpeak_value: {vpeak_value}")
        target_mgc = (target_vpeaks[reg.lane] - vpeak_value) * 1.5 + mgc_value
        logging.info(f"target_mgc: {target_mgc}")
        target_mgcs[reg.lane] = target_mgc
    logging.info(f"----------------Target MGC: {target_mgcs}")

    # Phase 2: sweep around the estimate and read vpeak back.
    matched_results = {}
    unmatched_lanes = []
    for reg in mgc_regs:
        lane = reg.lane
        if lane not in target_mgcs:
            unmatched_lanes.append(lane)
            continue
        target_vpeak = target_vpeaks[lane]
        # Clamp both ends: the value is written as a single byte ("%02X"),
        # so negative or >255 candidates would produce an invalid hex string.
        min_val = max(0, min(255, int(target_mgcs[lane] - 10)))
        max_val = max(0, min(255, int(target_mgcs[lane] + 10)))
        logging.info(f"lane: {lane}, min mgc : {min_val}, max_mgc: {max_val}, target vpeak: {target_vpeak}")
        step = reg.step or 1
        search_values = list(range(max_val, min_val - 1, -step))
        logging.info(f"Search order for lane {lane}: {search_values}")

        vpeak_reg = vpeak_reg_by_lane[lane]
        matched_mgc = None
        matched_vpeak = None
        for mgc in search_values:
            self.bmc.SetOpticalModuleRegs(exp_id=exp_id, slot_id=slot_id, bank=reg.bank, page=reg.page, reg_offset=reg.offset, size=1, hex_str=f"{mgc:02X}")
            time.sleep(0.05)
            # Written after every MGC write; presumably the "apply/confirm"
            # strobe of the module -- TODO confirm against the register map.
            self.bmc.SetOpticalModuleRegs(exp_id=exp_id, slot_id=slot_id, bank=0, page=208, reg_offset=136, size=1, hex_str="01")
            time.sleep(0.05)
            value = read_reg(vpeak_reg, 2)
            logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane} -> set mgc {mgc}, Vpeak value: {value}")
            if target_vpeak - 2 < value < target_vpeak + 1:
                logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {value}, current mgc:{mgc} ")
                matched_mgc = mgc
                matched_vpeak = value
                break
            time.sleep(0.03)

        if matched_mgc is None:
            unmatched_lanes.append(lane)
            continue
        # setdefault builds every nesting level, including on the very first
        # match (the old if/else chain skipped the slot level there).
        mode_results = (
            matched_results
            .setdefault(str(exp_id), {})
            .setdefault(str(slot_id), {})
            .setdefault(self.mode, {})
        )
        mode_results[str(lane)] = {'mgc': matched_mgc, 'target_vpeak': target_vpeak, 'actual_vpeak': matched_vpeak}

    if matched_results:
        self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
    if unmatched_lanes:
        logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
    return True
exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc") return False min_val = reg.valid_range[0] max_val = reg.valid_range[1] step = reg.step if reg.step is not None else 1 wt_mgc = int((min_val + max_val) / 2) target_vpeak = target_vpeaks[lane_id] target_vpeak_range = [target_vpeak, target_vpeak + 1] count = 0 while 1: ret = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc) time.sleep(0.05) if ret == False: logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc") return False ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id) if ret == False: logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm") return False time.sleep(0.05) current_vpeak = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id) logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}, target: {target_vpeak}") diff_vpeak = abs(target_vpeak - current_vpeak) diff_vpeak_step = { 0: 0, 1: 0, 2: 0, 3: 1, 4: 2, 5: 3, 6: 3, 7: 4, 8: 5} extra_step = diff_vpeak_step.get(diff_vpeak, 5) if current_vpeak > target_vpeak_range[1]: wt_mgc -= step + extra_step if current_vpeak < target_vpeak_range[0]: wt_mgc += step + extra_step if current_vpeak >= target_vpeak_range[0]: if current_vpeak <= target_vpeak_range[1]: is_match = True matched_mgc = wt_mgc matched_vpeak = current_vpeak break if count > 30: logging.error(f"vpeak is not match, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") sys.exit(-1) break count += 1 if is_match: logging.info(f"------------------ Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") if str(exp_id) not in matched_results: matched_results[str(exp_id)] = {} if str(slot_id) not in matched_results[str(exp_id)]: matched_results[str(exp_id)][str(slot_id)] = {} if self.mode not in 
matched_results[str(exp_id)][str(slot_id)]: matched_results[str(exp_id)][str(slot_id)][self.mode] = {} matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc, 'target_vpeak':target_vpeaks[lane_id], 'actual_vpeak':matched_vpeak} else: unmatched_lanes.append(lane_id) if matched_results: self.save_mgc_results_to_json(exp_id, slot_id, matched_results) if unmatched_lanes: logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") return False else: return True def match_optimal_mgc_old(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool: mgc_regs = self.reg_table.get_registers_by_name("mgc") vpeak_regs = self.reg_table.get_registers_by_name("vpeak") if mgc_regs is None or vpeak_regs is None: logging.error("not find MGC regs") return False else: success = True matched_results = {} unmatched_lanes = [] for reg in mgc_regs: logging.info(f"mgc regs size: {len(mgc_regs)}") logging.info(f"lane: {reg.lane}, target vpeak:{target_vpeaks[reg.lane]}, mgc valid_range: {reg.valid_range}, step: {reg.step}") min_val = reg.valid_range[0] max_val = reg.valid_range[1] step = reg.step mid_val = (min_val + max_val) // 2 mid_val = mid_val // step * step search_values = [ mid_val] offset = step while len(search_values) < (max_val - min_val) // step + 1: right_val = mid_val + offset if right_val <= max_val: search_values.append(right_val) left_val = mid_val - offset if left_val >= min_val: search_values.append(left_val) offset += step if len(search_values) >= (max_val - min_val) // step + 1: break is_match = False matched_mgc = None matched_vpeak = None for mgc in search_values: result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id, slot_id=slot_id, bank=(reg.bank), page=(reg.page), reg_offset=(reg.offset), size=1, hex_str=f"{mgc:02X}") time.sleep(0.02) result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id, slot_id=slot_id, bank=0, page=208, reg_offset=136, size=1, hex_str="01") time.sleep(0.02) for vpeak_reg in 
def _read_vpeak_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]:
    """Read the vpeak register of each requested lane.

    Lanes are read one by one, in ``lane_list`` order, through
    ``self.local_reg_tool.read_vpeak_reg``.

    Returns:
        Mapping of lane to the value returned by the register tool.
    """
    return {
        lane: self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane)
        for lane in lane_list
    }
def save_target_vpeaks_to_json(self, exp_id: int, slot_id: int, target_vpeaks: Dict[int, int]) -> bool:
    """Store target vpeaks in the per-host JSON file.

    The file named by ``_generate_host_filename`` is loaded if it exists,
    the entry at ``[str(exp_id)][str(slot_id)][self.mode]`` is replaced by
    ``target_vpeaks`` (lane keys stringified), and the whole document is
    written back with an indent of 4.

    Returns:
        ``True`` on success; ``False`` when any step raises (the exception
        is logged, not propagated).
    """
    try:
        path = self._generate_host_filename()
        parent_dir = os.path.dirname(path)
        os.makedirs(parent_dir or ".", exist_ok=True)

        stored = {}
        if os.path.exists(path):
            with open(path, "r") as fh:
                stored = json.load(fh)

        # Create the exp/slot levels on demand, then overwrite this mode.
        slot_entry = stored.setdefault(str(exp_id), {}).setdefault(str(slot_id), {})
        slot_entry[self.mode] = {str(lane): vpeak for lane, vpeak in target_vpeaks.items()}

        with open(path, "w") as fh:
            json.dump(stored, fh, indent=4)
        logging.info(f"target_vpeaks saved to {path}, path: {exp_id}/{slot_id}/{self.mode}")
        return True
    except Exception as exc:
        logging.error(f"Failed to save target_vpeaks to JSON file: {exc}")
        return False
def save_mgc_results_to_json(self, exp_id: int, slot_id: int, matched_results: Dict[str, Any]) -> bool:
    """Merge MGC matching results into the per-host JSON file.

    ``matched_results`` is nested as ``{exp: {slot: {mode: {lane: data}}}}``.
    The file named by ``_generate_mgc_host_filename`` is loaded if it exists;
    missing exp/slot/mode levels are created and the lane entries of each
    mode are merged with ``dict.update`` (existing lanes are overwritten,
    others kept).  ``exp_id`` and ``slot_id`` are not used for the merge --
    the keys inside ``matched_results`` decide where data lands.

    Returns:
        ``True`` on success; ``False`` when any step raises (the exception
        is logged, not propagated).
    """
    try:
        path = self._generate_mgc_host_filename()
        parent_dir = os.path.dirname(path)
        os.makedirs(parent_dir or ".", exist_ok=True)

        stored = {}
        if os.path.exists(path):
            with open(path, "r") as fh:
                stored = json.load(fh)

        for exp_key, per_slot in matched_results.items():
            exp_entry = stored.setdefault(exp_key, {})
            for slot_key, per_mode in per_slot.items():
                slot_entry = exp_entry.setdefault(slot_key, {})
                for mode_key, lane_results in per_mode.items():
                    slot_entry.setdefault(mode_key, {}).update(lane_results)

        with open(path, "w") as fh:
            json.dump(stored, fh, indent=4)
        logging.info(f"MGC matching results saved to {path}")
        return True
    except Exception as exc:
        logging.error(f"Failed to save MGC results to JSON file: {exc}")
        return False