Skip to content

Phy

Class and functions for loading data processed with Phy2

Phy (BaseLoader)

Loader for Phy data

Source code in pynapple/io/phy.py
class Phy(BaseLoader):
    """
    Loader for Phy data
    """

    def __init__(self, path):
        """
        Instantiate the data class from a Phy folder.

        Parameters
        ----------
        path : str or Path object
            The path to the data.
        """

        self.time_support = None

        self.sample_rate = None
        self.n_channels_dat = None
        self.channel_map = None
        self.ch_to_sh = None
        self.spikes = None
        self.channel_positions = None

        super().__init__(path)
        # This path stuff should happen only once in the parent class
        self.path = Path(path)
        self.basename = self.path.name
        self.nwb_path = self.path / "pynapplenwb"
        # from what I can see in the loading function, only one nwb file per folder:
        try:
            self.nwb_file = list(self.nwb_path.glob("*.nwb"))[0]
        except IndexError:
            self.nwb_file = None

        # Need to check if nwb file exists and if data are there
        # if self.path is not None:  -> are there any cases where this is None?
        if self.nwb_file is not None:
            loaded_spikes = self.load_nwb_spikes()
            if loaded_spikes is not None:
                return

        # Bypass if data have already been transferred to nwb
        self.load_phy_params()

        app = App()
        window = EphysGUI(app, path=path, groups=self.channel_map)
        app.mainloop()
        try:
            app.update()
        except Exception:
            pass

        if window.status:
            self.ephys_information = window.ephys_information
            self.load_phy_spikes(self.time_support)
            self.save_data()
        app.quit()

    def load_phy_params(self):
        """
        path should be the folder session containing the params.py file

        Function reads :
        1. the number of channels
        2. the sampling frequency of the dat file


        Raises
        ------
        AssertionError
            If path does not contain the params file or channel_map.npy
        """
        assert (
            self.path / "params.py"
        ).exists(), f"Can't find params.py in {self.path}"

        # It is strongly recommended not to conflate parameters and code! Also, there's a library called params.
        # I would recommend putting in the folder a file called params.json, or .txt, or .yml, but not .py!
        # In this way we just read the file, and we don't have to add to sys to import...
        # TODO maybe remove this
        sys.path.append(str(self.path))
        import params as params

        self.sample_rate = params.sample_rate
        self.n_channels_dat = params.n_channels_dat

        assert (
            self.path / "channel_map.npy"
        ).exists(), f"Can't find channel_map.npy in {self.path}"
        channel_map = np.load(self.path / "channel_map.npy")

        if (self.path / "channel_shanks.npy").exists():
            channel_shank = np.load(self.path / "channel_shanks.npy")
            n_shanks = len(np.unique(channel_shank))

            self.channel_map = {
                i: channel_map[channel_shank == i] for i in range(n_shanks)
            }
            self.ch_to_sh = pd.Series(
                index=channel_map.flatten(),
                data=channel_shank.flatten(),
            )
        else:
            self.channel_map = {i: channel_map[i] for i in range(len(channel_map))}
            self.ch_to_sh = pd.Series(
                index=channel_map.flatten(),
                data=np.hstack(
                    [
                        np.ones(len(channel_map[i]), dtype=int) * i
                        for i in range(len(channel_map))
                    ]
                ),
            )

        return

    def load_phy_spikes(self, time_support=None):
        """
        Load Phy spike times and convert to NWB.
        Instantiate automatically a TsGroup object.
        The cluster group is taken first from cluster_info.tsv and second from cluster_group.tsv

        Parameters
        ----------
        path : Path object
            The path to the data
        time_support : IntevalSet, optional
            The time support of the data

        Raises
        ------
        RuntimeError
            If files are missing.
            The function needs :
            - cluster_info.tsv or cluster_group.tsv
            - spike_times.npy
            - spike_clusters.npy
            - channel_positions.npy
            - templates.npy

        """
        # Check if cluster_info.tsv or cluster_group.tsv exists. If both exist, cluster_info.tsv is used:
        has_cluster_info = False
        if (self.path / "cluster_info.tsv").exists():
            cluster_info_file = self.path / "cluster_info.tsv"
            has_cluster_info = True
        elif (self.path / "cluster_group.tsv").exists():
            cluster_info_file = self.path / "cluster_group.tsv"
        else:
            raise RuntimeError(
                "Can't find cluster_info.tsv or cluster_group.tsv in {};".format(
                    self.path
                )
            )

        cluster_info = pd.read_csv(cluster_info_file, sep="\t", index_col="cluster_id")
        # In my processed data with KiloSort 3.0, the column is named KSLabel
        if "group" in cluster_info.columns:
            cluster_id_good = cluster_info[cluster_info.group == "good"].index.values
        elif "KSLabel" in cluster_info.columns:
            cluster_id_good = cluster_info[cluster_info.KSLabel == "good"].index.values
        else:
            raise RuntimeError(
                "Can't find column group or KSLabel in {};".format(cluster_info_file)
            )

        spike_times = np.load(self.path / "spike_times.npy")
        spike_clusters = np.load(self.path / "spike_clusters.npy")

        spikes = {}
        for n in cluster_id_good:
            spikes[n] = nap.Ts(
                t=spike_times[spike_clusters == n] / self.sample_rate,
                time_support=time_support,
            )

        self.spikes = nap.TsGroup(spikes, time_support=time_support)

        # Adding the position of the electrodes in case
        self.channel_positions = np.load(self.path / "channel_positions.npy")

        # Adding shank group info from cluster_info if present
        if has_cluster_info:
            group = cluster_info.loc[cluster_id_good, "sh"]
            self.spikes.set_info(group=group)
        else:
            template = np.load(self.path / "templates.npy")
            template = template[cluster_id_good]
            ch = np.power(template, 2).max(1).argmax(1)
            group = pd.Series(index=cluster_id_good, data=self.ch_to_sh[ch].values)
            self.spikes.set_info(group=group)

        names = pd.Series(
            index=group.index,
            data=[self.ephys_information[group.loc[i]]["name"] for i in group.index],
        )
        if ~np.all(names.values == ""):
            self.spikes.set_info(name=names)

        locations = pd.Series(
            index=group.index,
            data=[
                self.ephys_information[group.loc[i]]["location"] for i in group.index
            ],
        )
        if ~np.all(locations.values == ""):
            self.spikes.set_info(location=locations)

        return

    def save_data(self):
        """Save the data to NWB format."""

        io = NWBHDF5IO(self.nwb_file, "r+")
        nwbfile = io.read()

        electrode_groups = {}

        for g in self.channel_map:
            device = nwbfile.create_device(
                name=self.ephys_information[g]["device"]["name"] + "-" + str(g),
                description=self.ephys_information[g]["device"]["description"],
                manufacturer=self.ephys_information[g]["device"]["manufacturer"],
            )

            if (
                len(self.ephys_information[g]["position"])
                and type(self.ephys_information[g]["position"]) is str
            ):
                self.ephys_information[g]["position"] = re.split(
                    ";|,| ", self.ephys_information[g]["position"]
                )
            elif self.ephys_information[g]["position"] == "":
                self.ephys_information[g]["position"] = None

            electrode_groups[g] = nwbfile.create_electrode_group(
                name="group" + str(g) + "_" + self.ephys_information[g]["name"],
                description=self.ephys_information[g]["description"],
                position=self.ephys_information[g]["position"],
                location=self.ephys_information[g]["location"],
                device=device,
            )

            for idx in self.channel_map[g]:
                nwbfile.add_electrode(
                    id=idx,
                    x=0.0,
                    y=0.0,
                    z=0.0,
                    imp=0.0,
                    location=self.ephys_information[g]["location"],
                    filtering="none",
                    group=electrode_groups[g],
                )

        # Adding units
        nwbfile.add_unit_column("location", "the anatomical location of this unit")
        nwbfile.add_unit_column("group", "the group of the unit")
        for u in self.spikes.keys():
            nwbfile.add_unit(
                id=u,
                spike_times=self.spikes[u].as_units("s").index.values,
                electrode_group=electrode_groups[self.spikes.get_info("group").loc[u]],
                location=self.ephys_information[self.spikes.get_info("group").loc[u]][
                    "location"
                ],
                group=self.spikes.get_info("group").loc[u],
            )

        io.write(nwbfile)
        io.close()

        return

    def load_nwb_spikes(self):
        """Read the NWB spikes to extract the spike times.

        Returns
        -------
        TYPE
            Description
        """

        io = NWBHDF5IO(self.nwb_file, "r")
        nwbfile = io.read()

        if nwbfile.units is None:
            io.close()
            return None
        else:
            units = nwbfile.units.to_dataframe()
            spikes = {
                n: nap.Ts(t=units.loc[n, "spike_times"], time_units="s")
                for n in units.index
            }

            self.spikes = nap.TsGroup(
                spikes,
                time_support=self.time_support,
                time_units="s",
                group=units["group"],
            )

            if ~np.all(units["location"] == ""):
                self.spikes.set_info(location=units["location"])

            io.close()
            return True

    def load_lfp(
        self,
        filename=None,
        channel=None,
        extension=".eeg",
        frequency=1250.0,
        precision="int16",
        bytes_size=2,
    ):
        """
        Load the LFP.

        Parameters
        ----------
        filename : str, optional
            The filename of the lfp file.
            It can be useful it multiple dat files are present in the data directory
        channel : int or list of int, optional
            The channel(s) to load. If None return a memory map of the dat file to avoid memory error
        extension : str, optional
            The file extenstion (.eeg, .dat, .lfp). Make sure the frequency match
        frequency : float, optional
            Default 1250 Hz for the eeg file
        precision : str, optional
            The precision of the binary file
        bytes_size : int, optional
            Bytes size of the lfp file

        Raises
        ------
        RuntimeError
            If can't find the lfp/eeg/dat file

        Returns
        -------
        Tsd or TsdFrame
            The lfp in a time series format
        """
        if filename is not None:
            filepath = self.path / filename
        else:
            try:
                filepath = list(self.path.glob(f"*{extension}"))[0]
            except IndexError:
                raise RuntimeError(f"Path {self.path} contains no {extension} files;")

        # is it possible that this is a leftover from neurosuite data?
        # This is not implemented for this class.
        self.load_neurosuite_xml(self.path)

        n_channels = int(self.nChannels)

        f = open(filepath, "rb")
        startoffile = f.seek(0, 0)
        endoffile = f.seek(0, 2)
        bytes_size = 2
        n_samples = int((endoffile - startoffile) / n_channels / bytes_size)
        duration = n_samples / frequency
        f.close()
        fp = np.memmap(filepath, np.int16, "r", shape=(n_samples, n_channels))
        timestep = np.arange(0, n_samples) / frequency

        time_support = nap.IntervalSet(start=0, end=duration, time_units="s")

        if channel is None:
            return nap.TsdFrame(
                t=timestep, d=fp, time_units="s", time_support=time_support
            )
        elif type(channel) is int:
            return nap.Tsd(
                t=timestep, d=fp[:, channel], time_units="s", time_support=time_support
            )
        elif type(channel) is list:
            return nap.TsdFrame(
                t=timestep,
                d=fp[:, channel],
                time_units="s",
                time_support=time_support,
                columns=channel,
            )

