API Reference

Solvers

sawmil.quadprog

quadprog

quadprog(
    H: NDArray[float64],
    f: NDArray[float64],
    Aeq: Optional[NDArray[float64]],
    beq: Optional[NDArray[float64]],
    lb: NDArray[float64],
    ub: NDArray[float64],
    solver: str = "gurobi",
    verbose: bool = False,
    solver_params: Optional[Mapping[str, Any]] = None,
) -> Union[Tuple[NDArray[float64], Objective], None]

Solve the quadratic program:

minimize   0.5 * αᵀ H α + fᵀ α
subject to Aeq α = beq
           lb ≤ α ≤ ub

Parameters:

  • H (NDArray[float64]) –

    (n, n) quadratic term matrix in 0.5 * αᵀ H α

  • f (NDArray[float64]) –

    (n,) linear term vector in fᵀ α; for SVM duals, usually all entries equal -1

  • Aeq (Optional[NDArray[float64]]) –

    (m, n) equality constraint matrix, usually yᵀ

  • beq (Optional[NDArray[float64]]) –

    (m,) equality constraint rhs, usually 0

  • lb (NDArray[float64]) –

    (n,) lower bound vector, usually 0

  • ub (NDArray[float64]) –

    (n,) upper bound vector, usually C

  • solver (str, default: 'gurobi') –

    Backend QP solver: 'gurobi', 'osqp', or 'daqp'

  • verbose (bool, default: False ) –

    If True, print solver logs

  • solver_params (Optional[Mapping[str, Any]], default: None ) –

    dict of backend-specific options. Examples:

      • solver='gurobi': {'env': <gp.Env>, 'params': {'Method': 2, 'Threads': 1}}
      • solver='osqp': {'setup': {...}, 'solve': {...}} or flat keys for setup
      • solver='daqp': {'eps_abs': 1e-8, 'eps_rel': 1e-8, ...}

Returns:

  • α* (NDArray[float64]) –

    Optimal solution vector

  • Objective (Objective) –

    Quadratic and linear parts of the optimum; the Union with None in the signature allows a backend to return no solution

Source code in src/sawmil/quadprog.py
def quadprog(
    H: npt.NDArray[np.float64],
    f: npt.NDArray[np.float64],
    Aeq: Optional[npt.NDArray[np.float64]],
    beq: Optional[npt.NDArray[np.float64]],
    lb: npt.NDArray[np.float64],
    ub: npt.NDArray[np.float64],
    solver: str = "gurobi",
    verbose: bool = False,
    solver_params: Optional[Mapping[str, Any]] = None,
) -> Union[Tuple[npt.NDArray[np.float64], "Objective"], None]:
    """
    Solve the quadratic program:

        minimize   0.5 * αᵀ H α + fᵀ α
        subject to Aeq α = beq
                   lb ≤ α ≤ ub

    Args:
        H: (n, n) quadratic term matrix in 0.5 * αᵀ H α
        f: (n,) linear term vector in fᵀ α; for SVM duals, usually all entries equal -1
        Aeq: (m, n) equality constraint matrix, usually yᵀ
        beq: (m,) equality constraint rhs, usually 0
        lb: (n,) lower bound vector, usually 0
        ub: (n,) upper bound vector, usually C
        verbose: If True, print solver logs
        solver_params: dict of backend-specific options. 
            Examples:
                - solver='gurobi': {'env': <gp.Env>, 'params': {'Method':2, 'Threads':1}}
                - solver='osqp'  : {'setup': {...}, 'solve': {...}} or flat keys for setup
                - solver='daqp'  : {'eps_abs': 1e-8, 'eps_rel': 1e-8, ...}

    Returns:
        α*: Optimal solution vector
        Objective: quadratic and linear parts of the optimum
    """
    H, f, Aeq, beq, lb, ub = _validate_and_stabilize(H, f, Aeq, beq, lb, ub)
    params = dict(solver_params or {})

    if solver == "gurobi":
        if _qp_gurobi is None:
            raise ImportError("gurobi path selected but 'gurobipy' is not installed or usable. "
                              "Install with: pip install sawmil[gurobi]")
        return _qp_gurobi(H, f, Aeq, beq, lb, ub, verbose=verbose, **params)

    elif solver == "osqp":
        if _qp_osqp is None:
            raise ImportError("osqp path selected but 'osqp' is not installed. "
                              "Install with: pip install sawmil[osqp]")
        return _qp_osqp(H, f, Aeq, beq, lb, ub, verbose=verbose, **params)
    elif solver == "daqp":
        if _qp_daqp is None:
            raise ImportError("daqp path selected but 'daqp' is not installed. "
                              "Install with: pip install sawmil[daqp]")
        return _qp_daqp(H, f, Aeq, beq, lb, ub, verbose=verbose, **params)

    raise ValueError(f"Unknown solver: {solver}")
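
Example: a minimal sketch of a direct call. The tiny two-variable problem and the choice of solver are illustrative assumptions, not part of the API.

import numpy as np
from sawmil.quadprog import quadprog

# Toy QP: minimize 0.5*αᵀHα + fᵀα  s.t.  Aeq α = beq,  lb ≤ α ≤ ub
H = np.array([[2.0, 0.5],
              [0.5, 1.0]])          # (n, n) positive-definite Hessian
f = -np.ones(2)                     # standard SVM-dual linear term
Aeq = np.array([[1.0, -1.0]])       # plays the role of yᵀ
beq = np.zeros(1)
lb = np.zeros(2)
ub = np.full(2, 10.0)               # box upper bound, e.g. C = 10

result = quadprog(H, f, Aeq, beq, lb, ub, solver="osqp")
if result is not None:
    alpha, obj = result             # α* and its Objective parts
    print(alpha, obj)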

sawmil.solvers._gurobi

quadprog_gurobi

quadprog_gurobi(
    H: NDArray[float64],
    f: NDArray[float64],
    Aeq: Optional[NDArray[float64]],
    beq: Optional[NDArray[float64]],
    lb: NDArray[float64],
    ub: NDArray[float64],
    verbose: bool = False,
    **params: Any,
) -> Tuple[NDArray[float64], Objective]

Solve the quadratic program using Gurobi:

minimize   0.5 * αᵀ H α + fᵀ α
subject to Aeq α = beq
           lb ≤ α ≤ ub

Parameters:

  • H (NDArray[float64]) –

    (n, n) Hessian matrix for the quadratic term in 0.5 * αᵀ H α.

  • f (NDArray[float64]) –

    (n,) linear term vector in fᵀ α. For SVMs, usually -1 for each component.

  • Aeq (Optional[NDArray[float64]]) –

    (m, n) equality constraint matrix, usually yᵀ

  • beq (Optional[NDArray[float64]]) –

    (m,) equality constraint rhs, usually 0

  • lb (NDArray[float64]) –

    (n,) lower bound vector, usually 0

  • ub (NDArray[float64]) –

    (n,) upper bound vector, usually C

  • verbose (bool, default: False ) –

    If True, print solver logs

Returns:

  • α* (NDArray[float64]) –

    Optimal solution vector

  • Objective (Objective) –

    Quadratic and linear parts of the optimum

Source code in src/sawmil/solvers/_gurobi.py
def quadprog_gurobi(
    H: npt.NDArray[np.float64],
    f: npt.NDArray[np.float64],
    Aeq: Optional[npt.NDArray[np.float64]],
    beq: Optional[npt.NDArray[np.float64]],
    lb: npt.NDArray[np.float64],
    ub: npt.NDArray[np.float64],
    verbose: bool = False,
    **params: Any,

) -> Tuple[npt.NDArray[np.float64], "Objective"]:
    """
    Solve the quadratic program using Gurobi:

        minimize   0.5 * αᵀ H α + fᵀ α
        subject to Aeq α = beq
                   lb ≤ α ≤ ub

    Args:
        H: (n, n) Hessian matrix for the quadratic term in 0.5 * αᵀ H α.
        f: (n,) linear term vector in fᵀ α. For SVMs, usually -1 for each component.
        Aeq: (m, n) equality constraint matrix, usually yᵀ
        beq: (m,) equality constraint rhs, usually 0
        lb: (n,) lower bound vector, usually 0
        ub: (n,) upper bound vector, usually C
        verbose: If True, print solver logs

    Returns:
        α*: Optimal solution vector
        Objective: quadratic and linear parts of the optimum
    """

    try:
        import gurobipy as gp
    except Exception as exc:  # pragma: no cover
        raise ImportError("gurobipy is required for solver='gurobi'") from exc
    if (Aeq is None) ^ (beq is None):
        raise ValueError("Aeq and beq must both be None or both be provided.")

    n = H.shape[0]

    model = gp.Model()
    if not verbose:
        model.Params.OutputFlag = 0

    x = model.addMVar(n, lb=lb, ub=ub, name="alpha")
    obj = 0.5 * (x @ H @ x) + f @ x
    model.setObjective(obj, gp.GRB.MINIMIZE)

    if Aeq is not None:
        model.addConstr(Aeq @ x == beq, name="eq")
    model.optimize()

    if model.Status != gp.GRB.OPTIMAL:  # pragma: no cover - defensive
        log.warning("Gurobi optimization failed with status %s", model.Status)

    xstar = np.asarray(x.X, dtype=float)
    quadratic = float(0.5 * xstar.T @ H @ xstar)
    linear = float(f.T @ xstar)
    return xstar, Objective(quadratic, linear)
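
Example: a sketch of forwarding backend options through quadprog. The keys follow the solver_params examples above; a licensed gurobipy install is assumed.

import numpy as np
from sawmil.quadprog import quadprog

H = np.eye(2)                        # illustrative toy problem
f = -np.ones(2)
lb, ub = np.zeros(2), np.ones(2)

result = quadprog(H, f, None, None, lb, ub, solver="gurobi",
                  solver_params={"params": {"Method": 2, "Threads": 1}})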

sawmil.solvers._osqp

quadprog_osqp

quadprog_osqp(
    H: NDArray[float64],
    f: NDArray[float64],
    Aeq: Optional[NDArray[float64]],
    beq: Optional[NDArray[float64]],
    lb: NDArray[float64],
    ub: NDArray[float64],
    verbose: bool = False,
    **params: Any,
) -> Tuple[NDArray[float64], Objective]

Solve the quadratic program using OSQP:

minimize   0.5 * αᵀ H α + fᵀ α
subject to Aeq α = beq
           lb ≤ α ≤ ub

Parameters:

  • H (NDArray[float64]) –

    (n, n) Hessian matrix for the quadratic term in 0.5 * αᵀ H α. For SVM duals, this is typically (y yᵀ) ⊙ K where K is the kernel matrix.

  • f (NDArray[float64]) –

    (n,) linear term vector in fᵀ α. For SVMs, usually -1 for each component.

  • Aeq (Optional[NDArray[float64]]) –

    (m, n) optional equality constraint matrix, e.g. yᵀ for SVM bias constraint.

  • beq (Optional[NDArray[float64]]) –

    (m,) optional right-hand side of equality constraint, usually 0.

  • lb (NDArray[float64]) –

    (n,) lower bound vector, e.g. all zeros in standard SVM dual.

  • ub (NDArray[float64]) –

    (n,) upper bound vector, e.g. all entries equal to C in soft-margin SVM.

  • verbose (bool, default: False ) –

    If True, print solver logs

Returns:

  • α* (NDArray[float64]) –

    Optimal solution vector

  • Objective (Objective) –

    Quadratic and linear parts of the optimum

Source code in src/sawmil/solvers/_osqp.py
def quadprog_osqp(
    H: npt.NDArray[np.float64],
    f: npt.NDArray[np.float64],
    Aeq: Optional[npt.NDArray[np.float64]],
    beq: Optional[npt.NDArray[np.float64]],
    lb: npt.NDArray[np.float64],
    ub: npt.NDArray[np.float64],
    verbose: bool = False,
    **params: Any,
) -> Tuple[npt.NDArray[np.float64], "Objective"]:
    """
    Solve the quadratic program using OSQP:

        minimize   0.5 * αᵀ H α + fᵀ α
        subject to Aeq α = beq
                   lb ≤ α ≤ ub

    Args:
        H: (n, n) Hessian matrix for the quadratic term in 0.5 * αᵀ H α. For SVM duals, this is typically (y yᵀ) ⊙ K where K is the kernel matrix.
        f: (n,) linear term vector in fᵀ α. For SVMs, usually -1 for each component.
        Aeq: (m, n) optional equality constraint matrix, e.g. yᵀ for SVM bias constraint.
        beq: (m,) optional right-hand side of equality constraint, usually 0.
        lb: (n,) lower bound vector, e.g. all zeros in standard SVM dual.
        ub: (n,) upper bound vector, e.g. all entries equal to C in soft-margin SVM.
        verbose: If True, print solver logs

    Returns:
        α*: Optimal solution vector
        Objective: quadratic and linear parts of the optimum
    """
    # Lazy, guarded imports so the module can be imported without OSQP installed.
    try:
        import scipy.sparse as sp
    except Exception as exc:  # pragma: no cover
        raise ImportError("scipy is required for solver='osqp'") from exc
    try:
        import osqp
    except Exception as exc:  # pragma: no cover
        raise ImportError("osqp is required for solver='osqp'") from exc

    n = H.shape[0]
    P = sp.csc_matrix(H)
    q = f

    blocks = []
    l_list, u_list = [], []

    # Equalities: encode as Aeq x in [beq, beq]
    if Aeq is not None:
        blocks.append(sp.csc_matrix(Aeq))
        l_list.append(beq)  # type: ignore[arg-type]
        u_list.append(beq)  # type: ignore[arg-type]

    # Bounds: I x in [lb, ub]
    csc_eye = sp.eye(n, format="csc")
    blocks.append(csc_eye)
    l_list.append(lb)
    u_list.append(ub)

    A = sp.vstack(blocks, format="csc")
    lower_vec = np.concatenate(l_list)
    upper_vec = np.concatenate(u_list)

    # Split options into setup/solve; accept flat options as setup()
    setup_opts: dict[str, Any] = dict(params.get("setup", {}))
    solve_opts: dict[str, Any] = dict(params.get("solve", {}))
    if not setup_opts and not solve_opts:
        # treat all non-nested options as setup()
        setup_opts = {k: v for k,
                      v in params.items() if k not in ("setup", "solve")}

    # sensible defaults; allow override via setup_opts
    setup_defaults = dict(
        polishing=True,
        eps_abs=1e-8,
        eps_rel=1e-8,
        max_iter=20000,
    )
    for k, v in setup_defaults.items():
        setup_opts.setdefault(k, v)

    # And in solve() default to not raising so we can return diagnostics
    solve_opts.setdefault("raise_error", False)

    prob = osqp.OSQP()
    prob.setup(P=P, q=q, A=A, l=lower_vec, u=upper_vec,
               verbose=verbose, **setup_opts)
    res = prob.solve(**solve_opts)

    if res.info.status_val not in (1, 2):  # 1=solved, 2=solved_inaccurate
        log.error(f"OSQP failed: {res.info.status}")

    x = np.asarray(res.x, dtype=float)
    quadratic = float(0.5 * x @ (H @ x))
    linear = float(f @ x)
    return x, Objective(quadratic, linear)
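
Example: a sketch of the two ways to pass OSQP options, mirroring the setup/solve split in the code above.

import numpy as np
from sawmil.quadprog import quadprog

H = np.eye(2)                        # illustrative toy problem
f = -np.ones(2)
lb, ub = np.zeros(2), np.ones(2)

# Nested form: separate setup() and solve() options.
result = quadprog(H, f, None, None, lb, ub, solver="osqp",
                  solver_params={"setup": {"eps_abs": 1e-10, "max_iter": 50000},
                                 "solve": {"raise_error": False}})

# Flat form: without 'setup'/'solve' keys, all options go to setup().
result = quadprog(H, f, None, None, lb, ub, solver="osqp",
                  solver_params={"eps_abs": 1e-10})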

Kernels

sawmil.kernels

BaseKernel

Bases: ABC

Minimal kernel interface for the single-instance kernels: fit (optional) + call.

Linear dataclass

Linear()

Bases: BaseKernel

Linear kernel: K(x, y) = x^T y

Normalize dataclass

Normalize(k: BaseKernel, eps: float = 1e-12)

Bases: BaseKernel

Cosine-style normalization: Kxy / sqrt(Kxx * Kyy).

Polynomial dataclass

Polynomial(
    degree: int = 3,
    gamma: Optional[float] = None,
    coef0: float = 0.0,
)

Bases: BaseKernel

Polynomial kernel: K(x, y) = (gamma * x^T y + coef0)^degree

Precomputed dataclass

Precomputed(K: ndarray)

Bases: BaseKernel

Use when a Gram matrix is already built; ignores X,Y and returns K (shape checked by caller).

Product dataclass

Product(k1: BaseKernel, k2: BaseKernel)

Bases: BaseKernel

Product kernel: K(x, y) = k1(x, y) * k2(x, y)

RBF dataclass

RBF(gamma: Optional[float] = None)

Bases: BaseKernel

Radial Basis Function (RBF) kernel: K(x, y) = exp(-gamma * ||x - y||^2)

Scale dataclass

Scale(a: float, k: BaseKernel)

Bases: BaseKernel

Scale kernel: K(x, y) = a * k(x, y)

Sigmoid dataclass

Sigmoid(gamma: Optional[float] = None, coef0: float = 0.0)

Bases: BaseKernel

Sigmoid kernel: K(x, y) = tanh(gamma * x^T y + coef0)

Sum dataclass

Sum(k1: BaseKernel, k2: BaseKernel)

Bases: BaseKernel

Sum kernel: K(x, y) = k1(x, y) + k2(x, y)
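
Since these kernels are small dataclasses, they compose directly. A sketch (the particular mixture is illustrative; kernels are called as k(X, Y) per the BaseKernel interface):

import numpy as np
from sawmil.kernels import Linear, RBF, Scale, Sum, Normalize

# Cosine-normalized mixture: 0.5 * linear + RBF(gamma=0.1)
k = Normalize(Sum(Scale(0.5, Linear()), RBF(gamma=0.1)))

X = np.random.rand(5, 3)
K = k(X, X)                          # (5, 5) Gram matrix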

get_kernel

get_kernel(spec: KernelType, **kwargs) -> BaseKernel

Normalize various 'kernel=' inputs into a BaseKernel object. kwargs are used only when spec is a string (e.g., gamma, degree, coef0, K).

Source code in src/sawmil/kernels.py
def get_kernel(spec: KernelType, **kwargs) -> BaseKernel:
    """
    Normalize various 'kernel=' inputs into a BaseKernel object.
    kwargs are used only when spec is a string (e.g., gamma, degree, coef0, K).
    """
    # already an object?
    if isinstance(spec, BaseKernel):
        return spec

    # plain callable? wrap so it behaves like a BaseKernel
    if callable(spec) and not isinstance(spec, str):
        class _CallableKernel(BaseKernel):
            def __call__(self, X, Y):
                return np.asarray(spec(X, Y), dtype=float)
        return _CallableKernel()

    # by name
    name = str(spec).lower()
    if name == "precomputed":
        if "K" not in kwargs:
            raise ValueError(
                "get_kernel('precomputed', K=...) requires the Gram matrix K.")
        return Precomputed(np.asarray(kwargs["K"], dtype=float))

    registry: Dict[str, BaseKernel] = {
        "linear":  Linear(),
        "rbf":     RBF(kwargs.get("gamma")),
        "poly":    Polynomial(kwargs.get("degree", 3), kwargs.get("gamma"), kwargs.get("coef0", 0.0)),
        "sigmoid": Sigmoid(kwargs.get("gamma"), kwargs.get("coef0", 0.0)),
    }
    if name in registry:
        return registry[name]

    raise ValueError(f"Unknown kernel spec: {spec!r}")
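
Example: the accepted spec forms, per the registry above.

import numpy as np
from sawmil.kernels import get_kernel

k1 = get_kernel("rbf", gamma=0.5)            # by name; kwargs consumed
k2 = get_kernel("poly", degree=2, coef0=1.0)
k3 = get_kernel(lambda X, Y: X @ Y.T)        # plain callable, wrapped
k4 = get_kernel("precomputed", K=np.eye(4))  # requires the Gram matrix K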

sawmil.bag_kernels

BaseBagKernel

Bases: ABC

Base class for the Multiple-instance (bags) kernel

WeightedMeanBagKernel dataclass

WeightedMeanBagKernel(
    inst_kernel: BaseKernel,
    normalizer: _NormalizerName = "average",
    p: float = 1.0,
    use_intra_labels: bool = False,
    fast_linear: bool = True,
)

Bases: BaseBagKernel

K(Bi, Bj) = (w_iᵀ k(Bi, Bj) w_j)^p / (norm(Bi) · norm(Bj)), where w_i, w_j are instance weights normalized to sum to 1 (falling back to uniform if a bag's mask sums to 0).

Normalizer options (default: "average"):
  • "none" -> norm(B) = 1
  • "average" -> norm(B) = sum(mask) (fallback to bag size)
  • "featurespace" -> norm(B) = sqrt(wᵀ k(X,X) w) (fast for Linear via ||weighted_mean||)

By default (use_intra_labels=False), w_i are uniform weights (1/n_i), i.e., intra-bag labels are ignored.
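
Example: constructing the bag kernel from an instance-level kernel (the parameter values are illustrative).

from sawmil.kernels import RBF
from sawmil.bag_kernels import WeightedMeanBagKernel

bag_k = WeightedMeanBagKernel(
    inst_kernel=RBF(gamma=0.1),
    normalizer="average",     # "none" | "average" | "featurespace"
    p=1.0,
    use_intra_labels=False,   # uniform instance weights
)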

Bags

sawmil.bag

Bag dataclass

Bag(
    X: NDArray[float64],
    y: Label,
    intra_bag_label: Optional[NDArray[float64]] = None,
)

A bag of instances with a bag-level label and per-instance (0/1) flags.

d property
d: int

Number of features.

mask property
mask: NDArray[float64]

Intra-bag label mask.

n property
n: int

Number of instances in the bag.

negatives
negatives() -> npt.NDArray[np.int64]

Indices of instances with intra_bag_label == 0.

Source code in src/sawmil/bag.py
def negatives(self) -> npt.NDArray[np.int64]:
    """Indices of instances with intra_bag_label == 0."""
    return np.flatnonzero(self.mask < 0.5)
positives
positives() -> npt.NDArray[np.int64]

Indices of instances with intra_bag_label == 1.

Source code in src/sawmil/bag.py
def positives(self) -> npt.NDArray[np.int64]:
    """Indices of instances with intra_bag_label == 1."""
    return np.flatnonzero(self.mask >= 0.5)

BagDataset dataclass

BagDataset(bags: List[Bag])

A dataset of bags.

num_bags property
num_bags: int

Returns the number of bags.

num_instances property
num_instances: int

Returns the total number of instances.

num_neg_bags property
num_neg_bags: int

Returns the number of negative bags.

num_neg_instances property
num_neg_instances: int

Returns the number of negative instances.

num_pos_bags property
num_pos_bags: int

Returns the number of positive bags.

num_pos_instances property
num_pos_instances: int

Returns the number of positive instances.

y property
y: ndarray

Returns all the bag labels.

from_arrays staticmethod
from_arrays(
    bags: Sequence[ndarray],
    y: Sequence[float],
    intra_bag_labels: Sequence[ndarray] | None = None,
) -> "BagDataset"

Create a BagDataset from raw numpy arrays.

Parameters

  • bags – Sequence of arrays where each element contains the instances of a bag with shape (n_i, d).
  • y – Bag-level labels corresponding to each element of bags.
  • intra_bag_labels – Optional sequence of 1D arrays with per-instance 0/1 flags. If omitted, all instances in a bag are considered positive.

Returns

  • BagDataset – Dataset composed of Bag objects built from the provided arrays.

Source code in src/sawmil/bag.py
@staticmethod
def from_arrays(
    bags: Sequence[np.ndarray],
    y: Sequence[float],
    intra_bag_labels: Sequence[np.ndarray] | None = None
) -> "BagDataset":
    """Create a :class:`BagDataset` from raw numpy arrays.

    Parameters
    ----------
    bags:
        Sequence of arrays where each element contains the instances of a
        bag with shape ``(n_i, d)``.
    y:
        Bag-level labels corresponding to each element of ``bags``.
    intra_bag_labels:
        Optional sequence of 1D arrays with per-instance ``0/1`` flags. If
        omitted, all instances in a bag are considered positive.

    Returns
    -------
    BagDataset
        Dataset composed of :class:`Bag` objects built from the provided
        arrays.
    """
    if intra_bag_labels is None:
        intra_bag_labels = [None] * len(bags)
    if len(bags) != len(y) or len(bags) != len(intra_bag_labels):
        raise ValueError(
            "bags, y, intra_bag_labels must have same length.")
    return BagDataset([
        Bag(X=b, y=float(lbl), intra_bag_label=ibl)
        for b, lbl, ibl in zip(bags, y, intra_bag_labels)
    ])
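
Example: building a small dataset from raw arrays (shapes are illustrative).

import numpy as np
from sawmil.bag import BagDataset

rng = np.random.default_rng(0)
bags = [rng.normal(size=(5, 3)), rng.normal(size=(8, 3))]     # (n_i, d)
y = [1.0, 0.0]
intra = [np.array([1, 0, 1, 0, 0], dtype=float), np.ones(8)]

ds = BagDataset.from_arrays(bags, y, intra)
print(ds.num_bags, ds.num_pos_bags, ds.num_instances)          # 2 1 13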
negative_bags
negative_bags() -> list[Bag]

Returns all negative bags.

Source code in src/sawmil/bag.py
def negative_bags(self) -> list[Bag]:
    '''Returns all negative bags.'''
    return [b for b in self.bags if float(b.y) <= 0.0]
negative_bags_as_singletons
negative_bags_as_singletons() -> list[Bag]

Transforms all negative bags into singleton bags: each bag of shape (n_i, d) becomes n_i singleton bags of shape (1, d).

Source code in src/sawmil/bag.py
def negative_bags_as_singletons(self) -> list[Bag]:
    '''
    Transforms all negative bags into singleton bags, by flattening each bag  (b, n, d) -> (b x n, d)
    '''
    singletons: list[Bag] = []
    for b in self.negative_bags():
        for j in range(b.n):
            singletons.append(Bag(X=b.X[j:j+1, :], y=-1.0))
    return singletons
negative_instances
negative_instances() -> tuple[np.ndarray, np.ndarray]

Returns instances from:
  • all negative bags (all instances),
  • positive bags where intra_bag_label == 0.

Returns:
  • X_neg: (M, d)
  • bag_index: (M,) indices into self.bags (original positions)

Source code in src/sawmil/bag.py
def negative_instances(self) -> tuple[np.ndarray, np.ndarray]:
    """
    Returns instances from:
    - all negative bags (all instances),
    - plus from positive bags where intra_bag_label == 0.
    Returns:
        X_neg: (M, d)
        bag_index: (M,) indices into self.bags (original positions)
    """
    Xs, bag_idx = [], []
    for i, b in enumerate(self.bags):          # iterate original list!
        if float(b.y) <= 0.0:
            # take all instances
            if b.X.shape[0] > 0:
                Xs.append(b.X)
                bag_idx.extend([i] * b.X.shape[0])
        else:
            # positive bag: only intra == 0
            mask = b.mask < 0.5
            if np.any(mask):
                Xs.append(b.X[mask])
                bag_idx.extend([i] * int(mask.sum()))
    if not Xs:
        d = self.bags[0].d if self.bags else 0
        return np.zeros((0, d)), np.array([], dtype=int)
    return np.vstack(Xs), np.array(bag_idx, dtype=int)
positive_bags
positive_bags() -> list[Bag]

Returns all positive bags.

Source code in src/sawmil/bag.py
def positive_bags(self) -> list[Bag]:
    '''Returns all positive bags.'''
    return [b for b in self.bags if float(b.y) > 0.0]
positive_bags_as_singletons
positive_bags_as_singletons() -> list[Bag]

Transforms all positive bags into singleton bags: each bag of shape (n_i, d) becomes n_i singleton bags of shape (1, d).

Source code in src/sawmil/bag.py
def positive_bags_as_singletons(self) -> list[Bag]:
    '''
    Transforms all positive bags into singleton bags, by flattening each bag  (b, n, d) -> (b x n, d)
    '''
    singletons: list[Bag] = []
    for b in self.positive_bags():
        for j in range(b.n):
            singletons.append(Bag(X=b.X[j:j+1, :], y=1.0))
    return singletons
positive_instances
positive_instances() -> tuple[np.ndarray, np.ndarray]

Returns instances from positive bags with intra_bag_label == 1.

Returns:
  • X_pos: (N, d)
  • bag_index: (N,) indices into self.bags (original positions)

Source code in src/sawmil/bag.py
def positive_instances(self) -> tuple[np.ndarray, np.ndarray]:
    """
    Returns instances from positive bags with intra_bag_label == 1.
    Returns:
        X_pos: (N, d)
        bag_index: (N,) indices into self.bags (original positions)
    """
    Xs, bag_idx = [], []
    for i, b in enumerate(self.bags):          # iterate original list!
        if float(b.y) <= 0.0:
            continue
        mask = b.mask >= 0.5
        if np.any(mask):
            Xs.append(b.X[mask])
            bag_idx.extend([i] * int(mask.sum()))
    if not Xs:
        d = self.bags[0].d if self.bags else 0
        return np.zeros((0, d)), np.array([], dtype=int)
    return np.vstack(Xs), np.array(bag_idx, dtype=int)
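
Example: extracting instance-level views; bag_index maps each row back to its originating bag.

import numpy as np
from sawmil.bag import Bag, BagDataset

ds = BagDataset([
    Bag(X=np.random.rand(4, 2), y=1.0,
        intra_bag_label=np.array([1.0, 1.0, 0.0, 0.0])),
    Bag(X=np.random.rand(3, 2), y=0.0),
])

X_pos, pos_idx = ds.positive_instances()   # (2, 2): intra == 1 rows of positive bags
X_neg, neg_idx = ds.negative_instances()   # (5, 2): negative bag + intra == 0 rows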

Dummy Data

sawmil.data

generate_dummy_bags

generate_dummy_bags(
    *,
    n_pos: int = 100,
    n_neg: int = 60,
    inst_per_bag: Tuple[int, int] = (4, 12),
    d: int = 2,
    pos_centers: Sequence[Sequence[float]] = (
        (+2.0, +1.0),
        (+4.0, +3.0),
    ),
    neg_centers: Sequence[Sequence[float]] = (
        (-1.5, -1.0),
        (-3.0, +0.5),
    ),
    pos_scales: Sequence[Tuple[float, float]] = (
        (2.0, 0.6),
        (1.2, 0.8),
    ),
    neg_scales: Sequence[Tuple[float, float]] = (
        (1.5, 0.5),
        (2.5, 0.9),
    ),
    pos_intra_rate: Tuple[float, float] = (0.3, 0.8),
    ensure_pos_in_every_pos_bag: bool = True,
    neg_pos_noise_rate: Tuple[float, float] = (0.0, 0.05),
    pos_neg_noise_rate: Tuple[float, float] = (0.0, 0.2),
    outlier_rate: float = 0.01,
    outlier_scale: float = 10.0,
    random_state: int = 0,
) -> BagDataset

Generate a synthetic MIL dataset with mixed Gaussian components.

Returns:

  • BagDataset –
    • Positive bags (y=1) mix pos-like instances (intra=1) and distractors (intra=0).
    • Negative bags (y=0) may include a small fraction of pos-like contamination; intra labels default to ones in Bag, but the bag label remains 0.
Source code in src/sawmil/data/dummy.py
def generate_dummy_bags(
    *,
    n_pos: int = 100,
    n_neg: int = 60,
    inst_per_bag: Tuple[int, int] = (4, 12),     # bag size ~ Uniform[min,max]
    d: int = 2,
    # Positive & negative class mixtures
    pos_centers: Sequence[Sequence[float]] = ((+2.0, +1.0), (+4.0, +3.0)),
    neg_centers: Sequence[Sequence[float]] = ((-1.5, -1.0), (-3.0, +0.5)),
    pos_scales: Sequence[Tuple[float, float]] = (
        (2.0, 0.6), (1.2, 0.8)),  # (scale, anisotropy)
    neg_scales: Sequence[Tuple[float, float]] = ((1.5, 0.5), (2.5, 0.9)),
    # Intra-bag positives (only matter for positive bags)
    # per-bag fraction range for intra==1
    pos_intra_rate: Tuple[float, float] = (0.3, 0.8),
    ensure_pos_in_every_pos_bag: bool = True,
    # Cross-contamination
    # fraction of pos-like instances inside *negative* bags
    neg_pos_noise_rate: Tuple[float, float] = (0.00, 0.05),
    # fraction of neg-like instances inside *positive* bags
    pos_neg_noise_rate: Tuple[float, float] = (0.00, 0.20),
    # Outliers sprinkled everywhere
    outlier_rate: float = 0.01,
    outlier_scale: float = 10.0,
    random_state: int = 0,
) -> BagDataset:
    """Generate a synthetic MIL dataset with mixed Gaussian components.

    Returns:
        BagDataset:
            - Positive bags (y=1) mix pos-like instances (intra=1) and distractors (intra=0).
            - Negative bags (y=0) may include a small fraction of pos-like contamination; intra
              labels default to ones in Bag, but the bag label remains 0.
    """
    rng = np.random.default_rng(random_state)
    assert inst_per_bag[0] >= 1 and inst_per_bag[1] >= inst_per_bag[0]
    assert len(pos_centers) == len(pos_scales)
    assert len(neg_centers) == len(neg_scales)
    assert d == len(pos_centers[0]) == len(neg_centers[0])

    pos_comps = _components_from_centers(rng, pos_centers, pos_scales)
    neg_comps = _components_from_centers(rng, neg_centers, neg_scales)

    def sample_from_mix(n: int, comps: list[GaussianComp]) -> np.ndarray:
        # uniform mixture over components
        ks = rng.integers(0, len(comps), size=n)
        X = np.empty((n, d), dtype=float)
        for k in range(len(comps)):
            idx = np.where(ks == k)[0]
            if idx.size:
                X[idx] = _sample_comp(rng, comps[k], idx.size)
        return X

    def add_outliers(X: np.ndarray, frac: float) -> None:
        if frac <= 0:
            return
        m = X.shape[0]
        n_out = int(round(frac * m))
        if n_out > 0:
            # Heavy-tailed-ish outliers
            X[:n_out] += rng.normal(scale=outlier_scale, size=(n_out, d))

    bags: list[Bag] = []

    # ---- Positive bags ----
    for _ in range(n_pos):
        m = int(rng.integers(inst_per_bag[0], inst_per_bag[1] + 1))
        # how many intra==1?
        r_pos = rng.uniform(*pos_intra_rate)
        n_pos_inst = int(round(r_pos * m))
        if ensure_pos_in_every_pos_bag:
            n_pos_inst = max(1, n_pos_inst)
        n_neg_like = m - n_pos_inst

        # fraction of neg-like distractors inside positive bag
        r_distr = rng.uniform(*pos_neg_noise_rate)
        n_neg_like = max(0, n_neg_like)  # in case n_pos_inst == m
        # sample positives from pos-mixture
        X_pos = sample_from_mix(
            n_pos_inst, pos_comps) if n_pos_inst > 0 else np.zeros((0, d))
        # sample distractors from neg-mixture
        X_distr = sample_from_mix(
            n_neg_like, neg_comps) if n_neg_like > 0 else np.zeros((0, d))
        X = np.vstack([X_pos, X_distr]) if X_distr.size else X_pos

        # optional: sprinkle outliers
        add_outliers(X, outlier_rate)

        # intra labels: 1 for pos-like, 0 for distractors
        intra = np.concatenate(
            [np.ones(n_pos_inst), np.zeros(n_neg_like)]).astype(float)
        # shuffle instances within the bag
        perm = rng.permutation(X.shape[0])
        X = X[perm]
        intra = intra[perm]

        bags.append(Bag(X=X, y=1.0, intra_bag_label=intra))

    # ---- Negative bags ----
    for _ in range(n_neg):
        m = int(rng.integers(inst_per_bag[0], inst_per_bag[1] + 1))
        # proportion of pos-like contamination (still y=0 at bag level)
        r_noise = rng.uniform(*neg_pos_noise_rate)
        n_pos_like = int(round(r_noise * m))
        n_neg_core = m - n_pos_like

        X_neg_core = sample_from_mix(
            n_neg_core, neg_comps) if n_neg_core > 0 else np.zeros((0, d))
        X_pos_noise = sample_from_mix(
            n_pos_like, pos_comps) if n_pos_like > 0 else np.zeros((0, d))
        X = np.vstack([X_neg_core, X_pos_noise]
                      ) if X_pos_noise.size else X_neg_core

        add_outliers(X, outlier_rate)

        # For negative bags, intra labels default to ones in your Bag class—but
        # they are ignored by your negative_instances(); we can still pass all ones.
        intra = np.ones(X.shape[0], dtype=float)
        perm = rng.permutation(X.shape[0])
        X = X[perm]
        intra = intra[perm]

        bags.append(Bag(X=X, y=0.0, intra_bag_label=intra))

    return BagDataset(bags)
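
Example: generating a small dataset (parameter values are illustrative; the import path follows the sawmil.data section heading).

from sawmil.data import generate_dummy_bags

ds = generate_dummy_bags(n_pos=20, n_neg=10, d=2, random_state=42)
print(ds.num_bags, ds.num_pos_bags, ds.num_neg_bags)   # 30 20 10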

load_musk_bags

load_musk_bags(
    *,
    version: int = 1,
    test_size: float = 0.2,
    random_state: int = 0,
    standardize: bool = True,
) -> Tuple[BagDataset, BagDataset, StandardScaler | None]

Fetch Musk from OpenML, build a BagDataset, and perform a stratified split by bag label.

Parameters:

  • version – 'musk' dataset version (default: 1)
  • test_size – proportion of the dataset to include in the test split (default: 0.2)
  • random_state – random seed for the sklearn split
  • standardize – whether to standardize the features; the StandardScaler is fit on the training data only (default: True)

Returns:

  • (train_ds, test_ds, scaler) – train and test BagDatasets plus the fitted StandardScaler (None if standardize=False)

Source code in src/sawmil/data/musk.py
def load_musk_bags(
    *,
    version: int = 1,
    test_size: float = 0.2,
    random_state: int = 0,
    standardize: bool = True,
) -> Tuple[BagDataset, BagDataset, StandardScaler | None]:
    """
    Fetch Musk from OpenML, build BagDataset, stratified split by bag label.
    Args:
        version: 'musk' dataset version (default 1)
        test_size: proportion of the dataset to include in the test split (default 0.2)
        random_state: random seed for the sklearn package
        standardize: whether to standardize the features. The StandardScaler will be fit on the training data (default True)
    Returns: (train_ds, test_ds, scaler)
    """

    musk = fetch_openml(name="musk", version=version, as_frame=True)
    df = musk.frame.copy()

    label = df[LABEL_COL]

    bag_list: list[Bag] = []
    cols = df.columns.tolist()
    use_cols = [c for c in cols if c != LABEL_COL and c != BAG_COL]
    for name, grp in df.groupby(df[BAG_COL].astype(str)):
        X = grp[use_cols].to_numpy(dtype=float)
        ys = label.loc[grp.index].to_numpy(dtype=float)
        y_bag = float(ys[0]) if np.all(
            ys == ys[0]) else float((ys > 0.5).any())
        bag_list.append(Bag(X=X, y=y_bag))  # intra_bag_labels defaults to ones

    # Stratified split at the bag level
    bag_labels = np.array([b.y for b in bag_list], dtype=float)
    idx = np.arange(len(bag_list))
    idx_tr, idx_te = train_test_split(
        idx, test_size=test_size, random_state=random_state, stratify=bag_labels
    )
    train_bags = [bag_list[i] for i in idx_tr]
    test_bags = [bag_list[i] for i in idx_te]

    # Standardize features: fit the scaler on the training bags only,
    # then transform both splits; leave the bags untouched if disabled.
    if standardize:
        scaler = StandardScaler()
        train_X = np.vstack([b.X for b in train_bags])
        scaler.fit(train_X)
        train_bags = [Bag(X=scaler.transform(b.X), y=b.y) for b in train_bags]
        test_bags = [Bag(X=scaler.transform(b.X), y=b.y) for b in test_bags]
    else:
        scaler = None

    train_ds = BagDataset(train_bags)
    test_ds = BagDataset(test_bags)
    return train_ds, test_ds, scaler
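
Example: a sketch of loading Musk. Fetching goes through scikit-learn's fetch_openml, so network access is assumed; the import path follows the sawmil.data section heading.

from sawmil.data import load_musk_bags

train_ds, test_ds, scaler = load_musk_bags(
    version=1, test_size=0.2, random_state=0, standardize=True)
print(train_ds.num_bags, test_ds.num_bags)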