#!/usr/bin/env bash
# Note: intentionally NOT using `set -e`. A single iperf3/fortio exec
# failure should warn and continue with the next round, not abort the
# whole benchmark — we lose hours of work otherwise.
# -u: expanding an unset variable is an error.
# -o pipefail: a pipeline's status reflects a failure in any of its stages,
# not only the last command.
set -uo pipefail

###############################################################################
# TKE Network Benchmark
#
# A self-contained script for running network performance benchmarks across
# different TKE network solutions (Cilium Native, Cilium Overlay,
# kube-proxy iptables, kube-proxy IPVS).
#
# Prerequisite:
#   KUBECONFIG must point to a working kubeconfig (or current-context is set).
#   The script just runs `kubectl` directly — no wrappers, no extra flags.
#
# Usage:
#   bash network-benchmark.sh
#   KUBECONFIG=/path/to/cfg bash network-benchmark.sh
#   bash network-benchmark.sh --dir ./results-clusterA --ns nb
#
# Options:
#   -h, --help       Show help
#   --dir DIR        Output directory (default: ./benchmark-results-<context>)
#   --skip-cleanup   Skip cleanup after benchmark
#   --ns NS          Namespace for test workloads (default: network-benchmark)
#
# Environment Variables:
#   NETPERF_IMAGE      netperf image (default: networkstatic/netperf:latest)
#   IPERF_IMAGE        iperf3 image (default: networkstatic/iperf3:latest)
#   FORTIO_IMAGE       fortio image (default: fortio/fortio:latest)
#   IPERF_DURATION     iperf3 test duration in seconds (default: 30)
#   FORTIO_DURATION    fortio/netperf test duration in seconds (default: 60)
#   ROUNDS             repetitions per scenario (default: 1)
#   ROUND_SLEEP        seconds between rounds (default: 30)
#   SVC_SCALE_STEPS    comma-separated Service counts for scale test (default: "5000,10000,20000,30000")
#   SVC_ENDPOINTS      endpoints per dummy Service (default: 4; degradation is driven by svc COUNT, not endpoints)
#   SVC_CREATE_PARALLEL parallel workers when creating dummy Services (default: 4)
#   AUTO_FIX_LB_MAP    "true" auto-raises Cilium bpf-lb-map-max without prompting
#                      (default: ask interactively when capacity is insufficient)
#   METRICS_SETTLE_SECS  quiesce wait before steady-state resource sampling (default: 45)
#   METRICS_SAMPLES      resource samples to take after settle (default: 6)
#   KUBECTL_TIMEOUT    timeout for kubectl exec/cp calls (default: 180)
#
# Output:
#   Creates benchmark-results-<context>/ directory with structured results.
#
###############################################################################

# ─── Logging ─────────────────────────────────────────────────────────────────

# _paint — Print one coloured line.
# $1 is the ANSI SGR colour code; the remaining arguments are joined with
# spaces, wrapped in the colour escape and a reset, and terminated by a newline.
_paint() {
  local code="$1"
  shift
  printf "\033[%sm%s\033[0m\n" "$code" "$*"
}

# Colour wrappers: red=31, green=32, yellow=33, blue=34.
red() { _paint 31 "$@"; }
green() { _paint 32 "$@"; }
yellow() { _paint 33 "$@"; }
blue() { _paint 34 "$@"; }

# log — Print the message prefixed with the current wall-clock time [HH:MM:SS].
log() {
  local now
  now="$(date '+%H:%M:%S')"
  printf "[%s] %s\n" "$now" "$*"
}

# Message-level helpers built on the primitives above. All write to stdout.
step() { blue "━━━ $* ━━━"; }
info() { log "INFO: $*"; }
warn() { yellow "WARN: $*"; }
err() { red "ERROR: $*"; }

# ─── Defaults ────────────────────────────────────────────────────────────────

# Settings controlled from the command line (parse_args overwrites them).
# OUTPUT_DIR starts out empty.
OUTPUT_DIR=
SKIP_CLEANUP="false"
NS="network-benchmark"

# Container images: a value already present in the environment wins,
# otherwise the public default is assigned.
: "${NETPERF_IMAGE:=networkstatic/netperf:latest}"
: "${IPERF_IMAGE:=networkstatic/iperf3:latest}"
: "${FORTIO_IMAGE:=fortio/fortio:latest}"

# Filled in at runtime by select_worker_nodes / detect_cluster_type.
WORKER_NODE_1=
WORKER_NODE_2=
CLUSTER_TYPE=

# ─── Test Parameters ─────────────────────────────────────────────────────────
# Tunables, all overridable from the environment to trade test intensity
# against the instance's QoS budget:
#   IPERF_DURATION   seconds per iperf3 run (default 30). Shorter runs consume
#                    less burst credit and avoid QoS throttling on small
#                    instances.
#   FORTIO_DURATION  seconds per fortio/netperf run (default 60).
#   ROUNDS           repetitions per scenario (default 1). More rounds give
#                    better statistical confidence but consume more burst
#                    budget.
#   ROUND_SLEEP      seconds to wait between rounds (default 30) so burst
#                    credit can recover between tests.
: "${IPERF_DURATION:=30}"
: "${FORTIO_DURATION:=60}"
: "${ROUNDS:=1}"
: "${ROUND_SLEEP:=30}"

# Timeout applied to kubectl exec/cp calls. It has to exceed the longest
# single test (IPERF_DURATION or FORTIO_DURATION) plus warmup and overhead.
: "${KUBECTL_TIMEOUT:=180}"

# Space-separated pids of the background monitors, so the exit trap can reap
# them. A plain string (not an array) is easy to print in logs.
MONITOR_PIDS=

# _cleanup_on_exit — Kill every background resource monitor still listed in
# MONITOR_PIDS so none of them outlives the script and keeps appending to the
# resource CSVs. Failures (monitor already gone) are ignored.
_cleanup_on_exit() {
  local pid
  # MONITOR_PIDS is a space-separated list: the unquoted expansion is the
  # intended word-splitting, and an empty list simply skips the loop.
  for pid in $MONITOR_PIDS; do
    kill "$pid" 2>/dev/null || true
  done
}
# Cleanup hangs on EXIT only. INT/TERM get handlers that exit explicitly:
# a signal handler that merely returns lets bash resume the script after the
# interrupted command, so Ctrl-C would stop the monitors but not the
# benchmark. `exit` inside the handler still fires the EXIT trap, so the
# monitors are reaped exactly once. 130/143 are the conventional 128+signal
# statuses.
trap _cleanup_on_exit EXIT
trap 'exit 130' INT
trap 'exit 143' TERM

# ─── Parse Arguments ─────────────────────────────────────────────────────────

# parse_args — Apply command-line options to the global settings.
# Arguments: the script's positional parameters ("$@").
# Globals written: OUTPUT_DIR (--dir), NS (--ns), SKIP_CLEANUP (--skip-cleanup).
# Exits 0 after printing the help for -h/--help. Exits 1, after an error
# message and the help text, on an unknown option or an option that is
# missing its value.
parse_args() {
  while [[ $# -gt 0 ]]; do
    case "$1" in
    -h | --help)
      show_help
      exit 0
      ;;
    --dir | --ns)
      # Both options take a value. Verify it is there before touching $2:
      # under `set -u` a trailing `--dir` would otherwise die with a bare
      # "unbound variable" message, and without -u the failed `shift 2`
      # would leave the loop spinning forever.
      if [[ $# -lt 2 ]]; then
        err "Option $1 requires a value"
        show_help
        exit 1
      fi
      if [[ "$1" == "--dir" ]]; then
        OUTPUT_DIR="$2"
      else
        NS="$2"
      fi
      shift 2
      ;;
    --skip-cleanup)
      SKIP_CLEANUP=true
      shift
      ;;
    *)
      err "Unknown option: $1"
      show_help
      exit 1
      ;;
    esac
  done
}

# show_help — Print the usage text (prerequisite, usage, options, environment
# variables, examples) on stdout. The heredoc delimiter is quoted: the text
# contains nothing to expand, so it is emitted literally.
show_help() {
  cat <<'HELP'
TKE Network Benchmark

Prerequisite:
  KUBECONFIG points to a working kubeconfig (current-context is used directly).

Usage:
  bash network-benchmark.sh
  KUBECONFIG=/path/to/cfg bash network-benchmark.sh
  bash network-benchmark.sh --dir ./results-clusterA --ns nb

Options:
  -h, --help           Show this help
  --dir DIR            Output directory (default: ./benchmark-results-<context>)
  --ns NS              Namespace for test workloads (default: network-benchmark)
  --skip-cleanup       Skip cleanup after benchmark

Environment Variables:
  NETPERF_IMAGE      netperf image (default: networkstatic/netperf:latest)
  IPERF_IMAGE        iperf3 image (default: networkstatic/iperf3:latest)
  FORTIO_IMAGE       fortio image (default: fortio/fortio:latest)
  IPERF_DURATION     iperf3 test duration in seconds (default: 30)
  FORTIO_DURATION    fortio/netperf test duration in seconds (default: 60)
  ROUNDS             repetitions per scenario (default: 1)
  ROUND_SLEEP        seconds between rounds (default: 30)
  SVC_SCALE_STEPS    comma-separated Service counts for scale test (default: "5000,10000,20000,30000")
  SVC_ENDPOINTS      endpoints per dummy Service (default: 4; degradation is driven by svc COUNT, not endpoints)
  SVC_CREATE_PARALLEL parallel workers when creating dummy Services (default: 4)
  AUTO_FIX_LB_MAP    "true" auto-raises Cilium bpf-lb-map-max without prompting
  METRICS_SETTLE_SECS  quiesce wait before steady-state resource sampling (default: 45)
  METRICS_SAMPLES      resource samples to take after settle (default: 6)
  KUBECTL_TIMEOUT    timeout for kubectl exec/cp calls (default: 180)

Examples:
  # Default: quick run (1 round × 30s iperf / 60s fortio), safe for small instances
  bash network-benchmark.sh --dir ./bench

  # Full run on large instances (no QoS worry): 3 rounds × 120s each
  ROUNDS=3 IPERF_DURATION=120 FORTIO_DURATION=120 bash network-benchmark.sh --dir ./bench

  # Custom scale steps: test at 1000, 5000, 10000 Services
  SVC_SCALE_STEPS="1000,5000,10000" bash network-benchmark.sh
HELP
}

# ─── Helpers ─────────────────────────────────────────────────────────────────

# check_prereqs — Verify the tools and cluster access the benchmark needs.
# Checks that kubectl, python3 and timeout are on PATH and that
# `kubectl cluster-info` succeeds. Every problem found is reported through
# err; nothing is aborted here.
# Returns: 0 when everything is available, 1 if anything is missing.
check_prereqs() {
  # `cmd` is declared local so the loop does not overwrite a caller's global.
  local missing=0 cmd
  for cmd in kubectl python3 timeout; do
    if ! command -v "$cmd" &>/dev/null; then
      err "Prerequisite '$cmd' not found"
      if [[ "$cmd" == "timeout" ]]; then
        err "  on macOS install via: brew install coreutils && export PATH=\"\$(brew --prefix coreutils)/libexec/gnubin:\$PATH\""
      fi
      missing=1
    fi
  done
  if ! kubectl cluster-info &>/dev/null; then
    err "kubectl cannot reach the cluster. Check KUBECONFIG / current-context."
    missing=1
  fi
  return "$missing"
}

# get_cluster_name — Print the current kubeconfig context name, or "unknown"
# when kubectl cannot report one. kubectl's stderr is discarded.
get_cluster_name() {
  if ! kubectl config current-context 2>/dev/null; then
    echo "unknown"
  fi
}

# sanitize_name — Make a string safe to use as a single path component.
# kubeconfig context names can contain '/', '(', ')', spaces, CJK, etc.
# (e.g. "cls-xxx(cd/roc-overlay-勿删)"), which break path construction
# (the '/' creates nested dirs, '(' ')' are shell metachars). Replace any
# char outside [A-Za-z0-9._-] with '-', collapse repeats, trim leading/trailing
# dashes. Falls back to "cluster" if the result is empty.
sanitize_name() {
  local s="$1"
  s=$(printf '%s' "$s" | LC_ALL=C sed -e 's/[^A-Za-z0-9._-]/-/g' -e 's/-\{2,\}/-/g' -e 's/^-//' -e 's/-$//')
  [[ -z "$s" ]] && s="cluster"
  printf '%s' "$s"
}

# ─── Cluster Detection ───────────────────────────────────────────────────────