__init__(self, path) special

Instantiate the data class from a Phy folder.

Parameters:

Name Type Description Default
path str or Path object

The path to the data.

required
Source code in pynapple/io/phy.py
def __init__(self, path):
    """
    Instantiate the data class from a Phy folder.

    Parameters
    ----------
    path : str or Path object
        The path to the data.
    """

    self.time_support = None

    self.sample_rate = None
    self.n_channels_dat = None
    self.channel_map = None
    self.ch_to_sh = None
    self.spikes = None
    self.channel_positions = None

    super().__init__(path)
    # This path stuff should happen only once in the parent class
    self.path = Path(path)
    self.basename = self.path.name
    self.nwb_path = self.path / "pynapplenwb"
    # from what I can see in the loading function, only one nwb file per folder:
    try:
        self.nwb_file = list(self.nwb_path.glob("*.nwb"))[0]
    except IndexError:
        self.nwb_file = None

    # Need to check if nwb file exists and if data are there
    # if self.path is not None:  -> are there any cases where this is None?
    if self.nwb_file is not None:
        loaded_spikes = self.load_nwb_spikes()
        if loaded_spikes is not None:
            return

    # Bypass if data have already been transferred to nwb
    self.load_phy_params()

    app = App()
    window = EphysGUI(app, path=path, groups=self.channel_map)
    app.mainloop()
    try:
        app.update()
    except Exception:
        pass

    if window.status:
        self.ephys_information = window.ephys_information
        self.load_phy_spikes(self.time_support)
        self.save_data()
    app.quit()

