You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

1586 lines
77 KiB

# uncompyle6 version 3.9.3
# Python bytecode version base 3.6 (3379)
# Decompiled from: Python 3.6.8 (default, May 21 2025, 11:12:56)
# [GCC 8.5.0 20210514 (Anolis 8.5.0-10.0.1)]
# Embedded file name: /xz/gyou/nexusbench/toolbox/mgc_tuning_tool.py
# Compiled at: 2025-12-25 14:52:30
# Size of source mod 2**32: 78005 bytes
from dataclasses import dataclass, field
import logging, time, os, json, sys
from typing import Dict, List, Any, Optional, Tuple
import matplotlib.pyplot as plt
from gpu.biren.exp_util import DevWhiteRiverExp
from toolbox.opt_reg_access_tool import OptRegAccessTool
from toolbox.prbs_tool import PrbsTool, RetimerPrbsResult
from parser.topology_parser import TopoMappingParser
from parser.transceiver_config_parser import TransceiverConfigParser, RegisterInfo
from toolbox.bathtub_curve import BathtubCurve
@dataclass
class LaneErrInfo:
    """Per-lane bookkeeping record filled in by the tuning sweeps.

    Integer fields default to ``-1`` and strings to ``""`` as placeholders.
    The three ``*_err_map`` dictionaries get a fresh ``dict`` per instance.
    Field order defines the positional constructor signature, so it must
    stay exactly as declared here.
    """

    # --- location of the lane ---
    host: str = ""
    exp: int = -1
    route: str = ""
    slot_id: int = -1
    logic_lane: int = -1
    rt_phys_id: int = -1  # filled from topo_map.get_phys_rtmr_by_slot by the sweeps
    rt_phys_lane: int = -1

    # --- link partner (filled from get_remote_retimer_and_slot_by_slot) ---
    peer_slot_id: int = -1
    peer_ibias_lane: int = -1
    peer_logic_lane: int = -1
    peer_rt_phys_id: int = -1
    peer_rt_phys_lane: int = -1

    # --- sweep state ---
    preset: int = -1
    mgc: int = -1  # MGC register value read before a sweep starts
    # swept value -> PRBS error count; sweeps pre-fill each key with -1
    mgc_err_map: Dict[int, int] = field(default_factory=dict)
    lowfreq_err_map: Dict[int, int] = field(default_factory=dict)
    tmp_err_map: Dict[int, int] = field(default_factory=dict)

    # --- register snapshots (NOTE(review): exact semantics are not visible
    # in this part of the file; names suggest one field per tunable) ---
    vpeak: int = -1
    tia_peak: int = -1
    ibias: int = -1
    ipcurrent: int = -1
    opcurrent: int = -1
    lowfreq_eq: int = -1
    highfreq_eq: int = -1
    vgc_set: int = -1
    drv_vpeak: int = -1
    rssi1: int = -1
    rssi2: int = -1
    tmp_raw_value: int = -1  # register value read before a generic sweep
    ctle: int = -1
class MgcTuningTool:
def __init__(self, host:str, reg_table: TransceiverConfigParser, reg_tool: OptRegAccessTool,
             remote_reg_tool: OptRegAccessTool,
             bmc: DevWhiteRiverExp, topo_map: TopoMappingParser, prbs_tool: PrbsTool, route_name: str):
    """Store the collaborators used by the tuning routines.

    Args:
        host: Name of the host this tool instance works on.
        reg_table: Parsed transceiver register table.
        reg_tool: Register access tool for the local side.
        remote_reg_tool: Register access tool for the remote side; may be
            the same object as ``reg_tool``.
        bmc: BMC handle.
        topo_map: Topology mapping (slot/lane <-> retimer lookups).
        prbs_tool: PRBS control and checking tool.
        route_name: Route identifier passed to every topology lookup.
    """
    # Identity / configuration.
    self.host = host
    self.route_name = route_name
    self.mode = route_name  # same value as route_name
    self.target_vpeak = 0
    self.target_vpeaks_file = "target_vpeaks.json"

    # Collaborators.
    self.reg_table = reg_table
    self.bmc = bmc
    self.topo_map = topo_map
    self.prbs_tool = prbs_tool
    self.local_reg_tool = reg_tool
    self.remote_reg_tool = remote_reg_tool

    # True when local and remote side use different register tools.
    self.is_onet = remote_reg_tool != reg_tool
def debug_calc_target_vpeak_new(self, exp_id: int, slot_id: int) -> Optional[Dict[int, int]]:
    """Measure baseline/maximum Vpeak per lane and derive target Vpeaks.

    Steps performed on the local register tool:
      1. Enable TIA AGC, switch RF off, write AGC 0 and read the baseline
         Vpeak of every lane.
      2. Switch RF on, write AGC 255 and read the maximum Vpeak.
      3. Compute two candidate targets per lane (a 19/29 fraction of the
         range, and a fixed-offset variant); the fixed-offset variant is
         the one returned and saved.
      4. Disable TIA AGC, sweep the temporary MGC register from 80 to 180
         and save a Vpeak-vs-AGC plot (best effort; failures are logged).

    Args:
        exp_id: Expander id passed to the register tool.
        slot_id: Slot under test.

    Returns:
        ``{lane: target_vpeak}``; an empty dict when RF could not be
        switched off.

    Note:
        The process is terminated with ``sys.exit(-1)`` when any lane shows
        a max-minus-baseline range below 5.
    """
    target_vpeaks = {}
    logging.info(f"-------slot {slot_id}")
    self.local_reg_tool.enable_tia_agc(exp_id, slot_id)
    time.sleep(0.05)
    logging.info("----------step 2: toggle RF off")
    if not self.toogle_rf(exp_id, slot_id, "off"):
        logging.error(f"slot {slot_id}: toggle RF off fail")
        return target_vpeaks

    self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0)
    time.sleep(0.5)
    logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
    base_line_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
    logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
    self.toogle_rf(exp_id, slot_id, "on")
    self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255)
    time.sleep(0.5)
    max_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
    logging.info(f"max_vpeaks: {max_vpeaks}")

    # Candidate 1: baseline plus 19/29 of the measured range.
    percent_target_vpeaks = {}
    for lane, base_vpeak in base_line_vpeaks.items():
        numerator = 19
        vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29))
        percent_target_vpeaks[lane] = vpkdelta + base_vpeak
        diff = max_vpeaks[lane] - base_line_vpeaks[lane]
        if diff < 5:
            logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
            sys.exit(-1)
    logging.info(f"percent_target_vpeaks: {percent_target_vpeaks}")

    # Candidate 2 (the one returned): baseline + 21, or range - 2 above the
    # baseline when the range is 22 or less.
    diff_target_vpeaks = {}
    for lane, base_vpeak in base_line_vpeaks.items():
        diff = max_vpeaks[lane] - base_line_vpeaks[lane]
        if diff <= 22:
            logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
            diff_target_vpeaks[lane] = diff - 2 + base_vpeak
        else:
            diff_target_vpeaks[lane] = 21 + base_vpeak
    target_vpeaks = diff_target_vpeaks
    logging.info(f"diff_target_vpeaks: {target_vpeaks}")

    # Side-by-side comparison table of both candidates.
    logging.info("\n" + "=" * 90)
    logging.info(f'{"SLOT":<6} {"LANE":<6} {"BASE":<8} {"MAX":<8} {"RANGE":<8} {"PERCENT_TGT":<12} {"DIFF_TGT":<12} {"PCT_OFF":<10} {"DIFF_OFF":<10}')
    logging.info("=" * 90)
    for lane in sorted(base_line_vpeaks.keys()):
        base = base_line_vpeaks[lane]
        mx = max_vpeaks[lane]
        rng = mx - base
        pct_tgt = percent_target_vpeaks.get(lane, -1)
        diff_tgt = diff_target_vpeaks.get(lane, -1)
        pct_off = pct_tgt - base
        diff_off = diff_tgt - base
        logging.info(f"{slot_id:<6} {lane:<6} {base:<8} {mx:<8} {rng:<8} {pct_tgt:<12} {diff_tgt:<12} {pct_off:<10} {diff_off:<10}")
    logging.info("=" * 90 + "\n")

    self.local_reg_tool.disable_tia_agc(exp_id, slot_id)
    time.sleep(0.05)

    # Best-effort diagnostic plot; any failure here only produces a warning.
    # NOTE(review): the temporary MGC register is left at the last scanned
    # value after this scan - confirm callers re-program it.
    try:
        agc_start, agc_end, agc_step = (80, 180, 2)
        agc_values = list(range(agc_start, agc_end + 1, agc_step))
        all_vpeaks_scan = {lane: [] for lane in base_line_vpeaks}
        logging.info(f"Scanning AGC from {agc_start} to {agc_end} step {agc_step}...")
        for agc in agc_values:
            self.local_reg_tool.write_tmp_mgc_reg_all_lane(exp_id, slot_id, agc)
            time.sleep(0.1)
            vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
            for lane in base_line_vpeaks:
                all_vpeaks_scan[lane].append(vpeaks.get(lane, 0))

        fig = plt.figure(figsize=(20, 8))
        try:
            for lane in sorted(base_line_vpeaks.keys()):
                plt.plot(agc_values, all_vpeaks_scan[lane], marker="o", label=f"lane {lane}")
            # Mark the baseline/maximum readings at the ends of the scan range.
            for lane in base_line_vpeaks:
                plt.scatter([agc_start], [base_line_vpeaks[lane]], color="blue")
                plt.scatter([agc_end], [max_vpeaks[lane]], color="red")
            plt.title(f"Vpeak vs AGC (exp={exp_id}, slot={slot_id})")
            plt.xlabel("AGC Value")
            plt.ylabel("Vpeak")
            plt.legend()
            plt.grid(True)
            plt.tight_layout()
            plot_dir = "agc_vpeak_plots"
            os.makedirs(plot_dir, exist_ok=True)
            plot_path = os.path.join(plot_dir, f"{self.local_reg_tool.host}_agc_vpeak_exp{exp_id}_slot{slot_id}.png")
            plt.savefig(plot_path)
            logging.info(f"AGC-Vpeak curve saved to: {plot_path}")
        finally:
            # Always release the figure, even when plotting/saving fails.
            plt.close(fig)
    except Exception as e:
        logging.warning(f"Failed to generate AGC scan plot: {e}")

    if target_vpeaks:
        self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)
    return target_vpeaks
