Skip to content

Using the Python API

This page documents the Python API. The main API is in the apitofsim.api module, most of which is re-exported from apitofsim.

Data classes

These classes input data to the simulation, as well as output data from intermediate steps in the simulation.

apitofsim.api.ClusterLike

Bases: ABC

A base class for cluster-like things.

get_frequencies abstractmethod

get_frequencies() -> Optional[ndarray]

This method returns an array of vibrational temperatures in Kelvin. In the case of an atom-like product this will return None.

Source code in python/apitofsim/api.py
@abstractmethod
def get_frequencies(self) -> Optional[numpy.ndarray]:
    """
    This method returns an array of vibrational temperatures in Kelvin.
    In the case of an atom-like product this will return None.
    """
    ...

apitofsim.ClusterData dataclass

ClusterData(
    mass: Quantity[float],
    electronic_energy: Quantity[float],
    rotations: ndarray,
    frequencies: ndarray,
)

Bases: ClusterLike

The basic physical data for a cluster.

electronic_energy instance-attribute

electronic_energy: Quantity[float]

The cluster's electronic energy

frequencies instance-attribute

frequencies: ndarray

From quantum chemistry calcuations, the vibrational temperatures in Kelvin for the cluster.

mass instance-attribute

mass: Quantity[float]

The cluster's mass

rotations instance-attribute

rotations: ndarray

From quantum chemistry calcuations, the rotational temperatures in Kelvin for the cluster. This is a 3 element array.

get_frequencies

get_frequencies() -> Optional[ndarray]
Source code in python/apitofsim/api.py
def get_frequencies(self) -> Optional[numpy.ndarray]:
    if self.is_atom_like_product():
        return None
    else:
        return numpy.asfortranarray(self.frequencies, dtype=numpy.float64)

into_cpp

into_cpp() -> ClusterData
Source code in python/apitofsim/api.py
def into_cpp(self) -> _ClusterData:
    frequencies = self.get_frequencies()
    if frequencies is None:
        frequencies = numpy.empty(0, dtype=numpy.float64, order="F")
    return _ClusterData(
        int(self.mass.to("amu").magnitude + 0.5),
        self.electronic_energy.to("hartree").magnitude,
        self.rotations,
        frequencies,
    )

is_atom_like_product

is_atom_like_product() -> bool
Source code in python/apitofsim/api.py
def is_atom_like_product(self) -> bool:
    return self.frequencies is None

apitofsim.ProductsCluster dataclass

ProductsCluster(
    cluster1: ClusterData, cluster2: ClusterData
)

Bases: ClusterLike

A combination of two clusters representing the products of a fragmentation pathway. This is used to compute the density of states and derived quantities for the pathway products at the point of collision.

cluster1 instance-attribute

cluster1: ClusterData

One cluster

cluster2 instance-attribute

cluster2: ClusterData

The other cluster

get_frequencies

get_frequencies() -> Optional[ndarray]
Source code in python/apitofsim/api.py
def get_frequencies(self) -> Optional[numpy.ndarray]:
    frequencies1 = self.cluster1.get_frequencies()
    frequencies2 = self.cluster2.get_frequencies()
    if frequencies1 is None and frequencies2 is None:
        raise ValueError(
            "Cannot have a ProductCluster with both clusters being atom-like products"
        )
    if frequencies2 is None:
        frequencies = frequencies1
    elif frequencies1 is None:
        frequencies = frequencies2
    else:
        frequencies = numpy.concatenate((frequencies1, frequencies2))
    return numpy.asfortranarray(frequencies, dtype=numpy.float64)

apitofsim.Gas dataclass

Gas(
    radius: Quantity[float],
    mass: Quantity[float],
    adiabatic_index: float,
)

The physical quantities related to gas in the mass spectrometer

adiabatic_index instance-attribute

adiabatic_index: float

The adiabatic index of the gas particle

mass instance-attribute

mass: Quantity[float]

The mass of the gas particle

radius instance-attribute

radius: Quantity[float]

The radius of the gas particle

into_cpp

into_cpp() -> Gas
Source code in python/apitofsim/api.py
def into_cpp(self) -> _Gas:
    return _Gas(
        self.radius.to("m").magnitude,
        self.mass.to("kg").magnitude,
        self.adiabatic_index,
    )

apitofsim.Histogram dataclass

Histogram(x: Quantity[ndarray], y: ndarray)

A container for histogrammed data used to store precomputed density of states and rate constants.

You should not typically need to construc this yourself.

x instance-attribute

x: Quantity[ndarray]

y instance-attribute

y: ndarray

from_cpp classmethod

from_cpp(histogram: Histogram)
Source code in python/apitofsim/api.py
@classmethod
def from_cpp(cls, histogram: _Histogram):
    return cls(Q_(histogram.x, "kelvin"), histogram.y)

from_mesh classmethod

from_mesh(bin_width, x_max, y)
Source code in python/apitofsim/api.py
@classmethod
def from_mesh(cls, bin_width, x_max, y):
    bin_width_mag = bin_width.to("kelvin").magnitude
    m_max = int(x_max.to("kelvin").magnitude / bin_width_mag)
    return cls.from_cpp(_Histogram(bin_width_mag, m_max, y))

into_cpp

into_cpp() -> Histogram
Source code in python/apitofsim/api.py
def into_cpp(self) -> _Histogram:
    return _Histogram(self.x.to("kelvin").magnitude, self.y)

apitofsim.Quadrupole dataclass

Quadrupole(
    dc_field: Quantity[float],
    ac_field: Quantity[float],
    radiofrequency: Quantity[float],
    r_quadrupole: Quantity[float],
)

Configuration values related to the quadrupole mass filter in the mass spectrometer, if present

ac_field instance-attribute

ac_field: Quantity[float]

The AC voltage applied to the quadrupole rods

dc_field instance-attribute

dc_field: Quantity[float]

The DC voltage applied to the quadrupole rods

r_quadrupole instance-attribute

r_quadrupole: Quantity[float]

The distance from the center of the quadrupole to the rods

radiofrequency instance-attribute

radiofrequency: Quantity[float]

The radiofrequency of the AC voltage applied to the quadrupole rods

into_cpp

into_cpp() -> Quadrupole
Source code in python/apitofsim/api.py
def into_cpp(self) -> _Quadrupole:
    return _Quadrupole(
        self.dc_field.to("volts").magnitude,
        self.ac_field.to("volts").magnitude,
        self.radiofrequency.to("hertz").magnitude,
        self.r_quadrupole.to("m").magnitude,
    )

apitofsim.MassSpecInputFragmentationPathway

MassSpecInputFragmentationPathway(*args, **kwargs)

Construct a MassSpecInputFragmentationPathway

