Source code for simularium_readdy_models.common.readdy_util

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import copy
import numpy as np
import scipy.linalg as linalg
import random
import readdy
import os
from shutil import rmtree
import math
import pandas as pd
from tqdm import tqdm


[docs]class ReaddyUtil: def __init__(self): """ Utilities used for Simularium ReaDDy models """ self.bond_pairs = [] self.angle_triples = [] self.dihedral_quads = [] self.repulse_pairs = []
[docs] @staticmethod def normalize(v): """ normalize a vector """ return v / np.linalg.norm(v)
[docs] @staticmethod def analyze_reaction_count_over_time(reactions, reaction_name): """ Get a list of the number of times a reaction happened between each analyzed timestep of the given reaction """ if reaction_name not in reactions: print(f"Couldn't find reaction: {reaction_name}") return None return np.insert(reactions[reaction_name].to_numpy(), 0, 0.0)
[docs] @staticmethod def get_perpendicular_components_of_vector(v, v_perpendicular): """ Get the components of v that are perpendicular to v_perpendicular """ return ( v - (np.dot(v, v_perpendicular) / pow(np.linalg.norm(v_perpendicular), 2)) * v_perpendicular )
[docs] @staticmethod def get_angle_between_vectors(v1, v2, in_degrees=False): """ get the angle between two vectors in radians unless in_degrees is True """ result = np.arccos( np.clip( np.dot(ReaddyUtil.normalize(v1), ReaddyUtil.normalize(v2)), -1.0, 1.0 ) ) return result if not in_degrees else np.rad2deg(result)
[docs] @staticmethod def rotate(v, axis, angle): """ rotate a vector around axis by angle (radians) """ rotation = linalg.expm(np.cross(np.eye(3), ReaddyUtil.normalize(axis) * angle)) return np.dot(rotation, np.copy(v))
[docs] @staticmethod def get_rotation_matrix(v1, v2): """ Cross the vectors and get a rotation matrix """ v3 = np.cross(v2, v1) return np.array( [[v1[0], v2[0], v3[0]], [v1[1], v2[1], v3[1]], [v1[2], v2[2], v3[2]]] )
[docs] @staticmethod def get_orientation_from_positions(positions): """ orthonormalize and cross the vectors from position 2 to the other positions to get a basis local to position 2, positions = [position 1, position 2, position 3] """ v1 = ReaddyUtil.normalize(positions[0] - positions[1]) v2 = ReaddyUtil.normalize(positions[2] - positions[1]) v2 = ReaddyUtil.normalize(v2 - (np.dot(v1, v2) / np.dot(v1, v1)) * v1) return ReaddyUtil.get_rotation_matrix(v1, v2)
[docs] @staticmethod def get_orientation_from_vectors(v1, v2): """ orthonormalize and cross the vectors to get a rotation matrix """ v2 = ReaddyUtil.normalize(v2 - (np.dot(v1, v2) / np.dot(v1, v1)) * v1) return ReaddyUtil.get_rotation_matrix(v1, v2)
[docs] @staticmethod def get_random_perpendicular_vector(v): """ get a random unit vector perpendicular to v """ if v[0] == 0 and v[1] == 0: if v[2] == 0: raise ValueError("zero vector") return np.array([0, 1, 0]) u = ReaddyUtil.normalize(np.array([-v[1], v[0], 0])) return ReaddyUtil.rotate(u, v, 2 * np.pi * random.random())
[docs] @staticmethod def topology_to_string(topology): """ get string with vertex types and ids in a topology """ result = f"{topology.type} : \n" for vertex in topology.graph.get_vertices(): result += f"{ReaddyUtil.vertex_to_string(topology, vertex)}\n" for neighbor in vertex: result += ( f" -- {ReaddyUtil.vertex_to_string(topology, neighbor.get())}\n" ) return result
[docs] @staticmethod def vertex_to_string(topology, vertex): """ get string with type and id for vertex """ return ( topology.particle_type_of_vertex(vertex) + " (" + str(topology.particle_id_of_vertex(vertex)) + ")" )
[docs] @staticmethod def get_non_periodic_boundary_position(pos1, pos2, box_size): """ if the distance between two positions is greater than box_size, move the second position across the box (for positioning calculations) """ result = np.copy(pos2) for dim in range(3): if abs(pos2[dim] - pos1[dim]) > box_size[dim] / 2.0: result[dim] -= pos2[dim] / abs(pos2[dim]) * box_size[dim] return result
[docs] @staticmethod def calculate_diffusionCoefficient(r0, eta, T): """ calculates the theoretical diffusion constant of a spherical particle with radius r0[nm] in a media with viscosity eta [cP] at temperature T [Kelvin] returns nm^2/s """ return ( (1.38065 * 10 ** (-23) * T) / (6 * np.pi * eta * 10 ** (-3) * r0 * 10 ** (-9)) * 10**18 / 10**9 )
[docs] @staticmethod def calculate_nParticles(C, dim): """ calculates the number of particles for a species at concentration C [uM] in a cuboidal container with dimensions dim = dimx, dimy, dimz [nm] returns unitless number """ return int(round(C * 1e-30 * 6.022e23 * np.prod(dim)))
[docs] @staticmethod def calculate_concentration(n, dim): """ calculates the concentration for a species with number of particles n in a cuboidal container with dimensions dim = dimx, dimy, dimz [nm] returns concentration [uM] """ return n / (1e-30 * 6.022e23 * np.prod(dim))
[docs] @staticmethod def vertex_not_found(topology, verbose, error_msg, debug_msg): """ If a vertex was not found, check if an exception should be raised or a debug message should be printed If error_msg == "", don't raise an exception If not verbose or debug_msg == "", don't print the message """ if error_msg: raise Exception(error_msg + "\n" + ReaddyUtil.topology_to_string(topology)) if verbose and debug_msg: print(debug_msg)
[docs] @staticmethod def get_vertex_of_type( topology, vertex_type, exact_match, verbose=False, debug_msg="", error_msg="" ): """ get the first vertex with a given type """ for vertex in topology.graph.get_vertices(): pt = topology.particle_type_of_vertex(vertex) if (not exact_match and vertex_type in pt) or ( exact_match and pt == vertex_type ): return vertex ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return None
[docs] @staticmethod def get_first_vertex_of_types( topology, vertex_types, verbose=False, debug_msg="", error_msg="" ): """ get the first vertex with any of the given types """ for vertex in topology.graph.get_vertices(): if topology.particle_type_of_vertex(vertex) in vertex_types: return vertex ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return None
[docs] @staticmethod def get_vertex_with_id( topology, vertex_id, verbose=False, debug_msg="", error_msg="" ): """ get the first vertex with a given id """ for vertex in topology.graph.get_vertices(): if topology.particle_id_of_vertex(vertex) == vertex_id: return vertex ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return None
[docs] @staticmethod def get_first_neighbor( topology, vertex, exclude_vertices, verbose=False, debug_msg="", error_msg="" ): """ get the first neighboring vertex """ exclude_ids = [] for v in exclude_vertices: exclude_ids.append(topology.particle_id_of_vertex(v)) for neighbor in vertex: if topology.particle_id_of_vertex(neighbor) in exclude_ids: continue return neighbor.get() ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return None
[docs] @staticmethod def get_neighbor_of_type( topology, vertex, vertex_type, exact_match, exclude_vertices=[], verbose=False, debug_msg="", error_msg="", ): """ get the first neighboring vertex of type vertex_type """ exclude_ids = [] for v in exclude_vertices: exclude_ids.append(topology.particle_id_of_vertex(v)) for neighbor in vertex: if topology.particle_id_of_vertex(neighbor) in exclude_ids: continue v_neighbor = neighbor.get() pt = topology.particle_type_of_vertex(v_neighbor) if (not exact_match and vertex_type in pt) or ( exact_match and pt == vertex_type ): return v_neighbor ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return None
[docs] @staticmethod def get_neighbor_of_types( topology, vertex, vertex_types, exclude_vertices, verbose=False, debug_msg="", error_msg="", ): """ get the first neighboring vertex with any of the given types, excluding particles with the given ids """ exclude_ids = [] for v in exclude_vertices: exclude_ids.append(topology.particle_id_of_vertex(v)) for neighbor in vertex: if topology.particle_id_of_vertex(neighbor) in exclude_ids: continue v_neighbor = neighbor.get() pt = topology.particle_type_of_vertex(v_neighbor) if pt in vertex_types: return v_neighbor ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return None
[docs] @staticmethod def get_vertices_of_type( topology, vertex_type, exact_match, verbose=False, debug_msg="", error_msg="" ): """ get all vertices with a given type """ v = [] for vertex in topology.graph.get_vertices(): pt = topology.particle_type_of_vertex(vertex) if (not exact_match and vertex_type in pt) or ( exact_match and pt == vertex_type ): v.append(vertex) if len(v) == 0: ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return v
[docs] @staticmethod def get_neighbors_of_type( topology, vertex, vertex_type, exact_match, verbose=False, debug_msg="", error_msg="", ): """ get all neighboring vertices with a given type """ v = [] for neighbor in vertex: v_neighbor = neighbor.get() pt = topology.particle_type_of_vertex(v_neighbor) if (not exact_match and vertex_type in pt) or ( exact_match and pt == vertex_type ): v.append(v_neighbor) if len(v) == 0: ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return v
[docs] @staticmethod def get_random_vertex_of_type( topology, vertex_type, exact_match, verbose=False, debug_msg="", error_msg="" ): """ get a random vertex with a given type """ vertices = ReaddyUtil.get_vertices_of_type(topology, vertex_type, exact_match) if len(vertices) == 0: ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return None return random.choice(vertices)
[docs] @staticmethod def get_random_vertex_of_types( topology, vertex_types, verbose=False, debug_msg="", error_msg="" ): """ get a random vertex with any of the given types """ v = [] for vertex_type in vertex_types: v += ReaddyUtil.get_vertices_of_type(topology, vertex_type, True) if len(v) == 0: ReaddyUtil.vertex_not_found(topology, verbose, error_msg, debug_msg) return None return random.choice(v)
[docs] @staticmethod def vertex_satisfies_type(vertex_type, types_include, types_exclude): """ check if vertex satisfies the type requirements """ for t in types_include: if t not in vertex_type: return False for t in types_exclude: if t in vertex_type: return False return True
[docs] @staticmethod def particle_type_with_flags( particle_type, add_flags, remove_flags, reverse_sort=False ): """ get particle type with the flags added and removed """ if "#" not in particle_type: for f in range(len(add_flags)): particle_type = particle_type + ("_" if f > 0 else "#") + add_flags[f] return particle_type flag_string = particle_type[particle_type.index("#") + 1 :] flags = flag_string.split("_") polymer_indices = "" if "tubulin" in particle_type and len(flags) > 1: polymer_indices = f"_{flags[-2]}_{flags[-1]}" flags = flags[:-2] for flag in remove_flags: if flag in flags: flags.remove(flag) for flag in add_flags: if flag not in flags: flags.append(flag) if "" in flags: flags.remove("") if len(flags) < 1: return particle_type[: particle_type.index("#")] flags.sort(reverse=reverse_sort) flag_string = "" for f in range(len(flags)): flag_string = flag_string + ("_" if f > 0 else "") + flags[f] particle_type = particle_type[: particle_type.index("#")] new_type = f"{particle_type}#{flag_string}{polymer_indices}" return new_type
[docs] @staticmethod def set_flags( topology, recipe, vertex, add_flags, remove_flags, reverse_sort=False ): """ set flags on a vertex """ particle_type = topology.particle_type_of_vertex(vertex) particle_type = ReaddyUtil.particle_type_with_flags( particle_type, add_flags, remove_flags, reverse_sort ) recipe.change_particle_type(vertex, particle_type)
[docs] @staticmethod def calculate_polymer_number(number, offset): """ calculates the polymer number from number by offset in [-2, 2] returns number in [1,3] """ n = number + offset if n > 3: n -= 3 if n < 1: n += 3 return n
[docs] @staticmethod def get_vertex_position(topology, vertex): """ get the position of a vertex """ pos = topology.position_of_vertex(vertex) return np.array([pos[0], pos[1], pos[2]])
[docs] @staticmethod def vertices_are_equal(topology, vertex1, vertex2): """ check if references are the same vertex """ return topology.particle_id_of_vertex( vertex1 ) == topology.particle_id_of_vertex(vertex2)
[docs] @staticmethod def vertices_are_connected(topology, vertex1, vertex2): """ check if the vertices share an edge """ for neighbor in vertex1: if ReaddyUtil.vertices_are_equal(topology, vertex2, neighbor.get()): return True return False
[docs] @staticmethod def reaction_function_reset_state(topology): """ reaction function for removing flags from a topology """ recipe = readdy.StructuralReactionRecipe(topology) tt = topology.type recipe.change_topology_type(tt[: tt.index("#")]) return recipe
[docs] @staticmethod def rate_function_infinity(topology): """ rate function for a reaction that should trigger immediately whenever reactants are available """ return 1e30
[docs] @staticmethod def clamp_polymer_offsets_2D(polymer_index_x, polymer_offsets): """ clamp offsets so y polymer offset is incremented if new x polymer index is not in [1,3] """ if len(polymer_offsets) < 2: return polymer_offsets offsets = copy.deepcopy(polymer_offsets) if offsets[0] != 0: if polymer_index_x + offsets[0] < 1: offsets[1] -= 1 elif polymer_index_x + offsets[0] > 3: offsets[1] += 1 return offsets
[docs] @staticmethod def get_types_with_polymer_numbers_1D(particle_types, x, polymer_offset): """ creates a list of types with 1D polymer numbers for each type in particle types at polymer number x with polymer_offset dx in [-2, 2] returns list of types """ types = [] for t in particle_types: types.append( (t + str(ReaddyUtil.calculate_polymer_number(x, polymer_offset))) if polymer_offset is not None else t ) return types
[docs] @staticmethod def get_types_with_polymer_numbers_2D(particle_types, x, y, polymer_offsets): """ creates a list of types with 2D polymer numbers for each type in particle types at polymer number x_y with polymer_offsets [dx, dy] both in [-2, 2] returns list of types """ types = [] for t in particle_types: types.append( ( t + str(ReaddyUtil.calculate_polymer_number(x, polymer_offsets[0])) + "_" + str(ReaddyUtil.calculate_polymer_number(y, polymer_offsets[1])) ) if len(polymer_offsets) > 0 else t ) return types
[docs] @staticmethod def get_random_unit_vector(): """ get a random unit vector """ return ReaddyUtil.normalize( np.array([random.random(), random.random(), random.random()]) )
[docs] @staticmethod def get_random_boundary_position(box_size): """ get a random position on one of the boundary faces """ pos = box_size * np.random.uniform(size=(3)) - box_size / 2 face = random.randint(0, 5) if face == 0: pos[0] = -box_size[0] / 2 elif face == 1: pos[0] = box_size[0] / 2 elif face == 2: pos[1] = -box_size[1] / 2 elif face == 3: pos[1] = box_size[1] / 2 elif face == 4: pos[2] = -box_size[2] / 2 else: pos[2] = box_size[2] / 2 return pos
[docs] @staticmethod def try_remove_edge(topology, recipe, vertex1, vertex2): """ try to remove an edge """ if not ReaddyUtil.vertices_are_connected(topology, vertex1, vertex2): return False, ( "Tried to remove non-existent edge! " + ReaddyUtil.vertex_to_string(topology, vertex1) + " -- " + ReaddyUtil.vertex_to_string(topology, vertex2) ) recipe.remove_edge(vertex1, vertex2) return True, ""
[docs] def add_bond(self, types1, types2, force_const, bond_length, system): """ adds a bond to the system (if it hasn't been added already) from each type in types1 to each type in types2 with force constant force_const and length bond_length [nm] """ for t1 in types1: for t2 in types2: if (t1, t2) not in self.bond_pairs and (t2, t1) not in self.bond_pairs: system.topologies.configure_harmonic_bond( t1, t2, force_const, bond_length ) self.bond_pairs.append((t1, t2)) self.bond_pairs.append((t2, t1))
[docs] def add_angle(self, types1, types2, types3, force_const, angle, system): """ adds an angle to the system (if it hasn't been added already) from each type in types1 through each type in types2 to each type in types3 with force constant force_const and angle [radians] """ for t1 in types1: for t2 in types2: for t3 in types3: if (t1, t2, t3) not in self.angle_triples and ( t3, t2, t1, ) not in self.angle_triples: system.topologies.configure_harmonic_angle( t1, t2, t3, force_const, angle ) self.angle_triples.append((t1, t2, t3)) self.angle_triples.append((t3, t2, t1))
[docs] def add_dihedral(self, types1, types2, types3, types4, force_const, angle, system): """ adds a cosine dihedral to the system (if it hasn't been added already) from each type in types1 through each type in types2 through each type in types3 to each type in types4 with force constant force_const and angle [radians] """ for t1 in types1: for t2 in types2: for t3 in types3: for t4 in types4: if (t1, t2, t3, t4) not in self.dihedral_quads and ( t4, t3, t2, t1, ) not in self.dihedral_quads: system.topologies.configure_cosine_dihedral( t1, t2, t3, t4, force_const, 1.0, angle ) system.topologies.configure_cosine_dihedral( t4, t3, t2, t1, force_const, 1.0, angle ) self.dihedral_quads.append((t1, t2, t3, t4)) self.dihedral_quads.append((t4, t3, t2, t1))
[docs] def add_repulsion(self, types1, types2, force_const, distance, system): """ adds a pairwise repulsion to the system (if it hasn't been added already) between each type in types1 and each type in types2 with force constant force_const with equilibrium distance [nm] """ for t1 in types1: for t2 in types2: if (t1, t2) not in self.repulse_pairs and ( t2, t1, ) not in self.repulse_pairs: system.potentials.add_harmonic_repulsion( t1, t2, force_const, distance ) self.repulse_pairs.append((t1, t2)) self.repulse_pairs.append((t2, t1))
[docs] def add_polymer_bond_1D( self, particle_types1, polymer_offset1, particle_types2, polymer_offset2, force_const, bond_length, system, ): """ adds a bond between all polymer numbers from types particle_types1 with offset polymer_offset1 to types particle_types2 with offset polymer_offset2 with force constant force_const and length bond_length [nm] """ for x in range(1, 4): self.add_bond( ( ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types1, x, polymer_offset1 ) if polymer_offset1 is not None else particle_types1 ), ( ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types2, x, polymer_offset2 ) if polymer_offset2 is not None else particle_types2 ), force_const, bond_length, system, )
[docs] def add_polymer_bond_2D( self, particle_types1, polymer_offsets1, particle_types2, polymer_offsets2, force_const, bond_length, system, ): """ adds a bond between all polymer numbers from types particle_types1 with offsets polymer_offsets1 to types particle_types2 with offsets polymer_offsets2 with force constant force_const and length bond_length [nm] """ for x in range(1, 4): for y in range(1, 4): offsets1 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets1) offsets2 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets2) self.add_bond( ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types1, x, y, offsets1 ), ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types2, x, y, offsets2 ), force_const, bond_length, system, )
[docs] def add_polymer_angle_1D( self, particle_types1, polymer_offset1, particle_types2, polymer_offset2, particle_types3, polymer_offset3, force_const, angle, system, ): """ adds an angle between all polymer numbers with offset polymer_offset of types particle_types with force constant force_const and angle [radians] """ for x in range(1, 4): self.add_angle( ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types1, x, polymer_offset1 ), ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types2, x, polymer_offset2 ), ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types3, x, polymer_offset3 ), force_const, angle, system, )
[docs] def add_polymer_angle_2D( self, particle_types1, polymer_offsets1, particle_types2, polymer_offsets2, particle_types3, polymer_offsets3, force_const, angle, system, ): """ adds an angle between all polymer numbers with offsets polymer_offsets of types particle_types with force constant force_const and angle [radians] """ for x in range(1, 4): for y in range(1, 4): offsets1 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets1) offsets2 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets2) offsets3 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets3) self.add_angle( ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types1, x, y, offsets1 ), ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types2, x, y, offsets2 ), ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types3, x, y, offsets3 ), force_const, angle, system, )
[docs] def add_polymer_dihedral_1D( self, particle_types1, polymer_offset1, particle_types2, polymer_offset2, particle_types3, polymer_offset3, particle_types4, polymer_offset4, force_const, angle, system, ): """ adds a cosine dihedral between all polymer numbers with offset polymer_offset of types particle_types with force constant force_const and angle [radians] """ for x in range(1, 4): self.add_dihedral( ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types1, x, polymer_offset1 ), ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types2, x, polymer_offset2 ), ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types3, x, polymer_offset3 ), ReaddyUtil.get_types_with_polymer_numbers_1D( particle_types4, x, polymer_offset4 ), force_const, angle, system, )
[docs] def add_polymer_dihedral_2D( self, particle_types1, polymer_offsets1, particle_types2, polymer_offsets2, particle_types3, polymer_offsets3, particle_types4, polymer_offsets4, force_const, angle, system, ): """ adds a cosine dihedral between all polymer numbers with offsets polymer_offsets of types particle_types with force constant force_const and angle [radians] """ for x in range(1, 4): for y in range(1, 4): offsets1 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets1) offsets2 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets2) offsets3 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets3) offsets4 = ReaddyUtil.clamp_polymer_offsets_2D(x, polymer_offsets4) self.add_dihedral( ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types1, x, y, offsets1 ), ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types2, x, y, offsets2 ), ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types3, x, y, offsets3 ), ReaddyUtil.get_types_with_polymer_numbers_2D( particle_types4, x, y, offsets4 ), force_const, angle, system, )
[docs] @staticmethod def create_readdy_simulation( system, n_cpu, sim_name="", total_steps=0, record=False, save_checkpoints=False ): """ Create the ReaDDy simulation """ simulation = system.simulation("CPU") simulation.kernel_configuration.n_threads = n_cpu if record: simulation.output_file = sim_name + ".h5" if os.path.exists(simulation.output_file): os.remove(simulation.output_file) recording_stride = max(int(total_steps / 1000.0), 1) simulation.record_trajectory(recording_stride) simulation.observe.topologies(recording_stride) simulation.observe.particles(recording_stride) simulation.observe.reaction_counts(1) simulation.progress_output_stride = recording_stride if save_checkpoints: checkpoint_stride = max(int(total_steps / 10.0), 1) checkpoint_path = f"checkpoints/{os.path.basename(sim_name)}/" if os.path.exists(checkpoint_path): rmtree(checkpoint_path) simulation.make_checkpoints(checkpoint_stride, checkpoint_path, 0) return simulation
[docs] @staticmethod def get_current_particle_edges(current_topologies): """ During a running simulation, get all the edges in the ReaDDy topologies as (particle1 id, particle2 id) from readdy.simulation.current_topologies """ result = [] for top in current_topologies: for v1, v2 in top.graph.edges: p1_id = top.particle_id_of_vertex(v1) p2_id = top.particle_id_of_vertex(v2) if p1_id <= p2_id: result.append((p1_id, p2_id)) return result
[docs] @staticmethod def get_current_monomers(current_topologies): """ During a running simulation, get data for topologies of particles """ edges = ReaddyUtil.get_current_particle_edges(current_topologies) result = { "topologies": {}, "particles": {}, } for index, topology in enumerate(current_topologies): particle_ids = [] for p in topology.particles: particle_ids.append(p.id) neighbor_ids = [] for edge in edges: if p.id == edge[0]: neighbor_ids.append(edge[1]) elif p.id == edge[1]: neighbor_ids.append(edge[0]) result["particles"][p.id] = { "type_name": p.type, "position": p.pos, "neighbor_ids": neighbor_ids, } result["topologies"][index] = { "type_name": topology.type, "particle_ids": particle_ids, } return result
@staticmethod def _shape_frame_edge_data_from_file(time_index, topology_records): """ After a simulation has finished, get all the edges at the given time index as (particle1 id, particle2 id) topology_records from readdy.Trajectory(h5_file_path).read_observable_topologies() """ result = [] for top in topology_records[time_index]: for e1, e2 in top.edges: if e1 <= e2: ix1 = top.particles[e1] ix2 = top.particles[e2] result.append((ix1, ix2)) return result @staticmethod def _shape_frame_monomer_data_from_file( time_index, topology_records, ids, types, positions, traj ): """ After a simulation has finished, get data for topologies of particles traj from readdy.Trajectory(h5_file_path) topology_records from traj.read_observable_topologies() ids, types, positions from traj.read_observable_particles() """ edges = ReaddyUtil._shape_frame_edge_data_from_file( time_index, topology_records ) result = { "topologies": {}, "particles": {}, } for index, top in enumerate(topology_records[time_index]): result["topologies"][index] = { "type_name": top.type, "particle_ids": top.particles, } for p in range(len(ids[time_index])): p_id = ids[time_index][p] position = positions[time_index][p] neighbor_ids = [] for edge in edges: if p_id == edge[0]: neighbor_ids.append(edge[1]) elif p_id == edge[1]: neighbor_ids.append(edge[0]) result["particles"][ids[time_index][p]] = { "type_name": traj.species_name(types[time_index][p]), "position": np.array([position[0], position[1], position[2]]), "neighbor_ids": neighbor_ids, } return result @staticmethod def _shape_monomer_data_from_file( min_time, max_time, time_inc, times, topology_records, ids, types, positions, traj, ): """ For each time point, get monomer data and times """ print("Shaping data for analysis...") result = [] new_times = [] for t in tqdm(range(len(times))): if t >= min_time and t <= max_time and t % time_inc == 0: result.append( ReaddyUtil._shape_frame_monomer_data_from_file( t, topology_records, ids, types, positions, traj ) ) new_times.append(times[t]) return result, np.array(new_times)
[docs] @staticmethod def monomer_data_and_reactions_from_file( h5_file_path, stride=1, timestep=0.1, reaction_names=None, pickle_file_path=None, save_pickle_file=False, ): """ For data saved in a ReaDDy .h5 file: For each time point, get monomer data as a dictionary: { "topologies" : mapping of topology id to data for each topology: [id: int] : { "type_name" : str, "particle_ids" : List[int] } "particles" : mapping of particle id to data for each particle: [id: int] : { "type_name" : str, "position" : np.ndarray, "neighbor_ids" : List[int] } } Also return the counts of reactions over time, the timestamps for each frame, and the reaction time increment in seconds """ if pickle_file_path is not None: print("Loading pickle file for shaped data") import pickle data = [] with open(pickle_file_path, "rb") as f: while True: try: data.append(pickle.load(f)) except EOFError: break monomer_data, reactions, times, time_inc_s = data[0] return monomer_data, reactions, times, time_inc_s else: trajectory = readdy.Trajectory(h5_file_path) _, topology_records = trajectory.read_observable_topologies() ( times, types, ids, positions, ) = trajectory.read_observable_particles() monomer_data, times = ReaddyUtil._shape_monomer_data_from_file( 0, times.shape[0], stride, times, topology_records, ids, types, positions, trajectory, ) times = timestep / 1e3 * times # index --> microseconds # times = timestep * times # index --> nanoseconds reactions = None time_inc_s = None if reaction_names is not None: recorded_steps = stride * (len(times) - 1) reactions = ReaddyUtil.load_reactions( trajectory, stride, reaction_names, recorded_steps ) time_inc_s = times[-1] * 1e-6 / (len(times) - 1) data = [monomer_data, reactions, times, time_inc_s] if save_pickle_file: import pickle fname = h5_file_path + ".dat" with open(fname, "wb") as f: pickle.dump(data, f) return monomer_data, reactions, times, time_inc_s
[docs] @staticmethod def vector_is_invalid(v): """ check if any of a 3D vector's components are NaN """ return math.isnan(v[0]) or math.isnan(v[1]) or math.isnan(v[2])
[docs] @staticmethod def analyze_frame_get_count_of_topologies_of_type( time_index, topology_type, topology_records, traj ): """ Get the number of topologies of a given type at the given time index """ result = 0 for top in topology_records[time_index]: if traj.topology_type_name(top.type) == topology_type: result += 1 return result
[docs] @staticmethod def analyze_frame_get_neighbor_ids_of_types( particle_id, particle_types, frame_particle_data, exact_match, ): """ Get a list of ids for all the neighbors of particle_id with particle type in the given list of types in the given frame of data """ result = [] for neighbor_id in frame_particle_data["particles"][particle_id][ "neighbor_ids" ]: if neighbor_id in frame_particle_data["particles"]: type_name = frame_particle_data["particles"][neighbor_id]["type_name"] for particle_type in particle_types: if (exact_match and type_name == particle_type) or ( not exact_match and particle_type in type_name ): result.append(neighbor_id) break return result
[docs] @staticmethod def analyze_frame_get_ids_for_types(particle_types, frame_particle_data): """ Get a list of ids for all the particles with particle type in the given list of types in the given frame of data """ result = [] for p_id in frame_particle_data["particles"]: if frame_particle_data["particles"][p_id]["type_name"] in particle_types: result.append(p_id) return result
[docs] @staticmethod def analyze_frame_get_id_for_neighbor_of_types( particle_id, neighbor_types, frame_particle_data, exclude_ids=[], exact_match=True, ): """ Get the id for the first neighbor with one of the neighbor_types in the given frame of data """ for neighbor_id in frame_particle_data["particles"][particle_id][ "neighbor_ids" ]: if neighbor_id not in frame_particle_data["particles"]: neighbor_id = str(neighbor_id) if neighbor_id in exclude_ids: continue current_neighbor_type = frame_particle_data["particles"][neighbor_id][ "type_name" ] for neighbor_type in neighbor_types: if (exact_match and current_neighbor_type == neighbor_type) or ( not exact_match and ( neighbor_type in current_neighbor_type or current_neighbor_type in neighbor_type ) ): return neighbor_id return None
[docs] @staticmethod def analyze_frame_get_chain_of_types( start_particle_id, neighbor_types, frame_particle_data, chain_length=0, last_particle_id=None, result=[], next_neighbor_index=None, exact_match=True, ): """ Starting from the particle with start_particle_id, get ids for a chain of particles with neighbor_types in the given frame of data, avoiding the particle with last_particle_id, if chain_length = 0, return entire chain """ if next_neighbor_index is not None: n_types = [neighbor_types[next_neighbor_index]] else: n_types = neighbor_types n_id = ReaddyUtil.analyze_frame_get_id_for_neighbor_of_types( start_particle_id, n_types, frame_particle_data, [last_particle_id] if last_particle_id is not None else [], exact_match=exact_match, ) if n_id is None: return result result.append(n_id) if chain_length == 1: return result next_neighbor_index = (next_neighbor_index + 1) % len(neighbor_types) return ReaddyUtil.analyze_frame_get_chain_of_types( n_id, neighbor_types, frame_particle_data, chain_length - 1 if chain_length > 0 else 0, start_particle_id, result, next_neighbor_index=next_neighbor_index, exact_match=exact_match, )
[docs] @staticmethod def load_reactions(trajectory, stride, total_reactions_mapping, recorded_steps=1e3): """ Read reaction counts per frame from a ReaDDy trajectory and create a DataFrame with the number of times each ReaDDy reaction and total set of reactions has happened by each time step / stride """ print("Loading reactions...") reaction_times, readdy_reactions = trajectory.read_observable_reaction_counts() flat_readdy_reactions = {} for rxn_type in readdy_reactions: flat_readdy_reactions = dict( flat_readdy_reactions, **readdy_reactions[rxn_type] ) reaction_names = list(np.column_stack(flat_readdy_reactions)[0]) reaction_data = np.column_stack([x for x in flat_readdy_reactions.values()]) interval = int((reaction_data.shape[0] - 1) * stride / float(recorded_steps)) reaction_data = np.sum( reaction_data[:-1].reshape(-1, interval, len(reaction_names)), axis=1 ) reactions_df = pd.DataFrame(reaction_data, columns=reaction_names) for total_rxn_name in total_reactions_mapping: if total_rxn_name in reactions_df: continue reactions_df[total_rxn_name] = 0.0 for rxn_name in total_reactions_mapping[total_rxn_name]: if rxn_name in reactions_df.columns: reactions_df[total_rxn_name] += reactions_df[rxn_name] else: print(f"Couldn't find {rxn_name} in ReaDDy reactions.") return reactions_df
# read in box size
[docs] @staticmethod def get_box_size(input_size): if isinstance(input_size, str): lengths = input_size.split(",") if len(lengths) != 3: print("INCORRECT BOX SIZE. PLEASE CHECK INPUT FILE.") return np.array([float(length) for length in lengths]) else: return np.array([float(input_size)] * 3)