Stress Testing

  • Script used for creating interface multiple flap events and OIR
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

BANNER = r"""
Debug CLI & Interface Control (Python)

-------------------------------------
*** Must be run using root login ****
-------------------------------------
- Logs outputs into debug_cli.log (override with --logfile)
- Shows live [INFO]/[WARN]/[RESULT] messages on console
- Disable/enable/status/flap interface ops via arguments
- --brcm_cli [disable|enable IFACES]
    * With args: run one-shot Broadcom action on IFACES (iface names; auto-map to ports)
    * Without args: act as a flag to make --flap use Broadcom (jbcmcmd.py) instead of Junos
- Honors argument order across all ops (executes in the sequence you pass)
- --flap <ifaces> [--flap-wait <sec>] [--flap-count <n>] [--post-up-wait <sec>]
Tunables (verify): 
 -   --down-timeout / --up-timeout: max seconds to wait for the link to reach down / up. Defaults: 12s down, 24s up.
 -   --down-interval / --up-interval: how often to poll show interfaces terse during that wait. Defaults: 1.5s down, 2.0s up.
 -   --attempts: optional override for number of polls. If set, it ignores the timeout/interval-derived count and uses exactly this many attempts.
- Bare port numbers auto-map to et-0/0/<port> for flaps/status/enable/disable (override with --iface-prefix, e.g., xe-0/0/)
Junos commands used:
- Disable:  configure; set interfaces <iface> disable; commit and-quit
- Enable:   configure; delete interfaces <iface> disable; commit and-quit
- Status 1: show interfaces terse | match ^<iface>
- Status 2: show interfaces terse | no-more

Broadcom commands used:
- Toggle:   <brcm_cli> "port <port> enable=false|true" (via jbcmcmd.py/brcm_cli/brcmcli/brckcli)
- Status:   qfx.brcm.pfe.port.link_status <iface>

Optics OIR test:
- Actions:  request pfe execute command "test picd optics fpc_slot <fpc> pic_slot <pic> port <port> cmd [oir_enable|remove|insert|oir_disable]" target fpc<fpc>
- Inventory: show chassis hardware detail | display xml | no-more (fallback: | no-more | match "Xcvr ")
- Defaults: --fpc-slot 0 --pic-slot 0 --oir-poll-delay 1.0 --oir-retries 20
- Cycles:   --flap-count controls number of OIR cycles per port (default: 1)
- No ports?  --oir-test can reuse the --flap list (if given) when provided without args.
- Optional link check: add --check-status to verify link DOWN after remove and UP after insert (uses --down-timeout/--up-timeout)
- Inter-cycle wait: --flap-wait applies between OIR cycles
- Modes: --remove-only (skip insert), --insert-only (skip remove)
- Round-robin: --round-robin alternates ports per cycle (e.g., cycle 1: port 36, then 60; cycle 2: port 36, then 60; etc.)

Examples:
- Disable: ./evo_link_flap.py --disable xe-0/0/1
- Enable:  ./evo_link_flap.py --enable xe-0/0/1
- Status:  ./evo_link_flap.py --status xe-0/0/1
- Flap (Junos): ./evo_link_flap.py --flap 21,22 --flap-count 2 --flap-wait 5   # maps to et-0/0/21, et-0/0/22
- Flap (BRCM):  ./evo_link_flap.py --flap 21,22 --brcm_cli --flap-count 2 --flap-wait 5
- Flap+OIR:     ./evo_link_flap.py --flap 21,22 --oir-test --flap-count 2 --flap-wait 5 --check-status
- OIR only:     ./evo_link_flap.py --oir-test 21,22 --flap-count 3 --flap-wait 5 --check-status
- OIR round-robin: ./evo_link_flap.py --oir-test 36,60 --flap-count 3 --flap-wait 5 --check-status --round-robin
- OIR remove:   ./evo_link_flap.py --oir-test 21,22 --remove-only --check-status
- OIR insert:   ./evo_link_flap.py --oir-test 21,22 --insert-only --check-status
"""

print(BANNER)

import argparse
import os
import subprocess
import sys
import time
from typing import List, Optional, Tuple
from xml.etree import ElementTree as ET

LOGFILE = "/var/tmp/debug_cli.log"
OIR_FLAG_SENTINEL = "__OIR_FLAG__"
IFACE_PREFIX = "et-0/0/"
IFACE_FAMILY = "et"
PREFERRED_FAMILIES = ["et", "xe", "ge"]
IFACE_PREFIX = "et-0/0/"
IFACE_FAMILY = "et"

# -------------------
# Logging helpers
# -------------------
def _ensure_log_dir(path: str):
    d = os.path.dirname(path) or "."
    try:
        os.makedirs(d, exist_ok=True)
    except Exception:
        pass

def set_logfile(path: str):
    global LOGFILE
    LOGFILE = os.path.abspath(path)
    _ensure_log_dir(LOGFILE)

def _write(line: str):
    with open(LOGFILE, "a", encoding="utf-8", errors="replace") as f:
        f.write(line + "\n")

def user_msg(msg: str):
    line = f"[INFO] {msg}"
    print(line)
    _write(line)

def warn_msg(msg: str):
    line = f"[WARN] {msg}"
    print(line)
    _write(line)

def result_msg(msg: str):
    line = f"[RESULT] {msg}"
    print(line)
    _write(line)

def run_command(cmd, shell=True) -> bool:
    with open(LOGFILE, "a", encoding="utf-8", errors="replace") as f:
        try:
            subprocess.run(cmd, shell=shell, stdout=f, stderr=f, check=True)
            return True
        except subprocess.CalledProcessError:
            return False