Source code in python/apitofsim/api.py
def MassSpecInputFragmentationPathway(*args, **kwargs):
    """
    Construct a MassSpecInputFragmentationPathway
    """
    process_arg = QuantityProcessor(kwargs.get("quantities_strict", True))

    def proc_bonding_energy(bonding_energy):
        if bonding_energy is not None:
            return process_arg("bonding_energy", bonding_energy, "kelvin")

    get = ArgGetter(args, kwargs)
    if len(args) >= 1 and isinstance(args[0], ClusterLike) or "cluster_0" in kwargs:
        return _MassSpecInputFragmentationPathway(
            cluster_0=get("cluster_0", 0).into_cpp(),
            cluster_1=get("cluster_1", 1).into_cpp(),
            cluster_2=get("cluster_2", 2).into_cpp(),
            rate_const=get("rate_const", 3).into_cpp(),
            bonding_energy=proc_bonding_energy(get("bonding_energy", 4, None)),
        )
    else:
        return _MassSpecInputFragmentationPathway(
            rate_const=get("rate_const", 0).into_cpp(),
            bonding_energy=proc_bonding_energy(get("bonding_energy", 1, None)),
        )

apitofsim.MassSpecSubstanceInput

MassSpecSubstanceInput(*args, **kwargs)

Construct a MassSpecSubstanceInput

Source code in python/apitofsim/api.py
def MassSpecSubstanceInput(*args, **kwargs):
    """
    Construct a MassSpecSubstanceInput
    """
    get = ArgGetter(args, kwargs)
    if len(args) >= 1 and isinstance(args[0], ClusterLike) or "cluster_0" in kwargs:
        if len(args) >= 2 and isinstance(args[1], ClusterLike) or "cluster_1" in kwargs:
            return _MassSpecSubstanceInput(
                cluster_0=get("cluster_0", 0).into_cpp(),
                cluster_1=get("cluster_1", 1).into_cpp(),
                cluster_2=get("cluster_2", 2).into_cpp(),
                gas=get("gas", 3).into_cpp(),
                density_cluster=get("density_cluster", 4).into_cpp(),
                rate_const=get("rate_const", 5).into_cpp(),
                fragmentation_energy=get("fragmentation_energy", 6, None),
                cluster_charge_sign=get(
                    "cluster_charge_sign", 7, defaults.cluster_charge_sign
                ),
            )
        else:
            return _MassSpecSubstanceInput(
                cluster_0=get("cluster_0", 0).into_cpp(),
                pathways=get("pathways", 1),
                gas=get("gas", 2).into_cpp(),
                density_cluster=get("density_cluster", 3).into_cpp(),
                cluster_charge_sign=get(
                    "cluster_charge_sign", 4, defaults.cluster_charge_sign
                ),
            )
    else:
        return _MassSpecSubstanceInput(
            cluster_charge_sign=get("cluster_charge_sign", 0),
            m_ion=get("m_ion", 1),
            R_cluster=get("R_cluster", 2),
            density_cluster=get("density_cluster", 3).into_cpp(),
            pathway=get("pathway", 4),
            gas=get("gas", 5).into_cpp(),
        )

apitofsim.MassSpectrometer dataclass

MassSpectrometer(
    skimmer: ndarray,
    lengths: Quantity[ndarray],
    voltages: Quantity[ndarray],
    T: Quantity[float],
    pressures: Quantity[ndarray],
    *,
    mesh_skimmer: Quantity[float] | None = None,
    quadrupole: Quadrupole | None = None,
    radius_pinhole: Quantity[float] | None = Q_(1, "mm"),
)

The configuration values needed to simulate the mass spectrometer as well as the precomputed, histogrammed skimmer values.

T instance-attribute

T: Quantity[float]

The temperature in the mass spectrometer.

_ instance-attribute

_: KW_ONLY

lengths instance-attribute

lengths: Quantity[ndarray]

An array of the lengths of the different sections of the mass spectrometer

mesh_skimmer class-attribute instance-attribute

mesh_skimmer: Quantity[float] | None = None

The histogram mesh size used for the precomputed, histogrammed skimmer quantities. If not given this will be computed from the skimmer array if it has 6 columns, otherwise it must be supplied.

pressures instance-attribute

pressures: Quantity[ndarray]

The pressures in the two chambers of the mass spectrometer

quadrupole class-attribute instance-attribute

quadrupole: Quadrupole | None = None

The quadrupole configuration, if a quadrupole is present in the mass spectrometer.

radius_pinhole class-attribute instance-attribute

radius_pinhole: Quantity[float] | None = Q_(1, 'mm')

The radius of the pinhole in the skimmer, if present.

skimmer instance-attribute

skimmer: ndarray

A 2D array of values along the skimmer, with either XXXXCHECK 3 columns (r, vel, T) or 6 columns (x, r, vel, T, P, rho)

voltages instance-attribute

voltages: Quantity[ndarray]

The voltages applied at different points on the mass spectrometer

__post_init__

__post_init__()
Source code in python/apitofsim/api.py
def __post_init__(self):
    if self.skimmer.shape[1] == 3:
        if self.mesh_skimmer is None:
            raise ValueError(
                "mesh_skimmer must be supplied when 3 column array is given for skimmer"
            )
    elif self.skimmer.shape[1] == 6:
        if self.mesh_skimmer is not None:
            raise ValueError(
                "mesh_skimmer should not be supplied when 6 column array is given for skimmer"
            )
        self.mesh_skimmer = Q_(float(self.skimmer[1, 0] - self.skimmer[0, 0]), "m")
        self.skimmer = self.skimmer[:, 1:4]
    else:
        raise ValueError("skimmer must have 3 or 6 columns")

into_cpp

into_cpp()
Source code in python/apitofsim/api.py
def into_cpp(self):
    assert self.mesh_skimmer is not None
    return _MassSpectrometer(
        numpy.asfortranarray(self.skimmer),
        self.mesh_skimmer.to("m").magnitude,
        self.lengths.to("m").magnitude,
        self.voltages.to("volts").magnitude,
        self.T.to("K").magnitude,
        self.pressures.to("pascals").magnitude,
        self.quadrupole and self.quadrupole.into_cpp(),
        self.radius_pinhole and self.radius_pinhole.to("m").magnitude,
    )

Workflow interface

The apitofsim.workflow module, contains functions to keep cluster data in a database, convenient for running scaled-up simulations.

Typically an ExperimentDatabase is created and its tables created with db.create_tables(...) and then clusters are ingested e.g. with ingest_tree(...).

After this, multiple simulation can be run using the ExperimentRunner class.

apitofsim.workflow.ClusterDatabase

ClusterDatabase(
    filename, *, readonly=False, ase_filename=None
)
Source code in python/apitofsim/workflow/db.py
def __init__(self, filename, *, readonly=False, ase_filename=None):
    self.closed = False
    self.cleanup = None
    if readonly:
        self.db, self.cleanup = duckdb_connect_roview_cow(
            filename, fallback="connect"
        )
    else:
        self.db = duckdb.connect(filename)
    if ase_filename is not None:
        # TODO: These ClusterDatabase, etc. objects should probably be context managers too
        self.ase_db = connect_ase_db(ase_filename, type="db").__enter__()
    else:
        self.ase_db = None
    self._setup_db()

TABLES class-attribute instance-attribute

TABLES = [pathway]

ase_db instance-attribute

ase_db = __enter__()

cleanup instance-attribute

cleanup = None

closed instance-attribute

closed = False

db instance-attribute

db = connect(filename)

__del__

__del__()
Source code in python/apitofsim/workflow/db.py
def __del__(self):
    self.close()

_cluster_obj_from_tuple staticmethod

