From 5e6d0d54bfc5e7c23f88d20975580612c921cd07 Mon Sep 17 00:00:00 2001 From: xz_ocs Date: Fri, 16 Jan 2026 10:36:32 +0000 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20'ocs/0116/mgc-tuning-tool.?= =?UTF-8?q?py'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ocs/0116/mgc-tuning-tool.py | 1586 +++++++++++++++++++++++++++++++++++ 1 file changed, 1586 insertions(+) create mode 100644 ocs/0116/mgc-tuning-tool.py diff --git a/ocs/0116/mgc-tuning-tool.py b/ocs/0116/mgc-tuning-tool.py new file mode 100644 index 0000000..d1ae698 --- /dev/null +++ b/ocs/0116/mgc-tuning-tool.py @@ -0,0 +1,1586 @@ +# uncompyle6 version 3.9.3 +# Python bytecode version base 3.6 (3379) +# Decompiled from: Python 3.6.8 (default, May 21 2025, 11:12:56) +# [GCC 8.5.0 20210514 (Anolis 8.5.0-10.0.1)] +# Embedded file name: /xz/gyou/nexusbench/toolbox/mgc_tuning_tool.py +# Compiled at: 2025-12-25 14:52:30 +# Size of source mod 2**32: 78005 bytes +from dataclasses import dataclass, field +import logging, time, os, json, sys +from typing import Dict, List, Any, Optional, Tuple +import matplotlib.pyplot as plt +from gpu.biren.exp_util import DevWhiteRiverExp +from toolbox.opt_reg_access_tool import OptRegAccessTool +from toolbox.prbs_tool import PrbsTool, RetimerPrbsResult +from parser.topology_parser import TopoMappingParser +from parser.transceiver_config_parser import TransceiverConfigParser, RegisterInfo +from toolbox.bathtub_curve import BathtubCurve + +@dataclass +class LaneErrInfo: + host: str = "" + exp: int = -1 + route: str = "" + slot_id: int = -1 + logic_lane: int = -1 + rt_phys_id: int = -1 + rt_phys_lane: int = -1 + peer_slot_id: int = -1 + peer_ibias_lane: int = -1 + peer_logic_lane: int = -1 + peer_rt_phys_id: int = -1 + peer_rt_phys_lane: int = -1 + preset: int = -1 + mgc: int = -1 + mgc_err_map: Dict[int, int] = field(default_factory=dict) + lowfreq_err_map: Dict[int, int] = field(default_factory=dict) + tmp_err_map: Dict[int, int] = field(default_factory=dict) + vpeak: int = -1 + tia_peak: int = -1 + ibias: int = -1 + ipcurrent: int = -1 + opcurrent: int = -1 + lowfreq_eq: int = -1 + highfreq_eq: int = -1 + vgc_set: int = -1 + drv_vpeak: int = -1 + rssi1: int = -1 + rssi2: int = -1 + tmp_raw_value: int = -1 + ctle: int = -1 + + +class MgcTuningTool: + + def __init__(self, host:str, reg_table: TransceiverConfigParser, reg_tool: OptRegAccessTool, + remote_reg_tool: OptRegAccessTool, + bmc: DevWhiteRiverExp, topo_map: TopoMappingParser, prbs_tool: PrbsTool, route_name: str): + self.host = host + self.reg_table = reg_table + self.bmc = bmc + self.target_vpeak = 0 + self.local_reg_tool = reg_tool + self.remote_reg_tool = remote_reg_tool + self.target_vpeaks_file = "target_vpeaks.json" + self.mode = route_name + self.topo_map = topo_map + self.prbs_tool = prbs_tool + self.route_name = route_name + self.is_onet = self.remote_reg_tool != self.local_reg_tool + + def debug_calc_target_vpeak_new(self, exp_id: int, slot_id: int) -> Optional[Dict[(int, int)]]: + target_vpeaks = {} + logging.info(f"-------slot {slot_id}") + self.local_reg_tool.enable_tia_agc(exp_id, slot_id) + time.sleep(0.05) + logging.info("----------step 2: toggle RF off") + if not self.toogle_rf(exp_id, slot_id, "off"): + logging.error(f"slot {slot_id}: toggle RF off fail") + return target_vpeaks + else: + self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0) + time.sleep(0.5) + logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") + base_line_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id) + logging.info(f"base_line_vpeaks: {base_line_vpeaks}") + self.toogle_rf(exp_id, slot_id, "on") + self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255) + time.sleep(0.5) + max_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id) + + logging.info(f"max_vpeaks: {max_vpeaks}") + percent_target_vpeaks = {} + for lane, base_vpeak in base_line_vpeaks.items(): + numerator = 19 + vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29)) + percent_target_vpeaks[lane] = vpkdelta + base_vpeak + diff = max_vpeaks[lane] - base_line_vpeaks[lane] + if diff < 5: + logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error") + sys.exit(-1) + + logging.info(f"percent_target_vpeaks: {percent_target_vpeaks}") + diff_target_vpeaks = {} + for lane, base_vpeak in base_line_vpeaks.items(): + diff = max_vpeaks[lane] - base_line_vpeaks[lane] + if diff <= 22: + logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error") + diff_target_vpeaks[lane] = diff - 2 + base_vpeak + else: + diff_target_vpeaks[lane] = 21 + base_vpeak + + target_vpeaks = diff_target_vpeaks + logging.info(f"diff_target_vpeaks: {target_vpeaks}") + logging.info("\n" + "=" * 90) + logging.info(f'{"SLOT":<6} {"LANE":<6} {"BASE":<8} {"MAX":<8} {"RANGE":<8} {"PERCENT_TGT":<12} {"DIFF_TGT":<12} {"PCT_OFF":<10} {"DIFF_OFF":<10}') + logging.info("=" * 90) + for lane in sorted(base_line_vpeaks.keys()): + base = base_line_vpeaks[lane] + mx = max_vpeaks[lane] + rng = mx - base + pct_tgt = percent_target_vpeaks.get(lane, -1) + diff_tgt = diff_target_vpeaks.get(lane, -1) + pct_off = pct_tgt - base + diff_off = diff_tgt - base + logging.info(f"{slot_id:<6} {lane:<6} {base:<8} {mx:<8} {rng:<8} {pct_tgt:<12} {diff_tgt:<12} {pct_off:<10} {diff_off:<10}") + + logging.info("=" * 90 + "\n") + self.local_reg_tool.disable_tia_agc(exp_id, slot_id) + time.sleep(0.05) + try: + agc_start, agc_end, agc_step = (80, 180, 2) + agc_values = list(range(agc_start, agc_end + 1, agc_step)) + all_vpeaks_scan = {lane: [] for lane in base_line_vpeaks.keys()} + logging.info(f"Scanning AGC from {agc_start} to {agc_end} step {agc_step}...") + for agc in agc_values: + self.local_reg_tool.write_tmp_mgc_reg_all_lane(exp_id, slot_id, agc) + # self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, agc) + time.sleep(0.1) + vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id) + for lane in base_line_vpeaks.keys(): + all_vpeaks_scan[lane].append(vpeaks.get(lane, 0)) + + plt.figure(figsize=(20, 8)) + for lane in sorted(base_line_vpeaks.keys()): + plt.plot(agc_values, (all_vpeaks_scan[lane]), marker="o", label=f"lane {lane}") + + for lane in base_line_vpeaks.keys(): + plt.scatter([80], [base_line_vpeaks[lane]], color="blue") + plt.scatter([180], [max_vpeaks[lane]], color="red") + + plt.title(f"Vpeak vs AGC (exp={exp_id}, slot={slot_id})") + plt.xlabel("AGC Value") + plt.ylabel("Vpeak") + plt.legend() + plt.grid(True) + plt.tight_layout() + plot_dir = "agc_vpeak_plots" + os.makedirs(plot_dir, exist_ok=True) + plot_path = os.path.join(plot_dir, f"{self.local_reg_tool.host}_agc_vpeak_exp{exp_id}_slot{slot_id}.png") + plt.savefig(plot_path) + logging.info(f"AGC-Vpeak curve saved to: {plot_path}") + plt.close() + except Exception as e: + logging.warning(f"Failed to generate AGC scan plot: {e}") + + if target_vpeaks: + self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) + return target_vpeaks + + def auto_tune(self, exp_id: int, slot_id: int, lane_list: List[int]) -> bool: + lane_list = [ + 7, 6, 5, 4, 3, 2, 1, 0] + # self.debug_calc_target_vpeak_new(exp_id, slot_id) + + target_vpeaks = self.calc_target_vpeak_new(exp_id, slot_id, lane_list) + if target_vpeaks is None: + return False + + time.sleep(0.1) + logging.info("----------step 7: find optimal MGC") + self.match_optimal_mgc_new(exp_id, slot_id, lane_list, target_vpeaks) + return True + + def auto_tia_peak_tune(self, exp_id: int, retimers): + tune_value_list = [0, 30, 60, 90, 120, 150, 180, 210, 240] + self.general_auto_tune_value(exp_id, retimers, "tia_peak", tune_value_list, self.local_reg_tool) + + def auto_highfreq_tune(self, exp_id: int, retimers): + tune_value_list = [0, 20, 40, 60, 80, 100, 120, 140, 160, 180, 200, 220, 240] + self.general_auto_tune_value(exp_id, retimers, "highfreq_eq", tune_value_list, self.remote_reg_tool) + + def rt_ibias(self, exp_id: int, retimers): + tune_value_list = [ + 1647, 3647] + self.ibias_auto_tune_value_new(exp_id, retimers, "ibias_runtime", tune_value_list, self.remote_reg_tool, False) + + def auto_ibias_tune2(self, exp_id: int, retimers): + tune_value_list = [ + 1647, 1847, 2047, 2247, 2447, 2647, 2847, 3047, + 3247, 3447, 3647] + self.ibias_auto_tune_value(exp_id, retimers, "ibias_runtime", tune_value_list, self.remote_reg_tool, False) + + def auto_opcurrent_tune(self, exp_id: int, retimers): + tune_value_list = [130, 140, 150, 160, 120, 110, 90, 80, 100, 170, 180] + self.general_auto_tune_value(exp_id, retimers, "opcurrent", tune_value_list, self.remote_reg_tool) + + def auto_opcurrent_tune_test(self, exp_id: int, retimers): + tune_value_list = [110, 130, 150, 170, 90, 70, 50, 190, 210, 230] + self.general_auto_tune_value(exp_id, retimers, "opcurrent", tune_value_list, self.remote_reg_tool) + + def auto_lowfreq_tune(self, exp_id: int, retimers): + tune_value_list = [ + 120, 150, 180, 210, 240, 90, 60, 30, 0] + self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", tune_value_list, self.remote_reg_tool) + + def auto_ctle_tune(self, exp_id: int, retimers): + tune_value_list = [ + 0, 2, 4, 6, 8, 10, 12, 14, 16, 18] + self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", tune_value_list, self.remote_reg_tool) + + def auto_tune2(self, exp_id: int, retimers) -> bool: + + self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) + + logging.info("Starting auto_tune2...") + + lane_data_list: List[LaneErrInfo] = [] + + # MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8, 10, -10, 12, -12, 14, -14] + # MGC_OFFSETS = [0, 2] + MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8, 10, -10] + + for slot_id in [0, 1, 2, 3, 4, 5, 6, 7]: + for lane in [0, 1, 2, 3, 4, 5, 6, 7]: + current_mgc = self.local_reg_tool.read_mgc_reg(exp_id, slot_id, lane) + current_mgc = int(current_mgc) + + mgc_points = [current_mgc + offset for offset in MGC_OFFSETS] + + mgc_err_map = {mgc: -1 for mgc in mgc_points} + + success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) + if success == False: + logging.error(f'get_phys_rtmr_by_slot failed, result: {success}') + return False + + lane_info = LaneErrInfo( + slot_id=slot_id, + logic_lane=lane, + rt_phys_id=rt_phys_id, + rt_phys_lane=rt_phys_lane, + mgc_err_map=mgc_err_map, + mgc= current_mgc + ) + + lane_data_list.append(lane_info) + + # logging.info( + # f"Initialized: Slot {slot_id}, Lane {lane}, " + # f"Base MGC={current_mgc}, Target MGCs={mgc_points}, ERRs={[-1.0]*5}" + # ) + + # logging.info(lane_data_list) + + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) + + all_unlocked = all(not prbs.locked for prbs in prbs_results) + if all_unlocked: + logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. Aborting auto_tune2.") + return False + + time.sleep(2) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False) + + # return True + + # logging.info(f'-------------- prbs_results: {prbs_results}') + + for prbs in prbs_results: + local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) + if not local_slot: + logging.error(f'local slot is none!') + return False + slot_id, lane_id = local_slot + # logging.info(f'------------------ slot: {slot_id}, lane_id: {lane_id}') + for info in lane_data_list: + if info.slot_id == slot_id and info.logic_lane == lane_id: + if prbs.locked: + info.mgc_err_map[info.mgc] = prbs.err_count + else: + info.mgc_err_map[info.mgc] = 65535 + # logging.info(f'prbs.locked: {prbs.locked}, prbs.err_count: {prbs.err_count}') + break + + # return True + + for offset in [offset for offset in MGC_OFFSETS if offset != 0]: + for info in lane_data_list: + new_mgc = info.mgc + offset + self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc) + time.sleep(0.05) + + # for slot_id in [0,1,2,3,4,5,6,7]: + # self.local_reg_tool.write_confirm_reg(exp_id, slot_id) + # time.sleep(0.05) + + time.sleep(0.5) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) + time.sleep(2) + + # 测 PRBS + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) + is_need_prbs_reset = False + # 更新结果:当前 MGC 是 info.mgc + offset + for prbs in prbs_results: + local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) + if not local_slot: + logging.error(f'local slot is none!') + return False + slot_id, lane_id = local_slot + + for info in lane_data_list: + if info.slot_id == slot_id and info.logic_lane == lane_id: + target_mgc = info.mgc + offset + if prbs.locked: + info.mgc_err_map[target_mgc] = prbs.err_count + else: + info.mgc_err_map[target_mgc] = 69999 + is_need_prbs_reset = True + # logging.info(f'------update mgc err, {target_mgc}: {info.mgc_err_map[target_mgc]}') + break + + if is_need_prbs_reset: + self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) + self.print_err_matrix(lane_data_list, MGC_OFFSETS) + + # 参数复原 + # logging.info(f'------------len: {len(lane_data_list)}') + for info in lane_data_list: + new_mgc = info.mgc + self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc) + time.sleep(0.05) + + # for slot_id in [0,1,2,3,4,5,6,7]: + # self.local_reg_tool.write_confirm_reg(exp_id, slot_id) + # time.sleep(0.05) + # return True + + # Step 4: 调用简单调优,返回与 lane_data_list 一一对应的推荐 offset 列表 + recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1) + + tuning_success = True + + for info, recommended_mgc in zip(lane_data_list, recommended_mgcs): + slot_id = info.slot_id + lane_id = info.logic_lane + current_mgc = info.mgc + target_mgc = recommended_mgc + diff_mgc = target_mgc - current_mgc + + # 日志输出 + if diff_mgc != 0: + logging.info( + f"Slot {slot_id}, Lane {lane_id}: MGC adjusted from {current_mgc} " + f"→ {target_mgc} (offset={diff_mgc:.1f})" + ) + # logging.info(f"Applying MGC reg write: exp_id={exp_id}, slot={slot_id}, lane={lane_id}, mgc={target_mgc}") + # self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, target_mgc) + # time.sleep(0.05) + # self.local_reg_tool.write_confirm_reg(exp_id, slot_id) + # time.sleep(0.05) + # else: + # logging.info( + # f"Slot {slot_id}, Lane {lane_id}: MGC {current_mgc} is optimal. No change." + # ) + + tuning_success = True + if bad_lanes: + tuning_success = False + for slot, lane in bad_lanes: + logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?") + + return True + + def low_ibias_tune2(self, exp_id: int, retimers, reg_name: str, value_offset_list: Optional[List[int]]=None, value_list: Optional[List[int]]=None) -> bool: + logging.info("Starting auto_tune2...") + lane_data_list = [] + MGC_OFFSETS = [ + 0, 2, -2, 4, -4, 6, -6, 8, -8] + for slot_id in (0, 1, 2, 3, 4, 5, 6, 7): + for lane in (0, 1, 2, 3, 4, 5, 6, 7): + if value_offset_list is not None: + current_value = self.local_reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name) + current_value = int(current_value) + value_points = [current_value + offset for offset in value_offset_list] + tmp_err_map = {value: -1 for value in value_points} + success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) + if success == False: + logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") + return False + lane_info = LaneErrInfo(slot_id=slot_id, + logic_lane=lane, + rt_phys_id=rt_phys_id, + rt_phys_lane=rt_phys_lane, + mgc_err_map=tmp_err_map, + mgc=current_value) + lane_data_list.append(lane_info) + + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) + all_unlocked = all(not prbs.locked for prbs in prbs_results) + if all_unlocked: + logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. Aborting auto_tune2.") + return False + else: + time.sleep(2) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False) + for prbs in prbs_results: + local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) + if not local_slot: + logging.error("local slot is none!") + return False + slot_id, lane_id = local_slot + for info in lane_data_list: + if info.slot_id == slot_id: + if info.logic_lane == lane_id: + if prbs.locked: + info.tmp_err_map[info.tmp_value] = prbs.err_count + else: + info.tmp_err_map[info.tmp_value] = 65535 + break + + for offset in [offset for offset in MGC_OFFSETS if offset != 0]: + for info in lane_data_list: + new_mgc = info.mgc + offset + self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc) + + for slot_id in (0, 1, 2, 3, 4, 5, 6, 7): + self.local_reg_tool.write_confirm_reg(exp_id, slot_id) + + time.sleep(0.5) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) + time.sleep(2) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) + for prbs in prbs_results: + local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) + if not local_slot: + logging.error("local slot is none!") + return False + slot_id, lane_id = local_slot + for info in lane_data_list: + if info.slot_id == slot_id: + if info.logic_lane == lane_id: + target_mgc = info.mgc + offset + if prbs.locked: + info.mgc_err_map[target_mgc] = prbs.err_count + else: + info.mgc_err_map[target_mgc] = 65535 + break + + self.print_err_matrix(lane_data_list, MGC_OFFSETS) + for info in lane_data_list: + new_mgc = info.mgc + self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc) + + recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1) + tuning_success = True + for info, recommended_mgc in zip(lane_data_list, recommended_mgcs): + slot_id = info.slot_id + lane_id = info.logic_lane + current_mgc = info.mgc + target_mgc = recommended_mgc + diff_mgc = target_mgc - current_mgc + if diff_mgc != 0: + logging.info(f"Slot {slot_id}, Lane {lane_id}: MGC adjusted from {current_mgc} → {target_mgc} (offset={diff_mgc:.1f})") + logging.info(f"Applying MGC reg write: exp_id={exp_id}, slot={slot_id}, lane={lane_id}, mgc={target_mgc}") + self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, target_mgc) + self.local_reg_tool.write_confirm_reg(exp_id, slot_id) + + tuning_success = True + if bad_lanes: + tuning_success = False + for slot, lane in bad_lanes: + logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?") + + return True + + def general_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: + logging.info("Starting auto_tune2...") + lane_data_list = [] + lane_list = [ + 0, 1, 2, 3, 4, 5, 6, 7] + slot_list = [0, 1, 2, 3, 4, 5, 6, 7] + if reg_name in ('ibias', 'opcurrent'): + reg_tool = self.remote_reg_tool + else: + reg_tool = self.local_reg_tool + for rt_phys_id in retimers: + for rt_phys_lane in range(0, 16): + if reg_tool == self.local_reg_tool: + slot = self.topo_map.get_local_slot_by_retimer(rt_phys_id, rt_phys_lane, self.route_name) + else: + slot = self.topo_map.get_remote_slot_by_retimer(rt_phys_id, rt_phys_lane, self.route_name) + if not slot: + logging.error("local slot is none!") + return False + slot_id, lane_id = slot + raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane_id, reg_name) + tmp_err_map = {value: -1 for value in tune_value_list} + lane_info = LaneErrInfo(host=(reg_tool.host), exp=exp_id, slot_id=slot_id, logic_lane=lane_id, rt_phys_id=rt_phys_id, + rt_phys_lane=rt_phys_lane, + tmp_err_map=tmp_err_map, + tmp_raw_value=raw_value) + lane_data_list.append(lane_info) + + time.sleep(0.05) + is_need_reset_prbs = True + for value in tune_value_list: + for info in lane_data_list: + reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name) + time.sleep(0.05) + + time.sleep(1) + if is_need_reset_prbs: + logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") + self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) + time.sleep(2) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) + is_need_reset_prbs = False + for prbs in prbs_results: + if reg_tool == self.local_reg_tool: + slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) + else: + slot = self.topo_map.get_remote_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) + if not slot: + logging.error("local slot is none!") + return False + slot_id, lane_id = slot + for info in lane_data_list: + if info.slot_id == slot_id: + if info.logic_lane == lane_id: + if prbs.locked: + info.tmp_err_map[value] = prbs.err_count + elif value in info.tmp_err_map: + info.tmp_err_map[value] = 69999 + else: + logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") + is_need_reset_prbs = True + break + + logging.info(lane_data_list) + self.print_general_err_matrix(lane_data_list, tune_value_list) + time.sleep(0.05) + for info in lane_data_list: + reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name) + time.sleep(0.05) + + recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list) + for info, recommended_value in zip(lane_data_list, recommended_value): + diff_value = recommended_value - info.tmp_raw_value + if diff_value != 0: + logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") + logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}") + if is_overwrite: + reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name) + + if bad_lanes: + for slot, lane in bad_lanes: + logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules") + + return True + + def ibias_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: + logging.info("Starting ibias_auto_tune_value_new...") + local_data_list: List[LaneErrInfo] = [] + remote_data_list: List[LaneErrInfo] = [] + local_slot_list = list(range(8)) + local_lane_list = list(range(8)) + remote_slot_list = list(range(8)) + remote_lane_list = list(range(8)) + + # 存储结构:{slot_id: ibias_dict} + remote_raw_ibias: Dict[int, Dict] = {} + local_raw_ibias: Dict[int, Dict] = {} + + # init remote(ibias) + for slot_id in remote_slot_list: + self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01") + remote_raw_ibias = self.remote_reg_tool.read_ibias_reg_all_lane(exp_id, slot_id) + time.sleep(0.05) + + if self.is_onet: + for slot_id in local_slot_list: + self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01") + local_raw_ibias = self.local_reg_tool.read_ibias_reg_all_lane(exp_id, slot_id) + time.sleep(0.05) + + # init remote (ibias) + for slot_id in local_slot_list: + for lane in local_lane_list: + peer_info = self.topo_map.get_remote_retimer_and_slot_by_slot(slot_id, lane, self.route_name) + if not peer_info: + logging.error(f"peer_info is None for slot {slot_id} lane {lane}") + return False + remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = peer_info + + tmp_err_map = {value: -1 for value in tune_value_list} + + result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) + if result: + success, rt_phys_id, rt_phys_lane = result + if success == False: + logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") + return False + lane_info = LaneErrInfo(host=(self.local_reg_tool.host), exp=exp_id, + route=(self.route_name.split("-")[-1]), + slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, + peer_slot_id=remote_slot_id, peer_logic_lane=remote_lane_id, + peer_rt_phys_id=remote_rt_logic_id, peer_rt_phys_lane=remote_rt_logic_lane, + tmp_err_map=tmp_err_map) + local_data_list.append(lane_info) + + # init local (ibias) + if self.is_onet: + for slot_id in remote_slot_list: + for lane in remote_lane_list: + peer_info = self.topo_map.get_remote_retimer_and_slot_by_slot(slot_id, lane, self.route_name) + if not peer_info: + logging.error(f"peer_info is None for slot {slot_id} lane {lane}") + return False + local_rt_logic_id, local_rt_logic_lane, local_slot_id, local_lane_id = peer_info + + tmp_err_map = {value: -1 for value in tune_value_list} + + result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) + if result: + success, rt_phys_id, rt_phys_lane = result + if success == False: + logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") + return False + lane_info = LaneErrInfo(host=(self.remote_reg_tool.host), exp=exp_id, + route=(self.route_name.split("-")[-1]), + slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane, + peer_slot_id=local_slot_id, peer_logic_lane=local_lane_id, + peer_rt_phys_id=local_rt_logic_id, peer_rt_phys_lane=local_rt_logic_lane, + tmp_err_map=tmp_err_map) + remote_data_list.append(lane_info) + + time.sleep(0.05) + is_need_reset_prbs = True + for value in tune_value_list: + for info in local_data_list: + self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value) + time.sleep(0.05) + + if self.is_onet: + for info in remote_data_list: + self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value) + time.sleep(0.05) + + time.sleep(1) + if is_need_reset_prbs: + logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") + self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) + + prbs_results = self.prbs_tool.prbs_check_bidirection(exp_id, retimers, True) + time.sleep(2) + prbs_results = self.prbs_tool.prbs_check_bidirection(exp_id, retimers) + + # logging.info(f'--------- prbs_results: {prbs_results}') + + is_need_reset_prbs = False + for prbs in prbs_results: + for info in local_data_list: + if info.host == prbs.host and info.rt_phys_id == prbs.rtmr_id \ + and info.rt_phys_lane == prbs.rtmr_lane: + if prbs.locked: + info.tmp_err_map[value] = prbs.err_count + elif value in info.tmp_err_map: + info.tmp_err_map[value] = 69999 + else: + logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") + is_need_reset_prbs = True + break + + for info in remote_data_list: + if info.host == prbs.host and info.rt_phys_id == prbs.rtmr_id \ + and info.rt_phys_lane == prbs.rtmr_lane: + if prbs.locked: + info.tmp_err_map[value] = prbs.err_count + elif value in info.tmp_err_map: + info.tmp_err_map[value] = 69999 + else: + logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") + is_need_reset_prbs = True + break + + self.print_general_err_matrix(local_data_list, tune_value_list) + self.print_general_err_matrix(remote_data_list, tune_value_list) + time.sleep(0.05) + + for slot_id in remote_slot_list: + self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=remote_raw_ibias[slot_id]) + time.sleep(0.05) + self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00") + time.sleep(0.05) + + if self.is_onet: + for slot_id in local_slot_list: + self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=remote_raw_ibias[slot_id]) + time.sleep(0.05) + self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00") + time.sleep(0.05) + + + recommended_value, bad_lanes = self.simple_value_tuner(local_data_list) + reg_name = "ibias" + for info, recommended_value in zip(local_data_list, recommended_value): + diff_value = recommended_value - info.tmp_raw_value + if diff_value != 0: + remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name) + if not remote_info: + logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}") + return False + remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info + logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: {remote_slot_id}, {remote_lane_id}, adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") + if is_overwrite: + reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, recommended_value) + time.sleep(0.05) + + if bad_lanes: + for slot, lane in bad_lanes: + logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules") + + return True + + def ibias_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: + logging.info("Starting auto_tune2...") + lane_data_list = [] + rx_slot_list = [0, 1, 2, 3, 4, 5, 6, 7] + tx_slot_list = [0, 1, 2, 3, 4, 5, 6, 7] + + for slot_id in rx_slot_list: + reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01") + time.sleep(0.05) + + rx_lane_list = [0, 1, 2, 3, 4, 5, 6, 7] + for rx_slot_id in rx_slot_list: + for rx_lane in rx_lane_list: + remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(rx_slot_id, rx_lane, self.route_name) + if not remote_info: + logging.error(f"remote_info is None for slot {rx_slot_id} lane {rx_lane}") + return False + remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info + raw_value = reg_tool.read_opt_reg(exp_id, remote_slot_id, remote_lane_id, "ibias") + tmp_err_map = {value: -1 for value in tune_value_list} + result = self.topo_map.get_phys_rtmr_by_slot(rx_slot_id, rx_lane, self.route_name) + if result: + success, rt_phys_id, rt_phys_lane = result + if success == False: + logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") + return False + lane_info = LaneErrInfo(host=(self.local_reg_tool.host), + exp=exp_id, + route=(self.route_name.split("-")[-1]), + slot_id=rx_slot_id, + logic_lane=rx_lane, + rt_phys_id=rt_phys_id, + rt_phys_lane=rt_phys_lane, + tmp_err_map=tmp_err_map, + tmp_raw_value=raw_value) + lane_data_list.append(lane_info) + + time.sleep(0.05) + is_need_reset_prbs = True + for value in tune_value_list: + for info in lane_data_list: + remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name) + if not remote_info: + logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}") + return False + remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info + reg_tool.write_opt_reg(exp_id, remote_slot_id, remote_lane_id, value, "ibias_runtime") + time.sleep(0.05) + + time.sleep(1) + if is_need_reset_prbs: + logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") + self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) + time.sleep(2) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) + + # logging.info(f'--------- prbs_results: {prbs_results}') + + is_need_reset_prbs = False + for prbs in prbs_results: + for info in lane_data_list: + if info.rt_phys_id == prbs.rtmr_id: + if info.rt_phys_lane == prbs.rtmr_lane: + if prbs.locked: + info.tmp_err_map[value] = prbs.err_count + elif value in info.tmp_err_map: + info.tmp_err_map[value] = 69999 + else: + logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") + is_need_reset_prbs = True + break + + # logging.info(lane_data_list) + self.print_general_err_matrix(lane_data_list, tune_value_list) + time.sleep(0.05) + + for info in lane_data_list: + reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, "ibias_runtime") + time.sleep(0.05) + + recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list) + reg_name = "ibias" + for info, recommended_value in zip(lane_data_list, recommended_value): + diff_value = recommended_value - info.tmp_raw_value + if diff_value != 0: + remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name) + if not remote_info: + logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}") + return False + remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info + logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: {remote_slot_id}, {remote_lane_id}, adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") + if is_overwrite: + reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, recommended_value) + time.sleep(0.05) + + if bad_lanes: + for slot, lane in bad_lanes: + logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules") + + return True + + def general_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: + logging.info("Starting auto_tune2...") + lane_data_list = [] + lane_list = [ + 0, 1, 2, 3, 4, 5, 6, 7] + for slot_id in (0, 1, 2, 3, 4, 5, 6, 7): + for lane in lane_list: + raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name) + tmp_err_map = {value: -1 for value in tune_value_list} + result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) + if result: + success, rt_phys_id, rt_phys_lane = result + if success == False: + logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") + return False + lane_info = LaneErrInfo(host=(self.local_reg_tool.host), + exp=exp_id, + slot_id=slot_id, + logic_lane=lane, + rt_phys_id=rt_phys_id, + rt_phys_lane=rt_phys_lane, + tmp_err_map=tmp_err_map, + tmp_raw_value=raw_value) + lane_data_list.append(lane_info) + + time.sleep(0.05) + is_need_reset_prbs = True + for value in tune_value_list: + for info in lane_data_list: + reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name) + time.sleep(0.05) + + time.sleep(1) + if is_need_reset_prbs: + logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") + self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) + time.sleep(2) + prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) + is_need_reset_prbs = False + for prbs in prbs_results: + if reg_tool == self.local_reg_tool: + slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) + else: + slot = self.topo_map.get_remote_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) + if not slot: + logging.error("local slot is none!") + return False + slot_id, lane_id = slot + for info in lane_data_list: + if info.slot_id == slot_id: + if info.logic_lane == lane_id: + if prbs.locked: + info.tmp_err_map[value] = prbs.err_count + elif value in info.tmp_err_map: + info.tmp_err_map[value] = 69999 + else: + logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") + is_need_reset_prbs = True + break + + logging.info(lane_data_list) + self.print_general_err_matrix(lane_data_list, tune_value_list) + time.sleep(0.05) + for info in lane_data_list: + reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name) + time.sleep(0.05) + + recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list) + for info, recommended_value in zip(lane_data_list, recommended_value): + diff_value = recommended_value - info.tmp_raw_value + if diff_value != 0: + logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") + logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}") + if is_overwrite: + reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name) + + if bad_lanes: + for slot, lane in bad_lanes: + logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules") + + return True + + def simple_value_tuner(self, lane_data_list: List[LaneErrInfo], bad_threshold=1, min_good_count=5): + recommendations = [] + bad_lanes = [] + for info in lane_data_list: + err_map = info.tmp_err_map + good_offsets = [value for value, err in err_map.items() if err < bad_threshold] + if not good_offsets: + recommended_value = info.tmp_raw_value + else: + recommended_value = int((min(good_offsets) + max(good_offsets)) / 2) + if len(good_offsets) <= min_good_count: + bad_lanes.append((info.slot_id, info.logic_lane)) + recommendations.append(recommended_value) + + return (recommendations, bad_lanes) + + def simple_mgc_tuner(self, lane_data_list, bad_threshold=1, min_good_count=6): + recommendations = [] + bad_lanes = [] + for info in lane_data_list: + err_map = info.mgc_err_map + good_offsets = [mgc for mgc, err in err_map.items() if err < bad_threshold] + if not good_offsets: + recommended_mgc = info.mgc + else: + recommended_mgc = int((min(good_offsets) + max(good_offsets)) / 2) + if len(good_offsets) <= 5: + bad_lanes.append((info.slot_id, info.logic_lane)) + recommendations.append(recommended_mgc) + + return (recommendations, bad_lanes) + + def print_general_err_matrix(self, lane_data_list: List[LaneErrInfo], tune_value_list): + tune_value_list = sorted(tune_value_list) + header_parts = [ + 'Host', 'Exp', 'Route', 'Slot', 'Lane', 'RT_Id', + 'RT_Lane'] + [str(value) for value in tune_value_list] + header_widths = [16, 6, 8, 6, 6, 6, 10] + [6] * len(tune_value_list) + header_format = "{:<16} {:<6} {:<8} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in tune_value_list) + print((header_format.format)(*header_parts)) + print("-" * (sum(header_widths) + len(header_widths) * 1)) + for info in lane_data_list: + try: + err_counts = [] + for value in tune_value_list: + err = info.tmp_err_map.get(value, -2) + if err < 0: + err_str = "70000" + else: + err_str = (f"{err}") + err_counts.append(err_str) + + data_format = "{:<16} {:<6} {:<8} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in tune_value_list) + print((data_format.format)(self.host, info.exp, info.route, info.slot_id, info.logic_lane, info.rt_phys_id, info.rt_phys_lane, *err_counts)) + except Exception as e: + logging.error(f"Error printing row for Slot {info.slot_id}, Lane {info.logic_lane}: {e}") + continue + + def print_err_matrix(self, lane_data_list, mgc_offsets): + mgc_offsets = sorted(mgc_offsets) + + header_parts = ["Host","Slot", "Lane", "RT_Id", "RT_Lane"] + [str(offset) for offset in mgc_offsets] + header_widths = [16, 6, 6] + [6] * len(mgc_offsets) + + header_format = "{:<16} {:<6} {:<6} {:<6} {:<10} " + " ".join(f"{{:<6}}" for _ in mgc_offsets) + print(header_format.format(*header_parts)) + + print("-" * (sum(header_widths) + len(header_widths) * 1)) + + for info in lane_data_list: + base_mgc = info.mgc + try: + + err_counts = [] + for offset in mgc_offsets: + target_mgc = base_mgc + offset + err = info.mgc_err_map.get(target_mgc, -1.0) # 安全获取,防止 key 不存在 + if err < 0: + err_str = "N/A" + else: + err_str = f"{err}" + err_counts.append(err_str) + + # 格式化输出 + data_format = "{:<16} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in mgc_offsets) + print(data_format.format(self.host, info.slot_id, info.logic_lane, info.rt_phys_id, info.rt_phys_lane, *err_counts)) + + except Exception as e: + logging.error(f"Error printing row for Slot {info.slot_id}, Lane {info.logic_lane}: {e}") + continue + + def verify_rt_index_lane_index(self, rt_index: int, lane_index: int) -> Tuple[(bool, int, int)]: + if rt_index in (1, 2, 3, 4): + if 0 <= lane_index <= 7: + new_rt_index = rt_index * 10 + 1 + return ( + True, new_rt_index, lane_index) + else: + if 8 <= lane_index <= 15: + new_rt_index = rt_index * 10 + 2 + new_lane_index = lane_index - 8 + return (True, new_rt_index, new_lane_index) + logging.error("Invalid lane_index: must be in [0, 15]") + return (False, 0, 0) + else: + if rt_index in (11, 12, 21, 22, 31, 32, 41, 42): + logging.info(f"use retimer logic index and lane, rt_index: {rt_index}, rt_lane: {lane_index}") + if lane_index > 7: + logging.error("For logic rt_index, lane_index must be in [0, 7]") + return (False, 0, 0) + return (True, rt_index, lane_index) + else: + logging.error(f"Invalid rt_index: {rt_index}") + return (False, 0, 0) + + def manual_tune(self, exp_id: int, slot_id: int) -> bool: + target_vpeaks = self.load_target_vpeaks_from_json(exp_id, slot_id) + if target_vpeaks is None: + logging.error("从文件加载target_vpeaks失败") + return False + else: + self.match_optimal_mgc_new2(exp_id, slot_id, target_vpeaks) + return True + + def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Optional[Dict[(int, int)]]: + target_vpeaks = {} + logging.info(f"-------slot {slot_id}") + time.sleep(0.05) + self.local_reg_tool.enable_tia_agc(exp_id, slot_id) + time.sleep(0.05) + logging.info("----------step 2: toogle RF off") + if not self.toogle_rf(exp_id, slot_id, "off"): + logging.error(f"slot {slot_id}: toogle RF off fail") + return target_vpeaks + else: + self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0) + time.sleep(0.5) + logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") + base_line_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id) + logging.info(f"base_line_vpeaks: {base_line_vpeaks}") + self.toogle_rf(exp_id, slot_id, "on") + self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255) + time.sleep(0.5) + max_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id) + logging.info(f"max_vpeaks: {max_vpeaks}") + for lane, base_vpeak in base_line_vpeaks.items(): + numerator = 19 + vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29)) + target_vpeaks[lane] = vpkdelta + base_vpeak + diff = max_vpeaks[lane] - base_line_vpeaks[lane] + if diff < 10: + logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error") + sys.exit(-1) + + # for lane, base_vpeak in base_line_vpeaks.items(): + # diff = max_vpeaks[lane] - base_line_vpeaks[lane] + # if diff <= 22: + # logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error") + # target_vpeaks[lane] = diff - 2 + base_vpeak + # else: + # target_vpeaks[lane] = 21 + base_vpeak + + logging.info(f"target_vpeaks: {target_vpeaks}") + + if target_vpeaks: + self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) + self.local_reg_tool.disable_tia_agc(exp_id, slot_id) + return target_vpeaks + + def calc_target_vpeak(self, exp_id: int, lane_list: List[int], slot_id: int) -> Dict[(int, int)]: + target_vpeaks = {} + logging.info(f"-------slot {slot_id}") + logging.info("----------step 1: disable agc") + self.disable_agc(exp_id, slot_id) + logging.info("----------step 2: toogle RF off") + if not self.toogle_rf(exp_id, slot_id, lane_list, "off"): + logging.error(f"slot {slot_id}: toogle RF off fail") + return target_vpeaks + else: + logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") + base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) + logging.info(f"base_line_vpeaks: {base_line_vpeaks}") + target_vpeaks = self.calculate_target_vpeak(base_line_vpeaks) + logging.info(f"target_vpeaks: {target_vpeaks}") + if target_vpeaks: + self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) + logging.info("----------step 4: enable RF") + self.toogle_rf(exp_id, slot_id, lane_list, "on") + return target_vpeaks + + def disable_agc(self, exp_id: int, slot_id: int): + logging.debug("----------disable_agc, step 1") + agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en") + if not agc_enable_regs: + logging.error("not find AGC_ENABLE register") + return False + else: + logging.debug("----------disable_agc, step 2") + success = True + for reg in agc_enable_regs: + logging.debug(f"----------disable_agc, step 2: reg: {reg}") + result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=(reg.bank), + page=(reg.page), + reg_offset=(reg.offset), + size=(reg.size), + hex_str="00") + time.sleep(0.05) + logging.debug(f"----------disable_agc, step 3: result: {result}") + if not result: + logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}") + success = False + + return success + + def enable_agc(self, exp_id: int, slot_id: int): + time.sleep(0.05) + logging.debug("----------enable_agc, step 1") + agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en") + if not agc_enable_regs: + logging.error("not find AGC_ENABLE register") + return False + else: + logging.debug("----------enable_agc, step 2") + success = True + for reg in agc_enable_regs: + logging.debug(f"----------enable_agc, step 2: reg: {reg}") + result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=(reg.bank), + page=(reg.page), + reg_offset=(reg.offset), + size=(reg.size), + hex_str="FF") + time.sleep(0.05) + logging.info(f"----------enable_agc, step 3: result: {result}") + if not result: + logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}") + success = False + + return success + + def toogle_rf(self, exp_id, slot_id, switch='off'): + ret = False + logging.info(f"---------- toogle_rf to {switch}") + if switch == "off": + ret = self.local_reg_tool.write_opt_reg_all_lane(exp_id, slot_id, 0, "tia_stage2") + time.sleep(0.05) + if ret == False: + logging.error(f"---------- toogle_rf to {switch} failed") + else: + stage_recommended_value = 144 + ret = self.local_reg_tool.write_opt_reg_all_lane(exp_id, slot_id, stage_recommended_value, "tia_stage2") + time.sleep(0.05) + if ret == False: + logging.error(f"---------- toogle_rf to {switch} failed") + return ret + + def calculate_target_vpeak(self, base_line_vpeaks: Dict[(int, int)]) -> Dict[(int, int)]: + target_vpeaks = {} + for lane, vpeak in base_line_vpeaks.items(): + logging.info(f"lane={lane}, vpeak={vpeak}") + target_vpeaks[lane] = vpeak + 18 + + logging.info(f"target_vpeaks={target_vpeaks}") + return target_vpeaks + + def match_optimal_mgc_new2(self, exp_id: int, slot_id: int, target_vpeaks: Optional[Dict[(int, int)]]) -> bool: + mgc_regs = self.reg_table.get_registers_by_name("mgc") + vpeak_regs = self.reg_table.get_registers_by_name("vpeak") + if mgc_regs is None or vpeak_regs is None: + logging.error("not find MGC regs") + return False + else: + success = True + matched_results = {} + unmatched_lanes = [] + target_mgcs = {} + for reg in mgc_regs: + mgc_value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=(reg.bank), + page=(reg.page), + reg_offset=(reg.offset), + size=1) + mgc_value = int(mgc_value, 16) + for vpeak_reg in vpeak_regs: + if vpeak_reg.lane == reg.lane: + vpeak_value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=(vpeak_reg.bank), + page=(vpeak_reg.page), + reg_offset=(vpeak_reg.offset), + size=2) + vpeak_value = int(vpeak_value, 16) + logging.info(f"mgc_value: {mgc_value}, target_vpeak: {target_vpeaks[vpeak_reg.lane]}, vpeak_value: {vpeak_value}") + target_mgc = (target_vpeaks[vpeak_reg.lane] - vpeak_value) * 1.5 + mgc_value + logging.info(f"target_mgc: {target_mgc}") + target_mgcs[reg.lane] = target_mgc + break + + logging.info(f"----------------Target MGC: {target_mgcs}") + for reg in mgc_regs: + logging.info(f"mgc regs size: {len(mgc_regs)}") + min_val = int(target_mgcs[reg.lane] - 10) + if min_val > 255: + min_val = 255 + max_val = int(target_mgcs[reg.lane] + 10) + if max_val > 255: + logging.info("max_val is greater than 255, setting to 255") + max_val = 255 + logging.info(f"lane: {reg.lane}, min mgc : {min_val}, max_mgc: {max_val}, target vpeak: {target_vpeaks[reg.lane]}") + step = reg.step + search_values = list(range(max_val, min_val - 1, -step)) + logging.info(f"Search order for lane {reg.lane}: {search_values}") + is_match = False + matched_mgc = None + matched_vpeak = None + for mgc in search_values: + result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=(reg.bank), + page=(reg.page), + reg_offset=(reg.offset), + size=1, + hex_str=f"{mgc:02X}") + time.sleep(0.05) + result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=0, + page=208, + reg_offset=136, + size=1, + hex_str="01") + time.sleep(0.05) + for vpeak_reg in vpeak_regs: + if vpeak_reg.lane == reg.lane: + value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=(vpeak_reg.bank), + page=(vpeak_reg.page), + reg_offset=(vpeak_reg.offset), + size=2) + value = int(value, 16) + logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}") + if value < target_vpeaks[reg.lane] + 1 and value > target_vpeaks[reg.lane] - 2: + logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ") + is_match = True + matched_mgc = mgc + matched_vpeak = value + break + + if is_match: + break + time.sleep(0.03) + + if is_match and matched_mgc is not None: + if matched_vpeak is not None: + if str(exp_id) not in matched_results: + matched_results[str(exp_id)] = {} + else: + if str(slot_id) not in matched_results[str(exp_id)]: + matched_results[str(exp_id)][str(slot_id)] = {} + if self.mode not in matched_results[str(exp_id)][str(slot_id)]: + matched_results[str(exp_id)][str(slot_id)][self.mode] = {} + matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc, + 'target_vpeak':target_vpeaks[reg.lane], + 'actual_vpeak':matched_vpeak} + else: + unmatched_lanes.append(reg.lane) + + if matched_results: + self.save_mgc_results_to_json(exp_id, slot_id, matched_results) + if unmatched_lanes: + logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") + return success + + def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool: + matched_results = {} + unmatched_lanes = [] + for index, lane_id in enumerate(lane_list): + is_match = False + matched_mgc = None + matched_vpeak = None + reg = self.reg_table.get_register_by_logic_lane("mgc", lane_id) + if reg is None or reg.valid_range is None: + logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc") + return False + min_val = reg.valid_range[0] + max_val = reg.valid_range[1] + step = reg.step if reg.step is not None else 1 + wt_mgc = int((min_val + max_val) / 2) + target_vpeak = target_vpeaks[lane_id] + target_vpeak_range = [target_vpeak, target_vpeak + 1] + count = 0 + while 1: + ret = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc) + time.sleep(0.05) + if ret == False: + logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc") + return False + ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id) + if ret == False: + logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm") + return False + time.sleep(0.05) + current_vpeak = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id) + logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}, target: {target_vpeak}") + diff_vpeak = abs(target_vpeak - current_vpeak) + diff_vpeak_step = { + 0: 0, + 1: 0, + 2: 0, + 3: 1, + 4: 2, + 5: 3, + 6: 3, + 7: 4, + 8: 5} + extra_step = diff_vpeak_step.get(diff_vpeak, 5) + if current_vpeak > target_vpeak_range[1]: + wt_mgc -= step + extra_step + if current_vpeak < target_vpeak_range[0]: + wt_mgc += step + extra_step + if current_vpeak >= target_vpeak_range[0]: + if current_vpeak <= target_vpeak_range[1]: + is_match = True + matched_mgc = wt_mgc + matched_vpeak = current_vpeak + break + if count > 30: + logging.error(f"vpeak is not match, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") + sys.exit(-1) + break + count += 1 + + if is_match: + logging.info(f"------------------ Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") + if str(exp_id) not in matched_results: + matched_results[str(exp_id)] = {} + if str(slot_id) not in matched_results[str(exp_id)]: + matched_results[str(exp_id)][str(slot_id)] = {} + if self.mode not in matched_results[str(exp_id)][str(slot_id)]: + matched_results[str(exp_id)][str(slot_id)][self.mode] = {} + matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc, 'target_vpeak':target_vpeaks[lane_id], + 'actual_vpeak':matched_vpeak} + else: + unmatched_lanes.append(lane_id) + + if matched_results: + self.save_mgc_results_to_json(exp_id, slot_id, matched_results) + if unmatched_lanes: + logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") + return False + else: + return True + + def match_optimal_mgc_old(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool: + mgc_regs = self.reg_table.get_registers_by_name("mgc") + vpeak_regs = self.reg_table.get_registers_by_name("vpeak") + if mgc_regs is None or vpeak_regs is None: + logging.error("not find MGC regs") + return False + else: + success = True + matched_results = {} + unmatched_lanes = [] + for reg in mgc_regs: + logging.info(f"mgc regs size: {len(mgc_regs)}") + logging.info(f"lane: {reg.lane}, target vpeak:{target_vpeaks[reg.lane]}, mgc valid_range: {reg.valid_range}, step: {reg.step}") + min_val = reg.valid_range[0] + max_val = reg.valid_range[1] + step = reg.step + mid_val = (min_val + max_val) // 2 + mid_val = mid_val // step * step + search_values = [ + mid_val] + offset = step + while len(search_values) < (max_val - min_val) // step + 1: + right_val = mid_val + offset + if right_val <= max_val: + search_values.append(right_val) + left_val = mid_val - offset + if left_val >= min_val: + search_values.append(left_val) + offset += step + if len(search_values) >= (max_val - min_val) // step + 1: + break + + is_match = False + matched_mgc = None + matched_vpeak = None + for mgc in search_values: + result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=(reg.bank), + page=(reg.page), + reg_offset=(reg.offset), + size=1, + hex_str=f"{mgc:02X}") + time.sleep(0.02) + result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=0, + page=208, + reg_offset=136, + size=1, + hex_str="01") + time.sleep(0.02) + for vpeak_reg in vpeak_regs: + if vpeak_reg.lane == reg.lane: + value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id, + slot_id=slot_id, + bank=(vpeak_reg.bank), + page=(vpeak_reg.page), + reg_offset=(vpeak_reg.offset), + size=2) + value = int(value, 16) + logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}") + if value < target_vpeaks[reg.lane] + 2 and value > target_vpeaks[reg.lane] - 2: + logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ") + is_match = True + matched_mgc = mgc + matched_vpeak = value + break + + if is_match: + break + time.sleep(0.03) + + if is_match: + if matched_mgc is not None: + if matched_vpeak is not None: + if str(exp_id) not in matched_results: + matched_results[str(exp_id)] = {} + else: + if str(slot_id) not in matched_results[str(exp_id)]: + matched_results[str(exp_id)][str(slot_id)] = {} + if self.mode not in matched_results[str(exp_id)][str(slot_id)]: + matched_results[str(exp_id)][str(slot_id)][self.mode] = {} + matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc, + 'target_vpeak':target_vpeaks[reg.lane], + 'actual_vpeak':matched_vpeak} + unmatched_lanes.append(reg.lane) + + if matched_results: + self.save_mgc_results_to_json(exp_id, slot_id, matched_results) + if unmatched_lanes: + logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") + return success + + def _read_vpeak_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[(int, int)]: + vpeak_values = {} + for lane in lane_list: + vpeak_int = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane) + vpeak_values[lane] = vpeak_int + + return vpeak_values + + def _write_mgc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], mgc: int) -> bool: + for lane_id in lane_list: + ret = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, mgc) + if ret == False: + logging.error(f"write mgc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, mgc:{mgc}") + return False + + ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id) + if ret == False: + logging.error(f"write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}") + return False + else: + return True + + def _write_agc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], agc: int) -> bool: + for lane_id in lane_list: + ret = self.local_reg_tool.write_agc_reg(exp_id, slot_id, lane_id, agc) + time.sleep(0.05) + if ret == False: + logging.error(f"write agc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, agc:{agc}") + return False + + ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id) + if ret == False: + logging.error(f"write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}") + return False + else: + return True + + def _generate_host_filename(self) -> str: + safe_host = self.host.replace(":", "_").replace("/", "_").replace("\\", "_") + return f"target_vpeaks_{safe_host}.json" + + def _generate_mgc_host_filename(self) -> str: + safe_host = self.host.replace(":", "_").replace("/", "_").replace("\\", "_") + return f"mgc_{safe_host}.json" + + def save_target_vpeaks_to_json(self, exp_id: int, slot_id: int, target_vpeaks: Dict[(int, int)]) -> bool: + try: + filename = self._generate_host_filename() + os.makedirs((os.path.dirname(filename) if os.path.dirname(filename) else "."), exist_ok=True) + if os.path.exists(filename): + with open(filename, "r") as f: + data = json.load(f) + else: + data = {} + if str(exp_id) not in data: + data[str(exp_id)] = {} + if str(slot_id) not in data[str(exp_id)]: + data[str(exp_id)][str(slot_id)] = {} + data[str(exp_id)][str(slot_id)][self.mode] = {str(k): v for k, v in target_vpeaks.items()} + with open(filename, "w") as f: + json.dump(data, f, indent=4) + logging.info(f"target_vpeaks saved to {filename}, path: {exp_id}/{slot_id}/{self.mode}") + return True + except Exception as e: + logging.error(f"Failed to save target_vpeaks to JSON file: {e}") + return False + + def load_target_vpeaks_from_json(self, exp_id: int, slot_id: int) -> Optional[Dict[(int, int)]]: + try: + filename = self._generate_host_filename() + if not os.path.exists(filename): + logging.error(f"File {filename} does not exist") + return + with open(filename, "r") as f: + data = json.load(f) + if str(exp_id) not in data or str(slot_id) not in data[str(exp_id)] or self.mode not in data[str(exp_id)][str(slot_id)]: + logging.error(f"Path {exp_id}/{slot_id}/{self.mode} not found in {filename}") + return + target_vpeaks_data = data[str(exp_id)][str(slot_id)][self.mode] + target_vpeaks = {int(k): v for k, v in target_vpeaks_data.items()} + logging.info(f"target_vpeaks loaded successfully from {filename}, path: {exp_id}/{slot_id}/{self.mode}") + return target_vpeaks + except Exception as e: + logging.error(f"Failed to load target_vpeaks from JSON file: {e}") + return + + def save_mgc_results_to_json(self, exp_id: int, slot_id: int, matched_results: Dict[(str, Any)]) -> bool: + try: + filename = self._generate_mgc_host_filename() + os.makedirs((os.path.dirname(filename) if os.path.dirname(filename) else "."), exist_ok=True) + if os.path.exists(filename): + with open(filename, "r") as f: + data = json.load(f) + else: + data = {} + for exp_key, exp_data in matched_results.items(): + if exp_key not in data: + data[exp_key] = {} + for slot_key, slot_data in exp_data.items(): + if slot_key not in data[exp_key]: + data[exp_key][slot_key] = {} + for mode_key, mode_data in slot_data.items(): + if mode_key not in data[exp_key][slot_key]: + data[exp_key][slot_key][mode_key] = {} + data[exp_key][slot_key][mode_key].update(mode_data) + + with open(filename, "w") as f: + json.dump(data, f, indent=4) + logging.info(f"MGC matching results saved to {filename}") + return True + except Exception as e: + logging.error(f"Failed to save MGC results to JSON file: {e}") + return False