Coverage for labnirs2snirf / labnirs.py: 100%
181 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 06:02 +0000
1"""
2Functions related to reading data from LabNIRS files.
3"""
5import logging
6import re
7from collections.abc import Collection
8from pathlib import Path
9from typing import Final
11import numpy as np
12import polars as pl
14from . import model
15from .error import Labnirs2SnirfError
17log = logging.getLogger(__name__)
class LabNirsReadError(Labnirs2SnirfError):
    """Raised when a LabNIRS export file cannot be read or parsed."""
# Constants

# (tab-separated) data table starts on line 36 (counting from 1)
DATA_START_LINE: Final[int] = 36

# regexp patterns to extract fields from the header & to verify correct format.
# Keys map to specific header lines; all header fields are tab-separated.
LINE_PATTERNS: Final[dict[str, str]] = {
    # line 1: file-information banner that also encodes the data start line
    "top_line": rf"^ \[File Information\]\s*\[Data Line\]\t(?P<data_start_line>{DATA_START_LINE})\s*$",
    # export version must be 11.0 (third/fourth tab fields of the line)
    "version": r"^[^\t]*\t[^\t]*\tVersion\t11\.0$",
    "headertype": r"^[^\t]*\t[^\t]*\t\[HeaderType\]\t11\.0/11\.0$",
    # subject ID; the captured group may be empty
    "id": r"^ID\t(?P<id>[^\t]*)\t.*$",
    "measurement_datetime": r"^Measured Date\t(?P<date>[\d/]+) (?P<time>[\d:]+)\s*$",
    "name": r"^Name\t(?P<name>[^\t]*)\t.*$",
    "comment": r"^Comment\t(?P<comment>.*)$",
    # one or more "(source,detector)" pairs; NOTE: the atomic group (?>...)
    # requires Python 3.11+
    "channel_pairs": r"^(?P<channel_pairs>(?>\(\d+,\d+\))+)$",
}
def read_labnirs(
    data_file: Path,
    keep_category: str = "all",
    drop_subtype: Collection[str] | None = None,
) -> model.Nirs:
    """
    Read and process a LabNIRS data file and returns a NIRS data model.

    Parameters
    ----------
    data_file : Path
        Path to the LabNIRS data file.
        File is expected to be in the format exported by the LabNIRS software,
        with 35 lines of header and a version number/header type of 11.0.
    keep_category : "hb" | "raw" | "all", default="all"
        Data category to include in the output. "Raw" means raw voltage data,
        "hb" means haemoglobin data. If "all", both categories are included.
    drop_subtype : Collection[str] | None, default=None
        Set or list of data types and/or wavelengths to drop from the data.
        Hb data types are: "hbr", "hbo" and "hbt".
        Wavelengths should be convertible to integer.
        All included if None.

    Returns
    -------
    model.Nirs
        A NIRS object containing most data required by the SNIRF specification.

    Raises
    ------
    LabNirsReadError
        If the parameters are invalid, the file is missing, or the header
        fails the critical format checks.

    Notes
    -----
    This function reads experiment data and metadata from .txt files exported by
    the LabNIRS software. It expects a 35-line header with specific formatting
    (and version 11.0). Only the top line (containing header length), the
    presence of channel pairs on line 33, and the presence of data columns are
    enforced. Other validation failures only raise a warning, but errors may
    still occur.

    LabNIRS can export files with both raw and Hb data, depending on the options
    selected. The ``keep_category`` parameter controls which of the two is
    retained in the output. The ``drop_subtype`` parameter can be used to
    further exclude specific wavelengths or Hb data types from the output.

    By default, all data is included, which may not be desirable when the goal
    is to import the .snirf file to a NIRS analyser tool such as MNE, as these
    tools may not support files with both raw and Hb data present, may not need
    HbT, or may not be able to handle more than 2 wavelengths. For MNE, for
    example, it would be sensible to either include ``raw`` and drop one
    wavelength of the 3, or to include ``hb`` and drop ``HbT``.

    For reasons of compatibility with other software, a list of wavelengths is
    preserved even for Hb data. Dropped wavelengths are not included in the
    list. For Hb data, the wavelength indices are set to 0 for each data
    channel. NB that this is an invalid index.

    Since the labNIRS files don't store coordinates, probe positions are all set
    to (0, 0, 0). Positions can be read from file using the ``--layout`` option.
    Probe labels are based on actual numbers in the source file, however,
    the position matrices are contiguous and skip over any missing probe numbers.
    E.g. if there are sources 1 and 3, then the source position matrix will have
    2 rows, with source 1 at index 0 and source 3 at index 1, and the labels
    will be S1 and S3 respectively.
    """
    ###########################
    # Validate input parameters
    ###########################

    log.info("Validating input parameters")
    log.debug(
        "Parameters: data_file=%s, keep_category=%s, drop_subtype=%s",
        data_file,
        keep_category,
        drop_subtype,
    )
    if not isinstance(keep_category, str):
        raise LabNirsReadError("Invalid parameters: 'keep_category' must be a string.")
    # lowercase once so all later comparisons are case-insensitive
    keep_category = keep_category.lower()
    if keep_category not in ("hb", "raw", "all"):
        raise LabNirsReadError(
            f"Invalid parameters: 'keep_category': must be one of 'hb', 'raw', or 'all', got {keep_category}.",
        )
    if drop_subtype is not None:
        if not (
            isinstance(drop_subtype, Collection)
            and all(isinstance(x, str) for x in drop_subtype)
        ):
            raise LabNirsReadError(
                "Invalid parameters: 'drop_subtype' must be a collection of strings or None.",
            )
        # normalise to a lowercase set; entries must be Hb types or wavelengths
        drop_subtype = {x.lower() for x in drop_subtype}
        if not all(x in {"hbo", "hbr", "hbt"} or x.isdigit() for x in drop_subtype):
            raise LabNirsReadError(
                "Invalid parameters: 'drop_subtype' can only contain 'hbo', 'hbr', 'hbt', or wavelength integers.",
            )
    if not data_file.exists():
        log.error("Data file not found: %s", data_file)
        raise LabNirsReadError(f"Data file not found: {data_file}")

    ##########################
    # Read & verify the header
    ##########################

    log.info("Reading and validating header")
    header = _read_header(data_file)

    #########################
    # Parse channels & probes
    #########################

    log.info("Parsing channel pairs and probe information")

    # parse channel pairs
    # NB: header is 0-indexed, so header[32] is file line 33 (channel pairs)
    channels = (
        pl.DataFrame(
            data=[
                (int(x), int(y)) for x, y in re.findall(r"\((\d+),(\d+)\)", header[32])
            ],
            schema=[("source", pl.UInt32), ("detector", pl.UInt32)],
            orient="row",
        )
        .with_row_index(name="channel", offset=1)
        # add probe indices; order is closest to probe numbers in file; missing probes are skipped over
        # (dense rank yields contiguous 1-based indices even with gaps in probe numbers)
        .with_columns(
            pl.col("source").rank(method="dense").alias("source_index"),
            pl.col("detector").rank(method="dense").alias("detector_index"),
        )
    )
    log.debug(
        "Channel pairs: %s",
        [
            f"{row['source']}-{row['detector']}"
            for row in channels.iter_rows(named=True)
        ],
    )

    # Extract source and detector indices, add labels (Si, Di)
    sources = (
        channels.select(
            pl.col("source").alias("number"),
            pl.col("source_index").alias("index"),
        )
        .unique("number")
        .with_columns(
            # pl.lit("source").alias("type").cast(pl.Categorical()),
            pl.concat_str(pl.lit("S"), pl.col("number")).alias("label"),
        )
        .drop("number")
        .sort("index")
    )
    log.debug("Sources: %s", sources["label"].to_list())

    detectors = (
        channels.select(
            pl.col("detector").alias("number"),
            pl.col("detector_index").alias("index"),
        )
        .unique("number")
        .with_columns(
            # pl.lit("detector").alias("type").cast(pl.Categorical()),
            pl.concat_str(pl.lit("D"), pl.col("number")).alias("label"),
        )
        .drop("number")
        .sort("index")
    )
    log.debug("Detectors: %s", detectors["label"].to_list())

    log.info(
        "Found %d channels, %d sources, %d detectors",
        len(channels),
        len(sources),
        len(detectors),
    )

    #######################
    # Parse column metadata
    #######################

    log.info("Parsing column metadata and data structure")

    # parse and transform column names to conform with naming in model
    # line 34 holds channel numbers ("CH-1", ...); empty for metadata columns
    column_names_line1 = (
        header[33]
        .lower()
        .replace(" ", "")
        .replace("ch-", "")
        .replace("\n", "")
        .split("\t")
    )
    # line 35 holds per-channel subtypes (Hb type or wavelength) and meta names
    column_names_line2 = (
        header[34]
        .lower()
        .replace(" ", "")
        .replace("time(sec)", "time")
        .replace("deoxyhb", "hbr")
        .replace("oxyhb", "hbo")
        .replace("totalhb", "hbt")
        .replace("abs", "")
        .replace("nm", "")
        .replace("\n", "")
        .split("\t")
    )

    # name, type, etc. information about all data columns in the experiment file
    columns = (
        pl.DataFrame(
            data=[
                [
                    # column name (e.g. time, mark, 1-hbr, 2-870)
                    f"{int(a)}-{b}" if a else b,
                    # channel number (1, 2, ...), None for non-channel metadata like time
                    int(a) if a.isdigit() else None,
                    # data category (meta, raw (voltage), hb)
                    "meta" if a == "" else "raw" if b.isdigit() else "hb",
                    # subtype (hbr, hbo, hbt, wavelength)
                    b if a != "" else None,
                    # wavelength as string (e.g. 870, 830), None for non-wavelength or meta columns
                    # b if b.isdigit() else None,
                    # wavelength as integer (e.g. 870, 830), None for non-wavelength or meta columns
                    int(b) if b.isdigit() else None,
                ]
                for a, b in zip(column_names_line1, column_names_line2)
            ],
            schema=pl.Schema(
                [
                    ("name", pl.String),
                    ("channel", pl.Int32),
                    ("category", pl.Enum(["meta", "raw", "hb"])),
                    ("subtype", pl.Categorical()),
                    # ("wavelength_str", pl.String),
                    ("wavelength", pl.UInt32),
                ],
            ),
            orient="row",
        )
        # index required later for excluding dropped columns from data table
        .with_row_index(name="column")
        # join with source and detector indexes
        .join(
            channels.select(["channel", "source_index", "detector_index"]),
            on="channel",
            how="left",
        )
    )

    column_names = columns["name"].to_list()

    log.debug("Parsed %d columns: %s", len(columns), column_names)

    # Drop user-specified columns based on subtype (e.g. hbo, hbr, a specific wavelength, etc.).
    # This needs to happen before the list of wavelengths is extracted, as dropped wavelengths are
    # not to be included. Things might break if all wavelengths are dropped, but that's up to the
    # user to decide...
    if drop_subtype is not None and len(drop_subtype) > 0:
        log.info("Dropping columns based on subtype filter: %s", drop_subtype)
        initial_count = len(columns)
        columns = columns.filter(
            # parentheses are necessary otherwise Polars thinks that "meta" is a column name
            (pl.col("category") == "meta") | (~pl.col("subtype").is_in(drop_subtype)),
        )
        log.debug(
            "Dropped %d columns based on subtype filter %s. Remaining columns after filtering: %s",
            initial_count - len(columns),
            drop_subtype,
            columns.to_dict(as_series=False),
        )
    else:
        log.info("No column subtypes specified for dropping")

    # Extract set of remaining unique wavelengths, create wavelength indices
    log.info("Extracting unique wavelengths from data columns")
    wavelengths = (
        columns.select("wavelength")
        .drop_nulls()
        .unique()
        .sort("wavelength")
        .with_row_index(name="wavelength_index", offset=1)
    )

    # Hb-only exports have no wavelength columns; keep a single dummy entry so
    # downstream probe construction still has a non-empty wavelength list
    if wavelengths.height == 0:
        log.debug(
            "No wavelengths found in data columns, creating dummy wavelength entry",
        )
        wavelengths = pl.DataFrame({"wavelength": [0], "wavelength_index": [1]})

    wavelength_list = wavelengths["wavelength"].to_list()
    log.debug("Identified %d wavelengths: %s nm", len(wavelength_list), wavelength_list)
    log.debug("Wavelength mapping: %s", wavelengths.to_dict(as_series=False))

    # Add wavelength indices and data types to columns
    log.info("Adding wavelength indices and data types to column metadata")
    columns = columns.join(wavelengths, on="wavelength", how="left").with_columns(
        # Hb data also needs a wavelength index, even if meaningless, so assign 0 to those rows
        pl.when(pl.col("category") == "hb")
        .then(pl.col("wavelength_index").replace(None, 0))
        .otherwise(pl.col("wavelength_index"))
        .alias("wavelength_index"),
        # continuous wave datatype = 1, processed = 99999 according to SNIRF specifications
        pl.when(pl.col("category") == "hb")
        .then(pl.lit(99999))
        .when(pl.col("category") == "raw")
        .then(pl.lit(1))
        .otherwise(None)
        .alias("datatype"),
    )

    # Finally, if the user only wants to keep one data category, keep only that and meta columns.
    # Discarding "raw" destroys wavelength information, that's why it had to be extracted earlier.
    if keep_category != "all":
        log.info("Filtering to keep only '%s' data category", keep_category)
        initial_count = len(columns)
        columns = columns.filter(pl.col("category").is_in(["meta", keep_category]))
        log.debug(
            "Filtered to keep only '%s' and required meta categories: %d columns retained from %d",
            keep_category,
            len(columns),
            initial_count,
        )
    else:
        log.info("Keeping all data categories (keep_category=%s)", keep_category)
    log.debug("Dropping channel and wavelength columns")
    columns = columns.drop(["channel", "wavelength"])
    log.debug("Final columns: %s", columns["name"].to_list())

    ###############################
    # Read experiment data from CSV
    ###############################

    log.info("Reading experiment data from file")

    # read the data table from the experiment file, formatted as CSV
    # keep only time, task, mark, and selected data columns
    data_table = (
        pl.scan_csv(
            data_file,
            has_header=False,
            skip_lines=DATA_START_LINE - 1,
            separator="\t",
            # read everything as String first; numeric casting happens below
            schema=pl.Schema(
                zip(
                    column_names,
                    [pl.String] * len(column_names),
                ),
            ),
        )
        # select only needed columns
        .select(columns["name"].to_list())
        # drop count metadata
        # NOTE(review): assumes a "count" column is always present in the export — TODO confirm
        .drop("count")
        # remove whitespace around values
        .select(pl.col(pl.String).str.strip_chars())
        # convert mark to enum, task to uint, and the rest to float
        .cast({"mark": pl.Enum(["0Z", "0", "1"]), "task": pl.UInt32})
        .cast(
            {pl.String: pl.Float64},
        )
        # scan_csv is lazy, need to collect
        .collect()
    )

    log.info(
        "Successfully read data table with %d rows and %d columns",
        len(data_table),
        len(data_table.columns),
    )

    ###########################################
    # Extract information needed for NIRS model
    ###########################################

    log.info("Extracting metadata, data, stimuli, and probe information")

    return model.Nirs(
        metadata=_extract_metadata(header),
        data=[_extract_data(data_table, columns)],
        stim=_extract_stims(data_table),
        probe=_extract_probes(sources, detectors, wavelengths),
    )
