Coverage for subcell_pipeline/simulation/post_processing.py: 0% (34 statements)

1"""Methods for processing simulations.""" 

2 

3import numpy as np 

4import pandas as pd 

5from io_collection.keys.check_key import check_key 

6from io_collection.load.load_dataframe import load_dataframe 

7from io_collection.save.save_dataframe import save_dataframe 

8 

9SAMPLE_COLUMNS: list[str] = ["xpos", "ypos", "zpos"] 

10"""Columns names used when sampling simulation data.""" 

11 

12 

def sample_simulation_data(
    bucket: str,
    series_name: str,
    condition_keys: list[str],
    random_seeds: list[int],
    n_timepoints: int,
    n_monomer_points: int,
) -> None:
    """
    Sample simulation data for selected conditions and seeds at the given
    resolution.

    Parameters
    ----------
    bucket
        Name of S3 bucket for input and output files.
    series_name
        Name of simulation series.
    condition_keys
        List of condition keys.
    random_seeds
        Random seeds for simulations.
    n_timepoints
        Number of equally spaced timepoints to sample.
    n_monomer_points
        Number of equally spaced monomer points to sample.
    """

    for condition_key in condition_keys:
        series_key = f"{series_name}_{condition_key}" if condition_key else series_name

        for seed in random_seeds:
            data_key = f"{series_name}/data/{series_key}_{seed:06d}.csv"
            sampled_key = f"{series_name}/samples/{series_key}_{seed:06d}.csv"

            # Skip if the sampled dataframe already exists.
            if check_key(bucket, sampled_key):
                print(f"Sampled dataframe [ {sampled_key} ] already exists. Skipping.")
                continue

            print(f"Sampling data for [ {condition_key} ] seed [ {seed} ]")

            full_data = load_dataframe(bucket, data_key)
            sampled_data = sample_simulation_data_points(
                full_data, n_timepoints, n_monomer_points
            )

            save_dataframe(bucket, sampled_key, sampled_data, index=False)
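
# A minimal usage sketch for sample_simulation_data. The bucket and series
# names below are hypothetical, and the input CSVs are assumed to already
# exist under <bucket>/<series_name>/data/ with zero-padded seed suffixes
# (e.g. ACTIN_NO_COMPRESSION_000001.csv):
#
#     sample_simulation_data(
#         bucket="subcell-working-bucket",
#         series_name="ACTIN_NO_COMPRESSION",
#         condition_keys=[""],
#         random_seeds=[1, 2, 3],
#         n_timepoints=200,
#         n_monomer_points=200,
#     )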


def sample_simulation_data_points(
    data: pd.DataFrame,
    n_timepoints: int,
    n_monomer_points: int,
    sampled_columns: list[str] = SAMPLE_COLUMNS,
) -> pd.DataFrame:
    """
    Sample selected columns from simulation data at the given resolution.

    Parameters
    ----------
    data
        Full simulation data.
    n_timepoints
        Number of equally spaced timepoints to sample.
    n_monomer_points
        Number of equally spaced monomer points to sample.
    sampled_columns
        List of column names to sample.

    Returns
    -------
    :
        Sampled simulation data.
    """

    all_sampled_points = []

    unique_timepoints = data["time"].unique()
    n_unique_timepoints = unique_timepoints.size

    # Select n_timepoints + 1 approximately equally spaced indices into the
    # unique timepoints (both endpoints included), rounding to the nearest
    # saved frame.
    time_indices = np.rint(
        np.interp(
            np.linspace(0, 1, n_timepoints + 1),
            np.linspace(0, 1, n_unique_timepoints),
            np.arange(n_unique_timepoints),
        )
    ).astype(int)

    time_data = data[data["time"].isin(unique_timepoints[time_indices])]

    for time, group in time_data.groupby("time"):
        sampled_points = pd.DataFrame()
        sampled_points["fiber_point"] = np.arange(n_monomer_points)
        sampled_points["time"] = time

        # Linearly interpolate each selected column onto n_monomer_points
        # equally spaced positions along the fiber.
        for column in sampled_columns:
            sampled_points[column] = np.interp(
                np.linspace(0, 1, n_monomer_points),
                np.linspace(0, 1, group.shape[0]),
                group[column].values,
            )

        all_sampled_points.append(sampled_points)

    return pd.concat(all_sampled_points)
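

# A minimal, self-contained sketch of sample_simulation_data_points on
# synthetic data. The fiber shape, sizes, and seed below are illustrative
# assumptions, not pipeline defaults; column names follow SAMPLE_COLUMNS.
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    # Build a fake trajectory: 10 saved timepoints, 50 monomer points each.
    frames = []
    for t in range(10):
        n_monomers = 50
        frames.append(
            pd.DataFrame(
                {
                    "time": t,
                    "xpos": np.linspace(0, 1, n_monomers),
                    "ypos": rng.normal(0, 0.01, n_monomers),
                    "zpos": rng.normal(0, 0.01, n_monomers),
                }
            )
        )
    full_data = pd.concat(frames, ignore_index=True)

    # Downsample to 4 + 1 timepoints and 10 points per fiber.
    sampled = sample_simulation_data_points(
        full_data, n_timepoints=4, n_monomer_points=10
    )
    print(sampled.groupby("time").size())  # 10 points at each of 5 sampled times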