def auto_tune(self, exp_id: int, slot_id: int, lane_list: List[int]) -> bool:
    """Compute target Vpeaks for a slot and search the matching MGC values.

    Args:
        exp_id: Expander id.
        slot_id: Slot to tune.
        lane_list: Accepted for interface compatibility.
            NOTE(review): the value passed in is ignored; all eight lanes
            are always tuned in the order 7..0.

    Returns:
        ``False`` when no target Vpeaks could be computed, ``True`` otherwise.
    """
    lane_list = list(range(7, -1, -1))
    target_vpeaks = self.calc_target_vpeak_new(exp_id, slot_id, lane_list)
    if target_vpeaks is not None:
        time.sleep(0.1)
        logging.info("----------step 7: find optimal MGC")
        self.match_optimal_mgc_new(exp_id, slot_id, lane_list, target_vpeaks)
        return True
    return False
def auto_tia_peak_tune(self, exp_id: int, retimers):
    """Sweep the ``tia_peak`` register from 0 to 240 in steps of 30 on the local side."""
    sweep_points = list(range(0, 241, 30))
    self.general_auto_tune_value(exp_id, retimers, "tia_peak", sweep_points, self.local_reg_tool)
def auto_highfreq_tune(self, exp_id: int, retimers):
    """Sweep the ``highfreq_eq`` register from 0 to 240 in steps of 20 on the remote side."""
    sweep_points = list(range(0, 241, 20))
    self.general_auto_tune_value(exp_id, retimers, "highfreq_eq", sweep_points, self.remote_reg_tool)
def rt_ibias(self, exp_id: int, retimers):
    """Run the bidirectional ibias sweep with only the two end points (1647, 3647).

    Recommended values are not written back (``is_overwrite`` is ``False``).
    """
    end_points = [1647, 3647]
    self.ibias_auto_tune_value_new(
        exp_id, retimers, "ibias_runtime", end_points, self.remote_reg_tool, False
    )
def auto_ibias_tune2(self, exp_id: int, retimers):
    """Sweep runtime ibias from 1647 to 3647 in steps of 200 on the remote side.

    Recommended values are not written back (``is_overwrite`` is ``False``).
    """
    sweep_points = list(range(1647, 3648, 200))
    self.ibias_auto_tune_value(
        exp_id, retimers, "ibias_runtime", sweep_points, self.remote_reg_tool, False
    )
def auto_opcurrent_tune(self, exp_id: int, retimers):
    """Sweep the ``opcurrent`` register on the remote side.

    The points are visited in the fixed, non-monotonic order listed below.
    """
    sweep_points = [
        130, 140, 150, 160,
        120, 110, 90, 80, 100,
        170, 180,
    ]
    self.general_auto_tune_value(exp_id, retimers, "opcurrent", sweep_points, self.remote_reg_tool)
def auto_opcurrent_tune_test(self, exp_id: int, retimers):
    """Sweep ``opcurrent`` on the remote side over a wider, coarser grid (50..230).

    The points are visited in the fixed order listed below.
    """
    sweep_points = [
        110, 130, 150, 170,
        90, 70, 50,
        190, 210, 230,
    ]
    self.general_auto_tune_value(exp_id, retimers, "opcurrent", sweep_points, self.remote_reg_tool)
def auto_lowfreq_tune(self, exp_id: int, retimers):
    """Sweep ``lowfreq_eq`` on the remote side: 120 up to 240, then 90 down to 0 (step 30)."""
    upward = list(range(120, 241, 30))
    downward = list(range(90, -1, -30))
    sweep_points = upward + downward
    self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", sweep_points, self.remote_reg_tool)
def auto_ctle_tune(self, exp_id: int, retimers):
    """Sweep the even values 0..18 on the remote side.

    NOTE(review): despite the method name, the register key passed on is
    ``"lowfreq_eq"`` - confirm whether a dedicated CTLE key was intended.
    """
    sweep_points = list(range(0, 19, 2))
    self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", sweep_points, self.remote_reg_tool)
def auto_tune2(self, exp_id: int, retimers) -> bool:
    """Sweep the temporary MGC register around its current value and log a recommendation.

    For every slot (0-7) and lane (0-7) the current MGC is read, a PRBS
    baseline is taken, and then each non-zero offset in ``MGC_OFFSETS`` is
    applied to all lanes at once via ``write_tmp_mgc_reg`` followed by a
    PRBS check.  Error counts are stored per lane in ``mgc_err_map``; an
    unlocked lane is recorded as 65535 (baseline) or 69999 (sweep).

    The original MGC values are always written back, including when the
    sweep aborts early.  The values recommended by ``simple_mgc_tuner`` are
    only logged, never written.

    Args:
        exp_id: Expander id.
        retimers: Retimer selection forwarded to the PRBS tool.

    Returns:
        ``False`` when a topology lookup fails or every lane is unlocked at
        the baseline check; ``True`` otherwise (also when bad lanes were
        reported).
    """
    self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
    logging.info("Starting auto_tune2...")
    MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8, 10, -10]

    # Step 1: snapshot the current MGC of every lane.
    lane_data_list: List[LaneErrInfo] = []
    for slot_id in range(8):
        for lane in range(8):
            current_mgc = int(self.local_reg_tool.read_mgc_reg(exp_id, slot_id, lane))
            success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
            if not success:
                logging.error(f'get_phys_rtmr_by_slot failed, result: {success}')
                return False
            lane_data_list.append(LaneErrInfo(
                slot_id=slot_id,
                logic_lane=lane,
                rt_phys_id=rt_phys_id,
                rt_phys_lane=rt_phys_lane,
                mgc_err_map={current_mgc + offset: -1 for offset in MGC_OFFSETS},
                mgc=current_mgc,
            ))
    # (slot, lane) pairs are unique by construction, so a dict lookup is
    # equivalent to scanning the list for the first match.
    lane_index = {(info.slot_id, info.logic_lane): info for info in lane_data_list}

    # Step 2: baseline PRBS at the current MGC.
    prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
    if all(not prbs.locked for prbs in prbs_results):
        logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. Aborting auto_tune2.")
        return False
    time.sleep(2)
    prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False)
    for prbs in prbs_results:
        local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
        if not local_slot:
            logging.error(f'local slot is none!')
            return False
        slot_id, lane_id = local_slot
        info = lane_index.get((slot_id, lane_id))
        if info is not None:
            info.mgc_err_map[info.mgc] = prbs.err_count if prbs.locked else 65535

    # Step 3: sweep the offsets.  The finally clause guarantees that the
    # original MGC values are written back even when the sweep aborts.
    try:
        for offset in (o for o in MGC_OFFSETS if o != 0):
            for info in lane_data_list:
                self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, info.mgc + offset)
                time.sleep(0.05)
            time.sleep(0.5)
            # First check's result is discarded; the measurement is the
            # second check taken two seconds later.
            self.prbs_tool.prbs_check(exp_id, retimers, True)
            time.sleep(2)
            prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
            is_need_prbs_reset = False
            for prbs in prbs_results:
                local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
                if not local_slot:
                    logging.error(f'local slot is none!')
                    return False
                slot_id, lane_id = local_slot
                info = lane_index.get((slot_id, lane_id))
                if info is None:
                    continue
                target_mgc = info.mgc + offset
                if prbs.locked:
                    info.mgc_err_map[target_mgc] = prbs.err_count
                else:
                    info.mgc_err_map[target_mgc] = 69999
                    is_need_prbs_reset = True
            if is_need_prbs_reset:
                self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
        self.print_err_matrix(lane_data_list, MGC_OFFSETS)
    finally:
        # Restore the original MGC on every lane.
        for info in lane_data_list:
            self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, info.mgc)
            time.sleep(0.05)

    # Step 4: report (not apply) the recommended MGC per lane.
    recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1)
    for info, recommended_mgc in zip(lane_data_list, recommended_mgcs):
        diff_mgc = recommended_mgc - info.mgc
        if diff_mgc != 0:
            logging.info(
                f"Slot {info.slot_id}, Lane {info.logic_lane}: MGC adjusted from {info.mgc} "
                f"-> {recommended_mgc} (offset={diff_mgc:.1f})"
            )
    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?")
    return True