# _kube_proxy_mode — Print the proxy mode recorded in the kube-proxy ConfigMap
# (key `config` in kube-system/kube-proxy), or "iptables" when the ConfigMap
# cannot be read or the mode is absent/empty.
# Accepts `mode: ipvs`, `mode: "ipvs"` and `mode: 'ipvs'`; the first matching
# line wins.
_kube_proxy_mode() {
  local cfg line mode=""
  local re="^[[:space:]]*mode:[[:space:]]*[\"']?([A-Za-z0-9_-]*)[\"']?[[:space:]]*\$"
  cfg=$(kubectl -n kube-system get configmap kube-proxy -o jsonpath='{.data.config}' 2>/dev/null) || cfg=""
  while IFS= read -r line; do
    if [[ "$line" =~ $re ]]; then
      mode="${BASH_REMATCH[1]}"
      break
    fi
  done <<<"$cfg"
  printf '%s\n' "${mode:-iptables}"
}

# detect_cluster_type — Work out which datapath the cluster runs and store it
# in the global CLUSTER_TYPE.
#   cilium-overlay      cilium DaemonSet has ready pods and `cilium status`
#                       mentions a vxlan/geneve tunnel
#   cilium-native       cilium DaemonSet has ready pods, no tunnel reported
#   kubeproxy-ipvs      kube-proxy pods are Running (or, failing that, the
#   kubeproxy-iptables  kube-proxy ConfigMap exists) and the configured mode
#                       is / is not "ipvs"
# Returns 0 once a type is set; logs an error and exits the script with
# status 1 when none of the probes match.
detect_cluster_type() {
  info "Detecting cluster type..."

  local cilium_ready
  cilium_ready=$(kubectl -n kube-system get ds cilium -o jsonpath='{.status.numberReady}' 2>/dev/null || echo "0")
  # Match a positive integer textually so an unexpected non-numeric answer
  # cannot break the comparison.
  if [[ "$cilium_ready" =~ ^[1-9][0-9]*$ ]]; then
    local cilium_status
    cilium_status=$(kubectl -n kube-system exec ds/cilium -- cilium status 2>/dev/null || echo "")
    # Extended regex instead of the GNU-only `\|` alternation.
    if grep -Eqi 'Tunnel.*(vxlan|geneve)' <<<"$cilium_status"; then
      CLUSTER_TYPE="cilium-overlay"
      info "Detected: Cilium Overlay"
    else
      CLUSTER_TYPE="cilium-native"
      info "Detected: Cilium Native Routing"
    fi
    return 0
  fi

  # kube-proxy: prefer Running pods as evidence, fall back to the mere
  # existence of its ConfigMap. Only the log suffix differs between the two.
  local kube_proxy_running source_note=""
  kube_proxy_running=$(kubectl -n kube-system get pod -l k8s-app=kube-proxy --field-selector=status.phase=Running --no-headers 2>/dev/null | wc -l | tr -d ' ')
  if [[ "$kube_proxy_running" -gt 0 ]]; then
    source_note=""
  elif kubectl -n kube-system get configmap kube-proxy &>/dev/null; then
    source_note=" (from config)"
  else
    err "Unable to detect cluster type"
    exit 1
  fi

  if [[ "$(_kube_proxy_mode)" == "ipvs" ]]; then
    CLUSTER_TYPE="kubeproxy-ipvs"
    info "Detected: kube-proxy IPVS${source_note}"
  else
    CLUSTER_TYPE="kubeproxy-iptables"
    info "Detected: kube-proxy iptables${source_note}"
  fi
  return 0
}