def read_probe_pairs(data_file: Path) -> str:  # noqa: F841
    """
    Return the raw probe-pair string from a LabNIRS file header.

    Parameters
    ----------
    data_file : Path
        Path to the LabNIRS data file, in the format exported by the LabNIRS
        software (35-line header, version/header type 11.0).

    Returns
    -------
    str
        Probe pair string with surrounding whitespace removed,
        e.g. "(1,1)(2,1)...".

    Raises
    ------
    LabNirsReadError
        If the file does not exist or the header cannot be read.
    """
    log.info("Reading probe pairs from file: %s", data_file)
    if not data_file.exists():
        raise LabNirsReadError(f"Data file not found: {data_file}")

    # probe pairs live on header line 33 (index 32)
    pairs = _read_header(data_file)[32].strip()
    log.debug("Found probe pairs string: %s", pairs)
    return pairs
def _extract_data(data: pl.DataFrame, columns: pl.DataFrame) -> model.Data:
    """
    Build a model.Data object from the data table and column metadata.

    Parameters
    ----------
    data : pl.DataFrame
        Experimental time series (one column per data channel plus metadata).
    columns : pl.DataFrame
        Column metadata: category, subtype, source/detector/wavelength indices
        and SNIRF datatype codes.

    Returns
    -------
    model.Data
        Time vector, data time series matrix, and the measurement list
        (one entry per non-meta column, in column order).

    Raises
    ------
    LabNirsReadError
        If no data columns remain after filtering.
    """
    # SNIRF data type labels for the Hb subtypes; anything else maps to None
    label_map = {"hbo": "HbO", "hbr": "HbR", "hbt": "HbT"}

    def get_label(subtype: str) -> str | None:
        """Return the SNIRF label for an Hb subtype, or None if not an Hb type."""
        return label_map.get(subtype)

    log.info("Extracting experimental data")
    # one Measurement per data column, preserving column order so entries line
    # up with the dataTimeSeries columns selected below
    measurementList = []
    for row in columns.rows(named=True):
        if row["category"] == "meta":
            continue
        measurementList.append(
            model.Measurement(
                sourceIndex=row["source_index"],
                detectorIndex=row["detector_index"],
                dataType=row["datatype"],
                dataTypeIndex=0,
                dataTypeLabel=get_label(row["subtype"]),
                wavelengthIndex=row["wavelength_index"],
            ),
        )

    data_columns = columns.filter(pl.col("category") != "meta")["name"].to_list()
    if not data_columns:
        raise LabNirsReadError(
            "No data columns found after filtering; cannot extract data.",
        )

    extracted_data = model.Data(
        time=data["time"].to_numpy(),
        dataTimeSeries=data.select(data_columns).to_numpy(),
        measurementList=measurementList,
    )
    log.debug(
        "Extracted data has %d time points (range %.3f - %.3f), %d data channels, and %d MeasurementList entries",
        len(extracted_data.time),
        extracted_data.time[0],
        extracted_data.time[-1],
        extracted_data.dataTimeSeries.shape[1],
        len(extracted_data.measurementList),
    )
    log.debug(
        "Unique data type labels: %s, wavelength indices: %s",
        {m.dataTypeLabel for m in extracted_data.measurementList},
        {m.wavelengthIndex for m in extracted_data.measurementList},
    )
    return extracted_data