def low_ibias_tune2(self, exp_id: int, retimers, reg_name: str, value_offset_list: Optional[List[int]]=None, value_list: Optional[List[int]]=None) -> bool:
    """Sweep the MGC register around a baseline value and apply the recommendation.

    The baseline of every slot/lane (0-7 each) is read with
    ``read_opt_reg(..., reg_name)``.  A PRBS baseline is recorded, then each
    non-zero offset in the internal ``MGC_OFFSETS`` list is written with
    ``write_mgc_reg`` + ``write_confirm_reg`` and measured.  Afterwards the
    baseline is written back and the value recommended by
    ``simple_mgc_tuner`` is written (and confirmed) for lanes where it
    differs from the baseline.

    Args:
        exp_id: Expander id.
        retimers: Retimer selection forwarded to the PRBS tool.
        reg_name: Register key used to read the baseline value.
        value_offset_list: Offsets used to pre-fill the error map.  When
            ``None`` the internal ``MGC_OFFSETS`` list is used (previously
            this case left local variables unbound).
        value_list: Currently unused.

    Returns:
        ``False`` on a failed topology lookup or when all lanes are unlocked
        at the baseline check; ``True`` otherwise.

    NOTE(review): the baseline is read from ``reg_name`` but the sweep always
    writes the MGC register and always uses ``MGC_OFFSETS`` - confirm that
    this mix is intended.
    NOTE(review): the write-back of the baseline is not followed by
    ``write_confirm_reg`` (unchanged from the original) - confirm.
    """
    logging.info("Starting auto_tune2...")
    MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8]
    init_offsets = MGC_OFFSETS if value_offset_list is None else value_offset_list

    lane_data_list = []
    for slot_id in range(8):
        for lane in range(8):
            current_value = int(self.local_reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name))
            success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
            if not success:
                logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
                return False
            lane_data_list.append(LaneErrInfo(
                slot_id=slot_id,
                logic_lane=lane,
                rt_phys_id=rt_phys_id,
                rt_phys_lane=rt_phys_lane,
                mgc_err_map={current_value + offset: -1 for offset in init_offsets},
                mgc=current_value,
            ))
    lane_index = {(info.slot_id, info.logic_lane): info for info in lane_data_list}

    # Baseline PRBS measurement.
    prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
    if all(not prbs.locked for prbs in prbs_results):
        logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. Aborting auto_tune2.")
        return False
    time.sleep(2)
    prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False)
    for prbs in prbs_results:
        local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
        if not local_slot:
            logging.error("local slot is none!")
            return False
        slot_id, lane_id = local_slot
        info = lane_index.get((slot_id, lane_id))
        if info is not None:
            # The baseline belongs to the map/value the record was built
            # with (mgc_err_map / mgc); LaneErrInfo has no ``tmp_value``.
            info.mgc_err_map[info.mgc] = prbs.err_count if prbs.locked else 65535

    # Offset sweep; the baseline is written back even on an early abort.
    try:
        for offset in (o for o in MGC_OFFSETS if o != 0):
            for info in lane_data_list:
                self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, info.mgc + offset)
            for slot_id in range(8):
                self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
            time.sleep(0.5)
            # First check's result is discarded; the second one is used.
            self.prbs_tool.prbs_check(exp_id, retimers, True)
            time.sleep(2)
            prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
            for prbs in prbs_results:
                local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
                if not local_slot:
                    logging.error("local slot is none!")
                    return False
                slot_id, lane_id = local_slot
                info = lane_index.get((slot_id, lane_id))
                if info is not None:
                    info.mgc_err_map[info.mgc + offset] = prbs.err_count if prbs.locked else 65535
        self.print_err_matrix(lane_data_list, MGC_OFFSETS)
    finally:
        for info in lane_data_list:
            self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, info.mgc)

    # Apply the recommendation where it differs from the baseline.
    recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1)
    for info, recommended_mgc in zip(lane_data_list, recommended_mgcs):
        slot_id = info.slot_id
        lane_id = info.logic_lane
        diff_mgc = recommended_mgc - info.mgc
        if diff_mgc != 0:
            logging.info(f"Slot {slot_id}, Lane {lane_id}: MGC adjusted from {info.mgc} -> {recommended_mgc} (offset={diff_mgc:.1f})")
            logging.info(f"Applying MGC reg write: exp_id={exp_id}, slot={slot_id}, lane={lane_id}, mgc={recommended_mgc}")
            self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, recommended_mgc)
            self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?")
    return True
def general_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
    """Sweep one optical register over ``tune_value_list`` for every retimer lane.

    Lanes are enumerated from the retimer side (each retimer in
    ``retimers``, lanes 0-15) and mapped to a slot/lane through the
    topology map.  For each sweep value all lanes are written, PRBS is
    checked, and the error count is stored in ``tmp_err_map`` (69999 for an
    unlocked lane).  The register values read before the sweep are always
    written back, including when the sweep aborts early.

    Args:
        exp_id: Expander id.
        retimers: Iterable of retimer ids.
        reg_name: Register key for ``read_opt_reg`` / ``write_opt_reg``.
        tune_value_list: Values to try, in order.
        reg_tool: Ignored - the tool is chosen from ``reg_name``: the remote
            tool for ``'ibias'`` / ``'opcurrent'``, the local tool otherwise.
        is_overwrite: When true, write the recommended value of each lane
            whose recommendation differs from the original value.

    Returns:
        ``False`` when a topology lookup fails, ``True`` otherwise.
    """
    logging.info("Starting auto_tune2...")
    reg_tool = self.remote_reg_tool if reg_name in ('ibias', 'opcurrent') else self.local_reg_tool
    use_local_mapping = reg_tool == self.local_reg_tool

    def lookup_slot(rtmr_id, rtmr_lane):
        # Map a retimer lane to the slot/lane on the side being tuned.
        if use_local_mapping:
            return self.topo_map.get_local_slot_by_retimer(rtmr_id, rtmr_lane, self.route_name)
        return self.topo_map.get_remote_slot_by_retimer(rtmr_id, rtmr_lane, self.route_name)

    # Snapshot the current register value of every lane.
    lane_data_list = []
    lane_index = {}
    for rt_phys_id in retimers:
        for rt_phys_lane in range(0, 16):
            slot = lookup_slot(rt_phys_id, rt_phys_lane)
            if not slot:
                logging.error("local slot is none!")
                return False
            slot_id, lane_id = slot
            raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane_id, reg_name)
            lane_info = LaneErrInfo(
                host=reg_tool.host, exp=exp_id, slot_id=slot_id, logic_lane=lane_id,
                rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane,
                tmp_err_map={value: -1 for value in tune_value_list},
                tmp_raw_value=raw_value,
            )
            lane_data_list.append(lane_info)
            # First record wins, matching a first-match scan of the list.
            lane_index.setdefault((slot_id, lane_id), lane_info)
            time.sleep(0.05)

    is_need_reset_prbs = True
    try:
        for value in tune_value_list:
            for info in lane_data_list:
                reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name)
                time.sleep(0.05)
            time.sleep(1)
            if is_need_reset_prbs:
                logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
                self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
            # First check's result is discarded; the second one is used.
            self.prbs_tool.prbs_check(exp_id, retimers, True)
            time.sleep(2)
            prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
            is_need_reset_prbs = False
            for prbs in prbs_results:
                slot = lookup_slot(prbs.rtmr_id, prbs.rtmr_lane)
                if not slot:
                    logging.error("local slot is none!")
                    return False
                slot_id, lane_id = slot
                info = lane_index.get((slot_id, lane_id))
                if info is None:
                    continue
                if prbs.locked:
                    info.tmp_err_map[value] = prbs.err_count
                else:
                    if value in info.tmp_err_map:
                        info.tmp_err_map[value] = 69999
                    else:
                        logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
                    # An unlocked lane re-arms the PRBS reset for the next
                    # value, as auto_tune2 does.
                    is_need_reset_prbs = True
        logging.info(lane_data_list)
        self.print_general_err_matrix(lane_data_list, tune_value_list)
        time.sleep(0.05)
    finally:
        # Put back the values read before the sweep.
        for info in lane_data_list:
            reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name)
            time.sleep(0.05)

    recommended_values, bad_lanes = self.simple_value_tuner(lane_data_list)
    for info, recommended in zip(lane_data_list, recommended_values):
        diff_value = recommended - info.tmp_raw_value
        if diff_value != 0:
            logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} -> {recommended} (offset={diff_value:.1f})")
            logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended}")
            if is_overwrite:
                reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended, reg_name)
    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")
    return True