# collect_context_info — Write $OUTPUT_DIR/context.yaml describing the cluster
# under test: context name, detected CLUSTER_TYPE, UTC timestamp, server
# version, node count, and the OS image / kernel version of the first node,
# plus the last field of the first `machine-size` line in
# `kubectl describe node` ("unknown" if there is none).
# Globals read: OUTPUT_DIR, CLUSTER_TYPE.
# Returns: 1 if the output directory cannot be created, otherwise the status
# of the final log call.
collect_context_info() {
  step "Collecting cluster context info"
  if ! mkdir -p "$OUTPUT_DIR"; then
    err "Cannot create output directory: $OUTPUT_DIR"
    return 1
  fi

  local cluster_name k8s_version node_count node_os kernel_version node_model
  cluster_name=$(get_cluster_name)
  # The name is emitted inside a YAML double-quoted scalar: escape backslashes
  # and double quotes so an unusual context name cannot break the file.
  cluster_name=${cluster_name//\\/\\\\}
  cluster_name=${cluster_name//\"/\\\"}

  # Prefer the JSON form of `kubectl version`; fall back to scraping the text.
  k8s_version=$(kubectl version -o json 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('serverVersion',{}).get('gitVersion',''))" 2>/dev/null || kubectl version 2>/dev/null | awk '/Server Version/{print $NF}')
  node_count=$(kubectl get nodes --no-headers 2>/dev/null | wc -l | tr -d ' ')
  node_os=$(kubectl get nodes -o jsonpath='{.items[0].status.nodeInfo.osImage}' 2>/dev/null)
  kernel_version=$(kubectl get nodes -o jsonpath='{.items[0].status.nodeInfo.kernelVersion}' 2>/dev/null)
  # A single awk does the match, the "first hit only" and the fallback.
  # With `grep | head -1 | awk || echo unknown` under pipefail, head closing
  # the pipe early can fail the pipeline and append a stray "unknown" line
  # after the real value.
  node_model=$(kubectl describe node 2>/dev/null |
    awk '/machine-size/ { print $NF; found = 1; exit } END { if (!found) print "unknown" }')

  cat >"$OUTPUT_DIR/context.yaml" <<EOF
cluster_name: "$cluster_name"
cluster_type: $CLUSTER_TYPE
test_date: $(date -u +"%Y-%m-%dT%H:%M:%SZ")
k8s_version: $k8s_version
node_count: $node_count
node_os: $node_os
kernel_version: $kernel_version
node_model: $node_model
EOF

  info "Context info saved to $OUTPUT_DIR/context.yaml"
}

# ─── Select Worker Nodes ────────────────────────────────────────────────────

# select_worker_nodes — Pick the two nodes used for the cross-node tests.
# Takes the first two names returned by `kubectl get nodes` and stores them in
# WORKER_NODE_1 (servers) and WORKER_NODE_2 (clients).
# Exits the script with status 1 when fewer than two nodes are returned.
select_worker_nodes() {
  step "Selecting worker nodes for cross-node tests"

  local nodes node_count
  nodes=$(kubectl get nodes --no-headers -o custom-columns=NAME:.metadata.name 2>/dev/null | head -n 3)

  WORKER_NODE_1=$(sed -n '1p' <<<"$nodes")
  WORKER_NODE_2=$(sed -n '2p' <<<"$nodes")

  if [[ -z "$WORKER_NODE_1" || -z "$WORKER_NODE_2" ]]; then
    # Count non-empty lines only: `echo "" | wc -l` reports 1 for an empty
    # list. grep -c prints 0 and exits non-zero when nothing matches, hence
    # the `|| true`.
    node_count=$(grep -c . <<<"$nodes" || true)
    err "Need at least 2 worker nodes, found: $node_count"
    exit 1
  fi

  info "Using nodes: $WORKER_NODE_1 (server) <-> $WORKER_NODE_2 (client)"
}

# ─── Deploy / Cleanup ───────────────────────────────────────────────────────

# deploy_test_workloads — Create the benchmark namespace, pods and Service,
# then block until every pod is Ready.
# Globals read: NS, WORKER_NODE_1, WORKER_NODE_2, IPERF_IMAGE, NETPERF_IMAGE,
#               FORTIO_IMAGE.
# Layout: every *-server pod is pinned (nodeName) to WORKER_NODE_1 and every
# *-client pod to WORKER_NODE_2.
# Exits the script with status 1 if the pods are not Ready within 300s.
deploy_test_workloads() {
  step "Deploying test workloads"
  local N="$NS"

  # Render the Namespace client-side and apply it, so an already existing
  # namespace is not an error.
  kubectl create namespace "$N" --dry-run=client -o yaml | kubectl apply -f - 2>/dev/null

  # Node level (hostNetwork): iperf3 server listening on port 5202 on node 1,
  # idle client pod on node 2.
  kubectl apply -n "$N" -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: iperf-server-host
  labels:
    app: benchmark
    role: iperf-server-host
spec:
  hostNetwork: true
  nodeName: $WORKER_NODE_1
  containers:
  - name: iperf
    image: $IPERF_IMAGE
    command: ["iperf3", "-s", "-p", "5202"]
    ports:
    - containerPort: 5202
      hostPort: 5202
  terminationGracePeriodSeconds: 0
---
apiVersion: v1
kind: Pod
metadata:
  name: iperf-client-host
  labels:
    app: benchmark
    role: iperf-client-host
spec:
  hostNetwork: true
  nodeName: $WORKER_NODE_2
  containers:
  - name: iperf
    image: $IPERF_IMAGE
    command: ["sleep", "infinity"]
  terminationGracePeriodSeconds: 0
EOF

  # Pod-to-Pod workloads: one server/client pair each for iperf3, netperf and
  # fortio. The iperf/netperf clients just sleep; tests are exec'd into them.
  kubectl apply -n "$N" -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: iperf-server
  labels:
    app: benchmark
    role: iperf-server
spec:
  nodeName: $WORKER_NODE_1
  containers:
  - name: iperf
    image: $IPERF_IMAGE
    command: ["iperf3", "-s"]
    ports:
    - containerPort: 5201
  terminationGracePeriodSeconds: 0
---
apiVersion: v1
kind: Pod
metadata:
  name: iperf-client
  labels:
    app: benchmark
    role: iperf-client
spec:
  nodeName: $WORKER_NODE_2
  containers:
  - name: iperf
    image: $IPERF_IMAGE
    command: ["sleep", "infinity"]
  terminationGracePeriodSeconds: 0
---
apiVersion: v1
kind: Pod
metadata:
  name: netperf-server
  labels:
    app: benchmark
    role: netperf-server
spec:
  nodeName: $WORKER_NODE_1
  containers:
  - name: netperf
    image: $NETPERF_IMAGE
    command: ["netserver", "-D"]
    ports:
    - containerPort: 12865
  terminationGracePeriodSeconds: 0
---
apiVersion: v1
kind: Pod
metadata:
  name: netperf-client
  labels:
    app: benchmark
    role: netperf-client
spec:
  nodeName: $WORKER_NODE_2
  containers:
  - name: netperf
    image: $NETPERF_IMAGE
    command: ["sleep", "infinity"]
  terminationGracePeriodSeconds: 0
---
apiVersion: v1
kind: Pod
metadata:
  name: fortio-server
  labels:
    app: benchmark
    role: fortio-server
spec:
  nodeName: $WORKER_NODE_1
  containers:
  - name: fortio
    image: $FORTIO_IMAGE
    command: ["fortio", "server"]
    ports:
    - containerPort: 8080
  terminationGracePeriodSeconds: 0
---
apiVersion: v1
kind: Pod
metadata:
  name: fortio-client
  labels:
    app: benchmark
    role: fortio-client
spec:
  nodeName: $WORKER_NODE_2
  containers:
  - name: fortio
    image: $FORTIO_IMAGE
    # Fortio distroless: keep pod alive with minimal server. We exec into
    # this container for load tests. Results are written to stdout and may
    # be lost on WebSocket errors — the script handles retries.
    command: ["fortio", "server", "-http-port", "9999", "-grpc-port", "disabled", "-tcp-port", "disabled", "-udp-port", "disabled", "-redirect-port", "disabled"]
  terminationGracePeriodSeconds: 0
EOF

  # ClusterIP Service in front of the pod-network iperf3 server (port 5201).
  kubectl apply -n "$N" -f - <<EOF
apiVersion: v1
kind: Service
metadata:
  name: iperf-service
spec:
  selector:
    app: benchmark
    role: iperf-server
  ports:
  - port: 5201
    targetPort: 5201
  type: ClusterIP
EOF

  info "Waiting for pods to be ready..."
  local pods="pod/iperf-server-host pod/iperf-client-host pod/iperf-server pod/iperf-client pod/netperf-server pod/netperf-client pod/fortio-server pod/fortio-client"
  # $pods is deliberately unquoted: it must word-split into one argument per pod.
  if ! kubectl wait -n "$N" --for=condition=Ready $pods --timeout=300s; then
    err "Pods failed to become Ready within 300s — aborting"
    # Show the pod states to help diagnose the failure; best-effort only.
    kubectl get pod -n "$N" -o wide || true
    exit 1
  fi
  info "All test workloads ready"
}

# cleanup_test_workloads — Remove everything the benchmark created.
# With SKIP_CLEANUP=true it only prints a warning and returns 0.
# Otherwise it deletes namespace $NS (60s timeout); if that fails it
# force-deletes the pods in it, waits 5s and retries the namespace delete.
# The `test-services` namespace is deleted as well. Every kubectl failure in
# the fallback path is ignored: cleanup is best-effort.
cleanup_test_workloads() {
  case "$SKIP_CLEANUP" in
  true)
    warn "Skipping cleanup (--skip-cleanup)"
    return 0
    ;;
  esac

  step "Cleaning up test workloads"
  if ! kubectl delete namespace "$NS" --ignore-not-found --timeout=60s 2>/dev/null; then
    kubectl delete pod -n "$NS" --all --grace-period=0 --force --ignore-not-found 2>/dev/null || true
    sleep 5
    kubectl delete namespace "$NS" --ignore-not-found 2>/dev/null || true
  fi
  kubectl delete namespace test-services --ignore-not-found --timeout=120s 2>/dev/null || true
  info "Cleanup complete"
}

# ─── exec helpers ───────────────────────────────────────────────────────────

# _pod_exec — Run a command inside a pod of the benchmark namespace.
# Arguments: $1 - pod name; $2.. - command and its arguments.
# Returns kubectl's exit status.
_pod_exec() {
  kubectl exec -n "$NS" "$1" -- "${@:2}"
}

# ─── Resource Monitoring ────────────────────────────────────────────────────

# start_resource_monitor — Sample `kubectl top pod` in the background.
# Arguments:
#   $1 - CSV file to (re)create; its directory is created if needed
#   $2 - label selector for the kube-system pods to sample
#   $3 - file that receives the pid of the background sampler
# Every 5 seconds one row per pod is appended to the CSV:
#   timestamp (HH:MM:SS), pod, cpu with a trailing "m" removed, memory with a
#   trailing "Mi" removed.
# The sampler pid is also appended to the global MONITOR_PIDS.
start_resource_monitor() {
  local csv_file="$1" label="$2" pid_file="$3"
  # Remember the main script's pid: the sampler loop ends by itself once that
  # process is gone, instead of running on as an orphan appending forever.
  local parent_pid=$$
  local mon_pid

  mkdir -p "$(dirname "$csv_file")"
  printf '%s\n' "timestamp,pod,cpu_millicores,memory_mib" >"$csv_file"

  (
    while kill -0 "$parent_pid" 2>/dev/null; do
      kubectl top pod -n kube-system -l "$label" --no-headers 2>/dev/null |
        while read -r pod cpu mem rest; do
          printf '%s,%s,%s,%s\n' "$(date +%H:%M:%S)" "$pod" "${cpu%%m}" "${mem%%Mi}" >>"$csv_file"
        done
      sleep 5
    done
  ) &
  mon_pid=$!

  printf '%s\n' "$mon_pid" >"$pid_file"
  MONITOR_PIDS="${MONITOR_PIDS} ${mon_pid}"
}

# stop_resource_monitor — Stop a sampler started by start_resource_monitor.
# Argument: $1 - the pid file written when the monitor was started.
# Kills the recorded pid, waits for it, deletes the pid file and removes the
# pid from the global MONITOR_PIDS. A missing pid file is a no-op.
stop_resource_monitor() {
  local pid_file="$1"
  [[ -f "$pid_file" ]] || return 0

  local pid
  pid=$(cat "$pid_file")
  rm -f "$pid_file"

  # Only act on something that looks like a pid; an empty or corrupted pid
  # file must not reach kill.
  if [[ "$pid" =~ ^[0-9]+$ ]]; then
    kill "$pid" 2>/dev/null || true
    # Reap the sampler: once wait returns, the loop process is really gone
    # (not merely signalled), and bash has no pending "Terminated" job notice
    # left to print. wait fails harmlessly if the pid is not our child.
    wait "$pid" 2>/dev/null || true
  fi

  # Rebuild the space-separated list without this pid so the exit trap does
  # not try to kill it again.
  local kept="" p
  for p in $MONITOR_PIDS; do
    [[ "$p" == "$pid" ]] || kept="$kept $p"
  done
  MONITOR_PIDS="$kept"
}

# ─── iperf3 Throughput Tests ────────────────────────────────────────────────

# _run_iperf — Run one iperf3 client test and save its JSON report locally.
# Arguments:
#   $1 label        human-readable scenario name for the log
#   $2 server_ip    address of the iperf3 server
#   $3 port         server port
#   $4 parallel     number of parallel streams (-P)
#   $5 output_dir   local directory for the result (created if missing)
#   $6 result_file  file name of the JSON report inside output_dir
#   $7 client_pod   optional; pod to run the client in (default: iperf-client)
# Globals read: NS, IPERF_DURATION, KUBECTL_TIMEOUT.
# Always returns 0: failures are logged as warnings so the benchmark moves on
# to the next round.
_run_iperf() {
  local label="$1" server_ip="$2" port="$3" parallel="$4" output_dir="$5" result_file="$6"
  local client_pod="${7:-iperf-client}"
  local out="$output_dir/$result_file"
  mkdir -p "$output_dir"
  info "Running: $label (${IPERF_DURATION}s, P=$parallel)"

  # Write JSON to a file inside the pod then kubectl cp out, avoiding SPDY
  # channel stalls on large stdout payloads.
  local pod_file="/tmp/iperf_${result_file}"
  # Start from a clean slate: a report left behind by an earlier run (for
  # example after a failed copy) would otherwise be mixed into this one,
  # since iperf3 appends to an existing --logfile.
  timeout 30 kubectl exec -n "$NS" "$client_pod" -- rm -f "$pod_file" >/dev/null 2>&1 || true
  if ! timeout "$KUBECTL_TIMEOUT" kubectl exec -n "$NS" "$client_pod" -- \
    iperf3 -c "$server_ip" -p "$port" -t "$IPERF_DURATION" -P "$parallel" -J --logfile "$pod_file" >/dev/null 2>&1; then
    warn "  iperf3 timed out or returned non-zero, result may be incomplete"
  fi

  local cp_ok=true
  timeout 60 kubectl cp -n "$NS" "${client_pod}:${pod_file}" "$out" >/dev/null 2>&1 || cp_ok=false
  # Remove the in-pod file whether or not the copy worked.
  timeout 30 kubectl exec -n "$NS" "$client_pod" -- rm -f "$pod_file" >/dev/null 2>&1 || true
  if [[ "$cp_ok" != true ]]; then
    warn "  kubectl cp failed for $result_file"
    return 0
  fi

  # Extract the received throughput in Gbit/s. The script is passed inline
  # rather than through a predictably named helper file in /tmp.
  local gbps
  gbps=$(python3 -c 'import json, sys; d = json.load(open(sys.argv[1])); print("%.2f" % (d["end"]["sum_received"]["bits_per_second"] / 1e9))' "$out" 2>/dev/null || echo "N/A")

  info "  → ${gbps} Gbps"
}

# run_throughput_tests — Run the iperf3 throughput scenarios.
# Scenarios, each repeated ROUNDS times with ROUND_SLEEP seconds after every
# run: node IP on port 5202 with 8 streams, pod IP with 1/8/16 streams, and
# the ClusterIP Service with 8 streams. Results land in
# $OUTPUT_DIR/throughput. While the tests run, cilium or kube-proxy resource
# usage is sampled into $OUTPUT_DIR/resources (depending on CLUSTER_TYPE).
# Globals read: OUTPUT_DIR, NS, CLUSTER_TYPE, IPERF_DURATION, ROUNDS,
#               ROUND_SLEEP.
run_throughput_tests() {
  step "Running Throughput Tests (iperf3, ${IPERF_DURATION}s × ${ROUNDS} rounds, ${ROUND_SLEEP}s interval)"
  local d="$OUTPUT_DIR/throughput"
  mkdir -p "$d"

  local node1_ip server_pod_ip svc_ip
  node1_ip=$(kubectl get pod -n "$NS" iperf-server-host -o jsonpath='{.status.hostIP}')
  server_pod_ip=$(kubectl get pod -n "$NS" iperf-server -o jsonpath='{.status.podIP}')
  svc_ip=$(kubectl get svc -n "$NS" iperf-service -o jsonpath='{.spec.clusterIP}')

  info "Node1 IP: $node1_ip | Server Pod IP: $server_pod_ip | Svc IP: $svc_ip"

  # Warmup: short 5s burst to prime TCP cwnd, route caches, and ENI token
  # bucket. Without this, the first test shows ~25% lower throughput due to
  # TCP slow-start from cold state.
  info "Warmup (5s)..."
  timeout 30 kubectl exec -n "$NS" iperf-client -- \
    iperf3 -c "$node1_ip" -p 5202 -t 5 -P 8 >/dev/null 2>&1 || true
  sleep 5

  local mon_pid=""
  if [[ "$CLUSTER_TYPE" == cilium-* ]]; then
    start_resource_monitor "$OUTPUT_DIR/resources/cilium_throughput.csv" "k8s-app=cilium" "/tmp/nb_cilium_mon.pid"
    mon_pid="/tmp/nb_cilium_mon.pid"
  elif [[ "$CLUSTER_TYPE" == kubeproxy-* ]]; then
    start_resource_monitor "$OUTPUT_DIR/resources/kubeproxy_throughput.csv" "k8s-app=kube-proxy" "/tmp/nb_kp_mon.pid"
    mon_pid="/tmp/nb_kp_mon.pid"
  fi

  # One row per scenario: label|target|port|streams|result-file prefix.
  # NOTE(review): the "Node Level" scenario runs its client from the
  # pod-network iperf-client pod; the hostNetwork iperf-client-host pod is not
  # used here — confirm that is intended.
  local scenarios=(
    "Node Level 8stream|${node1_ip}|5202|8|node_throughput"
    "Pod-to-Pod single|${server_pod_ip}|5201|1|pod2pod_single"
    "Pod-to-Pod 8stream|${server_pod_ip}|5201|8|pod2pod_8stream"
    "Pod-to-Pod 16stream|${server_pod_ip}|5201|16|pod2pod_16stream"
    "Via Service 8stream|${svc_ip}|5201|8|via_service_8stream"
  )
  local spec label target port streams prefix r
  for spec in "${scenarios[@]}"; do
    IFS='|' read -r label target port streams prefix <<<"$spec"
    # An address that could not be resolved would only burn
    # ROUNDS × ROUND_SLEEP seconds on runs that are certain to fail.
    if [[ -z "$target" ]]; then
      warn "Skipping $label: target address could not be resolved"
      continue
    fi
    for r in $(seq 1 "$ROUNDS"); do
      _run_iperf "$label r$r" "$target" "$port" "$streams" "$d" "${prefix}_r${r}.json"
      sleep "$ROUND_SLEEP"
    done
  done

  if [[ -n "$mon_pid" ]]; then
    stop_resource_monitor "$mon_pid"
  fi
  sleep 10
  info "Throughput tests complete"
}

# ─── fortio RPS Tests ───────────────────────────────────────────────────────

# _fortio_exec_stream — Run `fortio load` inside the fortio-client pod and
# capture the result JSON that it streams over stdout.
# The fortio image is distroless (no `tar`), so `kubectl cp` cannot be used
# against it; the exec channel is the only way to get the result out.
# Earlier flakiness (websocket close 1006) came from fortio's per-thread
# progress logs flooding stderr during a 60s run: under a busy node the
# WebSocket could not keep both channels pumping. `-quiet` silences those
# logs and stderr is discarded, so the channel only carries the single final
# JSON blob.
# Arguments: $1 connections, $2 keepalive ("false" adds -keepalive=false),
#            $3 duration in seconds, $4 target URL, $5 local output file.
# Makes up to 3 attempts, 10s apart.
# Returns 0 once $5 holds valid JSON; otherwise 1, with $5 removed.
_fortio_exec_stream() {
  local connections="$1" ka="$2" duration="$3" url="$4" out="$5"
  local -a ka_flags=()
  if [[ "$ka" == "false" ]]; then
    ka_flags=(-keepalive=false)
  fi

  local max_attempts=3 attempt=1
  while ((attempt <= max_attempts)); do
    # -quiet: loglevel=Error, suppresses per-thread progress flooding stderr
    # -json -: write result JSON to stdout (single blob at end of run)
    # The ${arr[@]+...} form expands to nothing for an empty array, also on
    # old bash versions running under `set -u`.
    timeout "$KUBECTL_TIMEOUT" kubectl exec -n "$NS" fortio-client -- \
      fortio load -quiet -qps 0 -c "$connections" -t "${duration}s" \
      ${ka_flags[@]+"${ka_flags[@]}"} -json - "$url" \
      >"$out" 2>/dev/null || true

    if [[ -s "$out" ]] && python3 -c "import json,sys; json.load(open(sys.argv[1]))" "$out" 2>/dev/null; then
      return 0
    fi

    rm -f "$out"
    if ((attempt < max_attempts)); then
      warn "  attempt $attempt failed, retrying in 10s..."
      sleep 10
    fi
    attempt=$((attempt + 1))
  done
  return 1
}

# _run_fortio — Run one fortio load scenario and store its JSON result.
# Arguments:
#   $1 label        human-readable scenario name for the log
#   $2 url          target URL
#   $3 connections  number of concurrent connections
#   $4 keepalive    "false" = short connections; anything else = keepalive
#   $5 output_dir   local directory for the result (created if missing)
#   $6 result_file  file name of the JSON result inside output_dir
# Globals read: NS, FORTIO_DURATION, KUBECTL_TIMEOUT.
# Always returns 0: failures are logged as warnings so the benchmark moves on.
_run_fortio() {
  local label="$1" url="$2" connections="$3" keepalive="$4"
  local output_dir="$5" result_file="$6"
  local out="$output_dir/$result_file"
  # Python one-liner that succeeds only if its argument is a valid JSON file.
  local json_check='import json,sys; json.load(open(sys.argv[1]))'
  mkdir -p "$output_dir"
  info "Running: $label (c=$connections, keepalive=$keepalive)"

  if [[ "$keepalive" == "false" ]]; then
    # Short-connection mode: stream over exec (REST API can't disable keepalive).
    if ! _fortio_exec_stream "$connections" false "$FORTIO_DURATION" "$url" "$out"; then
      warn "  fortio short-conn failed after retries for $result_file"
    fi
  else
    # Keepalive mode: prefer fortio REST API via port-forward (proven reliable
    # for high-throughput 80K+ req/s). Fall back to streaming exec if it fails.
    local pf_pid=""
    # Random local port in 19000-19999 for the port-forward.
    local local_port=$((19000 + RANDOM % 1000))
    kubectl port-forward -n "$NS" pod/fortio-client "${local_port}:9999" >/dev/null 2>&1 &
    pf_pid=$!
    # Give the port-forward a moment to start listening.
    sleep 2

    local api_url="http://localhost:${local_port}/fortio/rest/run"
    local encoded_url params
    encoded_url=$(python3 -c 'import urllib.parse,sys; print(urllib.parse.quote(sys.argv[1]))' "$url")
    params="url=${encoded_url}&qps=-1&t=${FORTIO_DURATION}s&c=${connections}&json=on"

    local rc=0
    timeout "$KUBECTL_TIMEOUT" curl -sf "${api_url}?${params}" >"$out" 2>/dev/null || rc=$?

    # Tear the port-forward down and reap it before going on.
    kill "$pf_pid" 2>/dev/null || true
    wait "$pf_pid" 2>/dev/null || true

    # If REST API failed, fall back to streaming exec
    if ! { [[ -s "$out" ]] && python3 -c "$json_check" "$out" 2>/dev/null; }; then
      warn "  REST API failed (rc=$rc), falling back to streaming exec..."
      rm -f "$out"
      if ! _fortio_exec_stream "$connections" true "$FORTIO_DURATION" "$url" "$out"; then
        warn "  fortio keepalive failed after retries for $result_file"
      fi
    fi
  fi

  # Parse result if we got valid output
  if [[ ! -s "$out" ]]; then
    warn "  no output for $result_file"
    return 0
  fi

  # The parser is passed inline rather than through a predictably named
  # helper file in /tmp.
  local qps
  qps=$(python3 -c 'import json,sys; print(int(json.load(open(sys.argv[1]))["ActualQPS"]))' "$out" 2>/dev/null || echo "N/A")
  info "  → ${qps} req/s"
}

# run_rps_tests — Measure HTTP request throughput with fortio.
# Creates the fortio-service ClusterIP Service, then runs four scenarios, each
# for ROUNDS rounds with ROUND_SLEEP seconds after every round:
#   pod-to-pod (64 conns, keepalive), via Service (64 and 256 conns,
#   keepalive), and via Service with short connections (64 conns).
# Results are written to $OUTPUT_DIR/rps/<scenario>_r<round>.json.
# While the scenarios run, a resource monitor samples the cilium or kube-proxy
# pods (depending on CLUSTER_TYPE).
# Globals read: OUTPUT_DIR, NS, CLUSTER_TYPE, ROUNDS, ROUND_SLEEP, FORTIO_DURATION
run_rps_tests() {
  step "Running RPS Tests (fortio, ${FORTIO_DURATION}s × ${ROUNDS} rounds, ${ROUND_SLEEP}s interval)"
  local d="$OUTPUT_DIR/rps"
  mkdir -p "$d"
  local N="$NS"

  local server_pod_ip
  server_pod_ip=$(kubectl get pod -n "$N" fortio-server -o jsonpath='{.status.podIP}')

  kubectl apply -n "$N" -f - <<EOF
apiVersion: v1
kind: Service
metadata:
  name: fortio-service
spec:
  selector:
    app: benchmark
    role: fortio-server
  ports:
  - port: 8080
    targetPort: 8080
  type: ClusterIP
EOF
  sleep 3
  local fortio_svc_ip
  fortio_svc_ip=$(kubectl get svc -n "$N" fortio-service -o jsonpath='{.spec.clusterIP}')

  # Path of the PID file handed to the resource monitor; empty = no monitor.
  local mon_pid=""
  if [[ "$CLUSTER_TYPE" == cilium-* ]]; then
    start_resource_monitor "$OUTPUT_DIR/resources/cilium_rps.csv" "k8s-app=cilium" "/tmp/nb_cilium_mon.pid"
    mon_pid="/tmp/nb_cilium_mon.pid"
  elif [[ "$CLUSTER_TYPE" == kubeproxy-* ]]; then
    start_resource_monitor "$OUTPUT_DIR/resources/kubeproxy_rps.csv" "k8s-app=kube-proxy" "/tmp/nb_kp_mon.pid"
    mon_pid="/tmp/nb_kp_mon.pid"
  fi

  # Scenario table: label|target IP|connections|keepalive|result file stem.
  # Scenarios run in this order; all rounds of one finish before the next.
  local -a scenarios=(
    "Pod-to-Pod 64c|${server_pod_ip}|64|true|pod2pod_c64"
    "Via Svc 64c (ka)|${fortio_svc_ip}|64|true|svc_c64"
    "Via Svc 256c (ka)|${fortio_svc_ip}|256|true|svc_c256"
    "Via Svc 64c (short)|${fortio_svc_ip}|64|false|svc_short_c64"
  )
  local scenario label target_ip conns ka stem r
  for scenario in "${scenarios[@]}"; do
    IFS='|' read -r label target_ip conns ka stem <<<"$scenario"
    # An unresolved IP would produce "http://:8080/..." and burn full-length
    # rounds on guaranteed failures; skip just that scenario instead.
    if [[ -z "$target_ip" ]]; then
      warn "  Skipping '${label}': target IP could not be resolved"
      continue
    fi
    for r in $(seq 1 "$ROUNDS"); do
      _run_fortio "${label} r$r" "http://${target_ip}:8080/echo?size=512" "$conns" "$ka" "$d" "${stem}_r${r}.json"
      sleep "$ROUND_SLEEP"
    done
  done

  if [[ -n "$mon_pid" ]]; then
    stop_resource_monitor "$mon_pid"
  fi
  sleep 10
  info "RPS tests complete"
}

# ─── netperf Latency Tests ──────────────────────────────────────────────────

# run_latency_tests — Latency probes between the benchmark pods.
#   1. netperf TCP_RR, then TCP_CRR, against the netperf-server Pod IP:
#      ROUNDS rounds each, FORTIO_DURATION seconds per round, ROUND_SLEEP
#      seconds after every round. Output: latency/tcp_rr_r<N>.txt and
#      latency/tcp_crr_r<N>.txt under OUTPUT_DIR.
#   2. One fortio run at a fixed 1000 QPS with 16 connections against
#      fortio-service. Output: latency/http_1k_qps.json.
# A failing probe only produces a warning; the remaining probes still run.
run_latency_tests() {
  step "Running Latency Tests (netperf TCP_RR/TCP_CRR + fortio HTTP, ${FORTIO_DURATION}s × ${ROUNDS} rounds)"
  local N="$NS"
  local d="$OUTPUT_DIR/latency"
  mkdir -p "$d"

  local np_ip fs_ip
  np_ip=$(kubectl get pod -n "$N" netperf-server -o jsonpath='{.status.podIP}')
  fs_ip=$(kubectl get svc -n "$N" fortio-service -o jsonpath='{.spec.clusterIP}')

  # netperf output selectors, shared by both test types.
  local selectors="MIN_LATENCY,MEAN_LATENCY,P50_LATENCY,P90_LATENCY,P99_LATENCY,MAX_LATENCY,THROUGHPUT"

  # Each entry is "<netperf test name>:<result file stem>".
  local probe np_test stem
  for probe in "TCP_RR:tcp_rr" "TCP_CRR:tcp_crr"; do
    np_test="${probe%%:*}"
    stem="${probe##*:}"
    for r in $(seq 1 "$ROUNDS"); do
      info "${np_test} round $r"
      if ! timeout "$KUBECTL_TIMEOUT" kubectl exec -n "$NS" netperf-client -- \
        netperf -H "$np_ip" -t "$np_test" -l "$FORTIO_DURATION" -- -r 1,1 \
        -o "$selectors" \
        >"$d/${stem}_r${r}.txt" 2>/dev/null; then
        warn "  ${np_test} round $r failed"
      fi
      sleep "$ROUND_SLEEP"
    done
  done

  # fortio HTTP p99 — stdout mode (low load, 1000 QPS, no WebSocket issues).
  info "HTTP p99 @ 1000 QPS"
  local http_out="$d/http_1k_qps.json"
  if ! timeout "$KUBECTL_TIMEOUT" kubectl exec -n "$NS" fortio-client -- \
    fortio load -qps 1000 -c 16 -t "${FORTIO_DURATION}s" -json - \
    "http://${fs_ip}:8080/echo?size=512" >"$http_out" 2>/dev/null; then
    warn "  HTTP latency test may have errors"
  fi

  info "Latency tests complete"
}

# ─── Service Scale Test ─────────────────────────────────────────────────────

# ensure_lb_map_capacity — When the planned Service-scale test would exceed
# Cilium's bpf-lb-map-max, offer to raise it and restart cilium.
# Args:
#   $1 current_max  current bpf-lb-map-max
#   $2 needed       LB entries the test is expected to require
#   $3 target       value to set bpf-lb-map-max to
# Environment:
#   AUTO_FIX_LB_MAP  "true" applies the fix without prompting
# Optional caller context: if max_svc and eps_per_svc are set in the calling
# scope they are used to print the "N svc × M per svc" breakdown; the function
# works without them.
# Returns 0 if the ConfigMap was patched and the cilium rollout completed,
#         1 if the user declined / no TTY / patch or rollout failed (caller
#           should warn).
ensure_lb_map_capacity() {
  local cur="$1" needed="$2" target="$3"

  # The breakdown relies on variables owned by the caller. Guard them so the
  # function does not abort under `set -u` when called from elsewhere.
  local breakdown=""
  if [[ -n "${max_svc:-}" && -n "${eps_per_svc:-}" ]]; then
    breakdown=" (${max_svc} svc × $((eps_per_svc + 1)) per svc)"
  fi

  warn "═══════════════════════════════════════════════════════════════"
  warn "LB MAP CAPACITY INSUFFICIENT"
  warn "  This test needs ~${needed} LB entries${breakdown},"
  warn "  but Cilium's bpf-lb-map-max=${cur}."
  warn "  Running as-is would OVERFLOW the LB map — large-scale numbers would be"
  warn "  bogus (forwarding failures, not real O(n) degradation)."
  warn "═══════════════════════════════════════════════════════════════"
  info "Proposed fix: set bpf-lb-map-max=${target} and restart all cilium pods."

  local answer=""
  if [[ "${AUTO_FIX_LB_MAP:-}" == "true" ]]; then
    info "AUTO_FIX_LB_MAP=true — applying without prompt."
    answer="y"
  elif [[ -t 0 ]]; then
    printf "Apply this fix now? cilium will restart (brief datapath churn). [y/N]: "
    # Read from the terminal itself; a failed read counts as "no".
    read -r answer </dev/tty || answer=""
  else
    # No interactive TTY and no AUTO_FIX_LB_MAP — can't safely auto-modify.
    warn "Non-interactive shell and AUTO_FIX_LB_MAP not set; skipping auto-fix."
    warn "Re-run with AUTO_FIX_LB_MAP=true, or manually:"
    warn "  kubectl -n kube-system patch cm cilium-config --type merge -p '{\"data\":{\"bpf-lb-map-max\":\"${target}\"}}'"
    warn "  kubectl -n kube-system rollout restart ds/cilium"
    return 1
  fi

  case "$answer" in
  y | Y | yes | YES)
    info "Patching cilium-config: bpf-lb-map-max=${target} ..."
    if ! kubectl -n kube-system patch configmap cilium-config --type merge \
      -p "{\"data\":{\"bpf-lb-map-max\":\"${target}\"}}" >/dev/null 2>&1; then
      warn "Failed to patch cilium-config."
      return 1
    fi
    info "Restarting cilium DaemonSet (and operator) to apply..."
    # Restart failures are tolerated here; the rollout status check below is
    # what decides success.
    kubectl -n kube-system rollout restart ds/cilium >/dev/null 2>&1 || true
    kubectl -n kube-system rollout restart deploy/cilium-operator >/dev/null 2>&1 || true
    info "Waiting for cilium DaemonSet to become ready (timeout 300s)..."
    if ! kubectl -n kube-system rollout status ds/cilium --timeout=300s >/dev/null 2>&1; then
      warn "cilium rollout did not complete within timeout; check 'kubectl -n kube-system get pod -l k8s-app=cilium'."
      return 1
    fi
    # Give the agents a moment to finish reprogramming BPF maps after restart.
    sleep 10
    local new_max
    new_max=$(kubectl -n kube-system get cm cilium-config -o jsonpath='{.data.bpf-lb-map-max}' 2>/dev/null)
    info "Done. bpf-lb-map-max is now ${new_max}."
    return 0
    ;;
  *)
    warn "Declined. Continuing without raising bpf-lb-map-max."
    return 1
    ;;
  esac
}

