You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1586 lines
77 KiB
1586 lines
77 KiB
# uncompyle6 version 3.9.3 |
|
# Python bytecode version base 3.6 (3379) |
|
# Decompiled from: Python 3.6.8 (default, May 21 2025, 11:12:56) |
|
# [GCC 8.5.0 20210514 (Anolis 8.5.0-10.0.1)] |
|
# Embedded file name: /xz/gyou/nexusbench/toolbox/mgc_tuning_tool.py |
|
# Compiled at: 2025-12-25 14:52:30 |
|
# Size of source mod 2**32: 78005 bytes |
|
from dataclasses import dataclass, field |
|
import logging, time, os, json, sys |
|
from typing import Dict, List, Any, Optional, Tuple |
|
import matplotlib.pyplot as plt |
|
from gpu.biren.exp_util import DevWhiteRiverExp |
|
from toolbox.opt_reg_access_tool import OptRegAccessTool |
|
from toolbox.prbs_tool import PrbsTool, RetimerPrbsResult |
|
from parser.topology_parser import TopoMappingParser |
|
from parser.transceiver_config_parser import TransceiverConfigParser, RegisterInfo |
|
from toolbox.bathtub_curve import BathtubCurve |
|
|
|
@dataclass
class LaneErrInfo:
    """Per-lane bookkeeping record used by the tuning sweeps.

    Every integer field defaults to -1 and every string field to "".
    The ``*_err_map`` dictionaries are created fresh for each instance.
    """

    # --- where the lane lives ---------------------------------------------
    host: str = ""
    exp: int = -1
    route: str = ""
    slot_id: int = -1
    logic_lane: int = -1
    rt_phys_id: int = -1
    rt_phys_lane: int = -1

    # --- the lane at the other end of the link ------------------------------
    peer_slot_id: int = -1
    peer_ibias_lane: int = -1
    peer_logic_lane: int = -1
    peer_rt_phys_id: int = -1
    peer_rt_phys_lane: int = -1

    # --- sweep state ----------------------------------------------------------
    preset: int = -1
    mgc: int = -1
    # Swept value -> recorded PRBS error count; the sweeps in this file
    # pre-fill entries with -1 before measuring.
    mgc_err_map: Dict[int, int] = field(default_factory=dict)
    lowfreq_err_map: Dict[int, int] = field(default_factory=dict)
    tmp_err_map: Dict[int, int] = field(default_factory=dict)

    # --- register snapshots ---------------------------------------------------
    vpeak: int = -1
    tia_peak: int = -1
    ibias: int = -1
    ipcurrent: int = -1
    opcurrent: int = -1
    lowfreq_eq: int = -1
    highfreq_eq: int = -1
    vgc_set: int = -1
    drv_vpeak: int = -1
    rssi1: int = -1
    rssi2: int = -1
    # Register value read before a sweep so it can be written back afterwards.
    tmp_raw_value: int = -1
    ctle: int = -1
|
|
|
|
|
class MgcTuningTool: |
|
|
|
def __init__(self, host:str, reg_table: TransceiverConfigParser, reg_tool: OptRegAccessTool,
             remote_reg_tool: OptRegAccessTool,
             bmc: DevWhiteRiverExp, topo_map: TopoMappingParser, prbs_tool: PrbsTool, route_name: str):
    """Store the collaborators and settings used by the tuning routines.

    Args:
        host: Stored as ``self.host``.
        reg_table: Stored as ``self.reg_table``.
        reg_tool: Register-access tool stored as ``self.local_reg_tool``.
        remote_reg_tool: Register-access tool stored as ``self.remote_reg_tool``.
        bmc: Stored as ``self.bmc``.
        topo_map: Stored as ``self.topo_map``.
        prbs_tool: Stored as ``self.prbs_tool``.
        route_name: Stored as both ``self.route_name`` and ``self.mode``.
    """
    # Register access: one tool per link end.
    self.local_reg_tool = reg_tool
    self.remote_reg_tool = remote_reg_tool
    # True when the two tools compare unequal.
    self.is_onet = self.remote_reg_tool != self.local_reg_tool

    # Remaining collaborators.
    self.host = host
    self.bmc = bmc
    self.reg_table = reg_table
    self.topo_map = topo_map
    self.prbs_tool = prbs_tool

    # The route name is kept under two attribute names.
    self.route_name = route_name
    self.mode = route_name

    # Vpeak-target bookkeeping.
    self.target_vpeak = 0
    self.target_vpeaks_file = "target_vpeaks.json"
|
|
|
def debug_calc_target_vpeak_new(self, exp_id: int, slot_id: int) -> Optional[Dict[int, int]]:
    """Measure Vpeak limits for one slot, derive target Vpeaks and plot a scan.

    Sequence, as performed below:

    1. Enable TIA AGC, toggle RF off, write AGC register 0 on all lanes and
       read the baseline Vpeaks.
    2. Toggle RF on, write AGC register 255 on all lanes and read the
       maximum Vpeaks.
    3. Compute two candidate targets per lane:
       * "percent": ``base + round((max - base) * 19 / 29)``
       * "diff": ``base + 21`` when ``max - base > 22``, otherwise
         ``base + (max - base) - 2``.
       The "diff" targets are the ones returned and saved.
    4. Disable TIA AGC, write temporary MGC values 80..180 (step 2) on all
       lanes, read Vpeak after each write, and save the curves as a PNG
       under ``agc_vpeak_plots/``. Failures in this step are logged as
       warnings and do not affect the return value.

    Args:
        exp_id: Passed through to every register-tool call.
        slot_id: Slot to measure.

    Returns:
        Mapping lane -> target Vpeak. Empty when toggling RF off fails.

    Note:
        The process is terminated with ``sys.exit(-1)`` if any lane has
        ``max - base < 5``.
    """
    target_vpeaks = {}
    logging.info(f"-------slot {slot_id}")
    self.local_reg_tool.enable_tia_agc(exp_id, slot_id)
    time.sleep(0.05)

    logging.info("----------step 2: toggle RF off")
    if not self.toogle_rf(exp_id, slot_id, "off"):
        # NOTE(review): TIA AGC was enabled above and is not disabled on this
        # path - confirm whether that is intended.
        logging.error(f"slot {slot_id}: toggle RF off fail")
        return target_vpeaks

    self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0)
    time.sleep(0.5)
    logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
    base_line_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
    logging.info(f"base_line_vpeaks: {base_line_vpeaks}")

    self.toogle_rf(exp_id, slot_id, "on")
    self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255)
    time.sleep(0.5)
    max_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
    logging.info(f"max_vpeaks: {max_vpeaks}")

    # Candidate 1: a fixed 19/29 fraction of the measured range above baseline.
    percent_target_vpeaks = {}
    for lane, base_vpeak in base_line_vpeaks.items():
        span = max_vpeaks[lane] - base_vpeak
        percent_target_vpeaks[lane] = round(span * (19 / 29)) + base_vpeak
        if span < 5:
            logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
            sys.exit(-1)
    logging.info(f"percent_target_vpeaks: {percent_target_vpeaks}")

    # Candidate 2: a fixed +21 offset, reduced to (range - 2) on narrow lanes.
    diff_target_vpeaks = {}
    for lane, base_vpeak in base_line_vpeaks.items():
        span = max_vpeaks[lane] - base_vpeak
        if span <= 22:
            logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
            diff_target_vpeaks[lane] = span - 2 + base_vpeak
        else:
            diff_target_vpeaks[lane] = 21 + base_vpeak

    target_vpeaks = diff_target_vpeaks
    logging.info(f"diff_target_vpeaks: {target_vpeaks}")

    # Side-by-side table of both candidates for every lane.
    logging.info("\n" + "=" * 90)
    logging.info(f'{"SLOT":<6} {"LANE":<6} {"BASE":<8} {"MAX":<8} {"RANGE":<8} {"PERCENT_TGT":<12} {"DIFF_TGT":<12} {"PCT_OFF":<10} {"DIFF_OFF":<10}')
    logging.info("=" * 90)
    for lane in sorted(base_line_vpeaks.keys()):
        base = base_line_vpeaks[lane]
        mx = max_vpeaks[lane]
        rng = mx - base
        pct_tgt = percent_target_vpeaks.get(lane, -1)
        diff_tgt = diff_target_vpeaks.get(lane, -1)
        pct_off = pct_tgt - base
        diff_off = diff_tgt - base
        logging.info(f"{slot_id:<6} {lane:<6} {base:<8} {mx:<8} {rng:<8} {pct_tgt:<12} {diff_tgt:<12} {pct_off:<10} {diff_off:<10}")
    logging.info("=" * 90 + "\n")

    self.local_reg_tool.disable_tia_agc(exp_id, slot_id)
    time.sleep(0.05)

    # Best-effort diagnostic scan; any failure is logged and ignored.
    try:
        agc_start, agc_end, agc_step = (80, 180, 2)
        agc_values = list(range(agc_start, agc_end + 1, agc_step))
        all_vpeaks_scan = {lane: [] for lane in base_line_vpeaks}
        logging.info(f"Scanning AGC from {agc_start} to {agc_end} step {agc_step}...")
        for agc in agc_values:
            self.local_reg_tool.write_tmp_mgc_reg_all_lane(exp_id, slot_id, agc)
            time.sleep(0.1)
            vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
            for lane in base_line_vpeaks:
                all_vpeaks_scan[lane].append(vpeaks.get(lane, 0))

        fig = plt.figure(figsize=(20, 8))
        try:
            for lane in sorted(base_line_vpeaks.keys()):
                plt.plot(agc_values, all_vpeaks_scan[lane], marker="o", label=f"lane {lane}")
            # Mark the measured baseline / maximum at the two ends of the scan.
            for lane in base_line_vpeaks:
                plt.scatter([agc_start], [base_line_vpeaks[lane]], color="blue")
                plt.scatter([agc_end], [max_vpeaks[lane]], color="red")
            plt.title(f"Vpeak vs AGC (exp={exp_id}, slot={slot_id})")
            plt.xlabel("AGC Value")
            plt.ylabel("Vpeak")
            plt.legend()
            plt.grid(True)
            plt.tight_layout()
            plot_dir = "agc_vpeak_plots"
            os.makedirs(plot_dir, exist_ok=True)
            plot_path = os.path.join(plot_dir, f"{self.local_reg_tool.host}_agc_vpeak_exp{exp_id}_slot{slot_id}.png")
            plt.savefig(plot_path)
            logging.info(f"AGC-Vpeak curve saved to: {plot_path}")
        finally:
            # Always release the figure, even if plotting or saving failed.
            plt.close(fig)
    except Exception as e:
        logging.warning(f"Failed to generate AGC scan plot: {e}")

    if target_vpeaks:
        self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)
    return target_vpeaks
