# Source code for subcell_pipeline.analysis.dimensionality_reduction.pca_dim_reduction
"""Methods for dimensionality reduction using PCA."""
import random
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from io_collection.save.save_dataframe import save_dataframe
from io_collection.save.save_figure import save_figure
from io_collection.save.save_json import save_json
from sklearn.decomposition import PCA
from subcell_pipeline.analysis.dimensionality_reduction.fiber_data import reshape_fibers
[docs]
def run_pca(data: pd.DataFrame) -> tuple[pd.DataFrame, PCA]:
    """
    Fit a two-component PCA to simulated fiber data and project the fibers.

    Parameters
    ----------
    data
        Simulated fiber data.

    Returns
    -------
    :
        Dataframe whose first two columns ("PCA1", "PCA2") hold the projected
        coordinates, followed by the features returned by ``reshape_fibers``,
        and the fitted PCA object.
    """
    fibers, features = reshape_fibers(data)

    model = PCA(n_components=2)
    model.fit(fibers)
    projected = model.transform(fibers)

    components = pd.DataFrame(projected, columns=["PCA1", "PCA2"])

    # NOTE(review): a column-wise concat aligns rows by index label, so this
    # presumably relies on the features carrying a default 0..n-1 index that
    # matches the freshly built components frame -- confirm in reshape_fibers.
    results = pd.concat([components, features], axis=1)

    return results, model
[docs]
def save_pca_results(
    pca_results: pd.DataFrame, save_location: str, save_key: str, resample: bool = True
) -> None:
    """
    Write the PCA results table to the given location.

    Parameters
    ----------
    pca_results
        PCA trajectory data.
    save_location
        Location for output file (local path or S3 bucket).
    save_key
        Name key for output file.
    resample
        True if data should be resampled before saving, False otherwise. The
        resampling draws every row (``frac=1.0``) with a fixed seed, so the
        saved table holds the same rows in a reproducible shuffled order.
    """
    # The caller's dataframe is never modified; only the saved table changes.
    to_save = (
        pca_results.copy().sample(frac=1.0, random_state=1)
        if resample
        else pca_results
    )

    save_dataframe(save_location, save_key, to_save, index=False)
[docs]
def save_pca_trajectories(
    pca_results: pd.DataFrame, save_location: str, save_key: str
) -> None:
    """
    Write one PCA trajectory per simulator, repeat and velocity as JSON.

    Parameters
    ----------
    pca_results
        PCA trajectory data. Must contain the "SIMULATOR", "REPEAT",
        "VELOCITY", "PCA1", and "PCA2" columns.
    save_location
        Location for output file (local path or S3 bucket).
    save_key
        Name key for output file.
    """
    grouped = pca_results.groupby(["SIMULATOR", "REPEAT", "VELOCITY"])

    trajectories = [
        {
            "simulator": simulator.upper(),
            "replicate": int(repeat),
            "velocity": velocity,
            "x": group["PCA1"].tolist(),
            "y": group["PCA2"].tolist(),
        }
        for (simulator, repeat, velocity), group in grouped
    ]

    # Fixed seed so the shuffled order of trajectories is reproducible.
    random.Random(1).shuffle(trajectories)

    save_json(save_location, save_key, trajectories)
[docs]
def save_pca_transforms(
    pca: PCA, points: list[list[float]], save_location: str, save_key: str
) -> None:
    """
    Write fibers reconstructed from points along each PCA axis as JSON.

    Parameters
    ----------
    pca
        PCA object.
    points
        List of inverse transform points. Must hold exactly two entries: the
        values to traverse along the first component, then the values to
        traverse along the second component.
    save_location
        Location for output file (local path or S3 bucket).
    save_key
        Name key for output file.
    """
    pc1_points, pc2_points = points

    entries = []
    for component, values in ((1, pc1_points), (2, pc2_points)):
        for value in values:
            # Move along one component while holding the other at zero.
            coordinates = [0, 0]
            coordinates[component - 1] = value

            # Each row of the reshaped reconstruction is one (x, y, z) triple.
            fiber = pca.inverse_transform(coordinates).reshape(-1, 3)

            entries.append(
                {
                    "component": component,
                    "point": value,
                    "x": fiber[:, 0].tolist(),
                    "y": fiber[:, 1].tolist(),
                    "z": fiber[:, 2].tolist(),
                }
            )

    save_json(save_location, save_key, entries)