def _extract_probes(
    sources: pl.DataFrame,
    detectors: pl.DataFrame,
    wavelengths: pl.DataFrame,
) -> model.Probe:
    """
    Build a model.Probe object from source, detector, and wavelength tables.

    Parameters
    ----------
    sources : pl.DataFrame
        Source indices and labels (Si).
    detectors : pl.DataFrame
        Detector indices and labels (Di).
    wavelengths : pl.DataFrame
        Wavelength values in nm.

    Returns
    -------
    model.Probe
        Probe with wavelengths, all-zero 3D positions, and labels.

    Raises
    ------
    LabNirsReadError
        If any of the three input frames is empty.

    Notes
    -----
    - Positions are zero-filled; actual locations come from elsewhere
      (e.g. a layout file).
    - Labels carry the probe numbers from the LabNIRS file, while the
      position matrices are contiguous and skip missing probe numbers —
      match positions to probes via the labels.
    """
    log.info("Extracting probe information")
    if any(frame.height == 0 for frame in (wavelengths, sources, detectors)):
        raise LabNirsReadError(
            "Cannot extract probe information: wavelength, source, or detector list is empty.",
        )

    n_sources = sources.height
    n_detectors = detectors.height
    probe = model.Probe(
        wavelengths=wavelengths["wavelength"].to_numpy().astype(np.float64),
        sourcePos3D=np.zeros((n_sources, 3), dtype=np.float64),
        detectorPos3D=np.zeros((n_detectors, 3), dtype=np.float64),
        sourceLabels=sources["label"].to_list(),
        detectorLabels=detectors["label"].to_list(),
    )
    log.debug(
        "Extracted probe information: %d wavelengths, %d sources, and %d detectors, %d source labels, %d detector labels",
        len(probe.wavelengths),
        probe.sourcePos3D.shape[0],
        probe.detectorPos3D.shape[0],
        len(probe.sourceLabels) if probe.sourceLabels is not None else 0,
        len(probe.detectorLabels) if probe.detectorLabels is not None else 0,
    )
    return probe