|
|
|
def auto_tune(self, exp_id: int, slot_id: int, lane_list: List[int]) -> bool:
    """Compute target Vpeaks for a slot and then search for the matching MGC.

    Args:
        exp_id: Forwarded to the helper methods.
        slot_id: Forwarded to the helper methods.
        lane_list: NOTE(review): this argument is replaced below by the fixed
            order 7..0 before it is used - confirm that is intentional.

    Returns:
        False when ``calc_target_vpeak_new`` returns None, True otherwise.
    """
    lane_list = list(range(7, -1, -1))

    target_vpeaks = self.calc_target_vpeak_new(exp_id, slot_id, lane_list)
    if target_vpeaks is not None:
        time.sleep(0.1)
        logging.info("----------step 7: find optimal MGC")
        self.match_optimal_mgc_new(exp_id, slot_id, lane_list, target_vpeaks)
        return True
    return False
|
|
|
def auto_tia_peak_tune(self, exp_id: int, retimers):
    """Sweep the "tia_peak" register over 0..240 (step 30) via the local register tool."""
    sweep_points = list(range(0, 241, 30))
    self.general_auto_tune_value(exp_id, retimers, "tia_peak", sweep_points, self.local_reg_tool)
|
|
|
def auto_highfreq_tune(self, exp_id: int, retimers):
    """Sweep the "highfreq_eq" register over 0..240 (step 20) via the remote register tool."""
    sweep_points = list(range(0, 241, 20))
    self.general_auto_tune_value(exp_id, retimers, "highfreq_eq", sweep_points, self.remote_reg_tool)
|
|
|
def rt_ibias(self, exp_id: int, retimers):
    """Run ``ibias_auto_tune_value_new`` with the two points 1647 and 3647, without overwrite."""
    sweep_points = [1647, 3647]
    self.ibias_auto_tune_value_new(exp_id, retimers, "ibias_runtime", sweep_points, self.remote_reg_tool, False)
|
|
|
def auto_ibias_tune2(self, exp_id: int, retimers):
    """Run ``ibias_auto_tune_value`` over 1647..3647 (step 200), without overwrite."""
    sweep_points = list(range(1647, 3648, 200))
    self.ibias_auto_tune_value(exp_id, retimers, "ibias_runtime", sweep_points, self.remote_reg_tool, False)
|
|
|
def auto_opcurrent_tune(self, exp_id: int, retimers):
    """Sweep the "opcurrent" register via the remote register tool.

    The points are visited in the order listed, which is not sorted.
    """
    sweep_points = [
        130, 140, 150, 160,
        120, 110, 90, 80, 100,
        170, 180,
    ]
    self.general_auto_tune_value(exp_id, retimers, "opcurrent", sweep_points, self.remote_reg_tool)
|
|
|
def auto_opcurrent_tune_test(self, exp_id: int, retimers):
    """Sweep the "opcurrent" register over a wider point set via the remote register tool.

    The points are visited in the order listed, which is not sorted.
    """
    sweep_points = [
        110, 130, 150, 170,
        90, 70, 50,
        190, 210, 230,
    ]
    self.general_auto_tune_value(exp_id, retimers, "opcurrent", sweep_points, self.remote_reg_tool)
|
|
|
def auto_lowfreq_tune(self, exp_id: int, retimers):
    """Sweep the "lowfreq_eq" register via the remote register tool.

    Points run upward from 120 to 240 first, then downward from 90 to 0
    (step 30 in both directions).
    """
    sweep_points = list(range(120, 241, 30)) + list(range(90, -1, -30))
    self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", sweep_points, self.remote_reg_tool)
|
|
|
def auto_ctle_tune(self, exp_id: int, retimers):
    """Sweep the even values 0..18 via the remote register tool.

    NOTE(review): the register name passed is "lowfreq_eq", the same one
    ``auto_lowfreq_tune`` uses - confirm whether a CTLE-specific register
    name was intended.
    """
    sweep_points = list(range(0, 19, 2))
    self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", sweep_points, self.remote_reg_tool)
|
|
|
def auto_tune2(self, exp_id: int, retimers) -> bool:
    """Sweep the temporary MGC register around its current value and report the best setting.

    For every lane of slots 0-7 the current MGC is read, then each non-zero
    offset in ``MGC_OFFSETS`` is applied to all lanes through
    ``write_tmp_mgc_reg`` and the PRBS error count is recorded in
    ``LaneErrInfo.mgc_err_map``. The original MGC values are written back
    afterwards - also when the sweep aborts early or raises - and
    ``simple_mgc_tuner`` is asked for a recommendation, which is only logged
    (nothing is committed to hardware here).

    Args:
        exp_id: Forwarded to every register/PRBS call.
        retimers: Forwarded to the PRBS tool.

    Returns:
        False when a topology lookup fails or when no lane is locked at the
        baseline check; True otherwise.
        NOTE(review): True is returned even when ``simple_mgc_tuner`` reports
        bad lanes - confirm whether callers expect False in that case.
    """
    self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
    logging.info("Starting auto_tune2...")

    MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8, 10, -10]

    lane_data_list: List[LaneErrInfo] = []
    lane_index = {}  # (slot_id, logic_lane) -> LaneErrInfo, for direct lookup

    # Step 1: snapshot the current MGC of every lane.
    for slot_id in range(8):
        for lane in range(8):
            current_mgc = int(self.local_reg_tool.read_mgc_reg(exp_id, slot_id, lane))
            mgc_err_map = {current_mgc + offset: -1 for offset in MGC_OFFSETS}

            success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
            if success == False:
                logging.error(f'get_phys_rtmr_by_slot failed, result: {success}')
                return False

            lane_info = LaneErrInfo(
                slot_id=slot_id,
                logic_lane=lane,
                rt_phys_id=rt_phys_id,
                rt_phys_lane=rt_phys_lane,
                mgc_err_map=mgc_err_map,
                mgc=current_mgc,
            )
            lane_data_list.append(lane_info)
            lane_index[(slot_id, lane)] = lane_info

    # Step 2: baseline PRBS measurement at the current MGC.
    prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
    if all(not prbs.locked for prbs in prbs_results):
        logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. Aborting auto_tune2.")
        return False

    time.sleep(2)
    prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False)
    for prbs in prbs_results:
        local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
        if not local_slot:
            logging.error(f'local slot is none!')
            return False
        slot_id, lane_id = local_slot
        info = lane_index.get((slot_id, lane_id))
        if info is not None:
            # An unlocked lane is recorded with a large sentinel error count.
            info.mgc_err_map[info.mgc] = prbs.err_count if prbs.locked else 65535

    # Step 3: sweep. The finally clause guarantees the lanes are put back to
    # their original MGC even if the sweep returns early or raises.
    try:
        for offset in (o for o in MGC_OFFSETS if o != 0):
            for info in lane_data_list:
                self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, info.mgc + offset)
                time.sleep(0.05)

            time.sleep(0.5)
            # First check's result is discarded; only the check after the
            # 2 s wait is evaluated.
            self.prbs_tool.prbs_check(exp_id, retimers, True)
            time.sleep(2)
            prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)

            is_need_prbs_reset = False
            for prbs in prbs_results:
                local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
                if not local_slot:
                    logging.error(f'local slot is none!')
                    return False
                slot_id, lane_id = local_slot
                info = lane_index.get((slot_id, lane_id))
                if info is None:
                    continue
                target_mgc = info.mgc + offset
                if prbs.locked:
                    info.mgc_err_map[target_mgc] = prbs.err_count
                else:
                    info.mgc_err_map[target_mgc] = 69999
                    is_need_prbs_reset = True

            # A lane that lost lock needs the PRBS path re-initialised before
            # the next offset is measured.
            if is_need_prbs_reset:
                self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)

        self.print_err_matrix(lane_data_list, MGC_OFFSETS)
    finally:
        # Restore the original MGC values.
        for info in lane_data_list:
            self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, info.mgc)
            time.sleep(0.05)

    # Step 4: ask the tuner for one recommended MGC per lane (same order as
    # lane_data_list) and log the lanes whose recommendation differs.
    recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1)
    for info, recommended_mgc in zip(lane_data_list, recommended_mgcs):
        slot_id = info.slot_id
        lane_id = info.logic_lane
        current_mgc = info.mgc
        target_mgc = recommended_mgc
        diff_mgc = target_mgc - current_mgc
        if diff_mgc != 0:
            logging.info(
                f"Slot {slot_id}, Lane {lane_id}: MGC adjusted from {current_mgc} "
                f"→ {target_mgc} (offset={diff_mgc:.1f})"
            )

    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?")

    return True
