You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1587 lines
77 KiB
1587 lines
77 KiB
|
3 weeks ago
|
# uncompyle6 version 3.9.3
|
||
|
|
# Python bytecode version base 3.6 (3379)
|
||
|
|
# Decompiled from: Python 3.6.8 (default, May 21 2025, 11:12:56)
|
||
|
|
# [GCC 8.5.0 20210514 (Anolis 8.5.0-10.0.1)]
|
||
|
|
# Embedded file name: /xz/gyou/nexusbench/toolbox/mgc_tuning_tool.py
|
||
|
|
# Compiled at: 2025-12-25 14:52:30
|
||
|
|
# Size of source mod 2**32: 78005 bytes
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
import logging, time, os, json, sys
|
||
|
|
from typing import Dict, List, Any, Optional, Tuple
|
||
|
|
import matplotlib.pyplot as plt
|
||
|
|
from gpu.biren.exp_util import DevWhiteRiverExp
|
||
|
|
from toolbox.opt_reg_access_tool import OptRegAccessTool
|
||
|
|
from toolbox.prbs_tool import PrbsTool, RetimerPrbsResult
|
||
|
|
from parser.topology_parser import TopoMappingParser
|
||
|
|
from parser.transceiver_config_parser import TransceiverConfigParser, RegisterInfo
|
||
|
|
from toolbox.bathtub_curve import BathtubCurve
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class LaneErrInfo:
|
||
|
|
host: str = ""
|
||
|
|
exp: int = -1
|
||
|
|
route: str = ""
|
||
|
|
slot_id: int = -1
|
||
|
|
logic_lane: int = -1
|
||
|
|
rt_phys_id: int = -1
|
||
|
|
rt_phys_lane: int = -1
|
||
|
|
peer_slot_id: int = -1
|
||
|
|
peer_ibias_lane: int = -1
|
||
|
|
peer_logic_lane: int = -1
|
||
|
|
peer_rt_phys_id: int = -1
|
||
|
|
peer_rt_phys_lane: int = -1
|
||
|
|
preset: int = -1
|
||
|
|
mgc: int = -1
|
||
|
|
mgc_err_map: Dict[int, int] = field(default_factory=dict)
|
||
|
|
lowfreq_err_map: Dict[int, int] = field(default_factory=dict)
|
||
|
|
tmp_err_map: Dict[int, int] = field(default_factory=dict)
|
||
|
|
vpeak: int = -1
|
||
|
|
tia_peak: int = -1
|
||
|
|
ibias: int = -1
|
||
|
|
ipcurrent: int = -1
|
||
|
|
opcurrent: int = -1
|
||
|
|
lowfreq_eq: int = -1
|
||
|
|
highfreq_eq: int = -1
|
||
|
|
vgc_set: int = -1
|
||
|
|
drv_vpeak: int = -1
|
||
|
|
rssi1: int = -1
|
||
|
|
rssi2: int = -1
|
||
|
|
tmp_raw_value: int = -1
|
||
|
|
ctle: int = -1
|
||
|
|
|
||
|
|
|
||
|
|
class MgcTuningTool:
|
||
|
|
|
||
|
|
def __init__(self, host:str, reg_table: TransceiverConfigParser, reg_tool: OptRegAccessTool,
|
||
|
|
remote_reg_tool: OptRegAccessTool,
|
||
|
|
bmc: DevWhiteRiverExp, topo_map: TopoMappingParser, prbs_tool: PrbsTool, route_name: str):
|
||
|
|
self.host = host
|
||
|
|
self.reg_table = reg_table
|
||
|
|
self.bmc = bmc
|
||
|
|
self.target_vpeak = 0
|
||
|
|
self.local_reg_tool = reg_tool
|
||
|
|
self.remote_reg_tool = remote_reg_tool
|
||
|
|
self.target_vpeaks_file = "target_vpeaks.json"
|
||
|
|
self.mode = route_name
|
||
|
|
self.topo_map = topo_map
|
||
|
|
self.prbs_tool = prbs_tool
|
||
|
|
self.route_name = route_name
|
||
|
|
self.is_onet = self.remote_reg_tool != self.local_reg_tool
|
||
|
|
|
||
|
|
def debug_calc_target_vpeak_new(self, exp_id: int, slot_id: int) -> Optional[Dict[(int, int)]]:
|
||
|
|
target_vpeaks = {}
|
||
|
|
logging.info(f"-------slot {slot_id}")
|
||
|
|
self.local_reg_tool.enable_tia_agc(exp_id, slot_id)
|
||
|
|
time.sleep(0.05)
|
||
|
|
logging.info("----------step 2: toggle RF off")
|
||
|
|
if not self.toogle_rf(exp_id, slot_id, "off"):
|
||
|
|
logging.error(f"slot {slot_id}: toggle RF off fail")
|
||
|
|
return target_vpeaks
|
||
|
|
else:
|
||
|
|
self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0)
|
||
|
|
time.sleep(0.5)
|
||
|
|
logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
|
||
|
|
base_line_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
|
||
|
|
logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
|
||
|
|
self.toogle_rf(exp_id, slot_id, "on")
|
||
|
|
self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255)
|
||
|
|
time.sleep(0.5)
|
||
|
|
max_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
|
||
|
|
|
||
|
|
logging.info(f"max_vpeaks: {max_vpeaks}")
|
||
|
|
percent_target_vpeaks = {}
|
||
|
|
for lane, base_vpeak in base_line_vpeaks.items():
|
||
|
|
numerator = 19
|
||
|
|
vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29))
|
||
|
|
percent_target_vpeaks[lane] = vpkdelta + base_vpeak
|
||
|
|
diff = max_vpeaks[lane] - base_line_vpeaks[lane]
|
||
|
|
if diff < 5:
|
||
|
|
logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
|
||
|
|
sys.exit(-1)
|
||
|
|
|
||
|
|
logging.info(f"percent_target_vpeaks: {percent_target_vpeaks}")
|
||
|
|
diff_target_vpeaks = {}
|
||
|
|
for lane, base_vpeak in base_line_vpeaks.items():
|
||
|
|
diff = max_vpeaks[lane] - base_line_vpeaks[lane]
|
||
|
|
if diff <= 22:
|
||
|
|
logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
|
||
|
|
diff_target_vpeaks[lane] = diff - 2 + base_vpeak
|
||
|
|
else:
|
||
|
|
diff_target_vpeaks[lane] = 21 + base_vpeak
|
||
|
|
|
||
|
|
target_vpeaks = diff_target_vpeaks
|
||
|
|
logging.info(f"diff_target_vpeaks: {target_vpeaks}")
|
||
|
|
logging.info("\n" + "=" * 90)
|
||
|
|
logging.info(f'{"SLOT":<6} {"LANE":<6} {"BASE":<8} {"MAX":<8} {"RANGE":<8} {"PERCENT_TGT":<12} {"DIFF_TGT":<12} {"PCT_OFF":<10} {"DIFF_OFF":<10}')
|
||
|
|
logging.info("=" * 90)
|
||
|
|
for lane in sorted(base_line_vpeaks.keys()):
|
||
|
|
base = base_line_vpeaks[lane]
|
||
|
|
mx = max_vpeaks[lane]
|
||
|
|
rng = mx - base
|
||
|
|
pct_tgt = percent_target_vpeaks.get(lane, -1)
|
||
|
|
diff_tgt = diff_target_vpeaks.get(lane, -1)
|
||
|
|
pct_off = pct_tgt - base
|
||
|
|
diff_off = diff_tgt - base
|
||
|
|
logging.info(f"{slot_id:<6} {lane:<6} {base:<8} {mx:<8} {rng:<8} {pct_tgt:<12} {diff_tgt:<12} {pct_off:<10} {diff_off:<10}")
|
||
|
|
|
||
|
|
logging.info("=" * 90 + "\n")
|
||
|
|
self.local_reg_tool.disable_tia_agc(exp_id, slot_id)
|
||
|
|
time.sleep(0.05)
|
||
|
|
try:
|
||
|
|
agc_start, agc_end, agc_step = (80, 180, 2)
|
||
|
|
agc_values = list(range(agc_start, agc_end + 1, agc_step))
|
||
|
|
all_vpeaks_scan = {lane: [] for lane in base_line_vpeaks.keys()}
|
||
|
|
logging.info(f"Scanning AGC from {agc_start} to {agc_end} step {agc_step}...")
|
||
|
|
for agc in agc_values:
|
||
|
|
self.local_reg_tool.write_tmp_mgc_reg_all_lane(exp_id, slot_id, agc)
|
||
|
|
# self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, agc)
|
||
|
|
time.sleep(0.1)
|
||
|
|
vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
|
||
|
|
for lane in base_line_vpeaks.keys():
|
||
|
|
all_vpeaks_scan[lane].append(vpeaks.get(lane, 0))
|
||
|
|
|
||
|
|
plt.figure(figsize=(20, 8))
|
||
|
|
for lane in sorted(base_line_vpeaks.keys()):
|
||
|
|
plt.plot(agc_values, (all_vpeaks_scan[lane]), marker="o", label=f"lane {lane}")
|
||
|
|
|
||
|
|
for lane in base_line_vpeaks.keys():
|
||
|
|
plt.scatter([80], [base_line_vpeaks[lane]], color="blue")
|
||
|
|
plt.scatter([180], [max_vpeaks[lane]], color="red")
|
||
|
|
|
||
|
|
plt.title(f"Vpeak vs AGC (exp={exp_id}, slot={slot_id})")
|
||
|
|
plt.xlabel("AGC Value")
|
||
|
|
plt.ylabel("Vpeak")
|
||
|
|
plt.legend()
|
||
|
|
plt.grid(True)
|
||
|
|
plt.tight_layout()
|
||
|
|
plot_dir = "agc_vpeak_plots"
|
||
|
|
os.makedirs(plot_dir, exist_ok=True)
|
||
|
|
plot_path = os.path.join(plot_dir, f"{self.local_reg_tool.host}_agc_vpeak_exp{exp_id}_slot{slot_id}.png")
|
||
|
|
plt.savefig(plot_path)
|
||
|
|
logging.info(f"AGC-Vpeak curve saved to: {plot_path}")
|
||
|
|
plt.close()
|
||
|
|
except Exception as e:
|
||
|
|
logging.warning(f"Failed to generate AGC scan plot: {e}")
|
||
|
|
|
||
|
|
if target_vpeaks:
|
||
|
|
self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)
|
||
|
|
return target_vpeaks
|
||
|
|
|
||
|
|
def auto_tune(self, exp_id: int, slot_id: int, lane_list: List[int]) -> bool:
|
||
|
|
lane_list = [
|
||
|
|
7, 6, 5, 4, 3, 2, 1, 0]
|
||
|
|
# self.debug_calc_target_vpeak_new(exp_id, slot_id)
|
||
|
|
|
||
|
|
target_vpeaks = self.calc_target_vpeak_new(exp_id, slot_id, lane_list)
|
||
|
|
if target_vpeaks is None:
|
||
|
|
return False
|
||
|
|
|
||
|
|
time.sleep(0.1)
|
||
|
|
logging.info("----------step 7: find optimal MGC")
|
||
|
|
self.match_optimal_mgc_new(exp_id, slot_id, lane_list, target_vpeaks)
|
||
|
|
return True
|
||
|
|
|
||
|
|
def auto_tia_peak_tune(self, exp_id: int, retimers):
|
||
|
|
tune_value_list = [0, 30, 60, 90, 120, 150, 180, 210, 240]
|
||
|
|
self.general_auto_tune_value(exp_id, retimers, "tia_peak", tune_value_list, self.local_reg_tool)
|
||
|
|
|
||
|
|
def auto_highfreq_tune(self, exp_id: int, retimers):
|
||
|
|
tune_value_list = [0, 20, 40, 60, 80, 100, 120, 140, 160, 180, 200, 220, 240]
|
||
|
|
self.general_auto_tune_value(exp_id, retimers, "highfreq_eq", tune_value_list, self.remote_reg_tool)
|
||
|
|
|
||
|
|
def rt_ibias(self, exp_id: int, retimers):
|
||
|
|
tune_value_list = [
|
||
|
|
1647, 3647]
|
||
|
|
self.ibias_auto_tune_value_new(exp_id, retimers, "ibias_runtime", tune_value_list, self.remote_reg_tool, False)
|
||
|
|
|
||
|
|
def auto_ibias_tune2(self, exp_id: int, retimers):
|
||
|
|
tune_value_list = [
|
||
|
|
1647, 1847, 2047, 2247, 2447, 2647, 2847, 3047,
|
||
|
|
3247, 3447, 3647]
|
||
|
|
self.ibias_auto_tune_value(exp_id, retimers, "ibias_runtime", tune_value_list, self.remote_reg_tool, False)
|
||
|
|
|
||
|
|
def auto_opcurrent_tune(self, exp_id: int, retimers):
|
||
|
|
tune_value_list = [130, 140, 150, 160, 120, 110, 90, 80, 100, 170, 180]
|
||
|
|
self.general_auto_tune_value(exp_id, retimers, "opcurrent", tune_value_list, self.remote_reg_tool)
|
||
|
|
|
||
|
|
def auto_opcurrent_tune_test(self, exp_id: int, retimers):
|
||
|
|
tune_value_list = [110, 130, 150, 170, 90, 70, 50, 190, 210, 230]
|
||
|
|
self.general_auto_tune_value(exp_id, retimers, "opcurrent", tune_value_list, self.remote_reg_tool)
|
||
|
|
|
||
|
|
def auto_lowfreq_tune(self, exp_id: int, retimers):
|
||
|
|
tune_value_list = [
|
||
|
|
120, 150, 180, 210, 240, 90, 60, 30, 0]
|
||
|
|
self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", tune_value_list, self.remote_reg_tool)
|
||
|
|
|
||
|
|
def auto_ctle_tune(self, exp_id: int, retimers):
|
||
|
|
tune_value_list = [
|
||
|
|
0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
|
||
|
|
self.general_auto_tune_value(exp_id, retimers, "lowfreq_eq", tune_value_list, self.remote_reg_tool)
|
||
|
|
|
||
|
|
def auto_tune2(self, exp_id: int, retimers) -> bool:
|
||
|
|
|
||
|
|
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
|
||
|
|
|
||
|
|
logging.info("Starting auto_tune2...")
|
||
|
|
|
||
|
|
lane_data_list: List[LaneErrInfo] = []
|
||
|
|
|
||
|
|
# MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8, 10, -10, 12, -12, 14, -14]
|
||
|
|
# MGC_OFFSETS = [0, 2]
|
||
|
|
MGC_OFFSETS = [0, 2, -2, 4, -4, 6, -6, 8, -8, 10, -10]
|
||
|
|
|
||
|
|
for slot_id in [0, 1, 2, 3, 4, 5, 6, 7]:
|
||
|
|
for lane in [0, 1, 2, 3, 4, 5, 6, 7]:
|
||
|
|
current_mgc = self.local_reg_tool.read_mgc_reg(exp_id, slot_id, lane)
|
||
|
|
current_mgc = int(current_mgc)
|
||
|
|
|
||
|
|
mgc_points = [current_mgc + offset for offset in MGC_OFFSETS]
|
||
|
|
|
||
|
|
mgc_err_map = {mgc: -1 for mgc in mgc_points}
|
||
|
|
|
||
|
|
success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
|
||
|
|
if success == False:
|
||
|
|
logging.error(f'get_phys_rtmr_by_slot failed, result: {success}')
|
||
|
|
return False
|
||
|
|
|
||
|
|
lane_info = LaneErrInfo(
|
||
|
|
slot_id=slot_id,
|
||
|
|
logic_lane=lane,
|
||
|
|
rt_phys_id=rt_phys_id,
|
||
|
|
rt_phys_lane=rt_phys_lane,
|
||
|
|
mgc_err_map=mgc_err_map,
|
||
|
|
mgc= current_mgc
|
||
|
|
)
|
||
|
|
|
||
|
|
lane_data_list.append(lane_info)
|
||
|
|
|
||
|
|
# logging.info(
|
||
|
|
# f"Initialized: Slot {slot_id}, Lane {lane}, "
|
||
|
|
# f"Base MGC={current_mgc}, Target MGCs={mgc_points}, ERRs={[-1.0]*5}"
|
||
|
|
# )
|
||
|
|
|
||
|
|
# logging.info(lane_data_list)
|
||
|
|
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
|
||
|
|
|
||
|
|
all_unlocked = all(not prbs.locked for prbs in prbs_results)
|
||
|
|
if all_unlocked:
|
||
|
|
logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. Aborting auto_tune2.")
|
||
|
|
return False
|
||
|
|
|
||
|
|
time.sleep(2)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False)
|
||
|
|
|
||
|
|
# return True
|
||
|
|
|
||
|
|
# logging.info(f'-------------- prbs_results: {prbs_results}')
|
||
|
|
|
||
|
|
for prbs in prbs_results:
|
||
|
|
local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
|
||
|
|
if not local_slot:
|
||
|
|
logging.error(f'local slot is none!')
|
||
|
|
return False
|
||
|
|
slot_id, lane_id = local_slot
|
||
|
|
# logging.info(f'------------------ slot: {slot_id}, lane_id: {lane_id}')
|
||
|
|
for info in lane_data_list:
|
||
|
|
if info.slot_id == slot_id and info.logic_lane == lane_id:
|
||
|
|
if prbs.locked:
|
||
|
|
info.mgc_err_map[info.mgc] = prbs.err_count
|
||
|
|
else:
|
||
|
|
info.mgc_err_map[info.mgc] = 65535
|
||
|
|
# logging.info(f'prbs.locked: {prbs.locked}, prbs.err_count: {prbs.err_count}')
|
||
|
|
break
|
||
|
|
|
||
|
|
# return True
|
||
|
|
|
||
|
|
for offset in [offset for offset in MGC_OFFSETS if offset != 0]:
|
||
|
|
for info in lane_data_list:
|
||
|
|
new_mgc = info.mgc + offset
|
||
|
|
self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
# for slot_id in [0,1,2,3,4,5,6,7]:
|
||
|
|
# self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
|
||
|
|
# time.sleep(0.05)
|
||
|
|
|
||
|
|
time.sleep(0.5)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
|
||
|
|
time.sleep(2)
|
||
|
|
|
||
|
|
# 测 PRBS
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
|
||
|
|
is_need_prbs_reset = False
|
||
|
|
# 更新结果:当前 MGC 是 info.mgc + offset
|
||
|
|
for prbs in prbs_results:
|
||
|
|
local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
|
||
|
|
if not local_slot:
|
||
|
|
logging.error(f'local slot is none!')
|
||
|
|
return False
|
||
|
|
slot_id, lane_id = local_slot
|
||
|
|
|
||
|
|
for info in lane_data_list:
|
||
|
|
if info.slot_id == slot_id and info.logic_lane == lane_id:
|
||
|
|
target_mgc = info.mgc + offset
|
||
|
|
if prbs.locked:
|
||
|
|
info.mgc_err_map[target_mgc] = prbs.err_count
|
||
|
|
else:
|
||
|
|
info.mgc_err_map[target_mgc] = 69999
|
||
|
|
is_need_prbs_reset = True
|
||
|
|
# logging.info(f'------update mgc err, {target_mgc}: {info.mgc_err_map[target_mgc]}')
|
||
|
|
break
|
||
|
|
|
||
|
|
if is_need_prbs_reset:
|
||
|
|
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
|
||
|
|
self.print_err_matrix(lane_data_list, MGC_OFFSETS)
|
||
|
|
|
||
|
|
# 参数复原
|
||
|
|
# logging.info(f'------------len: {len(lane_data_list)}')
|
||
|
|
for info in lane_data_list:
|
||
|
|
new_mgc = info.mgc
|
||
|
|
self.local_reg_tool.write_tmp_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
# for slot_id in [0,1,2,3,4,5,6,7]:
|
||
|
|
# self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
|
||
|
|
# time.sleep(0.05)
|
||
|
|
# return True
|
||
|
|
|
||
|
|
# Step 4: 调用简单调优,返回与 lane_data_list 一一对应的推荐 offset 列表
|
||
|
|
recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1)
|
||
|
|
|
||
|
|
tuning_success = True
|
||
|
|
|
||
|
|
for info, recommended_mgc in zip(lane_data_list, recommended_mgcs):
|
||
|
|
slot_id = info.slot_id
|
||
|
|
lane_id = info.logic_lane
|
||
|
|
current_mgc = info.mgc
|
||
|
|
target_mgc = recommended_mgc
|
||
|
|
diff_mgc = target_mgc - current_mgc
|
||
|
|
|
||
|
|
# 日志输出
|
||
|
|
if diff_mgc != 0:
|
||
|
|
logging.info(
|
||
|
|
f"Slot {slot_id}, Lane {lane_id}: MGC adjusted from {current_mgc} "
|
||
|
|
f"→ {target_mgc} (offset={diff_mgc:.1f})"
|
||
|
|
)
|
||
|
|
# logging.info(f"Applying MGC reg write: exp_id={exp_id}, slot={slot_id}, lane={lane_id}, mgc={target_mgc}")
|
||
|
|
# self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, target_mgc)
|
||
|
|
# time.sleep(0.05)
|
||
|
|
# self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
|
||
|
|
# time.sleep(0.05)
|
||
|
|
# else:
|
||
|
|
# logging.info(
|
||
|
|
# f"Slot {slot_id}, Lane {lane_id}: MGC {current_mgc} is optimal. No change."
|
||
|
|
# )
|
||
|
|
|
||
|
|
tuning_success = True
|
||
|
|
if bad_lanes:
|
||
|
|
tuning_success = False
|
||
|
|
for slot, lane in bad_lanes:
|
||
|
|
logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?")
|
||
|
|
|
||
|
|
return True
|
||
|
|
|
||
|
|
def low_ibias_tune2(self, exp_id: int, retimers, reg_name: str, value_offset_list: Optional[List[int]]=None, value_list: Optional[List[int]]=None) -> bool:
|
||
|
|
logging.info("Starting auto_tune2...")
|
||
|
|
lane_data_list = []
|
||
|
|
MGC_OFFSETS = [
|
||
|
|
0, 2, -2, 4, -4, 6, -6, 8, -8]
|
||
|
|
for slot_id in (0, 1, 2, 3, 4, 5, 6, 7):
|
||
|
|
for lane in (0, 1, 2, 3, 4, 5, 6, 7):
|
||
|
|
if value_offset_list is not None:
|
||
|
|
current_value = self.local_reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name)
|
||
|
|
current_value = int(current_value)
|
||
|
|
value_points = [current_value + offset for offset in value_offset_list]
|
||
|
|
tmp_err_map = {value: -1 for value in value_points}
|
||
|
|
success, rt_phys_id, rt_phys_lane = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
|
||
|
|
if success == False:
|
||
|
|
logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
|
||
|
|
return False
|
||
|
|
lane_info = LaneErrInfo(slot_id=slot_id,
|
||
|
|
logic_lane=lane,
|
||
|
|
rt_phys_id=rt_phys_id,
|
||
|
|
rt_phys_lane=rt_phys_lane,
|
||
|
|
mgc_err_map=tmp_err_map,
|
||
|
|
mgc=current_value)
|
||
|
|
lane_data_list.append(lane_info)
|
||
|
|
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
|
||
|
|
all_unlocked = all(not prbs.locked for prbs in prbs_results)
|
||
|
|
if all_unlocked:
|
||
|
|
logging.error(f"PRBS check failed: ALL lanes are UNLOCKED for exp_id={exp_id}. Aborting auto_tune2.")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
time.sleep(2)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, False)
|
||
|
|
for prbs in prbs_results:
|
||
|
|
local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
|
||
|
|
if not local_slot:
|
||
|
|
logging.error("local slot is none!")
|
||
|
|
return False
|
||
|
|
slot_id, lane_id = local_slot
|
||
|
|
for info in lane_data_list:
|
||
|
|
if info.slot_id == slot_id:
|
||
|
|
if info.logic_lane == lane_id:
|
||
|
|
if prbs.locked:
|
||
|
|
info.tmp_err_map[info.tmp_value] = prbs.err_count
|
||
|
|
else:
|
||
|
|
info.tmp_err_map[info.tmp_value] = 65535
|
||
|
|
break
|
||
|
|
|
||
|
|
for offset in [offset for offset in MGC_OFFSETS if offset != 0]:
|
||
|
|
for info in lane_data_list:
|
||
|
|
new_mgc = info.mgc + offset
|
||
|
|
self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc)
|
||
|
|
|
||
|
|
for slot_id in (0, 1, 2, 3, 4, 5, 6, 7):
|
||
|
|
self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
|
||
|
|
|
||
|
|
time.sleep(0.5)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
|
||
|
|
time.sleep(2)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
|
||
|
|
for prbs in prbs_results:
|
||
|
|
local_slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
|
||
|
|
if not local_slot:
|
||
|
|
logging.error("local slot is none!")
|
||
|
|
return False
|
||
|
|
slot_id, lane_id = local_slot
|
||
|
|
for info in lane_data_list:
|
||
|
|
if info.slot_id == slot_id:
|
||
|
|
if info.logic_lane == lane_id:
|
||
|
|
target_mgc = info.mgc + offset
|
||
|
|
if prbs.locked:
|
||
|
|
info.mgc_err_map[target_mgc] = prbs.err_count
|
||
|
|
else:
|
||
|
|
info.mgc_err_map[target_mgc] = 65535
|
||
|
|
break
|
||
|
|
|
||
|
|
self.print_err_matrix(lane_data_list, MGC_OFFSETS)
|
||
|
|
for info in lane_data_list:
|
||
|
|
new_mgc = info.mgc
|
||
|
|
self.local_reg_tool.write_mgc_reg(exp_id, info.slot_id, info.logic_lane, new_mgc)
|
||
|
|
|
||
|
|
recommended_mgcs, bad_lanes = self.simple_mgc_tuner(lane_data_list, bad_threshold=1)
|
||
|
|
tuning_success = True
|
||
|
|
for info, recommended_mgc in zip(lane_data_list, recommended_mgcs):
|
||
|
|
slot_id = info.slot_id
|
||
|
|
lane_id = info.logic_lane
|
||
|
|
current_mgc = info.mgc
|
||
|
|
target_mgc = recommended_mgc
|
||
|
|
diff_mgc = target_mgc - current_mgc
|
||
|
|
if diff_mgc != 0:
|
||
|
|
logging.info(f"Slot {slot_id}, Lane {lane_id}: MGC adjusted from {current_mgc} → {target_mgc} (offset={diff_mgc:.1f})")
|
||
|
|
logging.info(f"Applying MGC reg write: exp_id={exp_id}, slot={slot_id}, lane={lane_id}, mgc={target_mgc}")
|
||
|
|
self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, target_mgc)
|
||
|
|
self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
|
||
|
|
|
||
|
|
tuning_success = True
|
||
|
|
if bad_lanes:
|
||
|
|
tuning_success = False
|
||
|
|
for slot, lane in bad_lanes:
|
||
|
|
logging.warning(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean MGC points (<=5). Signal integrity issue?")
|
||
|
|
|
||
|
|
return True
|
||
|
|
|
||
|
|
def general_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
|
||
|
|
logging.info("Starting auto_tune2...")
|
||
|
|
lane_data_list = []
|
||
|
|
lane_list = [
|
||
|
|
0, 1, 2, 3, 4, 5, 6, 7]
|
||
|
|
slot_list = [0, 1, 2, 3, 4, 5, 6, 7]
|
||
|
|
if reg_name in ('ibias', 'opcurrent'):
|
||
|
|
reg_tool = self.remote_reg_tool
|
||
|
|
else:
|
||
|
|
reg_tool = self.local_reg_tool
|
||
|
|
for rt_phys_id in retimers:
|
||
|
|
for rt_phys_lane in range(0, 16):
|
||
|
|
if reg_tool == self.local_reg_tool:
|
||
|
|
slot = self.topo_map.get_local_slot_by_retimer(rt_phys_id, rt_phys_lane, self.route_name)
|
||
|
|
else:
|
||
|
|
slot = self.topo_map.get_remote_slot_by_retimer(rt_phys_id, rt_phys_lane, self.route_name)
|
||
|
|
if not slot:
|
||
|
|
logging.error("local slot is none!")
|
||
|
|
return False
|
||
|
|
slot_id, lane_id = slot
|
||
|
|
raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane_id, reg_name)
|
||
|
|
tmp_err_map = {value: -1 for value in tune_value_list}
|
||
|
|
lane_info = LaneErrInfo(host=(reg_tool.host), exp=exp_id, slot_id=slot_id, logic_lane=lane_id, rt_phys_id=rt_phys_id,
|
||
|
|
rt_phys_lane=rt_phys_lane,
|
||
|
|
tmp_err_map=tmp_err_map,
|
||
|
|
tmp_raw_value=raw_value)
|
||
|
|
lane_data_list.append(lane_info)
|
||
|
|
|
||
|
|
time.sleep(0.05)
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
for value in tune_value_list:
|
||
|
|
for info in lane_data_list:
|
||
|
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
time.sleep(1)
|
||
|
|
if is_need_reset_prbs:
|
||
|
|
logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
|
||
|
|
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
|
||
|
|
time.sleep(2)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
|
||
|
|
is_need_reset_prbs = False
|
||
|
|
for prbs in prbs_results:
|
||
|
|
if reg_tool == self.local_reg_tool:
|
||
|
|
slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
|
||
|
|
else:
|
||
|
|
slot = self.topo_map.get_remote_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
|
||
|
|
if not slot:
|
||
|
|
logging.error("local slot is none!")
|
||
|
|
return False
|
||
|
|
slot_id, lane_id = slot
|
||
|
|
for info in lane_data_list:
|
||
|
|
if info.slot_id == slot_id:
|
||
|
|
if info.logic_lane == lane_id:
|
||
|
|
if prbs.locked:
|
||
|
|
info.tmp_err_map[value] = prbs.err_count
|
||
|
|
elif value in info.tmp_err_map:
|
||
|
|
info.tmp_err_map[value] = 69999
|
||
|
|
else:
|
||
|
|
logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
break
|
||
|
|
|
||
|
|
logging.info(lane_data_list)
|
||
|
|
self.print_general_err_matrix(lane_data_list, tune_value_list)
|
||
|
|
time.sleep(0.05)
|
||
|
|
for info in lane_data_list:
|
||
|
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list)
|
||
|
|
for info, recommended_value in zip(lane_data_list, recommended_value):
|
||
|
|
diff_value = recommended_value - info.tmp_raw_value
|
||
|
|
if diff_value != 0:
|
||
|
|
logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})")
|
||
|
|
logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}")
|
||
|
|
if is_overwrite:
|
||
|
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name)
|
||
|
|
|
||
|
|
if bad_lanes:
|
||
|
|
for slot, lane in bad_lanes:
|
||
|
|
logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")
|
||
|
|
|
||
|
|
return True
|
||
|
|
|
||
|
|
def ibias_auto_tune_value_new(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
|
||
|
|
logging.info("Starting ibias_auto_tune_value_new...")
|
||
|
|
local_data_list: List[LaneErrInfo] = []
|
||
|
|
remote_data_list: List[LaneErrInfo] = []
|
||
|
|
local_slot_list = list(range(8))
|
||
|
|
local_lane_list = list(range(8))
|
||
|
|
remote_slot_list = list(range(8))
|
||
|
|
remote_lane_list = list(range(8))
|
||
|
|
|
||
|
|
# 存储结构:{slot_id: ibias_dict}
|
||
|
|
remote_raw_ibias: Dict[int, Dict] = {}
|
||
|
|
local_raw_ibias: Dict[int, Dict] = {}
|
||
|
|
|
||
|
|
# init remote(ibias)
|
||
|
|
for slot_id in remote_slot_list:
|
||
|
|
self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01")
|
||
|
|
remote_raw_ibias = self.remote_reg_tool.read_ibias_reg_all_lane(exp_id, slot_id)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
if self.is_onet:
|
||
|
|
for slot_id in local_slot_list:
|
||
|
|
self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01")
|
||
|
|
local_raw_ibias = self.local_reg_tool.read_ibias_reg_all_lane(exp_id, slot_id)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
# init remote (ibias)
|
||
|
|
for slot_id in local_slot_list:
|
||
|
|
for lane in local_lane_list:
|
||
|
|
peer_info = self.topo_map.get_remote_retimer_and_slot_by_slot(slot_id, lane, self.route_name)
|
||
|
|
if not peer_info:
|
||
|
|
logging.error(f"peer_info is None for slot {slot_id} lane {lane}")
|
||
|
|
return False
|
||
|
|
remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = peer_info
|
||
|
|
|
||
|
|
tmp_err_map = {value: -1 for value in tune_value_list}
|
||
|
|
|
||
|
|
result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
|
||
|
|
if result:
|
||
|
|
success, rt_phys_id, rt_phys_lane = result
|
||
|
|
if success == False:
|
||
|
|
logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
|
||
|
|
return False
|
||
|
|
lane_info = LaneErrInfo(host=(self.local_reg_tool.host), exp=exp_id,
|
||
|
|
route=(self.route_name.split("-")[-1]),
|
||
|
|
slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane,
|
||
|
|
peer_slot_id=remote_slot_id, peer_logic_lane=remote_lane_id,
|
||
|
|
peer_rt_phys_id=remote_rt_logic_id, peer_rt_phys_lane=remote_rt_logic_lane,
|
||
|
|
tmp_err_map=tmp_err_map)
|
||
|
|
local_data_list.append(lane_info)
|
||
|
|
|
||
|
|
# init local (ibias)
|
||
|
|
if self.is_onet:
|
||
|
|
for slot_id in remote_slot_list:
|
||
|
|
for lane in remote_lane_list:
|
||
|
|
peer_info = self.topo_map.get_remote_retimer_and_slot_by_slot(slot_id, lane, self.route_name)
|
||
|
|
if not peer_info:
|
||
|
|
logging.error(f"peer_info is None for slot {slot_id} lane {lane}")
|
||
|
|
return False
|
||
|
|
local_rt_logic_id, local_rt_logic_lane, local_slot_id, local_lane_id = peer_info
|
||
|
|
|
||
|
|
tmp_err_map = {value: -1 for value in tune_value_list}
|
||
|
|
|
||
|
|
result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
|
||
|
|
if result:
|
||
|
|
success, rt_phys_id, rt_phys_lane = result
|
||
|
|
if success == False:
|
||
|
|
logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
|
||
|
|
return False
|
||
|
|
lane_info = LaneErrInfo(host=(self.remote_reg_tool.host), exp=exp_id,
|
||
|
|
route=(self.route_name.split("-")[-1]),
|
||
|
|
slot_id=slot_id, logic_lane=lane, rt_phys_id=rt_phys_id, rt_phys_lane=rt_phys_lane,
|
||
|
|
peer_slot_id=local_slot_id, peer_logic_lane=local_lane_id,
|
||
|
|
peer_rt_phys_id=local_rt_logic_id, peer_rt_phys_lane=local_rt_logic_lane,
|
||
|
|
tmp_err_map=tmp_err_map)
|
||
|
|
remote_data_list.append(lane_info)
|
||
|
|
|
||
|
|
time.sleep(0.05)
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
for value in tune_value_list:
|
||
|
|
for info in local_data_list:
|
||
|
|
self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
if self.is_onet:
|
||
|
|
for info in remote_data_list:
|
||
|
|
self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, info.peer_slot_id, value)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
time.sleep(1)
|
||
|
|
if is_need_reset_prbs:
|
||
|
|
logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
|
||
|
|
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
|
||
|
|
|
||
|
|
prbs_results = self.prbs_tool.prbs_check_bidirection(exp_id, retimers, True)
|
||
|
|
time.sleep(2)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check_bidirection(exp_id, retimers)
|
||
|
|
|
||
|
|
# logging.info(f'--------- prbs_results: {prbs_results}')
|
||
|
|
|
||
|
|
is_need_reset_prbs = False
|
||
|
|
for prbs in prbs_results:
|
||
|
|
for info in local_data_list:
|
||
|
|
if info.host == prbs.host and info.rt_phys_id == prbs.rtmr_id \
|
||
|
|
and info.rt_phys_lane == prbs.rtmr_lane:
|
||
|
|
if prbs.locked:
|
||
|
|
info.tmp_err_map[value] = prbs.err_count
|
||
|
|
elif value in info.tmp_err_map:
|
||
|
|
info.tmp_err_map[value] = 69999
|
||
|
|
else:
|
||
|
|
logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
break
|
||
|
|
|
||
|
|
for info in remote_data_list:
|
||
|
|
if info.host == prbs.host and info.rt_phys_id == prbs.rtmr_id \
|
||
|
|
and info.rt_phys_lane == prbs.rtmr_lane:
|
||
|
|
if prbs.locked:
|
||
|
|
info.tmp_err_map[value] = prbs.err_count
|
||
|
|
elif value in info.tmp_err_map:
|
||
|
|
info.tmp_err_map[value] = 69999
|
||
|
|
else:
|
||
|
|
logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
break
|
||
|
|
|
||
|
|
self.print_general_err_matrix(local_data_list, tune_value_list)
|
||
|
|
self.print_general_err_matrix(remote_data_list, tune_value_list)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
for slot_id in remote_slot_list:
|
||
|
|
self.remote_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=remote_raw_ibias[slot_id])
|
||
|
|
time.sleep(0.05)
|
||
|
|
self.remote_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00")
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
if self.is_onet:
|
||
|
|
for slot_id in local_slot_list:
|
||
|
|
self.local_reg_tool.write_vol_ibias_reg_all_lane(exp_id, slot_id, wt_value_list=remote_raw_ibias[slot_id])
|
||
|
|
time.sleep(0.05)
|
||
|
|
self.local_reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "00")
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
|
||
|
|
recommended_value, bad_lanes = self.simple_value_tuner(local_data_list)
|
||
|
|
reg_name = "ibias"
|
||
|
|
for info, recommended_value in zip(local_data_list, recommended_value):
|
||
|
|
diff_value = recommended_value - info.tmp_raw_value
|
||
|
|
if diff_value != 0:
|
||
|
|
remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name)
|
||
|
|
if not remote_info:
|
||
|
|
logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}")
|
||
|
|
return False
|
||
|
|
remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info
|
||
|
|
logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: {remote_slot_id}, {remote_lane_id}, adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})")
|
||
|
|
if is_overwrite:
|
||
|
|
reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, recommended_value)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
if bad_lanes:
|
||
|
|
for slot, lane in bad_lanes:
|
||
|
|
logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")
|
||
|
|
|
||
|
|
return True
|
||
|
|
|
||
|
|
def ibias_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
|
||
|
|
logging.info("Starting auto_tune2...")
|
||
|
|
lane_data_list = []
|
||
|
|
rx_slot_list = [0, 1, 2, 3, 4, 5, 6, 7]
|
||
|
|
tx_slot_list = [0, 1, 2, 3, 4, 5, 6, 7]
|
||
|
|
|
||
|
|
for slot_id in rx_slot_list:
|
||
|
|
reg_tool.bmc.SetOpticalModuleRegs(exp_id, slot_id, 0, 180, 136, 1, "01")
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
rx_lane_list = [0, 1, 2, 3, 4, 5, 6, 7]
|
||
|
|
for rx_slot_id in rx_slot_list:
|
||
|
|
for rx_lane in rx_lane_list:
|
||
|
|
remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(rx_slot_id, rx_lane, self.route_name)
|
||
|
|
if not remote_info:
|
||
|
|
logging.error(f"remote_info is None for slot {rx_slot_id} lane {rx_lane}")
|
||
|
|
return False
|
||
|
|
remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info
|
||
|
|
raw_value = reg_tool.read_opt_reg(exp_id, remote_slot_id, remote_lane_id, "ibias")
|
||
|
|
tmp_err_map = {value: -1 for value in tune_value_list}
|
||
|
|
result = self.topo_map.get_phys_rtmr_by_slot(rx_slot_id, rx_lane, self.route_name)
|
||
|
|
if result:
|
||
|
|
success, rt_phys_id, rt_phys_lane = result
|
||
|
|
if success == False:
|
||
|
|
logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
|
||
|
|
return False
|
||
|
|
lane_info = LaneErrInfo(host=(self.local_reg_tool.host),
|
||
|
|
exp=exp_id,
|
||
|
|
route=(self.route_name.split("-")[-1]),
|
||
|
|
slot_id=rx_slot_id,
|
||
|
|
logic_lane=rx_lane,
|
||
|
|
rt_phys_id=rt_phys_id,
|
||
|
|
rt_phys_lane=rt_phys_lane,
|
||
|
|
tmp_err_map=tmp_err_map,
|
||
|
|
tmp_raw_value=raw_value)
|
||
|
|
lane_data_list.append(lane_info)
|
||
|
|
|
||
|
|
time.sleep(0.05)
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
for value in tune_value_list:
|
||
|
|
for info in lane_data_list:
|
||
|
|
remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name)
|
||
|
|
if not remote_info:
|
||
|
|
logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}")
|
||
|
|
return False
|
||
|
|
remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info
|
||
|
|
reg_tool.write_opt_reg(exp_id, remote_slot_id, remote_lane_id, value, "ibias_runtime")
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
time.sleep(1)
|
||
|
|
if is_need_reset_prbs:
|
||
|
|
logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
|
||
|
|
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
|
||
|
|
time.sleep(2)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
|
||
|
|
|
||
|
|
# logging.info(f'--------- prbs_results: {prbs_results}')
|
||
|
|
|
||
|
|
is_need_reset_prbs = False
|
||
|
|
for prbs in prbs_results:
|
||
|
|
for info in lane_data_list:
|
||
|
|
if info.rt_phys_id == prbs.rtmr_id:
|
||
|
|
if info.rt_phys_lane == prbs.rtmr_lane:
|
||
|
|
if prbs.locked:
|
||
|
|
info.tmp_err_map[value] = prbs.err_count
|
||
|
|
elif value in info.tmp_err_map:
|
||
|
|
info.tmp_err_map[value] = 69999
|
||
|
|
else:
|
||
|
|
logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
break
|
||
|
|
|
||
|
|
# logging.info(lane_data_list)
|
||
|
|
self.print_general_err_matrix(lane_data_list, tune_value_list)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
for info in lane_data_list:
|
||
|
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, "ibias_runtime")
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list)
|
||
|
|
reg_name = "ibias"
|
||
|
|
for info, recommended_value in zip(lane_data_list, recommended_value):
|
||
|
|
diff_value = recommended_value - info.tmp_raw_value
|
||
|
|
if diff_value != 0:
|
||
|
|
remote_info = self.topo_map.get_remote_retimer_and_slot_by_slot(info.slot_id, info.logic_lane, self.route_name)
|
||
|
|
if not remote_info:
|
||
|
|
logging.error(f"remote_info is None for slot {info.slot_id} lane {info.logic_lane}")
|
||
|
|
return False
|
||
|
|
remote_rt_logic_id, remote_rt_logic_lane, remote_slot_id, remote_lane_id = remote_info
|
||
|
|
logging.info(f"Rx Slot: {info.slot_id},{info.logic_lane}, Tx Slot: {remote_slot_id}, {remote_lane_id}, adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})")
|
||
|
|
if is_overwrite:
|
||
|
|
reg_tool.write_ibias_reg(exp_id, remote_slot_id, remote_lane_id, recommended_value)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
if bad_lanes:
|
||
|
|
for slot, lane in bad_lanes:
|
||
|
|
logging.error(f"Rx Slot {slot}, Rx Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")
|
||
|
|
|
||
|
|
return True
|
||
|
|
|
||
|
|
def general_auto_tune_value(self, exp_id: int, retimers, reg_name: str, tune_value_list: List[int], reg_tool: OptRegAccessTool, is_overwrite=False) -> bool:
|
||
|
|
logging.info("Starting auto_tune2...")
|
||
|
|
lane_data_list = []
|
||
|
|
lane_list = [
|
||
|
|
0, 1, 2, 3, 4, 5, 6, 7]
|
||
|
|
for slot_id in (0, 1, 2, 3, 4, 5, 6, 7):
|
||
|
|
for lane in lane_list:
|
||
|
|
raw_value = reg_tool.read_opt_reg(exp_id, slot_id, lane, reg_name)
|
||
|
|
tmp_err_map = {value: -1 for value in tune_value_list}
|
||
|
|
result = self.topo_map.get_phys_rtmr_by_slot(slot_id, lane, self.route_name)
|
||
|
|
if result:
|
||
|
|
success, rt_phys_id, rt_phys_lane = result
|
||
|
|
if success == False:
|
||
|
|
logging.error(f"get_phys_rtmr_by_slot failed, result: {success}")
|
||
|
|
return False
|
||
|
|
lane_info = LaneErrInfo(host=(self.local_reg_tool.host),
|
||
|
|
exp=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
logic_lane=lane,
|
||
|
|
rt_phys_id=rt_phys_id,
|
||
|
|
rt_phys_lane=rt_phys_lane,
|
||
|
|
tmp_err_map=tmp_err_map,
|
||
|
|
tmp_raw_value=raw_value)
|
||
|
|
lane_data_list.append(lane_info)
|
||
|
|
|
||
|
|
time.sleep(0.05)
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
for value in tune_value_list:
|
||
|
|
for info in lane_data_list:
|
||
|
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, value, reg_name)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
time.sleep(1)
|
||
|
|
if is_need_reset_prbs:
|
||
|
|
logging.info(f"-------- is_need_reset_prbs: {is_need_reset_prbs}")
|
||
|
|
self.prbs_tool.ResetRetimerAndEnablePrbs(exp_id, retimers)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers, True)
|
||
|
|
time.sleep(2)
|
||
|
|
prbs_results = self.prbs_tool.prbs_check(exp_id, retimers)
|
||
|
|
is_need_reset_prbs = False
|
||
|
|
for prbs in prbs_results:
|
||
|
|
if reg_tool == self.local_reg_tool:
|
||
|
|
slot = self.topo_map.get_local_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
|
||
|
|
else:
|
||
|
|
slot = self.topo_map.get_remote_slot_by_retimer(prbs.rtmr_id, prbs.rtmr_lane, self.route_name)
|
||
|
|
if not slot:
|
||
|
|
logging.error("local slot is none!")
|
||
|
|
return False
|
||
|
|
slot_id, lane_id = slot
|
||
|
|
for info in lane_data_list:
|
||
|
|
if info.slot_id == slot_id:
|
||
|
|
if info.logic_lane == lane_id:
|
||
|
|
if prbs.locked:
|
||
|
|
info.tmp_err_map[value] = prbs.err_count
|
||
|
|
elif value in info.tmp_err_map:
|
||
|
|
info.tmp_err_map[value] = 69999
|
||
|
|
else:
|
||
|
|
logging.error(f"--------Key {value} not found in tmp_err_map, skipping assignment.")
|
||
|
|
is_need_reset_prbs = True
|
||
|
|
break
|
||
|
|
|
||
|
|
logging.info(lane_data_list)
|
||
|
|
self.print_general_err_matrix(lane_data_list, tune_value_list)
|
||
|
|
time.sleep(0.05)
|
||
|
|
for info in lane_data_list:
|
||
|
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, info.tmp_raw_value, reg_name)
|
||
|
|
time.sleep(0.05)
|
||
|
|
|
||
|
|
recommended_value, bad_lanes = self.simple_value_tuner(lane_data_list)
|
||
|
|
for info, recommended_value in zip(lane_data_list, recommended_value):
|
||
|
|
diff_value = recommended_value - info.tmp_raw_value
|
||
|
|
if diff_value != 0:
|
||
|
|
logging.info(f"Slot {info.slot_id}, Lane {info.logic_lane}: adjusted from {info.tmp_raw_value} → {recommended_value} (offset={diff_value:.1f})")
|
||
|
|
logging.info(f"Applying {reg_name} reg write: exp_id={exp_id}, slot={info.slot_id}, lane={info.logic_lane}, {reg_name}={recommended_value}")
|
||
|
|
if is_overwrite:
|
||
|
|
reg_tool.write_opt_reg(exp_id, info.slot_id, info.logic_lane, recommended_value, reg_name)
|
||
|
|
|
||
|
|
if bad_lanes:
|
||
|
|
for slot, lane in bad_lanes:
|
||
|
|
logging.error(f"Slot {slot}, Lane {lane}: CRITICAL - Too few clean points (<=5). Bad Modules")
|
||
|
|
|
||
|
|
return True
|
||
|
|
|
||
|
|
def simple_value_tuner(self, lane_data_list: List[LaneErrInfo], bad_threshold=1, min_good_count=5):
|
||
|
|
recommendations = []
|
||
|
|
bad_lanes = []
|
||
|
|
for info in lane_data_list:
|
||
|
|
err_map = info.tmp_err_map
|
||
|
|
good_offsets = [value for value, err in err_map.items() if err < bad_threshold]
|
||
|
|
if not good_offsets:
|
||
|
|
recommended_value = info.tmp_raw_value
|
||
|
|
else:
|
||
|
|
recommended_value = int((min(good_offsets) + max(good_offsets)) / 2)
|
||
|
|
if len(good_offsets) <= min_good_count:
|
||
|
|
bad_lanes.append((info.slot_id, info.logic_lane))
|
||
|
|
recommendations.append(recommended_value)
|
||
|
|
|
||
|
|
return (recommendations, bad_lanes)
|
||
|
|
|
||
|
|
def simple_mgc_tuner(self, lane_data_list, bad_threshold=1, min_good_count=6):
|
||
|
|
recommendations = []
|
||
|
|
bad_lanes = []
|
||
|
|
for info in lane_data_list:
|
||
|
|
err_map = info.mgc_err_map
|
||
|
|
good_offsets = [mgc for mgc, err in err_map.items() if err < bad_threshold]
|
||
|
|
if not good_offsets:
|
||
|
|
recommended_mgc = info.mgc
|
||
|
|
else:
|
||
|
|
recommended_mgc = int((min(good_offsets) + max(good_offsets)) / 2)
|
||
|
|
if len(good_offsets) <= 5:
|
||
|
|
bad_lanes.append((info.slot_id, info.logic_lane))
|
||
|
|
recommendations.append(recommended_mgc)
|
||
|
|
|
||
|
|
return (recommendations, bad_lanes)
|
||
|
|
|
||
|
|
def print_general_err_matrix(self, lane_data_list: List[LaneErrInfo], tune_value_list):
|
||
|
|
tune_value_list = sorted(tune_value_list)
|
||
|
|
header_parts = [
|
||
|
|
'Host', 'Exp', 'Route', 'Slot', 'Lane', 'RT_Id',
|
||
|
|
'RT_Lane'] + [str(value) for value in tune_value_list]
|
||
|
|
header_widths = [16, 6, 8, 6, 6, 6, 10] + [6] * len(tune_value_list)
|
||
|
|
header_format = "{:<16} {:<6} {:<8} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in tune_value_list)
|
||
|
|
print((header_format.format)(*header_parts))
|
||
|
|
print("-" * (sum(header_widths) + len(header_widths) * 1))
|
||
|
|
for info in lane_data_list:
|
||
|
|
try:
|
||
|
|
err_counts = []
|
||
|
|
for value in tune_value_list:
|
||
|
|
err = info.tmp_err_map.get(value, -2)
|
||
|
|
if err < 0:
|
||
|
|
err_str = "70000"
|
||
|
|
else:
|
||
|
|
err_str = (f"{err}")
|
||
|
|
err_counts.append(err_str)
|
||
|
|
|
||
|
|
data_format = "{:<16} {:<6} {:<8} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in tune_value_list)
|
||
|
|
print((data_format.format)(self.host, info.exp, info.route, info.slot_id, info.logic_lane, info.rt_phys_id, info.rt_phys_lane, *err_counts))
|
||
|
|
except Exception as e:
|
||
|
|
logging.error(f"Error printing row for Slot {info.slot_id}, Lane {info.logic_lane}: {e}")
|
||
|
|
continue
|
||
|
|
|
||
|
|
def print_err_matrix(self, lane_data_list, mgc_offsets):
|
||
|
|
mgc_offsets = sorted(mgc_offsets)
|
||
|
|
|
||
|
|
header_parts = ["Host","Slot", "Lane", "RT_Id", "RT_Lane"] + [str(offset) for offset in mgc_offsets]
|
||
|
|
header_widths = [16, 6, 6] + [6] * len(mgc_offsets)
|
||
|
|
|
||
|
|
header_format = "{:<16} {:<6} {:<6} {:<6} {:<10} " + " ".join(f"{{:<6}}" for _ in mgc_offsets)
|
||
|
|
print(header_format.format(*header_parts))
|
||
|
|
|
||
|
|
print("-" * (sum(header_widths) + len(header_widths) * 1))
|
||
|
|
|
||
|
|
for info in lane_data_list:
|
||
|
|
base_mgc = info.mgc
|
||
|
|
try:
|
||
|
|
|
||
|
|
err_counts = []
|
||
|
|
for offset in mgc_offsets:
|
||
|
|
target_mgc = base_mgc + offset
|
||
|
|
err = info.mgc_err_map.get(target_mgc, -1.0) # 安全获取,防止 key 不存在
|
||
|
|
if err < 0:
|
||
|
|
err_str = "N/A"
|
||
|
|
else:
|
||
|
|
err_str = f"{err}"
|
||
|
|
err_counts.append(err_str)
|
||
|
|
|
||
|
|
# 格式化输出
|
||
|
|
data_format = "{:<16} {:<6} {:<6} {:<6} {:<10} " + " ".join("{:<6}" for _ in mgc_offsets)
|
||
|
|
print(data_format.format(self.host, info.slot_id, info.logic_lane, info.rt_phys_id, info.rt_phys_lane, *err_counts))
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logging.error(f"Error printing row for Slot {info.slot_id}, Lane {info.logic_lane}: {e}")
|
||
|
|
continue
|
||
|
|
|
||
|
|
def verify_rt_index_lane_index(self, rt_index: int, lane_index: int) -> Tuple[(bool, int, int)]:
|
||
|
|
if rt_index in (1, 2, 3, 4):
|
||
|
|
if 0 <= lane_index <= 7:
|
||
|
|
new_rt_index = rt_index * 10 + 1
|
||
|
|
return (
|
||
|
|
True, new_rt_index, lane_index)
|
||
|
|
else:
|
||
|
|
if 8 <= lane_index <= 15:
|
||
|
|
new_rt_index = rt_index * 10 + 2
|
||
|
|
new_lane_index = lane_index - 8
|
||
|
|
return (True, new_rt_index, new_lane_index)
|
||
|
|
logging.error("Invalid lane_index: must be in [0, 15]")
|
||
|
|
return (False, 0, 0)
|
||
|
|
else:
|
||
|
|
if rt_index in (11, 12, 21, 22, 31, 32, 41, 42):
|
||
|
|
logging.info(f"use retimer logic index and lane, rt_index: {rt_index}, rt_lane: {lane_index}")
|
||
|
|
if lane_index > 7:
|
||
|
|
logging.error("For logic rt_index, lane_index must be in [0, 7]")
|
||
|
|
return (False, 0, 0)
|
||
|
|
return (True, rt_index, lane_index)
|
||
|
|
else:
|
||
|
|
logging.error(f"Invalid rt_index: {rt_index}")
|
||
|
|
return (False, 0, 0)
|
||
|
|
|
||
|
|
def manual_tune(self, exp_id: int, slot_id: int) -> bool:
|
||
|
|
target_vpeaks = self.load_target_vpeaks_from_json(exp_id, slot_id)
|
||
|
|
if target_vpeaks is None:
|
||
|
|
logging.error("从文件加载target_vpeaks失败")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
self.match_optimal_mgc_new2(exp_id, slot_id, target_vpeaks)
|
||
|
|
return True
|
||
|
|
|
||
|
|
def calc_target_vpeak_new(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Optional[Dict[(int, int)]]:
|
||
|
|
target_vpeaks = {}
|
||
|
|
logging.info(f"-------slot {slot_id}")
|
||
|
|
time.sleep(0.05)
|
||
|
|
self.local_reg_tool.enable_tia_agc(exp_id, slot_id)
|
||
|
|
time.sleep(0.05)
|
||
|
|
logging.info("----------step 2: toogle RF off")
|
||
|
|
if not self.toogle_rf(exp_id, slot_id, "off"):
|
||
|
|
logging.error(f"slot {slot_id}: toogle RF off fail")
|
||
|
|
return target_vpeaks
|
||
|
|
else:
|
||
|
|
self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 0)
|
||
|
|
time.sleep(0.5)
|
||
|
|
logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
|
||
|
|
base_line_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
|
||
|
|
logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
|
||
|
|
self.toogle_rf(exp_id, slot_id, "on")
|
||
|
|
self.local_reg_tool.write_agc_reg_all_lane(exp_id, slot_id, 255)
|
||
|
|
time.sleep(0.5)
|
||
|
|
max_vpeaks = self.local_reg_tool.read_vpeak_reg_all_lane(exp_id, slot_id)
|
||
|
|
logging.info(f"max_vpeaks: {max_vpeaks}")
|
||
|
|
for lane, base_vpeak in base_line_vpeaks.items():
|
||
|
|
numerator = 19
|
||
|
|
vpkdelta = round((max_vpeaks[lane] - base_vpeak) * (numerator / 29))
|
||
|
|
target_vpeaks[lane] = vpkdelta + base_vpeak
|
||
|
|
diff = max_vpeaks[lane] - base_line_vpeaks[lane]
|
||
|
|
if diff < 10:
|
||
|
|
logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
|
||
|
|
sys.exit(-1)
|
||
|
|
|
||
|
|
# for lane, base_vpeak in base_line_vpeaks.items():
|
||
|
|
# diff = max_vpeaks[lane] - base_line_vpeaks[lane]
|
||
|
|
# if diff <= 22:
|
||
|
|
# logging.error(f"host: {self.host}, exp: {exp_id}, slot: {slot_id}, lane: {lane}, max_vpeak is error")
|
||
|
|
# target_vpeaks[lane] = diff - 2 + base_vpeak
|
||
|
|
# else:
|
||
|
|
# target_vpeaks[lane] = 21 + base_vpeak
|
||
|
|
|
||
|
|
logging.info(f"target_vpeaks: {target_vpeaks}")
|
||
|
|
|
||
|
|
if target_vpeaks:
|
||
|
|
self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)
|
||
|
|
self.local_reg_tool.disable_tia_agc(exp_id, slot_id)
|
||
|
|
return target_vpeaks
|
||
|
|
|
||
|
|
def calc_target_vpeak(self, exp_id: int, lane_list: List[int], slot_id: int) -> Dict[(int, int)]:
|
||
|
|
target_vpeaks = {}
|
||
|
|
logging.info(f"-------slot {slot_id}")
|
||
|
|
logging.info("----------step 1: disable agc")
|
||
|
|
self.disable_agc(exp_id, slot_id)
|
||
|
|
logging.info("----------step 2: toogle RF off")
|
||
|
|
if not self.toogle_rf(exp_id, slot_id, lane_list, "off"):
|
||
|
|
logging.error(f"slot {slot_id}: toogle RF off fail")
|
||
|
|
return target_vpeaks
|
||
|
|
else:
|
||
|
|
logging.info("----------step 3: get base_line_vpeaks, and calculate target vpeaks")
|
||
|
|
base_line_vpeaks = self._read_vpeak_all_lanes(exp_id, slot_id, lane_list)
|
||
|
|
logging.info(f"base_line_vpeaks: {base_line_vpeaks}")
|
||
|
|
target_vpeaks = self.calculate_target_vpeak(base_line_vpeaks)
|
||
|
|
logging.info(f"target_vpeaks: {target_vpeaks}")
|
||
|
|
if target_vpeaks:
|
||
|
|
self.save_target_vpeaks_to_json(exp_id, slot_id, target_vpeaks)
|
||
|
|
logging.info("----------step 4: enable RF")
|
||
|
|
self.toogle_rf(exp_id, slot_id, lane_list, "on")
|
||
|
|
return target_vpeaks
|
||
|
|
|
||
|
|
def disable_agc(self, exp_id: int, slot_id: int):
|
||
|
|
logging.debug("----------disable_agc, step 1")
|
||
|
|
agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en")
|
||
|
|
if not agc_enable_regs:
|
||
|
|
logging.error("not find AGC_ENABLE register")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
logging.debug("----------disable_agc, step 2")
|
||
|
|
success = True
|
||
|
|
for reg in agc_enable_regs:
|
||
|
|
logging.debug(f"----------disable_agc, step 2: reg: {reg}")
|
||
|
|
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=(reg.bank),
|
||
|
|
page=(reg.page),
|
||
|
|
reg_offset=(reg.offset),
|
||
|
|
size=(reg.size),
|
||
|
|
hex_str="00")
|
||
|
|
time.sleep(0.05)
|
||
|
|
logging.debug(f"----------disable_agc, step 3: result: {result}")
|
||
|
|
if not result:
|
||
|
|
logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}")
|
||
|
|
success = False
|
||
|
|
|
||
|
|
return success
|
||
|
|
|
||
|
|
def enable_agc(self, exp_id: int, slot_id: int):
|
||
|
|
time.sleep(0.05)
|
||
|
|
logging.debug("----------enable_agc, step 1")
|
||
|
|
agc_enable_regs = self.reg_table.get_registers_by_name("tia_agc_en")
|
||
|
|
if not agc_enable_regs:
|
||
|
|
logging.error("not find AGC_ENABLE register")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
logging.debug("----------enable_agc, step 2")
|
||
|
|
success = True
|
||
|
|
for reg in agc_enable_regs:
|
||
|
|
logging.debug(f"----------enable_agc, step 2: reg: {reg}")
|
||
|
|
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=(reg.bank),
|
||
|
|
page=(reg.page),
|
||
|
|
reg_offset=(reg.offset),
|
||
|
|
size=(reg.size),
|
||
|
|
hex_str="FF")
|
||
|
|
time.sleep(0.05)
|
||
|
|
logging.info(f"----------enable_agc, step 3: result: {result}")
|
||
|
|
if not result:
|
||
|
|
logging.error(f"设置寄存器失败: bank={reg.bank}, page={reg.page}, offset={reg.offset}")
|
||
|
|
success = False
|
||
|
|
|
||
|
|
return success
|
||
|
|
|
||
|
|
def toogle_rf(self, exp_id, slot_id, switch='off'):
|
||
|
|
ret = False
|
||
|
|
logging.info(f"---------- toogle_rf to {switch}")
|
||
|
|
if switch == "off":
|
||
|
|
ret = self.local_reg_tool.write_opt_reg_all_lane(exp_id, slot_id, 0, "tia_stage2")
|
||
|
|
time.sleep(0.05)
|
||
|
|
if ret == False:
|
||
|
|
logging.error(f"---------- toogle_rf to {switch} failed")
|
||
|
|
else:
|
||
|
|
stage_recommended_value = 144
|
||
|
|
ret = self.local_reg_tool.write_opt_reg_all_lane(exp_id, slot_id, stage_recommended_value, "tia_stage2")
|
||
|
|
time.sleep(0.05)
|
||
|
|
if ret == False:
|
||
|
|
logging.error(f"---------- toogle_rf to {switch} failed")
|
||
|
|
return ret
|
||
|
|
|
||
|
|
def calculate_target_vpeak(self, base_line_vpeaks: Dict[(int, int)]) -> Dict[(int, int)]:
|
||
|
|
target_vpeaks = {}
|
||
|
|
for lane, vpeak in base_line_vpeaks.items():
|
||
|
|
logging.info(f"lane={lane}, vpeak={vpeak}")
|
||
|
|
target_vpeaks[lane] = vpeak + 18
|
||
|
|
|
||
|
|
logging.info(f"target_vpeaks={target_vpeaks}")
|
||
|
|
return target_vpeaks
|
||
|
|
|
||
|
|
def match_optimal_mgc_new2(self, exp_id: int, slot_id: int, target_vpeaks: Optional[Dict[(int, int)]]) -> bool:
|
||
|
|
mgc_regs = self.reg_table.get_registers_by_name("mgc")
|
||
|
|
vpeak_regs = self.reg_table.get_registers_by_name("vpeak")
|
||
|
|
if mgc_regs is None or vpeak_regs is None:
|
||
|
|
logging.error("not find MGC regs")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
success = True
|
||
|
|
matched_results = {}
|
||
|
|
unmatched_lanes = []
|
||
|
|
target_mgcs = {}
|
||
|
|
for reg in mgc_regs:
|
||
|
|
mgc_value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=(reg.bank),
|
||
|
|
page=(reg.page),
|
||
|
|
reg_offset=(reg.offset),
|
||
|
|
size=1)
|
||
|
|
mgc_value = int(mgc_value, 16)
|
||
|
|
for vpeak_reg in vpeak_regs:
|
||
|
|
if vpeak_reg.lane == reg.lane:
|
||
|
|
vpeak_value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=(vpeak_reg.bank),
|
||
|
|
page=(vpeak_reg.page),
|
||
|
|
reg_offset=(vpeak_reg.offset),
|
||
|
|
size=2)
|
||
|
|
vpeak_value = int(vpeak_value, 16)
|
||
|
|
logging.info(f"mgc_value: {mgc_value}, target_vpeak: {target_vpeaks[vpeak_reg.lane]}, vpeak_value: {vpeak_value}")
|
||
|
|
target_mgc = (target_vpeaks[vpeak_reg.lane] - vpeak_value) * 1.5 + mgc_value
|
||
|
|
logging.info(f"target_mgc: {target_mgc}")
|
||
|
|
target_mgcs[reg.lane] = target_mgc
|
||
|
|
break
|
||
|
|
|
||
|
|
logging.info(f"----------------Target MGC: {target_mgcs}")
|
||
|
|
for reg in mgc_regs:
|
||
|
|
logging.info(f"mgc regs size: {len(mgc_regs)}")
|
||
|
|
min_val = int(target_mgcs[reg.lane] - 10)
|
||
|
|
if min_val > 255:
|
||
|
|
min_val = 255
|
||
|
|
max_val = int(target_mgcs[reg.lane] + 10)
|
||
|
|
if max_val > 255:
|
||
|
|
logging.info("max_val is greater than 255, setting to 255")
|
||
|
|
max_val = 255
|
||
|
|
logging.info(f"lane: {reg.lane}, min mgc : {min_val}, max_mgc: {max_val}, target vpeak: {target_vpeaks[reg.lane]}")
|
||
|
|
step = reg.step
|
||
|
|
search_values = list(range(max_val, min_val - 1, -step))
|
||
|
|
logging.info(f"Search order for lane {reg.lane}: {search_values}")
|
||
|
|
is_match = False
|
||
|
|
matched_mgc = None
|
||
|
|
matched_vpeak = None
|
||
|
|
for mgc in search_values:
|
||
|
|
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=(reg.bank),
|
||
|
|
page=(reg.page),
|
||
|
|
reg_offset=(reg.offset),
|
||
|
|
size=1,
|
||
|
|
hex_str=f"{mgc:02X}")
|
||
|
|
time.sleep(0.05)
|
||
|
|
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=0,
|
||
|
|
page=208,
|
||
|
|
reg_offset=136,
|
||
|
|
size=1,
|
||
|
|
hex_str="01")
|
||
|
|
time.sleep(0.05)
|
||
|
|
for vpeak_reg in vpeak_regs:
|
||
|
|
if vpeak_reg.lane == reg.lane:
|
||
|
|
value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=(vpeak_reg.bank),
|
||
|
|
page=(vpeak_reg.page),
|
||
|
|
reg_offset=(vpeak_reg.offset),
|
||
|
|
size=2)
|
||
|
|
value = int(value, 16)
|
||
|
|
logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}")
|
||
|
|
if value < target_vpeaks[reg.lane] + 1 and value > target_vpeaks[reg.lane] - 2:
|
||
|
|
logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ")
|
||
|
|
is_match = True
|
||
|
|
matched_mgc = mgc
|
||
|
|
matched_vpeak = value
|
||
|
|
break
|
||
|
|
|
||
|
|
if is_match:
|
||
|
|
break
|
||
|
|
time.sleep(0.03)
|
||
|
|
|
||
|
|
if is_match and matched_mgc is not None:
|
||
|
|
if matched_vpeak is not None:
|
||
|
|
if str(exp_id) not in matched_results:
|
||
|
|
matched_results[str(exp_id)] = {}
|
||
|
|
else:
|
||
|
|
if str(slot_id) not in matched_results[str(exp_id)]:
|
||
|
|
matched_results[str(exp_id)][str(slot_id)] = {}
|
||
|
|
if self.mode not in matched_results[str(exp_id)][str(slot_id)]:
|
||
|
|
matched_results[str(exp_id)][str(slot_id)][self.mode] = {}
|
||
|
|
matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc,
|
||
|
|
'target_vpeak':target_vpeaks[reg.lane],
|
||
|
|
'actual_vpeak':matched_vpeak}
|
||
|
|
else:
|
||
|
|
unmatched_lanes.append(reg.lane)
|
||
|
|
|
||
|
|
if matched_results:
|
||
|
|
self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
|
||
|
|
if unmatched_lanes:
|
||
|
|
logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
|
||
|
|
return success
|
||
|
|
|
||
|
|
def match_optimal_mgc_new(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool:
|
||
|
|
matched_results = {}
|
||
|
|
unmatched_lanes = []
|
||
|
|
for index, lane_id in enumerate(lane_list):
|
||
|
|
is_match = False
|
||
|
|
matched_mgc = None
|
||
|
|
matched_vpeak = None
|
||
|
|
reg = self.reg_table.get_register_by_logic_lane("mgc", lane_id)
|
||
|
|
if reg is None or reg.valid_range is None:
|
||
|
|
logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc")
|
||
|
|
return False
|
||
|
|
min_val = reg.valid_range[0]
|
||
|
|
max_val = reg.valid_range[1]
|
||
|
|
step = reg.step if reg.step is not None else 1
|
||
|
|
wt_mgc = int((min_val + max_val) / 2)
|
||
|
|
target_vpeak = target_vpeaks[lane_id]
|
||
|
|
target_vpeak_range = [target_vpeak, target_vpeak + 1]
|
||
|
|
count = 0
|
||
|
|
while 1:
|
||
|
|
ret = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, wt_mgc)
|
||
|
|
time.sleep(0.05)
|
||
|
|
if ret == False:
|
||
|
|
logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: mgc")
|
||
|
|
return False
|
||
|
|
ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
|
||
|
|
if ret == False:
|
||
|
|
logging.error(f"match_optimal_mgc_new error. exp:{exp_id}, slot:{slot_id}, lane: {lane_id}, register name: confirm")
|
||
|
|
return False
|
||
|
|
time.sleep(0.05)
|
||
|
|
current_vpeak = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane_id)
|
||
|
|
logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{lane_id} -> set mgc {wt_mgc}, Vpeak value: {current_vpeak}, target: {target_vpeak}")
|
||
|
|
diff_vpeak = abs(target_vpeak - current_vpeak)
|
||
|
|
diff_vpeak_step = {
|
||
|
|
0: 0,
|
||
|
|
1: 0,
|
||
|
|
2: 0,
|
||
|
|
3: 1,
|
||
|
|
4: 2,
|
||
|
|
5: 3,
|
||
|
|
6: 3,
|
||
|
|
7: 4,
|
||
|
|
8: 5}
|
||
|
|
extra_step = diff_vpeak_step.get(diff_vpeak, 5)
|
||
|
|
if current_vpeak > target_vpeak_range[1]:
|
||
|
|
wt_mgc -= step + extra_step
|
||
|
|
if current_vpeak < target_vpeak_range[0]:
|
||
|
|
wt_mgc += step + extra_step
|
||
|
|
if current_vpeak >= target_vpeak_range[0]:
|
||
|
|
if current_vpeak <= target_vpeak_range[1]:
|
||
|
|
is_match = True
|
||
|
|
matched_mgc = wt_mgc
|
||
|
|
matched_vpeak = current_vpeak
|
||
|
|
break
|
||
|
|
if count > 30:
|
||
|
|
logging.error(f"vpeak is not match, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ")
|
||
|
|
sys.exit(-1)
|
||
|
|
break
|
||
|
|
count += 1
|
||
|
|
|
||
|
|
if is_match:
|
||
|
|
logging.info(f"------------------ Vpeak is matched, target vpeak:{target_vpeak}, current vpeak: {current_vpeak}, current mgc:{wt_mgc} ")
|
||
|
|
if str(exp_id) not in matched_results:
|
||
|
|
matched_results[str(exp_id)] = {}
|
||
|
|
if str(slot_id) not in matched_results[str(exp_id)]:
|
||
|
|
matched_results[str(exp_id)][str(slot_id)] = {}
|
||
|
|
if self.mode not in matched_results[str(exp_id)][str(slot_id)]:
|
||
|
|
matched_results[str(exp_id)][str(slot_id)][self.mode] = {}
|
||
|
|
matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc, 'target_vpeak':target_vpeaks[lane_id],
|
||
|
|
'actual_vpeak':matched_vpeak}
|
||
|
|
else:
|
||
|
|
unmatched_lanes.append(lane_id)
|
||
|
|
|
||
|
|
if matched_results:
|
||
|
|
self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
|
||
|
|
if unmatched_lanes:
|
||
|
|
logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
return True
|
||
|
|
|
||
|
|
def match_optimal_mgc_old(self, exp_id: int, slot_id: int, lane_list: List[int], target_vpeaks: Dict[(int, int)]) -> bool:
|
||
|
|
mgc_regs = self.reg_table.get_registers_by_name("mgc")
|
||
|
|
vpeak_regs = self.reg_table.get_registers_by_name("vpeak")
|
||
|
|
if mgc_regs is None or vpeak_regs is None:
|
||
|
|
logging.error("not find MGC regs")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
success = True
|
||
|
|
matched_results = {}
|
||
|
|
unmatched_lanes = []
|
||
|
|
for reg in mgc_regs:
|
||
|
|
logging.info(f"mgc regs size: {len(mgc_regs)}")
|
||
|
|
logging.info(f"lane: {reg.lane}, target vpeak:{target_vpeaks[reg.lane]}, mgc valid_range: {reg.valid_range}, step: {reg.step}")
|
||
|
|
min_val = reg.valid_range[0]
|
||
|
|
max_val = reg.valid_range[1]
|
||
|
|
step = reg.step
|
||
|
|
mid_val = (min_val + max_val) // 2
|
||
|
|
mid_val = mid_val // step * step
|
||
|
|
search_values = [
|
||
|
|
mid_val]
|
||
|
|
offset = step
|
||
|
|
while len(search_values) < (max_val - min_val) // step + 1:
|
||
|
|
right_val = mid_val + offset
|
||
|
|
if right_val <= max_val:
|
||
|
|
search_values.append(right_val)
|
||
|
|
left_val = mid_val - offset
|
||
|
|
if left_val >= min_val:
|
||
|
|
search_values.append(left_val)
|
||
|
|
offset += step
|
||
|
|
if len(search_values) >= (max_val - min_val) // step + 1:
|
||
|
|
break
|
||
|
|
|
||
|
|
is_match = False
|
||
|
|
matched_mgc = None
|
||
|
|
matched_vpeak = None
|
||
|
|
for mgc in search_values:
|
||
|
|
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=(reg.bank),
|
||
|
|
page=(reg.page),
|
||
|
|
reg_offset=(reg.offset),
|
||
|
|
size=1,
|
||
|
|
hex_str=f"{mgc:02X}")
|
||
|
|
time.sleep(0.02)
|
||
|
|
result = self.bmc.SetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=0,
|
||
|
|
page=208,
|
||
|
|
reg_offset=136,
|
||
|
|
size=1,
|
||
|
|
hex_str="01")
|
||
|
|
time.sleep(0.02)
|
||
|
|
for vpeak_reg in vpeak_regs:
|
||
|
|
if vpeak_reg.lane == reg.lane:
|
||
|
|
value = self.bmc.GetOpticalModuleRegs(exp_id=exp_id,
|
||
|
|
slot_id=slot_id,
|
||
|
|
bank=(vpeak_reg.bank),
|
||
|
|
page=(vpeak_reg.page),
|
||
|
|
reg_offset=(vpeak_reg.offset),
|
||
|
|
size=2)
|
||
|
|
value = int(value, 16)
|
||
|
|
logging.info(f"exp_id:{exp_id}, slot_id:{slot_id}, lane:{reg.lane} -> set mgc {mgc}, Vpeak value: {value}")
|
||
|
|
if value < target_vpeaks[reg.lane] + 2 and value > target_vpeaks[reg.lane] - 2:
|
||
|
|
logging.info(f"----------Vpeak is matched, target vpeak:{target_vpeaks[reg.lane]}, current vpeak: {value}, current mgc:{mgc} ")
|
||
|
|
is_match = True
|
||
|
|
matched_mgc = mgc
|
||
|
|
matched_vpeak = value
|
||
|
|
break
|
||
|
|
|
||
|
|
if is_match:
|
||
|
|
break
|
||
|
|
time.sleep(0.03)
|
||
|
|
|
||
|
|
if is_match:
|
||
|
|
if matched_mgc is not None:
|
||
|
|
if matched_vpeak is not None:
|
||
|
|
if str(exp_id) not in matched_results:
|
||
|
|
matched_results[str(exp_id)] = {}
|
||
|
|
else:
|
||
|
|
if str(slot_id) not in matched_results[str(exp_id)]:
|
||
|
|
matched_results[str(exp_id)][str(slot_id)] = {}
|
||
|
|
if self.mode not in matched_results[str(exp_id)][str(slot_id)]:
|
||
|
|
matched_results[str(exp_id)][str(slot_id)][self.mode] = {}
|
||
|
|
matched_results[str(exp_id)][str(slot_id)][self.mode][str(reg.lane)] = {'mgc':matched_mgc,
|
||
|
|
'target_vpeak':target_vpeaks[reg.lane],
|
||
|
|
'actual_vpeak':matched_vpeak}
|
||
|
|
unmatched_lanes.append(reg.lane)
|
||
|
|
|
||
|
|
if matched_results:
|
||
|
|
self.save_mgc_results_to_json(exp_id, slot_id, matched_results)
|
||
|
|
if unmatched_lanes:
|
||
|
|
logging.error(f"以下lane未匹配到合适的MGC值: {unmatched_lanes}")
|
||
|
|
return success
|
||
|
|
|
||
|
|
def _read_vpeak_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int]) -> Dict[(int, int)]:
|
||
|
|
vpeak_values = {}
|
||
|
|
for lane in lane_list:
|
||
|
|
vpeak_int = self.local_reg_tool.read_vpeak_reg(exp_id, slot_id, lane)
|
||
|
|
vpeak_values[lane] = vpeak_int
|
||
|
|
|
||
|
|
return vpeak_values
|
||
|
|
|
||
|
|
def _write_mgc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], mgc: int) -> bool:
|
||
|
|
for lane_id in lane_list:
|
||
|
|
ret = self.local_reg_tool.write_mgc_reg(exp_id, slot_id, lane_id, mgc)
|
||
|
|
if ret == False:
|
||
|
|
logging.error(f"write mgc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, mgc:{mgc}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
|
||
|
|
if ret == False:
|
||
|
|
logging.error(f"write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
return True
|
||
|
|
|
||
|
|
def _write_agc_all_lanes(self, exp_id: int, slot_id: int, lane_list: List[int], agc: int) -> bool:
|
||
|
|
for lane_id in lane_list:
|
||
|
|
ret = self.local_reg_tool.write_agc_reg(exp_id, slot_id, lane_id, agc)
|
||
|
|
time.sleep(0.05)
|
||
|
|
if ret == False:
|
||
|
|
logging.error(f"write agc reg failed, exp_id:{exp_id}, slot_id:{slot_id}, lane_id:{lane_id}, agc:{agc}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
ret = self.local_reg_tool.write_confirm_reg(exp_id, slot_id)
|
||
|
|
if ret == False:
|
||
|
|
logging.error(f"write confirm reg failed, exp_id:{exp_id}, slot_id:{slot_id}")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
return True
|
||
|
|
|
||
|
|
def _generate_host_filename(self) -> str:
|
||
|
|
safe_host = self.host.replace(":", "_").replace("/", "_").replace("\\", "_")
|
||
|
|
return f"target_vpeaks_{safe_host}.json"
|
||
|
|
|
||
|
|
def _generate_mgc_host_filename(self) -> str:
|
||
|
|
safe_host = self.host.replace(":", "_").replace("/", "_").replace("\\", "_")
|
||
|
|
return f"mgc_{safe_host}.json"
|
||
|
|
|
||
|
|
def save_target_vpeaks_to_json(self, exp_id: int, slot_id: int, target_vpeaks: Dict[(int, int)]) -> bool:
|
||
|
|
try:
|
||
|
|
filename = self._generate_host_filename()
|
||
|
|
os.makedirs((os.path.dirname(filename) if os.path.dirname(filename) else "."), exist_ok=True)
|
||
|
|
if os.path.exists(filename):
|
||
|
|
with open(filename, "r") as f:
|
||
|
|
data = json.load(f)
|
||
|
|
else:
|
||
|
|
data = {}
|
||
|
|
if str(exp_id) not in data:
|
||
|
|
data[str(exp_id)] = {}
|
||
|
|
if str(slot_id) not in data[str(exp_id)]:
|
||
|
|
data[str(exp_id)][str(slot_id)] = {}
|
||
|
|
data[str(exp_id)][str(slot_id)][self.mode] = {str(k): v for k, v in target_vpeaks.items()}
|
||
|
|
with open(filename, "w") as f:
|
||
|
|
json.dump(data, f, indent=4)
|
||
|
|
logging.info(f"target_vpeaks saved to {filename}, path: {exp_id}/{slot_id}/{self.mode}")
|
||
|
|
return True
|
||
|
|
except Exception as e:
|
||
|
|
logging.error(f"Failed to save target_vpeaks to JSON file: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
def load_target_vpeaks_from_json(self, exp_id: int, slot_id: int) -> Optional[Dict[(int, int)]]:
|
||
|
|
try:
|
||
|
|
filename = self._generate_host_filename()
|
||
|
|
if not os.path.exists(filename):
|
||
|
|
logging.error(f"File {filename} does not exist")
|
||
|
|
return
|
||
|
|
with open(filename, "r") as f:
|
||
|
|
data = json.load(f)
|
||
|
|
if str(exp_id) not in data or str(slot_id) not in data[str(exp_id)] or self.mode not in data[str(exp_id)][str(slot_id)]:
|
||
|
|
logging.error(f"Path {exp_id}/{slot_id}/{self.mode} not found in {filename}")
|
||
|
|
return
|
||
|
|
target_vpeaks_data = data[str(exp_id)][str(slot_id)][self.mode]
|
||
|
|
target_vpeaks = {int(k): v for k, v in target_vpeaks_data.items()}
|
||
|
|
logging.info(f"target_vpeaks loaded successfully from {filename}, path: {exp_id}/{slot_id}/{self.mode}")
|
||
|
|
return target_vpeaks
|
||
|
|
except Exception as e:
|
||
|
|
logging.error(f"Failed to load target_vpeaks from JSON file: {e}")
|
||
|
|
return
|
||
|
|
|
||
|
|
def save_mgc_results_to_json(self, exp_id: int, slot_id: int, matched_results: Dict[(str, Any)]) -> bool:
|
||
|
|
try:
|
||
|
|
filename = self._generate_mgc_host_filename()
|
||
|
|
os.makedirs((os.path.dirname(filename) if os.path.dirname(filename) else "."), exist_ok=True)
|
||
|
|
if os.path.exists(filename):
|
||
|
|
with open(filename, "r") as f:
|
||
|
|
data = json.load(f)
|
||
|
|
else:
|
||
|
|
data = {}
|
||
|
|
for exp_key, exp_data in matched_results.items():
|
||
|
|
if exp_key not in data:
|
||
|
|
data[exp_key] = {}
|
||
|
|
for slot_key, slot_data in exp_data.items():
|
||
|
|
if slot_key not in data[exp_key]:
|
||
|
|
data[exp_key][slot_key] = {}
|
||
|
|
for mode_key, mode_data in slot_data.items():
|
||
|
|
if mode_key not in data[exp_key][slot_key]:
|
||
|
|
data[exp_key][slot_key][mode_key] = {}
|
||
|
|
data[exp_key][slot_key][mode_key].update(mode_data)
|
||
|
|
|
||
|
|
with open(filename, "w") as f:
|
||
|
|
json.dump(data, f, indent=4)
|
||
|
|
logging.info(f"MGC matching results saved to {filename}")
|
||
|
|
return True
|
||
|
|
except Exception as e:
|
||
|
|
logging.error(f"Failed to save MGC results to JSON file: {e}")
|
||
|
|
return False
|