# run_service_scale_test — Measure how datapath performance changes as the
# number of Services grows.
# For every step in SVC_SCALE_STEPS (ascending totals) it creates the missing
# dummy Services + Endpoints in namespace test-services, waits for the datapath
# to sync, records rule counts, then runs fortio (keepalive and short-conn) and
# netperf (TCP_CRR, TCP_RR) for ROUNDS rounds each.
# Output goes to $OUTPUT_DIR/service-scale/.
# Environment: SVC_SCALE_STEPS, SVC_ENDPOINTS, SVC_CREATE_PARALLEL
# Globals read: OUTPUT_DIR, NS, CLUSTER_TYPE, ROUNDS, ROUND_SLEEP,
#               FORTIO_DURATION, KUBECTL_TIMEOUT
# Returns 1 only if no steps are configured or the scratch directory cannot
# be created.
run_service_scale_test() {
  # Support multi-step scale testing: SVC_SCALE_STEPS="5000,10000,20000,30000"
  # (comma-separated, ascending). The high steps are chosen to bracket the
  # iptables-vs-Cilium short-connection crossover (~20-25k svc), so the result
  # directly shows at what Service count Cilium's RPS overtakes iptables.
  local steps_str="${SVC_SCALE_STEPS:-5000,10000,20000,30000}"
  # Drop stray whitespace ("5000, 10000") so step values embed cleanly in
  # result file names and in scale_steps.txt.
  steps_str="${steps_str//[[:space:]]/}"
  local -a steps
  IFS=',' read -ra steps <<<"$steps_str"
  if [[ ${#steps[@]} -eq 0 ]]; then
    warn "SVC_SCALE_STEPS contains no steps; skipping Service scale test"
    return 1
  fi

  # Endpoints per dummy Service. NOTE: the load test hits a SINGLE fronting
  # Service, so its new-connection SYN traverses the KUBE-SERVICES chain whose
  # length == number of Services (one entry per svc) — independent of how many
  # endpoints each dummy svc has. Endpoints only inflate total rule count /
  # LB-map usage / creation time without affecting the hot path. So we keep this
  # small (a realistic multi-replica count) and drive degradation via svc COUNT.
  # (max_svc and eps_per_svc are also read by ensure_lb_map_capacity.)
  local eps_per_svc="${SVC_ENDPOINTS:-4}"

  local max_svc="${steps[${#steps[@]}-1]}"
  step "Running Service Scale Test (steps: ${steps_str})"
  local d="$OUTPUT_DIR/service-scale"
  mkdir -p "$d"

  local fs_ip
  fs_ip=$(kubectl get svc -n "$NS" fortio-service -o jsonpath='{.spec.clusterIP}')
  # netperf server Pod IP for per-scale TCP_CRR / TCP_RR latency probes.
  local np_ip
  np_ip=$(kubectl get pod -n "$NS" netperf-server -o jsonpath='{.status.podIP}' 2>/dev/null)

  # Preflight: ensure Cilium's LB map capacity is large enough for the test.
  # Each Service needs ~(1 frontend + eps_per_svc backend) LB entries; if the
  # total exceeds bpf-lb-map-max (default 65536) the map silently truncates,
  # causing forwarding failures and bogus "degradation" numbers (not O(n)).
  # When insufficient, prompt the user to auto-raise the limit and restart cilium.
  if [[ "$CLUSTER_TYPE" == cilium-* ]]; then
    local lb_max
    lb_max=$(kubectl -n kube-system get cm cilium-config -o jsonpath='{.data.bpf-lb-map-max}' 2>/dev/null)
    [[ -z "$lb_max" ]] && lb_max=65536  # Cilium default
    local needed=$((max_svc * (eps_per_svc + 1)))
    if [[ $needed -gt $lb_max ]]; then
      # Target = 2x the estimated need, leaving headroom.
      local target=$((needed * 2))
      ensure_lb_map_capacity "$lb_max" "$needed" "$target" || {
        warn "Proceeding WITHOUT raising bpf-lb-map-max — large-scale results may be INVALID."
      }
      # Re-read the (possibly updated) value for the record.
      lb_max=$(kubectl -n kube-system get cm cilium-config -o jsonpath='{.data.bpf-lb-map-max}' 2>/dev/null)
      [[ -z "$lb_max" ]] && lb_max=65536
    else
      info "LB map capacity OK: need ~${needed}, limit ${lb_max}"
    fi
    echo "$lb_max" >"$d/bpf_lb_map_max.txt"
  fi

  # Save scale steps metadata
  printf '%s\n' "${steps[@]}" | paste -sd',' - >"$d/scale_steps.txt"
  echo "$eps_per_svc" >"$d/endpoints_per_svc.txt"

  kubectl create namespace test-services --dry-run=client -o yaml | kubectl apply -f - 2>/dev/null

  local batch_size=500
  local parallel="${SVC_CREATE_PARALLEL:-4}"
  local prev_count=0
  local svc_count r s e i ep g a b c

  for svc_count in "${steps[@]}"; do
    # Steps are cumulative totals: only create the Services missing since the
    # previous step.
    local start_idx=$((prev_count + 1))
    local create_count=$((svc_count - prev_count))

    if [[ $create_count -le 0 ]]; then
      warn "Step ${svc_count} <= prev ${prev_count}, skipping"
      continue
    fi

    info "Creating dummy Services ${start_idx}..${svc_count} (${create_count} new, ${eps_per_svc} endpoints each)..."
    local tmpdir
    if ! tmpdir=$(mktemp -d); then
      warn "mktemp failed; cannot stage Service manifests, aborting Service scale test"
      return 1
    fi

    local batch_idx=0
    for ((s = start_idx; s <= svc_count; s += batch_size)); do
      e=$((s + batch_size - 1))
      if [[ $e -gt $svc_count ]]; then
        e=$svc_count
      fi
      local batch_file="$tmpdir/batch_${batch_idx}.yaml"
      # One redirection per batch: the file is opened once instead of once per
      # emitted line.
      {
        for ((i = s; i <= e; i++)); do
          # YAML document separator between consecutive Service/Endpoints pairs.
          if ((i > s)); then
            echo "---"
          fi
          cat <<EOF
apiVersion: v1
kind: Service
metadata:
  name: dummy-svc-${i}
  namespace: test-services
spec:
  ports:
  - port: 80
    targetPort: 80
    protocol: TCP
---
apiVersion: v1
kind: Endpoints
metadata:
  name: dummy-svc-${i}
  namespace: test-services
subsets:
- addresses:
EOF
          # Generate eps_per_svc distinct endpoint IPs. Global endpoint index
          # g maps into 10.(100+g/64516).(g/254%254).(g%254+1). The second
          # octet stays valid (<= 255) for g < 156 × 64516, i.e. ~10M
          # addresses of headroom.
          for ((ep = 0; ep < eps_per_svc; ep++)); do
            g=$(((i - 1) * eps_per_svc + ep))
            a=$((100 + g / 64516))
            b=$(((g / 254) % 254))
            c=$((g % 254 + 1))
            echo "  - ip: 10.${a}.${b}.${c}"
          done
          cat <<EOF
  ports:
  - port: 80
    protocol: TCP
EOF
        done
      } >"$batch_file"
      batch_idx=$((batch_idx + 1))
    done

    local total_batches=$batch_idx
    info "  Applying ${total_batches} batches with ${parallel} parallel workers..."
    local start_ts
    start_ts=$(date +%s)
    # Feed the batch files NUL-delimited straight from the glob (no `ls`
    # parsing). -I already implies one file per kubectl invocation. xargs exits
    # non-zero when any apply fails, which would otherwise go unnoticed.
    if ! printf '%s\0' "$tmpdir"/batch_*.yaml \
      | xargs -0 -P "$parallel" -I {} kubectl apply --validate=false -f {} >/dev/null 2>&1; then
      warn "  One or more batches failed to apply — fewer than ${svc_count} Services may exist; results at this step may be unreliable"
    fi
    local elapsed=$(($(date +%s) - start_ts))
    info "  Created ${create_count} services in ${elapsed}s (total: ${svc_count})"
    rm -rf "$tmpdir"

    # Wait for datapath sync. Scale with total endpoint count: more endpoints
    # = more iptables rules / BPF entries to program. ~60s base + 1s per 1000
    # endpoints, capped at 180s.
    local total_eps=$((svc_count * eps_per_svc))
    local sync_wait=$((60 + total_eps / 1000))
    if [[ $sync_wait -gt 180 ]]; then
      sync_wait=180
    fi
    info "Waiting ${sync_wait}s for datapath sync (${total_eps} endpoints)..."
    sleep "$sync_wait"

    # Collect rules count at this scale point
    _collect_rules_at_scale "$d" "$svc_count"

    # Run fortio tests at this scale point
    for r in $(seq 1 "$ROUNDS"); do
      _run_fortio "Via Svc 64c keepalive (${svc_count}svc) r$r" \
        "http://${fs_ip}:8080/echo?size=512" 64 true "$d" "ka_at_${svc_count}svc_r${r}.json"
      sleep "$ROUND_SLEEP"
    done
    for r in $(seq 1 "$ROUNDS"); do
      _run_fortio "Via Svc 64c short (${svc_count}svc) r$r" \
        "http://${fs_ip}:8080/echo?size=512" 64 false "$d" "short_at_${svc_count}svc_r${r}.json"
      sleep "$ROUND_SLEEP"
    done

    # Per-scale latency: TCP_CRR (new-connection RTT, the latency twin of the
    # short-conn RPS curve) and TCP_RR (established-connection RTT, expected flat).
    # Probes the netperf-server Pod IP directly (same target as the 0-svc baseline
    # latency test, so the numbers are directly comparable). This still captures
    # the iptables scan cost: in kube-proxy iptables mode EVERY new connection's
    # first packet traverses the nat KUBE-SERVICES chain (a linear list of
    # per-service dst-match rules) before falling through — so connect latency
    # grows O(svc count) even for Pod-IP-destined traffic. TCP_RR reuses one
    # connection (conntrack fast path after the first packet), so it stays flat.
    # netperf is serial / non-saturating → p99 is clean SLO-grade latency, not a
    # queueing artifact.
    if [[ -n "$np_ip" ]]; then
      for r in $(seq 1 "$ROUNDS"); do
        info "TCP_CRR (${svc_count}svc) r$r"
        timeout "$KUBECTL_TIMEOUT" kubectl exec -n "$NS" netperf-client -- \
          netperf -H "$np_ip" -t TCP_CRR -l "$FORTIO_DURATION" -- -r 1,1 \
          -o MIN_LATENCY,MEAN_LATENCY,P50_LATENCY,P90_LATENCY,P99_LATENCY,MAX_LATENCY,THROUGHPUT \
          >"$d/crr_at_${svc_count}svc_r${r}.txt" 2>/dev/null || warn "  TCP_CRR (${svc_count}svc) r$r failed"
        sleep "$ROUND_SLEEP"
      done
      for r in $(seq 1 "$ROUNDS"); do
        info "TCP_RR (${svc_count}svc) r$r"
        timeout "$KUBECTL_TIMEOUT" kubectl exec -n "$NS" netperf-client -- \
          netperf -H "$np_ip" -t TCP_RR -l "$FORTIO_DURATION" -- -r 1,1 \
          -o MIN_LATENCY,MEAN_LATENCY,P50_LATENCY,P90_LATENCY,P99_LATENCY,MAX_LATENCY,THROUGHPUT \
          >"$d/rr_at_${svc_count}svc_r${r}.txt" 2>/dev/null || warn "  TCP_RR (${svc_count}svc) r$r failed"
        sleep "$ROUND_SLEEP"
      done
    fi

    prev_count=$svc_count
  done

  info "Service scale tests complete"
}

# Helper: snapshot datapath rule / entry counts for one scale step.
# Args: output_dir svc_count
# Files written into output_dir (N = svc_count):
#   kubeproxy-iptables: iptables_rules_at_<N>svc.txt, kube_services_chain_at_<N>svc.txt
#   cilium-*:           lb_entries_at_<N>svc.txt
# Any other CLUSTER_TYPE writes nothing. Collection is best-effort.
_collect_rules_at_scale() {
  local d="$1" svc_count="$2"

  case "$CLUSTER_TYPE" in
  kubeproxy-iptables)
    # Use the first kube-proxy pod returned by the API as the sample.
    local pod_ref pod_name
    pod_ref=$(kubectl get pod -n kube-system -l k8s-app=kube-proxy -o name 2>/dev/null | head -1)
    pod_name="${pod_ref#pod/}"
    [[ -n "$pod_name" ]] || return 0

    # Total ruleset size (all chains). Inflated by per-endpoint KUBE-SEP rules;
    # NOT the hot-path metric for a single fronting Service.
    local total_file="$d/iptables_rules_at_${svc_count}svc.txt"
    kubectl exec -n kube-system "$pod_name" -- iptables-save 2>/dev/null \
      | wc -l \
      | tr -d ' ' >"$total_file" || true

    # KUBE-SERVICES chain length ≈ number of Services. THIS is what a new
    # connection's SYN linearly scans, so it's the true driver of short-conn
    # degradation (independent of endpoints-per-svc). Count rules in the
    # nat-table KUBE-SERVICES chain.
    local chain_file="$d/kube_services_chain_at_${svc_count}svc.txt"
    kubectl exec -n kube-system "$pod_name" -- iptables-save -t nat 2>/dev/null \
      | grep -c '^-A KUBE-SERVICES ' \
      | tr -d ' ' >"$chain_file" || true
    ;;
  cilium-*)
    # Line count of the LB listing as reported through ds/cilium.
    local lb_file="$d/lb_entries_at_${svc_count}svc.txt"
    kubectl exec -n kube-system ds/cilium -- cilium bpf lb list 2>/dev/null \
      | wc -l \
      | tr -d ' ' >"$lb_file" || true
    ;;
  esac
}

# ─── Hubble Overhead Test ────────────────────────────────────────────────────

# run_hubble_overhead_test — Compare keepalive RPS through fortio-service with
# Hubble enabled versus disabled (Cilium clusters only; a no-op elsewhere).
# Results: $OUTPUT_DIR/hubble/hubble_on.json and hubble_off.json.
# Hubble is switched back on at the end.
# NOTE(review): `kubectl exec ds/cilium` presumably reaches a single agent pod
# picked by kubectl, so the toggle may not hit the node carrying the benchmark
# traffic — confirm.
run_hubble_overhead_test() {
  case "$CLUSTER_TYPE" in
  cilium-*) ;;
  *)
    info "Skipping Hubble overhead test (not a Cilium cluster)"
    return 0
    ;;
  esac

  step "Running Hubble Overhead Test"
  local out_dir="$OUTPUT_DIR/hubble"
  mkdir -p "$out_dir"

  local svc_ip
  svc_ip=$(kubectl get svc -n "$NS" fortio-service -o jsonpath='{.spec.clusterIP}')
  local target="http://${svc_ip}:8080/echo?size=512"

  # Phase 1: measure with Hubble in its default (enabled) state.
  info "Testing with Hubble enabled (default)..."
  _run_fortio "Hubble ON (keepalive)" "$target" 64 true "$out_dir" "hubble_on.json"

  # Phase 2: turn Hubble off, let the change propagate, measure again.
  info "Disabling Hubble..."
  kubectl -n kube-system exec ds/cilium -- cilium config set hubble-disable true >/dev/null 2>&1 || true
  sleep 15
  _run_fortio "Hubble OFF (keepalive)" "$target" 64 true "$out_dir" "hubble_off.json"

  # Phase 3: restore the default state.
  info "Re-enabling Hubble..."
  kubectl -n kube-system exec ds/cilium -- cilium config set hubble-disable false >/dev/null 2>&1 || true
  sleep 10

  info "Hubble overhead test complete"
}