def junos_cli(cmd: str) -> bool:
    return run_command(f'cli -c "{cmd}"', shell=True)

def junos_cli_capture(cmd: str) -> str:
    try:
        out = subprocess.check_output(["cli", "-c", cmd], text=True)
        _write(f"\n$ cli -c \"{cmd}\"\n{out}\n")
        return out
    except subprocess.CalledProcessError as e:
        warn_msg(f"CLI failed: {cmd} rc={e.returncode}")
        return ""

def junos_cli_capture_quiet(cmd: str) -> str:
    try:
        out = subprocess.check_output(["cli", "-c", cmd], text=True)
        return out
    except subprocess.CalledProcessError as e:
        warn_msg(f"CLI failed: {cmd} rc={e.returncode}")
        return ""

# -------------------
# General helpers
# -------------------
def normalize_iflist(arg: Optional[str]) -> List[str]:
    if not arg:
        return []
    seen = set()
    out: List[str] = []
    for x in (p.strip() for p in arg.split(",") if p.strip()):
        if x not in seen:
            out.append(x)
            seen.add(x)
    return out

def set_iface_prefix(prefix: str):
    """
    Configure how bare port numbers map to interface names.
    Examples: 'et-0/0/' (default), 'xe-0/0/'.
    """
    global IFACE_PREFIX, IFACE_FAMILY
    p = (prefix or "").strip()
    if not p:
        p = "et-0/0/"
    if not p.endswith("/"):
        p = p + "/"
    IFACE_PREFIX = p
    IFACE_FAMILY = p.split("-", 1)[0] if "-" in p else "et"

def normalize_iface_token(token: str) -> str:
    """
    Map bare port numbers (e.g. '23' or '0/0/23') to IFACE_PREFIX + <port> (family derived from prefix).
    Leave anything with letters or a hyphen untouched (assume full name).
    """
    t = token.strip()
    if not t:
        return ""
    if any(c.isalpha() for c in t) or "-" in t:
        return t
    parts = t.split("/")
    if len(parts) == 3 and all(p.isdigit() for p in parts):
        return f"{IFACE_FAMILY}-{parts[0]}/{parts[1]}/{parts[2]}"
    if t.isdigit():
        return f"{IFACE_PREFIX}{t}"
    return t

def normalize_iface_list(arg: Optional[str]) -> List[str]:
    raw = normalize_iflist(arg)
    out: List[str] = []
    seen = set()
    for tok in raw:
        mapped = normalize_iface_token(tok)
        if mapped and mapped not in seen:
            out.append(mapped)
            seen.add(mapped)
    return out

def oir_ports_from_tokens(tokens: List[str]) -> List[str]:
    """
    Extract numeric port identifiers from a list of tokens.
    Accepts bare numbers or interface strings ending with a number.
    """
    out: List[str] = []
    seen = set()
    for tok in tokens:
        t = tok.strip()
        if not t:
            continue
        candidate = None
        if t.isdigit():
            candidate = t
        elif "/" in t:
            tail = t.rsplit("/", 1)[-1]
            if tail.isdigit():
                candidate = tail
        if candidate and candidate not in seen:
            out.append(candidate)
            seen.add(candidate)
    return out

def parse_terse_line(line: str) -> Tuple[str, str, str]:
    # Expect: Interface | Admin | Link | ...
    parts = line.split()
    if len(parts) >= 3:
        return parts[0], parts[1].lower(), parts[2].lower()
    if len(parts) >= 1:
        return parts[0], "unknown", "unknown"
    return "", "unknown", "unknown"

def first_physical_lines_from_terse(out: str, ifaces: List[str]) -> List[str]:
    lines = []
    want = set(ifaces)
    # collect physical matches first; if none, allow first logical match as fallback
    logical_fallback = []
    for l in out.splitlines():
        l = l.strip()
        if not l:
            continue
        name = l.split()[0] if l.split() else ""
        if not name:
            continue
        base = name.split(".", 1)[0].split(":", 1)[0]
        if base not in want:
            continue
        if "." in name or ":" in name:
            logical_fallback.append(l)
        else:
            lines.append(l)
    if not lines and logical_fallback:
        return logical_fallback
    return lines

def show_iface_terse(iface: str) -> str:
    return junos_cli_capture(f"show interfaces terse | match ^{iface}")

def verify_oper_state(
    iface: str,
    expect: str,
    timeout_s: int,
    interval_s: float,
    attempts: Optional[int] = None
) -> bool:
    """
    Poll 'show interfaces terse' until Link matches expect ('up'/'down').
    If attempts is provided, it overrides timeout/interval derived attempts.
    """
    expect = expect.lower()
    if attempts is None:
        attempts = max(1, int(round(timeout_s / max(0.1, interval_s))))
    for attempt in range(1, attempts + 1):
        out = show_iface_terse(iface)
        # Try to find the specific line; if not present, state=unknown
        state = "unknown"
        for l in out.splitlines():
            if not l.startswith(iface):
                continue
            name, admin, oper = parse_terse_line(l)
            base = name.split(".", 1)[0].split(":", 1)[0]
            if base != iface:
                continue
            state = oper
            if "." not in name and ":" not in name:  # physical preferred
                break
        user_msg(f"[{iface}] observed link='{state}' (attempt {attempt}/{attempts})")
        if state == expect:
            return True
        time.sleep(interval_s)
    return False