_cluster_obj_from_tuple(cluster)
Source code in python/apitofsim/workflow/db.py
@staticmethod
def _cluster_obj_from_tuple(cluster):
    return ClusterData(
        Q_(cluster.atomic_mass, "amu"),
        Q_(cluster.electronic_energy, "hartree"),
        cluster.rotational_temperatures,
        cluster.vibrational_temperatures,
    )

_setup_db

_setup_db()
Source code in python/apitofsim/workflow/db.py
def _setup_db(self):
    import os

    self.db.execute("SET preserve_insertion_order=false")
    if "DUCKDB_MEMORY_LIMIT" in os.environ:
        memory_limit = os.environ["DUCKDB_MEMORY_LIMIT"]
        self.db.execute(f"set memory_limit='{memory_limit}';")

close

close()
Source code in python/apitofsim/workflow/db.py
def close(self):
    if self.closed:
        return
    self.closed = True
    if self.cleanup is not None:
        self.cleanup()
    self.db.close()
    if self.ase_db is not None:
        self.ase_db.__exit__(None, None, None)

clusters_df

clusters_df(*args, **kwargs)
Source code in python/apitofsim/workflow/db.py
def clusters_df(self, *args, **kwargs):
    return self.clusters_query(*args, **kwargs).fetchdf().replace({pandas.NA: None})

clusters_dicts_indexed

clusters_dicts_indexed(*args, **kwargs)
Source code in python/apitofsim/workflow/db.py
def clusters_dicts_indexed(self, *args, **kwargs):
    ret = {}
    for d in self.iter_clusters_dicts(*args, **kwargs):
        ret[d["id"]] = d
    return ret

clusters_objects_indexed

clusters_objects_indexed(
    *args, include_name_lookup=False, **kwargs
)
Source code in python/apitofsim/workflow/db.py
def clusters_objects_indexed(self, *args, include_name_lookup=False, **kwargs):
    name_lookup = {}
    ret = {}
    for cluster in self.clusters_df(*args, **kwargs).itertuples():
        ret[cluster.id] = self._cluster_obj_from_tuple(cluster)
        if include_name_lookup:
            name_lookup[cluster.id] = cluster.common_name
    if include_name_lookup:
        return ret, name_lookup
    else:
        return ret

clusters_query

clusters_query(
    parent=None,
    pathways=None,
    parents_only=False,
    children_only=False,
)
Source code in python/apitofsim/workflow/db.py
def clusters_query(
    self, parent=None, pathways=None, parents_only=False, children_only=False
):
    if parents_only and children_only:
        raise ValueError("Cannot set both parents_only and children_only to True")
    query = self.db.table("cluster")
    if (parent is None and pathways is None) and not (
        parents_only or children_only
    ):
        # Shortcut for efficiency
        return query
    if parents_only:
        relevant_fragment = "cluster_id"
    elif children_only:
        relevant_fragment = "product1_id, product2_id"
    else:
        relevant_fragment = "cluster_id, product1_id, product2_id"
    pathways_query = self.pathways_query(parent, pathways)
    # relevant_cluster_ids = self.db.table("pathway").select(duckdb.SQLExpression(f"unnest([{relevant_fragment}])").alias("relevant_cluster_id"))
    # if parent is not None:
    # relevant_cluster_ids = relevant_cluster_ids.filter(duckdb.ColumnExpression('cluster_id ') == duckdb.ConstantExpression(parent))
    # relevant_cluster_ids = relevant_cluster_ids.distinct().fetchdf()
    relevant_cluster_ids = (
        pathways_query.select(
            duckdb.SQLExpression(f"unnest([{relevant_fragment}])").alias(
                "relevant_cluster_id"
            )
        )
        .distinct()
        .fetchdf()
    )
    return query.join(
        self.db.from_df(relevant_cluster_ids).set_alias("relevant"),
        condition="relevant.relevant_cluster_id = cluster.id",
    )

create_tables

create_tables()
Source code in python/apitofsim/workflow/db.py
def create_tables(self):
    sql = "\n".join(self.TABLES)
    self.db.execute(sql)
    if self.ase_db is not None:
        self.db.execute(
            """
            alter table cluster add column ase_mol_id integer default null;
            """
        )

get_all_lookups

get_all_lookups(parent=None, pathways=None)
Source code in python/apitofsim/workflow/db.py
def get_all_lookups(self, parent=None, pathways=None):
    if isinstance(parent, str):
        parent = self.db.db.execute(
            """
            select id
            from cluster
            where common_name = ?
            """,
            (parent,),
        ).fetchone()
        if parent is None:
            raise ValueError(f"No cluster found with name {parent}")
        parent = parent[0]
    if pathways is not None:
        pathways = list(pathways)
        if (
            len(pathways) > 0
            and isinstance(pathways[0], tuple)
            and isinstance(pathways[0][0], str)
        ):
            import pyarrow as pa

            wanted_tbl = pa.table(
                list(zip(*pathways)), names=["pathway", "product1", "product2"]
            )
            pathway_common_names = self.db.db.sql(
                """
                select
                    p.id as pathway_id,
                    c.common_name as cluster_common_name,
                    p1.common_name as product1_common_name,
                    p2.common_name as product2_common_name,

                from pathway p
                inner join cluster c on c.id = p.cluster_id
                inner join cluster p1 on p1.id = p.product1_id
                inner join cluster p2 on p2.id = p.product2_id;
                """
            )
            pathways = (
                pathway_common_names.join(
                    self.db.db.from_arrow(wanted_tbl).set_alias("wanted"),
                    condition=(
                        "wanted.pathway = cluster_common_name "
                        "and ((wanted.product1 = product1_common_name and wanted.product2 = product2_common_name) "
                        "or (wanted.product1 = product2_common_name and wanted.product2 = product1_common_name))"
                    ),
                )
                .select("pathway_id")
                .to_arrow_table()["pathway_id"]
            )
    cluster_indexed, name_lookup = self.clusters_objects_indexed(
        include_name_lookup=True, parent=parent, pathways=pathways
    )
    pathway_lookup = {}
    for pathway_id, cluster_id, product1_id, product2_id in self.pathways_ids(
        parent=parent, pathways=pathways, sort=True
    ):
        pathway_lookup[pathway_id] = (cluster_id, product1_id, product2_id)

    return cluster_indexed, name_lookup, pathway_lookup

insert_ase

insert_ase(cluster_id, ase_mol)
Source code in python/apitofsim/workflow/db.py
def insert_ase(self, cluster_id, ase_mol):
    if self.ase_db is None:
        raise ValueError("ASE database not initialized")
    ase_mol_id = self.ase_db.write(ase_mol)
    self.db.execute(
        "update cluster set ase_mol_id = ? where id = ?", (ase_mol_id, cluster_id)
    )

insert_cluster

