Skip to content

API Reference

Solvers

sawmil.quadprog

quadprog

quadprog(
    H: NDArray[float64],
    f: NDArray[float64],
    Aeq: Optional[NDArray[float64]],
    beq: Optional[NDArray[float64]],
    lb: NDArray[float64],
    ub: NDArray[float64],
    solver: str = "gurobi",
    verbose: bool = False,
    solver_params: Optional[Mapping[str, Any]] = None,
) -> Union[
    Tuple[npt.NDArray[np.float64], "Objective"], None
]

Solve the quadratic program:

minimize   0.5 * αᵀ H α + fᵀ α
subject to Aeq α = beq
           lb ≤ α ≤ ub

Parameters:

  • H (NDArray[float64]) –

    (n, n) quadratic term matrix in 0.5 * αᵀ H α

  • f (NDArray[float64]) –

    (n,) linear term vector in fᵀ α , usually f = -1

  • Aeq (Optional[NDArray[float64]]) –

    (m, n) equality constraint matrix, usually yᵀ

  • beq (Optional[NDArray[float64]]) –

    (m,) equality constraint rhs, usually 0

  • lb (NDArray[float64]) –

    (n,) lower bound vector, usually 0

  • ub (NDArray[float64]) –

    (n,) upper bound vector, usually C

  • verbose (bool, default: False ) –

    If True, print solver logs

  • solver_params (Optional[Mapping[str, Any]], default: None ) –

    dict of backend-specific options. Examples: - solver='gurobi': {'env': , 'params': {'Method':2, 'Threads':1}} - solver='osqp' : {'setup': {...}, 'solve': {...}} or flat keys for setup - solver='daqp' : {'eps_abs': 1e-8, 'eps_rel': 1e-8, ...}

Returns:

  • Union[Tuple[NDArray[float64], 'Objective'], None]

    α*: Optimal solution vector

  • Objective ( Union[Tuple[NDArray[float64], 'Objective'], None] ) –

    quadratic and linear parts of the optimum

Source code in src/sawmil/quadprog.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def quadprog(
    H: npt.NDArray[np.float64],
    f: npt.NDArray[np.float64],
    Aeq: Optional[npt.NDArray[np.float64]],
    beq: Optional[npt.NDArray[np.float64]],
    lb: npt.NDArray[np.float64],
    ub: npt.NDArray[np.float64],
    solver: str = "gurobi",
    verbose: bool = False,
    solver_params: Optional[Mapping[str, Any]] = None,
) -> Union[Tuple[npt.NDArray[np.float64], "Objective"], None]:
    """
    Solve the quadratic program:

        minimize   0.5 * αᵀ H α + fᵀ α
        subject to Aeq α = beq
                   lb ≤ α ≤ ub

    Args:
        H: (n, n) quadratic term matrix in 0.5 * αᵀ H α
        f: (n,) linear term vector in fᵀ α , usually f = -1
        Aeq: (m, n) equality constraint matrix, usually yᵀ
        beq: (m,) equality constraint rhs, usually 0
        lb: (n,) lower bound vector, usually 0
        ub: (n,) upper bound vector, usually C
        verbose: If True, print solver logs
        solver_params: dict of backend-specific options. 
            Examples:
                - solver='gurobi': {'env': <gp.Env>, 'params': {'Method':2, 'Threads':1}}
                - solver='osqp'  : {'setup': {...}, 'solve': {...}} or flat keys for setup
                - solver='daqp'  : {'eps_abs': 1e-8, 'eps_rel': 1e-8, ...}

    Returns:
        α*: Optimal solution vector
        Objective: quadratic and linear parts of the optimum
    """
    H, f, Aeq, beq, lb, ub = _validate_and_stabilize(H, f, Aeq, beq, lb, ub)

    params = dict(solver_params or {})
    if params:
        print(f"Using solver '{solver}' with params: {params}")

    if solver == "gurobi":
        if _qp_gurobi is None:
            raise ImportError("gurobi path selected but 'gurobipy' is not installed or usable. "
                              "Install with: pip install sawmil[gurobi]")
        return _qp_gurobi(H, f, Aeq, beq, lb, ub, verbose=verbose, **params)

    elif solver == "osqp":
        if _qp_osqp is None:
            raise ImportError("osqp path selected but 'osqp' is not installed. "
                              "Install with: pip install sawmil[osqp]")
        return _qp_osqp(H, f, Aeq, beq, lb, ub, verbose=verbose, **params)
    elif solver == "daqp":
        if _qp_daqp is None:
            raise ImportError("daqp path selected but 'daqp' is not installed. "
                              "Install with: pip install sawmil[daqp]")
        return _qp_daqp(H, f, Aeq, beq, lb, ub, verbose=verbose, **params)

    raise ValueError(f"Unknown solver: {solver}")

sawmil.solvers._gurobi

quadprog_gurobi

quadprog_gurobi(
    H: NDArray[float64],
    f: NDArray[float64],
    Aeq: Optional[NDArray[float64]],
    beq: Optional[NDArray[float64]],
    lb: NDArray[float64],
    ub: NDArray[float64],
    verbose: bool = False,
    **params: Optional[Dict],
) -> Tuple[npt.NDArray[np.float64], "Objective"]

Solve the quadratic program using Gurobi:

minimize   0.5 * αᵀ H α + fᵀ α
subject to Aeq α = beq
           lb ≤ α ≤ ub

Parameters:

  • H (ndarray) –

    (n, n) Hessian matrix for the quadratic term in 0.5 * αᵀ H α.

  • f (ndarray) –

    (n,) linear term vector in fᵀ α. For SVMs, usually -1 for each component.

  • Aeq (ndarray | None) –

    (m, n) equality constraint matrix, usually yᵀ. Must be provided iff beq is provided.

  • beq (ndarray | None) –

    (m,) equality constraint right-hand side, usually 0. Must be provided iff Aeq is provided.

  • lb (ndarray) –

    (n,) lower bound vector, usually 0.

  • ub (ndarray) –

    (n,) upper bound vector, usually C.

  • verbose (bool, default: False ) –

    If True, print solver logs.

  • params (Any, default: {} ) –

    Additional keyword parameters passed via **params. Expected keys: * env (dict): Parameters for gurobipy.Env (e.g., {"LogFile": "gurobi.log"}). * model (dict): Parameters for gurobipy.Model.Params (e.g., {"Method": 2, "Threads": 1}). * start (np.ndarray | None): Optional initial solution vector of shape (n,) for warm start.