|
|
|
def low_ibias_tune2(self, exp_id: int, retimers, reg_name: str, value_offset_list: Optional[List[int]]=None, value_list: Optional[List[int]]=None) -> bool:
    """Sweep the MGC register around a per-lane starting value and apply the tuner's choice.

    Args:
        exp_id: Forwarded to every register/PRBS call.
        retimers: Forwarded to the PRBS tool.
        reg_name: Register read (via ``read_opt_reg``) to obtain each lane's
            starting value.
            NOTE(review): the sweep itself writes through ``write_mgc_reg``,
            not ``reg_name`` - confirm this mix is intended.
        value_offset_list: Offsets used to pre-populate each lane's error
            map. Lane records are only created when this is not None.
            NOTE(review): the indentation of the source was lost; the lane
            setup is assumed to sit entirely under this check.
        value_list: Not used by this method.

    Returns:
        False when a topology lookup fails or no lane is locked at the
        baseline check; True otherwise (also when bad lanes are reported).
    """
    logging.info("Starting auto_tune2...")
    MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8]

    lane_data_list = []
    lane_index = {}  # (slot_id, logic_lane) -> LaneErrInfo

    for slot_id in range(8):
        for lane in range(8):
            if value_offset_list is None:
                continue
            current_value = int(self.local_reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name))
            err_map = {current_value + offset: -1 for offset in value_offset_list}
            success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
            if success == False:
                logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
                return False
            lane_info = LaneErrInfo(slot_id=slot_id,
                                    logic_lane=lane,
                                    rt_phys_id=rt_phys_id,
                                    rt_phys_lane=rt_phys_lane,
                                    mgc_err_map=err_map,
                                    mgc=current_value)
            lane_data_list.append(lane_info)
            lane_index[(slot_id, lane)] = lane_info

    # Baseline measurement at the starting value.
    prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
    if all(not prbs.locked for prbs in prbs_results):
        logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. Aborting auto_tune2.")
        return False

    time.sleep(2)
    prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False)
    for prbs in prbs_results:
        local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
        if not local_slot:
            logging.error("local slot is none!")
            return False
        slot_id, lane_id = local_slot
        info = lane_index.get((slot_id, lane_id))
        if info is not None:
            # The record carries its map in mgc_err_map and its starting value
            # in mgc (see construction above); LaneErrInfo has no tmp_value.
            info.mgc_err_map[info.mgc] = prbs.err_count if prbs.locked else 65535

    # Sweep every non-zero offset across all lanes.
    for offset in (o for o in MGC_OFFSETS if o != 0):
        for info in lane_data_list:
            self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, info.mgc + offset)
        for slot_id in range(8):
            self.local_reg_tool.write_confirm_reg(exp_id, slot_id)

        time.sleep(0.5)
        # Only the second check, taken after the 2 s wait, is evaluated.
        self.prbs_tool.prbs_check(exp_id, retimers, True)
        time.sleep(2)
        prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
        for prbs in prbs_results:
            local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
            if not local_slot:
                logging.error("local slot is none!")
                return False
            slot_id, lane_id = local_slot
            info = lane_index.get((slot_id, lane_id))
            if info is not None:
                info.mgc_err_map[info.mgc + offset] = prbs.err_count if prbs.locked else 65535

    self.print_err_matrix(lane_data_list, MGC_OFFSETS)

    # Write the starting values back before applying any recommendation.
    for info in lane_data_list:
        self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, info.mgc)

    recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1)
    for info, recommended_mgc in zip(lane_data_list, recommended_mgcs):
        slot_id = info.slot_id
        lane_id = info.logic_lane
        current_mgc = info.mgc
        target_mgc = recommended_mgc
        diff_mgc = target_mgc - current_mgc
        if diff_mgc != 0:
            logging.info(f"Slot {slot_id}, Lane {lane_id}: MGC adjusted from {current_mgc} → {target_mgc} (offset={diff_mgc:.1f})")
            logging.info(f"Applying MGC reg write: exp_id={exp_id}, slot={slot_id}, lane={lane_id}, mgc={target_mgc}")
            self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, target_mgc)
            self.local_reg_tool.write_confirm_reg(exp_id, slot_id)

    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?")

    return True
|
|
|
def general_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
    """Sweep one optical register over ``tune_value_list`` and report the best value per lane.

    Lanes are enumerated from the retimer side (lanes 0-15 of every retimer
    in ``retimers``) and mapped to module slots through the topology map.
    Each value is written to every lane, PRBS errors are recorded in
    ``LaneErrInfo.tmp_err_map``, and the values read before the sweep are
    written back afterwards - also when the sweep aborts early or raises.

    Args:
        exp_id: Forwarded to every register/PRBS call.
        retimers: Iterable of retimer ids; also forwarded to the PRBS tool.
        reg_name: Register to sweep.
        tune_value_list: Values to try, in order.
        reg_tool: Ignored - the tool is chosen from ``reg_name`` below
            (remote tool for 'ibias'/'opcurrent', local tool otherwise).
        is_overwrite: When True, write the recommended value to each lane
            whose recommendation differs from its original value.

    Returns:
        False when a topology lookup fails, True otherwise.
    """
    logging.info("Starting auto_tune2...")

    if reg_name in ('ibias', 'opcurrent'):
        reg_tool = self.remote_reg_tool
    else:
        reg_tool = self.local_reg_tool

    # Pick the topology lookup that matches the side being written.
    if reg_tool == self.local_reg_tool:
        slot_lookup = self.topo_map.get_local_slot_by_retimer
    else:
        slot_lookup = self.topo_map.get_remote_slot_by_retimer

    lane_data_list = []
    lane_index = {}  # (slot_id, logic_lane) -> first LaneErrInfo created for it

    for rt_phys_id in retimers:
        for rt_phys_lane in range(0, 16):
            slot = slot_lookup(rt_phys_id, rt_phys_lane, self.route_name)
            if not slot:
                logging.error("local slot is none!")
                return False
            slot_id, lane_id = slot
            raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane_id, reg_name)
            lane_info = LaneErrInfo(host=reg_tool.host, exp=exp_id, slot_id=slot_id, logic_lane=lane_id,
                                    rt_phys_id=rt_phys_id,
                                    rt_phys_lane=rt_phys_lane,
                                    tmp_err_map={value: -1 for value in tune_value_list},
                                    tmp_raw_value=raw_value)
            lane_data_list.append(lane_info)
            lane_index.setdefault((slot_id, lane_id), lane_info)

    time.sleep(0.05)
    is_need_reset_prbs = True  # always reset before the first measurement

    try:
        for value in tune_value_list:
            for info in lane_data_list:
                reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name)
                time.sleep(0.05)

            time.sleep(1)
            if is_need_reset_prbs:
                logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
                self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
            # Only the second check, taken after the 2 s wait, is evaluated.
            self.prbs_tool.prbs_check(exp_id, retimers, True)
            time.sleep(2)
            prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)

            is_need_reset_prbs = False
            for prbs in prbs_results:
                slot = slot_lookup(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
                if not slot:
                    logging.error("local slot is none!")
                    return False
                slot_id, lane_id = slot
                info = lane_index.get((slot_id, lane_id))
                if info is None:
                    continue
                if prbs.locked:
                    info.tmp_err_map[value] = prbs.err_count
                    continue
                # Unlocked lane: record a sentinel and request a PRBS reset
                # before the next value (same policy as auto_tune2).
                # NOTE(review): the flattened source does not show which
                # branch the flag assignment belonged to - confirm.
                if value in info.tmp_err_map:
                    info.tmp_err_map[value] = 69999
                else:
                    logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
                is_need_reset_prbs = True

        logging.info(lane_data_list)
        self.print_general_err_matrix(lane_data_list, tune_value_list)
        time.sleep(0.05)
    finally:
        # Put every lane back to the value read before the sweep.
        for info in lane_data_list:
            reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name)
            time.sleep(0.05)

    recommended_values, bad_lanes = self.simple_value_tuner(lane_data_list)
    for info, recommended_value in zip(lane_data_list, recommended_values):
        diff_value = recommended_value - info.tmp_raw_value
        if diff_value != 0:
            logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})")
            logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}")
            if is_overwrite:
                reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name)

    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")

    return True