def ibias_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
    """Bidirectional ibias sweep using the all-lane volatile ibias write.

    For slots 0-7 the module register (page 0, 180, 136) is set to ``"01"``
    and the current per-slot ibias values are read (remote side always,
    local side too when ``self.is_onet``).  Each value in
    ``tune_value_list`` is then written to the peer slots, PRBS is checked
    in both directions and error counts are stored in ``tmp_err_map``
    (69999 for an unlocked lane).  Finally the saved ibias values are
    written back and the register is set to ``"00"`` again - also when the
    routine aborts early.

    Args:
        exp_id: Expander id.
        retimers: Retimer selection forwarded to the PRBS tool.
        reg_name: Unused.
        tune_value_list: ibias values to try, in order.
        reg_tool: Tool used for the optional final ``write_ibias_reg``.
        is_overwrite: When true, write the recommended ibias to the TX lane.

    Returns:
        ``False`` when a topology lookup fails, ``True`` otherwise.

    NOTE(review): ``tmp_raw_value`` is never filled for these records, so
    the logged "adjusted from" value is the ``-1`` placeholder and almost
    every lane is reported as changed.
    """
    logging.info("Starting ibias_auto_tune_value_new...")
    slot_ids = list(range(8))
    lane_ids = list(range(8))
    local_data_list: List[LaneErrInfo] = []
    remote_data_list: List[LaneErrInfo] = []
    # {slot_id: value returned by read_ibias_reg_all_lane for that slot}
    remote_raw_ibias: Dict[int, Any] = {}
    local_raw_ibias: Dict[int, Any] = {}

    def build_lane_infos(host):
        # One LaneErrInfo per slot/lane, or None when a lookup fails.
        infos = []
        for slot_id in slot_ids:
            for lane in lane_ids:
                peer_info = self.topo_map.get_remote_retimer_and_slot_by_slot(slot_id, lane, self.route_name)
                if not peer_info:
                    logging.error(f"peer_info is None for slot {slot_id} lane {lane}")
                    return None
                peer_rt_id, peer_rt_lane, peer_slot_id, peer_lane_id = peer_info
                result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
                # A missing result is a failure too; continuing would
                # reuse the retimer ids of the previous lane.
                success = bool(result) and result[0]
                if not success:
                    logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
                    return None
                _, rt_phys_id, rt_phys_lane = result
                infos.append(LaneErrInfo(
                    host=host, exp=exp_id,
                    route=self.route_name.split("-")[-1],
                    slot_id=slot_id, logic_lane=lane,
                    rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane,
                    peer_slot_id=peer_slot_id, peer_logic_lane=peer_lane_id,
                    peer_rt_phys_id=peer_rt_id, peer_rt_phys_lane=peer_rt_lane,
                    tmp_err_map={value: -1 for value in tune_value_list},
                ))
        return infos

    def record_result(data_list, prbs, value):
        # Store one PRBS result on its matching lane; True if it was unlocked.
        for info in data_list:
            if info.host == prbs.host and info.rt_phys_id == prbs.rtmr_id \
                    and info.rt_phys_lane == prbs.rtmr_lane:
                if prbs.locked:
                    info.tmp_err_map[value] = prbs.err_count
                    return False
                if value in info.tmp_err_map:
                    info.tmp_err_map[value] = 69999
                else:
                    logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
                return True
        return False

    try:
        # Save the current ibias of every slot, keyed by slot id.
        for slot_id in slot_ids:
            self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01")
            remote_raw_ibias[slot_id] = self.remote_reg_tool.read_ibias_reg_all_lane(exp_id, slot_id)
            time.sleep(0.05)
        if self.is_onet:
            for slot_id in slot_ids:
                self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01")
                local_raw_ibias[slot_id] = self.local_reg_tool.read_ibias_reg_all_lane(exp_id, slot_id)
                time.sleep(0.05)

        built = build_lane_infos(self.local_reg_tool.host)
        if built is None:
            return False
        local_data_list = built
        if self.is_onet:
            built = build_lane_infos(self.remote_reg_tool.host)
            if built is None:
                return False
            remote_data_list = built
        time.sleep(0.05)

        is_need_reset_prbs = True
        for value in tune_value_list:
            for info in local_data_list:
                self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value)
                time.sleep(0.05)
            if self.is_onet:
                for info in remote_data_list:
                    self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value)
                    time.sleep(0.05)
            time.sleep(1)
            if is_need_reset_prbs:
                logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
                self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
            # First check's result is discarded; the second one is used.
            self.prbs_tool.prbs_check_bidirection(exp_id, retimers, True)
            time.sleep(2)
            prbs_results = self.prbs_tool.prbs_check_bidirection(exp_id, retimers)
            is_need_reset_prbs = False
            for prbs in prbs_results:
                for data_list in (local_data_list, remote_data_list):
                    if record_result(data_list, prbs, value):
                        is_need_reset_prbs = True

        self.print_general_err_matrix(local_data_list, tune_value_list)
        self.print_general_err_matrix(remote_data_list, tune_value_list)
        time.sleep(0.05)
    finally:
        # Restore each side from its own snapshot and set the register
        # back to "00".
        for slot_id, raw in remote_raw_ibias.items():
            self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=raw)
            time.sleep(0.05)
            self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00")
            time.sleep(0.05)
        for slot_id, raw in local_raw_ibias.items():
            self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=raw)
            time.sleep(0.05)
            self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00")
            time.sleep(0.05)

    recommended_values, bad_lanes = self.simple_value_tuner(local_data_list)
    for info, recommended in zip(local_data_list, recommended_values):
        diff_value = recommended - info.tmp_raw_value
        if diff_value != 0:
            # Peer (TX) coordinates were resolved when the record was built.
            remote_slot_id, remote_lane_id = info.peer_slot_id, info.peer_logic_lane
            logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: {remote_slot_id}, {remote_lane_id}, adjusted from {info.tmp_raw_value} -> {recommended} (offset={diff_value:.1f})")
            if is_overwrite:
                reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, recommended)
                time.sleep(0.05)
    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")
    return True
def ibias_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
    """Per-lane runtime-ibias sweep on the TX side of every RX lane.

    For each RX slot/lane (0-7 each) the peer (TX) slot/lane is resolved
    through the topology map and its ``"ibias"`` value is read with
    ``reg_tool``.  Each value in ``tune_value_list`` is written to
    ``"ibias_runtime"`` on all TX lanes, PRBS is checked and the error
    count is stored on the RX lane record (69999 for an unlocked lane).
    The saved values are written back to the same TX lanes afterwards,
    including when the sweep aborts early.

    Args:
        exp_id: Expander id.
        retimers: Retimer selection forwarded to the PRBS tool.
        reg_name: Unused; the register keys are fixed.
        tune_value_list: ibias values to try, in order.
        reg_tool: Register tool of the TX side.
        is_overwrite: When true, write the recommended value with
            ``write_ibias_reg`` for lanes whose recommendation differs.

    Returns:
        ``False`` when a topology lookup fails, ``True`` otherwise.

    NOTE(review): the module register set to ``"01"`` at the start is not
    set back to ``"00"`` here (``ibias_auto_tune_value_new`` does reset it)
    - confirm whether that is intended.
    """
    logging.info("Starting auto_tune2...")
    for slot_id in range(8):
        reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01")
        time.sleep(0.05)

    lane_data_list = []
    tx_coords = []  # (tx_slot, tx_lane) for the record at the same index
    rt_index = {}   # (rt_phys_id, rt_phys_lane) -> first matching record
    for rx_slot_id in range(8):
        for rx_lane in range(8):
            remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(rx_slot_id, rx_lane, self.route_name)
            if not remote_info:
                logging.error(f"remote_info is None for slot {rx_slot_id} lane {rx_lane}")
                return False
            _, _, remote_slot_id, remote_lane_id = remote_info
            raw_value = reg_tool.read_opt_reg(exp_id, remote_slot_id, remote_lane_id, "ibias")
            result = self.topo_map.get_phys_rtmr_by_slot(rx_slot_id, rx_lane, self.route_name)
            # A missing result is a failure too; continuing would reuse
            # the retimer ids of the previous lane.
            success = bool(result) and result[0]
            if not success:
                logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
                return False
            _, rt_phys_id, rt_phys_lane = result
            lane_info = LaneErrInfo(
                host=self.local_reg_tool.host,
                exp=exp_id,
                route=self.route_name.split("-")[-1],
                slot_id=rx_slot_id,
                logic_lane=rx_lane,
                rt_phys_id=rt_phys_id,
                rt_phys_lane=rt_phys_lane,
                tmp_err_map={value: -1 for value in tune_value_list},
                tmp_raw_value=raw_value,
            )
            lane_data_list.append(lane_info)
            tx_coords.append((remote_slot_id, remote_lane_id))
            rt_index.setdefault((rt_phys_id, rt_phys_lane), lane_info)
            time.sleep(0.05)

    is_need_reset_prbs = True
    try:
        for value in tune_value_list:
            for tx_slot, tx_lane in tx_coords:
                reg_tool.write_opt_reg(exp_id, tx_slot, tx_lane, value, "ibias_runtime")
                time.sleep(0.05)
            time.sleep(1)
            if is_need_reset_prbs:
                logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
                self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
            # First check's result is discarded; the second one is used.
            self.prbs_tool.prbs_check(exp_id, retimers, True)
            time.sleep(2)
            prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
            is_need_reset_prbs = False
            for prbs in prbs_results:
                info = rt_index.get((prbs.rtmr_id, prbs.rtmr_lane))
                if info is None:
                    continue
                if prbs.locked:
                    info.tmp_err_map[value] = prbs.err_count
                else:
                    if value in info.tmp_err_map:
                        info.tmp_err_map[value] = 69999
                    else:
                        logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
                    is_need_reset_prbs = True
        self.print_general_err_matrix(lane_data_list, tune_value_list)
        time.sleep(0.05)
    finally:
        # Write back to the TX lane each value was read from (the RX
        # slot/lane ids do not address the swept register).
        # NOTE(review): the value read from "ibias" is written to
        # "ibias_runtime", as in the original - confirm.
        for info, (tx_slot, tx_lane) in zip(lane_data_list, tx_coords):
            reg_tool.write_opt_reg(exp_id, tx_slot, tx_lane, info.tmp_raw_value, "ibias_runtime")
            time.sleep(0.05)

    recommended_values, bad_lanes = self.simple_value_tuner(lane_data_list)
    for info, (tx_slot, tx_lane), recommended in zip(lane_data_list, tx_coords, recommended_values):
        diff_value = recommended - info.tmp_raw_value
        if diff_value != 0:
            logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: {tx_slot}, {tx_lane}, adjusted from {info.tmp_raw_value} -> {recommended} (offset={diff_value:.1f})")
            if is_overwrite:
                reg_tool.write_ibias_reg(exp_id, tx_slot, tx_lane, recommended)
                time.sleep(0.05)
    if bad_lanes:
        for slot, lane in bad_lanes:
            logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")
    return True