Returns:

  • x ( ndarray ) –

    Optimal solution vector α* of shape (n,).

  • objective ( Objective ) –

    Quadratic and linear parts of the optimum.

Raises:

  • ImportError

    If gurobipy is not installed.

  • ValueError

    If only one of Aeq or beq is provided, or if a warm start has the wrong shape.

Source code in src/sawmil/solvers/_gurobi.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def quadprog_gurobi(
    H: npt.NDArray[np.float64],
    f: npt.NDArray[np.float64],
    Aeq: Optional[npt.NDArray[np.float64]],
    beq: Optional[npt.NDArray[np.float64]],
    lb: npt.NDArray[np.float64],
    ub: npt.NDArray[np.float64],
    verbose: bool = False,
    **params: Optional[Dict],

) -> Tuple[npt.NDArray[np.float64], "Objective"]:
    """
    Solve the quadratic program using Gurobi:

        minimize   0.5 * αᵀ H α + fᵀ α
        subject to Aeq α = beq
                   lb ≤ α ≤ ub

    Args:
        H (np.ndarray): (n, n) Hessian matrix for the quadratic term in 0.5 * αᵀ H α.
        f (np.ndarray): (n,) linear term vector in fᵀ α. For SVMs, usually -1 for each component.
        Aeq (np.ndarray | None): (m, n) equality constraint matrix, usually yᵀ. Must be provided iff ``beq`` is provided.
        beq (np.ndarray | None): (m,) equality constraint right-hand side, usually 0. Must be provided iff ``Aeq`` is provided.
        lb (np.ndarray): (n,) lower bound vector, usually 0.
        ub (np.ndarray): (n,) upper bound vector, usually C.
        verbose (bool): If True, print solver logs.
        params (Any): Additional keyword parameters passed via ``**params``.
            Expected keys:
            * ``env`` (dict): Parameters for ``gurobipy.Env`` (e.g., ``{"LogFile": "gurobi.log"}``).
            * ``model`` (dict): Parameters for ``gurobipy.Model.Params`` (e.g., ``{"Method": 2, "Threads": 1}``).
            * ``start`` (np.ndarray | None): Optional initial solution vector of shape (n,) for warm start.

    Returns:
        x (np.ndarray): Optimal solution vector α* of shape (n,).
        objective (Objective): Quadratic and linear parts of the optimum.

    Raises:
        ImportError: If ``gurobipy`` is not installed.
        ValueError: If only one of ``Aeq`` or ``beq`` is provided, or if a warm start has the wrong shape.
    """

    try:
        import gurobipy as gp
    except Exception as exc:  # pragma: no cover
        raise ImportError("gurobipy is required for solver='gurobi'") from exc
    if (Aeq is None) ^ (beq is None):
        raise ValueError("Aeq and beq must both be None or both be provided.")

    n = H.shape[0]
    env_cfg = dict(params.get("env", {}))
    env = gp.Env(**env_cfg) if env_cfg else None
    if env_cfg:
        log.debug(f"Gurobi Env params: {env_cfg}")

    model = gp.Model(env=env) if env else gp.Model()
    if not verbose:
        model.Params.OutputFlag = 0

    # Collect model parameter overrides
    model_params: dict[str, Any] = dict(params.get("model", {}))
    # Also allow flat, non-namespaced params as a convenience
    for k, v in params.items():
        if k not in ("model", "env", "start"):
            model_params.setdefault(k, v)

    if model_params:
        log.debug(f"Gurobi Model.Params overrides: {model_params}")

    # Apply model.Params.* safely
    for k, v in model_params.items():
        try:
            setattr(model.Params, k, v)
        except AttributeError as exc:
            raise ValueError(f"Unknown Gurobi parameter: '{k}'") from exc


    x = model.addMVar(n, lb=lb, ub=ub, name="alpha")

    if "start" in params and params["start"] is not None:
        start = np.asarray(params["start"], dtype=float)
        if start.shape != (n,):
            raise ValueError(f"start must have shape ({n},), got {start.shape}")
        x.Start = start
    obj = 0.5 * (x @ H @ x) + f @ x
    model.setObjective(obj, gp.GRB.MINIMIZE)

    if Aeq is not None:
        model.addConstr(Aeq @ x == beq, name="eq")
    model.optimize()

    if model.Status != gp.GRB.OPTIMAL:  # pragma: no cover - defensive
        log.warning(RuntimeError(
            f"Gurobi optimization failed with status {model.Status}"))

    xstar = np.asarray(x.X, dtype=float)
    quadratic = float(0.5 * xstar.T @ H @ xstar)
    linear = float(f.T @ xstar)
    return xstar, Objective(quadratic, linear)

sawmil.solvers._osqp

quadprog_osqp

quadprog_osqp(
    H: NDArray[float64],
    f: NDArray[float64],
    Aeq: Optional[NDArray[float64]],
    beq: Optional[NDArray[float64]],
    lb: NDArray[float64],
    ub: NDArray[float64],
    verbose: bool = False,
    **params: Any,
) -> Tuple[npt.NDArray[np.float64], "Objective"]

Solve a convex quadratic program using the OSQP solver.

The problem has the form:

minimize    0.5 * αᵀ H α + fᵀ α
subject to  Aeq α = beq
            lb ≤ α ≤ ub

This wrapper builds the constraint matrix and passes options to OSQP's setup() and solve() routines.

Parameters

H : (n, n) ndarray of float Hessian matrix for the quadratic term in 0.5 * αᵀ H α. For SVM duals, this is typically (y yᵀ) ⊙ K where K is the kernel matrix. f : (n,) ndarray of float Linear term vector in fᵀ α. For SVMs, usually -1 for each component. Aeq : (m, n) ndarray of float, optional Equality constraint matrix, e.g. yᵀ for the SVM bias constraint. beq : (m,) ndarray of float, optional Right-hand side of the equality constraint, usually 0. lb : (n,) ndarray of float Lower bounds on the variables. For standard SVM duals, usually all zeros. ub : (n,) ndarray of float Upper bounds on the variables. For soft-margin SVM duals, usually all entries equal to C. verbose : bool, default=False If True, print solver logs. **params : dict Additional OSQP options. Keys can be: - setup (dict): passed to OSQP.setup() (e.g. {"eps_abs": 1e-6}). - solve (dict): passed to OSQP.solve() (e.g. {"warm_start": True}). - Flat keyword args: if neither setup nor solve is provided, non-nested keys are treated as setup options.