def status_interfaces(ifaces: List[str]) -> None:
    for i in ifaces:
        user_msg(f"--- Status: {i} ---")
        out = junos_cli_capture("show interfaces terse | no-more")
        lines = first_physical_lines_from_terse(out, [i])
        # print a concise summary first
        if not lines:
            warn_msg(f"{i}: no terse output")
        else:
            for l in lines:
                name, admin, oper = parse_terse_line(l)
                if admin == "up" and oper == "up":
                    user_msg(f"[UP ] {name}: admin={admin}, oper={oper}")
                else:
                    user_msg(f"[DOWN] {name}: admin={admin}, oper={oper}")
        # also echo the raw matched lines for visibility
        if lines:
            for l in lines:
                print(l)
                _write(l)
        result_msg(f"Status checked for: {', '.join(ifaces)}")

# -------------------
# Junos interface ops
# -------------------
def disable_interfaces_junos(ifaces: List[str]) -> bool:
    if not ifaces:
        warn_msg("No interfaces provided to disable.")
        return False
    user_msg(f"Disabling interfaces (Junos, single commit): {', '.join(ifaces)}")
    cfg = ["configure"] + [f"set interfaces {i} disable" for i in ifaces] + ["commit and-quit"]
    return junos_cli("; ".join(cfg))

def enable_interfaces_junos(ifaces: List[str]) -> bool:
    if not ifaces:
        warn_msg("No interfaces provided to enable.")
        return False
    user_msg(f"Enabling interfaces (Junos, single commit): {', '.join(ifaces)}")
    cfg = ["configure"] + [f"delete interfaces {i} disable" for i in ifaces] + ["commit and-quit"]
    return junos_cli("; ".join(cfg))

# -------------------
# Broadcom helpers
# -------------------
_BRCM_BIN_CANDIDATES = (
    "/usr/sbin/jbcmcmd.py",
    "jbcmcmd.py",
    "brckcli",
    "brcm_cli",
    "brcmcli",
)

def _find_brcm_cli() -> Optional[str]:
    for b in _BRCM_BIN_CANDIDATES:
        try:
            rc = subprocess.run(["bash", "-lc", f"command -v {b}"], capture_output=True, text=True)
            if rc.returncode == 0:
                path = rc.stdout.strip()
                if path:
                    return path
        except Exception:
            pass
    return None

def brcm_run(cmd: str) -> bool:
    """
    Run a Broadcom command via the located CLI. The command string is passed as one argument.
    Example: brcm_run('port 40 enable=false')
    """
    bin_path = _find_brcm_cli()
    if not bin_path:
        warn_msg("Broadcom CLI tool not found (tried: brckcli, brcm_cli, brcmcli, jbcmcmd.py).")
        return False
    with open(LOGFILE, "a", encoding="utf-8", errors="replace") as f:
        try:
            subprocess.run([bin_path, cmd], stdout=f, stderr=f, check=True)
            return True
        except subprocess.CalledProcessError:
            return False

def brcm_link_status_for_iface(iface: str):
    """
    Emit Broadcom link status details for a Junos iface using qfx.brcm.pfe.port.link_status.
    This prints directly to log (and console header) for visibility.
    """
    user_msg(f"[BRCM-STATUS] {iface} (via qfx.brcm.pfe.port.link_status)")
    with open(LOGFILE, "a", encoding="utf-8", errors="replace") as f:
        try:
            out = subprocess.check_output(["qfx.brcm.pfe.port.link_status", iface], text=True)
            # echo to console minimally; full details go to logfile
            print(out)
            f.write(out)
            if not out.endswith("\n"):
                f.write("\n")
        except subprocess.CalledProcessError:
            warn_msg(f"[BRCM] link_status failed for {iface}")

def map_iface_to_brcm_port(iface: str) -> Optional[int]:
    """
    Map a Junos interface to Broadcom port using qfx.brcm.pfe.port.link_status output.
    Looks for 'stream' value in the channel summary (commonly the BCM port).
    """
    try:
        out = subprocess.check_output(["qfx.brcm.pfe.port.link_status", iface], text=True)
    except subprocess.CalledProcessError:
        warn_msg(f"[BRCM-MAP] link_status failed for {iface}")
        return None

    # Heuristic: find line starting with 'channel-...' and read last column 'stream'
    stream = None
    for line in out.splitlines():
        line = line.strip()
        if not line:
            continue
        # table line: ... inst   stream
        if line.startswith("channel-") and "stream" not in line:
            cols = [c for c in line.split() if c]
            if len(cols) >= 1:
                try:
                    # usually the last column is stream (port number)
                    stream = int(cols[-1])
                    break
                except Exception:
                    pass
    if stream is not None:
        user_msg(f"[BRCM-MAP] {iface} -> port {stream}")
    else:
        warn_msg(f"[BRCM-MAP] Could not map {iface} to Broadcom port (no 'stream' found)")
    return stream

def disable_interfaces_brcm(ifaces: List[str]) -> bool:
    ports = []
    for i in ifaces:
        p = map_iface_to_brcm_port(i)
        if p is not None:
            ports.append(p)
    if not ports:
        return False
    user_msg(f"[BRCM] Setting enable=false for ports: {', '.join(str(p) for p in ports)}")
    ok_all = True
    for p in ports:
        if not brcm_run(f"port {p} enable=false"):
            ok_all = False
    return ok_all

def enable_interfaces_brcm(ifaces: List[str]) -> bool:
    ports = []
    for i in ifaces:
        p = map_iface_to_brcm_port(i)
        if p is not None:
            ports.append(p)
    if not ports:
        return False
    user_msg(f"[BRCM] Setting enable=true for ports: {', '.join(str(p) for p in ports)}")
    ok_all = True
    for p in ports:
        if not brcm_run(f"port {p} enable=true"):
            ok_all = False
    return ok_all