def general_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
# Sweep the optical register `reg_name` across `tune_value_list` on lanes 0-7
# of slots 0-7, record the PRBS error count observed at each value, restore the
# original register values, then log (and optionally write) the value chosen
# by simple_value_tuner for every lane.
#
# Args:
#   exp_id: id passed through to every register, PRBS and LaneErrInfo call.
#   retimers: passed unchanged to the PrbsTool calls.
#   reg_name: name of the register to sweep.
#   tune_value_list: candidate values; each one becomes a key of tmp_err_map.
#   reg_tool: accessor used for all register reads/writes; it is also compared
#       with self.local_reg_tool to choose the local or remote slot lookup.
#   is_overwrite: when true, the recommended value is written to the register.
#
# Returns:
#   False when get_phys_rtmr_by_slot reports failure or a PRBS result cannot
#   be mapped to a slot; True otherwise.
logging.info("Starting auto_tune2...")
lane_data_list = []
lane_list = [
0, 1, 2, 3, 4, 5, 6, 7]
# Snapshot the current register value of every slot/lane so that it can be
# written back once the sweep is finished.
for slot_id in (0, 1, 2, 3, 4, 5, 6, 7):
for lane in lane_list:
raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name)
# Every candidate value starts at -1 until a PRBS result overwrites it.
tmp_err_map = {value: -1 for value in tune_value_list}
result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
# NOTE(review): when `result` is falsy, rt_phys_id / rt_phys_lane are not
# assigned in this iteration - confirm whether the LaneErrInfo construction
# below is meant to be skipped in that case.
if result:
success, rt_phys_id, rt_phys_lane = result
if success == False:
logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
return False
lane_info = LaneErrInfo(host=(self.local_reg_tool.host),
exp=exp_id,
slot_id=slot_id,
logic_lane=lane,
rt_phys_id=rt_phys_id,
rt_phys_lane=rt_phys_lane,
tmp_err_map=tmp_err_map,
tmp_raw_value=raw_value)
lane_data_list.append(lane_info)
time.sleep(0.05)
# The flag starts as True, so the first sweep step resets the retimers and
# re-enables PRBS before measuring.
is_need_reset_prbs = True
for value in tune_value_list:
# Apply the candidate value to every lane before measuring.
for info in lane_data_list:
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name)
time.sleep(0.05)
time.sleep(1)
if is_need_reset_prbs:
logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
time.sleep(2)
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
is_need_reset_prbs = False
for prbs in prbs_results:
# Map the retimer id/lane of this PRBS result back to a slot/lane; the
# lookup depends on whether reg_tool is the local register tool.
if reg_tool == self.local_reg_tool:
slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
else:
slot = self.topo_map.get_remote_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
if not slot:
logging.error("local slot is none!")
return False
slot_id, lane_id = slot
for info in lane_data_list:
if info.slot_id == slot_id:
if info.logic_lane == lane_id:
if prbs.locked:
info.tmp_err_map[value] = prbs.err_count
elif value in info.tmp_err_map:
# 69999 is stored in place of an error count when the lane is not locked.
info.tmp_err_map[value] = 69999
else:
logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
# Setting the flag requests a retimer reset / PRBS re-enable on the next value.
is_need_reset_prbs = True
break
logging.info(lane_data_list)
self.print_general_err_matrix(lane_data_list, tune_value_list)
time.sleep(0.05)
# Restore the register values captured before the sweep.
for info in lane_data_list:
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name)
time.sleep(0.05)
recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list)
# NOTE: the loop variable reuses the name `recommended_value`; zip() already
# holds the list returned above, so iteration is unaffected by the rebinding.
for info, recommended_value in zip(lane_data_list, recommended_value):
diff_value = recommended_value - info.tmp_raw_value
if diff_value != 0:
# NOTE(review): the old and new values are printed back-to-back with no
# separator between them in this message.
logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value}{recommended_value} (offset={diff_value:.1f})")
logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}")
if is_overwrite:
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name)
if bad_lanes:
for slot, lane in bad_lanes:
logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")
return True
def simple_value_tuner(self, lane_data_list: List[LaneErrInfo], bad_threshold=1, min_good_count=5):
recommendations = []
bad_lanes = []
for info in lane_data_list:
err_map = info.tmp_err_map
good_offsets = [value for value, err in err_map.items() if err < bad_threshold]
if not good_offsets:
recommended_value = info.tmp_raw_value
else:
recommended_value = int((min(good_offsets) + max(good_offsets)) / 2)
if len(good_offsets) <= min_good_count:
bad_lanes.append((info.slot_id, info.logic_lane))
recommendations.append(recommended_value)
return (recommendations, bad_lanes)
def simple_mgc_tuner(self, lane_data_list, bad_threshold=1, min_good_count=6):
    """Pick a recommended MGC value for each lane from its MGC error map.

    A point is "clean" when its error count is non-negative and below
    ``bad_threshold``; negative counts are the "not available" marker used by
    the matrix printer and are ignored.

    Args:
        lane_data_list: lanes exposing ``mgc_err_map`` (MGC value to error
            count), ``mgc``, ``slot_id`` and ``logic_lane``.
        bad_threshold: error counts at or above this are considered bad.
        min_good_count: a lane needs at least this many clean points;
            the default of 6 matches the previous hard-coded ``<= 5`` check.

    Returns:
        ``(recommendations, bad_lanes)``: one MGC per lane in input order
        (midpoint of the smallest and largest clean MGC, or the lane's current
        ``mgc`` when nothing is clean) and the ``(slot_id, logic_lane)``
        tuples of lanes with fewer than ``min_good_count`` clean points.
    """
    recommendations = []
    bad_lanes = []
    for info in lane_data_list:
        clean_mgcs = [
            mgc
            for mgc, err in info.mgc_err_map.items()
            if 0 <= err < bad_threshold
        ]
        if clean_mgcs:
            recommended_mgc = int((min(clean_mgcs) + max(clean_mgcs)) / 2)
        else:
            # No clean point at all: leave the lane at its current MGC.
            recommended_mgc = info.mgc
        if len(clean_mgcs) < min_good_count:
            bad_lanes.append((info.slot_id, info.logic_lane))
        recommendations.append(recommended_mgc)
    return (recommendations, bad_lanes)