Returns

x : (n,) ndarray of float Optimal solution vector α*. objective : Objective Object containing the quadratic and linear parts of the optimum value.

Raises

ImportError If required dependencies (scipy or osqp) are not installed. RuntimeError If OSQP terminates with a status other than solved or solved_inaccurate.

Source code in src/sawmil/solvers/_osqp.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def quadprog_osqp(
    H: npt.NDArray[np.float64],
    f: npt.NDArray[np.float64],
    Aeq: Optional[npt.NDArray[np.float64]],
    beq: Optional[npt.NDArray[np.float64]],
    lb: npt.NDArray[np.float64],
    ub: npt.NDArray[np.float64],
    verbose: bool = False,
    **params: Any,
) -> Tuple[npt.NDArray[np.float64], "Objective"]:
    """
    Solve a convex quadratic program using the OSQP solver.

    The problem has the form:

        minimize    0.5 * αᵀ H α + fᵀ α
        subject to  Aeq α = beq
                    lb ≤ α ≤ ub

    This wrapper builds the constraint matrix and passes options to OSQP's
    `setup()` and `solve()` routines.

    Parameters
    ----------
    H : (n, n) ndarray of float
        Hessian matrix for the quadratic term in 0.5 * αᵀ H α.
        For SVM duals, this is typically (y yᵀ) ⊙ K where K is the kernel matrix.
    f : (n,) ndarray of float
        Linear term vector in fᵀ α. For SVMs, usually -1 for each component.
    Aeq : (m, n) ndarray of float, optional
        Equality constraint matrix, e.g. yᵀ for the SVM bias constraint.
    beq : (m,) ndarray of float, optional
        Right-hand side of the equality constraint, usually 0.
    lb : (n,) ndarray of float
        Lower bounds on the variables. For standard SVM duals, usually all zeros.
    ub : (n,) ndarray of float
        Upper bounds on the variables. For soft-margin SVM duals, usually all entries equal to C.
    verbose : bool, default=False
        If True, print solver logs.
    **params : dict
        Additional OSQP options. Keys can be:
        - ``setup`` (dict): passed to ``OSQP.setup()`` (e.g. ``{"eps_abs": 1e-6}``).
        - ``solve`` (dict): passed to ``OSQP.solve()`` (e.g. ``{"warm_start": True}``).
        - Flat keyword args: if neither ``setup`` nor ``solve`` is provided,
          non-nested keys are treated as setup options.

    Returns
    -------
    x : (n,) ndarray of float
        Optimal solution vector α*.
    objective : Objective
        Object containing the quadratic and linear parts of the optimum value.

    Raises
    ------
    ImportError
        If required dependencies (``scipy`` or ``osqp``) are not installed.
    RuntimeError
        If OSQP terminates with a status other than solved or solved_inaccurate.
    """
    # Lazy, guarded imports so the module can be imported without OSQP installed.
    try:
        import scipy.sparse as sp
    except Exception as exc:  # pragma: no cover
        raise ImportError("scipy is required for solver='osqp'") from exc
    try:
        import osqp
    except Exception as exc:  # pragma: no cover
        raise ImportError("osqp is required for solver='osqp'") from exc

    n = H.shape[0]
    P = sp.csc_matrix(H)
    q = f

    blocks = []
    l_list, u_list = [], []

    # Equalities: encode as Aeq x in [beq, beq]
    if Aeq is not None:
        blocks.append(sp.csc_matrix(Aeq))
        l_list.append(beq)  # type: ignore[arg-type]
        u_list.append(beq)  # type: ignore[arg-type]

    # Bounds: I x in [lb, ub]
    csc_eye = sp.eye(n, format="csc")
    blocks.append(csc_eye)
    l_list.append(lb)
    u_list.append(ub)

    A = sp.vstack(blocks, format="csc")
    lower_vec = np.concatenate(l_list)
    upper_vec = np.concatenate(u_list)

    # Split options into setup/solve; accept flat options as setup()
    setup_opts: dict[str, Any] = dict(params.get("setup", {}))
    solve_opts: dict[str, Any] = dict(params.get("solve", {}))
    if not setup_opts and not solve_opts:
        # treat all non-nested options as setup()
        setup_opts = {k: v for k,
                      v in params.items() if k not in ("setup", "solve")}

    # sensible defaults; allow override via setup_opts
    setup_defaults = dict(
        polishing=True,
        eps_abs=1e-8,
        eps_rel=1e-8,
        max_iter=20000,
    )
    for k, v in setup_defaults.items():
        setup_opts.setdefault(k, v)

    # And in solve() default to not raising so we can return diagnostics
    solve_opts.setdefault("raise_error", False)

    prob = osqp.OSQP()
    prob.setup(P=P, q=q, A=A, l=lower_vec, u=upper_vec,
               verbose=verbose, **setup_opts)
    res = prob.solve(**solve_opts)

    if res.info.status_val not in (1, 2):  # 1=solved, 2=solved_inaccurate
        log.error(f"OSQP failed: {res.info.status}")

    x = np.asarray(res.x, dtype=float)
    quadratic = float(0.5 * x @ (H @ x))
    linear = float(f @ x)
    return x, Objective(quadratic, linear)

Kernels

sawmil.kernels

BaseKernel

Bases: ABC

Minimal kernel interface for the single-instance kernels: fit (optional) + call.

Linear dataclass

Linear()

Bases: BaseKernel

Linear kernel: K(x, y) = x^T y

Normalize dataclass

Normalize(k: BaseKernel, eps: float = 1e-12)

Bases: BaseKernel

Cosine-style normalization: Kxy / sqrt(Kxx * Kyy).

Polynomial dataclass

Polynomial(
    degree: int = 3, gamma: float = 1.0, coef0: float = 0.0
)

Bases: BaseKernel

Polynomial kernel: K(x, y) = (gamma * x^T y + coef0)^degree

Precomputed dataclass

Precomputed(K: ndarray)

Bases: BaseKernel

Use when a Gram matrix is already built; ignores X,Y and returns K (shape checked by caller).

Product dataclass

Product(k1: BaseKernel, k2: BaseKernel)

Bases: BaseKernel

Product kernel: K(x, y) = k1(x, y) * k2(x, y)