|
|
|
def ibias_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
    """Sweep the volatile ibias of the transmitting modules and record PRBS errors per receive lane.

    The per-slot ibias readings are snapshotted first, every value in
    ``tune_value_list`` is then written to all lanes of each peer slot, PRBS
    is checked in both directions, and finally the snapshots are written
    back. When ``self.is_onet`` is true the same is done for the opposite
    direction using the local register tool.

    Args:
        exp_id: Forwarded to every register/PRBS call.
        retimers: Forwarded to the PRBS tool.
        reg_name: Not used by this method.
        tune_value_list: ibias values to try, in order.
        reg_tool: Tool used for the final ``write_ibias_reg`` when
            ``is_overwrite`` is True.
        is_overwrite: When True, write the recommended ibias to each peer
            lane whose recommendation differs.

    Returns:
        False when a topology lookup fails, True otherwise.
    """
    logging.info("Starting ibias_auto_tune_value_new...")
    local_data_list: List[LaneErrInfo] = []
    remote_data_list: List[LaneErrInfo] = []
    local_slot_list = list(range(8))
    local_lane_list = list(range(8))
    remote_slot_list = list(range(8))
    remote_lane_list = list(range(8))

    # Snapshots, keyed by slot: {slot_id: value returned by read_ibias_reg_all_lane}
    remote_raw_ibias: Dict[int, Dict] = {}
    local_raw_ibias: Dict[int, Dict] = {}

    def snapshot(tool, slot_ids, store):
        """Write "01" to the module register, then save the slot's ibias reading."""
        for slot_id in slot_ids:
            # NOTE(review): presumably switches the module into a runtime
            # ibias-override mode ("00" is written back after the restore) -
            # confirm against the module register map.
            tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01")
            store[slot_id] = tool.read_ibias_reg_all_lane(exp_id, slot_id)
            time.sleep(0.05)

    def build_lane_records(host, slot_ids, lane_ids, out):
        """Append one LaneErrInfo per (slot, lane) to ``out``; return False on a lookup failure."""
        for slot_id in slot_ids:
            for lane in lane_ids:
                peer_info = self.topo_map.get_remote_retimer_and_slot_by_slot(slot_id, lane, self.route_name)
                if not peer_info:
                    logging.error(f"peer_info is None for slot {slot_id} lane {lane}")
                    return False
                peer_rt_id, peer_rt_lane, peer_slot_id, peer_lane_id = peer_info

                result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
                if not result:
                    logging.error(f"get_phys_rtmr_by_slot returned nothing for slot {slot_id} lane {lane}")
                    return False
                success, rt_phys_id, rt_phys_lane = result
                if success == False:
                    logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
                    return False

                out.append(LaneErrInfo(host=host, exp=exp_id,
                                       route=self.route_name.split("-")[-1],
                                       slot_id=slot_id, logic_lane=lane,
                                       rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane,
                                       peer_slot_id=peer_slot_id, peer_logic_lane=peer_lane_id,
                                       peer_rt_phys_id=peer_rt_id, peer_rt_phys_lane=peer_rt_lane,
                                       tmp_err_map={value: -1 for value in tune_value_list}))
        return True

    def record(data_list, prbs, value):
        """Store one PRBS result on its matching lane; return True if that lane was unlocked."""
        for info in data_list:
            if info.host == prbs.host and info.rt_phys_id == prbs.rtmr_id \
                    and info.rt_phys_lane == prbs.rtmr_lane:
                if prbs.locked:
                    info.tmp_err_map[value] = prbs.err_count
                    return False
                if value in info.tmp_err_map:
                    info.tmp_err_map[value] = 69999
                else:
                    logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
                return True
        return False

    # Snapshot ibias on the remote side (and on the local side for is_onet).
    snapshot(self.remote_reg_tool, remote_slot_list, remote_raw_ibias)
    if self.is_onet:
        snapshot(self.local_reg_tool, local_slot_list, local_raw_ibias)

    # Lane records for the local host (their peers are swept via the remote tool).
    if not build_lane_records(self.local_reg_tool.host, local_slot_list, local_lane_list, local_data_list):
        return False
    # Lane records for the remote host (their peers are swept via the local tool).
    if self.is_onet:
        if not build_lane_records(self.remote_reg_tool.host, remote_slot_list, remote_lane_list, remote_data_list):
            return False

    time.sleep(0.05)
    is_need_reset_prbs = True  # always reset before the first measurement
    for value in tune_value_list:
        for info in local_data_list:
            self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value)
            time.sleep(0.05)

        if self.is_onet:
            for info in remote_data_list:
                self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value)
                time.sleep(0.05)

        time.sleep(1)
        if is_need_reset_prbs:
            logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
            self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)

        # Only the second check, taken after the 2 s wait, is evaluated.
        self.prbs_tool.prbs_check_bidirection(exp_id, retimers, True)
        time.sleep(2)
        prbs_results = self.prbs_tool.prbs_check_bidirection(exp_id, retimers)

        # NOTE(review): the reset is requested when any matched lane is
        # unlocked (same policy as auto_tune2); the flattened source does not
        # show which branch the flag assignment belonged to - confirm.
        is_need_reset_prbs = False
        for prbs in prbs_results:
            unlocked_local = record(local_data_list, prbs, value)
            unlocked_remote = record(remote_data_list, prbs, value)
            if unlocked_local or unlocked_remote:
                is_need_reset_prbs = True

    self.print_general_err_matrix(local_data_list, tune_value_list)
    self.print_general_err_matrix(remote_data_list, tune_value_list)
    time.sleep(0.05)

    # Restore each side from its own snapshot, then write "00" back.
    for slot_id in remote_slot_list:
        self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=remote_raw_ibias[slot_id])
        time.sleep(0.05)
        self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00")
        time.sleep(0.05)

    if self.is_onet:
        for slot_id in local_slot_list:
            self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=local_raw_ibias[slot_id])
            time.sleep(0.05)
            self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00")
            time.sleep(0.05)

    # NOTE(review): tmp_raw_value is never filled in for these records, so the
    # "adjusted from" figure below is always the -1 default.
    recommended_values, bad_lanes = self.simple_value_tuner(local_data_list)
    for info, recommended_value in zip(local_data_list, recommended_values):
        diff_value = recommended_value - info.tmp_raw_value
        if diff_value != 0:
            # The peer (transmit) slot/lane was resolved when the record was built.
            remote_slot_id, remote_lane_id = info.peer_slot_id, info.peer_logic_lane
            logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: {remote_slot_id}, {remote_lane_id}, adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})")
            if is_overwrite:
                reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, recommended_value)
                time.sleep(0.05)

    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")

    return True
|
|
|
def ibias_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
    """Sweep "ibias_runtime" on each receive lane's transmit peer and record PRBS errors.

    For every receive lane (slots 0-7, lanes 0-7) the transmit-side slot and
    lane are resolved through the topology map and the peer's "ibias"
    register is read. Each value in ``tune_value_list`` is then written to
    "ibias_runtime" on every peer lane and the PRBS error count is stored in
    the receive lane's ``tmp_err_map``. After the sweep the value read at
    the start is written back to the same peer lane it was read from.

    Args:
        exp_id: Forwarded to every register/PRBS call.
        retimers: Forwarded to the PRBS tool.
        reg_name: Not used by this method.
        tune_value_list: Values to try, in order.
        reg_tool: Register tool used for all reads and writes.
        is_overwrite: When True, write the recommended ibias to each peer
            lane whose recommendation differs from its original value.

    Returns:
        False when a topology lookup fails, True otherwise.
    """
    logging.info("Starting auto_tune2...")
    lane_data_list = []
    # Parallel to lane_data_list: the (tx_slot, tx_lane) each record's
    # register accesses are aimed at.
    tx_targets = []

    # NOTE(review): "01" is written to every module here and never reverted
    # in this method - confirm whether a matching "00" write is expected.
    for slot_id in range(8):
        reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01")
        time.sleep(0.05)

    for rx_slot_id in range(8):
        for rx_lane in range(8):
            remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(rx_slot_id, rx_lane, self.route_name)
            if not remote_info:
                logging.error(f"remote_info is None for slot {rx_slot_id} lane {rx_lane}")
                return False
            _rt_logic_id, _rt_logic_lane, remote_slot_id, remote_lane_id = remote_info
            raw_value = reg_tool.read_opt_reg(exp_id, remote_slot_id, remote_lane_id, "ibias")

            result = self.topo_map.get_phys_rtmr_by_slot(rx_slot_id, rx_lane, self.route_name)
            if not result:
                logging.error(f"get_phys_rtmr_by_slot returned nothing for slot {rx_slot_id} lane {rx_lane}")
                return False
            success, rt_phys_id, rt_phys_lane = result
            if success == False:
                logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
                return False

            lane_data_list.append(LaneErrInfo(host=self.local_reg_tool.host,
                                              exp=exp_id,
                                              route=self.route_name.split("-")[-1],
                                              slot_id=rx_slot_id,
                                              logic_lane=rx_lane,
                                              rt_phys_id=rt_phys_id,
                                              rt_phys_lane=rt_phys_lane,
                                              tmp_err_map={value: -1 for value in tune_value_list},
                                              tmp_raw_value=raw_value))
            tx_targets.append((remote_slot_id, remote_lane_id))

    time.sleep(0.05)
    is_need_reset_prbs = True  # always reset before the first measurement
    for value in tune_value_list:
        for tx_slot, tx_lane in tx_targets:
            reg_tool.write_opt_reg(exp_id, tx_slot, tx_lane, value, "ibias_runtime")
            time.sleep(0.05)

        time.sleep(1)
        if is_need_reset_prbs:
            logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
            self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
        # Only the second check, taken after the 2 s wait, is evaluated.
        self.prbs_tool.prbs_check(exp_id, retimers, True)
        time.sleep(2)
        prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)

        is_need_reset_prbs = False
        for prbs in prbs_results:
            for info in lane_data_list:
                if info.rt_phys_id != prbs.rtmr_id or info.rt_phys_lane != prbs.rtmr_lane:
                    continue
                if prbs.locked:
                    info.tmp_err_map[value] = prbs.err_count
                else:
                    # Unlocked lane: record a sentinel and request a PRBS
                    # reset before the next value (same policy as auto_tune2).
                    # NOTE(review): the flattened source does not show which
                    # branch the flag assignment belonged to - confirm.
                    if value in info.tmp_err_map:
                        info.tmp_err_map[value] = 69999
                    else:
                        logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
                    is_need_reset_prbs = True
                break

    self.print_general_err_matrix(lane_data_list, tune_value_list)
    time.sleep(0.05)

    # Restore: the saved value goes back to the transmit lane it was read
    # from, which is also the lane the sweep wrote to.
    for info, (tx_slot, tx_lane) in zip(lane_data_list, tx_targets):
        reg_tool.write_opt_reg(exp_id, tx_slot, tx_lane, info.tmp_raw_value, "ibias_runtime")
        time.sleep(0.05)

    recommended_values, bad_lanes = self.simple_value_tuner(lane_data_list)
    for info, (tx_slot, tx_lane), recommended_value in zip(lane_data_list, tx_targets, recommended_values):
        diff_value = recommended_value - info.tmp_raw_value
        if diff_value != 0:
            logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: {tx_slot}, {tx_lane}, adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})")
            if is_overwrite:
                reg_tool.write_ibias_reg(exp_id, tx_slot, tx_lane, recommended_value)
                time.sleep(0.05)

    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")

    return True