load_phy_params(self)

path should be the folder session containing the params.py file

Function reads : 1. the number of channels 2. the sampling frequency of the dat file

Exceptions:

Type Description
AssertionError

If path does not contain the params file or channel_map.npy

Source code in pynapple/io/phy.py
def load_phy_params(self):
    """
    path should be the folder session containing the params.py file

    Function reads :
    1. the number of channels
    2. the sampling frequency of the dat file


    Raises
    ------
    AssertionError
        If path does not contain the params file or channel_map.npy
    """
    assert (
        self.path / "params.py"
    ).exists(), f"Can't find params.py in {self.path}"

    # It is strongly recommended not to conflate parameters and code! Also, there's a library called params.
    # I would recommend putting in the folder a file called params.json, or .txt, or .yml, but not .py!
    # In this way we just read the file, and we don't have to add to sys to import...
    # TODO maybe remove this
    sys.path.append(str(self.path))
    import params as params

    self.sample_rate = params.sample_rate
    self.n_channels_dat = params.n_channels_dat

    assert (
        self.path / "channel_map.npy"
    ).exists(), f"Can't find channel_map.npy in {self.path}"
    channel_map = np.load(self.path / "channel_map.npy")

    if (self.path / "channel_shanks.npy").exists():
        channel_shank = np.load(self.path / "channel_shanks.npy")
        n_shanks = len(np.unique(channel_shank))

        self.channel_map = {
            i: channel_map[channel_shank == i] for i in range(n_shanks)
        }
        self.ch_to_sh = pd.Series(
            index=channel_map.flatten(),
            data=channel_shank.flatten(),
        )
    else:
        self.channel_map = {i: channel_map[i] for i in range(len(channel_map))}
        self.ch_to_sh = pd.Series(
            index=channel_map.flatten(),
            data=np.hstack(
                [
                    np.ones(len(channel_map[i]), dtype=int) * i
                    for i in range(len(channel_map))
                ]
            ),
        )

    return