# ─── NetworkPolicy Overhead Test ─────────────────────────────────────────────

# run_networkpolicy_test — Measure keepalive RPS through fortio-service in
# three states (Cilium clusters only; a no-op elsewhere):
#   1. no policy (baseline)           → networkpolicy/no_policy.json
#   2. L3/L4 CiliumNetworkPolicy      → networkpolicy/l3l4_policy.json
#   3. L7 (HTTP) CiliumNetworkPolicy  → networkpolicy/l7_policy.json
# Each policy is applied in namespace NS, given time to take effect, measured,
# and removed again before the next phase.
run_networkpolicy_test() {
  case "$CLUSTER_TYPE" in
  cilium-*) ;;
  *)
    info "Skipping NetworkPolicy test (not a Cilium cluster)"
    return 0
    ;;
  esac

  step "Running NetworkPolicy L3/L4 + L7 Overhead Test"
  local out_dir="$OUTPUT_DIR/networkpolicy"
  mkdir -p "$out_dir"

  local svc_ip
  svc_ip=$(kubectl get svc -n "$NS" fortio-service -o jsonpath='{.spec.clusterIP}')
  local target="http://${svc_ip}:8080/echo?size=512"

  # ── Phase 1: baseline, no policy in place ──
  info "Testing without NetworkPolicy (baseline)..."
  _run_fortio "No policy (keepalive)" "$target" 64 true "$out_dir" "no_policy.json"

  # ── Phase 2: L3/L4 policy (fortio-client → fortio-server on TCP 8080) ──
  info "Applying L3/L4 CiliumNetworkPolicy..."
  kubectl apply -n "$NS" -f - <<'CNPEOF'
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: allow-fortio-benchmark
spec:
  endpointSelector:
    matchLabels:
      app: benchmark
      role: fortio-server
  ingress:
  - fromEndpoints:
    - matchLabels:
        app: benchmark
        role: fortio-client
    toPorts:
    - ports:
      - port: "8080"
        protocol: TCP