|
|
|
def general_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool: |
# Sweep register `reg_name` over every value in `tune_value_list` on slots 0-7 /
# lanes 0-7 through `reg_tool`, store the PRBS error count observed at each value
# in the lane's `tmp_err_map`, write the originally-read value back to every lane,
# then log the value recommended by `simple_value_tuner` for each lane and, only
# when `is_overwrite` is true, write that recommendation to the register.
# Returns False when `get_phys_rtmr_by_slot` reports failure or when a PRBS result
# cannot be mapped back to a slot/lane; returns True otherwise.
|
logging.info("Starting auto_tune2...") |
|
lane_data_list = [] |
|
lane_list = [ |
|
0, 1, 2, 3, 4, 5, 6, 7] |
|
for slot_id in (0, 1, 2, 3, 4, 5, 6, 7): |
|
for lane in lane_list: |
|
# Remember the current register value so it can be restored after the sweep.
raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name) |
|
# Every candidate value starts at -1, i.e. "no measurement recorded yet".
tmp_err_map = {value: -1 for value in tune_value_list} |
|
result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name) |
|
if result: |
|
success, rt_phys_id, rt_phys_lane = result |
|
if success == False: |
|
logging.error(f"get_phys_rtmr_by_slot failed, result: {success}") |
|
return False |
|
lane_info = LaneErrInfo(host=(self.local_reg_tool.host), |
|
exp=exp_id, |
|
slot_id=slot_id, |
|
logic_lane=lane, |
|
rt_phys_id=rt_phys_id, |
|
rt_phys_lane=rt_phys_lane, |
|
tmp_err_map=tmp_err_map, |
|
tmp_raw_value=raw_value) |
|
lane_data_list.append(lane_info) |
|
|
|
time.sleep(0.05) |
|
# The flag starts True, so the first pass resets the retimers and enables PRBS.
is_need_reset_prbs = True |
|
for value in tune_value_list: |
|
# Program the candidate value on every collected lane before measuring.
for info in lane_data_list: |
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name) |
|
time.sleep(0.05) |
|
|
|
time.sleep(1) |
|
if is_need_reset_prbs: |
|
logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}") |
|
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers) |
|
# NOTE(review): the result of this first check is overwritten by the second
# call below; presumably the extra True argument clears counters - confirm.
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True) |
|
time.sleep(2) |
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers) |
|
is_need_reset_prbs = False |
|
for prbs in prbs_results: |
|
# Map the retimer/lane of the PRBS result to a slot/lane on the side that
# `reg_tool` drives: local lookup for the local tool, remote lookup otherwise.
if reg_tool == self.local_reg_tool: |
|
slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) |
|
else: |
|
slot = self.topo_map.get_remote_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name) |
|
if not slot: |
|
# NOTE(review): this message says "local" but is also reached for the remote lookup.
logging.error("local slot is none!") |
|
return False |
|
slot_id, lane_id = slot |
|
for info in lane_data_list: |
|
if info.slot_id == slot_id: |
|
if info.logic_lane == lane_id: |
|
if prbs.locked: |
|
info.tmp_err_map[value] = prbs.err_count |
|
elif value in info.tmp_err_map: |
|
# Lane not locked: record 69999 instead of a real error count.
info.tmp_err_map[value] = 69999 |
|
else: |
|
logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.") |
|
# Requests a retimer/PRBS reset before the next candidate value is measured.
is_need_reset_prbs = True |
|
break |
|
|
|
logging.info(lane_data_list) |
|
self.print_general_err_matrix(lane_data_list, tune_value_list) |
|
time.sleep(0.05) |
|
# Restore every lane to the value that was read before the sweep started.
for info in lane_data_list: |
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name) |
|
time.sleep(0.05) |
|
|
|
recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list) |
|
# NOTE(review): the loop variable reuses the name of the list being zipped; this
# works because zip() receives the list before the name is rebound.
for info, recommended_value in zip(lane_data_list, recommended_value): |
|
diff_value = recommended_value - info.tmp_raw_value |
|
if diff_value != 0: |
|
logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})") |
|
logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}") |
|
if is_overwrite: |
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name) |
|
|
|
# Lanes reported by simple_value_tuner as having too few clean points.
if bad_lanes: |
|
for slot, lane in bad_lanes: |
|
logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules") |
|
|
|
return True |
|
|
|
def simple_value_tuner(self, lane_data_list: List[LaneErrInfo], bad_threshold=1, min_good_count=5):
    """Recommend one register value per lane from its sweep results.

    A sweep point is "clean" when its error count is non-negative and below
    ``bad_threshold``.  Negative counts are "not measured" placeholders (the
    sweep seeds every point with -1), so they are never treated as clean.

    Args:
        lane_data_list: Lanes to evaluate; each provides ``tmp_err_map``
            (value -> error count), ``tmp_raw_value``, ``slot_id`` and
            ``logic_lane``.
        bad_threshold: Error counts at or above this are not clean.
        min_good_count: A lane with this many clean points or fewer is
            reported as bad.

    Returns:
        ``(recommendations, bad_lanes)``.  ``recommendations`` holds, in lane
        order, the integer midpoint between the smallest and largest clean
        value, or the lane's ``tmp_raw_value`` when it has no clean point.
        ``bad_lanes`` is a list of ``(slot_id, logic_lane)`` tuples.
    """
    recommendations = []
    bad_lanes = []
    for info in lane_data_list:
        clean_values = [
            value
            for value, err in info.tmp_err_map.items()
            if 0 <= err < bad_threshold
        ]
        if clean_values:
            recommended_value = int((min(clean_values) + max(clean_values)) / 2)
        else:
            # Nothing usable was measured: keep whatever the lane had before.
            recommended_value = info.tmp_raw_value
        if len(clean_values) <= min_good_count:
            bad_lanes.append((info.slot_id, info.logic_lane))
        recommendations.append(recommended_value)

    return (recommendations, bad_lanes)
|
|
|
def simple_mgc_tuner(self, lane_data_list, bad_threshold=1, min_good_count=6):
    """Recommend one MGC value per lane from its ``mgc_err_map``.

    Args:
        lane_data_list: Lanes to evaluate; each provides ``mgc_err_map``
            (MGC value -> error count), ``mgc``, ``slot_id`` and
            ``logic_lane``.
        bad_threshold: Error counts below this are considered clean.
        min_good_count: Minimum number of clean points a lane needs in order
            not to be reported as bad.  The default of 6 gives the same
            result as the former hard-coded ``<= 5`` test.

    Returns:
        ``(recommendations, bad_lanes)``.  ``recommendations`` holds, in lane
        order, the integer midpoint between the smallest and largest clean
        MGC value, or the lane's current ``mgc`` when it has no clean point.
        ``bad_lanes`` is a list of ``(slot_id, logic_lane)`` tuples.
    """
    recommendations = []
    bad_lanes = []
    for info in lane_data_list:
        clean_mgcs = [
            mgc for mgc, err in info.mgc_err_map.items() if err < bad_threshold
        ]
        if clean_mgcs:
            recommended_mgc = int((min(clean_mgcs) + max(clean_mgcs)) / 2)
        else:
            recommended_mgc = info.mgc
        # Honour the caller-supplied minimum instead of a fixed constant.
        if len(clean_mgcs) < min_good_count:
            bad_lanes.append((info.slot_id, info.logic_lane))
        recommendations.append(recommended_mgc)

    return (recommendations, bad_lanes)
|
|
|
def print_general_err_matrix(self, lane_data_list: List[LaneErrInfo], tune_value_list):
    """Print a fixed-width table of per-lane error counts for a sweep.

    Columns are Host, Exp, Route, Slot, Lane, RT_Id, RT_Lane followed by one
    column per tuned value in ascending order.  A point whose stored count is
    negative, or that is missing from ``tmp_err_map``, is shown as ``70000``.
    A lane whose row cannot be rendered is logged and skipped.
    """
    ordered_values = sorted(tune_value_list)
    fixed_titles = ['Host', 'Exp', 'Route', 'Slot', 'Lane', 'RT_Id', 'RT_Lane']
    column_widths = [16, 6, 8, 6, 6, 6, 10] + [6] * len(ordered_values)
    # Header and data rows share the same layout.
    row_template = "{:<16} {:<6} {:<8} {:<6} {:<6} {:<6} {:<10} " + " ".join(
        ["{:<6}"] * len(ordered_values)
    )

    print(row_template.format(*fixed_titles, *[str(v) for v in ordered_values]))
    print("-" * (sum(column_widths) + len(column_widths) * 1))

    for info in lane_data_list:
        try:
            cells = []
            for value in ordered_values:
                count = info.tmp_err_map.get(value, -2)
                cells.append("70000" if count < 0 else f"{count}")
            print(row_template.format(
                self.host, info.exp, info.route, info.slot_id,
                info.logic_lane, info.rt_phys_id, info.rt_phys_lane, *cells,
            ))
        except Exception as e:
            logging.error(f"Error printing row for Slot {info.slot_id}, Lane {info.logic_lane}: {e}")
            continue