def print_general_err_matrix(self, lane_data_list: List[LaneErrInfo], tune_value_list):
tune_value_list = sorted(tune_value_list)
header_parts = [
'Host', 'Exp', 'Route', 'Slot', 'Lane', 'RT_Id',
'RT_Lane'] + [str(value) for value in tune_value_list]
header_widths = [16, 6, 8, 6, 6, 6, 10] + [6] * len(tune_value_list)
header_format = "{:<16} {:<6} {:<8} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in tune_value_list)
print((header_format.format)(*header_parts))
print("-" * (sum(header_widths) + len(header_widths) * 1))
for info in lane_data_list:
try:
err_counts = []
for value in tune_value_list:
err = info.tmp_err_map.get(value, -2)
if err < 0:
err_str = "70000"
else:
err_str = (f"{err}")
err_counts.append(err_str)
data_format = "{:<16} {:<6} {:<8} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in tune_value_list)
print((data_format.format)(self.host, info.exp, info.route, info.slot_id, info.logic_lane, info.rt_phys_id, info.rt_phys_lane, *err_counts))
except Exception as e:
logging.error(f"Error printing row for Slot {info.slot_id}, Lane {info.logic_lane}: {e}")
continue
def print_err_matrix(self, lane_data_list, mgc_offsets):
mgc_offsets = sorted(mgc_offsets)
header_parts = ["Host","Slot", "Lane", "RT_Id", "RT_Lane"] + [str(offset) for offset in mgc_offsets]
header_widths = [16, 6, 6] + [6] * len(mgc_offsets)
header_format = "{:<16} {:<6} {:<6} {:<6} {:<10} " + " ".join(f"{{:<6}}" for _ in mgc_offsets)
print(header_format.format(*header_parts))
print("-" * (sum(header_widths) + len(header_widths) * 1))
for info in lane_data_list:
base_mgc = info.mgc
try:
err_counts = []
for offset in mgc_offsets:
target_mgc = base_mgc + offset
err = info.mgc_err_map.get(target_mgc, -1.0) # 安全获取,防止 key 不存在
if err < 0:
err_str = "N/A"
else:
err_str = f"{err}"
err_counts.append(err_str)
# 格式化输出
data_format = "{:<16} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in mgc_offsets)
print(data_format.format(self.host, info.slot_id, info.logic_lane, info.rt_phys_id, info.rt_phys_lane, *err_counts))
except Exception as e:
logging.error(f"Error printing row for Slot {info.slot_id}, Lane {info.logic_lane}: {e}")
continue
def verify_rt_index_lane_index(self, rt_index: int, lane_index: int) -> Tuple[(bool, int, int)]:
    """Validate a retimer/lane pair and normalise it to a two-digit retimer id.

    Two input forms are accepted:

    * ``rt_index`` in 1..4 with ``lane_index`` in 0..15: lanes 0-7 map to
      retimer ``rt_index * 10 + 1`` with the lane unchanged; lanes 8-15 map
      to ``rt_index * 10 + 2`` with the lane reduced by 8.
    * ``rt_index`` already one of 11, 12, 21, 22, 31, 32, 41, 42 with
      ``lane_index`` in 0..7: returned unchanged.

    Negative lanes are rejected in both forms (the second form previously
    only checked the upper bound).

    Returns:
        ``(True, rt_index, lane_index)`` in normalised form, or
        ``(False, 0, 0)`` after logging an error when the input is invalid.
    """
    if rt_index in (1, 2, 3, 4):
        if 0 <= lane_index <= 7:
            return (True, rt_index * 10 + 1, lane_index)
        if 8 <= lane_index <= 15:
            return (True, rt_index * 10 + 2, lane_index - 8)
        logging.error("Invalid lane_index: must be in [0, 15]")
        return (False, 0, 0)

    if rt_index in (11, 12, 21, 22, 31, 32, 41, 42):
        logging.info(f"use retimer logic index and lane, rt_index: {rt_index}, rt_lane: {lane_index}")
        if not 0 <= lane_index <= 7:
            logging.error("For logic rt_index, lane_index must be in [0, 7]")
            return (False, 0, 0)
        return (True, rt_index, lane_index)

    logging.error(f"Invalid rt_index: {rt_index}")
    return (False, 0, 0)
def manual_tune(self, exp_id: int, slot_id: int) -> bool:
target_vpeaks = self.load_target_vpeaks_from_json(exp_id, slot_id)
if target_vpeaks is None:
logging.error("从文件加载target_vpeaks失败")
return False
else:
self.match_optimal_mgc_new2(exp_id, slot_id, target_vpeaks)
return True
def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Optional[Dict[(int, int)]]:
# Derive a per-lane target vpeak for one slot from two vpeak readings:
# a baseline taken after toogle_rf("off") with AGC written to 0, and a maximum
# taken after toogle_rf("on") with AGC written to 255.
#
#   target = baseline + round((max - baseline) * 19 / 29)
#
# The result is saved with save_target_vpeaks_to_json when non-empty.
#
# Args:
#   exp_id, slot_id: passed through to every register call.
#   lane_list: not referenced in this method; the lanes come from the keys
#       returned by read_vpeak_reg_all_lane.
#
# Returns:
#   Dict of lane -> target vpeak; an empty dict when switching RF off fails.
#   Calls sys.exit(-1) when max - baseline is below 10 for any lane.
target_vpeaks = {}
logging.info(f"-------slot {slot_id}")
time.sleep(0.05)
self.local_reg_tool.enable_tia_agc(exp_id, slot_id)
time.sleep(0.05)
logging.info("----------step 2: toogle RF off")
if not self.toogle_rf(exp_id, slot_id, "off"):
logging.error(f"slot {slot_id}: toogle RF off fail")
# NOTE(review): this early return happens after enable_tia_agc but without
# the disable_tia_agc call made at the end of the method - confirm intended.
return target_vpeaks
else:
self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0)
time.sleep(0.5)
logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
base_line_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
# The return value of this toogle_rf call is not checked.
self.toogle_rf(exp_id, slot_id, "on")
self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255)
time.sleep(0.5)
max_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
logging.info(f"max_vpeaks: {max_vpeaks}")
for lane, base_vpeak in base_line_vpeaks.items():
# The target sits 19/29 of the way from the baseline to the maximum reading.
numerator = 19
vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29))
target_vpeaks[lane] = vpkdelta + base_vpeak
diff = max_vpeaks[lane] - base_line_vpeaks[lane]
# A spread below 10 between the two readings is treated as fatal.
if diff < 10:
logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
sys.exit(-1)
# for lane, base_vpeak in base_line_vpeaks.items():
# diff = max_vpeaks[lane] - base_line_vpeaks[lane]
# if diff <= 22:
# logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
# target_vpeaks[lane] = diff - 2 + base_vpeak
# else:
# target_vpeaks[lane] = 21 + base_vpeak
logging.info(f"target_vpeaks: {target_vpeaks}")
if target_vpeaks:
self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)
self.local_reg_tool.disable_tia_agc(exp_id, slot_id)
return target_vpeaks
def calc_target_vpeak(self, exp_id: int, lane_list: List[int], slot_id: int) -> Dict[(int, int)]:
target_vpeaks = {}
logging.info(f"-------slot {slot_id}")
logging.info("----------step 1: disable agc")
self.disable_agc(exp_id, slot_id)
logging.info("----------step 2: toogle RF off")
if not self.toogle_rf(exp_id, slot_id, lane_list, "off"):
logging.error(f"slot {slot_id}: toogle RF off fail")
return target_vpeaks
else:
logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list)
logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
target_vpeaks = self.calculate_target_vpeak(base_line_vpeaks)
logging.info(f"target_vpeaks: {target_vpeaks}")
if target_vpeaks:
self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)
logging.info("----------step 4: enable RF")
self.toogle_rf(exp_id, slot_id, lane_list, "on")
return target_vpeaks
def disable_agc(self, exp_id: int, slot_id: int):
logging.debug("----------disable_agc, step 1")
agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en")
if not agc_enable_regs:
logging.error("not find AGC_ENABLE register")
return False
else:
logging.debug("----------disable_agc, step 2")
success = True
for reg in agc_enable_regs:
logging.debug(f"----------disable_agc, step 2: reg: {reg}")
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=(reg.bank),
page=(reg.page),
reg_offset=(reg.offset),
size=(reg.size),
hex_str="00")
time.sleep(0.05)
logging.debug(f"----------disable_agc, step 3: result: {result}")
if not result:
logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}")
success = False
return success
def enable_agc(self, exp_id: int, slot_id: int):
time.sleep(0.05)
logging.debug("----------enable_agc, step 1")
agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en")
if not agc_enable_regs:
logging.error("not find AGC_ENABLE register")
return False
else:
logging.debug("----------enable_agc, step 2")
success = True
for reg in agc_enable_regs:
logging.debug(f"----------enable_agc, step 2: reg: {reg}")
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=(reg.bank),
page=(reg.page),
reg_offset=(reg.offset),
size=(reg.size),
hex_str="FF")
time.sleep(0.05)
logging.info(f"----------enable_agc, step 3: result: {result}")
if not result:
logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}")
success = False
return success
def toogle_rf(self, exp_id, slot_id, switch='off'):
ret = False
logging.info(f"---------- toogle_rf to {switch}")
if switch == "off":
ret = self.local_reg_tool.write_opt_reg_all_lane(exp_id, slot_id, 0, "tia_stage2")
time.sleep(0.05)
if ret == False:
logging.error(f"---------- toogle_rf to {switch} failed")
else:
stage_recommended_value = 144
ret = self.local_reg_tool.write_opt_reg_all_lane(exp_id, slot_id, stage_recommended_value, "tia_stage2")
time.sleep(0.05)
if ret == False:
logging.error(f"---------- toogle_rf to {switch} failed")
return ret
def calculate_target_vpeak(self, base_line_vpeaks: Dict[(int, int)]) -> Dict[(int, int)]:
    """Return the target vpeak per lane: the baseline reading plus 18.

    Each input lane/vpeak pair is logged, followed by the resulting mapping.
    """
    for lane, vpeak in base_line_vpeaks.items():
        logging.info(f"lane={lane}, vpeak={vpeak}")
    target_vpeaks = {lane: vpeak + 18 for lane, vpeak in base_line_vpeaks.items()}
    logging.info(f"target_vpeaks={target_vpeaks}")
    return target_vpeaks