load_phy_spikes(self, time_support=None)

Load Phy spike times and convert to NWB.

Instantiate automatically a TsGroup object. The cluster group is taken first from cluster_info.tsv and second from cluster_group.tsv

Parameters:

Name Type Description Default
path Path object

The path to the data

required
time_support IntevalSet

The time support of the data

None

Exceptions:

Type Description
RuntimeError

If files are missing. The function needs : - cluster_info.tsv or cluster_group.tsv - spike_times.npy - spike_clusters.npy - channel_positions.npy - templates.npy

Source code in pynapple/io/phy.py
def load_phy_spikes(self, time_support=None):
    """
    Load Phy spike times and convert to NWB.
    Instantiate automatically a TsGroup object.
    The cluster group is taken first from cluster_info.tsv and second from cluster_group.tsv

    Parameters
    ----------
    path : Path object
        The path to the data
    time_support : IntevalSet, optional
        The time support of the data

    Raises
    ------
    RuntimeError
        If files are missing.
        The function needs :
        - cluster_info.tsv or cluster_group.tsv
        - spike_times.npy
        - spike_clusters.npy
        - channel_positions.npy
        - templates.npy

    """
    # Check if cluster_info.tsv or cluster_group.tsv exists. If both exist, cluster_info.tsv is used:
    has_cluster_info = False
    if (self.path / "cluster_info.tsv").exists():
        cluster_info_file = self.path / "cluster_info.tsv"
        has_cluster_info = True
    elif (self.path / "cluster_group.tsv").exists():
        cluster_info_file = self.path / "cluster_group.tsv"
    else:
        raise RuntimeError(
            "Can't find cluster_info.tsv or cluster_group.tsv in {};".format(
                self.path
            )
        )

    cluster_info = pd.read_csv(cluster_info_file, sep="\t", index_col="cluster_id")
    # In my processed data with KiloSort 3.0, the column is named KSLabel
    if "group" in cluster_info.columns:
        cluster_id_good = cluster_info[cluster_info.group == "good"].index.values
    elif "KSLabel" in cluster_info.columns:
        cluster_id_good = cluster_info[cluster_info.KSLabel == "good"].index.values
    else:
        raise RuntimeError(
            "Can't find column group or KSLabel in {};".format(cluster_info_file)
        )

    spike_times = np.load(self.path / "spike_times.npy")
    spike_clusters = np.load(self.path / "spike_clusters.npy")

    spikes = {}
    for n in cluster_id_good:
        spikes[n] = nap.Ts(
            t=spike_times[spike_clusters == n] / self.sample_rate,
            time_support=time_support,
        )

    self.spikes = nap.TsGroup(spikes, time_support=time_support)

    # Adding the position of the electrodes in case
    self.channel_positions = np.load(self.path / "channel_positions.npy")

    # Adding shank group info from cluster_info if present
    if has_cluster_info:
        group = cluster_info.loc[cluster_id_good, "sh"]
        self.spikes.set_info(group=group)
    else:
        template = np.load(self.path / "templates.npy")
        template = template[cluster_id_good]
        ch = np.power(template, 2).max(1).argmax(1)
        group = pd.Series(index=cluster_id_good, data=self.ch_to_sh[ch].values)
        self.spikes.set_info(group=group)

    names = pd.Series(
        index=group.index,
        data=[self.ephys_information[group.loc[i]]["name"] for i in group.index],
    )
    if ~np.all(names.values == ""):
        self.spikes.set_info(name=names)

    locations = pd.Series(
        index=group.index,
        data=[
            self.ephys_information[group.loc[i]]["location"] for i in group.index
        ],
    )
    if ~np.all(locations.values == ""):
        self.spikes.set_info(location=locations)

    return