CNPEOF
  # Give the policy time to take effect before measuring.
  sleep 10
  _run_fortio "L3/L4 policy (keepalive)" "$target" 64 true "$out_dir" "l3l4_policy.json"

  info "Removing L3/L4 CiliumNetworkPolicy..."
  kubectl delete -n "$NS" ciliumnetworkpolicy allow-fortio-benchmark --ignore-not-found >/dev/null 2>&1
  sleep 5

  # ── Phase 3: L7 policy (same selectors plus an HTTP GET rule → Envoy proxy) ──
  info "Applying L7 CiliumNetworkPolicy (HTTP)..."
  kubectl apply -n "$NS" -f - <<'L7EOF'
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: allow-fortio-benchmark-l7
spec:
  endpointSelector:
    matchLabels:
      app: benchmark
      role: fortio-server
  ingress:
  - fromEndpoints:
    - matchLabels:
        app: benchmark
        role: fortio-client
    toPorts:
    - ports:
      - port: "8080"
        protocol: TCP
      rules:
        http:
        - method: "GET"
L7EOF
  # The L7 rule brings the Envoy proxy into the path, so wait longer than for L3/L4.
  info "Waiting 30s for Envoy proxy initialization..."
  sleep 30
  _run_fortio "L7 policy (keepalive)" "$target" 64 true "$out_dir" "l7_policy.json"

  info "Removing L7 CiliumNetworkPolicy..."
  kubectl delete -n "$NS" ciliumnetworkpolicy allow-fortio-benchmark-l7 --ignore-not-found >/dev/null 2>&1
  # Allow time for the Envoy proxy teardown.
  sleep 10

  info "NetworkPolicy overhead test complete"
}

# ─── Component-Specific Metrics ─────────────────────────────────────────────

# collect_component_metrics — Capture steady-state resource usage and datapath
# state for the network component in use, into $OUTPUT_DIR/resources/.
#   cilium-*:    agent CPU/memory samples, BPF map list and metrics, LB /
#                conntrack / identity line counts, a BPF map memory summary,
#                and VmRSS/VmSize of PID 1 in the agent container.
#   kubeproxy-*: kube-proxy CPU/memory samples plus the iptables or IPVS rule
#                line count, depending on the mode.
# Environment (optional):
#   METRICS_SETTLE_SECS      quiesce time before sampling (default 45)
#   METRICS_SAMPLES          number of resource samples (default 6)
#   METRICS_SAMPLE_INTERVAL  seconds between samples (default 5)
# Every collection step is best-effort: failures leave empty or missing files
# rather than aborting.
collect_component_metrics() {
  step "Collecting component-specific metrics"
  local rd="$OUTPUT_DIR/resources"
  mkdir -p "$rd"

  # Settle before sampling: the Service-scale test just finished and the agent
  # (cilium) / kube-proxy may still be reprogramming BPF maps / iptables rules.
  # Sampling immediately captures transient CPU spikes (e.g. agent CPU jumping
  # to >1000m mid-reprogram), which don't represent steady state. Wait for
  # things to quiesce, then sample several times over a window and let the
  # summary average them.
  local settle="${METRICS_SETTLE_SECS:-45}"
  local samples="${METRICS_SAMPLES:-6}"
  local sample_interval="${METRICS_SAMPLE_INTERVAL:-5}"

  if [[ "$CLUSTER_TYPE" == cilium-* ]]; then
    info "Letting datapath quiesce ${settle}s before sampling steady-state resources..."
    sleep "$settle"

    info "Collecting Cilium metrics (${samples} samples × ${sample_interval}s)..."
    _sample_pod_resources "k8s-app=cilium" "$rd/cilium_agent_cpu_mem.csv" "$samples" "$sample_interval"

    info "Collecting BPF map info..."
    kubectl exec -n kube-system ds/cilium -- bpftool map list -j 2>/dev/null >"$rd/bpf_map_info.json" || true
    kubectl exec -n kube-system ds/cilium -- cilium bpf metrics 2>/dev/null >"$rd/bpf_metrics.txt" || true

    kubectl exec -n kube-system ds/cilium -- cilium bpf lb list 2>/dev/null | wc -l >"$rd/lb_entries_count.txt" || true
    kubectl exec -n kube-system ds/cilium -- cilium bpf ct list global 2>/dev/null | wc -l >"$rd/ct_entries_count.txt" || true
    kubectl exec -n kube-system ds/cilium -- cilium identity list 2>/dev/null | wc -l >"$rd/identity_count.txt" || true

    # Parse BPF map memory from bpf_map_info.json (locally with python3).
    # The file paths are passed as arguments rather than spliced into the
    # Python source, so an output directory containing quotes or backslashes
    # cannot break (or alter) the script.
    if [[ -s "$rd/bpf_map_info.json" ]]; then
      python3 - "$rd/bpf_map_info.json" "$rd/bpf_map_memory.json" 2>/dev/null <<'PYEOF' || true
import json, sys
src_path, dst_path = sys.argv[1], sys.argv[2]
try:
    with open(src_path) as f:
        maps = json.load(f)
    result = {'maps': [], 'total_bytes': 0, 'map_count': len(maps)}
    for m in maps:
        name = m.get('name', 'unnamed')
        # Prefer bytes_memlock (kernel 5.11+); fallback to key/value * max_entries estimate
        mem = m.get('bytes_memlock', 0)
        if not mem:
            ks = m.get('bytes_key', m.get('key_size', 0))
            vs = m.get('bytes_value', m.get('value_size', 0))
            me = m.get('max_entries', 0)
            mem = (ks + vs) * me
        result['maps'].append({'name': name, 'bytes': mem, 'max_entries': m.get('max_entries', 0)})
        result['total_bytes'] += mem
    # Sort by memory descending, keep top 10
    result['maps'].sort(key=lambda x: x['bytes'], reverse=True)
    result['top_maps'] = result['maps'][:10]
    del result['maps']
    result['total_mb'] = round(result['total_bytes'] / 1024 / 1024, 1)
    with open(dst_path, 'w') as f:
        json.dump(result, f, indent=2)
except Exception as e:
    print(f'WARN: BPF map memory parse failed: {e}', file=sys.stderr)
PYEOF
    fi

    # Collect Cilium Agent RSS (VmRSS from /proc)
    kubectl exec -n kube-system ds/cilium -- sh -c \
      'cat /proc/1/status 2>/dev/null | grep -E "^(VmRSS|VmSize):"' \
      >"$rd/cilium_agent_proc_status.txt" 2>/dev/null || true

  elif [[ "$CLUSTER_TYPE" == kubeproxy-* ]]; then
    info "Letting datapath quiesce ${settle}s before sampling steady-state resources..."
    sleep "$settle"

    info "Collecting kube-proxy metrics (${samples} samples × ${sample_interval}s)..."
    _sample_pod_resources "k8s-app=kube-proxy" "$rd/kubeproxy_cpu_mem.csv" "$samples" "$sample_interval"

    # Use the first kube-proxy pod returned by the API for rule counting.
    local kp_pod
    kp_pod=$(kubectl get pod -n kube-system -l k8s-app=kube-proxy -o name 2>/dev/null | head -1)
    kp_pod="${kp_pod#pod/}"

    if [[ "$CLUSTER_TYPE" == "kubeproxy-iptables" && -n "$kp_pod" ]]; then
      kubectl exec -n kube-system "$kp_pod" -- iptables-save 2>/dev/null | wc -l >"$rd/iptables_rules_count.txt" || true
    fi

    if [[ "$CLUSTER_TYPE" == "kubeproxy-ipvs" && -n "$kp_pod" ]]; then
      kubectl exec -n kube-system "$kp_pod" -- ipvsadm -ln 2>/dev/null | wc -l >"$rd/ipvs_rules_count.txt" || true
    fi
  fi

  info "Component metrics collected"
}

