Source code for subcell_pipeline.simulation.readdy.loader
"""Class for loading and shaping ReaDDy trajectories."""fromtypingimportAny,Optionalimportnumpyasnpimportreaddyfromio_collection.keys.check_keyimportcheck_keyfromio_collection.load.load_pickleimportload_picklefromio_collection.save.save_pickleimportsave_picklefromtqdmimporttqdmfromsubcell_pipeline.simulation.readdy.data_structuresimport(FrameData,ParticleData,TopologyData,)
[docs]classReaddyLoader:""" Load and shape data from a ReaDDy trajectory. Trajectory is loaded from the simulation output h5 file of the .dat pickle file. If a .dat pickle location and key are provided, the loaded trajectory is saved to the given location for faster reloads. """_readdy_trajectory:Optional[readdy.Trajectory]"""ReaDDy trajectory object."""_trajectory:Optional[list[FrameData]]"""List of FrameData for trajectory."""h5_file_path:str"""Path to the ReaDDy .h5 file or .dat pickle file."""min_time_ix:int"""First time index to include."""max_time_ix:int"""Last time index to include."""time_inc:int"""Include every time_inc timestep."""timestep:float"""Real time for each simulation timestep."""pickle_location:Optional[str]"""Location to save pickle file (AWS S3 bucket or local path)."""pickle_key:Optional[str]"""Name of pickle file (AWS S3 bucket or local path)."""
[docs]defreaddy_trajectory(self)->readdy.Trajectory:""" Lazy load the ReaDDy trajectory object. Note that loading ReaDDy trajectories requires a path to a local file. Loading currently does not support S3 locations. Returns ------- : The ReaDDy trajectory object. """ifself._readdy_trajectoryisNone:self._readdy_trajectory=readdy.Trajectory(self.h5_file_path)returnself._readdy_trajectory
@staticmethoddef_frame_edges(time_ix:int,topology_records:Any)->list[list[int]]:""" Get all edges at the given time index as [particle1 id, particle2 id]. The ``topology_records`` object is output from ``readdy.Trajectory(h5_file_path).read_observable_topologies()``. """result=[]fortopintopology_records[time_ix]:fore1,e2intop.edges:ife1<=e2:ix1=top.particles[e1]ix2=top.particles[e2]result.append([ix1,ix2])returnresultdef_shape_trajectory_data(self)->list[FrameData]:"""Shape data from a ReaDDy trajectory for analysis."""(_,topology_records,)=self.readdy_trajectory().read_observable_topologies()# type: ignore(times,types,ids,positions,)=self.readdy_trajectory().read_observable_particles()# type: ignoreresult=[]fortime_ixintqdm(range(len(times))):if(time_ix<self.min_time_ixor(self.max_time_ix>=0andtime_ix>self.max_time_ix)ortimes[time_ix]%self.time_inc!=0):continueframe=FrameData(time=self.timestep*time_ix)frame.edge_ids=ReaddyLoader._frame_edges(time_ix,topology_records)forindex,topinenumerate(topology_records[time_ix]):frame.topologies[index]=TopologyData(uid=index,type_name=top.type,particle_ids=top.particles,)forpinrange(len(ids[time_ix])):p_id=ids[time_ix][p]position=positions[time_ix][p]neighbor_ids=[]foredgeinframe.edge_ids:ifp_id==edge[0]:neighbor_ids.append(edge[1])elifp_id==edge[1]:neighbor_ids.append(edge[0])frame.particles[ids[time_ix][p]]=ParticleData(uid=ids[time_ix][p],type_name=self.readdy_trajectory().species_name(# type: ignoretypes[time_ix][p]),position=np.array([position[0],position[1],position[2]]),neighbor_ids=neighbor_ids,)result.append(frame)returnresult
[docs]deftrajectory(self)->list[FrameData]:""" Lazy load the shaped trajectory. Returns ------- : The trajectory of data shaped for analysis. """ifself._trajectoryisnotNone:returnself._trajectoryifself.pickle_locationisnotNoneandself.pickle_keyisnotNone:ifcheck_key(self.pickle_location,self.pickle_key):print(f"Loading pickle file for ReaDDy data from {self.pickle_key}")self._trajectory=load_pickle(self.pickle_location,self.pickle_key)else:print(f"Loading ReaDDy data from h5 file {self.h5_file_path}")print(f"Saving pickle file for ReaDDy data to {self.h5_file_path}")self._trajectory=self._shape_trajectory_data()save_pickle(self.pickle_location,self.pickle_key,self._trajectory)else:print(f"Loading ReaDDy data from h5 file {self.h5_file_path}")self._trajectory=self._shape_trajectory_data()returnself._trajectory