From 34dd1d8f0b7359a8504c72f919420983bdae5e87 Mon Sep 17 00:00:00 2001 From: xz_ocs Date: Mon, 6 Oct 2025 12:41:41 +0000 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20'ocs/note.md'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ocs/note.md | 1022 ++++++++++++++++----------------------------------- 1 file changed, 320 insertions(+), 702 deletions(-) diff --git a/ocs/note.md b/ocs/note.md index 628826f..fae3c34 100644 --- a/ocs/note.md +++ b/ocs/note.md @@ -76,87 +76,35 @@ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_lin scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.156:/usr/bin/ -import logging, time, os, json -from typing import Dict, List, Any, Optional -from gpu.biren.exp_util import DevWhiteRiverExp -from toolbox.opt_reg_access_tool import OptRegAccessTool -from parser.transceiver_config_parser import TransceiverConfigParser, RegisterInfo -class MgcTuningTool: - """ - MGC自动校准工具类 - """ - def __init__(self, host:str, reg_table: TransceiverConfigParser, reg_tool: OptRegAccessTool, - bmc: DevWhiteRiverExp): - self.host = host - self.reg_table = reg_table - self.bmc = bmc - self.target_vpeak = 0 - self.reg_tool = reg_tool - self.target_vpeaks_file = "target_vpeaks.json" - self.mode = "onoc1" # 默认mode - - # 暂时只考虑了onoc模式 - # onet模式暂时手动传 - - def auto_tune(self, exp_id: int, slot_id: int, lane_list: List[int]) -> bool: - # target_vpeaks = self.calc_target_vpeak(exp_id, slot_id) - target_vpeaks = self.calc_target_vpeak_new(exp_id, slot_id, lane_list) - # 开始匹配目标target_vpeaks - logging.info("----------step 7: find optimal MGC") - self.match_optimal_mgc_new(exp_id, slot_id, lane_list, target_vpeaks) - return True - def manual_tune(self, exp_id: int, slot_id: int) -> bool: - target_vpeaks = self.load_target_vpeaks_from_json(exp_id, slot_id) - if target_vpeaks is None: - logging.error("从文件加载target_vpeaks失败") - return False - self.match_optimal_mgc_new2(exp_id, slot_id, target_vpeaks) - return True - - def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]: - target_vpeaks : Dict[int, int] = {} - logging.info(f"-------slot {slot_id}") - - # 切换成mgc - logging.info("----------step 1: disable agc") - self.disable_agc(exp_id, slot_id) - # 关闭RF - logging.info("----------step 2: toogle RF off") - if not self.toogle_rf(exp_id, slot_id, lane_list, "off"): - logging.error(f"slot {slot_id}: toogle RF off fail") - return target_vpeaks - - # 读取base_line_vpeaks - logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") - base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) - logging.info(f"base_line_vpeaks: {base_line_vpeaks}") - self.toogle_rf(exp_id, slot_id, lane_list, "on") - self._write_mgc_all_lanes(exp_id, slot_id, lane_list, 255) - max_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) - for lane, base_vpeak in base_line_vpeaks.items(): - vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (16 / 29)) - target_vpeaks[lane] = vpkdelta + base_vpeak + + def read_tia_stage2_reg(self, exp_id: int, slot_id: int, + lane: int) -> int: + return self.read_opt_reg(exp_id, slot_id, lane, 'tia_stage2') - logging.info(f"target_vpeaks: {target_vpeaks}") + def write_tia_stage2_reg(self, exp_id: int, slot_id: int, + lane: int, wt_value: int) -> bool: + return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'tia_stage2') + + def write_confirm_reg(self, exp_id: int, slot_id: int) -> bool: + return self.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 0xd0, 0x88, 1, "01") + + def read_vpeak_reg(self, exp_id: int, slot_id: int, lane: int) -> int: + return self.read_opt_reg(exp_id, slot_id, lane, 'vpeak') - if target_vpeaks: - self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) + def read_opt_reg(self, exp_id: int, slot_id: int, + lane: int, reg_name: str): - # 开启RF - logging.info("----------step 4: enable RF") - # time.sleep(5) - return target_vpeaks - def calc_target_vpeak(self, exp_id: int, lane_list: List[int], slot_id: int) -> Dict[int, int]: + def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]: target_vpeaks : Dict[int, int] = {} logging.info(f"-------slot {slot_id}") @@ -174,7 +122,18 @@ class MgcTuningTool: logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) logging.info(f"base_line_vpeaks: {base_line_vpeaks}") - target_vpeaks = self.calculate_target_vpeak(base_line_vpeaks) + self.toogle_rf(exp_id, slot_id, lane_list, "on") + + self._write_mgc_all_lanes(exp_id, slot_id, lane_list, 255) + time.sleep(0.05) # important + max_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) + + for lane, base_vpeak in base_line_vpeaks.items(): + numerator = 16 + vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29)) + logging.info(f'-------numerator:{numerator}') + target_vpeaks[lane] = vpkdelta + base_vpeak + logging.info(f"target_vpeaks: {target_vpeaks}") if target_vpeaks: @@ -183,200 +142,11 @@ class MgcTuningTool: # 开启RF logging.info("----------step 4: enable RF") # time.sleep(5) - self.toogle_rf(exp_id, slot_id, lane_list, "on") return target_vpeaks - def disable_agc(self, exp_id: int, slot_id: int): - logging.debug("----------disable_agc, step 1") - agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en") - if not agc_enable_regs: - logging.error("not find AGC_ENABLE register") - return False - - logging.debug("----------disable_agc, step 2") - success = True - for reg in agc_enable_regs: - logging.debug(f"----------disable_agc, step 2: reg: {reg}") - result = self.bmc.SetOpticalModuleRegs( - exp_id=exp_id, - slot_id=slot_id, - bank=reg.bank, - page=reg.page, - reg_offset=reg.offset, - size=reg.size, - hex_str="0000" - ) - logging.debug(f"----------disable_agc, step 3: result: {result}") - if not result: - logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}") - success = False - - return success - - def toogle_rf(self, exp_id: int, slot_id: int, lane_list: List[int], switch: str = "off") -> bool: - ret = False - logging.info(f"---------- toogle_rf to {switch}") - for lane in lane_list: - if switch == "off": - ret = self.reg_tool.write_tia_stage2_reg(exp_id, slot_id, lane, 0) - if ret == False: - logging.error(f"---------- toogle_rf to {switch} failed") - else: - stage_recommended_value = 144 - ret = self.reg_tool.write_tia_stage2_reg(exp_id, slot_id, lane, stage_recommended_value) - if ret == False: - logging.error(f"---------- toogle_rf to {switch} failed") - - return ret - def calculate_target_vpeak(self, base_line_vpeaks: Dict[int, int]) -> Dict[int, int]: - # base_line值加22 - target_vpeaks:Dict[int, int] = {} - for lane, vpeak in base_line_vpeaks.items(): - logging.info(f"lane={lane}, vpeak={vpeak}") - target_vpeaks[lane] = vpeak + 18 - logging.info(f"target_vpeaks={target_vpeaks}") - return target_vpeaks - - def match_optimal_mgc_new2(self, exp_id: int, slot_id: int, target_vpeaks: Optional[Dict[int, int]]) -> bool: - mgc_regs = self.reg_table.get_registers_by_name("mgc") - vpeak_regs = self.reg_table.get_registers_by_name("vpeak") - if mgc_regs is None or vpeak_regs is None: - logging.error("not find MGC regs") - return False - - success = True - matched_results = {} - unmatched_lanes = [] - target_mgcs = {} - - for reg in mgc_regs: - # logging.info('------------------step 1') - mgc_value = self.bmc.GetOpticalModuleRegs( - exp_id=exp_id, - slot_id=slot_id, - bank=reg.bank, - page=reg.page, - reg_offset=reg.offset, - size=1 - ) - - # logging.info(f'------------------step 2: {mgc_value}') - mgc_value = int(mgc_value, 16) - for vpeak_reg in vpeak_regs: - if vpeak_reg.lane == reg.lane: - vpeak_value = self.bmc.GetOpticalModuleRegs( - exp_id=exp_id, - slot_id=slot_id, - bank=vpeak_reg.bank, - page=vpeak_reg.page, - reg_offset=vpeak_reg.offset, - size=2 - ) - vpeak_value = int(vpeak_value, 16) - # logging.info(f'------------------step 3: {vpeak_value}') - logging.info(f'mgc_value: {mgc_value}, target_vpeak: {target_vpeaks[vpeak_reg.lane]}, vpeak_value: {vpeak_value}') - target_mgc = (target_vpeaks[vpeak_reg.lane] - vpeak_value) * 1.5 + mgc_value - logging.info(f'target_mgc: {target_mgc}') - target_mgcs[reg.lane] = target_mgc - break - - logging.info(f'----------------Target MGC: {target_mgcs}') - - for reg in mgc_regs: - logging.info(f"mgc regs size: {len(mgc_regs)}") - - min_val = int(target_mgcs[reg.lane] - 10) - if min_val > 255: - min_val = 255 - max_val = int(target_mgcs[reg.lane] + 10) - if max_val > 255: - logging.info(f'max_val is greater than 255, setting to 255') - max_val = 255 - logging.info(f"lane: {reg.lane}, min mgc : {min_val}, max_mgc: {max_val}, target vpeak: {target_vpeaks[reg.lane]}") - step = reg.step - - - search_values = list(range(max_val, min_val - 1, -step)) - logging.info(f"Search order for lane {reg.lane}: {search_values}") - - is_match = False - matched_mgc = None - matched_vpeak = None - - for mgc in search_values: - result = self.bmc.SetOpticalModuleRegs( - exp_id=exp_id, - slot_id=slot_id, - bank=reg.bank, - page=reg.page, - reg_offset=reg.offset, - size=1, - hex_str=f"{mgc:02X}" - ) - time.sleep(0.05) - - # 新固件特殊逻辑 - result = self.bmc.SetOpticalModuleRegs( - exp_id=exp_id, - slot_id=slot_id, - bank= 0, - page=0xd0, - reg_offset=0x88, - size=1, - hex_str="01" - ) - time.sleep(0.05) - - - for vpeak_reg in vpeak_regs: - if vpeak_reg.lane == reg.lane: - value = self.bmc.GetOpticalModuleRegs( - exp_id=exp_id, - slot_id=slot_id, - bank=vpeak_reg.bank, - page=vpeak_reg.page, - reg_offset=vpeak_reg.offset, - size=2 - ) - value = int(value, 16) - logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}") - if value < (target_vpeaks[reg.lane] + 1) and value > (target_vpeaks[reg.lane] - 2): - logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ") - is_match = True - matched_mgc = mgc - matched_vpeak = value - break - if is_match: - break - time.sleep(0.03) - - if is_match and matched_mgc is not None and matched_vpeak is not None: - if str(exp_id) not in matched_results: - matched_results[str(exp_id)] = {} - if str(slot_id) not in matched_results[str(exp_id)]: - matched_results[str(exp_id)][str(slot_id)] = {} - if self.mode not in matched_results[str(exp_id)][str(slot_id)]: - matched_results[str(exp_id)][str(slot_id)][self.mode] = {} - - matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = { - "mgc": matched_mgc, - "target_vpeak": target_vpeaks[reg.lane], - "actual_vpeak": matched_vpeak - } - else: - unmatched_lanes.append(reg.lane) - - if matched_results: - self.save_mgc_results_to_json(exp_id, slot_id, matched_results) - - if unmatched_lanes: - logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") - - return success - - def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[int, int]) -> bool: +def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[int, int]) -> bool: matched_results = {} unmatched_lanes = [] @@ -397,28 +167,42 @@ class MgcTuningTool: wt_mgc = int((min_val + max_val) / 2) target_vpeak = target_vpeaks[lane_id] # example: 100 - target_vpeak_range = [target_vpeak -1, target_vpeak + 1] # 99,101 + target_vpeak_range = [target_vpeak, target_vpeak + 1] # 99,101 while True: ret = self.reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc) - if ret != 0: + if ret == False: logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc') return False - + + time.sleep(0.05) ret = self.reg_tool.write_confirm_reg(exp_id, slot_id) - if ret != 0: + if ret == False: logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm') return False - time.sleep(0.03) + time.sleep(0.05) current_vpeak = self.reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id) - logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}") - + logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}, target: {target_vpeak}") + + diff_vpeak = abs(target_vpeak - current_vpeak) + diff_vpeak_step = { + 0: 0, + 1: 0, + 2: 1, + 3: 1, + 4: 2, + 5: 3, + 6: 3, + 7: 4, + 8: 5, + } + extra_step = diff_vpeak_step.get(diff_vpeak, 5) if current_vpeak > target_vpeak_range[1]: - wt_mgc -= step + wt_mgc -= step + extra_step if current_vpeak < target_vpeak_range[0]: - wt_mgc += step + wt_mgc += step + extra_step if current_vpeak >= target_vpeak_range[0] and current_vpeak <= target_vpeak_range[1]: is_match = True @@ -427,7 +211,7 @@ class MgcTuningTool: break if is_match: - logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") + logging.info(f"------------------ Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") if str(exp_id) not in matched_results: matched_results[str(exp_id)] = {} if str(slot_id) not in matched_results[str(exp_id)]: @@ -451,469 +235,303 @@ class MgcTuningTool: return False return True - - def _read_vpeak_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]: - vpeak_values = {} - for lane in lane_list: - vpeak_int = self.reg_tool.read_tia_peak_reg(exp_id, slot_id, lane) - vpeak_values[lane] = vpeak_int - return vpeak_values - def _write_mgc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], mgc: int) -> bool: - for lane_id in lane_list: - ret = self.reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, mgc) - if ret == False: - logging.error(f'write mgc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, mgc:{mgc}') - return False - - ret = self.reg_tool.write_confirm_reg(exp_id, slot_id) - if ret == False: - logging.error(f'write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}') - return False - - return True + remote_slot = self.topo_map.get_remote_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) + local_slot = self.topo_map.get_local_slot_by_retimer(eye.rt_index, eye.lane_index, self.route_name) - def _generate_host_filename(self) -> str: - safe_host = self.host.replace(':', '_').replace('/', '_').replace('\\', '_') - return f"target_vpeaks_{safe_host}.json" - - def _generate_mgc_host_filename(self) -> str: - safe_host = self.host.replace(':', '_').replace('/', '_').replace('\\', '_') - return f"mgc_{safe_host}.json" - - def save_target_vpeaks_to_json(self, exp_id: int, slot_id: int, target_vpeaks: Dict[int, int]) -> bool: - try: - filename = self._generate_host_filename() - os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) - - if os.path.exists(filename): - with open(filename, 'r') as f: - data = json.load(f) - else: - data = {} - - if str(exp_id) not in data: - data[str(exp_id)] = {} - if str(slot_id) not in data[str(exp_id)]: - data[str(exp_id)][str(slot_id)] = {} - - data[str(exp_id)][str(slot_id)][self.mode] = {str(k): v for k, v in target_vpeaks.items()} - - with open(filename, 'w') as f: - json.dump(data, f, indent=4) - - logging.info(f"target_vpeaks saved to {filename}, path: {exp_id}/{slot_id}/{self.mode}") - return True - except Exception as e: - logging.error(f"Failed to save target_vpeaks to JSON file: {e}") - return False - def load_target_vpeaks_from_json(self, exp_id: int, slot_id: int) -> Optional[Dict[int, int]]: - try: - filename = self._generate_host_filename() - if not os.path.exists(filename): - logging.error(f"File {filename} does not exist") - return None - - with open(filename, 'r') as f: - data = json.load(f) - - if (str(exp_id) not in data or - str(slot_id) not in data[str(exp_id)] or - self.mode not in data[str(exp_id)][str(slot_id)]): - logging.error(f"Path {exp_id}/{slot_id}/{self.mode} not found in {filename}") - return None - - target_vpeaks_data = data[str(exp_id)][str(slot_id)][self.mode] - target_vpeaks = {int(k): v for k, v in target_vpeaks_data.items()} - - logging.info(f"target_vpeaks loaded successfully from {filename}, path: {exp_id}/{slot_id}/{self.mode}") - return target_vpeaks - except Exception as e: - logging.error(f"Failed to load target_vpeaks from JSON file: {e}") - return None - - def save_mgc_results_to_json(self, exp_id: int, slot_id: int, matched_results: Dict[str, Any]) -> bool: - try: - filename = self._generate_mgc_host_filename() - os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) - - if os.path.exists(filename): - with open(filename, 'r') as f: - data = json.load(f) - else: - data = {} - - for exp_key, exp_data in matched_results.items(): - if exp_key not in data: - data[exp_key] = {} - for slot_key, slot_data in exp_data.items(): - if slot_key not in data[exp_key]: - data[exp_key][slot_key] = {} - for mode_key, mode_data in slot_data.items(): - if mode_key not in data[exp_key][slot_key]: - data[exp_key][slot_key][mode_key] = {} - data[exp_key][slot_key][mode_key].update(mode_data) - - with open(filename, 'w') as f: - json.dump(data, f, indent=4) - - logging.info(f"MGC matching results saved to {filename}") - return True - except Exception as e: - logging.error(f"Failed to save MGC results to JSON file: {e}") - return False + import re +import yaml +from typing import List, Dict, Optional, Tuple, Iterator +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Side: + """ + Represents one side of a link (e.g., host side or device side), + explicitly containing GPU, Retimer, and Slot with their indices and lanes. + """ + gpu_index: int + gpu_lane: int + retimer_index: int + retimer_lane: int + slot_index: int + slot_lane: int + + def __str__(self) -> str: + return (f"GPU{self.gpu_index}_L{self.gpu_lane} <-> " + f"RTMR{self.retimer_index}_L{self.retimer_lane} <-> " + f"SLOT{self.slot_index}_L{self.slot_lane}") + + +@dataclass(frozen=True) +class TopoLink: + """ + Represents a full physical link between two sides (A and B). + Example: Side(GPU0, RTMR21, SLOT0) <-> Side(GPU6, RTMR32, SLOT0) + """ + side_a: Side + side_b: Side + route_name: str + def __str__(self) -> str: + return f"{self.side_a} <-> {self.side_b}" + +@dataclass +class Route: + name: str + links: List[TopoLink] + def __iter__(self) -> Iterator[TopoLink]: + return iter(self.links) + def __len__(self) -> int: + return len(self.links) +class TopoMappingParser: + """ + Parses topology YAML file into structured links with explicit Side components. + Provides utility methods to query relationships between Retimer, Slot, GPU. + All query methods now support optional filtering by route_name. + """ + # Regex to match: DEVICE_L, e.g., GPU0_L0, RTMR21_L7, SLOT5_L3 + _TOKEN_PATTERN = re.compile(r"([A-Z]+)(\d+)_L(\d+)") + + def __init__(self, yaml_file: str): + self.yaml_file = yaml_file + self.routes: List[Route] = [] + self._all_links: List[TopoLink] = [] # Flat index for fast lookup + + def parse(self) -> 'TopoMappingParser': + """Parse YAML and build structured links.""" + with open(self.yaml_file, 'r') as f: + data = yaml.safe_load(f) + + self.routes.clear() + self._all_links.clear() + + for item in data: + if 'route_name' not in item or 'links' not in item: + continue + + route_name = item['route_name'] + links = [] + + for link_str in item['links']: + try: + link = self._parse_link(link_str.strip(), route_name) + links.append(link) + self._all_links.append(link) + except Exception as e: + raise ValueError(f"Failed to parse link '{link_str}' in route '{route_name}': {e}") + + self.routes.append(Route(route_name, links)) + + return self + + def _parse_link(self, link_str: str, route_name: str) -> TopoLink: + """ + Parse a link string into two structured Sides (A and B). + Assumes format: GPUx_La <-> RTMRy_Lb <-> SLOTz_Lc <-> ... <-> GPUx_La + And that both ends have: GPU, RTMR, SLOT in order. + Middle may have repeated SLOT/RTMR. + """ + tokens = [t.strip() for t in link_str.split('<->')] + if len(tokens) < 6: + raise ValueError(f"Link too short to extract both sides: {link_str}") + + # Parse all nodes + nodes = [self._parse_token(tok) for tok in tokens] + + # Find split point: assume symmetry, and RTMR is near both ends + rt_indices = [i for i, (t, _, _) in enumerate(nodes) if t == "RTMR"] + if len(rt_indices) < 2: + raise ValueError(f"Link must have at least two retimers: {link_str}") + + mid = len(nodes) // 2 + + # Extract Side A (from start to middle RTMR) + side_a = self._extract_side(nodes[:mid + 1]) + # Extract Side B (from middle to end) + side_b = self._extract_side(list(reversed(nodes[mid:]))) + + return TopoLink(side_a=side_a, side_b=side_b, route_name=route_name) + + def _parse_token(self, token: str) -> Tuple[str, int, int]: + """ + Parse token like 'GPU0_L0' into (type, index, lane) + """ + match = self._TOKEN_PATTERN.fullmatch(token) + if not match: + raise ValueError(f"Invalid token format: '{token}'") + dev_type, idx, lane = match.groups() + return dev_type, int(idx), int(lane) + + def _extract_side(self, nodes: List[Tuple[str, int, int]]) -> Side: + """ + Extract GPU, RTMR, SLOT from a list of nodes (assumed to be one side). + Picks the first occurrence of each. + """ + gpu = retimer = slot = None + for dev_type, idx, lane in nodes: + if dev_type == "GPU" and gpu is None: + gpu = (idx, lane) + elif dev_type == "RTMR" and retimer is None: + retimer = (idx, lane) + elif dev_type == "SLOT" and slot is None: + slot = (idx, lane) + if not gpu or not retimer or not slot: + raise ValueError(f"Missing components in side: GPU={gpu}, RTMR={retimer}, SLOT={slot}") + return Side( + gpu_index=gpu[0], gpu_lane=gpu[1], + retimer_index=retimer[0], retimer_lane=retimer[1], + slot_index=slot[0], slot_lane=slot[1] + ) + + # -------------------------------------------------- + # 🔧 UTILITY METHODS (now support route_name filtering) + # -------------------------------------------------- + + def _filter_links(self, links: List[TopoLink], route_name: Optional[str]) -> List[TopoLink]: + """Helper to filter links by route_name if provided.""" + if route_name is None: + return links + return [link for link in links if link.route_name == route_name] + + def get_links_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: Optional[str] = None) -> List[TopoLink]: + """Get all links containing the given retimer (index and lane), optionally filtered by route_name.""" + links = [link for link in self._all_links + if (link.side_a.retimer_index == rtmr_idx and link.side_a.retimer_lane == rtmr_lane) + or (link.side_b.retimer_index == rtmr_idx and link.side_b.retimer_lane == rtmr_lane)] + + return self._filter_links(links, route_name) + + def get_local_side_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: Optional[str] = None) -> Optional[Side]: + """ + Given a retimer (index, lane), return the local Side (GPU + SLOT on same side), + optionally filtered by route_name. + """ + links = self.get_links_by_retimer(rtmr_idx, rtmr_lane, route_name) + for link in links: + if link.side_a.retimer_index == rtmr_idx and link.side_a.retimer_lane == rtmr_lane: + return link.side_a + if link.side_b.retimer_index == rtmr_idx and link.side_b.retimer_lane == rtmr_lane: + return link.side_b + return None + + def get_remote_side_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: Optional[str] = None) -> Optional[Side]: + """ + Given a retimer (index, lane), return the *remote* Side (the other end), + optionally filtered by route_name. + """ + links = self.get_links_by_retimer(rtmr_idx, rtmr_lane, route_name) + for link in links: + if link.side_a.retimer_index == rtmr_idx and link.side_a.retimer_lane == rtmr_lane: + return link.side_b + if link.side_b.retimer_index == rtmr_idx and link.side_b.retimer_lane == rtmr_lane: + return link.side_a + return None + + def get_local_slot_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: Optional[str] = None) -> Optional[Tuple[int, int]]: + """ + Get (slot_index, slot_lane) on the same side as the given retimer, + optionally filtered by route_name. + """ + side = self.get_local_side_by_retimer(rtmr_idx, rtmr_lane, route_name) + return (side.slot_index, side.slot_lane) if side else None + + def get_remote_slot_by_retimer(self, rtmr_idx: int, rtmr_lane: int, route_name: str) -> Optional[Tuple[int, int]]: + """ + Get (slot_index, slot_lane) on the opposite side of the given retimer, + optionally filtered by route_name. + """ + side = self.get_remote_side_by_retimer(rtmr_idx, rtmr_lane, route_name) + return (side.slot_index, side.slot_lane) if side else None + + def get_retimer_by_slot(self, slot_idx: int, slot_lane: int, route_name: str) -> Optional[Tuple[int, int, int, int]]: + """ + Given a slot (index, lane), return: + (retimer_index, retimer_lane, peer_slot_idx, peer_slot_lane) + Optionally filtered by route_name. + """ + links = self._filter_links(self._all_links, route_name) + for link in links: + if (link.side_a.slot_index == slot_idx and link.side_a.slot_lane == slot_lane): + return ( + link.side_a.retimer_index, + link.side_a.retimer_lane, + link.side_b.slot_index, + link.side_b.slot_lane + ) + if (link.side_b.slot_index == slot_idx and link.side_b.slot_lane == slot_lane): + return ( + link.side_b.retimer_index, + link.side_b.retimer_lane, + link.side_a.slot_index, + link.side_a.slot_lane + ) + return None + def get_gpu_by_slot(self, slot_idx: int, slot_lane: int, route_name: Optional[str] = None) -> Optional[Tuple[int, int]]: + """Get (gpu_index, gpu_lane) connected to the given slot, optionally filtered by route_name.""" + links = self._filter_links(self._all_links, route_name) + for link in links: + if link.side_a.slot_index == slot_idx and link.side_a.slot_lane == slot_lane: + return (link.side_a.gpu_index, link.side_a.gpu_lane) + if link.side_b.slot_index == slot_idx and link.side_b.slot_lane == slot_lane: + return (link.side_b.gpu_index, link.side_b.gpu_lane) + return None + def find_symmetric_links(self, route_name: Optional[str] = None) -> List[TopoLink]: + """Find links where both sides are identical (e.g., oneta loops), optionally filtered by route_name.""" + links = self._filter_links(self._all_links, route_name) + return [ + link for link in links + if (link.side_a.gpu_index == link.side_b.gpu_index and + link.side_a.retimer_index == link.side_b.retimer_index and + link.side_a.slot_index == link.side_b.slot_index and + link.side_a.gpu_lane == link.side_b.gpu_lane and + link.side_a.retimer_lane == link.side_b.retimer_lane and + link.side_a.slot_lane == link.side_b.slot_lane) + ] -import logging, math, sys -from typing import List -from gpu.biren.exp_util import DevWhiteRiverExp -from toolbox.regs_save_load_tool import LaneRegInfo -from parser.transceiver_config_parser import TransceiverConfigParser +if __name__ == "__main__": + # 示例用法 + parser = TopoMappingParser("./main_data/topo_mapping.yaml") + parser.parse() + # 原始调用(跨所有 route) + print("All routes - local slot for RTMR21_L0:", parser.get_local_slot_by_retimer(21, 0)) + print("All routes - remote slot for RTMR21_L0:", parser.get_remote_slot_by_retimer(21, 0)) -class OptRegAccessTool: - def __init__(self, host:str, reg_table_file: str, bmc: DevWhiteRiverExp): - self.host = host - self.bmc = bmc - self.parser = TransceiverConfigParser(reg_table_file) - self.reg_table = self.parser.get_all_register_config_groups() + # 按 route_name 查询 + print("onoc6 - local slot for RTMR21_L0:", parser.get_local_slot_by_retimer(21, 0, route_name="onoc6")) + print("onoc6 - remote slot for RTMR21_L0:", parser.get_remote_slot_by_retimer(21, 0, route_name="onoc6")) - def read_ibias_reg(self, exp_id: int, slot_id: int, - lane: int) -> int: - ibias_lane = self.get_ibias_by_logic_lane(lane) - logging.info(f'exp: {exp_id}, slot: {slot_id}, lane: {lane}, ibias lane: {ibias_lane}') - return self.read_opt_reg(exp_id, slot_id, ibias_lane, 'ibias') + print("onoc5 - local slot for RTMR21_L0:", parser.get_local_slot_by_retimer(21, 0, route_name="onoc5")) + print("onoc5 - remote slot for RTMR21_L0:", parser.get_remote_slot_by_retimer(21, 0, route_name="onoc5")) - def write_ibias_reg(self, exp_id: int, slot_id: int, - lane: int, wt_value: int) -> bool: - lane = self.get_ibias_by_logic_lane(lane) - return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'ibias') - - def read_rssi_reg(self, exp_id: int, slot_id: int, - lane: int) -> int: - return self.read_opt_reg(exp_id, slot_id, lane, 'rssi') - - def read_opcurrent_reg(self, exp_id: int, slot_id: int, - lane: int) -> int: - return self.read_opt_reg(exp_id, slot_id, lane, 'opcurrent') + # 其他查询也支持 route_name + print("oneta - symmetric links count:", len(parser.find_symmetric_links(route_name="oneta"))) + print("onoc6 - GPU connected to SLOT0_L0:", parser.get_gpu_by_slot(0, 0, route_name="onoc6")) + print("onoc5 - GPU connected to SLOT0_L0:", parser.get_gpu_by_slot(0, 0, route_name="onoc5")) - def write_opcurrent_reg(self, exp_id: int, slot_id: int, - lane: int, wt_value: int) -> bool: - return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'opcurrent') - - def read_mgc_reg(self, exp_id: int, slot_id: int, - lane: int) -> int: - return self.read_opt_reg(exp_id, slot_id, lane, 'mgc') - def write_mgc_reg(self, exp_id: int, slot_id: int, - lane: int, wt_value: int) -> bool: - return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'mgc') - - def read_tia_peak_reg(self, exp_id: int, slot_id: int, - lane: int) -> int: - return self.read_opt_reg(exp_id, slot_id, lane, 'tia_peak') - def write_tia_peak_reg(self, exp_id: int, slot_id: int, - lane: int, wt_value: int) -> bool: - return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'tia_peak') - - def read_high_freq_reg(self, exp_id: int, slot_id: int, - lane: int) -> int: - return self.read_opt_reg(exp_id, slot_id, lane, 'highfreq_eq') - def write_high_freq_reg(self, exp_id: int, slot_id: int, - lane: int, wt_value: int) -> bool: - return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'highfreq_eq') - - def read_tia_stage2_reg(self, exp_id: int, slot_id: int, - lane: int) -> int: - return self.read_opt_reg(exp_id, slot_id, lane, 'tia_stage2') - def write_tia_stage2_reg(self, exp_id: int, slot_id: int, - lane: int, wt_value: int) -> bool: - return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'tia_stage2') - - def write_confirm_reg(self, exp_id: int, slot_id: int) -> bool: - return self.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 0xd0, 0x88, 1, "01") - - def read_vpeak_reg(self, exp_id: int, slot_id: int, lane: int) -> int: - return self.read_opt_reg(exp_id, slot_id, lane, 'vpeak') - def read_opt_reg(self, exp_id: int, slot_id: int, - lane: int, reg_name: str): - reg_value = -1 - reg = self.parser.get_register_by_logic_lane(reg_name, lane) - if not reg: - logging.error(f'read opt register error. exp:{exp_id}, slot:{slot_id}, lane: {lane}, register name: {reg_name}') - return reg_value - - value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id, slot_id=slot_id, - bank=reg.bank, page=reg.page, - reg_offset=reg.offset, size=reg.size) - reg_value = int(value, 16) - logging.info(f'{self.host} read_opt_reg exp: {exp_id}, slot: {slot_id}, lane: {lane}, reg: {reg_name}, value: {reg_value}') - - if reg_name == 'rssi': - dbm = round(10 * math.log10(reg_value / 10000), 3) - logging.info(f'{self.host} exp_id:{exp_id}, slot_id:{slot_id}, lglane:{reg.logic_lane}, lane:{reg.lane}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{reg_value}, dbm: {dbm}') - else: - logging.info(f'{self.host} exp_id:{exp_id}, slot_id:{slot_id}, lglane:{reg.logic_lane}, lane:{reg.lane}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{reg_value}') - return reg_value - - def write_opt_reg(self, exp_id: int, slot_id: int, - lane: int, wt_value: int, reg_name: str) -> bool: - ret = False - wt_value_str = '' - - reg = self.parser.get_register_by_logic_lane(reg_name, lane) - if not reg: - logging.error(f'write opt register error. exp:{exp_id}, slot:{slot_id}, lane: {lane}, register name: {reg_name}') - return ret - - if reg.size == 1: - wt_value_str = f"{wt_value:02X}" - elif reg.size == 2: - wt_value_str = f"{wt_value:04X}" - ret = self.bmc.SetOpticalModuleRegs(exp_id, slot_id=slot_id, - bank=reg.bank, page=reg.page, - reg_offset=reg.offset, size=reg.size, - hex_str=wt_value_str) - - if ret: - logging.info(f'{self.host} write_opt_reg exp: {exp_id}, slot: {slot_id}, lane: {lane}, reg: {reg_name}, value: {wt_value}') - else: - logging.error(f'{self.host} write_opt_reg exp: {exp_id}, slot: {slot_id}, lane: {lane}, reg: {reg_name}, value: {wt_value}') - logging.error(f'{self.host} write_opt_reg ret: {ret}') - return ret - - def get_ibias_by_logic_lane(self, logic_lane) -> int: - ibias_lane_map = { - 0:0, - 1:0, - 2:1, - 3:1, - 4:2, - 5:2, - 6:3, - 7:3 - } - return ibias_lane_map.get(logic_lane, -1) - - # def ReadAndWriteOptRegs(self, exp_id: int, slot_list: List[int], - # lane_list: List[int], regs: List[str], - # reg_wt_value: str, is_write: bool = False): - - # for slot_id in slot_list: - # if is_write: - # # logging.info(f'------------ reg_wt_value:{reg_wt_value}') - # self.SetPrivateLinkRegister(exp_id, slot_id, lane_list, regs, reg_wt_value) - # logging.debug('------------------2') - # self.GetPrivateLinkRegister(exp_id, slot_id, lane_list, regs) - # logging.debug('------------------3') - - def write_opt_regs(self, exp_id: int, slot_list: List[int], - lane_list: List[int], regs: List[str], - reg_wt_value: str): - - for slot_id in slot_list: - self.write_opt_registers_by_slot(exp_id, slot_id, lane_list, regs, reg_wt_value) - def read_opt_regs(self, exp_id: int, slot_list: List[int], - lane_list: List[int], regs: List[str]): - - for slot_id in slot_list: - self.read_opt_registers_by_slot(exp_id, slot_id, lane_list, regs) - - def read_opt_registers_by_slot(self, exp_id: int, slot_id: int, - lane_list: List[int], regs: List[str]) -> List[LaneRegInfo]: - laneRegValueList: List[LaneRegInfo] = [LaneRegInfo(logic_lane=lane_num) for lane_num in lane_list] - - logging.debug(f'------regs:{regs}') - for rd_reg in regs: - tmp_lane_list = lane_list - if rd_reg == 'ibias' and len(tmp_lane_list) == 8: - tmp_lane_list = [0,1,2,3] - - for lane_id in tmp_lane_list: - value = self.read_opt_reg(exp_id, slot_id, lane_id, rd_reg) - if value == -1: - break - for langRegValue in laneRegValueList: - - if langRegValue.logic_lane == lane_id: - if rd_reg == 'mgc': - langRegValue.mgc = value - elif rd_reg == 'vpeak': - langRegValue.vpeak = value - elif rd_reg == 'ibias': - langRegValue.ibias = value - elif rd_reg == 'ipcurrent': - langRegValue.ipcurrent = value - elif rd_reg == 'opcurrent': - langRegValue.opcurrent = value - elif rd_reg == 'lowfreq_eq': - langRegValue.lowfreq_eq = value - elif rd_reg == 'highfreq_eq': - langRegValue.highfreq_eq = value - elif rd_reg == 'tia_peak': - langRegValue.tia_peak = value - elif rd_reg == 'vgc_set': - langRegValue.vgc_set = value - elif rd_reg == 'drv_vpeak': - langRegValue.drv_vpeak = value - - break - - return laneRegValueList - - def write_opt_registers_by_slot(self, exp_id: int, slot_id: int, - lane_list: List[int], regs: List[str], - reg_wt_value: str): - logging.debug(f'------regs:{regs}') - for wt_reg in regs: - for lane_id in lane_list: - ret = self.write_opt_reg(exp_id, slot_id, lane_id, int(reg_wt_value), wt_reg) - if ret == False: - break - # def GetPrivateLinkRegister(self, exp_id: int, slot_id: int, - # lane_list: List[int], regs: List[str]) -> List[LaneRegInfo]: - - # laneRegValueList: List[LaneRegInfo] = [LaneRegInfo(logic_lane=lane_num) for lane_num in lane_list] - - # logging.debug(f'------regs:{regs}') - # for rd_reg in regs: - # for reg_group in self.reg_table: - # logging.debug(f'reg_group.name:{reg_group.name}, rd_reg:{rd_reg}') - # if reg_group.name == rd_reg: - # logging.info(f'Read register: {reg_group.name}, description: {reg_group.description}') - # for reg in reg_group.registers: - # # logging.info(f'-------------lane_list:{lane_list}, reg.lane: {reg.lane}') - - # if reg.logic_lane in lane_list: - # logging.info(f'-----read logic lane: {reg.logic_lane}, phys lane: {reg.lane}') - # else: - # continue - - # size = 1 - # ret = -1 - # if reg.size is not None: - # size = reg.size - # #if reg.end_offset is not None: - # # size = reg.end_offset - reg.offset + 1 - - # value = self.bmc.GetOpticalModuleRegs(exp_id, slot_id=slot_id, - # bank=reg.bank, page=reg.page, - # reg_offset=reg.offset, size=size) - - # logging.info(f'----- hex value: {value}') - # value = int(value, 16) - - # for langRegValue in laneRegValueList: - - # if langRegValue.logic_lane == reg.logic_lane: - # if rd_reg == 'mgc': - # langRegValue.mgc = value - # elif rd_reg == 'vpeak': - # langRegValue.vpeak = value - # elif rd_reg == 'ibias': - # langRegValue.ibias = value - # elif rd_reg == 'ipcurrent': - # langRegValue.ipcurrent = value - # elif rd_reg == 'opcurrent': - # langRegValue.opcurrent = value - # elif rd_reg == 'lowfreq_eq': - # langRegValue.lowfreq_eq = value - # elif rd_reg == 'highfreq_eq': - # langRegValue.highfreq_eq = value - # elif rd_reg == 'tia_peak': - # langRegValue.tia_peak = value - # elif rd_reg == 'vgc_set': - # langRegValue.vgc_set = value - # elif rd_reg == 'drv_vpeak': - # langRegValue.drv_vpeak = value - - # break - - # if rd_reg == 'rssi': - # dbm = round(10 * math.log10(value / 10000), 3) - # logging.info(f'{self.host}: exp_id:{exp_id}, slot_id:{slot_id}, lglane:{reg.logic_lane}, lane:{reg.lane}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{value}, dbm: {dbm}') - # else: - # logging.info(f'{self.host}: exp_id:{exp_id}, slot_id:{slot_id}, lglane:{reg.logic_lane}, lane:{reg.lane}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{value}') - # break - - # return laneRegValueList - - # def SetPrivateLinkRegister(self, exp_id: int, slot_id: int, - # lane_list: List[int], regs: List[str], reg_wt_value: str): - - # for wt_reg_name in regs: - # for reg_group in self.reg_table: - # if reg_group.name == wt_reg_name: - # logging.info(f'Write register: {reg_group.name}, description: {reg_group.description}') - # for reg in reg_group.registers: - # if reg.logic_lane in lane_list: - # logging.info(f'-----read logic lane: {reg.logic_lane}, phys lane: {reg.lane}') - # else: - # continue - - # size = 1 - # ret = -1 - # if reg.size is not None: - # size = reg.size - # if reg.size == 2: - # value_hex_str=f"{reg.value:04x}" - # else: - # value_hex_str=f"{reg.value:02x}" - - # if reg.end_offset is not None: - # loop = int((reg.end_offset - reg.offset + 1)/size) - # size = loop * size - # #logging.info(f"------loop:{loop}") - # value = '' - # for i in range(loop): - # value += value_hex_str - # ret = self.bmc.SetOpticalModuleRegs(exp_id, slot_id=slot_id, - # bank=reg.bank, page=reg.page, - # reg_offset=reg.offset, size=size, - # hex_str=value) - # logging.info(f'exp_id:{exp_id}, slot_id: {slot_id}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{value}, size: {size}, status: {ret}') - # else: - # write_value = value_hex_str - # if reg_wt_value is not '': - # dec = int(reg_wt_value) - # write_value = '' - # if reg.size == 1: - # write_value = format(dec, '02X') - # size = 1 - # elif reg.size == 2: - # write_value = format(dec, '04X') - # size = 2 - # else: - # logging.error(f'error value:{reg_wt_value}') - - # if write_value is not '': - # logging.info(f'reg_wt_value: {reg_wt_value}, value_hex_str: {write_value}') - # # logging.info(f'-----------------------write hex_str: {write_value}') - # ret = self.bmc.SetOpticalModuleRegs(exp_id, slot_id=slot_id, - # bank=reg.bank, page=reg.page, - # reg_offset=reg.offset, size=size, - # hex_str=write_value) - # # logging.info(f'exp_id:{exp_id}, slot_id: {slot_id}, bank:{reg.bank}, page:{reg.page}, offset:{reg.offset}, value:{value_hex_str}, status: {ret}')