# xz_cable_setup_check_tool.py build ```shell pyinstaller --onefile \ --paths=/home/gyou/NexusBench-baihe-br/nexusbench \ --paths=/home/gyou/NexusBench-baihe-br/nexusbench/log \ --paths=/home/gyou/NexusBench-baihe-br/nexusbench/connection \ --paths=/home/gyou/NexusBench-baihe-br/nexusbench/gpu/biren \ --hidden-import=connection.http_helper \ --runtime-tmpdir=/home/gyou/tmp \ /home/gyou/NexusBench-baihe-br/nexusbench/biren_test.py # 加了这一行就可以运行了 --runtime-tmpdir=/home/gyou/tmp \ --hidden-import=log.logger \ --hidden-import=connection.ssh_connection_manager \ ``` ./build/whiteriver_exp --host 10.57.216.109 --exp 2 --cmd vcmd --param rev ./vuart -i 10.57.216.109 -e 2 -c vcmd -p ver ./vuart -i 10.57.216.108 -e 4 -c fw-down -p ./build/whiteriver_exp -i 10.57.216.108 -e 4 -c fw-down -p ./build/whiteriver_exp -i 10.57.216.91 -e 4 -c vcmd -p ver net use X: \\10.57.216.173\shared /user:root ossdbg1 PicT1!2@3#4$ RCms@Zte3 ./build/whiteriver_exp -i 10.57.216.94 -e 4 -c fw-down -p "./whiteriver_exp@1.0.17+2508161844.img" ./build/whiteriver_exp -i 10.57.216.95,10.57.216.96,10.57.216.97,10.57.216.98 scp /usr/bin/ocs_link_reset root@10.57.216.166 scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.166:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.165:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.187:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.148:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.163:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.139:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.173:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.167:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.134:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.145:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.176:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.180:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.185:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.150:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.168:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.174:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.132:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.189:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.151:/usr/bin/ scp -o "BatchMode=no" -o "StrictHostKeyChecking=no" -i pass.txt /usr/bin/ocs_link_reset root@10.57.216.156:/usr/bin/ import logging, time, os, json from typing import Dict, List, Any, Optional from gpu.biren.exp_util import DevWhiteRiverExp from toolbox.opt_reg_access_tool import OptRegAccessTool from parser.transceiver_config_parser import TransceiverConfigParser, RegisterInfo class MgcTuningTool: """ MGC自动校准工具类 """ def __init__(self, host:str, reg_table: TransceiverConfigParser, reg_tool: OptRegAccessTool, bmc: DevWhiteRiverExp): self.host = host self.reg_table = reg_table self.bmc = bmc self.target_vpeak = 0 self.reg_tool = reg_tool self.target_vpeaks_file = "target_vpeaks.json" self.mode = "onoc1" # 默认mode # 暂时只考虑了onoc模式 # onet模式暂时手动传 def auto_tune(self, exp_id: int, slot_id: int, lane_list: List[int]) -> bool: # target_vpeaks = self.calc_target_vpeak(exp_id, slot_id) target_vpeaks = self.calc_target_vpeak_new(exp_id, slot_id, lane_list) # 开始匹配目标target_vpeaks logging.info("----------step 7: find optimal MGC") self.match_optimal_mgc_new(exp_id, slot_id, lane_list, target_vpeaks) return True def manual_tune(self, exp_id: int, slot_id: int) -> bool: target_vpeaks = self.load_target_vpeaks_from_json(exp_id, slot_id) if target_vpeaks is None: logging.error("从文件加载target_vpeaks失败") return False self.match_optimal_mgc_new2(exp_id, slot_id, target_vpeaks) return True def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]: target_vpeaks : Dict[int, int] = {} logging.info(f"-------slot {slot_id}") # 切换成mgc logging.info("----------step 1: disable agc") self.disable_agc(exp_id, slot_id) # 关闭RF logging.info("----------step 2: toogle RF off") if not self.toogle_rf(exp_id, slot_id, lane_list, "off"): logging.error(f"slot {slot_id}: toogle RF off fail") return target_vpeaks # 读取base_line_vpeaks logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) logging.info(f"base_line_vpeaks: {base_line_vpeaks}") self.toogle_rf(exp_id, slot_id, lane_list, "on") self._write_mgc_all_lanes(exp_id, slot_id, lane_list, 255) max_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) for lane, base_vpeak in base_line_vpeaks.items(): vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (16 / 29)) target_vpeaks[lane] = vpkdelta + base_vpeak logging.info(f"target_vpeaks: {target_vpeaks}") if target_vpeaks: self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) # 开启RF logging.info("----------step 4: enable RF") # time.sleep(5) return target_vpeaks def calc_target_vpeak(self, exp_id: int, lane_list: List[int], slot_id: int) -> Dict[int, int]: target_vpeaks : Dict[int, int] = {} logging.info(f"-------slot {slot_id}") # 切换成mgc logging.info("----------step 1: disable agc") self.disable_agc(exp_id, slot_id) # 关闭RF logging.info("----------step 2: toogle RF off") if not self.toogle_rf(exp_id, slot_id, lane_list, "off"): logging.error(f"slot {slot_id}: toogle RF off fail") return target_vpeaks # 读取base_line_vpeaks logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks") base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list) logging.info(f"base_line_vpeaks: {base_line_vpeaks}") target_vpeaks = self.calculate_target_vpeak(base_line_vpeaks) logging.info(f"target_vpeaks: {target_vpeaks}") if target_vpeaks: self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks) # 开启RF logging.info("----------step 4: enable RF") # time.sleep(5) self.toogle_rf(exp_id, slot_id, lane_list, "on") return target_vpeaks def disable_agc(self, exp_id: int, slot_id: int): logging.debug("----------disable_agc, step 1") agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en") if not agc_enable_regs: logging.error("not find AGC_ENABLE register") return False logging.debug("----------disable_agc, step 2") success = True for reg in agc_enable_regs: logging.debug(f"----------disable_agc, step 2: reg: {reg}") result = self.bmc.SetOpticalModuleRegs( exp_id=exp_id, slot_id=slot_id, bank=reg.bank, page=reg.page, reg_offset=reg.offset, size=reg.size, hex_str="0000" ) logging.debug(f"----------disable_agc, step 3: result: {result}") if not result: logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}") success = False return success def toogle_rf(self, exp_id: int, slot_id: int, lane_list: List[int], switch: str = "off") -> bool: ret = False logging.info(f"---------- toogle_rf to {switch}") for lane in lane_list: if switch == "off": ret = self.reg_tool.write_tia_stage2_reg(exp_id, slot_id, lane, 0) if ret == False: logging.error(f"---------- toogle_rf to {switch} failed") else: stage_recommended_value = 144 ret = self.reg_tool.write_tia_stage2_reg(exp_id, slot_id, lane, stage_recommended_value) if ret == False: logging.error(f"---------- toogle_rf to {switch} failed") return ret def calculate_target_vpeak(self, base_line_vpeaks: Dict[int, int]) -> Dict[int, int]: # base_line值加22 target_vpeaks:Dict[int, int] = {} for lane, vpeak in base_line_vpeaks.items(): logging.info(f"lane={lane}, vpeak={vpeak}") target_vpeaks[lane] = vpeak + 18 logging.info(f"target_vpeaks={target_vpeaks}") return target_vpeaks def match_optimal_mgc_new2(self, exp_id: int, slot_id: int, target_vpeaks: Optional[Dict[int, int]]) -> bool: mgc_regs = self.reg_table.get_registers_by_name("mgc") vpeak_regs = self.reg_table.get_registers_by_name("vpeak") if mgc_regs is None or vpeak_regs is None: logging.error("not find MGC regs") return False success = True matched_results = {} unmatched_lanes = [] target_mgcs = {} for reg in mgc_regs: # logging.info('------------------step 1') mgc_value = self.bmc.GetOpticalModuleRegs( exp_id=exp_id, slot_id=slot_id, bank=reg.bank, page=reg.page, reg_offset=reg.offset, size=1 ) # logging.info(f'------------------step 2: {mgc_value}') mgc_value = int(mgc_value, 16) for vpeak_reg in vpeak_regs: if vpeak_reg.lane == reg.lane: vpeak_value = self.bmc.GetOpticalModuleRegs( exp_id=exp_id, slot_id=slot_id, bank=vpeak_reg.bank, page=vpeak_reg.page, reg_offset=vpeak_reg.offset, size=2 ) vpeak_value = int(vpeak_value, 16) # logging.info(f'------------------step 3: {vpeak_value}') logging.info(f'mgc_value: {mgc_value}, target_vpeak: {target_vpeaks[vpeak_reg.lane]}, vpeak_value: {vpeak_value}') target_mgc = (target_vpeaks[vpeak_reg.lane] - vpeak_value) * 1.5 + mgc_value logging.info(f'target_mgc: {target_mgc}') target_mgcs[reg.lane] = target_mgc break logging.info(f'----------------Target MGC: {target_mgcs}') for reg in mgc_regs: logging.info(f"mgc regs size: {len(mgc_regs)}") min_val = int(target_mgcs[reg.lane] - 10) if min_val > 255: min_val = 255 max_val = int(target_mgcs[reg.lane] + 10) if max_val > 255: logging.info(f'max_val is greater than 255, setting to 255') max_val = 255 logging.info(f"lane: {reg.lane}, min mgc : {min_val}, max_mgc: {max_val}, target vpeak: {target_vpeaks[reg.lane]}") step = reg.step search_values = list(range(max_val, min_val - 1, -step)) logging.info(f"Search order for lane {reg.lane}: {search_values}") is_match = False matched_mgc = None matched_vpeak = None for mgc in search_values: result = self.bmc.SetOpticalModuleRegs( exp_id=exp_id, slot_id=slot_id, bank=reg.bank, page=reg.page, reg_offset=reg.offset, size=1, hex_str=f"{mgc:02X}" ) time.sleep(0.05) # 新固件特殊逻辑 result = self.bmc.SetOpticalModuleRegs( exp_id=exp_id, slot_id=slot_id, bank= 0, page=0xd0, reg_offset=0x88, size=1, hex_str="01" ) time.sleep(0.05) for vpeak_reg in vpeak_regs: if vpeak_reg.lane == reg.lane: value = self.bmc.GetOpticalModuleRegs( exp_id=exp_id, slot_id=slot_id, bank=vpeak_reg.bank, page=vpeak_reg.page, reg_offset=vpeak_reg.offset, size=2 ) value = int(value, 16) logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}") if value < (target_vpeaks[reg.lane] + 1) and value > (target_vpeaks[reg.lane] - 2): logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ") is_match = True matched_mgc = mgc matched_vpeak = value break if is_match: break time.sleep(0.03) if is_match and matched_mgc is not None and matched_vpeak is not None: if str(exp_id) not in matched_results: matched_results[str(exp_id)] = {} if str(slot_id) not in matched_results[str(exp_id)]: matched_results[str(exp_id)][str(slot_id)] = {} if self.mode not in matched_results[str(exp_id)][str(slot_id)]: matched_results[str(exp_id)][str(slot_id)][self.mode] = {} matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = { "mgc": matched_mgc, "target_vpeak": target_vpeaks[reg.lane], "actual_vpeak": matched_vpeak } else: unmatched_lanes.append(reg.lane) if matched_results: self.save_mgc_results_to_json(exp_id, slot_id, matched_results) if unmatched_lanes: logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") return success def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[int, int]) -> bool: matched_results = {} unmatched_lanes = [] for lane_id in lane_list: is_match = False matched_mgc = None matched_vpeak = None reg = self.reg_table.get_register_by_logic_lane('mgc', lane_id) if reg is None or reg.valid_range is None: logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc') return False min_val = reg.valid_range[0] max_val = reg.valid_range[1] step = reg.step if reg.step is not None else 1 wt_mgc = int((min_val + max_val) / 2) target_vpeak = target_vpeaks[lane_id] # example: 100 target_vpeak_range = [target_vpeak -1, target_vpeak + 1] # 99,101 while True: ret = self.reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc) if ret != 0: logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc') return False ret = self.reg_tool.write_confirm_reg(exp_id, slot_id) if ret != 0: logging.error(f'match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm') return False time.sleep(0.03) current_vpeak = self.reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id) logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}") if current_vpeak > target_vpeak_range[1]: wt_mgc -= step if current_vpeak < target_vpeak_range[0]: wt_mgc += step if current_vpeak >= target_vpeak_range[0] and current_vpeak <= target_vpeak_range[1]: is_match = True matched_mgc = wt_mgc matched_vpeak = current_vpeak break if is_match: logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ") if str(exp_id) not in matched_results: matched_results[str(exp_id)] = {} if str(slot_id) not in matched_results[str(exp_id)]: matched_results[str(exp_id)][str(slot_id)] = {} if self.mode not in matched_results[str(exp_id)][str(slot_id)]: matched_results[str(exp_id)][str(slot_id)][self.mode] = {} matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = { "mgc": matched_mgc, "target_vpeak": target_vpeaks[lane_id], "actual_vpeak": matched_vpeak } else: unmatched_lanes.append(lane_id) if matched_results: self.save_mgc_results_to_json(exp_id, slot_id, matched_results) if unmatched_lanes: logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}") return False return True def _read_vpeak_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[int, int]: vpeak_values = {} for lane in lane_list: vpeak_int = self.reg_tool.read_tia_peak_reg(exp_id, slot_id, lane) vpeak_values[lane] = vpeak_int return vpeak_values def _write_mgc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], mgc: int) -> bool: for lane_id in lane_list: ret = self.reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, mgc) if ret == False: logging.error(f'write mgc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, mgc:{mgc}') return False ret = self.reg_tool.write_confirm_reg(exp_id, slot_id) if ret == False: logging.error(f'write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}') return False return True def _generate_host_filename(self) -> str: safe_host = self.host.replace(':', '_').replace('/', '_').replace('\\', '_') return f"target_vpeaks_{safe_host}.json" def _generate_mgc_host_filename(self) -> str: safe_host = self.host.replace(':', '_').replace('/', '_').replace('\\', '_') return f"mgc_{safe_host}.json" def save_target_vpeaks_to_json(self, exp_id: int, slot_id: int, target_vpeaks: Dict[int, int]) -> bool: try: filename = self._generate_host_filename() os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) if os.path.exists(filename): with open(filename, 'r') as f: data = json.load(f) else: data = {} if str(exp_id) not in data: data[str(exp_id)] = {} if str(slot_id) not in data[str(exp_id)]: data[str(exp_id)][str(slot_id)] = {} data[str(exp_id)][str(slot_id)][self.mode] = {str(k): v for k, v in target_vpeaks.items()} with open(filename, 'w') as f: json.dump(data, f, indent=4) logging.info(f"target_vpeaks saved to {filename}, path: {exp_id}/{slot_id}/{self.mode}") return True except Exception as e: logging.error(f"Failed to save target_vpeaks to JSON file: {e}") return False def load_target_vpeaks_from_json(self, exp_id: int, slot_id: int) -> Optional[Dict[int, int]]: try: filename = self._generate_host_filename() if not os.path.exists(filename): logging.error(f"File {filename} does not exist") return None with open(filename, 'r') as f: data = json.load(f) if (str(exp_id) not in data or str(slot_id) not in data[str(exp_id)] or self.mode not in data[str(exp_id)][str(slot_id)]): logging.error(f"Path {exp_id}/{slot_id}/{self.mode} not found in {filename}") return None target_vpeaks_data = data[str(exp_id)][str(slot_id)][self.mode] target_vpeaks = {int(k): v for k, v in target_vpeaks_data.items()} logging.info(f"target_vpeaks loaded successfully from {filename}, path: {exp_id}/{slot_id}/{self.mode}") return target_vpeaks except Exception as e: logging.error(f"Failed to load target_vpeaks from JSON file: {e}") return None def save_mgc_results_to_json(self, exp_id: int, slot_id: int, matched_results: Dict[str, Any]) -> bool: try: filename = self._generate_mgc_host_filename() os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) if os.path.exists(filename): with open(filename, 'r') as f: data = json.load(f) else: data = {} for exp_key, exp_data in matched_results.items(): if exp_key not in data: data[exp_key] = {} for slot_key, slot_data in exp_data.items(): if slot_key not in data[exp_key]: data[exp_key][slot_key] = {} for mode_key, mode_data in slot_data.items(): if mode_key not in data[exp_key][slot_key]: data[exp_key][slot_key][mode_key] = {} data[exp_key][slot_key][mode_key].update(mode_data) with open(filename, 'w') as f: json.dump(data, f, indent=4) logging.info(f"MGC matching results saved to {filename}") return True except Exception as e: logging.error(f"Failed to save MGC results to JSON file: {e}") return False import logging, math, sys from typing import List from gpu.biren.exp_util import DevWhiteRiverExp from toolbox.regs_save_load_tool import LaneRegInfo from parser.transceiver_config_parser import TransceiverConfigParser class OptRegAccessTool: def __init__(self, host:str, reg_table_file: str, bmc: DevWhiteRiverExp): self.host = host self.bmc = bmc self.parser = TransceiverConfigParser(reg_table_file) self.reg_table = self.parser.get_all_register_config_groups() def read_ibias_reg(self, exp_id: int, slot_id: int, lane: int) -> int: ibias_lane = self.get_ibias_by_logic_lane(lane) logging.info(f'exp: {exp_id}, slot: {slot_id}, lane: {lane}, ibias lane: {ibias_lane}') return self.read_opt_reg(exp_id, slot_id, ibias_lane, 'ibias') def write_ibias_reg(self, exp_id: int, slot_id: int, lane: int, wt_value: int) -> bool: lane = self.get_ibias_by_logic_lane(lane) return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'ibias') def read_rssi_reg(self, exp_id: int, slot_id: int, lane: int) -> int: return self.read_opt_reg(exp_id, slot_id, lane, 'rssi') def read_opcurrent_reg(self, exp_id: int, slot_id: int, lane: int) -> int: return self.read_opt_reg(exp_id, slot_id, lane, 'opcurrent') def write_opcurrent_reg(self, exp_id: int, slot_id: int, lane: int, wt_value: int) -> bool: return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'opcurrent') def read_mgc_reg(self, exp_id: int, slot_id: int, lane: int) -> int: return self.read_opt_reg(exp_id, slot_id, lane, 'mgc') def write_mgc_reg(self, exp_id: int, slot_id: int, lane: int, wt_value: int) -> bool: return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'mgc') def read_tia_peak_reg(self, exp_id: int, slot_id: int, lane: int) -> int: return self.read_opt_reg(exp_id, slot_id, lane, 'tia_peak') def write_tia_peak_reg(self, exp_id: int, slot_id: int, lane: int, wt_value: int) -> bool: return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'tia_peak') def read_high_freq_reg(self, exp_id: int, slot_id: int, lane: int) -> int: return self.read_opt_reg(exp_id, slot_id, lane, 'highfreq_eq') def write_high_freq_reg(self, exp_id: int, slot_id: int, lane: int, wt_value: int) -> bool: return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'highfreq_eq') def read_tia_stage2_reg(self, exp_id: int, slot_id: int, lane: int) -> int: return self.read_opt_reg(exp_id, slot_id, lane, 'tia_stage2') def write_tia_stage2_reg(self, exp_id: int, slot_id: int, lane: int, wt_value: int) -> bool: return self.write_opt_reg(exp_id, slot_id, lane, wt_value, 'tia_stage2') def write_confirm_reg(self, exp_id: int, slot_id: int) -> bool: return self.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 0xd0, 0x88, 1, "01") def read_vpeak_reg(self, exp_id: int, slot_id: int, lane: int) -> int: return self.read_opt_reg(exp_id, slot_id, lane, 'vpeak') def read_opt_reg(self, exp_id: int, slot_id: int, lane: int, reg_name: str): reg_value = -1 reg = self.parser.get_register_by_logic_lane(reg_name, lane) if not reg: logging.error(f'read opt register error. exp:{exp_id}, slot:{slot_id}, lane: {lane}, register name: {reg_name}') return reg_value value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id, slot_id=slot_id, bank=reg.bank, page=reg.page, reg_offset=reg.offset, size=reg.size) reg_value = int(value, 16) logging.info(f'{self.host} read_opt_reg exp: {exp_id}, slot: {slot_id}, lane: {lane}, reg: {reg_name}, value: {reg_value}') if reg_name == 'rssi': dbm = round(10 * math.log10(reg_value / 10000), 3) logging.info(f'{self.host} exp_id:{exp_id}, slot_id:{slot_id}, lglane:{reg.logic_lane}, lane:{reg.lane}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{reg_value}, dbm: {dbm}') else: logging.info(f'{self.host} exp_id:{exp_id}, slot_id:{slot_id}, lglane:{reg.logic_lane}, lane:{reg.lane}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{reg_value}') return reg_value def write_opt_reg(self, exp_id: int, slot_id: int, lane: int, wt_value: int, reg_name: str) -> bool: ret = False wt_value_str = '' reg = self.parser.get_register_by_logic_lane(reg_name, lane) if not reg: logging.error(f'write opt register error. exp:{exp_id}, slot:{slot_id}, lane: {lane}, register name: {reg_name}') return ret if reg.size == 1: wt_value_str = f"{wt_value:02X}" elif reg.size == 2: wt_value_str = f"{wt_value:04X}" ret = self.bmc.SetOpticalModuleRegs(exp_id, slot_id=slot_id, bank=reg.bank, page=reg.page, reg_offset=reg.offset, size=reg.size, hex_str=wt_value_str) if ret: logging.info(f'{self.host} write_opt_reg exp: {exp_id}, slot: {slot_id}, lane: {lane}, reg: {reg_name}, value: {wt_value}') else: logging.error(f'{self.host} write_opt_reg exp: {exp_id}, slot: {slot_id}, lane: {lane}, reg: {reg_name}, value: {wt_value}') logging.error(f'{self.host} write_opt_reg ret: {ret}') return ret def get_ibias_by_logic_lane(self, logic_lane) -> int: ibias_lane_map = { 0:0, 1:0, 2:1, 3:1, 4:2, 5:2, 6:3, 7:3 } return ibias_lane_map.get(logic_lane, -1) # def ReadAndWriteOptRegs(self, exp_id: int, slot_list: List[int], # lane_list: List[int], regs: List[str], # reg_wt_value: str, is_write: bool = False): # for slot_id in slot_list: # if is_write: # # logging.info(f'------------ reg_wt_value:{reg_wt_value}') # self.SetPrivateLinkRegister(exp_id, slot_id, lane_list, regs, reg_wt_value) # logging.debug('------------------2') # self.GetPrivateLinkRegister(exp_id, slot_id, lane_list, regs) # logging.debug('------------------3') def write_opt_regs(self, exp_id: int, slot_list: List[int], lane_list: List[int], regs: List[str], reg_wt_value: str): for slot_id in slot_list: self.write_opt_registers_by_slot(exp_id, slot_id, lane_list, regs, reg_wt_value) def read_opt_regs(self, exp_id: int, slot_list: List[int], lane_list: List[int], regs: List[str]): for slot_id in slot_list: self.read_opt_registers_by_slot(exp_id, slot_id, lane_list, regs) def read_opt_registers_by_slot(self, exp_id: int, slot_id: int, lane_list: List[int], regs: List[str]) -> List[LaneRegInfo]: laneRegValueList: List[LaneRegInfo] = [LaneRegInfo(logic_lane=lane_num) for lane_num in lane_list] logging.debug(f'------regs:{regs}') for rd_reg in regs: tmp_lane_list = lane_list if rd_reg == 'ibias' and len(tmp_lane_list) == 8: tmp_lane_list = [0,1,2,3] for lane_id in tmp_lane_list: value = self.read_opt_reg(exp_id, slot_id, lane_id, rd_reg) if value == -1: break for langRegValue in laneRegValueList: if langRegValue.logic_lane == lane_id: if rd_reg == 'mgc': langRegValue.mgc = value elif rd_reg == 'vpeak': langRegValue.vpeak = value elif rd_reg == 'ibias': langRegValue.ibias = value elif rd_reg == 'ipcurrent': langRegValue.ipcurrent = value elif rd_reg == 'opcurrent': langRegValue.opcurrent = value elif rd_reg == 'lowfreq_eq': langRegValue.lowfreq_eq = value elif rd_reg == 'highfreq_eq': langRegValue.highfreq_eq = value elif rd_reg == 'tia_peak': langRegValue.tia_peak = value elif rd_reg == 'vgc_set': langRegValue.vgc_set = value elif rd_reg == 'drv_vpeak': langRegValue.drv_vpeak = value break return laneRegValueList def write_opt_registers_by_slot(self, exp_id: int, slot_id: int, lane_list: List[int], regs: List[str], reg_wt_value: str): logging.debug(f'------regs:{regs}') for wt_reg in regs: for lane_id in lane_list: ret = self.write_opt_reg(exp_id, slot_id, lane_id, int(reg_wt_value), wt_reg) if ret == False: break # def GetPrivateLinkRegister(self, exp_id: int, slot_id: int, # lane_list: List[int], regs: List[str]) -> List[LaneRegInfo]: # laneRegValueList: List[LaneRegInfo] = [LaneRegInfo(logic_lane=lane_num) for lane_num in lane_list] # logging.debug(f'------regs:{regs}') # for rd_reg in regs: # for reg_group in self.reg_table: # logging.debug(f'reg_group.name:{reg_group.name}, rd_reg:{rd_reg}') # if reg_group.name == rd_reg: # logging.info(f'Read register: {reg_group.name}, description: {reg_group.description}') # for reg in reg_group.registers: # # logging.info(f'-------------lane_list:{lane_list}, reg.lane: {reg.lane}') # if reg.logic_lane in lane_list: # logging.info(f'-----read logic lane: {reg.logic_lane}, phys lane: {reg.lane}') # else: # continue # size = 1 # ret = -1 # if reg.size is not None: # size = reg.size # #if reg.end_offset is not None: # # size = reg.end_offset - reg.offset + 1 # value = self.bmc.GetOpticalModuleRegs(exp_id, slot_id=slot_id, # bank=reg.bank, page=reg.page, # reg_offset=reg.offset, size=size) # logging.info(f'----- hex value: {value}') # value = int(value, 16) # for langRegValue in laneRegValueList: # if langRegValue.logic_lane == reg.logic_lane: # if rd_reg == 'mgc': # langRegValue.mgc = value # elif rd_reg == 'vpeak': # langRegValue.vpeak = value # elif rd_reg == 'ibias': # langRegValue.ibias = value # elif rd_reg == 'ipcurrent': # langRegValue.ipcurrent = value # elif rd_reg == 'opcurrent': # langRegValue.opcurrent = value # elif rd_reg == 'lowfreq_eq': # langRegValue.lowfreq_eq = value # elif rd_reg == 'highfreq_eq': # langRegValue.highfreq_eq = value # elif rd_reg == 'tia_peak': # langRegValue.tia_peak = value # elif rd_reg == 'vgc_set': # langRegValue.vgc_set = value # elif rd_reg == 'drv_vpeak': # langRegValue.drv_vpeak = value # break # if rd_reg == 'rssi': # dbm = round(10 * math.log10(value / 10000), 3) # logging.info(f'{self.host}: exp_id:{exp_id}, slot_id:{slot_id}, lglane:{reg.logic_lane}, lane:{reg.lane}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{value}, dbm: {dbm}') # else: # logging.info(f'{self.host}: exp_id:{exp_id}, slot_id:{slot_id}, lglane:{reg.logic_lane}, lane:{reg.lane}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{value}') # break # return laneRegValueList # def SetPrivateLinkRegister(self, exp_id: int, slot_id: int, # lane_list: List[int], regs: List[str], reg_wt_value: str): # for wt_reg_name in regs: # for reg_group in self.reg_table: # if reg_group.name == wt_reg_name: # logging.info(f'Write register: {reg_group.name}, description: {reg_group.description}') # for reg in reg_group.registers: # if reg.logic_lane in lane_list: # logging.info(f'-----read logic lane: {reg.logic_lane}, phys lane: {reg.lane}') # else: # continue # size = 1 # ret = -1 # if reg.size is not None: # size = reg.size # if reg.size == 2: # value_hex_str=f"{reg.value:04x}" # else: # value_hex_str=f"{reg.value:02x}" # if reg.end_offset is not None: # loop = int((reg.end_offset - reg.offset + 1)/size) # size = loop * size # #logging.info(f"------loop:{loop}") # value = '' # for i in range(loop): # value += value_hex_str # ret = self.bmc.SetOpticalModuleRegs(exp_id, slot_id=slot_id, # bank=reg.bank, page=reg.page, # reg_offset=reg.offset, size=size, # hex_str=value) # logging.info(f'exp_id:{exp_id}, slot_id: {slot_id}, bank:0x{reg.bank:02X}, page:0x{reg.page:02X}, offset:0x{reg.offset:02X}, value:{value}, size: {size}, status: {ret}') # else: # write_value = value_hex_str # if reg_wt_value is not '': # dec = int(reg_wt_value) # write_value = '' # if reg.size == 1: # write_value = format(dec, '02X') # size = 1 # elif reg.size == 2: # write_value = format(dec, '04X') # size = 2 # else: # logging.error(f'error value:{reg_wt_value}') # if write_value is not '': # logging.info(f'reg_wt_value: {reg_wt_value}, value_hex_str: {write_value}') # # logging.info(f'-----------------------write hex_str: {write_value}') # ret = self.bmc.SetOpticalModuleRegs(exp_id, slot_id=slot_id, # bank=reg.bank, page=reg.page, # reg_offset=reg.offset, size=size, # hex_str=write_value) # # logging.info(f'exp_id:{exp_id}, slot_id: {slot_id}, bank:{reg.bank}, page:{reg.page}, offset:{reg.offset}, value:{value_hex_str}, status: {ret}')