|  |  |  | # xz_cable_setup_check_tool.py build
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ```shell | 
					
						
							|  |  |  | pyinstaller --onefile \ | 
					
						
							|  |  |  | --paths=/home/gyou/NexusBench-baihe-br/nexusbench \ | 
					
						
							|  |  |  | --paths=/home/gyou/NexusBench-baihe-br/nexusbench/log \ | 
					
						
							|  |  |  | --paths=/home/gyou/NexusBench-baihe-br/nexusbench/connection \ | 
					
						
							|  |  |  | --paths=/home/gyou/NexusBench-baihe-br/nexusbench/gpu/biren \ | 
					
						
							|  |  |  | --hidden-import=connection.http_helper \ | 
					
						
							|  |  |  | --runtime-tmpdir=/home/gyou/tmp \ | 
					
						
							|  |  |  | /home/gyou/NexusBench-baihe-br/nexusbench/biren_test.py | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # 加了这一行就可以运行了
 | 
					
						
							|  |  |  | --runtime-tmpdir=/home/gyou/tmp \ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | --hidden-import=log.logger \ | 
					
						
							|  |  |  | --hidden-import=connection.ssh_connection_manager \ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ``` | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |  ./build/whiteriver_exp --host 10.57.216.109 --exp 2 --cmd vcmd --param rev | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ./vuart -i 10.57.216.109 -e 2 -c vcmd -p ver | 
					
						
							|  |  |  | ./vuart -i 10.57.216.108 -e 4 -c fw-down -p  | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ./build/whiteriver_exp  -i 10.57.216.108 -e 4 -c fw-down -p  | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ./build/whiteriver_exp  -i 10.57.216.91 -e 4 -c vcmd -p ver | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | net use X: \\10.57.216.173\shared /user:root ossdbg1   | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | PicT1!2@3#4$ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   | 
					
						
							|  |  |  | RCms@Zte3 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ./build/whiteriver_exp -i 10.57.216.94 -e 4 -c fw-down -p "./whiteriver_exp@1.0.17+2508161844.img" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ./build/whiteriver_exp -i 10.57.216.95,10.57.216.96,10.57.216.97,10.57.216.98  | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | scp /usr/bin/ocs_link_reset root@10.57.216.166:/usr/bin/ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.166:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.165:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.187:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.148:/usr/bin/ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.163:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.139:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.173:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.167:/usr/bin/ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.134:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.145:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.176:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.180:/usr/bin/ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.185:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.150:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.168:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.174:/usr/bin/ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.132:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.189:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.151:/usr/bin/ | 
					
						
							|  |  |  | scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.156:/usr/bin/ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     def read_tia_stage2_reg(self, exp_id: int, slot_id: int,  | 
					
						
							|  |  |  |                         lane: int) -> int: | 
					
						
							|  |  |  |         return self.read_opt_reg(exp_id, slot_id, lane, 'tia_stage2') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def write_tia_stage2_reg(self, exp_id: int, slot_id: int,  | 
					
						
							|  |  |  |                         lane: int, wt_value: int) -> bool: | 
					
						
							|  |  |  |         return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'tia_stage2') | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     def write_confirm_reg(self, exp_id: int, slot_id: int) -> bool: | 
					
						
							|  |  |  |         return self.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 0xd0, 0x88, 1, "01") | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     def read_vpeak_reg(self, exp_id: int, slot_id: int, lane: int) -> int: | 
					
						
							|  |  |  |         return self.read_opt_reg(exp_id, slot_id, lane, 'vpeak') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def read_opt_reg(self, exp_id: int, slot_id: int,  | 
					
						
							|  |  |  |                         lane: int, reg_name: str):   | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
    def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]:
        """Compute a per-lane target vpeak for one slot and persist it.

        Sequence performed (order matters, each step talks to hardware
        through helpers on ``self``):

        1. ``disable_agc`` for the slot.
        2. Toggle RF "off" for ``lane_list``; on failure log and return an
           empty dict.
        3. Read the baseline vpeaks, toggle RF back "on", write 255 to MGC
           on all lanes, wait 50 ms, and read the vpeaks again ("max").
        4. For every lane in the baseline reading compute
           ``target = base + round((max - base) * 16 / 29)``.
        5. If any target was computed, hand the dict to
           ``save_target_vpeaks_to_json``.

        Returns:
            Mapping ``lane -> target vpeak``; empty when the RF-off toggle
            failed or the baseline reading had no lanes.

        Raises:
            KeyError: if a lane present in the baseline reading is missing
                from the second ("max") reading.
        """
        target_vpeaks : Dict[int, int] = {}
        logging.info(f"-------slot {slot_id}")

        # Switch to MGC (manual gain) by disabling AGC.
        logging.info("----------step 1: disable agc")
        self.disable_agc(exp_id, slot_id)

        # Turn RF off.
        logging.info("----------step 2: toogle RF off")
        if not self.toogle_rf(exp_id, slot_id, lane_list, "off"):
            logging.error(f"slot {slot_id}: toogle RF off fail")
            return target_vpeaks

        # Read base_line_vpeaks (taken while RF is off).
        logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
        base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list)
        logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
        # NOTE(review): unlike the "off" toggle above, the result of this
        # "on" toggle is not checked -- confirm that is intended.
        self.toogle_rf(exp_id, slot_id, lane_list, "on")

        # Write 255 to MGC on every lane, then re-read vpeak as the "max" value.
        self._write_mgc_all_lanes(exp_id, slot_id, lane_list, 255)
        time.sleep(0.05) # important: delay between the MGC write and the vpeak read
        max_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list)

        for lane, base_vpeak in base_line_vpeaks.items():
            # Target sits 16/29 of the way from the baseline to the max reading.
            numerator = 16
            vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29))
            logging.info(f'-------numerator:{numerator}')
            target_vpeaks[lane] =  vpkdelta + base_vpeak

        logging.info(f"target_vpeaks: {target_vpeaks}")

        if target_vpeaks:
            self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)

        # Enable RF.
        # NOTE(review): RF was already toggled "on" during step 3; this step
        # only emits the log line (a former 5 s sleep here is disabled).
        logging.info("----------step 4: enable RF")
        # time.sleep(5)

        return target_vpeaks
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[int, int]) -> bool: | 
					
						
							|  |  |  |         matched_results = {}  | 
					
						
							|  |  |  |         unmatched_lanes = []   | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for lane_id in lane_list: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             is_match = False | 
					
						
							|  |  |  |             matched_mgc = None | 
					
						
							|  |  |  |             matched_vpeak = None | 
					
						
							|  |  |  |          | 
					
						
							|  |  |  |             reg = self.reg_table.get_register_by_logic_lane('mgc', lane_id) | 
					
						
							|  |  |  |             if reg is None or reg.valid_range is None: | 
					
						
							|  |  |  |                 logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc') | 
					
						
							|  |  |  |                 return False | 
					
						
							|  |  |  |              | 
					
						
							|  |  |  |             min_val = reg.valid_range[0] | 
					
						
							|  |  |  |             max_val = reg.valid_range[1] | 
					
						
							|  |  |  |             step = reg.step if reg.step is not None else 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             wt_mgc = int((min_val + max_val) / 2) | 
					
						
							|  |  |  |             target_vpeak = target_vpeaks[lane_id] # example: 100 | 
					
						
							|  |  |  |             target_vpeak_range = [target_vpeak, target_vpeak + 1] # 99,101 | 
					
						
							|  |  |  |             while True: | 
					
						
							|  |  |  |                 ret = self.reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc) | 
					
						
							|  |  |  |                 if ret == False: | 
					
						
							|  |  |  |                     logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc') | 
					
						
							|  |  |  |                     return False | 
					
						
							|  |  |  |                  | 
					
						
							|  |  |  |                 time.sleep(0.05) | 
					
						
							|  |  |  |                 ret = self.reg_tool.write_confirm_reg(exp_id, slot_id) | 
					
						
							|  |  |  |                 if ret == False: | 
					
						
							|  |  |  |                     logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm') | 
					
						
							|  |  |  |                     return False | 
					
						
							|  |  |  |                  | 
					
						
							|  |  |  |                 time.sleep(0.05) | 
					
						
							|  |  |  |                 current_vpeak = self.reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} ->  set mgc {wt_mgc}, Vpeak value: {current_vpeak}, target: {target_vpeak}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 diff_vpeak = abs(target_vpeak - current_vpeak) | 
					
						
							|  |  |  |                 diff_vpeak_step = { | 
					
						
							|  |  |  |                     0: 0, | 
					
						
							|  |  |  |                     1: 0, | 
					
						
							|  |  |  |                     2: 1, | 
					
						
							|  |  |  |                     3: 1, | 
					
						
							|  |  |  |                     4: 2, | 
					
						
							|  |  |  |                     5: 3, | 
					
						
							|  |  |  |                     6: 3, | 
					
						
							|  |  |  |                     7: 4, | 
					
						
							|  |  |  |                     8: 5, | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |                 extra_step = diff_vpeak_step.get(diff_vpeak, 5) | 
					
						
							|  |  |  |                 if current_vpeak > target_vpeak_range[1]: | 
					
						
							|  |  |  |                     wt_mgc -= step + extra_step | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 if current_vpeak < target_vpeak_range[0]: | 
					
						
							|  |  |  |                     wt_mgc += step + extra_step | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 if current_vpeak >= target_vpeak_range[0] and current_vpeak <= target_vpeak_range[1]: | 
					
						
							|  |  |  |                     is_match = True | 
					
						
							|  |  |  |                     matched_mgc = wt_mgc | 
					
						
							|  |  |  |                     matched_vpeak = current_vpeak | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |              | 
					
						
							|  |  |  |             if is_match: | 
					
						
							|  |  |  |                 logging.info(f"------------------ Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") | 
					
						
							|  |  |  |                 if str(exp_id) not in matched_results: | 
					
						
							|  |  |  |                     matched_results[str(exp_id)] = {} | 
					
						
							|  |  |  |                 if str(slot_id) not in matched_results[str(exp_id)]: | 
					
						
							|  |  |  |                     matched_results[str(exp_id)][str(slot_id)] = {} | 
					
						
							|  |  |  |                 if self.mode not in matched_results[str(exp_id)][str(slot_id)]: | 
					
						
							|  |  |  |                     matched_results[str(exp_id)][str(slot_id)][self.mode] = {} | 
					
						
							|  |  |  |                      | 
					
						
							|  |  |  |                 matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = { | 
					
						
							|  |  |  |                     "mgc": matched_mgc, | 
					
						
							|  |  |  |                     "target_vpeak": target_vpeaks[lane_id], | 
					
						
							|  |  |  |                     "actual_vpeak": matched_vpeak | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 unmatched_lanes.append(lane_id) | 
					
						
							|  |  |  |          | 
					
						
							|  |  |  |         if matched_results: | 
					
						
							|  |  |  |             self.save_mgc_results_to_json(exp_id, slot_id, matched_results) | 
					
						
							|  |  |  |              | 
					
						
							|  |  |  |         if unmatched_lanes: | 
					
						
							|  |  |  |             logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") | 
					
						
							|  |  |  |             return False | 
					
						
							|  |  |  |              | 
					
						
							|  |  |  |         return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) | 
					
						
							|  |  |  |         local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         import re | 
					
						
							|  |  |  | import yaml | 
					
						
							|  |  |  | from typing import List, Dict, Optional, Tuple, Iterator | 
					
						
							|  |  |  | from dataclasses import dataclass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @dataclass(frozen=True) | 
					
						
							|  |  |  | class Side: | 
					
						
							|  |  |  |     """ | 
					
						
							|  |  |  |     Represents one side of a link (e.g., host side or device side), | 
					
						
							|  |  |  |     explicitly containing GPU, Retimer, and Slot with their indices and lanes. | 
					
						
							|  |  |  |     """ | 
					
						
							|  |  |  |     gpu_index: int | 
					
						
							|  |  |  |     gpu_lane: int | 
					
						
							|  |  |  |     retimer_index: int | 
					
						
							|  |  |  |     retimer_lane: int | 
					
						
							|  |  |  |     slot_index: int | 
					
						
							|  |  |  |     slot_lane: int | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __str__(self) -> str: | 
					
						
							|  |  |  |         return (f"GPU{self.gpu_index}_L{self.gpu_lane} <-> " | 
					
						
							|  |  |  |                 f"RTMR{self.retimer_index}_L{self.retimer_lane} <-> " | 
					
						
							|  |  |  |                 f"SLOT{self.slot_index}_L{self.slot_lane}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @dataclass(frozen=True) | 
					
						
							|  |  |  | class TopoLink: | 
					
						
							|  |  |  |     """ | 
					
						
							|  |  |  |     Represents a full physical link between two sides (A and B). | 
					
						
							|  |  |  |     Example: Side(GPU0, RTMR21, SLOT0) <-> Side(GPU6, RTMR32, SLOT0) | 
					
						
							|  |  |  |     """ | 
					
						
							|  |  |  |     side_a: Side | 
					
						
							|  |  |  |     side_b: Side | 
					
						
							|  |  |  |     route_name: str | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __str__(self) -> str: | 
					
						
							|  |  |  |         return f"{self.side_a}  <->  {self.side_b}" | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  | @dataclass | 
					
						
							|  |  |  | class Route: | 
					
						
							|  |  |  |     name: str | 
					
						
							|  |  |  |     links: List[TopoLink] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __iter__(self) -> Iterator[TopoLink]: | 
					
						
							|  |  |  |         return iter(self.links) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __len__(self) -> int: | 
					
						
							|  |  |  |         return len(self.links) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TopoMappingParser: | 
					
						
							|  |  |  |     """ | 
					
						
							|  |  |  |     Parses topology YAML file into structured links with explicit Side components. | 
					
						
							|  |  |  |     Provides utility methods to query relationships between Retimer, Slot, GPU. | 
					
						
							|  |  |  |     All query methods now support optional filtering by route_name. | 
					
						
							|  |  |  |     """ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Regex to match: DEVICE<index>_L<lane>, e.g., GPU0_L0, RTMR21_L7, SLOT5_L3 | 
					
						
							|  |  |  |     _TOKEN_PATTERN = re.compile(r"([A-Z]+)(\d+)_L(\d+)") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, yaml_file: str): | 
					
						
							|  |  |  |         self.yaml_file = yaml_file | 
					
						
							|  |  |  |         self.routes: List[Route] = [] | 
					
						
							|  |  |  |         self._all_links: List[TopoLink] = []  # Flat index for fast lookup | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def parse(self) -> 'TopoMappingParser': | 
					
						
							|  |  |  |         """Parse YAML and build structured links.""" | 
					
						
							|  |  |  |         with open(self.yaml_file, 'r') as f: | 
					
						
							|  |  |  |             data = yaml.safe_load(f) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.routes.clear() | 
					
						
							|  |  |  |         self._all_links.clear() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for item in data: | 
					
						
							|  |  |  |             if 'route_name' not in item or 'links' not in item: | 
					
						
							|  |  |  |                 continue | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             route_name = item['route_name'] | 
					
						
							|  |  |  |             links = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             for link_str in item['links']: | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     link = self._parse_link(link_str.strip(), route_name) | 
					
						
							|  |  |  |                     links.append(link) | 
					
						
							|  |  |  |                     self._all_links.append(link) | 
					
						
							|  |  |  |                 except Exception as e: | 
					
						
							|  |  |  |                     raise ValueError(f"Failed to parse link '{link_str}' in route '{route_name}': {e}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.routes.append(Route(route_name, links)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return self | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _parse_link(self, link_str: str, route_name: str) -> TopoLink: | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         Parse a link string into two structured Sides (A and B). | 
					
						
							|  |  |  |         Assumes format: GPUx_La <-> RTMRy_Lb <-> SLOTz_Lc <-> ... <-> GPUx_La | 
					
						
							|  |  |  |         And that both ends have: GPU, RTMR, SLOT in order. | 
					
						
							|  |  |  |         Middle may have repeated SLOT/RTMR. | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         tokens = [t.strip() for t in link_str.split('<->')] | 
					
						
							|  |  |  |         if len(tokens) < 6: | 
					
						
							|  |  |  |             raise ValueError(f"Link too short to extract both sides: {link_str}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Parse all nodes | 
					
						
							|  |  |  |         nodes = [self._parse_token(tok) for tok in tokens] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Find split point: assume symmetry, and RTMR is near both ends | 
					
						
							|  |  |  |         rt_indices = [i for i, (t, _, _) in enumerate(nodes) if t == "RTMR"] | 
					
						
							|  |  |  |         if len(rt_indices) < 2: | 
					
						
							|  |  |  |             raise ValueError(f"Link must have at least two retimers: {link_str}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         mid = len(nodes) // 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Extract Side A (from start to middle RTMR) | 
					
						
							|  |  |  |         side_a = self._extract_side(nodes[:mid + 1]) | 
					
						
							|  |  |  |         # Extract Side B (from middle to end) | 
					
						
							|  |  |  |         side_b = self._extract_side(list(reversed(nodes[mid:]))) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return TopoLink(side_a=side_a, side_b=side_b, route_name=route_name) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _parse_token(self, token: str) -> Tuple[str, int, int]: | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         Parse token like 'GPU0_L0' into (type, index, lane) | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         match = self._TOKEN_PATTERN.fullmatch(token) | 
					
						
							|  |  |  |         if not match: | 
					
						
							|  |  |  |             raise ValueError(f"Invalid token format: '{token}'") | 
					
						
							|  |  |  |         dev_type, idx, lane = match.groups() | 
					
						
							|  |  |  |         return dev_type, int(idx), int(lane) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _extract_side(self, nodes: List[Tuple[str, int, int]]) -> Side: | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         Extract GPU, RTMR, SLOT from a list of nodes (assumed to be one side). | 
					
						
							|  |  |  |         Picks the first occurrence of each. | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         gpu = retimer = slot = None | 
					
						
							|  |  |  |         for dev_type, idx, lane in nodes: | 
					
						
							|  |  |  |             if dev_type == "GPU" and gpu is None: | 
					
						
							|  |  |  |                 gpu = (idx, lane) | 
					
						
							|  |  |  |             elif dev_type == "RTMR" and retimer is None: | 
					
						
							|  |  |  |                 retimer = (idx, lane) | 
					
						
							|  |  |  |             elif dev_type == "SLOT" and slot is None: | 
					
						
							|  |  |  |                 slot = (idx, lane) | 
					
						
							|  |  |  |         if not gpu or not retimer or not slot: | 
					
						
							|  |  |  |             raise ValueError(f"Missing components in side: GPU={gpu}, RTMR={retimer}, SLOT={slot}") | 
					
						
							|  |  |  |         return Side( | 
					
						
							|  |  |  |             gpu_index=gpu[0], gpu_lane=gpu[1], | 
					
						
							|  |  |  |             retimer_index=retimer[0], retimer_lane=retimer[1], | 
					
						
							|  |  |  |             slot_index=slot[0], slot_lane=slot[1] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     # -------------------------------------------------- | 
					
						
							|  |  |  |     # 🔧 UTILITY METHODS (now support route_name filtering) | 
					
						
							|  |  |  |     # -------------------------------------------------- | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _filter_links(self, links: List[TopoLink], route_name: Optional[str]) -> List[TopoLink]: | 
					
						
							|  |  |  |         """Helper to filter links by route_name if provided.""" | 
					
						
							|  |  |  |         if route_name is None: | 
					
						
							|  |  |  |             return links | 
					
						
							|  |  |  |         return [link for link in links if link.route_name == route_name] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_links_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: Optional[str] = None) -> List[TopoLink]: | 
					
						
							|  |  |  |         """Get all links containing the given retimer (index and lane), optionally filtered by route_name.""" | 
					
						
							|  |  |  |         links = [link for link in self._all_links | 
					
						
							|  |  |  |                 if (link.side_a.retimer_index == rtmr_idx and link.side_a.retimer_lane == rtmr_lane) | 
					
						
							|  |  |  |                 or (link.side_b.retimer_index == rtmr_idx and link.side_b.retimer_lane == rtmr_lane)] | 
					
						
							|  |  |  |          | 
					
						
							|  |  |  |         return self._filter_links(links, route_name) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_local_side_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: Optional[str] = None) -> Optional[Side]: | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         Given a retimer (index, lane), return the local Side (GPU + SLOT on same side), | 
					
						
							|  |  |  |         optionally filtered by route_name. | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         links = self.get_links_by_retimer(rtmr_idx, rtmr_lane, route_name) | 
					
						
							|  |  |  |         for link in links: | 
					
						
							|  |  |  |             if link.side_a.retimer_index == rtmr_idx and link.side_a.retimer_lane == rtmr_lane: | 
					
						
							|  |  |  |                 return link.side_a | 
					
						
							|  |  |  |             if link.side_b.retimer_index == rtmr_idx and link.side_b.retimer_lane == rtmr_lane: | 
					
						
							|  |  |  |                 return link.side_b | 
					
						
							|  |  |  |         return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_remote_side_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: Optional[str] = None) -> Optional[Side]: | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         Given a retimer (index, lane), return the *remote* Side (the other end), | 
					
						
							|  |  |  |         optionally filtered by route_name. | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         links = self.get_links_by_retimer(rtmr_idx, rtmr_lane, route_name) | 
					
						
							|  |  |  |         for link in links: | 
					
						
							|  |  |  |             if link.side_a.retimer_index == rtmr_idx and link.side_a.retimer_lane == rtmr_lane: | 
					
						
							|  |  |  |                 return link.side_b | 
					
						
							|  |  |  |             if link.side_b.retimer_index == rtmr_idx and link.side_b.retimer_lane == rtmr_lane: | 
					
						
							|  |  |  |                 return link.side_a | 
					
						
							|  |  |  |         return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_local_slot_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: Optional[str] = None) -> Optional[Tuple[int, int]]: | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         Get (slot_index, slot_lane) on the same side as the given retimer, | 
					
						
							|  |  |  |         optionally filtered by route_name. | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         side = self.get_local_side_by_retimer(rtmr_idx, rtmr_lane, route_name) | 
					
						
							|  |  |  |         return (side.slot_index, side.slot_lane) if side else None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_remote_slot_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: str) -> Optional[Tuple[int, int]]: | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         Get (slot_index, slot_lane) on the opposite side of the given retimer, | 
					
						
							|  |  |  |         optionally filtered by route_name. | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         side = self.get_remote_side_by_retimer(rtmr_idx, rtmr_lane, route_name) | 
					
						
							|  |  |  |         return (side.slot_index, side.slot_lane) if side else None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_retimer_by_slot(self, slot_idx: int, slot_lane: int, route_name: str) -> Optional[Tuple[int, int, int, int]]: | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         Given a slot (index, lane), return: | 
					
						
							|  |  |  |         (retimer_index, retimer_lane, peer_slot_idx, peer_slot_lane) | 
					
						
							|  |  |  |         Optionally filtered by route_name. | 
					
						
							|  |  |  |         """ | 
					
						
							|  |  |  |         links = self._filter_links(self._all_links, route_name) | 
					
						
							|  |  |  |         for link in links: | 
					
						
							|  |  |  |             if (link.side_a.slot_index == slot_idx and link.side_a.slot_lane == slot_lane): | 
					
						
							|  |  |  |                 return ( | 
					
						
							|  |  |  |                     link.side_a.retimer_index, | 
					
						
							|  |  |  |                     link.side_a.retimer_lane, | 
					
						
							|  |  |  |                     link.side_b.slot_index, | 
					
						
							|  |  |  |                     link.side_b.slot_lane | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             if (link.side_b.slot_index == slot_idx and link.side_b.slot_lane == slot_lane): | 
					
						
							|  |  |  |                 return ( | 
					
						
							|  |  |  |                     link.side_b.retimer_index, | 
					
						
							|  |  |  |                     link.side_b.retimer_lane, | 
					
						
							|  |  |  |                     link.side_a.slot_index, | 
					
						
							|  |  |  |                     link.side_a.slot_lane | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |         return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_gpu_by_slot(self, slot_idx: int, slot_lane: int, route_name: Optional[str] = None) -> Optional[Tuple[int, int]]: | 
					
						
							|  |  |  |         """Get (gpu_index, gpu_lane) connected to the given slot, optionally filtered by route_name.""" | 
					
						
							|  |  |  |         links = self._filter_links(self._all_links, route_name) | 
					
						
							|  |  |  |         for link in links: | 
					
						
							|  |  |  |             if link.side_a.slot_index == slot_idx and link.side_a.slot_lane == slot_lane: | 
					
						
							|  |  |  |                 return (link.side_a.gpu_index, link.side_a.gpu_lane) | 
					
						
							|  |  |  |             if link.side_b.slot_index == slot_idx and link.side_b.slot_lane == slot_lane: | 
					
						
							|  |  |  |                 return (link.side_b.gpu_index, link.side_b.gpu_lane) | 
					
						
							|  |  |  |         return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def find_symmetric_links(self, route_name: Optional[str] = None) -> List[TopoLink]: | 
					
						
							|  |  |  |         """Find links where both sides are identical (e.g., oneta loops), optionally filtered by route_name.""" | 
					
						
							|  |  |  |         links = self._filter_links(self._all_links, route_name) | 
					
						
							|  |  |  |         return [ | 
					
						
							|  |  |  |             link for link in links | 
					
						
							|  |  |  |             if (link.side_a.gpu_index == link.side_b.gpu_index and | 
					
						
							|  |  |  |                 link.side_a.retimer_index == link.side_b.retimer_index and | 
					
						
							|  |  |  |                 link.side_a.slot_index == link.side_b.slot_index and | 
					
						
							|  |  |  |                 link.side_a.gpu_lane == link.side_b.gpu_lane and | 
					
						
							|  |  |  |                 link.side_a.retimer_lane == link.side_b.retimer_lane and | 
					
						
							|  |  |  |                 link.side_a.slot_lane == link.side_b.slot_lane) | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     # 示例用法 | 
					
						
							|  |  |  |     parser = TopoMappingParser("./main_data/topo_mapping.yaml") | 
					
						
							|  |  |  |     parser.parse() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 原始调用(跨所有 route) | 
					
						
							|  |  |  |     print("All routes - local slot for RTMR21_L0:", parser.get_local_slot_by_retimer(21, 0)) | 
					
						
							|  |  |  |     print("All routes - remote slot for RTMR21_L0:", parser.get_remote_slot_by_retimer(21, 0)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 按 route_name 查询 | 
					
						
							|  |  |  |     print("onoc6 - local slot for RTMR21_L0:", parser.get_local_slot_by_retimer(21, 0, route_name="onoc6")) | 
					
						
							|  |  |  |     print("onoc6 - remote slot for RTMR21_L0:", parser.get_remote_slot_by_retimer(21, 0, route_name="onoc6")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     print("onoc5 - local slot for RTMR21_L0:", parser.get_local_slot_by_retimer(21, 0, route_name="onoc5")) | 
					
						
							|  |  |  |     print("onoc5 - remote slot for RTMR21_L0:", parser.get_remote_slot_by_retimer(21, 0, route_name="onoc5")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 其他查询也支持 route_name | 
					
						
							|  |  |  |     print("oneta - symmetric links count:", len(parser.find_symmetric_links(route_name="oneta"))) | 
					
						
							|  |  |  |     print("onoc6 - GPU connected to SLOT0_L0:", parser.get_gpu_by_slot(0, 0, route_name="onoc6")) | 
					
						
							|  |  |  |     print("onoc5 - GPU connected to SLOT0_L0:", parser.get_gpu_by_slot(0, 0, route_name="onoc5")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 |