def _extract_metadata(header: list[str]) -> model.Metadata:
    """
    Build a model.Metadata object from the LabNIRS header lines.

    Parameters
    ----------
    header : list[str]
        Header lines read from the LabNIRS file.

    Returns
    -------
    model.Metadata
        Subject ID, measurement date/time, and any additional fields
        (subject name, comment) found in the header.

    Raises
    ------
    LabNirsReadError
        If the measurement date or time in the header is malformed.

    Notes
    -----
    - Additional patient and study metadata live in a .pat file which
      labNIRS does not export; reading it is not supported.
    """
    log.info("Extracting metadata from header")
    # ID may be missing, in which case return empty string
    subject_id = _match_line(LINE_PATTERNS["id"], header).get("id", "")

    measurement_datetime = _match_line(LINE_PATTERNS["measurement_datetime"], header)
    date_parts = measurement_datetime["date"].split("/")
    if len(date_parts) != 3:
        raise LabNirsReadError(
            f"Invalid measurement date format in header: {measurement_datetime['date']}",
        )
    year, month, day = date_parts
    # zero-pad month and day to two digits for ISO-style formatting
    measurement_date = f"{year}-{month:>02}-{day:>02}"

    time_parts = measurement_datetime["time"].split(":")
    if len(time_parts) != 3:
        raise LabNirsReadError(
            f"Invalid measurement time format in header: {measurement_datetime['time']}",
        )
    hour, minute, second = time_parts
    measurement_time = f"{hour:>02}:{minute:>02}:{second:>02}"

    additional_fields = {}
    subject_name = _match_line(LINE_PATTERNS["name"], header).get("name", "")
    if subject_name:
        additional_fields["SubjectName"] = subject_name
    comment = _match_line(LINE_PATTERNS["comment"], header).get("comment", "")
    if comment:
        additional_fields["comment"] = comment

    metadata = model.Metadata(
        SubjectID=subject_id,
        MeasurementDate=measurement_date,
        MeasurementTime=measurement_time,
        additional_fields=additional_fields,
    )
    log.debug(
        "Extracted metadata has subject ID: %s, has date: %s, has time: %s, and has additional fields: %s",
        bool(metadata.SubjectID),
        bool(metadata.MeasurementDate),
        bool(metadata.MeasurementTime),
        (
            metadata.additional_fields.keys()
            if len(metadata.additional_fields) > 0
            else "none"
        ),
    )
    return metadata