insert_cluster(
    name,
    atomic_mass,
    charge,
    electronic_energy,
    rotational_temperatures,
    vibrational_temperatures,
    import_info=None,
    *,
    ase_id=None,
    allow_duplicates=False,
)
Source code in python/apitofsim/workflow/db.py
def insert_cluster(
    self,
    name,
    atomic_mass,
    charge,
    electronic_energy,
    rotational_temperatures,
    vibrational_temperatures,
    import_info=None,
    *,
    ase_id=None,
    allow_duplicates=False,
):
    if ase_id is not None and self.ase_db is None:
        raise ValueError("ASE database not initialized, cannot insert ASE molecule")
    value_names = [
        "atomic_mass",
        "charge",
        "electronic_energy",
        "rotational_temperatures",
        "vibrational_temperatures",
    ]
    values = [
        atomic_mass,
        charge,
        electronic_energy,
        rotational_temperatures,
        vibrational_temperatures,
    ]
    existing = self.db.execute(
        """
        select
            id,
            atomic_mass,
            charge,
            electronic_energy,
            rotational_temperatures,
            vibrational_temperatures
        from cluster
        where common_name = ?
        """,
        (name,),
    ).fetchone()
    if existing is not None:
        for value_name, existing_value, new_value in zip(
            value_names, existing[1:], values
        ):
            if not numpy.array_equal(existing_value, new_value):
                if not allow_duplicates:
                    raise ValueError(
                        f"Cluster with name '{name}' already exists with different {value_name}: existing={existing_value}, new={new_value}"
                    )
                if "__" in name:
                    barename, num = name.rsplit("__", 1)
                    try:
                        num = int(num)
                    except ValueError:
                        barename = name
                        num = 0
                else:
                    barename = name
                    num = 0
                return self.insert_cluster(
                    f"{barename}__{num + 1}",
                    *values,
                    import_info=import_info,
                    allow_duplicates=True,
                )
        return False, existing[0]

    id = self.db.execute(
        f"insert into cluster values (default, ?, ?, ?, ?, ?, ?, ?, {'?' if self.ase_db is not None else ''}) returning id",
        (
            name,
            atomic_mass,
            charge,
            electronic_energy,
            rotational_temperatures,
            vibrational_temperatures,
            import_info,
            *((ase_id,) if self.ase_db is not None else ()),
        ),
    ).fetchone()
    assert id is not None
    return True, id[0]

insert_pathway

insert_pathway(parent_id, product1_id, product2_id)
Source code in python/apitofsim/workflow/db.py
def insert_pathway(self, parent_id, product1_id, product2_id):
    self.db.execute(
        "insert into pathway values (default, ?, ?, ?)",
        (parent_id, product1_id, product2_id),
    )

iter_clusters_dicts

iter_clusters_dicts(*args, **kwargs)
Source code in python/apitofsim/workflow/db.py
def iter_clusters_dicts(self, *args, **kwargs):
    for cluster in self.clusters_df(*args, **kwargs).itertuples():
        yield cluster._asdict()

iter_clusters_objects

iter_clusters_objects(*args, **kwargs)
Source code in python/apitofsim/workflow/db.py
def iter_clusters_objects(self, *args, **kwargs):
    for cluster in self.clusters_df(*args, **kwargs).itertuples():
        obj = self._cluster_obj_from_tuple(cluster)
        yield obj

pathways_ids

pathways_ids(parent=None, **kwargs)
Source code in python/apitofsim/workflow/db.py
def pathways_ids(self, parent=None, **kwargs):
    query = self.pathways_query(parent, **kwargs)
    for pathway in query.fetchdf().itertuples():
        yield (
            pathway.id,
            pathway.cluster_id,
            pathway.product1_id,
            pathway.product2_id,
        )

pathways_objs

pathways_objs(*args, indexed=None, **kwargs)
Source code in python/apitofsim/workflow/db.py
def pathways_objs(self, *args, indexed=None, **kwargs):
    if indexed is None:
        indexed = self.clusters_objects_indexed()
    for pathway_id, cluster_id, product1_id, product2_id in self.pathways_ids(
        *args, **kwargs
    ):
        yield (
            pathway_id,
            indexed[cluster_id],
            indexed[product1_id],
            indexed[product2_id],
        )

pathways_query

pathways_query(parent=None, pathways=None, sort=False)
Source code in python/apitofsim/workflow/db.py
def pathways_query(self, parent=None, pathways=None, sort=False):
    if parent is not None and pathways is not None:
        raise ValueError("Cannot specify both parent and pathways")
    pathway_rel = self.db.table("pathway")
    if pathways is not None:
        import pyarrow as pa

        wanted_tbl = pa.table([pathways], names=["pathway_id"])
        pathway_rel = pathway_rel.join(
            self.db.from_arrow(wanted_tbl).set_alias("wanted"),
            condition="wanted.pathway_id = pathway.id",
        )
    if parent is not None:
        pathway_rel = pathway_rel.filter(
            duckdb.ColumnExpression("cluster_id")
            == duckdb.ConstantExpression(parent)
        )
    if sort:
        pathway_rel = pathway_rel.sort(
            "cluster_id",  # pyright: ignore[reportArgumentType]
            "product1_id",  # pyright: ignore[reportArgumentType]
            "product2_id",  # pyright: ignore[reportArgumentType]
        )
    return pathway_rel

apitofsim.workflow.SuperClusterDatabase

SuperClusterDatabase(filename, **kwargs)

Bases: ClusterDatabase

Source code in python/apitofsim/workflow/db.py
def __init__(self, filename, **kwargs):
    super().__init__(filename, **kwargs)

TABLES class-attribute instance-attribute

TABLES = [pathway, histograms, pathway_report]

refresh_views

refresh_views()
Source code in python/apitofsim/workflow/db.py
def refresh_views(self):
    self.db.execute(sql_files.pathway_report)
    self.db.execute(sql_files.experiment_report)

apitofsim.workflow.ExperimentDatabase

ExperimentDatabase(filename, **kwargs)

Bases: SuperClusterDatabase

Source code in python/apitofsim/workflow/db.py
def __init__(self, filename, **kwargs):
    super().__init__(filename, **kwargs)

TABLES class-attribute instance-attribute

TABLES = [
    pathway,
    histograms,
    experiment,
    pathway_report,
    experiment_report,
]

forget

forget(runs=False, configs=False, derived=False, all=False)
Source code in python/apitofsim/workflow/db.py
def forget(self, runs=False, configs=False, derived=False, all=False):
    if all:
        runs = True
        configs = True
        derived = True

    if configs:
        self.db.execute("truncate experiment_config")

    if runs or configs:
        for tbl in [
            "experiment_run",
            "single_pathway_experiment_result",
            "multi_pathway_experiment_result",
            "pathway_fragmentation",
            "experiment_failure",
        ]:
            self.db.execute(f"truncate {tbl}")

    if derived:
        for tbl in ["cluster_dos", "products_dos", "k_rate"]:
            self.db.execute(f"truncate {tbl}")

insert_config

insert_config(name, config)
Source code in python/apitofsim/workflow/db.py
def insert_config(self, name, config):
    from apitofsim.config import dump_to_raw

    if isinstance(config, dict):
        config = dump_to_raw(config).decode("utf-8")
    id = self.db.execute(
        "insert into experiment_config values (default, ?, ?::json) returning id",
        (name, config),
    ).fetchone()
    assert id is not None
    return id[0]

insert_run

insert_run(config_id=None, pathway_at_a_time=False)
Source code in python/apitofsim/workflow/db.py
def insert_run(self, config_id=None, pathway_at_a_time=False):
    id = self.db.execute(
        "insert into experiment_run values (default, ?, ?, current_timestamp) returning id",
        (config_id, pathway_at_a_time),
    ).fetchone()
    assert id is not None
    return id[0]

is_experiment_db