|
|
|
def print_err_matrix(self, lane_data_list, mgc_offsets):
    """Print a fixed-width table of per-lane error counts by MGC offset.

    Columns are Host, Slot, Lane, RT_Id, RT_Lane followed by one column per
    offset in ascending order.  For each lane the count is looked up in
    ``mgc_err_map`` under ``info.mgc + offset``; a missing or negative count
    is shown as ``N/A``.  A lane whose row cannot be rendered is logged and
    skipped.
    """
    mgc_offsets = sorted(mgc_offsets)

    header_parts = ["Host", "Slot", "Lane", "RT_Id", "RT_Lane"] + [str(offset) for offset in mgc_offsets]
    # One width per printed column, including RT_Id and RT_Lane, so the
    # separator rule spans the whole table.
    column_widths = [16, 6, 6, 6, 10] + [6] * len(mgc_offsets)
    row_format = "{:<16} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in mgc_offsets)

    print(row_format.format(*header_parts))
    print("-" * (sum(column_widths) + len(column_widths) * 1))

    for info in lane_data_list:
        base_mgc = info.mgc
        try:
            err_counts = []
            for offset in mgc_offsets:
                # .get() keeps a missing key from raising; it renders as N/A.
                err = info.mgc_err_map.get(base_mgc + offset, -1.0)
                err_counts.append("N/A" if err < 0 else f"{err}")

            print(row_format.format(self.host, info.slot_id, info.logic_lane,
                                    info.rt_phys_id, info.rt_phys_lane, *err_counts))
        except Exception as e:
            logging.error(f"Error printing row for Slot {info.slot_id}, Lane {info.logic_lane}: {e}")
            continue
|
|
|
def verify_rt_index_lane_index(self, rt_index: int, lane_index: int) -> Tuple[(bool, int, int)]:
    """Validate a retimer/lane pair and normalise it to logic numbering.

    Two input forms are accepted:

    * ``rt_index`` in 1..4 with ``lane_index`` in 0..15: lanes 0-7 map to
      retimer ``rt_index * 10 + 1`` with the lane unchanged, lanes 8-15 map
      to retimer ``rt_index * 10 + 2`` with the lane reduced by 8.
    * ``rt_index`` already one of 11, 12, 21, 22, 31, 32, 41, 42 with
      ``lane_index`` in 0..7: returned unchanged.

    Returns:
        ``(True, rt_index, lane_index)`` in normalised form, or
        ``(False, 0, 0)`` after logging an error when the pair is invalid.
    """
    if rt_index in (1, 2, 3, 4):
        if 0 <= lane_index <= 7:
            return (True, rt_index * 10 + 1, lane_index)
        if 8 <= lane_index <= 15:
            return (True, rt_index * 10 + 2, lane_index - 8)
        logging.error("Invalid lane_index: must be in [0, 15]")
        return (False, 0, 0)

    if rt_index in (11, 12, 21, 22, 31, 32, 41, 42):
        logging.info(f"use retimer logic index and lane, rt_index: {rt_index}, rt_lane: {lane_index}")
        # Reject negative lanes as well as lanes above 7, as the message states.
        if not 0 <= lane_index <= 7:
            logging.error("For logic rt_index, lane_index must be in [0, 7]")
            return (False, 0, 0)
        return (True, rt_index, lane_index)

    logging.error(f"Invalid rt_index: {rt_index}")
    return (False, 0, 0)
|
|
|
def manual_tune(self, exp_id: int, slot_id: int) -> bool:
    """Match MGC values for one slot using previously saved target vpeaks.

    Loads the targets with ``load_target_vpeaks_from_json`` and hands them to
    ``match_optimal_mgc_new2``.

    Returns:
        False when the targets cannot be loaded; otherwise the result
        reported by ``match_optimal_mgc_new2`` (previously that result was
        discarded and True was returned unconditionally).
    """
    target_vpeaks = self.load_target_vpeaks_from_json(exp_id, slot_id)
    if target_vpeaks is None:
        logging.error("从文件加载target_vpeaks失败")
        return False
    return bool(self.match_optimal_mgc_new2(exp_id, slot_id, target_vpeaks))
|
|
|
def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Optional[Dict[(int, int)]]:
    """Derive per-lane target vpeaks from a baseline and a maximum reading.

    Steps, all on ``self.local_reg_tool``: enable TIA AGC, switch RF off,
    write AGC 0 to all lanes and read the baseline vpeaks; switch RF on,
    write AGC 255 to all lanes and read the maximum vpeaks.  For each lane
    ``target = base + round((max - base) * (19 / 29))``.  Non-empty targets
    are saved with ``save_target_vpeaks_to_json`` and TIA AGC is disabled
    again before returning.

    ``lane_list`` is accepted but not used by this method.

    Returns:
        Mapping lane -> target vpeak; an empty dict when RF cannot be
        switched off.

    The process is terminated with ``sys.exit(-1)`` when any lane's
    ``max - base`` is below 10.
    """
    targets = {}
    reg_tool = self.local_reg_tool

    logging.info(f"-------slot {slot_id}")
    time.sleep(0.05)
    reg_tool.enable_tia_agc(exp_id, slot_id)
    time.sleep(0.05)

    logging.info("----------step 2: toogle RF off")
    rf_is_off = self.toogle_rf(exp_id, slot_id, "off")
    if not rf_is_off:
        logging.error(f"slot {slot_id}: toogle RF off fail")
        return targets

    reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0)
    time.sleep(0.5)
    logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
    baselines = reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
    logging.info(f"base_line_vpeaks: {baselines}")

    self.toogle_rf(exp_id, slot_id, "on")
    reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255)
    time.sleep(0.5)
    maxima = reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
    logging.info(f"max_vpeaks: {maxima}")

    for lane in baselines:
        base = baselines[lane]
        swing = maxima[lane] - base
        targets[lane] = round(swing * (19 / 29)) + base
        if swing < 10:
            # The reading span is too small to derive a target from.
            logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
            sys.exit(-1)

    logging.info(f"target_vpeaks: {targets}")

    if targets:
        self.save_target_vpeaks_to_json(exp_id, slot_id, targets)
    reg_tool.disable_tia_agc(exp_id, slot_id)
    return targets
|
|
|
def calc_target_vpeak(self, exp_id: int, lane_list: List[int], slot_id: int) -> Dict[(int, int)]:
    """Compute and save target vpeaks for one slot from an RF-off baseline.

    Disables AGC, switches RF off, reads the baseline vpeak of every lane in
    ``lane_list``, converts the baselines with ``calculate_target_vpeak``,
    saves a non-empty result with ``save_target_vpeaks_to_json`` and
    switches RF back on.

    Returns:
        Mapping lane -> target vpeak; an empty dict when RF cannot be
        switched off.
    """
    target_vpeaks = {}
    logging.info(f"-------slot {slot_id}")
    logging.info("----------step 1: disable agc")
    self.disable_agc(exp_id, slot_id)

    logging.info("----------step 2: toogle RF off")
    # toogle_rf takes (exp_id, slot_id, switch); passing lane_list as an
    # extra positional argument raised TypeError.
    if not self.toogle_rf(exp_id, slot_id, "off"):
        logging.error(f"slot {slot_id}: toogle RF off fail")
        return target_vpeaks

    logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
    base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list)
    logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
    target_vpeaks = self.calculate_target_vpeak(base_line_vpeaks)
    logging.info(f"target_vpeaks: {target_vpeaks}")
    if target_vpeaks:
        self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)

    logging.info("----------step 4: enable RF")
    self.toogle_rf(exp_id, slot_id, "on")
    return target_vpeaks
|
|
|
def disable_agc(self, exp_id: int, slot_id: int):
    """Write ``00`` to every ``tia_agc_en`` register of a slot.

    Returns:
        False when no ``tia_agc_en`` register is defined or when any write
        reports failure; True when every write succeeds.  All registers are
        attempted even after a failure.
    """
    logging.debug("----------disable_agc, step 1")
    enable_regs = self.reg_table.get_registers_by_name("tia_agc_en")
    if not enable_regs:
        logging.error("not find AGC_ENABLE register")
        return False

    logging.debug("----------disable_agc, step 2")
    all_ok = True
    for reg in enable_regs:
        logging.debug(f"----------disable_agc, step 2: reg: {reg}")
        write_result = self.bmc.SetOpticalModuleRegs(
            exp_id=exp_id,
            slot_id=slot_id,
            bank=reg.bank,
            page=reg.page,
            reg_offset=reg.offset,
            size=reg.size,
            hex_str="00",
        )
        time.sleep(0.05)
        logging.debug(f"----------disable_agc, step 3: result: {write_result}")
        if write_result:
            continue
        logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}")
        all_ok = False

    return all_ok
|
|
|
def enable_agc(self, exp_id: int, slot_id: int):
    """Write ``FF`` to every ``tia_agc_en`` register of a slot.

    Returns:
        False when no ``tia_agc_en`` register is defined or when any write
        reports failure; True when every write succeeds.  All registers are
        attempted even after a failure.
    """
    time.sleep(0.05)
    logging.debug("----------enable_agc, step 1")
    enable_regs = self.reg_table.get_registers_by_name("tia_agc_en")
    if not enable_regs:
        logging.error("not find AGC_ENABLE register")
        return False

    logging.debug("----------enable_agc, step 2")
    all_ok = True
    for reg in enable_regs:
        logging.debug(f"----------enable_agc, step 2: reg: {reg}")
        write_result = self.bmc.SetOpticalModuleRegs(
            exp_id=exp_id,
            slot_id=slot_id,
            bank=reg.bank,
            page=reg.page,
            reg_offset=reg.offset,
            size=reg.size,
            hex_str="FF",
        )
        time.sleep(0.05)
        logging.info(f"----------enable_agc, step 3: result: {write_result}")
        if write_result:
            continue
        logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}")
        all_ok = False

    return all_ok
|
|
|
def toogle_rf(self, exp_id, slot_id, switch='off'):
    """Write the ``tia_stage2`` register of every lane of a slot.

    ``switch == "off"`` writes 0; any other value of ``switch`` writes 144.

    Returns:
        Whatever ``write_opt_reg_all_lane`` returned; a result equal to
        False is additionally logged as an error.
    """
    logging.info(f"---------- toogle_rf to {switch}")
    stage_value = 0 if switch == "off" else 144
    ret = self.local_reg_tool.write_opt_reg_all_lane(exp_id, slot_id, stage_value, "tia_stage2")
    time.sleep(0.05)
    if ret == False:
        logging.error(f"---------- toogle_rf to {switch} failed")
    return ret