# -------------------
# Optics OIR test
# -------------------
def _xml_name_exists(xml_text: str, name_text: str) -> bool:
    if not xml_text or not name_text:
        return False
    try:
        root = ET.fromstring(xml_text)
    except Exception:
        return False
    target = name_text.strip()
    if not target:
        return False
    for n in root.iter():
        if n.tag.endswith("name") and (n.text or "").strip() == target:
            return True
    return False

def _xcvr_present_in_inventory(xcvr_name: str) -> bool:
    xml_out = junos_cli_capture_quiet("show chassis hardware detail | display xml | no-more")
    if _xml_name_exists(xml_out, xcvr_name):
        return True
    text_out = junos_cli_capture_quiet('show chassis hardware detail | no-more | match "Xcvr "')
    if xcvr_name and text_out and xcvr_name.casefold() in text_out.casefold():
        return True
    return False

def _pfe_optics_action(fpc_slot: int, pic_slot: int, port: str, action: str):
    payload = f'test picd optics fpc_slot {int(fpc_slot)} pic_slot {int(pic_slot)} port {str(port).strip()} cmd {action}'
    cli_cmd = f'request pfe execute command "{payload}" target fpc{int(fpc_slot)}'
    user_msg(f"[OIR] action={action} target={int(fpc_slot)}/{int(pic_slot)}/{str(port).strip()}")
    return junos_cli_capture(cli_cmd)

def _poll_for_xcvr_after_insert(xcvr_name: str, retries: int, poll_delay_sec: float) -> bool:
    for i in range(1, retries + 1):
        if _xcvr_present_in_inventory(xcvr_name):
            user_msg(f"[OIR] '{xcvr_name}' visible after insert (try {i}/{retries})")
            return True
        if i < retries:
            time.sleep(poll_delay_sec)
    warn_msg(f"[OIR] '{xcvr_name}' NOT visible after insert in {retries} tries")
    return False