# _sample_pod_resources — Sample `kubectl top pod` for a label N times into a
# CSV (timestamp,pod,cpu_millicores,memory_mib). Used for steady-state resource
# capture after the datapath has settled.
# Args: label csv_path samples interval_secs
# The CSV is truncated and re-created with a header on every call. Sampling is
# best-effort: a failing `kubectl top` simply contributes no rows.
# Returns: 0 always.
_sample_pod_resources() {
  local label="$1" csv="$2" samples="$3" interval="$4"
  echo "timestamp,pod,cpu_millicores,memory_mib" >"$csv"
  local i pod cpu mem rest
  for ((i = 0; i < samples; i++)); do
    # Strip the "m" / "Mi" unit suffixes so the columns are plain numbers.
    kubectl top pod -n kube-system -l "$label" --no-headers 2>/dev/null | while read -r pod cpu mem rest; do
      echo "$(date +%H:%M:%S),$pod,${cpu%%m},${mem%%Mi}" >>"$csv"
    done
    # Sleep between samples only, not after the last one. An explicit `if`
    # keeps the false test on the final iteration from becoming the function's
    # exit status.
    if ((i < samples - 1)); then
      sleep "$interval"
    fi
  done
  return 0
}

# ─── Generate Summary ───────────────────────────────────────────────────────

generate_summary() {
  step "Generating benchmark summary"
  local pyfile="/tmp/nb_summary_$$.py"
  cat >"$pyfile" <<'PYEOF'
import json, glob, os, csv, sys

basedir = os.environ.get('NB_OUTPUT_DIR', '.')
if not os.path.isdir(basedir):
    sys.exit(0)

summary = {"cluster": {}, "throughput": {}, "rps": {}, "latency": {}, "service_scale": {}, "resources": {}}

# Context
ctx = os.path.join(basedir, "context.yaml")
if os.path.exists(ctx):
    with open(ctx) as f:
        for line in f:
            if ':' in line:
                k, v = line.strip().split(':', 1)
                # values may be quoted (e.g. cluster_name with special chars)
                summary["cluster"][k.strip()] = v.strip().strip('"')

# Throughput
def parse_iperf(p):
    fs = sorted(glob.glob(os.path.join(basedir, p)))
    if not fs:
        return None
    vals = []
    for f in fs:
        try:
            with open(f) as fh:
                d = json.load(fh)
                vals.append(round(d['end']['sum_received']['bits_per_second'] / 1e9, 2))
        except: pass
    if not vals:
        return None
    return {"gbps": vals, "avg": round(sum(vals)/len(vals), 2)}

for key, pat in [
    ("node_level_8stream", "throughput/node_throughput_r*.json"),
    ("pod2pod_single", "throughput/pod2pod_single_r*.json"),
    ("pod2pod_8stream", "throughput/pod2pod_8stream_r*.json"),
    ("pod2pod_16stream", "throughput/pod2pod_16stream_r*.json"),
    ("via_service_8stream", "throughput/via_service_8stream_r*.json"),
]:
    r = parse_iperf(pat)
    if r: summary["throughput"][key] = r

# RPS
def parse_fortio(p):
    fs = sorted(glob.glob(os.path.join(basedir, p)))
    if not fs:
        return None
    vals = []
    for f in fs:
        try:
            with open(f) as fh:
                d = json.load(fh)
                vals.append(int(d['ActualQPS']))
        except: pass
    if not vals:
        return None
    return {"qps": vals, "avg_qps": int(sum(vals)/len(vals))}

for key, pat in [
    ("pod2pod_c64", "rps/pod2pod_c64_r*.json"),
    ("svc_c64", "rps/svc_c64_r*.json"),
    ("svc_c256", "rps/svc_c256_r*.json"),
    ("svc_short_c64", "rps/svc_short_c64_r*.json"),
]:
    r = parse_fortio(pat)
    if r: summary["rps"][key] = r

# Latency
def parse_netperf_latency(p, fname):
    fs = sorted(glob.glob(os.path.join(basedir, p)))
    if not fs:
        return None
    vals = []
    idx = {"p50": 2, "p99": 4, "mean": 1}.get(fname, 2)
    for f in fs:
        try:
            with open(f) as fh:
                lines = [l.strip() for l in fh.readlines() if l.strip()]
            if not lines: continue
            parts = lines[-1].split(',')
            if len(parts) > idx: vals.append(float(parts[idx]))
        except: pass
    if not vals:
        return None
    return int(sum(vals)/len(vals))

p50 = parse_netperf_latency("latency/tcp_rr_r*.txt", "p50")
p99 = parse_netperf_latency("latency/tcp_rr_r*.txt", "p99")
crr99 = parse_netperf_latency("latency/tcp_crr_r*.txt", "p99")
if p50: summary["latency"]["tcp_rr_p50_us"] = p50
if p99: summary["latency"]["tcp_rr_p99_us"] = p99
if crr99: summary["latency"]["tcp_crr_p99_us"] = crr99

# HTTP p99
hfs = glob.glob(os.path.join(basedir, "latency/http_1k_qps.json"))
if hfs:
    try:
        with open(hfs[0]) as f: d = json.load(f)
        for p in d.get('DurationHistogram', {}).get('Percentiles', []):
            if p.get('Percentile') == 99:
                summary["latency"]["http_p99_1k_qps_ms"] = round(p['Value'] * 1000, 2)
                break
    except: pass

# Service scale — multi-step: parse ka_at_{N}svc_r*.json and short_at_{N}svc_r*.json
summary["service_scale"] = {}
baseline_ka = summary.get("rps", {}).get("svc_c64", {}).get("avg_qps")
baseline_short = summary.get("rps", {}).get("svc_short_c64", {}).get("avg_qps")
if baseline_ka: summary["service_scale"]["keepalive_baseline_qps"] = baseline_ka
if baseline_short: summary["service_scale"]["short_conn_baseline_qps"] = baseline_short

# Endpoints per dummy Service
eps_file = os.path.join(basedir, "service-scale", "endpoints_per_svc.txt")
if os.path.exists(eps_file):
    try:
        with open(eps_file) as f:
            summary["service_scale"]["endpoints_per_svc"] = int(f.read().strip())
    except: pass

# Read scale steps
steps_file = os.path.join(basedir, "service-scale", "scale_steps.txt")
scale_steps = []
if os.path.exists(steps_file):
    with open(steps_file) as f:
        scale_steps = [int(x.strip()) for x in f.read().strip().split(',') if x.strip()]

# Also detect steps from file names if scale_steps.txt is missing (backward compat)
if not scale_steps:
    for fn in sorted(glob.glob(os.path.join(basedir, "service-scale", "ka_at_*svc_r1.json"))):
        bn = os.path.basename(fn)
        try:
            n = int(bn.split('_at_')[1].split('svc_')[0])
            if n not in scale_steps: scale_steps.append(n)
        except: pass
    # Backward compat: old single-step format (svc_1k_svc_r*.json)
    if not scale_steps:
        old_ka = parse_fortio("service-scale/svc_1k_svc_r*.json")
        old_short = parse_fortio("service-scale/short_conn_1k_svc_r*.json")
        svc_count_file = os.path.join(basedir, "service-scale", "dummy_services_count.txt")
        old_count = None
        if os.path.exists(svc_count_file):
            with open(svc_count_file) as f:
                old_count = int(f.read().strip())
        if old_ka and old_count and baseline_ka and baseline_ka > 0:
            step_data = {"svc_count": old_count, "keepalive_qps": old_ka["avg_qps"],
                         "keepalive_degradation_pct": round((old_ka["avg_qps"] - baseline_ka) / baseline_ka * 100, 1)}
            if old_short and baseline_short and baseline_short > 0:
                step_data["short_conn_qps"] = old_short["avg_qps"]
                step_data["short_conn_degradation_pct"] = round((old_short["avg_qps"] - baseline_short) / baseline_short * 100, 1)
            summary["service_scale"]["steps"] = [step_data]

if scale_steps:
    steps_data = []
    for sc in sorted(scale_steps):
        step_data = {"svc_count": sc}
        ka = parse_fortio(f"service-scale/ka_at_{sc}svc_r*.json")
        short = parse_fortio(f"service-scale/short_at_{sc}svc_r*.json")
        if ka and baseline_ka and baseline_ka > 0:
            step_data["keepalive_qps"] = ka["avg_qps"]
            step_data["keepalive_degradation_pct"] = round((ka["avg_qps"] - baseline_ka) / baseline_ka * 100, 1)
        if short and baseline_short and baseline_short > 0:
            step_data["short_conn_qps"] = short["avg_qps"]
            step_data["short_conn_degradation_pct"] = round((short["avg_qps"] - baseline_short) / baseline_short * 100, 1)
        # Per-scale latency: TCP_CRR p99 (new-connection RTT, tracks short-conn
        # degradation) and TCP_RR p99 (established-connection RTT, expected flat).
        crr99 = parse_netperf_latency(f"service-scale/crr_at_{sc}svc_r*.txt", "p99")
        rr99 = parse_netperf_latency(f"service-scale/rr_at_{sc}svc_r*.txt", "p99")
        if crr99: step_data["tcp_crr_p99_us"] = crr99
        if rr99: step_data["tcp_rr_p99_us"] = rr99
        # Rules count at this scale.
        # rules_count = total ruleset / LB entries (size indicator).
        for pat in [f"service-scale/iptables_rules_at_{sc}svc.txt", f"service-scale/lb_entries_at_{sc}svc.txt"]:
            fp = os.path.join(basedir, pat)
            if os.path.exists(fp):
                with open(fp) as f:
                    val = f.read().strip()
                    if val: step_data["rules_count"] = int(val)
        # kube_services_chain = KUBE-SERVICES chain length ≈ svc count; this is
        # the true hot-path scan length driving short-conn degradation (iptables).
        ksp = os.path.join(basedir, f"service-scale/kube_services_chain_at_{sc}svc.txt")
        if os.path.exists(ksp):
            with open(ksp) as f:
                val = f.read().strip()
                if val: step_data["kube_services_chain"] = int(val)
        steps_data.append(step_data)
    summary["service_scale"]["steps"] = steps_data

# Resources
# Reports avg AND max across all (pod × sample) rows. Cross-pod variance is
# real: the cilium agent doing kube-proxy-replacement / LB sync work uses more
# CPU/mem than idle agents on other nodes, so max captures the busiest node
# while avg captures the cluster-wide typical per-node cost.
def parse_csv(pattern):
    """Aggregate CPU/memory samples from the resource CSV file(s) matching pattern.

    pattern is a glob evaluated relative to basedir. A row is used only when
    BOTH its 'cpu_millicores' and 'memory_mib' columns parse as numbers, so
    the CPU and memory statistics always describe the same set of samples.
    The 'pod' column is used only to count distinct pods.

    Returns {} when no file matches or no row is usable; otherwise a dict
    with avg_cpu_m, max_cpu_m, avg_mem_mb, max_mem_mb (rounded to one
    decimal) and pods_sampled.
    """
    cpu_vals, mem_vals = [], []
    pods = set()
    for path in sorted(glob.glob(os.path.join(basedir, pattern))):
        try:
            with open(path, newline='') as fh:
                for row in csv.DictReader(fh):
                    # Parse both fields before recording either one: a row
                    # with a valid CPU but a bad/missing memory value must
                    # not leave the two lists with different lengths (an
                    # empty mem list would crash the average below).
                    try:
                        cpu = float(row['cpu_millicores'])
                        mem = float(row['memory_mib'])
                    except (KeyError, TypeError, ValueError):
                        continue
                    cpu_vals.append(cpu)
                    mem_vals.append(mem)
                    pods.add(row.get('pod', ''))
        except (OSError, UnicodeError, csv.Error):
            # Best effort: an unreadable or malformed file is skipped; rows
            # already collected from it (and from other files) still count.
            continue
    if not cpu_vals:
        return {}
    samples = len(cpu_vals)  # mem_vals has the same length by construction
    return {
        "avg_cpu_m": round(sum(cpu_vals) / samples, 1),
        "max_cpu_m": round(max(cpu_vals), 1),
        "avg_mem_mb": round(sum(mem_vals) / samples, 1),
        "max_mem_mb": round(max(mem_vals), 1),
        "pods_sampled": len(pods),
    }

for res_key, csv_pat in [
    ("cilium_agent", "resources/cilium_agent_cpu_mem.csv"),
    ("kubeproxy", "resources/kubeproxy_cpu_mem.csv"),
]:
    rd = parse_csv(csv_pat)
    if rd: summary["resources"][res_key] = rd

# Hubble overhead
def parse_single_fortio(path):
    """Return the integer ActualQPS from one fortio JSON result, or None.

    path is relative to basedir. None is returned when the file does not
    exist, or when it cannot be read, parsed, or lacks a usable 'ActualQPS'.
    """
    target = os.path.join(basedir, path)
    if os.path.exists(target):
        try:
            with open(target) as src:
                return int(json.load(src)['ActualQPS'])
        except:
            # Deliberately broad: any failure means "no measurement".
            pass
    return None

hubble_on = parse_single_fortio("hubble/hubble_on.json")
hubble_off = parse_single_fortio("hubble/hubble_off.json")
if hubble_on and hubble_off and hubble_off > 0:
    summary["hubble"] = {
        "with_hubble_qps": hubble_on,
        "without_hubble_qps": hubble_off,
        "overhead_pct": round((hubble_on - hubble_off) / hubble_off * 100, 1)
    }

# NetworkPolicy overhead (L3/L4 + L7)
np_baseline = parse_single_fortio("networkpolicy/no_policy.json")
np_l3l4 = parse_single_fortio("networkpolicy/l3l4_policy.json")
np_l7 = parse_single_fortio("networkpolicy/l7_policy.json")
if np_baseline and np_baseline > 0:
    np_data = {"baseline_qps": np_baseline}
    if np_l3l4:
        np_data["l3l4_qps"] = np_l3l4
        np_data["l3l4_overhead_pct"] = round((np_l3l4 - np_baseline) / np_baseline * 100, 1)
    if np_l7:
        np_data["l7_qps"] = np_l7
        np_data["l7_overhead_pct"] = round((np_l7 - np_baseline) / np_baseline * 100, 1)
    summary["networkpolicy"] = np_data

# BPF map memory
bpf_mem_file = os.path.join(basedir, "resources", "bpf_map_memory.json")
if os.path.exists(bpf_mem_file):
    try:
        with open(bpf_mem_file) as f:
            summary["resources"]["bpf_maps"] = json.load(f)
    except: pass

# Cilium Agent RSS from /proc
proc_file = os.path.join(basedir, "resources", "cilium_agent_proc_status.txt")
if os.path.exists(proc_file):
    try:
        with open(proc_file) as f:
            for line in f:
                if line.startswith("VmRSS:"):
                    rss_kb = int(line.split()[1])
                    summary["resources"]["cilium_agent_rss_mb"] = round(rss_kb / 1024, 1)
    except: pass

# Write
out = os.path.join(basedir, "benchmark-summary.json")
with open(out, 'w') as f:
    json.dump(summary, f, indent=2, ensure_ascii=False)
print(f"Summary written to {out}")
PYEOF
  NB_OUTPUT_DIR="$OUTPUT_DIR" python3 "$pyfile" 2>/dev/null || warn "Python3 not available for summary generation"
  rm -f "$pyfile"
}