|
|
|
def calculate_target_vpeak(self, base_line_vpeaks: Dict[(int, int)]) -> Dict[(int, int)]:
    """Return a new mapping with every baseline vpeak raised by 18.

    Each lane/baseline pair is logged, followed by the resulting mapping.
    """
    raised = {}
    for lane in base_line_vpeaks:
        baseline = base_line_vpeaks[lane]
        logging.info(f"lane={lane}, vpeak={baseline}")
        raised[lane] = baseline + 18

    logging.info(f"target_vpeaks={raised}")
    return raised
|
|
|
def match_optimal_mgc_new2(self, exp_id: int, slot_id: int, target_vpeaks: Optional[Dict[(int, int)]]) -> bool:
    """Search, per lane, for an MGC value whose vpeak reading hits the target.

    Pass 1 reads the current MGC and vpeak of every lane and estimates a
    target MGC as ``(target_vpeak - vpeak) * 1.5 + mgc``.  Pass 2 walks MGC
    values downwards from ``estimate + 10`` to ``estimate - 10`` (clamped to
    0..255) in steps of ``reg.step``; each candidate is written, followed by
    a write of ``01`` to bank 0 / page 208 / offset 136, and the vpeak is
    read back.  A lane matches when the reading equals ``target`` or
    ``target - 1``.  Matches are passed to ``save_mgc_results_to_json``;
    lanes without a match are logged.

    Args:
        exp_id: Expander id forwarded to the BMC calls.
        slot_id: Slot id forwarded to the BMC calls.
        target_vpeaks: Mapping lane -> target vpeak.

    Returns:
        False when the register definitions or ``target_vpeaks`` are
        missing, True otherwise (also when some lanes did not match).
    """
    mgc_regs = self.reg_table.get_registers_by_name("mgc")
    vpeak_regs = self.reg_table.get_registers_by_name("vpeak")
    if mgc_regs is None or vpeak_regs is None:
        logging.error("not find MGC regs")
        return False
    if target_vpeaks is None:
        logging.error("target_vpeaks is None")
        return False

    success = True
    matched_results = {}
    unmatched_lanes = []
    target_mgcs = {}

    # First vpeak register defined for each lane.
    vpeak_by_lane = {}
    for vpeak_reg in vpeak_regs:
        vpeak_by_lane.setdefault(vpeak_reg.lane, vpeak_reg)

    def read_vpeak(vpeak_reg):
        raw = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
                                            slot_id=slot_id,
                                            bank=vpeak_reg.bank,
                                            page=vpeak_reg.page,
                                            reg_offset=vpeak_reg.offset,
                                            size=2)
        return int(raw, 16)

    # Pass 1: estimate a target MGC for every lane.
    for reg in mgc_regs:
        mgc_value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
                                                  slot_id=slot_id,
                                                  bank=reg.bank,
                                                  page=reg.page,
                                                  reg_offset=reg.offset,
                                                  size=1)
        mgc_value = int(mgc_value, 16)
        vpeak_reg = vpeak_by_lane.get(reg.lane)
        if vpeak_reg is None:
            continue
        vpeak_value = read_vpeak(vpeak_reg)
        logging.info(f"mgc_value: {mgc_value}, target_vpeak: {target_vpeaks[vpeak_reg.lane]}, vpeak_value: {vpeak_value}")
        target_mgc = (target_vpeaks[vpeak_reg.lane] - vpeak_value) * 1.5 + mgc_value
        logging.info(f"target_mgc: {target_mgc}")
        target_mgcs[reg.lane] = target_mgc

    logging.info(f"----------------Target MGC: {target_mgcs}")

    # Pass 2: search around the estimate.
    for reg in mgc_regs:
        logging.info(f"mgc regs size: {len(mgc_regs)}")
        if reg.lane not in target_mgcs:
            # No vpeak register for this lane, so nothing can be verified.
            unmatched_lanes.append(reg.lane)
            continue

        # Keep candidates inside one unsigned byte: a negative value would be
        # formatted as "-N", which is not a valid hex payload.
        min_val = min(max(int(target_mgcs[reg.lane] - 10), 0), 255)
        max_val = int(target_mgcs[reg.lane] + 10)
        if max_val > 255:
            logging.info("max_val is greater than 255, setting to 255")
            max_val = 255
        logging.info(f"lane: {reg.lane}, min mgc : {min_val}, max_mgc: {max_val}, target vpeak: {target_vpeaks[reg.lane]}")

        step = reg.step or 1
        search_values = list(range(max_val, min_val - 1, -step))
        logging.info(f"Search order for lane {reg.lane}: {search_values}")

        vpeak_reg = vpeak_by_lane[reg.lane]
        target = target_vpeaks[reg.lane]
        is_match = False
        matched_mgc = None
        matched_vpeak = None
        for mgc in search_values:
            self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
                                          slot_id=slot_id,
                                          bank=reg.bank,
                                          page=reg.page,
                                          reg_offset=reg.offset,
                                          size=1,
                                          hex_str=f"{mgc:02X}")
            time.sleep(0.05)
            # NOTE(review): presumably the "apply" strobe for the value just
            # written - confirm against the module register map.
            self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
                                          slot_id=slot_id,
                                          bank=0,
                                          page=208,
                                          reg_offset=136,
                                          size=1,
                                          hex_str="01")
            time.sleep(0.05)

            value = read_vpeak(vpeak_reg)
            logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}")
            if value < target + 1 and value > target - 2:
                logging.info(f"----------Vpeak is matched, target vpeak:{target}, current vpeak: {value}, current mgc:{mgc} ")
                is_match = True
                matched_mgc = mgc
                matched_vpeak = value
                break
            time.sleep(0.03)

        if is_match:
            # setdefault creates each missing level, so the first matched
            # lane is recorded like every later one.
            lane_results = (matched_results
                            .setdefault(str(exp_id), {})
                            .setdefault(str(slot_id), {})
                            .setdefault(self.mode, {}))
            lane_results[str(reg.lane)] = {'mgc': matched_mgc,
                                           'target_vpeak': target,
                                           'actual_vpeak': matched_vpeak}
        else:
            unmatched_lanes.append(reg.lane)

    if matched_results:
        self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
    if unmatched_lanes:
        logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
    return success
|
|
|
def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool:
    """Iteratively adjust each lane's MGC until its vpeak lands on target.

    For every lane the search starts at the midpoint of the register's
    ``valid_range``.  Each round writes the MGC, writes the confirm
    register, reads the vpeak and, when the reading lies outside
    ``[target, target + 1]``, moves the MGC towards the target by
    ``step + extra`` where ``extra`` depends on the distance to the target.
    Matched lanes are passed to ``save_mgc_results_to_json``.

    Returns:
        False when a register definition is missing, a write fails, or a
        lane ends up unmatched; True otherwise.

    The process is terminated with ``sys.exit(-1)`` when a lane has not
    matched once the round counter exceeds 30.
    """
    # Extra step size keyed by |target - reading|; larger distances use 5.
    extra_step_by_diff = {0: 0, 1: 0, 2: 0, 3: 1, 4: 2, 5: 3, 6: 3, 7: 4, 8: 5}

    matched_results = {}
    unmatched_lanes = []
    for lane_id in lane_list:
        hit = False
        hit_mgc = None
        hit_vpeak = None

        reg = self.reg_table.get_register_by_logic_lane("mgc", lane_id)
        if reg is None or reg.valid_range is None:
            logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc")
            return False

        low = reg.valid_range[0]
        high = reg.valid_range[1]
        step = 1 if reg.step is None else reg.step
        wt_mgc = int((low + high) / 2)
        target_vpeak = target_vpeaks[lane_id]
        window_low = target_vpeak
        window_high = target_vpeak + 1

        rounds = 0
        while True:
            wrote = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc)
            time.sleep(0.05)
            if wrote == False:
                logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc")
                return False
            confirmed = self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
            if confirmed == False:
                logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm")
                return False
            time.sleep(0.05)

            current_vpeak = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id)
            logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}, target: {target_vpeak}")
            extra_step = extra_step_by_diff.get(abs(target_vpeak - current_vpeak), 5)

            if current_vpeak > window_high:
                wt_mgc -= step + extra_step
            elif current_vpeak < window_low:
                wt_mgc += step + extra_step
            else:
                # Reading is inside the window: keep the MGC just written.
                hit = True
                hit_mgc = wt_mgc
                hit_vpeak = current_vpeak
                break

            if rounds > 30:
                logging.error(f"vpeak is not match, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ")
                sys.exit(-1)
                break
            rounds += 1

        if not hit:
            unmatched_lanes.append(lane_id)
            continue

        logging.info(f"------------------ Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ")
        exp_key = str(exp_id)
        slot_key = str(slot_id)
        if exp_key not in matched_results:
            matched_results[exp_key] = {}
        if slot_key not in matched_results[exp_key]:
            matched_results[exp_key][slot_key] = {}
        if self.mode not in matched_results[exp_key][slot_key]:
            matched_results[exp_key][slot_key][self.mode] = {}
        matched_results[exp_key][slot_key][self.mode][str(reg.lane)] = {
            'mgc': hit_mgc,
            'target_vpeak': target_vpeaks[lane_id],
            'actual_vpeak': hit_vpeak,
        }

    if matched_results:
        self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
    if unmatched_lanes:
        logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
        return False
    return True
