#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from typing import List, Callable, Tuple
import math
from ..trajectory_converter import TrajectoryConverter
from ..data_objects import (
TrajectoryData,
AgentData,
UnitData,
DimensionData,
DisplayData,
)
from ..constants import (
VIZ_TYPE,
DISPLAY_TYPE,
VALUES_PER_3D_POINT,
SUBPOINT_VALUES_PER_ITEM,
)
from ..exceptions import InputDataError
from .medyan_data import MedyanData
###############################################################################
log = logging.getLogger(__name__)
###############################################################################
[docs]
class MedyanConverter(TrajectoryConverter):
def __init__(
self,
input_data: MedyanData,
progress_callback: Callable[[float], None] = None,
callback_interval: float = 10,
):
"""
This object reads simulation trajectory outputs
from MEDYAN (http://medyan.org/)
and plot data and writes them in the JSON format used
by the Simularium viewer
Parameters
----------
input_data : MedyanData
An object containing info for reading
MEDYAN simulation trajectory outputs and plot data
progress_callback : Callable[[float], None] (optional)
Callback function that accepts 1 float argument and returns None
which will be called at a given progress interval, determined by
callback_interval requested, providing the current percent progress
Default: None
callback_interval : float (optional)
If a progress_callback was provided, the period between updates
to be sent to the callback, in seconds
Default: 10
"""
super().__init__(input_data, progress_callback, callback_interval)
self._data = self._read(input_data)
@staticmethod
def _draw_endpoints(line: str, object_type: str, input_data: MedyanData) -> bool:
"""
Parse a line of a MEDYAN snapshot.traj output file
and determine whether to also draw the endpoints as spheres
"""
if object_type == "motor" or object_type == "linker":
type_name = MedyanConverter._get_display_type_name(
line, object_type, input_data
)
if type_name in input_data.agents_with_endpoints:
return True
return False
@staticmethod
def _get_display_type_name(
line: str, object_type: str, input_data: MedyanData
) -> bool:
"""
Parse a line of a MEDYAN snapshot.traj output file
and return the type name to display for this agent type
"""
raw_tid = int(line.split()[2])
if raw_tid not in input_data.display_data[object_type]:
display_name = object_type + str(raw_tid)
input_data.display_data[object_type][raw_tid] = DisplayData(
name=display_name,
display_type=DISPLAY_TYPE.FIBER,
)
return display_name
else:
return input_data.display_data[object_type][raw_tid].name
@staticmethod
def _parse_data_dimensions(
lines: List[str], input_data: MedyanData
) -> DimensionData:
"""
Parse a MEDYAN snapshot.traj output file to get the max numbers
of subpoints per agent and agents per timestep, and number of timesteps
"""
result = DimensionData(0, 0)
agents = 0
at_frame_start = True
for line in lines:
if len(line) < 1:
at_frame_start = True
continue
if at_frame_start:
# start of timestep
cols = line.split()
if agents > result.max_agents:
result.max_agents = agents
agents = int(cols[2]) + int(cols[3]) + int(cols[4])
result.total_steps += 1
at_frame_start = False
elif "FILAMENT" in line or "LINKER" in line or "MOTOR" in line:
# start of object
if "FILAMENT" in line:
object_type = "filament"
elif "LINKER" in line:
object_type = "linker"
else:
object_type = "motor"
if MedyanConverter._draw_endpoints(line, object_type, input_data):
agents += 2
if "FILAMENT" in line:
# start of filament
# filaments N xyz points = 3 * N subpoints
cols = line.split()
subpoints = SUBPOINT_VALUES_PER_ITEM(DISPLAY_TYPE.FIBER) * int(
cols[3]
)
if result.max_subpoints < subpoints:
result.max_subpoints = subpoints
else:
# start of linker or motor
# all linkers and motors have 2 xyz points = 6 subpoints
if result.max_subpoints < 2 * SUBPOINT_VALUES_PER_ITEM(
DISPLAY_TYPE.FIBER
):
result.max_subpoints = 2 * SUBPOINT_VALUES_PER_ITEM(
DISPLAY_TYPE.FIBER
)
if agents > result.max_agents:
result.max_agents = agents
return result
def _get_trajectory_data(
self,
input_data: MedyanData,
) -> Tuple[AgentData, float]:
"""
Parse a MEDYAN snapshot.traj output file to get agents
"""
try:
lines = input_data.snapshot_file.get_contents().split("\n")
dimensions = MedyanConverter._parse_data_dimensions(lines, input_data)
except Exception as e:
raise InputDataError(f"Error reading input medyan data: {e}")
result = AgentData.from_dimensions(dimensions)
time_index = -1
at_frame_start = True
parsing_object = False
uids = {
"filament": {},
"linker": {},
"motor": {},
}
last_uid = 0
tids = {
"filament": {},
"linker": {},
"motor": {},
}
last_tid = 0
object_type = ""
draw_endpoints = False
line_count = 0
for line in lines:
if len(line) < 1:
at_frame_start = True
continue
cols = line.split()
if at_frame_start:
# start of timestep
time_index += 1
agent_index = 0
result.times[time_index] = float(cols[1])
result.n_agents[time_index] = int(cols[2]) + int(cols[3]) + int(cols[4])
at_frame_start = False
if "FILAMENT" in line or "LINKER" in line or "MOTOR" in line:
# start of object
result.viz_types[time_index][agent_index] = VIZ_TYPE.FIBER
if "FILAMENT" in line:
object_type = "filament"
elif "LINKER" in line:
object_type = "linker"
else:
object_type = "motor"
draw_endpoints = MedyanConverter._draw_endpoints(
line, object_type, input_data
)
# unique instance ID
raw_uid = int(cols[1])
if raw_uid not in uids[object_type]:
uids[object_type][raw_uid] = last_uid
last_uid += 1 if not draw_endpoints else 3
result.unique_ids[time_index][agent_index] = uids[object_type][raw_uid]
# type ID
raw_tid = int(cols[2])
if raw_tid not in tids[object_type]:
tids[object_type][raw_tid] = last_tid
last_tid += 1
# type name
result.types[time_index].append(
MedyanConverter._get_display_type_name(
line, object_type, input_data
)
)
# radius
radius = (
input_data.display_data[object_type][raw_tid].radius
if raw_tid in input_data.display_data[object_type]
and input_data.display_data[object_type][raw_tid].radius is not None
else 1.0
)
result.radii[time_index][agent_index] = radius
if object_type == "filament":
result.n_subpoints[time_index][agent_index] = int(
cols[3]
) * SUBPOINT_VALUES_PER_ITEM(DISPLAY_TYPE.FIBER)
else:
result.n_subpoints[time_index][
agent_index
] = 2 * SUBPOINT_VALUES_PER_ITEM(DISPLAY_TYPE.FIBER)
# draw endpoints?
if draw_endpoints:
for i in range(2):
result.viz_types[time_index][
agent_index + i + 1
] = VIZ_TYPE.DEFAULT
result.unique_ids[time_index][agent_index + i + 1] = (
uids[object_type][raw_uid] + i + 1
)
end_type = result.types[time_index][agent_index] + " End"
result.types[time_index].append(end_type)
if end_type not in result.display_data:
result.display_data[end_type] = DisplayData(
name=end_type,
display_type=DISPLAY_TYPE.SPHERE,
)
result.radii[time_index][agent_index + i + 1] = 2 * radius
parsing_object = True
elif parsing_object:
# object coordinates
for i in range(len(cols)):
result.subpoints[time_index][agent_index][i] = float(cols[i])
if draw_endpoints:
endpoint_index = math.floor(i / float(VALUES_PER_3D_POINT))
dim_index = i % VALUES_PER_3D_POINT
result.positions[time_index][agent_index + endpoint_index + 1][
dim_index
] = float(cols[i])
parsing_object = False
agent_index += 1
if draw_endpoints:
agent_index += 2
result.n_agents[time_index] += 2
line_count += 1
self.check_report_progress(line_count / len(lines))
result.n_timesteps = time_index + 1
if input_data.center:
result, scale_factor = TrajectoryConverter.center_and_scale_agent_data(
result, input_data.meta_data.scale_factor
)
else:
result, scale_factor = TrajectoryConverter.scale_agent_data(
result, input_data.meta_data.scale_factor
)
return (TrajectoryConverter.center_fiber_positions(result), scale_factor)
def _read(self, input_data: MedyanData) -> TrajectoryData:
"""
Return an object containing the data shaped for Simularium format
"""
print("Reading MEDYAN Data -------------")
agent_data, scale_factor = self._get_trajectory_data(input_data)
# get display data (geometry and color)
for object_type in input_data.display_data:
for tid in input_data.display_data[object_type]:
display_data = input_data.display_data[object_type][tid]
if (
display_data.display_type != DISPLAY_TYPE.NONE
and display_data.display_type != DISPLAY_TYPE.FIBER
):
display_data.display_type = DISPLAY_TYPE.FIBER
print(
f"{display_data.name} display type of "
f"{display_data.display_type.value} was changed to FIBER"
)
agent_data.display_data[display_data.name] = display_data
input_data.meta_data.scale_factor = scale_factor
input_data.meta_data._set_box_size()
return TrajectoryData(
meta_data=input_data.meta_data,
agent_data=agent_data,
time_units=UnitData("s"),
spatial_units=UnitData("nm", 1.0 / scale_factor),
plots=input_data.plots,
)