def optics_oir_test(
    fpc_slot: Optional[int],
    pic_slot: Optional[int],
    ports: List[str],
    poll_delay_sec: float = 1.0,
    retries: int = 20,
    cycles: int = 1,
    check_status: bool = False,
    status_timeout: int = 24,
    status_interval: float = 2.0,
    down_timeout: int = 12,
    down_interval: float = 1.5,
    inter_cycle_wait: int = 0,
    remove_only: bool = False,
    insert_only: bool = False,
    verify_attempts: Optional[int] = None,
    round_robin: bool = False,
) -> bool:
    if remove_only and insert_only:
        warn_msg("Cannot use both --remove-only and --insert-only for OIR.")
        return False

    remove_step = True
    insert_step = True
    if remove_only:
        insert_step = False
    if insert_only:
        remove_step = False
    fpc_slot = 0 if fpc_slot is None else fpc_slot
    pic_slot = 0 if pic_slot is None else pic_slot
    if not ports:
        warn_msg("No ports provided for optics OIR test.")
        return False
    cycles = max(1, int(cycles or 1))

    norm_ports = [str(p).strip() for p in ports if str(p).strip()]
    mode_desc = "round-robin" if round_robin else "sequential"
    user_msg(f"[OIR] fpc={fpc_slot}, pic={pic_slot}, ports={', '.join(norm_ports)} (poll_delay={poll_delay_sec}s, retries={retries}, cycles={cycles}, mode={mode_desc})")
    all_ok = True

    # Pre-check: ensure all Xcvrs are visible before starting (skip this if insert-only)
    port_info = []
    for port in norm_ports:
        try:
            port_num = int(str(port).strip())
        except ValueError:
            warn_msg(f"[OIR] Invalid port value '{port}' (must be numeric).")
            all_ok = False
            continue

        port_disp = f"{int(fpc_slot)}/{int(pic_slot)}/{port_num}"
        xcvr_name = f"Xcvr {port_num}"
        link_iface = normalize_iface_token(str(port_num))
        
        if remove_step and not _xcvr_present_in_inventory(xcvr_name):
            warn_msg(f"[OIR] {port_disp}: transceiver '{xcvr_name}' not visible before test, skipping.")
            result_msg(f"[OIR] {port_disp}: SKIP (not present pre-test)")
            all_ok = False
            continue

        port_info.append({
            'port': port,
            'port_num': port_num,
            'port_disp': port_disp,
            'xcvr_name': xcvr_name,
            'link_iface': link_iface,
            'ok': True
        })

    if not port_info:
        warn_msg("[OIR] No valid ports to test after pre-checks.")
        return False

    if round_robin:
        # Round-robin mode: iterate cycles first, then ports within each cycle
        mode_desc = "remove->insert" if (remove_step and insert_step) else ("remove-only" if remove_step else "insert-only")
        user_msg(f"[OIR] Round-robin mode: {mode_desc} (cycles={cycles}, ports={len(port_info)})")
        
        for cycle in range(1, cycles + 1):
            user_msg(f"[OIR] === Cycle {cycle}/{cycles} ===")
            cycle_ok = True
            
            for port_data in port_info:
                port_num = port_data['port_num']
                port_disp = port_data['port_disp']
                xcvr_name = port_data['xcvr_name']
                link_iface = port_data['link_iface']
                
                try:
                    user_msg(f"[OIR] {port_disp} (cycle {cycle}/{cycles})")
                    _pfe_optics_action(fpc_slot, pic_slot, port_num, "oir_enable")
                    time.sleep(0.5)
                    
                    if remove_step:
                        _pfe_optics_action(fpc_slot, pic_slot, port_num, "remove")
                        time.sleep(0.8)
                        if check_status:
                            user_msg(f"[OIR] {port_disp}: verifying link DOWN on {link_iface} (timeout {down_timeout}s)")
                        if not verify_oper_state(link_iface, "down", down_timeout, down_interval, attempts=verify_attempts):
                            warn_msg(f"[OIR] {port_disp}: link did not go DOWN within timeout")
                            port_data['ok'] = False
                            cycle_ok = False
                    
                    if insert_step:
                        _pfe_optics_action(fpc_slot, pic_slot, port_num, "insert")
                        time.sleep(0.5)
                    
                    _pfe_optics_action(fpc_slot, pic_slot, port_num, "oir_disable")
                    time.sleep(0.5)
                except Exception as e:
                    warn_msg(f"[OIR] Error during OIR sequence for {port_disp} (cycle {cycle}): {e}")
                    port_data['ok'] = False
                    cycle_ok = False
                    continue

                if insert_step:
                    ok = _poll_for_xcvr_after_insert(xcvr_name, retries, poll_delay_sec)
                    if not ok:
                        port_data['ok'] = False
                        cycle_ok = False
                    # Optional link status check after inventory appears
                    if ok and check_status:
                        user_msg(f"[OIR] {port_disp}: verifying link UP on {link_iface} (timeout {status_timeout}s)")
                        if not verify_oper_state(link_iface, "up", status_timeout, status_interval, attempts=verify_attempts):
                            warn_msg(f"[OIR] {port_disp}: link did not come UP within timeout")
                            port_data['ok'] = False
                            cycle_ok = False

                # Wait between ports in the same cycle (if not the last port)
                if port_data != port_info[-1] and inter_cycle_wait > 0:
                    user_msg(f"[OIR] Waiting {inter_cycle_wait}s before next port in cycle {cycle}")
                    time.sleep(inter_cycle_wait)

            # Wait between cycles (if not the last cycle)
            if cycle != cycles and inter_cycle_wait > 0:
                user_msg(f"[OIR] Waiting {inter_cycle_wait}s before next cycle")
                time.sleep(inter_cycle_wait)

        # Final summary per port
        for port_data in port_info:
            port_disp = port_data['port_disp']
            if port_data['ok']:
                result_msg(f"[OIR] {port_disp}: SUCCESS (round-robin, {cycles} cycle{'s' if cycles != 1 else ''})")
            else:
                result_msg(f"[OIR] {port_disp}: FAIL (round-robin, {cycles} cycle{'s' if cycles != 1 else ''})")
                all_ok = False

    else:
        # Sequential mode: original behavior - all cycles for port 1, then all cycles for port 2, etc.
        for port_data in port_info:
            port_num = port_data['port_num']
            port_disp = port_data['port_disp']
            xcvr_name = port_data['xcvr_name']
            link_iface = port_data['link_iface']
            mode_desc = "remove->insert" if (remove_step and insert_step) else ("remove-only" if remove_step else "insert-only")
            user_msg(f"[OIR] {port_disp}: {mode_desc} (cycles={cycles})")

            port_ok = True
            for cycle in range(1, cycles + 1):
                try:
                    user_msg(f"[OIR] {port_disp} cycle {cycle}/{cycles}")
                    _pfe_optics_action(fpc_slot, pic_slot, port_num, "oir_enable")
                    time.sleep(0.5)
                    if remove_step:
                        _pfe_optics_action(fpc_slot, pic_slot, port_num, "remove")
                        time.sleep(0.8)
                        if check_status:
                            user_msg(f"[OIR] {port_disp}: verifying link DOWN on {link_iface} (timeout {down_timeout}s)")
                        if not verify_oper_state(link_iface, "down", down_timeout, down_interval, attempts=verify_attempts):
                            warn_msg(f"[OIR] {port_disp}: link did not go DOWN within timeout")
                            port_ok = False
                    if insert_step:
                        _pfe_optics_action(fpc_slot, pic_slot, port_num, "insert")
                        time.sleep(0.5)
                    _pfe_optics_action(fpc_slot, pic_slot, port_num, "oir_disable")
                    time.sleep(0.5)
                except Exception as e:
                    warn_msg(f"[OIR] Error during OIR sequence for {port_disp} (cycle {cycle}): {e}")
                    port_ok = False
                    continue

                if insert_step:
                    ok = _poll_for_xcvr_after_insert(xcvr_name, retries, poll_delay_sec)
                    if not ok:
                        port_ok = False
                    # Optional link status check after inventory appears
                    if ok and check_status:
                        user_msg(f"[OIR] {port_disp}: verifying link UP on {link_iface} (timeout {status_timeout}s)")
                        if not verify_oper_state(link_iface, "up", status_timeout, status_interval, attempts=verify_attempts):
                            warn_msg(f"[OIR] {port_disp}: link did not come UP within timeout")
                            port_ok = False

            if port_ok:
                result_msg(f"[OIR] {port_disp}: SUCCESS (transceiver visible across {cycles} cycle{'s' if cycles != 1 else ''})")
            else:
                result_msg(f"[OIR] {port_disp}: FAIL (transceiver not visible in one or more cycles)")
                all_ok = False
            if inter_cycle_wait > 0 and cycle != cycles:
                user_msg(f"[OIR] Waiting {inter_cycle_wait}s before next cycle for {port_disp}")
                time.sleep(inter_cycle_wait)

    result_msg(f"[OIR] Summary: {'OK' if all_ok else 'PARTIAL/FAIL'} for ports {', '.join(norm_ports)}")
    return all_ok