|
|
|
def match_optimal_mgc_old(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool:
    """Scan each lane's MGC range outwards from its midpoint for a vpeak match.

    For every ``mgc`` register the candidates are the multiples of
    ``reg.step`` inside ``reg.valid_range``, visited starting at the
    (step-aligned) midpoint and alternating right/left.  Each candidate is
    written, followed by a write of ``01`` to bank 0 / page 208 /
    offset 136, and the lane's vpeak is read back.  A lane matches when the
    reading is within one count of its target.  Matches are passed to
    ``save_mgc_results_to_json``; lanes without a match are logged.

    ``lane_list`` is accepted but not used by this method.

    Returns:
        False when the register definitions are missing, True otherwise
        (also when some lanes did not match).
    """
    mgc_regs = self.reg_table.get_registers_by_name("mgc")
    vpeak_regs = self.reg_table.get_registers_by_name("vpeak")
    if mgc_regs is None or vpeak_regs is None:
        logging.error("not find MGC regs")
        return False

    success = True
    matched_results = {}
    unmatched_lanes = []
    for reg in mgc_regs:
        logging.info(f"mgc regs size: {len(mgc_regs)}")
        logging.info(f"lane: {reg.lane}, target vpeak:{target_vpeaks[reg.lane]}, mgc valid_range: {reg.valid_range}, step: {reg.step}")
        min_val = reg.valid_range[0]
        max_val = reg.valid_range[1]
        step = reg.step or 1

        mid_val = (min_val + max_val) // 2
        mid_val = mid_val // step * step
        wanted = (max_val - min_val) // step + 1
        search_values = [mid_val]
        offset = step
        while len(search_values) < wanted:
            right_val = mid_val + offset
            left_val = mid_val - offset
            if right_val > max_val and left_val < min_val:
                # Both directions have left the valid range; without this
                # exit the loop never ends when fewer than `wanted`
                # step-aligned values fit (e.g. range (1, 9), step 4).
                break
            if right_val <= max_val:
                search_values.append(right_val)
            if left_val >= min_val:
                search_values.append(left_val)
            offset += step

        is_match = False
        matched_mgc = None
        matched_vpeak = None
        for mgc in search_values:
            self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
                                          slot_id=slot_id,
                                          bank=reg.bank,
                                          page=reg.page,
                                          reg_offset=reg.offset,
                                          size=1,
                                          hex_str=f"{mgc:02X}")
            time.sleep(0.02)
            # NOTE(review): presumably the "apply" strobe for the value just
            # written - confirm against the module register map.
            self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
                                          slot_id=slot_id,
                                          bank=0,
                                          page=208,
                                          reg_offset=136,
                                          size=1,
                                          hex_str="01")
            time.sleep(0.02)
            for vpeak_reg in vpeak_regs:
                if vpeak_reg.lane != reg.lane:
                    continue
                value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
                                                      slot_id=slot_id,
                                                      bank=vpeak_reg.bank,
                                                      page=vpeak_reg.page,
                                                      reg_offset=vpeak_reg.offset,
                                                      size=2)
                value = int(value, 16)
                logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}")
                if value < target_vpeaks[reg.lane] + 2 and value > target_vpeaks[reg.lane] - 2:
                    logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ")
                    is_match = True
                    matched_mgc = mgc
                    matched_vpeak = value
                break

            if is_match:
                break
            time.sleep(0.03)

        if is_match:
            # setdefault creates each missing level, so the first matched
            # lane is recorded like every later one.
            lane_results = (matched_results
                            .setdefault(str(exp_id), {})
                            .setdefault(str(slot_id), {})
                            .setdefault(self.mode, {}))
            lane_results[str(reg.lane)] = {'mgc': matched_mgc,
                                           'target_vpeak': target_vpeaks[reg.lane],
                                           'actual_vpeak': matched_vpeak}
        else:
            # Only lanes that really failed are reported as unmatched.
            unmatched_lanes.append(reg.lane)

    if matched_results:
        self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
    if unmatched_lanes:
        logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
    return success
|
|
|
def _read_vpeak_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[(int, int)]: |
|
vpeak_values = {} |
|
for lane in lane_list: |
|
vpeak_int = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane) |
|
vpeak_values[lane] = vpeak_int |
|
|
|
return vpeak_values |
|
|
|
def _write_mgc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], mgc: int) -> bool: |
|
for lane_id in lane_list: |
|
ret = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, mgc) |
|
if ret == False: |
|
logging.error(f"write mgc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, mgc:{mgc}") |
|
return False |
|
|
|
ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id) |
|
if ret == False: |
|
logging.error(f"write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}") |
|
return False |
|
else: |
|
return True |
|
|
|
def _write_agc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], agc: int) -> bool: |
|
for lane_id in lane_list: |
|
ret = self.local_reg_tool.write_agc_reg(exp_id, slot_id, lane_id, agc) |
|
time.sleep(0.05) |
|
if ret == False: |
|
logging.error(f"write agc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, agc:{agc}") |
|
return False |
|
|
|
ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id) |
|
if ret == False: |
|
logging.error(f"write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}") |
|
return False |
|
else: |
|
return True |
|
|
|
def _generate_host_filename(self) -> str: |
|
safe_host = self.host.replace(":", "_").replace("/", "_").replace("\\", "_") |
|
return f"target_vpeaks_{safe_host}.json" |
|
|
|
def _generate_mgc_host_filename(self) -> str: |
|
safe_host = self.host.replace(":", "_").replace("/", "_").replace("\\", "_") |
|
return f"mgc_{safe_host}.json" |
|
|
|
def save_target_vpeaks_to_json(self, exp_id: int, slot_id: int, target_vpeaks: Dict[(int, int)]) -> bool:
    """Store one slot's target vpeaks in the per-host JSON file.

    The file named by ``_generate_host_filename`` is read if it exists, the
    entry at ``<exp_id>/<slot_id>/<mode>`` is replaced by ``target_vpeaks``
    (lane keys converted to strings), and the whole document is written
    back.  Entries for other expanders, slots and modes are preserved.

    Returns:
        True on success; False, after logging, when anything fails.
    """
    try:
        filename = self._generate_host_filename()
        os.makedirs(os.path.dirname(filename) or ".", exist_ok=True)
        if os.path.exists(filename):
            with open(filename, "r") as f:
                data = json.load(f)
        else:
            data = {}

        slot_entry = data.setdefault(str(exp_id), {}).setdefault(str(slot_id), {})
        slot_entry[self.mode] = {str(k): v for k, v in target_vpeaks.items()}

        # Serialise before opening for writing: mode "w" truncates the file
        # immediately, so a serialisation error after that point would wipe
        # everything saved earlier.
        payload = json.dumps(data, indent=4)
        with open(filename, "w") as f:
            f.write(payload)
        logging.info(f"target_vpeaks saved to {filename}, path: {exp_id}/{slot_id}/{self.mode}")
        return True
    except Exception as e:
        logging.error(f"Failed to save target_vpeaks to JSON file: {e}")
        return False
|
|
|
def load_target_vpeaks_from_json(self, exp_id: int, slot_id: int) -> Optional[Dict[(int, int)]]:
    """Load one slot's target vpeaks from the per-host JSON file.

    Reads the entry at ``<exp_id>/<slot_id>/<mode>`` of the file named by
    ``_generate_host_filename`` and converts its lane keys back to ``int``.

    Returns:
        Mapping lane -> target vpeak, or None (after logging) when the file
        or the entry is missing or the file cannot be read.
    """
    try:
        filename = self._generate_host_filename()
        if not os.path.exists(filename):
            logging.error(f"File {filename} does not exist")
            return None

        with open(filename, "r") as f:
            node = json.load(f)

        # Descend one level at a time; stop at the first missing key.
        for key in (str(exp_id), str(slot_id), self.mode):
            if key not in node:
                logging.error(f"Path {exp_id}/{slot_id}/{self.mode} not found in {filename}")
                return None
            node = node[key]

        target_vpeaks = {int(lane): value for lane, value in node.items()}
        logging.info(f"target_vpeaks loaded successfully from {filename}, path: {exp_id}/{slot_id}/{self.mode}")
        return target_vpeaks
    except Exception as e:
        logging.error(f"Failed to load target_vpeaks from JSON file: {e}")
        return None
|
|
|
def save_mgc_results_to_json(self, exp_id: int, slot_id: int, matched_results: Dict[(str, Any)]) -> bool:
    """Merge MGC matching results into the per-host JSON file.

    ``matched_results`` is nested as ``exp -> slot -> mode -> lane -> data``.
    The file named by ``_generate_mgc_host_filename`` is read if it exists,
    each mode entry is updated lane by lane (lanes not present in
    ``matched_results`` are kept), and the whole document is written back.

    ``exp_id`` and ``slot_id`` are accepted but not used; the keys inside
    ``matched_results`` decide where the data is stored.

    Returns:
        True on success; False, after logging, when anything fails.
    """
    try:
        filename = self._generate_mgc_host_filename()
        os.makedirs(os.path.dirname(filename) or ".", exist_ok=True)
        if os.path.exists(filename):
            with open(filename, "r") as f:
                data = json.load(f)
        else:
            data = {}

        for exp_key, exp_data in matched_results.items():
            stored_exp = data.setdefault(exp_key, {})
            for slot_key, slot_data in exp_data.items():
                stored_slot = stored_exp.setdefault(slot_key, {})
                for mode_key, mode_data in slot_data.items():
                    stored_slot.setdefault(mode_key, {}).update(mode_data)

        # Serialise before opening for writing: mode "w" truncates the file
        # immediately, so a serialisation error after that point would wipe
        # everything saved earlier.
        payload = json.dumps(data, indent=4)
        with open(filename, "w") as f:
            f.write(payload)
        logging.info(f"MGC matching results saved to {filename}")
        return True
    except Exception as e:
        logging.error(f"Failed to save MGC results to JSON file: {e}")
        return False
|
|
|