is_experiment_db()
Source code in python/apitofsim/workflow/db.py
def is_experiment_db(self):
    try:
        self.db.table("experiment_config")
    except duckdb.CatalogException:
        return False
    else:
        return True

iter_configs

iter_configs(name=None)
Source code in python/apitofsim/workflow/db.py
def iter_configs(self, name=None):
    import orjson

    from apitofsim.config import import_raw_config

    query = self.db.table("experiment_config")
    if name is not None:
        if isinstance(name, (tuple, list)):
            query = query.filter(
                duckdb.ColumnExpression("name").isin(
                    *(duckdb.ConstantExpression(n) for n in name)
                )
            )
        else:
            query = query.filter(
                duckdb.ColumnExpression("name") == duckdb.ConstantExpression(name)
            )
    for id, name, config in query.fetchall():
        yield ConfigRow(id, name, import_raw_config(orjson.loads(config)))

record_failure

record_failure(
    run_id,
    exc_name,
    msg,
    overflow_requested=None,
    pathway_id=None,
    cluster_id=None,
)
Source code in python/apitofsim/workflow/db.py
def record_failure(
    self,
    run_id,
    exc_name,
    msg,
    overflow_requested=None,
    pathway_id=None,
    cluster_id=None,
):
    id = self.db.execute(
        "insert into experiment_failure values (default, ?, ?, ?, ?, ?, ?) returning id",
        (run_id, pathway_id, cluster_id, exc_name, msg, overflow_requested),
    ).fetchone()
    assert id is not None
    return id[0]

record_result

record_result(
    run_id,
    counters,
    timings,
    pathway_id=None,
    cluster_id=None,
    pathway_ids=None,
)
Source code in python/apitofsim/workflow/db.py
def record_result(
    self,
    run_id,
    counters,
    timings,
    pathway_id=None,
    cluster_id=None,
    pathway_ids=None,
):
    if pathway_id is None and (cluster_id is None or pathway_ids is None):
        raise ValueError(
            "Either pathway_id or cluster_id and pathway_ids must be provided"
        )
    if pathway_id is not None:
        id = self.db.execute(
            "insert into single_pathway_experiment_result values (default, ?, ?, ?, ?, ?, ?, ?, ?, ?) returning id",
            (
                run_id,
                pathway_id,
                int(timings.loop / timedelta(microseconds=1)),
                int(timings.total / timedelta(microseconds=1)),
                int(counters.nwarnings),
                int(counters.n_fragmented_total[0]),
                int(counters.n_escaped_total),
                int(counters.ncoll_total),
                int(counters.counter_collision_rejections),
            ),
        ).fetchone()
    else:
        id = self.db.execute(
            "insert into multi_pathway_experiment_result values (default, ?, ?, ?, ?, ?, ?, ?, ?) returning id",
            (
                run_id,
                cluster_id,
                int(timings.loop / timedelta(microseconds=1)),
                int(timings.total / timedelta(microseconds=1)),
                int(counters.nwarnings),
                int(counters.n_escaped_total),
                int(counters.ncoll_total),
                int(counters.counter_collision_rejections),
            ),
        ).fetchone()
        assert pathway_ids is not None
        assert id is not None
        for pathway_id, fragmented in zip(
            pathway_ids, counters.n_fragmented_total, strict=True
        ):
            self.db.execute(
                "insert into pathway_fragmentation values (default, ?, ?, ?)",
                (id[0], pathway_id, int(fragmented)),
            )
    assert id is not None
    return id[0]

report_df

report_df(tbl_name)
Source code in python/apitofsim/workflow/db.py
def report_df(self, tbl_name):
    return self.db.table(tbl_name).fetchdf()

apitofsim.workflow.ingest_tree

ingest_tree(
    db: ClusterDatabase,
    pathways,
    path_base,
    descriptor=None,
    ingest_ase=None,
)

Ingest a tree of fragmentation pathways into the database.

  • pathways can be a list or single element, and matches the configuration

  • path_base is used to resolve relative paths in the config

  • descriptor is used in case the config is parsed from TOML, to provide line numbers for errors, and so not typically used outside apitofsim itself.

  • ingest_ase is a boolean that controls whether to ingest ASE information. By default, it will be be true iff you pass a ClusterDatabase.

Source code in python/apitofsim/workflow/ingest.py
def ingest_tree(
    db: ClusterDatabase, pathways, path_base, descriptor=None, ingest_ase=None
):
    """
    Ingest a tree of fragmentation pathways into the database.

     * `pathways` can be a list or single element, and matches the configuration

     * `path_base` is used to resolve relative paths in the config

     * `descriptor` is used in case the config is parsed from TOML,
       to provide line numbers for errors,
       and so not typically used outside apitofsim itself.

     * `ingest_ase` is a boolean that controls whether to ingest ASE information.
       By default, it will be be true iff you pass a `ClusterDatabase`.
    """
    if ingest_ase is None:
        ingest_ase = db.ase_db is not None

    if isinstance(pathways, list):
        for idx, pathways_segment in enumerate(pathways):
            ingest_tree(
                db,
                pathways_segment,
                path_base,
                (descriptor, idx),
                ingest_ase=ingest_ase,
            )
        return
    if pathways["type"] == "legacy_glob":
        if ingest_ase:
            raise ValueError(
                "Legacy glob pathway ingestion does not support ASE ingestion (ingest_ase=True)"
            )
        from apitofsim.ingest.legacy import parse_legacy_tree

        for pathway in parse_legacy_tree(
            pathways["path"], pathways["clusters"], path_base
        ):
            insert_parsed_pathway(db, pathway, prefix=pathways.get("prefix"))
    elif pathways["type"] == "csv":
        from apitofsim.ingest.csv import parse_csv_tree

        clusters_path = path_base / pathways["clusters_path"]
        current_path_base = clusters_path.parent

        for pathway in parse_csv_tree(
            path_base / pathways["pathways_path"],
            clusters_path,
            pathways["clusters"],
            current_path_base,
            descriptor=descriptor,
            ingest_ase=ingest_ase,
        ):
            insert_parsed_pathway(db, pathway, prefix=pathways.get("prefix"))

apitofsim.workflow.ExperimentRunner

ExperimentRunner(db: ExperimentDatabase)

This class helps with running the simulation across configs and clusters/pathways in an ExperimentDatabase.

It also records results and failures back into the database, and can optionally print progress tables to the terminal.

It precomputes all histograms of density of states and rate constants as needed, and caches them in the database for future runs.

Source code in python/apitofsim/workflow/runners.py
def __init__(self, db: ExperimentDatabase):
    self.db = db
    self.preparer = DerivedDataPreparer(db)
    self.current_run_id = None

current_run_id instance-attribute

current_run_id = None

db instance-attribute

db = db

preparer instance-attribute

preparer = DerivedDataPreparer(db)

_guard_run_started

_guard_run_started()
Source code in python/apitofsim/workflow/runners.py
def _guard_run_started(self):
    if self.current_run_id is None:
        raise RuntimeError("No experiment run started; call start_run() first")

_mk_update_from_counters