def match_optimal_mgc_new2(self, exp_id: int, slot_id: int, target_vpeaks: Optional[Dict[(int, int)]]) -> bool:
# Search, per "mgc" register of the slot, for an MGC value whose measured vpeak
# matches the lane's target.
#
# Phase 1 reads the current MGC and vpeak of each lane and estimates
#   target_mgc = (target_vpeak - vpeak) * 1.5 + mgc
# Phase 2 scans MGC values downward from target_mgc + 10 to target_mgc - 10
# (both capped at 255) in steps of reg.step, writing each value and re-reading
# vpeak until the reading equals target_vpeak or target_vpeak - 1.
# Matches are stored under exp_id / slot_id / self.mode / lane and passed to
# save_mgc_results_to_json; unmatched lanes are only logged.
#
# Returns:
#   False when get_registers_by_name returns None for "mgc" or "vpeak".
#   Otherwise `success`, which is initialised to True and never changed, so
#   unmatched lanes do not affect the return value.
#
# NOTE(review): target_vpeaks is annotated Optional but is indexed without a
# None check.
mgc_regs = self.reg_table.get_registers_by_name("mgc")
vpeak_regs = self.reg_table.get_registers_by_name("vpeak")
if mgc_regs is None or vpeak_regs is None:
logging.error("not find MGC regs")
return False
else:
success = True
matched_results = {}
unmatched_lanes = []
target_mgcs = {}
# Phase 1: estimate a starting MGC for every lane from its current readings.
for reg in mgc_regs:
mgc_value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=(reg.bank),
page=(reg.page),
reg_offset=(reg.offset),
size=1)
# The register read is parsed as a hexadecimal string.
mgc_value = int(mgc_value, 16)
# Find the vpeak register that belongs to the same lane as this MGC register.
for vpeak_reg in vpeak_regs:
if vpeak_reg.lane == reg.lane:
vpeak_value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=(vpeak_reg.bank),
page=(vpeak_reg.page),
reg_offset=(vpeak_reg.offset),
size=2)
vpeak_value = int(vpeak_value, 16)
logging.info(f"mgc_value: {mgc_value}, target_vpeak: {target_vpeaks[vpeak_reg.lane]}, vpeak_value: {vpeak_value}")
target_mgc = (target_vpeaks[vpeak_reg.lane] - vpeak_value) * 1.5 + mgc_value
logging.info(f"target_mgc: {target_mgc}")
target_mgcs[reg.lane] = target_mgc
break
logging.info(f"----------------Target MGC: {target_mgcs}")
# Phase 2: scan a +/-10 window around each estimate, highest value first.
# NOTE(review): a lane with no matching vpeak register has no entry in
# target_mgcs, so the lookup below would raise KeyError for it.
for reg in mgc_regs:
logging.info(f"mgc regs size: {len(mgc_regs)}")
# Only an upper cap of 255 is applied; there is no lower bound on min_val.
min_val = int(target_mgcs[reg.lane] - 10)
if min_val > 255:
min_val = 255
max_val = int(target_mgcs[reg.lane] + 10)
if max_val > 255:
logging.info("max_val is greater than 255, setting to 255")
max_val = 255
logging.info(f"lane: {reg.lane}, min mgc : {min_val}, max_mgc: {max_val}, target vpeak: {target_vpeaks[reg.lane]}")
step = reg.step
search_values = list(range(max_val, min_val - 1, -step))
logging.info(f"Search order for lane {reg.lane}: {search_values}")
is_match = False
matched_mgc = None
matched_vpeak = None
for mgc in search_values:
# Neither of the two write results below is checked; the second assignment
# overwrites the first.
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=(reg.bank),
page=(reg.page),
reg_offset=(reg.offset),
size=1,
hex_str=f"{mgc:02X}")
time.sleep(0.05)
# Fixed write of "01" to bank 0 / page 208 / offset 136 after each MGC write.
# Presumably the confirm register (match_optimal_mgc_new calls
# write_confirm_reg at the same point) - TODO confirm.
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=0,
page=208,
reg_offset=136,
size=1,
hex_str="01")
time.sleep(0.05)
for vpeak_reg in vpeak_regs:
if vpeak_reg.lane == reg.lane:
value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=(vpeak_reg.bank),
page=(vpeak_reg.page),
reg_offset=(vpeak_reg.offset),
size=2)
value = int(value, 16)
logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}")
# For integer readings this accepts exactly target - 1 and target.
if value < target_vpeaks[reg.lane] + 1 and value > target_vpeaks[reg.lane] - 2:
logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ")
is_match = True
matched_mgc = mgc
matched_vpeak = value
break
if is_match:
break
time.sleep(0.03)
# Record the match under exp_id / slot_id / self.mode / lane (all but the
# MGC and vpeak values are stored as string keys).
# NOTE(review): the if/else nesting below comes from the decompiler and the
# indentation is lost in this copy - confirm against the original source
# which branch each statement belongs to.
if is_match and matched_mgc is not None:
if matched_vpeak is not None:
if str(exp_id) not in matched_results:
matched_results[str(exp_id)] = {}
else:
if str(slot_id) not in matched_results[str(exp_id)]:
matched_results[str(exp_id)][str(slot_id)] = {}
if self.mode not in matched_results[str(exp_id)][str(slot_id)]:
matched_results[str(exp_id)][str(slot_id)][self.mode] = {}
matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc,
'target_vpeak':target_vpeaks[reg.lane],
'actual_vpeak':matched_vpeak}
else:
unmatched_lanes.append(reg.lane)
if matched_results:
self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
if unmatched_lanes:
logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
return success
def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool:
# Iteratively adjust the MGC of each lane in `lane_list` until the measured
# vpeak falls inside [target_vpeak, target_vpeak + 1].
#
# For every lane the search starts at the midpoint of the register's
# valid_range. Each iteration writes the MGC, writes the confirm register,
# reads vpeak, and then moves the MGC down (reading above the window) or up
# (reading below it) by `step + extra_step`, where extra_step grows with the
# distance from the target. Matches are stored under
# exp_id / slot_id / self.mode / lane and saved with save_mgc_results_to_json.
#
# Returns:
#   False when the "mgc" register (or its valid_range) is missing, when a
#   register write reports False, or when any lane is left unmatched;
#   True otherwise. Calls sys.exit(-1) once a lane exceeds the iteration limit.
matched_results = {}
unmatched_lanes = []
# `index` from enumerate is not referenced in the loop body.
for index, lane_id in enumerate(lane_list):
is_match = False
matched_mgc = None
matched_vpeak = None
reg = self.reg_table.get_register_by_logic_lane("mgc", lane_id)
if reg is None or reg.valid_range is None:
logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc")
return False
min_val = reg.valid_range[0]
max_val = reg.valid_range[1]
step = reg.step if reg.step is not None else 1
# Start the search from the middle of the valid range.
wt_mgc = int((min_val + max_val) / 2)
target_vpeak = target_vpeaks[lane_id]
# Accepted window: target_vpeak or target_vpeak + 1.
target_vpeak_range = [target_vpeak, target_vpeak + 1]
count = 0
while 1:
ret = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc)
time.sleep(0.05)
if ret == False:
logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc")
return False
ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
if ret == False:
logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm")
return False
time.sleep(0.05)
current_vpeak = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id)
logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}, target: {target_vpeak}")
diff_vpeak = abs(target_vpeak - current_vpeak)
# Extra MGC step keyed by |target - current|; distances not listed use 5.
diff_vpeak_step = {
0: 0,
1: 0,
2: 0,
3: 1,
4: 2,
5: 3,
6: 3,
7: 4,
8: 5}
extra_step = diff_vpeak_step.get(diff_vpeak, 5)
# NOTE(review): wt_mgc is adjusted without being clamped to min_val/max_val
# (both are read above but not used after the midpoint calculation).
if current_vpeak > target_vpeak_range[1]:
wt_mgc -= step + extra_step
if current_vpeak < target_vpeak_range[0]:
wt_mgc += step + extra_step
if current_vpeak >= target_vpeak_range[0]:
if current_vpeak <= target_vpeak_range[1]:
is_match = True
matched_mgc = wt_mgc
matched_vpeak = current_vpeak
break
# Abort the whole process after more than 30 adjustments for one lane.
if count > 30:
logging.error(f"vpeak is not match, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ")
sys.exit(-1)
break
count += 1
if is_match:
logging.info(f"------------------ Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ")
# Build the nested exp_id / slot_id / mode containers on first use.
if str(exp_id) not in matched_results:
matched_results[str(exp_id)] = {}
if str(slot_id) not in matched_results[str(exp_id)]:
matched_results[str(exp_id)][str(slot_id)] = {}
if self.mode not in matched_results[str(exp_id)][str(slot_id)]:
matched_results[str(exp_id)][str(slot_id)][self.mode] = {}
matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc, 'target_vpeak':target_vpeaks[lane_id],
'actual_vpeak':matched_vpeak}
else:
unmatched_lanes.append(lane_id)
if matched_results:
self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
if unmatched_lanes:
logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
return False
else:
return True
def match_optimal_mgc_old(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool:
# Exhaustive MGC search: for every "mgc" register, try the values of its
# valid_range starting at the (step-aligned) midpoint and alternating outward
# (right, then left), until the measured vpeak is within 1 of the lane target.
# Matches are stored under exp_id / slot_id / self.mode / lane and saved with
# save_mgc_results_to_json.
#
# Args:
#   lane_list: not referenced in this method; lanes come from the registers
#       returned by get_registers_by_name("mgc").
#
# Returns:
#   False when get_registers_by_name returns None for "mgc" or "vpeak".
#   Otherwise `success`, which is initialised to True and never changed.
mgc_regs = self.reg_table.get_registers_by_name("mgc")
vpeak_regs = self.reg_table.get_registers_by_name("vpeak")
if mgc_regs is None or vpeak_regs is None:
logging.error("not find MGC regs")
return False
else:
success = True
matched_results = {}
unmatched_lanes = []
for reg in mgc_regs:
logging.info(f"mgc regs size: {len(mgc_regs)}")
logging.info(f"lane: {reg.lane}, target vpeak:{target_vpeaks[reg.lane]}, mgc valid_range: {reg.valid_range}, step: {reg.step}")
min_val = reg.valid_range[0]
max_val = reg.valid_range[1]
step = reg.step
# Midpoint of the valid range, rounded down to a multiple of step.
mid_val = (min_val + max_val) // 2
mid_val = mid_val // step * step
search_values = [
mid_val]
offset = step
# Build the search order: midpoint first, then mid+offset / mid-offset pairs
# with a growing offset, until the expected number of candidates is reached.
while len(search_values) < (max_val - min_val) // step + 1:
right_val = mid_val + offset
if right_val <= max_val:
search_values.append(right_val)
left_val = mid_val - offset
if left_val >= min_val:
search_values.append(left_val)
offset += step
if len(search_values) >= (max_val - min_val) // step + 1:
break
is_match = False
matched_mgc = None
matched_vpeak = None
for mgc in search_values:
# Neither of the two write results below is checked; the second assignment
# overwrites the first.
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=(reg.bank),
page=(reg.page),
reg_offset=(reg.offset),
size=1,
hex_str=f"{mgc:02X}")
time.sleep(0.02)
# Fixed write of "01" to bank 0 / page 208 / offset 136 after each MGC write.
# Presumably the confirm register - TODO confirm.
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=0,
page=208,
reg_offset=136,
size=1,
hex_str="01")
time.sleep(0.02)
# Read back the vpeak register that belongs to the same lane.
for vpeak_reg in vpeak_regs:
if vpeak_reg.lane == reg.lane:
value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
slot_id=slot_id,
bank=(vpeak_reg.bank),
page=(vpeak_reg.page),
reg_offset=(vpeak_reg.offset),
size=2)
value = int(value, 16)
logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}")
# For integer readings this accepts target - 1, target and target + 1.
if value < target_vpeaks[reg.lane] + 2 and value > target_vpeaks[reg.lane] - 2:
logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ")
is_match = True
matched_mgc = mgc
matched_vpeak = value
break
if is_match:
break
time.sleep(0.03)
# NOTE(review): the if/else nesting below comes from the decompiler and the
# indentation is lost in this copy - confirm against the original source
# which branch each statement belongs to.
if is_match:
if matched_mgc is not None:
if matched_vpeak is not None:
if str(exp_id) not in matched_results:
matched_results[str(exp_id)] = {}
else:
if str(slot_id) not in matched_results[str(exp_id)]:
matched_results[str(exp_id)][str(slot_id)] = {}
if self.mode not in matched_results[str(exp_id)][str(slot_id)]:
matched_results[str(exp_id)][str(slot_id)][self.mode] = {}
matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc,
'target_vpeak':target_vpeaks[reg.lane],
'actual_vpeak':matched_vpeak}
# NOTE(review): no `else:` is visible before this append, unlike the sibling
# match_optimal_mgc_new2 - confirm whether matched lanes are also meant to be
# reported as unmatched.
unmatched_lanes.append(reg.lane)
if matched_results:
self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
if unmatched_lanes:
logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
return success
def _read_vpeak_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[(int, int)]:
vpeak_values = {}
for lane in lane_list:
vpeak_int = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane)
vpeak_values[lane] = vpeak_int
return vpeak_values
def _write_mgc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], mgc: int) -> bool:
for lane_id in lane_list:
ret = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, mgc)
if ret == False:
logging.error(f"write mgc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, mgc:{mgc}")
return False
ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
if ret == False:
logging.error(f"write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}")
return False
else:
return True
def _write_agc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], agc: int) -> bool:
for lane_id in lane_list:
ret = self.local_reg_tool.write_agc_reg(exp_id, slot_id, lane_id, agc)
time.sleep(0.05)
if ret == False:
logging.error(f"write agc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, agc:{agc}")
return False
ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
if ret == False:
logging.error(f"write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}")
return False
else:
return True
def _generate_host_filename(self) -> str:
safe_host = self.host.replace(":", "_").replace("/", "_").replace("\\", "_")
return f"target_vpeaks_{safe_host}.json"
def _generate_mgc_host_filename(self) -> str:
safe_host = self.host.replace(":", "_").replace("/", "_").replace("\\", "_")
return f"mgc_{safe_host}.json"
def save_target_vpeaks_to_json(self, exp_id: int, slot_id: int, target_vpeaks: Dict[(int, int)]) -> bool:
try:
filename = self._generate_host_filename()
os.makedirs((os.path.dirname(filename) if os.path.dirname(filename) else "."), exist_ok=True)
if os.path.exists(filename):
with open(filename, "r") as f:
data = json.load(f)
else:
data = {}
if str(exp_id) not in data:
data[str(exp_id)] = {}
if str(slot_id) not in data[str(exp_id)]:
data[str(exp_id)][str(slot_id)] = {}
data[str(exp_id)][str(slot_id)][self.mode] = {str(k): v for k, v in target_vpeaks.items()}
with open(filename, "w") as f:
json.dump(data, f, indent=4)
logging.info(f"target_vpeaks saved to {filename}, path: {exp_id}/{slot_id}/{self.mode}")
return True
except Exception as e:
logging.error(f"Failed to save target_vpeaks to JSON file: {e}")
return False
def load_target_vpeaks_from_json(self, exp_id: int, slot_id: int) -> Optional[Dict[(int, int)]]:
try:
filename = self._generate_host_filename()
if not os.path.exists(filename):
logging.error(f"File {filename} does not exist")
return
with open(filename, "r") as f:
data = json.load(f)
if str(exp_id) not in data or str(slot_id) not in data[str(exp_id)] or self.mode not in data[str(exp_id)][str(slot_id)]:
logging.error(f"Path {exp_id}/{slot_id}/{self.mode} not found in {filename}")
return
target_vpeaks_data = data[str(exp_id)][str(slot_id)][self.mode]
target_vpeaks = {int(k): v for k, v in target_vpeaks_data.items()}
logging.info(f"target_vpeaks loaded successfully from {filename}, path: {exp_id}/{slot_id}/{self.mode}")
return target_vpeaks
except Exception as e:
logging.error(f"Failed to load target_vpeaks from JSON file: {e}")
return
def save_mgc_results_to_json(self, exp_id: int, slot_id: int, matched_results: Dict[(str, Any)]) -> bool:
try:
filename = self._generate_mgc_host_filename()
os.makedirs((os.path.dirname(filename) if os.path.dirname(filename) else "."), exist_ok=True)
if os.path.exists(filename):
with open(filename, "r") as f:
data = json.load(f)
else:
data = {}
for exp_key, exp_data in matched_results.items():
if exp_key not in data:
data[exp_key] = {}
for slot_key, slot_data in exp_data.items():
if slot_key not in data[exp_key]:
data[exp_key][slot_key] = {}
for mode_key, mode_data in slot_data.items():
if mode_key not in data[exp_key][slot_key]:
data[exp_key][slot_key][mode_key] = {}
data[exp_key][slot_key][mode_key].update(mode_data)
with open(filename, "w") as f:
json.dump(data, f, indent=4)
logging.info(f"MGC matching results saved to {filename}")
return True
except Exception as e:
logging.error(f"Failed to save MGC results to JSON file: {e}")
return False