RBF dataclass

RBF(gamma: Optional[float] = None)

Bases: BaseKernel

Radial Basis Function (RBF) kernel: K(x, y) = exp(-gamma * ||x - y||^2)

Scale dataclass

Scale(a: float, k: BaseKernel)

Bases: BaseKernel

Scale kernel: K(x, y) = a * k(x, y)

Sigmoid dataclass

Sigmoid(gamma: Optional[float] = None, coef0: float = 0.0)

Bases: BaseKernel

Sigmoid kernel: K(x, y) = tanh(gamma * x^T y + coef0)

Sum dataclass

Sum(k1: BaseKernel, k2: BaseKernel)

Bases: BaseKernel

Sum kernel: K(x, y) = k1(x, y) + k2(x, y)

get_kernel

get_kernel(spec: KernelType, **kwargs) -> BaseKernel

Normalize various 'kernel=' inputs into a BaseKernel object. kwargs are used only when spec is a string (e.g., gamma, degree, coef0, K).

Source code in src/sawmil/kernels.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def get_kernel(spec: KernelType, **kwargs) -> BaseKernel:
    """
    Normalize various 'kernel=' inputs into a BaseKernel object.
    kwargs are used only when spec is a string (e.g., gamma, degree, coef0, K).
    """
    # already an object?
    if isinstance(spec, BaseKernel):
        return spec

    # plain callable? wrap so it behaves like a BaseKernel
    if callable(spec) and not isinstance(spec, str):
        class _CallableKernel(BaseKernel):
            def __call__(self, X, Y): return np.asarray(
                spec(X, Y), dtype=float)
        return _CallableKernel()

    # by name
    name = str(spec).lower()
    if name == "precomputed":
        if "K" not in kwargs:
            raise ValueError(
                "get_kernel('precomputed', K=...) requires the Gram matrix K.")
        return Precomputed(np.asarray(kwargs["K"], dtype=float))

    registry: Dict[str, BaseKernel] = {
        "linear":  Linear(),
        "rbf":     RBF(kwargs.get("gamma")),
        "poly":    Polynomial(kwargs.get("degree", 3), kwargs.get("gamma"), kwargs.get("coef0", 0.0)),
        "sigmoid": Sigmoid(kwargs.get("gamma"), kwargs.get("coef0", 0.0)),
    }
    if name in registry:
        return registry[name]

    raise ValueError(f"Unknown kernel spec: {spec!r}")

sawmil.bag_kernels

BaseBagKernel

Bases: ABC

Base class for the Multiple-instance (bags) kernel

WeightedMeanBagKernel dataclass

WeightedMeanBagKernel(
    inst_kernel: BaseKernel,
    normalizer: _NormalizerName = "average",
    p: float = 1.0,
)

Bases: BaseBagKernel

A simple, readable bag kernel built from an instance kernel k(·,·).

Definition

For two bags Bi and Bj with instance sets Xi and Xj, let w_i and w_j be per-instance weights (see below). The kernel is

K(Bi, Bj) = [(w_i^T K(Xi, Xj) w_j)]^p / (norm(Bi) * norm(Bj)),

where K(Xi, Xj) is the instance-level Gram submatrix produced by inst_kernel and p>=1 is an optional element-wise exponent (default 1.0).

Weights and normalizers
  • normalizer="none": w is the mean weight (1/n per instance). No additional normalization; K(Bi,Bj) equals the average pairwise instance kernel value.
  • normalizer="average" (default): w is the sum weight (1 per instance). The numerator becomes the sum over all instance pairs, and dividing by norm(Bi)=|Bi| and norm(Bj)=|Bj| yields the same average over pairs as above, but kept in a form that generalizes to non-linear feature-space norms.
  • normalizer="featurespace": w is the mean weight (1/n), and norm(B) = sqrt(w^T K(X, X) w), i.e. the feature-space norm of the (weighted) mean embedding. For a Linear instance kernel this is just the Euclidean norm of the mean vector.
Notes
  • For Linear instance kernels and p=1, this recovers the dot-product of (possibly normalized) bag means.
  • When applying the exponent p>1 we clamp negatives to 0 before powering to help preserve PSD-like behaviour for numerical stability.
__call__
__call__(
    bags_X: list[Bag], bags_Y: list[Bag]
) -> npt.NDArray[np.float64]

Compute the bag-by-bag Gram matrix.

Parameters:

  • bags_X (list[Bag]) –

    list of Bag objects (left side)

  • bags_Y (list[Bag]) –

    list of Bag objects (right side)

Returns:

  • NDArray[float64]

    A (len(bags_X) x len(bags_Y)) matrix K where K[i,j] is the kernel

  • NDArray[float64]

    value between bags_X[i] and bags_Y[j] under the configuration

  • NDArray[float64]

    described in the class docstring.