_mk_update_from_counters(table, pbar)
Source code in python/apitofsim/workflow/runners.py
def _mk_update_from_counters(self, table, pbar):
    def update_from_counters(counters):
        fragmented_total = counter_fragmented_total(counters)
        realizations = fragmented_total + counters.n_escaped_total
        table["Frags"] = int(fragmented_total)
        table["Intacts"] = int(counters.n_escaped_total)
        table["Avg colls"] = counters.ncoll_total / realizations
        table["PH rej"] = int(counters.counter_collision_rejections)
        table["Surv prob"] = counters.n_escaped_total / realizations
        table["Warns"] = int(counters.nwarnings)
        pbar.set_step(realizations)

    return update_from_counters

_run_cluster_grouped

_run_cluster_grouped(
    mass_spec,
    config,
    cluster_indexed,
    name_lookup,
    pathway_lookup,
    k_rates,
    cluster_dos,
    strict_dos=True,
    verbose=False,
)
Source code in python/apitofsim/workflow/runners.py
def _run_cluster_grouped(
    self,
    mass_spec,
    config,
    cluster_indexed,
    name_lookup,
    pathway_lookup,
    k_rates,
    cluster_dos,
    strict_dos=True,
    verbose=False,
):
    from os import environ
    from timeit import default_timer as timer
    from typing import Any

    from progress_table import ProgressTable

    from apitofsim.api import Histogram

    last_cluster_id = None
    groups = []
    cur_group: dict[str, Any] | None = None
    for pathway_id, (
        cluster_id,
        product1_id,
        product2_id,
    ) in pathway_lookup.items():
        rate_const = k_rates[pathway_id]
        density_cluster = cluster_dos[pathway_id]
        cluster = cluster_indexed[cluster_id]
        if cluster_id != last_cluster_id:
            if cur_group is not None:
                groups.append(cur_group)
            density_hist = Histogram.from_mesh(
                config["bin_width"],
                config["energy_max"],
                density_cluster,
            )
            cur_group = {
                "pathways": [],
                "cluster": cluster,
                "cluster_id": cluster_id,
                "cluster_label": name_lookup[cluster_id],
                "density_hist": density_hist,
                "product_labels": [],
                "pathway_ids": [],
            }
        product1 = cluster_indexed[product1_id]
        product2 = cluster_indexed[product2_id]
        rate_hist = Histogram.from_mesh(
            config["bin_width"],
            config["energy_max_rate"],
            rate_const,
        )
        assert cur_group is not None
        cur_group["pathways"].append(
            MassSpecInputFragmentationPathway(
                cluster, product1, product2, rate_hist
            )
        )
        cur_group["product_labels"].append(
            f"{name_lookup[product1_id]} + {name_lookup[product2_id]}"
        )
        cur_group["pathway_ids"].append(pathway_id)
        last_cluster_id = cluster_id
    if cur_group is not None:
        groups.append(cur_group)

    mass_spec_table = ProgressTable(
        default_column_alignment="right",
        refresh_rate=0,
        interactive=0 if verbose else int(environ.get("PTABLE_INTERACTIVE", "2")),
    )
    mass_spec_table.add_column("Cluster", width=16, alignment="left")
    mass_spec_table.add_column("Paths", width=5, alignment="left")
    mass_spec_table.add_column("Frags", width=5)
    mass_spec_table.add_columns(
        "Intacts",
        "Avg colls",
    )
    mass_spec_table.add_column("PH rej", width=6)
    mass_spec_table.add_column("Warns", width=5)
    mass_spec_table.add_columns(
        "Surv prob",
        "Time (s)",
    )
    cluster_seq = 1
    outer_pbar = mass_spec_table(
        groups,
        description="Cluster",
        show_throughput=False,
        show_progress=True,
        show_eta=True,
        position=2,
    )
    for group in outer_pbar:
        mass_spec_table["Cluster"] = group["cluster_label"]
        mass_spec_table["Paths"] = str(len(group["pathways"]))
        start = timer()
        realizations = (
            int(environ["N_OVERRIDE"]) if "N_OVERRIDE" in environ else config["N"]
        )
        subs = MassSpecSubstanceInput(
            group["cluster"],
            group["pathways"],
            config["gas"],
            group["density_hist"],
            config.get("cluster_charge_sign", defaults.cluster_charge_sign),
        )
        inner_pbar = mass_spec_table(
            realizations, position=1, description="Realization"
        )

        update_from_counters = self._mk_update_from_counters(
            mass_spec_table, inner_pbar
        )
        counters = self.run_mass_spec(
            mass_spec,
            subs,
            realizations,
            cluster_id=group["cluster_id"],
            pathway_ids=group["pathway_ids"],
            sample_mode=2,
            loglevel=1 if verbose else 0,
            strict="STRICT" in environ,
            strict_dos=strict_dos,
            result_callback=update_from_counters,
        )
        t_total = timer() - start
        if counters is None:
            for k in [
                "Frags",
                "Intacts",
                "Avg colls",
                "PH rej",
                "Surv prob",
                "Warns",
            ]:
                mass_spec_table[k] = "FAIL"
        else:
            update_from_counters(counters)
        mass_spec_table["Time (s)"] = f"{t_total:.2f}"
        inner_pbar.close()
        mass_spec_table.next_row()
        cluster_seq += 1
    mass_spec_table.close()

_run_pathways_at_a_time

_run_pathways_at_a_time(
    mass_spec,
    config,
    cluster_indexed,
    name_lookup,
    pathway_lookup,
    k_rates,
    cluster_dos,
    strict_dos=True,
    parent=None,
    verbose=False,
)
Source code in python/apitofsim/workflow/runners.py
def _run_pathways_at_a_time(
    self,
    mass_spec,
    config,
    cluster_indexed,
    name_lookup,
    pathway_lookup,
    k_rates,
    cluster_dos,
    strict_dos=True,
    parent=None,
    verbose=False,
):
    from os import environ
    from timeit import default_timer as timer

    from progress_table import ProgressTable

    from apitofsim.api import Histogram

    mass_spec_table = ProgressTable(
        default_column_alignment="right",
        refresh_rate=0,
        interactive=0 if verbose else int(environ.get("PTABLE_INTERACTIVE", "2")),
    )
    mass_spec_table.add_column("#")
    mass_spec_table.add_column("Cluster", alignment="left")
    mass_spec_table.add_column("Products", width=16, alignment="left")
    mass_spec_table.add_columns(
        "Frags",
        "Intacts",
        "Avg colls",
        "PH rej",
    )
    mass_spec_table.add_column("Warns", width=5)
    mass_spec_table.add_columns(
        "Surv prob",
        "Time (s)",
    )
    cluster_seq = 1
    outer_pbar = mass_spec_table(
        pathway_lookup.items(),
        description="Pathway",
        show_throughput=False,
        show_progress=True,
        show_eta=True,
        position=2,
    )
    for pathway_id, (cluster_id, product1_id, product2_id) in outer_pbar:
        density_cluster = cluster_dos[pathway_id]
        rate_const = k_rates[pathway_id]
        cluster = cluster_indexed[cluster_id]
        product1 = cluster_indexed[product1_id]
        product2 = cluster_indexed[product2_id]
        mass_spec_table["#"] = f"{cluster_seq}/{len(pathway_lookup)}"
        mass_spec_table["Cluster"] = name_lookup[cluster_id]
        mass_spec_table["Products"] = (
            f"{name_lookup[product1_id]} + {name_lookup[product2_id]}"
        )
        start = timer()
        density_hist = Histogram.from_mesh(
            config["bin_width"],
            config["energy_max"],
            density_cluster,
        )
        rate_hist = Histogram.from_mesh(
            config["bin_width"],
            config["energy_max_rate"],
            rate_const,
        )
        subs = MassSpecSubstanceInput(
            cluster,
            product1,
            product2,
            config["gas"],
            density_hist,
            rate_hist,
            fragmentation_energy=config.get("fragmentation_energy"),
            cluster_charge_sign=config.get(
                "cluster_charge_sign", defaults.cluster_charge_sign
            ),
        )
        realizations = (
            int(environ["N_OVERRIDE"]) if "N_OVERRIDE" in environ else config["N"]
        )
        inner_pbar = mass_spec_table(
            realizations, position=1, description="Realization"
        )

        def log_callback(typ, msg):
            msg = msg.rstrip()
            print(f"Channel: {typ}; Msg: {msg}")

        update_from_counters = self._mk_update_from_counters(
            mass_spec_table, inner_pbar
        )
        counters = self.run_mass_spec(
            mass_spec,
            subs,
            realizations,
            pathway_id=pathway_id,
            sample_mode=2,
            loglevel=1 if verbose else 0,
            strict="STRICT" in environ,
            strict_dos=strict_dos,
            result_callback=update_from_counters,
            log_callback=log_callback,
        )
        t_total = timer() - start
        if counters is None:
            for k in [
                "Frags",
                "Intacts",
                "Avg colls",
                "PH rej",
                "Surv prob",
                "Warns",
            ]:
                mass_spec_table[k] = "FAIL"
        else:
            update_from_counters(counters)
        mass_spec_table["Time (s)"] = f"{t_total:.2f}"
        inner_pbar.close()
        mass_spec_table.next_row()
        cluster_seq += 1
    mass_spec_table.close()