# ─── Print Summary Table ────────────────────────────────────────────────────

#######################################
# Render $OUTPUT_DIR/benchmark-summary.json as a human-readable table.
# Globals:   OUTPUT_DIR (read)
# Arguments: none
# Outputs:   the formatted summary on stdout; a warning (via warn) when the
#            summary file is missing or cannot be rendered
# Returns:   does not fail on rendering problems — printing the table is
#            best-effort and must never abort the benchmark run
#######################################
print_summary_table() {
  step "Benchmark Results Summary"
  local f="$OUTPUT_DIR/benchmark-summary.json"
  if [[ ! -f "$f" ]]; then
    warn "No summary file found at $f"
    return
  fi
  # The renderer is handed to python3 on stdin ("-") instead of being written
  # to a predictable /tmp path: nothing to clean up, nothing left behind on
  # interruption, and no temp-file race. sys.argv[1] is the summary path.
  if ! python3 - "$f" <<'PYEOF'
import json
import sys

with open(sys.argv[1]) as fh:
    d = json.load(fh)

# "x.get(key) or {}" also covers sections stored as JSON null.
cluster = d.get('cluster') or {}
print()
print(f"  Cluster:       {cluster.get('cluster_name', '?')}")
print(f"  Type:          {cluster.get('cluster_type', '?')}")
print(f"  K8s Version:   {cluster.get('k8s_version', '?')}")
print()

throughput = d.get('throughput') or {}
if throughput:
    print("  ── Throughput ──")
    for name, v in throughput.items():
        print(f"    {name}: {v.get('avg', '?')} Gbps (rounds: {v.get('gbps', [])})")
    print()

rps = d.get('rps') or {}
if rps:
    print("  ── RPS ──")
    for name, v in rps.items():
        print(f"    {name}: {v.get('avg_qps', '?')} req/s")
    print()

latency = d.get('latency') or {}
if latency:
    print("  ── Latency ──")
    for name, v in latency.items():
        print(f"    {name}: {v}")
    print()

ss = d.get('service_scale') or {}
if ss:
    steps = ss.get('steps') or []
    if steps:
        eps = ss.get('endpoints_per_svc')
        hdr = "  ── Service Scale ──" if not eps else f"  ── Service Scale ({eps} endpoints/svc) ──"
        print(hdr)
        if 'keepalive_baseline_qps' in ss:
            print(f"    keepalive baseline:    {ss['keepalive_baseline_qps']} req/s")
        if 'short_conn_baseline_qps' in ss:
            print(f"    short-conn baseline:   {ss['short_conn_baseline_qps']} req/s")
        for step in steps:
            sc = step.get('svc_count', '?')
            print(f"    ── {sc} Services ──")
            if 'keepalive_qps' in step:
                print(f"      keepalive:     {step['keepalive_qps']} req/s ({step.get('keepalive_degradation_pct', '?')}%)")
            if 'short_conn_qps' in step:
                print(f"      short-conn:    {step['short_conn_qps']} req/s ({step.get('short_conn_degradation_pct', '?')}%)")
            if 'tcp_crr_p99_us' in step:
                print(f"      TCP_CRR p99:   {step['tcp_crr_p99_us']} µs (new-conn latency)")
            if 'tcp_rr_p99_us' in step:
                print(f"      TCP_RR p99:    {step['tcp_rr_p99_us']} µs (established-conn latency)")
            if 'rules_count' in step:
                print(f"      rules/entries: {step['rules_count']}")
            if 'kube_services_chain' in step:
                print(f"      KUBE-SERVICES chain: {step['kube_services_chain']} (hot-path scan length)")
    print()

hubble = d.get('hubble') or {}
if hubble:
    print("  ── Hubble Overhead ──")
    print(f"    with Hubble:    {hubble.get('with_hubble_qps', '?')} req/s")
    print(f"    without Hubble: {hubble.get('without_hubble_qps', '?')} req/s")
    print(f"    overhead:       {hubble.get('overhead_pct', '?')}%")
    print()

netpol = d.get('networkpolicy') or {}
if netpol:
    print("  ── NetworkPolicy Overhead ──")
    print(f"    no policy:     {netpol.get('baseline_qps', '?')} req/s")
    if 'l3l4_qps' in netpol:
        print(f"    L3/L4 CNP:     {netpol.get('l3l4_qps', '?')} req/s (overhead: {netpol.get('l3l4_overhead_pct', '?')}%)")
    if 'l7_qps' in netpol:
        print(f"    L7 CNP (HTTP): {netpol.get('l7_qps', '?')} req/s (overhead: {netpol.get('l7_overhead_pct', '?')}%)")
    print()

res = d.get('resources') or {}
for rk, rlabel in [("kubeproxy", "kube-proxy"), ("cilium_agent", "Cilium Agent")]:
    usage = res.get(rk)
    if usage:
        print(f"  ── Resource: {rlabel} (per-pod, steady state) ──")
        print(f"    CPU:    avg {usage.get('avg_cpu_m','?')}m / max {usage.get('max_cpu_m','?')}m")
        print(f"    Memory: avg {usage.get('avg_mem_mb','?')}MiB / max {usage.get('max_mem_mb','?')}MiB")
        print(f"    (sampled across {usage.get('pods_sampled','?')} pod(s))")
        print()

bpf = res.get('bpf_maps') or {}
if bpf:
    print("  ── BPF Map Memory ──")
    print(f"    total: {bpf.get('total_mb', '?')} MB ({bpf.get('map_count', '?')} maps)")
    tops = bpf.get('top_maps') or []
    if tops:
        print("    top maps:")
        for m in tops[:5]:
            # str()/"or 0" keep one odd entry (null name or size) from
            # aborting the rest of the table.
            name = str(m.get('name', '?'))
            mb = round((m.get('bytes') or 0) / 1024 / 1024, 1)
            print(f"      {name:30s} {mb:>6.1f} MB  (max_entries: {m.get('max_entries', '?')})")
    print()

if 'cilium_agent_rss_mb' in res:
    print(f"  Cilium Agent RSS: {res['cilium_agent_rss_mb']} MB")
    print()
PYEOF
  then
    # Non-fatal, but no longer silent: say why no table appeared.
    warn "Could not render summary table from $f (python3 missing or summary unreadable)"
  fi
}

# ─── Main ────────────────────────────────────────────────────────────────────

#######################################
# Script entry point: parse options, verify prerequisites, prepare the output
# directory, run every benchmark phase in order, then summarise and clean up.
# Globals:   OUTPUT_DIR (read; set to ./benchmark-results-<sanitized context>
#            when unset or empty)
# Arguments: "$@" - command-line options, forwarded to parse_args
# Outputs:   banner and progress messages on stdout
# Returns:   exits 1 when the prerequisite check fails or the output directory
#            cannot be created
#######################################
main() {
  echo ""
  echo "╔═══════════════════════════════════════╗"
  echo "║         TKE Network Benchmark         ║"
  echo "╚═══════════════════════════════════════╝"
  echo ""

  parse_args "$@"

  check_prereqs || {
    err "Prerequisite check failed"
    exit 1
  }

  info "Current context: $(get_cluster_name)"

  detect_cluster_type

  local cname
  cname=$(get_cluster_name)
  # Derive the default output dir from a filesystem-safe form of the context
  # name (raw context may contain '/', '(', ')', spaces, CJK that break paths).
  OUTPUT_DIR="${OUTPUT_DIR:-./benchmark-results-$(sanitize_name "$cname")}"
  info "Output directory: $OUTPUT_DIR"
  # The script runs without `set -e`, so check this explicitly: without an
  # output directory every later phase would run for hours and save nothing.
  mkdir -p -- "$OUTPUT_DIR/resources" || {
    err "Cannot create output directory: $OUTPUT_DIR"
    exit 1
  }

  collect_context_info
  select_worker_nodes

  deploy_test_workloads
  run_throughput_tests
  run_rps_tests
  run_latency_tests
  run_hubble_overhead_test
  run_networkpolicy_test
  run_service_scale_test
  collect_component_metrics

  generate_summary
  print_summary_table

  cleanup_test_workloads

  green ""
  green "╔══════════════════════════════════════════════════════════╗"
  green "║          Benchmark Complete!                            ║"
  green "╚══════════════════════════════════════════════════════════╝"
  green ""
  green "Results saved to: $OUTPUT_DIR"
  green "Summary file:     $OUTPUT_DIR/benchmark-summary.json"
  echo ""
}

# Run the benchmark, forwarding every command-line argument unchanged.
main "$@"