def _extract_stims(data: pl.DataFrame) -> list[model.Stim]:
    """
    Build a list of model.Stim objects from the event marks in the data table.

    Parameters
    ----------
    data : pl.DataFrame
        Data table with time, task, and mark columns.

    Returns
    -------
    list[model.Stim]
        One Stim per unique task name, each holding the event onset times.

    Notes
    -----
    - In event-marked operation, mark is 1 for an event and task holds the task
      number; mark 0Z marks zeroing to baseline.
    - Task names in the output are strings: "Z" for zeroing, otherwise the task
      number.
    - Event-marked operation allows task 0 as a normal event, whereas the .csv
      saved by the labnirs software omits task 0 and its timings lag the .txt
      by one sample.
    - How tasks appear in other modes (e.g. software-generated tasks) is
      uncertain.
    - Stim durations and pre/post-rest periods live in a non-exported .csv;
      only event onsets from the .txt are extracted here.
    """
    log.info("Extracting stimulus information from data")
    # keep only rows that carry an event (mark != "0"), and derive the task name
    events = (
        data.lazy()
        .select(["time", "task", "mark"])
        .filter(pl.col("mark") != "0")
        .with_columns(
            pl.when(pl.col("mark") == "0Z")
            .then(pl.lit("Z"))
            .otherwise(pl.col("task").cast(pl.String))
            .alias("task_name"),
        )
        .select(["time", "task_name"])
        .collect()
    )
    log.debug(
        "Extracted task dataframe has %d rows and %d columns",
        events.shape[0],
        events.shape[1],
    )

    stims = []
    for task in events["task_name"].unique().sort():
        onsets = events["time"].filter(events["task_name"] == task).to_numpy()
        stims.append(model.Stim(name=task, data=onsets))

    log.debug("Found %d stimulus types", len(stims))
    for stim in stims:
        log.debug("Stimulus type '%s' has %d events", stim.name, len(stim.data))
    return stims