run_from_config

run_from_config(
    config,
    run_started=False,
    strict_dos=True,
    pathway_at_a_time=False,
    parent=None,
    pathways=None,
    verbose=False,
)
Source code in python/apitofsim/workflow/runners.py
def run_from_config(
    self,
    config,
    run_started=False,
    strict_dos=True,
    pathway_at_a_time=False,
    parent=None,
    pathways=None,
    verbose=False,
):
    if not run_started:
        self.start_run()

    cluster_indexed, name_lookup, pathway_lookup = self.db.get_all_lookups(
        parent, pathways
    )

    skimmer_np, k_rates, cluster_dos = self.preparer.run_preliminaries(
        config, cluster_indexed, pathway_lookup=pathway_lookup
    )

    assert isinstance(skimmer_np, numpy.ndarray)
    mass_spec = MassSpectrometer(
        skimmer_np,
        config["lengths"],
        config["voltages"],
        config["T"],
        config["pressures"],
        quadrupole=config.get("quadrupole"),
    )

    if pathway_at_a_time:
        self._run_pathways_at_a_time(
            mass_spec,
            config,
            cluster_indexed,
            name_lookup,
            pathway_lookup,
            k_rates,
            cluster_dos,
            strict_dos=strict_dos,
            verbose=verbose,
        )
    else:
        self._run_cluster_grouped(
            mass_spec,
            config,
            cluster_indexed,
            name_lookup,
            pathway_lookup,
            k_rates,
            cluster_dos,
            strict_dos=strict_dos,
            verbose=verbose,
        )

run_mass_spec

run_mass_spec(
    *args,
    pathway_id=None,
    cluster_id=None,
    pathway_ids=None,
    strict=False,
    strict_dos=True,
    **kwargs,
)
Source code in python/apitofsim/workflow/runners.py
def run_mass_spec(
    self,
    *args,
    pathway_id=None,
    cluster_id=None,
    pathway_ids=None,
    strict=False,
    strict_dos=True,
    **kwargs,
):
    self._guard_run_started()
    if pathway_id is None and cluster_id is None:
        raise ValueError("Either pathway_id or cluster_id must be provided")
    from apitofsim.api import mass_spec

    counters = None
    try:
        counters, timings = mass_spec(
            *args,
            **kwargs,
            named_tuple_counters=True,
            output_timings=True,
            strict=strict_dos,
        )
    except ApiTofError as e:
        if strict:
            raise
        overflow_requested = None
        if isinstance(e, ApiTofOverflowError):
            overflow_requested = e.current
        self.db.record_failure(
            self.current_run_id,
            type(e).__name__,
            str(e),
            overflow_requested,
            pathway_id=pathway_id,
            cluster_id=cluster_id,
        )
    else:
        self.db.record_result(
            self.current_run_id,
            counters,
            timings,
            pathway_id=pathway_id,
            pathway_ids=pathway_ids,
            cluster_id=cluster_id,
        )
    return counters

run_prepared_config

run_prepared_config(name=None, **kwargs)

Run from an experiment config that has been inserted into an ExperimentDatabase.

  • name is the name of an experiment config, a list thereof, or None to run all configs
Source code in python/apitofsim/workflow/runners.py
def run_prepared_config(self, name=None, **kwargs):
    """
    Run from an experiment config that has been inserted into an ExperimentDatabase.

     * `name` is the name of an experiment config, a list thereof, or None to run all configs
    """
    from pprint import pprint

    configs = list(self.db.iter_configs(name))
    for idx, row in enumerate(configs):
        print(f"# Running experiment config: {row.name} [{idx + 1}/{len(configs)}]")
        pprint(row.config)
        print()
        self.start_run(
            row.id, pathway_at_a_time=kwargs.get("pathway_at_a_time", False)
        )
        self.run_from_config(row.config, run_started=True, **kwargs)
        print()

start_run

start_run(config_id=None, **kwargs)
Source code in python/apitofsim/workflow/runners.py
def start_run(self, config_id=None, **kwargs):
    self.current_run_id = self.db.insert_run(config_id, **kwargs)

Individual simulation functions

The individual simulation functions are the low level interface to the simulation.

apitofsim.mass_spec

mass_spec(
    mass_spec: MassSpectrometer,
    subs: MassSpecSubstanceInput,
    N: int,
    *,
    sample_mode: SampleMode = rejection,
    strict=True,
    loglevel: int = 0,
    seed: int = 42,
    log_callback: Callable[[str, str], None] | None = None,
    result_callback: Callable[[ndarray], None]
    | None = None,
    named_tuple_counters=False,
    output_timings=False,
)

This function runs the main simulation of the APi-ToF mass spectrometer.

Source code in python/apitofsim/api.py
def mass_spec(
    mass_spec: MassSpectrometer,
    subs: _MassSpecSubstanceInput,
    N: int,
    *,
    sample_mode: SampleMode = SampleMode.rejection,
    strict=True,
    loglevel: int = 0,
    seed: int = 42,
    log_callback: Callable[[str, str], None] | None = None,
    result_callback: Callable[[numpy.ndarray], None] | None = None,
    named_tuple_counters=False,
    output_timings=False,
):
    """
    This function runs the main simulation of the APi-ToF mass spectrometer.
    """

    def convert_counters(counters):
        if named_tuple_counters:
            return Counters(*counters[: len(Counter) - 1], counters[len(Counter) - 1 :])
        else:
            return counters

    def wrap_callback(callback):
        if callback is None:
            return None

        def inner(counters):
            return callback(convert_counters(counters))

        return inner

    counters, loop_time, total_time = _mass_spec(
        mass_spec.into_cpp(),
        subs,
        N,
        seed=seed,
        log_callback=log_callback,
        result_callback=wrap_callback(result_callback),
        sample_mode=sample_mode,
        strict=strict,
        loglevel=loglevel,
    )
    if named_tuple_counters:
        counters = convert_counters(counters)
    if output_timings:
        return counters, Timings(loop_time, total_time)
    else:
        return counters