[docs]
def plot_pca_feature_scatter(
    data: pd.DataFrame,
    features: dict,
    pca: PCA,
    save_location: Optional[str] = None,
    save_key: str = "pca_feature_scatter.png",
) -> None:
    """
    Plot scatter of PCA components colored by the given features.

    One panel is drawn per feature, all sharing the same axes. The coloring
    given for each feature selects how the points are colored:

    - dict: feature values are mapped through the dict to colors
    - tuple: feature values are mapped through the first item and the result
      is colored using the second item as the colormap
    - anything else: feature values are used directly, with the given value
      as the colormap

    Parameters
    ----------
    data
        PCA results data. Must contain the "PCA1" and "PCA2" columns along
        with one column per feature name.
    features
        Map of feature name to coloring.
    pca
        PCA object.
    save_location
        Location for output file (local path or S3 bucket).
    save_key
        Name key for output file.
    """
    # squeeze=False always returns a 2D array of axes. Without it, a single
    # feature yields a bare Axes object and indexing into it fails.
    figure, axes = plt.subplots(
        1,
        len(features),
        figsize=(10, 3),
        sharey=True,
        sharex=True,
        squeeze=False,
    )

    x_label = f"PCA1 ({(pca.explained_variance_ratio_[0] * 100):.1f} %)"
    y_label = f"PCA2 ({(pca.explained_variance_ratio_[1] * 100):.1f} %)"

    for axis, (feature, colors) in zip(axes[0], features.items()):
        if isinstance(colors, dict):
            color_kwargs = {"c": data[feature].map(colors)}
        elif isinstance(colors, tuple):
            color_kwargs = {"c": data[feature].map(colors[0]), "cmap": colors[1]}
        else:
            color_kwargs = {"c": data[feature], "cmap": colors}

        axis.scatter(data["PCA1"], data["PCA2"], s=2, **color_kwargs)
        axis.set_title(feature)
        axis.set_xlabel(x_label)
        axis.set_ylabel(y_label)

    plt.tight_layout()
    plt.show()

    if save_location is not None:
        save_figure(save_location, save_key, figure)
[docs]
def plot_pca_inverse_transform(
    pca: PCA,
    pca_results: pd.DataFrame,
    save_location: Optional[str] = None,
    save_key: str = "pca_inverse_transform.png",
) -> None:
    """
    Plot fibers reconstructed while traversing each PCA component.

    The top row traverses the first component and the bottom row the second,
    with the other component held at zero. Each row shows the reconstructed
    fiber in the XY, YZ, and XZ planes. Traversal steps are multiples of the
    population standard deviation of that component in the results.

    Parameters
    ----------
    pca
        PCA object.
    pca_results
        PCA results data.
    save_location
        Location for output file (local path or S3 bucket).
    save_key
        Name key for output file.
    """
    figure, axes = plt.subplots(2, 3, figsize=(10, 6))

    # Half-open range: steps run from -2 up to 1.5, the stop value is excluded.
    steps = np.arange(-2, 2, 0.5)
    scales = (
        pca_results["PCA1"].std(ddof=0),
        pca_results["PCA2"].std(ddof=0),
    )
    colormap = plt.colormaps.get_cmap("RdBu_r")

    # Per column: fiber coordinate on each plot axis, then the axis labels.
    panels = ((0, 1, "X", "Y"), (1, 2, "Y", "Z"), (0, 2, "X", "Z"))

    for step in steps:
        # Shift the step from [-2, 2) onto the colormap's unit interval.
        color = colormap((step + 2) / 4)

        for row, scale in enumerate(scales):
            # Move along one component while holding the other at zero.
            coordinates = [0, 0]
            coordinates[row] = step * scale
            fiber = pca.inverse_transform(coordinates).reshape(-1, 3)

            for column, (horizontal, vertical, _, _) in enumerate(panels):
                axes[row, column].plot(
                    fiber[:, horizontal], fiber[:, vertical], color=color
                )

    for row, title in enumerate(("PC1", "PC2")):
        for column, (_, _, x_name, y_name) in enumerate(panels):
            axes[row, column].set_xlabel(x_name)
            axes[row, column].set_ylabel(y_name, rotation=0)
            axes[row, column].set_title(title)

    plt.tight_layout()
    plt.show()

    if save_location is not None:
        save_figure(save_location, save_key, figure)