Source code in src/sawmil/bag_kernels.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def __call__(self, bags_X: list[Bag], bags_Y: list[Bag]) -> npt.NDArray[np.float64]:
    """Compute the bag-by-bag Gram matrix.

    Args:
        bags_X: list of Bag objects (left side)
        bags_Y: list of Bag objects (right side)

    Returns:
        A (len(bags_X) x len(bags_Y)) matrix K where K[i,j] is the kernel
        value between bags_X[i] and bags_Y[j] under the configuration
        described in the class docstring.
    """

    nX, nY = len(bags_X), len(bags_Y)

    # Handle empty bag lists quickly
    if nX == 0 or nY == 0:
        return np.zeros((nX, nY), dtype=float)

    # ---- build stacked instance matrices
    d = bags_X[0].d if nX else 0

    X_stack = np.vstack(
        [b.X if b.n else np.zeros((0, d), float) for b in bags_X])
    Y_stack = X_stack if (bags_X is bags_Y) else \
        np.vstack([b.X if b.n else np.zeros((0, d), float)
                  for b in bags_Y])

    # ---- per-instance weights per bag
    #
    def w_vec(b: Bag) -> np.ndarray:
        """Weight vector for a bag b.
        Returns:
            A 1D array of shape (b.n,) with the weight for each instance.

        Notes
        -----
        - "average": use 1 per instance (sum); divide by |Bi||Bj| later.
        - else ("none" or "featurespace"): use mean weights (1/n per instance).
        """
        if b.n == 0:
            return np.zeros((0,), dtype=float)
        if self.normalizer == "average":
            return np.full(b.n, 1.0 / b.n, dtype=float)  # mean
        return np.ones(b.n, dtype=float)     # sum= n

    wX = [w_vec(b) for b in bags_X]
    wY = wX if (bags_X is bags_Y) else [w_vec(b) for b in bags_Y]

    # ---- sparse selection/weighting matrices Sx, Sy
    # Shape: (n_bags, n_instances). Row i selects/weights instances of bag i.
    def make_S(weights_list: list[np.ndarray]) -> sp.csr_matrix:
        """ Matrix of shape (n_bags, n_instances) with weights for each instance in each bag.
        Tracks which instance belongs to which bag, and applies the per-instance weights.
        """
        rows, cols, data = [], [], []
        offset = 0
        for i, w in enumerate(weights_list):
            if w.size:
                j = np.arange(w.size) + offset
                rows.extend([i] * w.size)
                cols.extend(j.tolist())
                data.extend(w.tolist())
            offset += w.size
        return sp.csr_matrix((data, (rows, cols)), shape=(len(weights_list), offset))

    Sx = make_S(wX)
    Sy = Sx if (bags_X is bags_Y) else make_S(wY)

    # ---- one big instance-kernel call, then reduce by Sx/Sy
    # K_inst has shape (#instances in X) x (#instances in Y)
    K_inst = self.inst_kernel(X_stack, Y_stack)

    # ---- aggregate to bag-bag
    # K_bag[i,j] = w_i^T K_inst(block_i, block_j) w_j  <=>  Sx @ K_inst @ Sy.T
    K_bag = (Sx @ K_inst) @ Sy.T
    K_bag = np.asarray(K_bag, dtype=float)

    # ---- norms
    if self.normalizer == "none":
        norms_X = np.ones(nX, dtype=float)
        norms_Y = norms_X if (
            bags_X is bags_Y) else np.ones(nY, dtype=float)
    elif self.normalizer == "average":
        norms_X = np.array([max(b.n, 1) for b in bags_X], dtype=float)
        norms_Y = norms_X if (bags_X is bags_Y) else np.array(
            [max(b.n, 1) for b in bags_Y], dtype=float)
    else:  # "featurespace"
        if isinstance(self.inst_kernel, Linear):
            # For linear kernels, the feature map is the identity, so the
            # mean embedding is simply the (weighted) mean vector.
            # means: (n_bags, d) = Sx @ X_stack
            means_X = Sx @ X_stack
            norms_X = np.linalg.norm(means_X, axis=1).clip(min=1e-12)
            if bags_X is bags_Y:
                norms_Y = norms_X
            else:
                means_Y = Sy @ Y_stack
                norms_Y = np.linalg.norm(means_Y, axis=1).clip(min=1e-12)
        else:
            # general featurespace norm via self-gram reduction
            # K_self_X = Sx @ (inst_kernel(X,X)) @ Sx.T
            K_self_X = (Sx @ (self.inst_kernel(X_stack, X_stack))) @ Sx.T
            norms_X = np.sqrt(np.maximum(np.diag(K_self_X), 1e-12))
            if bags_X is bags_Y:
                norms_Y = norms_X
            else:
                K_self_Y = (
                    Sy @ (self.inst_kernel(Y_stack, Y_stack))) @ Sy.T
                norms_Y = np.sqrt(np.maximum(np.diag(K_self_Y), 1e-12))

    # ---- apply exponent p and normalization
    if self.p != 1.0:
        np.maximum(K_bag, 0.0, out=K_bag)  # keep PSD-ish
        K_bag = np.power(K_bag, self.p, dtype=float)

    denom = np.outer(norms_X, norms_Y)
    np.divide(K_bag, denom, out=K_bag, where=(denom > 0))

    return K_bag
fit
fit(bags: List[Bag]) -> 'WeightedMeanBagKernel'

Fit the underlying instance kernel if it needs data-dependent defaults.

We just peek at the first non-empty bag to infer dimensionality for kernels like RBF that auto-derive parameters (e.g., gamma).

Source code in src/sawmil/bag_kernels.py
111
112
113
114
115
116
117
118
119
120
121
122
123
def fit(self, bags: List[Bag]) -> "WeightedMeanBagKernel":
    """Fit the underlying instance kernel if it needs data-dependent defaults.

    We just peek at the first non-empty bag to infer dimensionality for
    kernels like RBF that auto-derive parameters (e.g., gamma).
    """
    # If the instance kernel needs defaults (e.g. gamma), fit it on a few instances.
    # We can use the first bag to infer dimensionality.
    for b in bags:
        if b.n > 0:
            self.inst_kernel.fit(b.X)
            break
    return self

make_bag_kernel

make_bag_kernel(
    inst_kernel: BaseKernel,
    *,
    normalizer: _NormalizerName = "none",
    p: float = 1.0,
) -> WeightedMeanBagKernel

Helper to create a WeightedMeanBagKernel with the given instance kernel and parameters. Args: inst_kernel: BaseKernel The instance-level kernel to use (will be fitted on first bag with instances). normalizer: {"none", "average", "featurespace"} The bag kernel normalizer (default is "none"). p: float Exponent for the weighted mean kernel (default is 1.0, i.e. no exponentiation).

Source code in src/sawmil/bag_kernels.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def make_bag_kernel(
    inst_kernel: BaseKernel,
    *,
    normalizer: _NormalizerName = "none",
    p: float = 1.0,
) -> WeightedMeanBagKernel:
    '''Helper to create a WeightedMeanBagKernel with the given instance kernel and parameters.
    Args:
      inst_kernel: BaseKernel
        The instance-level kernel to use (will be fitted on first bag with instances).
      normalizer: {"none", "average", "featurespace"}
        The bag kernel normalizer (default is "none").
      p: float
        Exponent for the weighted mean kernel (default is 1.0, i.e. no exponentiation).
    '''
    return WeightedMeanBagKernel(
        inst_kernel=inst_kernel,
        normalizer=normalizer,
        p=p,
    )

Bags

sawmil.bag

Bag dataclass

Bag(
    X: NDArray[float64],
    y: Label,
    intra_bag_mask: Optional[NDArray[float64]] = None,
)

A bag of instances with a bag-level label and per-instance mask flags.