save_data(self)

Save the data to NWB format.

Source code in pynapple/io/phy.py
def save_data(self):
    """Save the data to NWB format."""

    io = NWBHDF5IO(self.nwb_file, "r+")
    nwbfile = io.read()

    electrode_groups = {}

    for g in self.channel_map:
        device = nwbfile.create_device(
            name=self.ephys_information[g]["device"]["name"] + "-" + str(g),
            description=self.ephys_information[g]["device"]["description"],
            manufacturer=self.ephys_information[g]["device"]["manufacturer"],
        )

        if (
            len(self.ephys_information[g]["position"])
            and type(self.ephys_information[g]["position"]) is str
        ):
            self.ephys_information[g]["position"] = re.split(
                ";|,| ", self.ephys_information[g]["position"]
            )
        elif self.ephys_information[g]["position"] == "":
            self.ephys_information[g]["position"] = None

        electrode_groups[g] = nwbfile.create_electrode_group(
            name="group" + str(g) + "_" + self.ephys_information[g]["name"],
            description=self.ephys_information[g]["description"],
            position=self.ephys_information[g]["position"],
            location=self.ephys_information[g]["location"],
            device=device,
        )

        for idx in self.channel_map[g]:
            nwbfile.add_electrode(
                id=idx,
                x=0.0,
                y=0.0,
                z=0.0,
                imp=0.0,
                location=self.ephys_information[g]["location"],
                filtering="none",
                group=electrode_groups[g],
            )

    # Adding units
    nwbfile.add_unit_column("location", "the anatomical location of this unit")
    nwbfile.add_unit_column("group", "the group of the unit")
    for u in self.spikes.keys():
        nwbfile.add_unit(
            id=u,
            spike_times=self.spikes[u].as_units("s").index.values,
            electrode_group=electrode_groups[self.spikes.get_info("group").loc[u]],
            location=self.ephys_information[self.spikes.get_info("group").loc[u]][
                "location"
            ],
            group=self.spikes.get_info("group").loc[u],
        )

    io.write(nwbfile)
    io.close()

    return