# -------------------
# Flap logic (Junos)
# -------------------
def flap_interfaces_junos(
    ifaces: List[str],
    wait_seconds: int,
    count: int,
    down_timeout: int,
    up_timeout: int,
    down_interval: float,
    up_interval: float,
    attempts: Optional[int],
    show_status_after: bool = True,
):
    if not ifaces:
        warn_msg("No interfaces provided for flap.")
        return

    user_msg(f"[JUNOS-FLAP] {', '.join(ifaces)} x{count} (wait {wait_seconds}s)")
    for cycle in range(1, count + 1):
        user_msg(f"--- Flap cycle {cycle}/{count} ---")
        ok_disable = disable_interfaces_junos(ifaces)
        if ok_disable:
            all_down = True
            for n in ifaces:
                ok = verify_oper_state(n, "down", down_timeout, down_interval, attempts=attempts)
                if not ok:
                    warn_msg(f"{n}: did not go DOWN within timeout")
                    all_down = False
            result_msg(f"Cycle {cycle}: disable {'OK' if all_down else 'PARTIAL/FAIL'}")
        else:
            warn_msg("Disable (flap step) failed.")
            result_msg(f"Cycle {cycle}: disable FAIL")

        user_msg(f"Waiting {wait_seconds}s before enabling...")
        time.sleep(max(0, int(wait_seconds)))

        ok_enable = enable_interfaces_junos(ifaces)
        if ok_enable:
            all_up = True
            for n in ifaces:
                ok = verify_oper_state(n, "up", up_timeout, up_interval, attempts=attempts)
                if not ok:
                    warn_msg(f"{n}: did not come UP within timeout")
                    all_up = False
            result_msg(f"Cycle {cycle}: enable {'OK' if all_up else 'PARTIAL/FAIL'}")
        else:
            warn_msg("Enable (flap step) failed.")
            result_msg(f"Cycle {cycle}: enable FAIL")

    if show_status_after:
        status_interfaces(ifaces)
    result_msg(f"[JUNOS-FLAP] complete for: {', '.join(ifaces)}")

# -------------------
# Flap logic (Broadcom)
# -------------------
def flap_interfaces_brcm(
    ifaces: List[str],
    wait_seconds: int,
    count: int,
    down_timeout: int,
    up_timeout: int,
    down_interval: float,
    up_interval: float,
    attempts: Optional[int],
    post_up_wait: int = 5,
    show_status_after: bool = True,
):
    if not ifaces:
        warn_msg("No interfaces provided for flap.")
        return

    cli_path = _find_brcm_cli()
    if not cli_path:
        warn_msg("Broadcom CLI tool not found (cannot flap via BRCM).")
        return
    user_msg(f"[BRCM] Using CLI: {cli_path}")

    user_msg(f"[BRCM-FLAP] {', '.join(ifaces)} x{count} (wait {wait_seconds}s pre-enable, post-up-wait {post_up_wait}s)")
    for cycle in range(1, count + 1):
        user_msg(f"--- Flap cycle {cycle}/{count} ---")
        # Disable
        ok_disable = disable_interfaces_brcm(ifaces)
        if ok_disable:
            all_down = True
            for n in ifaces:
                ok = verify_oper_state(n, "down", down_timeout, down_interval, attempts=attempts)
                if not ok:
                    warn_msg(f"{n}: did not go DOWN within timeout")
                    all_down = False
            result_msg(f"Cycle {cycle}: BRCM disable {'OK' if all_down else 'PARTIAL/FAIL'}")
            # Show BRCM status right after disable
            for n in ifaces:
                brcm_link_status_for_iface(n)
        else:
            warn_msg("BRCM disable (flap step) failed.")
            result_msg(f"Cycle {cycle}: BRCM disable FAIL")

        user_msg(f"Waiting {wait_seconds}s before enabling...")
        time.sleep(max(0, int(wait_seconds)))

        # Enable
        ok_enable = enable_interfaces_brcm(ifaces)
        if ok_enable:
            all_up = True
            for n in ifaces:
                ok = verify_oper_state(n, "up", up_timeout, up_interval, attempts=attempts)
                if not ok:
                    warn_msg(f"{n}: did not come UP within timeout")
                    all_up = False
            result_msg(f"Cycle {cycle}: BRCM enable {'OK' if all_up else 'PARTIAL/FAIL'}")
            # Show BRCM status right after enable
            for n in ifaces:
                brcm_link_status_for_iface(n)
        else:
            warn_msg("BRCM enable (flap step) failed.")
            result_msg(f"Cycle {cycle}: BRCM enable FAIL")

    # Optional post-up wait and status
    if post_up_wait > 0:
        user_msg(f"Waiting {post_up_wait}s after final enable before status...")
        time.sleep(post_up_wait)

    if show_status_after:
        status_interfaces(ifaces)

    result_msg(f"[BRCM-FLAP] complete for: {', '.join(ifaces)}")