def _match_line(pattern: str, lines: list[str]) -> dict[str, str]:
    """
    Search lines for the first one matching a pattern and return its groups.

    Parameters
    ----------
    pattern : str
        Regular expression with named capture groups.
    lines : list[str]
        Lines to scan, in order.

    Returns
    -------
    dict[str, str]
        Named groups of the first matching line; empty dict if nothing matched.
    """
    log.debug("Matching pattern '%s' against header lines", pattern)
    compiled = re.compile(pattern)
    for line in lines:
        if (found := compiled.match(line)) is not None:
            log.debug("Found pattern in line: %s", line.strip())
            return found.groupdict()
    log.debug("Pattern not found in header")
    return {}
def _read_header(data_file: Path) -> list[str]:
    """
    Read and verify the header lines of a LabNIRS file.

    Parameters
    ----------
    data_file : Path
        Path to the LabNIRS data file.

    Returns
    -------
    list[str]
        The header lines (35 expected).

    Raises
    ------
    LabNirsReadError
        If the file cannot be read or the header format is invalid.
    """
    log.info("Reading header lines from file %s", data_file)
    # everything before the data table is header
    n_lines = DATA_START_LINE - 1
    try:
        with open(data_file, encoding="ASCII") as f:
            header = [f.readline() for _ in range(n_lines)]
        log.debug(
            "Read header lines: requested %d, read %d lines",
            n_lines,
            len(header),
        )
    except Exception as e:
        log.exception("Error reading the header of %s: %s", data_file, e)
        raise LabNirsReadError(f"Error reading the header of {data_file}") from e

    _verify_header_format(header)
    return header
