from __future__ import annotations
from dataclasses import dataclass, field
from typing import Tuple, Optional, List, Dict, Any
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scanpy as sc
import squidpy as sq
import skbio
from skbio.stats.ordination import pcoa
import plotly.express as px
from .preparation.choose_roi import extract_roi, contour_regions
from .preparation.scomv_calc_vector import compute_min_vectors_polar
# =========================================================
# SCOMV pipeline as a class
# =========================================================
[docs]
@dataclass
class SCOMVPipeline:
adata: Any
grid: Any
# ---- defaults (can be overridden per run) ----
bin_size: int = 10
region_col: str = "region_10"
subsample_fraction: float = 0.5
min_gene_percentile: float = 30
max_gene_percentile: float = 95
require_moran_nonneg: bool = True
contour_bounds: Tuple[float, float, float, float] = (0, 10000, 0, 10000)
invert_y: bool = True
make_inside_negative: bool = True
# ---- polar bins ----
radius_bins: Optional[np.ndarray] = None
angle_bins_deg: Optional[np.ndarray] = None
# ---- cached / last run results ----
last_roi: Optional[Tuple[float, float, float, float]] = None
subset_grid: Any = None
filtered_shortest: Any = None
xy_list: Any = None
outline_points: Optional[List[Tuple[float, float]]] = None
inside_points: Optional[List[Tuple[float, float]]] = None
min_vector_df: Optional[pd.DataFrame] = None
moran_df: Optional[pd.DataFrame] = None
gene_lens: Optional[np.ndarray] = None
p_low: Optional[int] = None
p_high: Optional[int] = None
selected_genes: Optional[List[str]] = None
polar_counts_list: Optional[List[np.ndarray]] = None
dist_df: Optional[pd.DataFrame] = None
pcoa_res: Any = None
coords: Optional[pd.DataFrame] = None
explained: Optional[pd.Series] = None
def _ensure_bins(self):
if self.radius_bins is None:
self.radius_bins = np.arange(-150, 310, 10)
if self.angle_bins_deg is None:
self.angle_bins_deg = np.arange(-180, 181, 30)
# ----------------------------
# Step 0: ROI subset for adata (Moran用)
# ----------------------------
[docs]
@staticmethod
def subset_by_roi(adata, roi: Tuple[float, float, float, float]):
min_x, max_x, min_y, max_y = roi
return adata[
(adata.obs["imagecol"] >= min_x) & (adata.obs["imagecol"] <= max_x) &
(adata.obs["imagerow"] >= min_y) & (adata.obs["imagerow"] <= max_y)
].copy()
# ----------------------------
# Step 1: compute Moran's I
# ----------------------------
[docs]
def compute_moran_df(
self,
roi: Tuple[float, float, float, float],
subsample_fraction: Optional[float] = None,
delaunay: bool = True,
n_perms: int = 100,
n_jobs: int = 1,
plot_cdf: bool = False,
) -> pd.DataFrame:
frac = self.subsample_fraction if subsample_fraction is None else subsample_fraction
ad = self.subset_by_roi(self.adata, roi)
ad_sub = sc.pp.subsample(ad, fraction=frac, copy=True)
sq.gr.spatial_neighbors(ad_sub, coord_type="generic", delaunay=delaunay)
sq.gr.spatial_autocorr(ad_sub, mode="moran", n_perms=n_perms, n_jobs=n_jobs)
moran_df = ad_sub.uns["moranI"].copy()
if plot_cdf:
I_values = moran_df["I"].to_numpy()
sorted_vals = np.sort(I_values)
cdf = np.arange(1, len(sorted_vals) + 1) / len(sorted_vals)
plt.figure(figsize=(6, 4))
plt.plot(sorted_vals, cdf, linewidth=2)
plt.xlabel("Moran's I")
plt.ylabel("Cumulative Frequency")
plt.title("CDF of Moran's I")
plt.grid(True)
plt.tight_layout()
plt.show()
return moran_df
# ----------------------------
# Step 2: gene total counts
# ----------------------------
[docs]
@staticmethod
def gene_total_counts_int(adata) -> np.ndarray:
X = adata.X
return np.array(X.sum(axis=0)).ravel().astype(int)
# ----------------------------
# Step 3: build polar distributions for genes
# ----------------------------
[docs]
def build_polar_distributions_for_genes(
self,
adata_grid,
min_vector_df: pd.DataFrame,
moran_df: Optional[pd.DataFrame] = None,
min_gene_count: int = 0,
require_moran_nonneg: Optional[bool] = None,
bin_size_um: Optional[int] = None,
make_plots: bool = False,
clim_ratio: float = 0.15,
) -> Tuple[List[np.ndarray], List[str]]:
self._ensure_bins()
req_moran = self.require_moran_nonneg if require_moran_nonneg is None else require_moran_nonneg
bin_um = self.bin_size if bin_size_um is None else bin_size_um
moran_I = moran_df["I"] if moran_df is not None else None
X = adata_grid.X
n_genes = adata_grid.n_vars
n_cells = adata_grid.n_obs
polar_counts_list: List[np.ndarray] = []
selected_genes: List[str] = []
for n in range(n_genes):
gene_name = adata_grid.var.index[n]
expr = X[:, n]
if hasattr(expr, "toarray"):
expr = expr.toarray().ravel()
else:
expr = np.asarray(expr).ravel()
total_n = int(expr.sum())
if total_n <= min_gene_count:
continue
if moran_I is not None and req_moran:
if gene_name in moran_I.index:
if float(moran_I.loc[gene_name]) < 0:
continue
else:
continue
gene_angles = []
gene_radiis = []
gene_weights = []
for i in range(n_cells):
# ここは元コード互換(int化)。正規化値を使うなら int を外して float にしてOK
e = int(expr[i])
if e == 0:
continue
angles = min_vector_df["angle"].iloc[i]
radiis = min_vector_df["radii"].iloc[i]
n_vecs = len(angles)
if n_vecs == 0:
continue
w = e / n_vecs
for ang, rad in zip(angles, radiis):
gene_angles.append(ang)
gene_radiis.append(rad * bin_um)
gene_weights.append(w)
if len(gene_angles) == 0:
continue
deg = np.degrees(gene_angles)
w = np.asarray(gene_weights, dtype=float)
w = w / w.sum()
fig = plt.figure(figsize=(6, 6)) if make_plots else plt.figure(figsize=(1, 1))
counts, _, _, img = plt.hist2d(
gene_radiis, deg,
bins=[self.radius_bins, self.angle_bins_deg],
weights=w,
cmap="viridis"
)
if make_plots:
max_val = np.max(counts) if counts.size else 0
if max_val > 0:
img.set_clim(0, max_val * float(clim_ratio))
plt.title(f"{gene_name} (n={total_n})")
plt.tight_layout()
plt.show()
plt.clf()
plt.close(fig)
selected_genes.append(gene_name)
polar_counts_list.append(counts)
return polar_counts_list, selected_genes
# ----------------------------
# Step 4: MINAS similarity -> distance df
# ----------------------------
[docs]
@staticmethod
def minas_similarity_matrix(
polar_counts_list: List[np.ndarray],
labels: List[str],
make_distance: bool = True,
fill_diagonal_zero: bool = True,
) -> pd.DataFrame:
n = len(labels)
table = np.zeros((n, n), dtype=float)
for i in range(n):
A = np.asarray(polar_counts_list[i])
for j in range(i, n):
B = np.asarray(polar_counts_list[j])
sim = float(np.minimum(A, B).sum())
table[i, j] = sim
table[j, i] = sim
df = pd.DataFrame(table, index=labels, columns=labels)
if make_distance:
df = 1.0 - df
if fill_diagonal_zero:
np.fill_diagonal(df.values, 0.0)
return df
# ----------------------------
# Step 5: PCoA
# ----------------------------
[docs]
@staticmethod
def run_pcoa_from_distance_df(dist_df: pd.DataFrame):
dm = skbio.DistanceMatrix(dist_df.values, ids=list(dist_df.index))
res = pcoa(dm)
coords = res.samples
explained = res.proportion_explained
return res, coords, explained
# ----------------------------
# Plot: PCoA Plotly
# ----------------------------
[docs]
@staticmethod
def plot_pcoa_plotly(coords: pd.DataFrame, labels: List[str], width: int = 720, height: int = 540, margin: float = 0.05):
df_plot = pd.DataFrame({
"PCoA1": coords.iloc[:, 0].values,
"PCoA2": coords.iloc[:, 1].values,
"gene": labels
})
fig = px.scatter(df_plot, x="PCoA1", y="PCoA2", hover_name="gene")
fig.update_traces(marker=dict(size=12))
fig.update_yaxes(scaleanchor="x", scaleratio=1)
xmin, xmax = df_plot["PCoA1"].min(), df_plot["PCoA1"].max()
ymin, ymax = df_plot["PCoA2"].min(), df_plot["PCoA2"].max()
fig.update_xaxes(range=[xmin - margin, xmax + margin])
fig.update_yaxes(range=[ymin - margin, ymax + margin])
fig.update_layout(
width=width, height=height,
title="PCoA Plot",
xaxis_title="PCoA1", yaxis_title="PCoA2",
font=dict(size=18),
)
fig.show()
#return fig
# =========================================================
# Public API: run full pipeline (single ROI)
# =========================================================
[docs]
def run(
self,
roi: Tuple[float, float, float, float],
*,
subset_grid=None,
filtered_shortest=None,
xy_list=None,
outline_points=None,
inside_points=None,
bin_size: Optional[int] = None,
region_col: Optional[str] = None,
subsample_fraction: Optional[float] = None,
min_gene_percentile: Optional[float] = None,
max_gene_percentile: Optional[float] = None,
require_moran_nonneg: Optional[bool] = None,
contour_bounds: Optional[Tuple[float, float, float, float]] = None,
invert_y: Optional[bool] = None,
make_inside_negative: Optional[bool] = None,
make_plots: bool = False,
) -> Dict[str, Any]:
if bin_size is not None:
self.bin_size = bin_size
if region_col is not None:
self.region_col = region_col
if subsample_fraction is not None:
self.subsample_fraction = subsample_fraction
if min_gene_percentile is not None:
self.min_gene_percentile = min_gene_percentile
if max_gene_percentile is not None:
self.max_gene_percentile = max_gene_percentile
if require_moran_nonneg is not None:
self.require_moran_nonneg = require_moran_nonneg
if contour_bounds is not None:
self.contour_bounds = contour_bounds
if invert_y is not None:
self.invert_y = invert_y
if make_inside_negative is not None:
self.make_inside_negative = make_inside_negative
self._ensure_bins()
self.last_roi = roi
# Use precomputed inputs
if subset_grid is not None:
self.subset_grid = subset_grid
if filtered_shortest is not None:
self.filtered_shortest = filtered_shortest
if xy_list is not None:
self.xy_list = xy_list
if outline_points is not None:
self.outline_points = outline_points
if inside_points is not None:
self.inside_points = inside_points
if self.subset_grid is None or self.xy_list is None or self.outline_points is None or self.inside_points is None:
raise ValueError(
"subset_grid, xy_list, outline_points, and inside_points must be provided."
)
# ------------------
# 3) min vectors
# ------------------
self.min_vector_df = compute_min_vectors_polar(
xy_list=self.xy_list,
outline_points=self.outline_points,
inside_points=self.inside_points,
invert_y=self.invert_y,
make_inside_negative=self.make_inside_negative,
)
# ------------------
# 4) Moran
# ------------------
self.moran_df = self.compute_moran_df(
roi=roi,
subsample_fraction=self.subsample_fraction,
plot_cdf=False,
).sort_index()
# ------------------
# 5) gene lens + thresholds
# ------------------
self.gene_lens = self.gene_total_counts_int(self.subset_grid)
self.p_low = int(np.percentile(self.gene_lens, self.min_gene_percentile))
self.p_high = int(np.percentile(self.gene_lens, self.max_gene_percentile))
# ------------------
# 6) polar distributions
# ------------------
self.polar_counts_list, self.selected_genes = self.build_polar_distributions_for_genes(
adata_grid=self.subset_grid,
min_vector_df=self.min_vector_df,
moran_df=self.moran_df,
min_gene_count=self.p_low,
require_moran_nonneg=self.require_moran_nonneg,
make_plots=make_plots,
)
# ------------------
# 7) distance
# ------------------
self.dist_df = self.minas_similarity_matrix(
polar_counts_list=self.polar_counts_list,
labels=self.selected_genes,
make_distance=True,
fill_diagonal_zero=True,
)
# ------------------
# 8) PCoA
# ------------------
self.pcoa_res, self.coords, self.explained = self.run_pcoa_from_distance_df(self.dist_df)
return {
"subset_grid": self.subset_grid,
"min_vector_df": self.min_vector_df,
"moran_df": self.moran_df,
"selected_genes": self.selected_genes,
"dist_df": self.dist_df,
"pcoa_res": self.pcoa_res,
"coords": self.coords,
"explained": self.explained,
}
# =========================================================
# Usage
# =========================================================
# pipe = SCOMVPipeline(adata=adata, grid=grid, bin_size=10, region_col="region_10")
# out = pipe.run(roi=(min_x, max_x, min_y, max_y))
# pipe.plot_pcoa_plotly(pipe.coords, pipe.selected_genes)