load_nwb_spikes(self)

Read the NWB spikes to extract the spike times.

Returns:

Type Description
TYPE

Description

Source code in pynapple/io/phy.py
def load_nwb_spikes(self):
    """Read the NWB spikes to extract the spike times.

    Returns
    -------
    TYPE
        Description
    """

    io = NWBHDF5IO(self.nwb_file, "r")
    nwbfile = io.read()

    if nwbfile.units is None:
        io.close()
        return None
    else:
        units = nwbfile.units.to_dataframe()
        spikes = {
            n: nap.Ts(t=units.loc[n, "spike_times"], time_units="s")
            for n in units.index
        }

        self.spikes = nap.TsGroup(
            spikes,
            time_support=self.time_support,
            time_units="s",
            group=units["group"],
        )

        if ~np.all(units["location"] == ""):
            self.spikes.set_info(location=units["location"])

        io.close()
        return True

load_lfp(self, filename=None, channel=None, extension='.eeg', frequency=1250.0, precision='int16', bytes_size=2)

Load the LFP.

Parameters:

Name Type Description Default
filename str

The filename of the lfp file. It can be useful it multiple dat files are present in the data directory

None
channel int or list of int

The channel(s) to load. If None return a memory map of the dat file to avoid memory error

None
extension str

The file extenstion (.eeg, .dat, .lfp). Make sure the frequency match

'.eeg'
frequency float

Default 1250 Hz for the eeg file

1250.0
precision str

The precision of the binary file

'int16'
bytes_size int

Bytes size of the lfp file

2

Exceptions:

Type Description
RuntimeError

If can't find the lfp/eeg/dat file

Returns:

Type Description
Tsd or TsdFrame

The lfp in a time series format

Source code in pynapple/io/phy.py
def load_lfp(
    self,
    filename=None,
    channel=None,
    extension=".eeg",
    frequency=1250.0,
    precision="int16",
    bytes_size=2,
):
    """
    Load the LFP.

    Parameters
    ----------
    filename : str, optional
        The filename of the lfp file.
        It can be useful it multiple dat files are present in the data directory
    channel : int or list of int, optional
        The channel(s) to load. If None return a memory map of the dat file to avoid memory error
    extension : str, optional
        The file extenstion (.eeg, .dat, .lfp). Make sure the frequency match
    frequency : float, optional
        Default 1250 Hz for the eeg file
    precision : str, optional
        The precision of the binary file
    bytes_size : int, optional
        Bytes size of the lfp file

    Raises
    ------
    RuntimeError
        If can't find the lfp/eeg/dat file

    Returns
    -------
    Tsd or TsdFrame
        The lfp in a time series format
    """
    if filename is not None:
        filepath = self.path / filename
    else:
        try:
            filepath = list(self.path.glob(f"*{extension}"))[0]
        except IndexError:
            raise RuntimeError(f"Path {self.path} contains no {extension} files;")

    # is it possible that this is a leftover from neurosuite data?
    # This is not implemented for this class.
    self.load_neurosuite_xml(self.path)

    n_channels = int(self.nChannels)

    f = open(filepath, "rb")
    startoffile = f.seek(0, 0)
    endoffile = f.seek(0, 2)
    bytes_size = 2
    n_samples = int((endoffile - startoffile) / n_channels / bytes_size)
    duration = n_samples / frequency
    f.close()
    fp = np.memmap(filepath, np.int16, "r", shape=(n_samples, n_channels))
    timestep = np.arange(0, n_samples) / frequency

    time_support = nap.IntervalSet(start=0, end=duration, time_units="s")

    if channel is None:
        return nap.TsdFrame(
            t=timestep, d=fp, time_units="s", time_support=time_support
        )
    elif type(channel) is int:
        return nap.Tsd(
            t=timestep, d=fp[:, channel], time_units="s", time_support=time_support
        )
    elif type(channel) is list:
        return nap.TsdFrame(
            t=timestep,
            d=fp[:, channel],
            time_units="s",
            time_support=time_support,
            columns=channel,
        )