def _verify_header_format(header: list[str]) -> None:
    """
    Check that the header lines conform to the expected LabNIRS format.

    Parameters
    ----------
    header : list[str]
        Header lines to verify.

    Raises
    ------
    LabNirsReadError
        For critical errors: an invalid top line, or missing channel pairs.

    Notes
    -----
    - Only the top line and channel pairs are enforced; all other checks
      merely log warnings (errors may still surface later).
    """
    log.info("Verifying header format with %d lines", len(header))

    # Critical errors
    # Check exact top line format
    log.debug("Checking for critical header format errors")
    if not re.match(LINE_PATTERNS["top_line"], header[0]):
        raise LabNirsReadError(
            f"Critical header format error: invalid top line in header: {header[0].strip()}",
        )
    # Channel pairs are on line 33
    if not re.match(LINE_PATTERNS["channel_pairs"], header[32]):
        raise LabNirsReadError(
            f"Critical header format error: channel pairs not found in line 33: {header[32].strip()}. "
            "Expected format: (source,detector)(source,detector)...",
        )

    # Non-critical warnings (may produce errors later).
    # Each entry: (pattern key, 0-based header index, warning message).
    soft_checks = (
        ("version", 2, "Version number in line 3 must be '11.0'. Current: %s. Errors may occur."),
        ("headertype", 3, "HeaderType in line 4 must be '11.0/11.0'. Current: %s. Errors may occur."),
        ("id", 2, "Missing ID metadata in line 3: %s"),
        ("measurement_datetime", 1, "Missing measurement datetime metadata in line 2: %s"),
        ("name", 3, "Missing subject name metadata in line 4: %s"),
        ("comment", 4, "Missing comment metadata in line 5: %s"),
    )
    for key, idx, message in soft_checks:
        if not re.match(LINE_PATTERNS[key], header[idx]):
            log.warning(message, header[idx].strip())

    log.debug("Header format verification completed")