Parameters:

  • X (np.ndarray of float, shape (n_i, d)) –

    Instance feature matrix where each row is a feature vector.

  • y (Label) –

    Bag-level label (e.g., 0/1 or -1/+1).

  • intra_bag_mask ((ndarray, shape(n_i)), default: None ) –

    Optional array of 0/1 flags indicating which instances are active. If None, defaults to all ones.

Attributes:

  • X (np.ndarray, shape (n_i, d) –

    Instance feature matrix.

  • y (Label) –

    Bag label.

  • intra_bag_mask (np.ndarray, shape (n_i,) –

    Per-instance 0/1 mask.

d property
d: int

Number of features.

mask property
mask: NDArray[float64]

Intra-bag mask, specifies which instances could potentially contain the relevant signal (1) or not (0).

n property
n: int

Number of instances in the bag.

__post_init__
__post_init__()

Validate and normalize input arrays.

Raises:

  • ValueError

    If X is not 2D or if intra_bag_mask length does not match the number of instances.

Source code in src/sawmil/bag.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __post_init__(self):
    """Validate and normalize input arrays.

    Raises:
        ValueError: If ``X`` is not 2D or if ``intra_bag_mask`` length
            does not match the number of instances.
    """
    self.X = np.asarray(self.X, dtype=float)
    if self.X.ndim != 2:
        raise ValueError("Bag.X must be 2D (n_i, d).")
    n_i = self.X.shape[0]
    if self.intra_bag_mask is None:
        # default: all ones
        self.intra_bag_mask = np.ones(n_i, dtype=float)
    else:
        self.intra_bag_mask = np.asarray(
            self.intra_bag_mask, dtype=float).ravel()
        if self.intra_bag_mask.shape[0] != n_i:
            raise ValueError(
                "intra_bag_mask length must match number of instances.")
negatives
negatives() -> npt.NDArray[np.int64]

Indices of instances with intra_bag_mask == 0.

Source code in src/sawmil/bag.py
81
82
83
def negatives(self) -> npt.NDArray[np.int64]:
    """Indices of instances with intra_bag_mask == 0."""
    return np.flatnonzero(self.mask < 0.5)
positives
positives() -> npt.NDArray[np.int64]

Indices of instances with intra_bag_mask == 1.

Source code in src/sawmil/bag.py
77
78
79
def positives(self) -> npt.NDArray[np.int64]:
    """Indices of instances with intra_bag_mask == 1."""
    return np.flatnonzero(self.mask >= 0.5)

BagDataset dataclass

BagDataset(bags: List[Bag])

A dataset of bags.

num_bags property
num_bags: int

Returns the number of bags.

num_instances property
num_instances: int

Returns the total number of instances.

num_neg_bags property
num_neg_bags: int

Returns the number of negative bags.

num_neg_instances property
num_neg_instances: int

Returns the number of negative instances.

num_pos_bags property
num_pos_bags: int

Returns the number of positive bags.

num_pos_instances property
num_pos_instances: int

Returns the number of positive instances.

y property
y: ndarray

Returns all the bag labels.

from_arrays staticmethod
from_arrays(
    bags: Sequence[ndarray],
    y: Sequence[float],
    intra_bag_masks: Sequence[ndarray] | None = None,
) -> "BagDataset"

Create a :class:BagDataset from raw numpy arrays.

Parameters

bags: Sequence of arrays where each element contains the instances of a bag with shape (n_i, d). y: Bag-level labels corresponding to each element of bags. intra_bag_masks: Optional sequence of 1D arrays with per-instance 0/1 flags. If omitted, all instances in a bag are considered positive.

Returns

BagDataset Dataset composed of :class:Bag objects built from the provided arrays.

Source code in src/sawmil/bag.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@staticmethod
def from_arrays(
    bags: Sequence[np.ndarray],
    y: Sequence[float],
    intra_bag_masks: Sequence[np.ndarray] | None = None
) -> "BagDataset":
    """Create a :class:`BagDataset` from raw numpy arrays.

    Parameters
    ----------
    bags:
        Sequence of arrays where each element contains the instances of a
        bag with shape ``(n_i, d)``.
    y:
        Bag-level labels corresponding to each element of ``bags``.
    intra_bag_masks:
        Optional sequence of 1D arrays with per-instance ``0/1`` flags. If
        omitted, all instances in a bag are considered positive.

    Returns
    -------
    BagDataset
        Dataset composed of :class:`Bag` objects built from the provided
        arrays.
    """
    if intra_bag_masks is None:
        intra_bag_masks = [None] * len(bags)
    if len(bags) != len(y) or len(bags) != len(intra_bag_masks):
        raise ValueError(
            "bags, y, intra_bag_masks must have same length.")
    return BagDataset([
        Bag(X=b, y=float(lbl), intra_bag_mask=ibl)
        for b, lbl, ibl in zip(bags, y, intra_bag_masks)
    ])
negative_bags
negative_bags() -> list[Bag]

Returns all negative bags.

Source code in src/sawmil/bag.py
139
140
141
def negative_bags(self) -> list[Bag]:
    '''Returns all negative bags.'''
    return [b for b in self.bags if float(b.y) <= 0.0]
negative_bags_as_singletons
negative_bags_as_singletons() -> list[Bag]

Transforms all negative bags into singleton bags, by flattening each bag (b, n, d) -> (b x n, d)

Source code in src/sawmil/bag.py
190
191
192
193
194
195
196
197
198
def negative_bags_as_singletons(self) -> list[Bag]:
    '''
    Transforms all negative bags into singleton bags, by flattening each bag  (b, n, d) -> (b x n, d)
    '''
    singletons: list[Bag] = []
    for b in self.negative_bags():
        for j in range(b.n):
            singletons.append(Bag(X=b.X[j:j+1, :], y=-1.0))
    return singletons
negative_instances
negative_instances() -> tuple[np.ndarray, np.ndarray]

Returns instances from: - all negative bags (all instances), - plus from positive bags where intra_bag_mask == 0. Returns: X_neg: (M, d) bag_index: (M,) indices into self.bags (original positions)

Source code in src/sawmil/bag.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def negative_instances(self) -> tuple[np.ndarray, np.ndarray]:
    """
    Returns instances from:
    - all negative bags (all instances),
    - plus from positive bags where intra_bag_mask == 0.
    Returns:
        X_neg: (M, d)
        bag_index: (M,) indices into self.bags (original positions)
    """
    Xs, bag_idx = [], []
    for i, b in enumerate(self.bags):          # iterate original list!
        if float(b.y) <= 0.0:
            # take all instances
            if b.X.shape[0] > 0:
                Xs.append(b.X)
                bag_idx.extend([i] * b.X.shape[0])
        else:
            # positive bag: only intra_mask == 0
            mask = b.mask < 0.5
            if np.any(mask):
                Xs.append(b.X[mask])
                bag_idx.extend([i] * int(mask.sum()))
    if not Xs:
        d = self.bags[0].d if self.bags else 0
        return np.zeros((0, d)), np.array([], dtype=int)
    return np.vstack(Xs), np.array(bag_idx, dtype=int)
positive_bags
positive_bags() -> list[Bag]

Returns all positive bags.

Source code in src/sawmil/bag.py
135
136
137
def positive_bags(self) -> list[Bag]:
    '''Returns all positive bags.'''
    return [b for b in self.bags if float(b.y) > 0.0]
positive_bags_as_singletons
positive_bags_as_singletons() -> list[Bag]

Transforms all positive bags into singleton bags, by flattening each bag (b, n, d) -> (b x n, d)

Source code in src/sawmil/bag.py
200
201
202
203
204
205
206
207
208
def positive_bags_as_singletons(self) -> list[Bag]:
    '''
    Transforms all positive bags into singleton bags, by flattening each bag  (b, n, d) -> (b x n, d)
    '''
    singletons: list[Bag] = []
    for b in self.positive_bags():
        for j in range(b.n):
            singletons.append(Bag(X=b.X[j:j+1, :], y=1.0))
    return singletons
positive_instances
positive_instances() -> tuple[np.ndarray, np.ndarray]

Returns instances from positive bags with intra_bag_mask == 1. Returns: X_pos: (N, d) bag_index: (N,) indices into self.bags (original positions)

Source code in src/sawmil/bag.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def positive_instances(self) -> tuple[np.ndarray, np.ndarray]:
    """
    Returns instances from positive bags with intra_bag_mask == 1.
    Returns:
        X_pos: (N, d)
        bag_index: (N,) indices into self.bags (original positions)
    """
    Xs, bag_idx = [], []
    for i, b in enumerate(self.bags):          # iterate original list!
        if float(b.y) <= 0.0:
            continue
        mask = b.mask >= 0.5
        if np.any(mask):
            Xs.append(b.X[mask])
            bag_idx.extend([i] * int(mask.sum()))
    if not Xs:
        d = self.bags[0].d if self.bags else 0
        return np.zeros((0, d)), np.array([], dtype=int)
    return np.vstack(Xs), np.array(bag_idx, dtype=int)

Dummy Data

sawmil.data

generate_dummy_bags

generate_dummy_bags(
    *,
    n_pos: int = 100,
    n_neg: int = 60,
    inst_per_bag: Tuple[int, int] = (4, 12),
    d: int = 2,
    pos_centers: Sequence[Sequence[float]] = (
        (+2.0, +1.0),
        (+4.0, +3.0),
    ),
    neg_centers: Sequence[Sequence[float]] = (
        (-1.5, -1.0),
        (-3.0, +0.5),
    ),
    pos_scales: Sequence[Tuple[float, float]] = (
        (2.0, 0.6),
        (1.2, 0.8),
    ),
    neg_scales: Sequence[Tuple[float, float]] = (
        (1.5, 0.5),
        (2.5, 0.9),
    ),
    pos_intra_rate: Tuple[float, float] = (0.3, 0.8),
    ensure_pos_in_every_pos_bag: bool = True,
    neg_pos_noise_rate: Tuple[float, float] = (0.0, 0.05),
    pos_neg_noise_rate: Tuple[float, float] = (0.0, 0.2),
    outlier_rate: float = 0.01,
    outlier_scale: float = 10.0,
    random_state: int = 0,
) -> BagDataset

Generate a synthetic MIL dataset with mixed Gaussian components.

Returns:

  • BagDataset ( BagDataset ) –
    • Positive bags (y=1) mix pos-like instances (intra=1) and distractors (intra=0).
    • Negative bags (y=0) may include a small fraction of pos-like contamination; intra labels default to ones in Bag, but the bag label remains 0.
Source code in src/sawmil/data/dummy.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def generate_dummy_bags(
    *,
    n_pos: int = 100,
    n_neg: int = 60,
    inst_per_bag: Tuple[int, int] = (4, 12),     # bag size ~ Uniform[min,max]
    d: int = 2,
    # Positive & negative class mixtures
    pos_centers: Sequence[Sequence[float]] = ((+2.0, +1.0), (+4.0, +3.0)),
    neg_centers: Sequence[Sequence[float]] = ((-1.5, -1.0), (-3.0, +0.5)),
    pos_scales: Sequence[Tuple[float, float]] = (
        (2.0, 0.6), (1.2, 0.8)),  # (scale, anisotropy)
    neg_scales: Sequence[Tuple[float, float]] = ((1.5, 0.5), (2.5, 0.9)),
    # Intra-bag positives (only matter for positive bags)
    # per-bag fraction range for intra==1
    pos_intra_rate: Tuple[float, float] = (0.3, 0.8),
    ensure_pos_in_every_pos_bag: bool = True,
    # Cross-contamination
    # fraction of pos-like instances inside *negative* bags
    neg_pos_noise_rate: Tuple[float, float] = (0.00, 0.05),
    # fraction of neg-like instances inside *positive* bags
    pos_neg_noise_rate: Tuple[float, float] = (0.00, 0.20),
    # Outliers sprinkled everywhere
    outlier_rate: float = 0.01,
    outlier_scale: float = 10.0,
    random_state: int = 0,
) -> BagDataset:
    """Generate a synthetic MIL dataset with mixed Gaussian components.

    Returns:
        BagDataset:
            - Positive bags (y=1) mix pos-like instances (intra=1) and distractors (intra=0).
            - Negative bags (y=0) may include a small fraction of pos-like contamination; intra
              labels default to ones in Bag, but the bag label remains 0.
    """
    rng = np.random.default_rng(random_state)
    assert inst_per_bag[0] >= 1 and inst_per_bag[1] >= inst_per_bag[0]
    assert len(pos_centers) == len(pos_scales)
    assert len(neg_centers) == len(neg_scales)
    assert d == len(pos_centers[0]) == len(neg_centers[0])

    pos_comps = _components_from_centers(rng, pos_centers, pos_scales)
    neg_comps = _components_from_centers(rng, neg_centers, neg_scales)

    def sample_from_mix(n: int, comps: list[GaussianComp]) -> np.ndarray:
        # uniform mixture over components
        ks = rng.integers(0, len(comps), size=n)
        X = np.empty((n, d), dtype=float)
        for k in range(len(comps)):
            idx = np.where(ks == k)[0]
            if idx.size:
                X[idx] = _sample_comp(rng, comps[k], idx.size)
        return X

    def add_outliers(X: np.ndarray, frac: float) -> None:
        if frac <= 0:
            return
        m = X.shape[0]
        n_out = int(round(frac * m))
        if n_out > 0:
            # Heavy-tailed-ish outliers
            X[:n_out] += rng.normal(scale=outlier_scale, size=(n_out, d))

    bags: list[Bag] = []

    # ---- Positive bags ----
    for _ in range(n_pos):
        m = int(rng.integers(inst_per_bag[0], inst_per_bag[1] + 1))
        # how many intra==1?
        r_pos = rng.uniform(*pos_intra_rate)
        n_pos_inst = int(round(r_pos * m))
        if ensure_pos_in_every_pos_bag:
            n_pos_inst = max(1, n_pos_inst)
        n_neg_like = m - n_pos_inst

        # fraction of neg-like distractors inside positive bag
        r_distr = rng.uniform(*pos_neg_noise_rate)
        n_neg_like = max(0, n_neg_like)  # in case n_pos_inst == m
        # sample positives from pos-mixture
        X_pos = sample_from_mix(
            n_pos_inst, pos_comps) if n_pos_inst > 0 else np.zeros((0, d))
        # sample distractors from neg-mixture
        X_distr = sample_from_mix(
            n_neg_like, neg_comps) if n_neg_like > 0 else np.zeros((0, d))
        X = np.vstack([X_pos, X_distr]) if X_distr.size else X_pos

        # optional: sprinkle outliers
        add_outliers(X, outlier_rate)

        # intra labels: 1 for pos-like, 0 for distractors
        intra = np.concatenate(
            [np.ones(n_pos_inst), np.zeros(n_neg_like)]).astype(float)
        # shuffle instances within the bag
        perm = rng.permutation(X.shape[0])
        X = X[perm]
        intra = intra[perm]

        bags.append(Bag(X=X, y=1.0, intra_bag_mask=intra))

    # ---- Negative bags ----
    for _ in range(n_neg):
        m = int(rng.integers(inst_per_bag[0], inst_per_bag[1] + 1))
        # proportion of pos-like contamination (still y=0 at bag level)
        r_noise = rng.uniform(*neg_pos_noise_rate)
        n_pos_like = int(round(r_noise * m))
        n_neg_core = m - n_pos_like

        X_neg_core = sample_from_mix(
            n_neg_core, neg_comps) if n_neg_core > 0 else np.zeros((0, d))
        X_pos_noise = sample_from_mix(
            n_pos_like, pos_comps) if n_pos_like > 0 else np.zeros((0, d))
        X = np.vstack([X_neg_core, X_pos_noise]
                      ) if X_pos_noise.size else X_neg_core

        add_outliers(X, outlier_rate)

        # For negative bags, intra labels default to ones in your Bag class—but
        # they are ignored by your negative_instances(); we can still pass all ones.
        intra = np.ones(X.shape[0], dtype=float)
        perm = rng.permutation(X.shape[0])
        X = X[perm]
        intra = intra[perm]

        bags.append(Bag(X=X, y=0.0, intra_bag_mask=intra))

    return BagDataset(bags)

load_musk_bags

load_musk_bags(
    *,
    version: int = 1,
    test_size: float = 0.2,
    random_state: int = 0,
    standardize: bool = True,
) -> Tuple[BagDataset, BagDataset, StandardScaler | None]

Fetch Musk from OpenML, build BagDataset, stratified split by bag label. Args: version: 'musk' dataset version (default 1) test_size: proportion of the dataset to include in the test split (default 0.2) random_state: random seed for the sklearn package standardize: whether to standardize the features. The StandardScaler will be fit on the training data (default True) Returns: (train_ds, test_ds)

Source code in src/sawmil/data/musk.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def load_musk_bags(
    *,
    version: int = 1,
    test_size: float = 0.2,
    random_state: int = 0,
    standardize: bool = True,
) -> Tuple[BagDataset, BagDataset, StandardScaler | None]:
    """
    Fetch Musk from OpenML, build BagDataset, stratified split by bag label.
    Args:
        version: 'musk' dataset version (default 1)
        test_size: proportion of the dataset to include in the test split (default 0.2)
        random_state: random seed for the sklearn package
        standardize: whether to standardize the features. The StandardScaler will be fit on the training data (default True)
    Returns: (train_ds, test_ds)
    """

    musk = fetch_openml(name="musk", version=version, as_frame=True)
    df = musk.frame.copy()

    label = df[LABEL_COL]

    bag_list: list[Bag] = []
    cols = df.columns.tolist()
    use_cols = [c for c in cols if c != LABEL_COL and c != BAG_COL]
    for name, grp in df.groupby(df[BAG_COL].astype(str)):
        X = grp[use_cols].to_numpy(dtype=float)
        ys = label.loc[grp.index].to_numpy(dtype=float)
        y_bag = float(ys[0]) if np.all(
            ys == ys[0]) else float((ys > 0.5).any())
        bag_list.append(Bag(X=X, y=y_bag))  # intra_bag_labels defaults to ones

    # Stratified split at the bag level
    bag_labels = np.array([b.y for b in bag_list], dtype=float)
    idx = np.arange(len(bag_list))
    idx_tr, idx_te = train_test_split(
        idx, test_size=test_size, random_state=random_state, stratify=bag_labels
    )
    train_bags = [bag_list[i] for i in idx_tr]
    test_bags = [bag_list[i] for i in idx_te]

    # Standardize features
    if standardize:
        scaler = StandardScaler()
        train_X = np.vstack([b.X for b in train_bags])
        scaler.fit(train_X)
    else:
        scaler = None
    train_bags = [Bag(X=scaler.transform(b.X), y=b.y) for b in train_bags]
    test_bags = [Bag(X=scaler.transform(b.X), y=b.y) for b in test_bags]

    train_ds = BagDataset(train_bags)
    test_ds = BagDataset(test_bags)
    return train_ds, test_ds, scaler