apitofsim.compute_density_of_states_batch

compute_density_of_states_batch(
    clusters: List[ClusterLike],
    energy_max: MaybeQuantity,
    bin_width: MaybeQuantity,
    use_old_impl=False,
    *,
    quantities_strict=True,
)
Source code in python/apitofsim/api.py
def compute_density_of_states_batch(
    clusters: List[ClusterLike],
    energy_max: MaybeQuantity,
    bin_width: MaybeQuantity,
    use_old_impl=False,
    *,
    quantities_strict=True,
):
    process_arg = QuantityProcessor(quantities_strict)
    energy_max = process_arg("energy_max", energy_max, "kelvin")
    bin_width = process_arg("bin_width", bin_width, "kelvin")
    frequencies = []
    for i, cluster in enumerate(clusters):
        frequencies_cluster = cluster.get_frequencies()
        if frequencies_cluster is None:
            raise ValueError(
                f"Cannot compute density of states for a atom-like product {cluster!r} at index {i}"
            )
        frequencies.append(frequencies_cluster)
    return _compute_density_of_states_batch(
        frequencies, energy_max, bin_width, use_old_impl=use_old_impl
    )

apitofsim.precompute_mesh

precompute_mesh(
    energy_max_rate: MaybeQuantity,
    bin_width: MaybeQuantity,
    mesh_mode: MeshMode = compute_mesh_diagonal_multithreaded,
    *,
    quantities_strict=True,
)
Source code in python/apitofsim/api.py
def precompute_mesh(
    energy_max_rate: MaybeQuantity,
    bin_width: MaybeQuantity,
    mesh_mode: MeshMode = MeshMode.compute_mesh_diagonal_multithreaded,
    *,
    quantities_strict=True,
):
    process_arg = QuantityProcessor(quantities_strict)
    energy_max_rate = process_arg("energy_max", energy_max_rate, "kelvin")
    bin_width = process_arg("bin_width", bin_width, "kelvin")
    return _precompute_mesh(energy_max_rate, bin_width, mesh_mode)

apitofsim.compute_k_total_batch

compute_k_total_batch(
    inputs: List[KTotalInput],
    energy_max_rate: MaybeQuantity,
    bin_width: MaybeQuantity,
    mesh: MeshMode
    | ndarray = compute_mesh_diagonal_multithreaded,
    progress_callback: Callable[[int], None] | None = None,
    *,
    quantities_strict=True,
)
Source code in python/apitofsim/api.py
def compute_k_total_batch(
    inputs: List[KTotalInput],
    energy_max_rate: MaybeQuantity,
    bin_width: MaybeQuantity,
    mesh: MeshMode | numpy.ndarray = MeshMode.compute_mesh_diagonal_multithreaded,
    progress_callback: Callable[[int], None] | None = None,
    *,
    quantities_strict=True,
):
    process_arg = QuantityProcessor(quantities_strict)
    energy_max_rate = process_arg("energy_max", energy_max_rate, "kelvin")
    bin_width = process_arg("bin_width", bin_width, "kelvin")
    return _compute_k_total_batch(
        inputs, energy_max_rate, bin_width, mesh, progress_callback
    )

apitofsim.densityandrate

densityandrate(
    cluster_0: ClusterData,
    cluster_1: ClusterData,
    cluster_2: ClusterData,
    energy_max: MaybeQuantity,
    energy_max_rate: MaybeQuantity,
    bin_width: MaybeQuantity,
    fragmentation_energy: MaybeQuantity | None = None,
    *,
    quantities_strict=True,
)

This function precomputes the density of states and rate constants histograms for a given set of clusters.

Source code in python/apitofsim/api.py
def densityandrate(
    cluster_0: ClusterData,
    cluster_1: ClusterData,
    cluster_2: ClusterData,
    energy_max: MaybeQuantity,
    energy_max_rate: MaybeQuantity,
    bin_width: MaybeQuantity,
    fragmentation_energy: MaybeQuantity | None = None,
    *,
    quantities_strict=True,
):
    """
    This function precomputes the density of states and rate constants histograms for a given set of clusters.
    """
    process_arg = QuantityProcessor(quantities_strict)
    energy_max = process_arg("energy_max", energy_max, "kelvin")
    energy_max_rate = process_arg("energy_max_rate", energy_max_rate, "kelvin")
    bin_width = process_arg("bin_width", bin_width, "kelvin")
    if fragmentation_energy is None:
        fragmentation_energy = 0
    else:
        fragmentation_energy = process_arg(
            "fragmentation_energy", fragmentation_energy, "kelvin"
        )
    density_cluster, rate_const = _densityandrate(
        cluster_0.into_cpp(),
        cluster_1.into_cpp(),
        cluster_2.into_cpp(),
        energy_max,
        energy_max_rate,
        bin_width,
        fragmentation_energy,
    )
    return Histogram.from_cpp(density_cluster), Histogram.from_cpp(rate_const)

apitofsim.skimmer

skimmer(
    T0: MaybeQuantity,
    P0: MaybeQuantity,
    rmax: MaybeQuantity,
    dc: MaybeQuantity,
    alpha_factor: MaybeQuantity,
    gas: Gas | Gas,
    N: int,
    M: int,
    resolution: int,
    tolerance: float,
    *,
    output_pandas=False,
    quantities_strict=True,
)

This function precomputes various parameters including gas velocity, temperature and pressure at fixed points along the skimmer's' length.

Source code in python/apitofsim/api.py
def skimmer(
    T0: MaybeQuantity,
    P0: MaybeQuantity,
    rmax: MaybeQuantity,
    dc: MaybeQuantity,
    alpha_factor: MaybeQuantity,
    gas: Gas | _Gas,
    N: int,
    M: int,
    resolution: int,
    tolerance: float,
    *,
    output_pandas=False,
    quantities_strict=True,
):
    """
    This function precomputes various parameters including gas velocity, temperature and pressure at fixed points along the skimmer's' length.
    """
    process_arg = QuantityProcessor(quantities_strict)
    T0 = process_arg("T0", T0, "kelvin")
    P0 = process_arg("P0", P0, "pascal")
    rmax = process_arg("rmax", rmax, "meters")
    dc = process_arg("dc", dc, "meters")
    alpha_factor = process_arg("alpha_factor", alpha_factor, "halfturn")
    if isinstance(gas, Gas):
        gas = gas.into_cpp()
    out = _skimmer(T0, P0, rmax, dc, alpha_factor, gas, N, M, resolution, tolerance)
    if output_pandas:
        # Ignore this because Pandas' types are broken
        return DataFrame(out, columns=SKIMMER_COLUMNS)  # pyright: ignore [reportArgumentType]
    else:
        return out