# -------------------
# Main
# -------------------
def main():
    # Early parse --logfile to set destination before any output
    pre = argparse.ArgumentParser(add_help=False)
    pre.add_argument("--logfile", help="Path to log file (default: /var/tmp/debug_cli.log)")
    known, _ = pre.parse_known_args()
    if known.logfile:
        set_logfile(known.logfile)
    else:
        set_logfile(LOGFILE)

    ap = argparse.ArgumentParser(description="Debug CLI & Interface Control", parents=[pre])
    ap.add_argument("-d", "--disable", help="Interfaces to disable (comma-separated)")
    ap.add_argument("-e", "--enable",  help="Interfaces to enable (comma-separated)")
    ap.add_argument("-s", "--status",  help="Interfaces to show status (comma-separated)")

    ap.add_argument("--flap",          help="Interfaces to flap (comma-separated)")
    ap.add_argument("--flap-wait",     type=int, default=5, help="Seconds between disable and enable (default: 5)")
    ap.add_argument("--flap-count",    type=int, default=1, help="Number of flap iterations (default: 1)")
    ap.add_argument("--post-up-wait",  type=int, default=5, help="Seconds to wait after final enable before status (default: 5)")

    # Broadcom CLI: optional sub-action or flag
    ap.add_argument("--brcm_cli", nargs="*", metavar=("ACTION", "IFACES"),
                    help="Optional Broadcom action: 'disable IFACES' or 'enable IFACES'. "
                         "Without args, acts as a flag so --flap uses Broadcom.")

    # Optics OIR test
    ap.add_argument("--oir-test", nargs="?", const=OIR_FLAG_SENTINEL,
                    help="Ports to optics OIR test (comma-separated). If provided without args, reuses --flap list.")
    ap.add_argument("--fpc-slot",       type=int, default=0, help="FPC slot for optics OIR test (default: 0)")
    ap.add_argument("--pic-slot",       type=int, default=0, help="PIC slot for optics OIR test (default: 0)")
    ap.add_argument("--oir-poll-delay", type=float, default=1.0, help="Seconds between OIR inventory polls (default: 1.0)")
    ap.add_argument("--oir-retries",    type=int, default=20, help="Retries to find transceiver after insert (default: 20)")
    ap.add_argument("--check-status", action="store_true", help="For OIR: also verify interface link comes UP after insert")
    ap.add_argument("--remove-only", action="store_true", help="For OIR: run remove-only (skip insert)")
    ap.add_argument("--insert-only", action="store_true", help="For OIR: run insert-only (skip remove)")
    ap.add_argument("--round-robin", action="store_true", help="For OIR: alternate ports per cycle (e.g., cycle 1: port 36, then 60; cycle 2: port 36, then 60; etc.)")

    # Interface mapping
    ap.add_argument("--iface-prefix", default="et-0/0/",
                    help="Prefix for mapping bare port numbers (default: et-0/0/; e.g., use xe-0/0/ for 10G)")

    # verification tunables
    ap.add_argument("--down-timeout",  type=int,   default=12,  help="Seconds to wait for link to go DOWN (default: 12)")
    ap.add_argument("--up-timeout",    type=int,   default=24,  help="Seconds to wait for link to come UP (default: 24)")
    ap.add_argument("--down-interval", type=float, default=1.5, help="Polling interval during DOWN verify (default: 1.5)")
    ap.add_argument("--up-interval",   type=float, default=2.0, help="Polling interval during UP verify (default: 2.0)")
    ap.add_argument("--attempts",      type=int,   default=5,   help="Override verify attempts (default: 5).")

    args = ap.parse_args()

    user_msg(f"Log file: {LOGFILE}")
    set_iface_prefix(args.iface_prefix)

    # Determine if brcm flag-only mode is set
    brcm_flag = False
    brcm_action = None
    brcm_ifaces: List[str] = []
    if args.brcm_cli is not None:
        if len(args.brcm_cli) >= 2:
            brcm_action = args.brcm_cli[0].lower()
            brcm_ifaces = normalize_iface_list(args.brcm_cli[1])
        else:
            brcm_flag = True
            user_msg("[BRCM] Broadcom mode flag enabled (flap will prefer BRCM via jbcmcmd.py).")

    # Build ordered ops list from argv to honor user sequence
    ordered_ops: List[Tuple[str, Optional[str]]] = []
    argv = sys.argv[1:]

    def _value_for(idx: int, fallback: Optional[str]) -> Optional[str]:
        tok = argv[idx]
        if "=" in tok:
            return tok.split("=", 1)[1]
        if idx + 1 < len(argv) and not argv[idx + 1].startswith("-"):
            return argv[idx + 1]
        return fallback

    i = 0
    while i < len(argv):
        tok = argv[i]
        if tok in ("-d", "--disable") or tok.startswith("--disable="):
            ordered_ops.append(("disable_junos", _value_for(i, args.disable)))
        elif tok in ("-e", "--enable") or tok.startswith("--enable="):
            ordered_ops.append(("enable_junos", _value_for(i, args.enable)))
        elif tok in ("-s", "--status") or tok.startswith("--status="):
            ordered_ops.append(("status", _value_for(i, args.status)))
        elif tok == "--flap" or tok.startswith("--flap="):
            ordered_ops.append(("flap", _value_for(i, args.flap)))
        elif tok == "--oir-test" or tok.startswith("--oir-test="):
            ordered_ops.append(("oir_test", _value_for(i, args.oir_test)))
        elif tok == "--brcm_cli":
            if brcm_action and brcm_ifaces:
                ordered_ops.append(("brcm_action", f"{brcm_action}:{','.join(brcm_ifaces)}"))
            else:
                # flag-only
                ordered_ops.append(("brcm_flag", None))
        i += 1

    # Execute ordered ops if any
    if ordered_ops:
        for op, val in ordered_ops:
            if op == "brcm_flag":
                user_msg("[BRCM] Broadcom mode flag noted (flap will use BRCM).")
                continue

            if op == "brcm_action":
                action, ifs = (val or "").split(":", 1)
                ifaces = normalize_iface_list(ifs)
                if not ifaces:
                    warn_msg("No IFACES supplied for --brcm_cli ACTION IFACES")
                    continue
                # map first; if mapping fails we still try per-iface mapping in helpers
                _ = [map_iface_to_brcm_port(i) for i in ifaces]  # mapping logs
                if action == "disable":
                    ok = disable_interfaces_brcm(ifaces)
                    if ok:
                        # verify down using configured attempts
                        all_down = True
                        for n in ifaces:
                            if not verify_oper_state(n, "down", args.down_timeout, args.down_interval, attempts=args.attempts):
                                all_down = False
                                warn_msg(f"{n}: did not go DOWN within timeout")
                        result_msg(f"BRCM one-shot disable: {'OK' if all_down else 'PARTIAL/FAIL'} -> {', '.join(ifaces)}")
                        for n in ifaces:
                            brcm_link_status_for_iface(n)
                    else:
                        result_msg(f"BRCM one-shot disable: FAIL -> {', '.join(ifaces)}")
                elif action == "enable":
                    ok = enable_interfaces_brcm(ifaces)
                    if ok:
                        all_up = True
                        for n in ifaces:
                            if not verify_oper_state(n, "up", args.up_timeout, args.up_interval, attempts=args.attempts):
                                all_up = False
                                warn_msg(f"{n}: did not come UP within timeout")
                        result_msg(f"BRCM one-shot enable: {'OK' if all_up else 'PARTIAL/FAIL'} -> {', '.join(ifaces)}")
                        for n in ifaces:
                            brcm_link_status_for_iface(n)
                    else:
                        result_msg(f"BRCM one-shot enable: FAIL -> {', '.join(ifaces)}")
                else:
                    warn_msg(f"Unknown --brcm_cli action: {action}")
                continue

            if op == "oir_test":
                ports: List[str] = []
                # If provided as flag-only, reuse flap list if present
                if val == OIR_FLAG_SENTINEL or not val:
                    ports = oir_ports_from_tokens(normalize_iface_list(args.flap))
                else:
                    ports = oir_ports_from_tokens(normalize_iface_list(val))
                if not ports:
                    warn_msg("No ports for --oir-test (provide ports or reuse --flap list)")
                    continue
                ok = optics_oir_test(
                    fpc_slot=args.fpc_slot,
                    pic_slot=args.pic_slot,
                    ports=ports,
                    poll_delay_sec=args.oir_poll_delay,
                    retries=args.oir_retries,
                    cycles=args.flap_count,
                    check_status=args.check_status,
                    status_timeout=args.up_timeout,
                    status_interval=args.up_interval,
                    down_timeout=args.down_timeout,
                    down_interval=args.down_interval,
                    inter_cycle_wait=args.flap_wait,
                    remove_only=args.remove_only,
                    insert_only=args.insert_only,
                    verify_attempts=args.attempts,
                    round_robin=args.round_robin,
                )
                result_msg(f"OIR test {'OK' if ok else 'PARTIAL/FAIL'} -> {', '.join(ports)}")
                continue

            ifaces = normalize_iface_list(val) if val else []
            if op == "disable_junos":
                if not ifaces:
                    warn_msg("No interfaces for --disable")
                    continue
                ok = disable_interfaces_junos(ifaces)
                if ok:
                    all_down = True
                    for n in ifaces:
                        if not verify_oper_state(n, "down", args.down_timeout, args.down_interval, attempts=args.attempts):
                            all_down = False
                            warn_msg(f"{n}: did not go DOWN within timeout")
                    result_msg(f"Disabled (Junos): {'OK' if all_down else 'PARTIAL/FAIL'} -> {', '.join(ifaces)}")
                else:
                    result_msg(f"Disabled (Junos): FAIL -> {', '.join(ifaces)}")

            elif op == "enable_junos":
                if not ifaces:
                    warn_msg("No interfaces for --enable")
                    continue
                ok = enable_interfaces_junos(ifaces)
                if ok:
                    all_up = True
                    for n in ifaces:
                        if not verify_oper_state(n, "up", args.up_timeout, args.up_interval, attempts=args.attempts):
                            all_up = False
                            warn_msg(f"{n}: did not come UP within timeout")
                    result_msg(f"Enabled (Junos): {'OK' if all_up else 'PARTIAL/FAIL'} -> {', '.join(ifaces)}")
                else:
                    result_msg(f"Enabled (Junos): FAIL -> {', '.join(ifaces)}")

            elif op == "status":
                if not ifaces:
                    warn_msg("No interfaces for --status")
                    continue
                status_interfaces(ifaces)

            elif op == "flap":
                if not ifaces:
                    warn_msg("No interfaces for --flap")
                    continue
                # choose engine: BRCM if flag present, else Junos
                if brcm_flag:
                    user_msg(f"[PLAN] FLAP (BRCM) -> {', '.join(ifaces)}")
                    flap_interfaces_brcm(
                        ifaces,
                        wait_seconds=args.flap_wait,
                        count=args.flap_count,
                        down_timeout=args.down_timeout,
                        up_timeout=args.up_timeout,
                        down_interval=args.down_interval,
                        up_interval=args.up_interval,
                        attempts=args.attempts,
                        post_up_wait=args.post_up_wait,
                        show_status_after=True,
                    )
                else:
                    user_msg(f"[PLAN] FLAP (Junos) -> {', '.join(ifaces)}")
                    flap_interfaces_junos(
                        ifaces,
                        wait_seconds=args.flap_wait,
                        count=args.flap_count,
                        down_timeout=args.down_timeout,
                        up_timeout=args.up_timeout,
                        down_interval=args.down_interval,
                        up_interval=args.up_interval,
                        attempts=args.attempts,
                        show_status_after=True,
                    )
        print(f"[INFO] Log saved at {LOGFILE}")
        return

    # No ops
    user_msg("No interface control args provided; nothing to do.")
    print(f"[INFO] Log saved at {LOGFILE}")

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n[WARN] Interrupted by user (Ctrl+C).")
        print(f"[INFO] Log saved at {LOGFILE}")
        sys.exit(130)