Coverage for labnirs2snirf / labnirs.py: 100%

181 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-28 06:02 +0000

1""" 

2Functions related to reading data from LabNIRS files. 

3""" 

4 

5import logging 

6import re 

7from collections.abc import Collection 

8from pathlib import Path 

9from typing import Final 

10 

11import numpy as np 

12import polars as pl 

13 

14from . import model 

15from .error import Labnirs2SnirfError 

16 

17log = logging.getLogger(__name__) 

18 

19 

class LabNirsReadError(Labnirs2SnirfError):
    """Raised when a LabNIRS export cannot be read or fails format validation."""

22 

23 

# Constants

# (tab-separated) data table starts on line 36 (counting from 1)
DATA_START_LINE: Final[int] = 36

# regexp patterns to extract fields from the header & to verify correct format
# NOTE: several header lines carry more than one field separated by tabs, so
# two different patterns may legitimately match the *same* line (e.g. the
# "version" pattern skips two leading tab fields that the "id" pattern reads,
# and vice versa). See _verify_header_format / _match_line for usage.
LINE_PATTERNS: Final[dict[str, str]] = {
    # line 1: bracketed section markers plus the 1-based line number where data starts
    "top_line": rf"^ \[File Information\]\s*\[Data Line\]\t(?P<data_start_line>{DATA_START_LINE})\s*$",
    # "Version" is the 3rd tab-separated field of its line; must be exactly 11.0
    "version": r"^[^\t]*\t[^\t]*\tVersion\t11\.0$",
    # "[HeaderType]" is the 3rd tab-separated field of its line; must be 11.0/11.0
    "headertype": r"^[^\t]*\t[^\t]*\t\[HeaderType\]\t11\.0/11\.0$",
    # subject ID: first two tab fields of the line ("ID" literal, then the value)
    "id": r"^ID\t(?P<id>[^\t]*)\t.*$",
    # e.g. "Measured Date\t2024/01/31 12:34:56"
    "measurement_datetime": r"^Measured Date\t(?P<date>[\d/]+) (?P<time>[\d:]+)\s*$",
    # subject name: first two tab fields of the line
    "name": r"^Name\t(?P<name>[^\t]*)\t.*$",
    # free-text comment, everything after the tab
    "comment": r"^Comment\t(?P<comment>.*)$",
    # source/detector pairs, e.g. "(1,1)(2,1)..."; (?>...) is an atomic group
    "channel_pairs": r"^(?P<channel_pairs>(?>\(\d+,\d+\))+)$",
}

40 

41 

def read_labnirs(
    data_file: Path,
    keep_category: str = "all",
    drop_subtype: Collection[str] | None = None,
) -> model.Nirs:
    """
    Read and process a LabNIRS data file and returns a NIRS data model.

    Parameters
    ----------
    data_file : Path
        Path to the LabNIRS data file.
        File is expected to be in the format exported by the LabNIRS software,
        with 35 lines of header and a version number/header type of 11.0.
    keep_category : "hb"| "raw" | "all", default="all"
        Data category to include in the output. "Raw" means raw voltage data,
        "hb" means haemoglobin data. If "all", both categories are included.
    drop_subtype : Collection[str] | None, default=None
        Set or list of data types and/or wavelengths to drop from the data.
        Hb data types are: "hbr", "hbo" and "hbt".
        Wavelengths should be convertible to integer.
        All included if None.

    Returns
    -------
    model.Nirs
        A NIRS object containing most data required by the SNIRF specification.

    Notes
    -----
    This function reads experiment data and metadata from .txt files exported by
    the LabNIRS software. It expects a 35-line header with specific formatting
    (and version 11.0). Only the top line (containing header length), the
    presence of channel pairs on line 33, and the presence of data columns are
    enforced. Other validation failures only raise a warning, but errors may
    still occur.

    LabNIRS can export files with both raw and Hb data, depending on the options
    selected. The ``keep_category`` parameter controls which of the two is
    retained in the output. The ``drop_subtype`` parameter can be used to
    further exclude specific wavelengths or Hb data types from the output.

    By default, all data is included, which may not be desirable when the goal
    is to import the .snirf file to a NIRS analyser tool such as MNE, as these
    tools may not support files with both raw and Hb data present, may not need
    HbT, or may not be able to handle more than 2 wavelengths. For MNE, for
    example, it would be sensible to either include ``raw`` and drop one
    wavelength of the 3, or to include ``hb`` and drop ``HbT``.

    For reasons of compatibility with other software, a list of wavelengths is
    preserved even for Hb data. Dropped wavelengths are not included in the
    list. For Hb data, the wavelength indices are set to 0 for each data
    channel. NB that this is an invalid index.

    Since the labNIRS files don't store coordinates, probe positions are all set
    to (0, 0, 0). Positions can be read from file using the ``--layout`` option.
    Probe labels are based on actual numbers in the source file, however,
    the position matrices are contiguous and skip over any missing probe numbers.
    E.g. if there are sources 1 and 3, then the source position matrix will have
    2 rows, with source 1 at index 0 and source 3 at index 1, and the labels
    will be S1 and S3 respectively.
    """

    ###########################
    # Validate input parameters
    ###########################

    log.info("Validating input parameters")
    log.debug(
        "Parameters: data_file=%s, keep_category=%s, drop_subtype=%s",
        data_file,
        keep_category,
        drop_subtype,
    )
    if not isinstance(keep_category, str):
        raise LabNirsReadError("Invalid parameters: 'keep_category' must be a string.")
    keep_category = keep_category.lower()
    if keep_category not in ("hb", "raw", "all"):
        raise LabNirsReadError(
            f"Invalid parameters: 'keep_category': must be one of 'hb', 'raw', or 'all', got {keep_category}.",
        )
    if drop_subtype is not None:
        if not (
            isinstance(drop_subtype, Collection)
            and all(isinstance(x, str) for x in drop_subtype)
        ):
            raise LabNirsReadError(
                "Invalid parameters: 'drop_subtype' must be a collection of strings or None.",
            )
        # normalise to a lowercase set for the membership tests below
        drop_subtype = {x.lower() for x in drop_subtype}
        if not all(x in {"hbo", "hbr", "hbt"} or x.isdigit() for x in drop_subtype):
            raise LabNirsReadError(
                "Invalid parameters: 'drop_subtype' can only contain 'hbo', 'hbr', 'hbt', or wavelength integers.",
            )
    if not data_file.exists():
        log.error("Data file not found: %s", data_file)
        raise LabNirsReadError(f"Data file not found: {data_file}")

    ##########################
    # Read & verify the header
    ##########################

    log.info("Reading and validating header")
    header = _read_header(data_file)

    #########################
    # Parse channels & probes
    #########################

    log.info("Parsing channel pairs and probe information")

    # parse channel pairs; header[32] is line 33, validated by _read_header to
    # contain "(source,detector)(source,detector)..." pairs
    channels = (
        pl.DataFrame(
            data=[
                (int(x), int(y)) for x, y in re.findall(r"\((\d+),(\d+)\)", header[32])
            ],
            schema=[("source", pl.UInt32), ("detector", pl.UInt32)],
            orient="row",
        )
        # channel numbers are 1-based, in file order
        .with_row_index(name="channel", offset=1)
        # add probe indices; order is closest to probe numbers in file; missing probes are skipped over
        .with_columns(
            pl.col("source").rank(method="dense").alias("source_index"),
            pl.col("detector").rank(method="dense").alias("detector_index"),
        )
    )
    log.debug(
        "Channel pairs: %s",
        [
            f"{row['source']}-{row['detector']}"
            for row in channels.iter_rows(named=True)
        ],
    )

    # Extract source and detector indices, add labels (Si, Di)
    # NB: labels keep the *file* probe numbers, while "index" is contiguous
    sources = (
        channels.select(
            pl.col("source").alias("number"),
            pl.col("source_index").alias("index"),
        )
        .unique("number")
        .with_columns(
            # pl.lit("source").alias("type").cast(pl.Categorical()),
            pl.concat_str(pl.lit("S"), pl.col("number")).alias("label"),
        )
        .drop("number")
        .sort("index")
    )
    log.debug("Sources: %s", sources["label"].to_list())

    detectors = (
        channels.select(
            pl.col("detector").alias("number"),
            pl.col("detector_index").alias("index"),
        )
        .unique("number")
        .with_columns(
            # pl.lit("detector").alias("type").cast(pl.Categorical()),
            pl.concat_str(pl.lit("D"), pl.col("number")).alias("label"),
        )
        .drop("number")
        .sort("index")
    )
    log.debug("Detectors: %s", detectors["label"].to_list())

    log.info(
        "Found %d channels, %d sources, %d detectors",
        len(channels),
        len(sources),
        len(detectors),
    )

    #######################
    # Parse column metadata
    #######################

    log.info("Parsing column metadata and data structure")

    # parse and transform column names to conform with naming in model
    # line 34 carries channel numbers ("CH-1", ...), line 35 the per-channel kind
    column_names_line1 = (
        header[33]
        .lower()
        .replace(" ", "")
        .replace("ch-", "")
        .replace("\n", "")
        .split("\t")
    )
    column_names_line2 = (
        header[34]
        .lower()
        .replace(" ", "")
        .replace("time(sec)", "time")
        .replace("deoxyhb", "hbr")
        .replace("oxyhb", "hbo")
        .replace("totalhb", "hbt")
        .replace("abs", "")
        .replace("nm", "")
        .replace("\n", "")
        .split("\t")
    )

    # name, type, etc. information about all data columns in the experiment file
    # NOTE(review): zip() silently truncates if lines 34/35 have different field
    # counts — assumed equal in well-formed exports; confirm against samples.
    columns = (
        pl.DataFrame(
            data=[
                [
                    # column name (e.g. time, mark, 1-hbr, 2-870)
                    f"{int(a)}-{b}" if a else b,
                    # channel number (1, 2, ...), None for non-channel metadata like time
                    int(a) if a.isdigit() else None,
                    # data category (meta, raw (voltage), hb)
                    "meta" if a == "" else "raw" if b.isdigit() else "hb",
                    # subtype (hbr, hbo, hbt, wavelength)
                    b if a != "" else None,
                    # wavelength as string (e.g. 870, 830), None for non-wavelength or meta columns
                    # b if b.isdigit() else None,
                    # wavelength as integer (e.g. 870, 830), None for non-wavelength or meta columns
                    int(b) if b.isdigit() else None,
                ]
                for a, b in zip(column_names_line1, column_names_line2)
            ],
            schema=pl.Schema(
                [
                    ("name", pl.String),
                    ("channel", pl.Int32),
                    ("category", pl.Enum(["meta", "raw", "hb"])),
                    ("subtype", pl.Categorical()),
                    # ("wavelength_str", pl.String),
                    ("wavelength", pl.UInt32),
                ],
            ),
            orient="row",
        )
        # index required later for excluding dropped columns from data table
        .with_row_index(name="column")
        # join with source and detector indexes
        .join(
            channels.select(["channel", "source_index", "detector_index"]),
            on="channel",
            how="left",
        )
    )

    column_names = columns["name"].to_list()

    log.debug("Parsed %d columns: %s", len(columns), column_names)

    # Drop user-specified columns based on subtype (e.g. hbo, hbr, a specific wavelength, etc.).
    # This needs to happen before the list of wavelengths is extracted, as dropped wavelengths are
    # not to be included. Things might break if all wavelengths are dropped, but that's up to the
    # user to decide...
    if drop_subtype is not None and len(drop_subtype) > 0:
        log.info("Dropping columns based on subtype filter: %s", drop_subtype)
        initial_count = len(columns)
        columns = columns.filter(
            # parentheses are necessary otherwise Polars thinks that "meta" is a column name
            (pl.col("category") == "meta") | (~pl.col("subtype").is_in(drop_subtype)),
        )
        log.debug(
            "Dropped %d columns based on subtype filter %s. Remaining columns after filtering: %s",
            initial_count - len(columns),
            drop_subtype,
            columns.to_dict(as_series=False),
        )
    else:
        log.info("No column subtypes specified for dropping")

    # Extract set of remaining unique wavelengths, create wavelength indices
    log.info("Extracting unique wavelengths from data columns")
    wavelengths = (
        columns.select("wavelength")
        .drop_nulls()
        .unique()
        .sort("wavelength")
        # wavelength indices are 1-based per the SNIRF convention used here
        .with_row_index(name="wavelength_index", offset=1)
    )

    if wavelengths.height == 0:
        log.debug(
            "No wavelengths found in data columns, creating dummy wavelength entry",
        )
        wavelengths = pl.DataFrame({"wavelength": [0], "wavelength_index": [1]})

    wavelength_list = wavelengths["wavelength"].to_list()
    log.debug("Identified %d wavelengths: %s nm", len(wavelength_list), wavelength_list)
    log.debug("Wavelength mapping: %s", wavelengths.to_dict(as_series=False))

    # Add wavelength indices and data types to columns
    log.info("Adding wavelength indices and data types to column metadata")
    columns = columns.join(wavelengths, on="wavelength", how="left").with_columns(
        # Hb data also needs a wavelength index, even if meaningless, so assign 0 to those rows
        pl.when(pl.col("category") == "hb")
        .then(pl.col("wavelength_index").replace(None, 0))
        .otherwise(pl.col("wavelength_index"))
        .alias("wavelength_index"),
        # continuous wave datatype = 1, processed = 99999 according to SNIRF specifications
        pl.when(pl.col("category") == "hb")
        .then(pl.lit(99999))
        .when(pl.col("category") == "raw")
        .then(pl.lit(1))
        .otherwise(None)
        .alias("datatype"),
    )

    # Finally, if the user only wants to keep one data category, keep only that and meta columns.
    # Discarding "raw" destroys wavelength information, that's why it had to be extracted earlier.
    if keep_category != "all":
        log.info("Filtering to keep only '%s' data category", keep_category)
        initial_count = len(columns)
        columns = columns.filter(pl.col("category").is_in(["meta", keep_category]))
        log.debug(
            "Filtered to keep only '%s' and required meta categories: %d columns retained from %d",
            keep_category,
            len(columns),
            initial_count,
        )
    else:
        log.info("Keeping all data categories (keep_category=%s)", keep_category)
    log.debug("Dropping channel and wavelength columns")
    columns = columns.drop(["channel", "wavelength"])
    log.debug("Final columns: %s", columns["name"].to_list())

    ###############################
    # Read experiment data from CSV
    ###############################

    log.info("Reading experiment data from file")

    # read the data table from the experiment file, formatted as CSV
    # keep only time, task, mark, and selected data columns
    data_table = (
        pl.scan_csv(
            data_file,
            has_header=False,
            skip_lines=DATA_START_LINE - 1,
            separator="\t",
            # read everything as strings first; typed casts happen below
            schema=pl.Schema(
                zip(
                    column_names,
                    [pl.String] * len(column_names),
                ),
            ),
        )
        # select only needed columns
        .select(columns["name"].to_list())
        # drop count metadata
        # NOTE(review): assumes a "count" meta column is always present in the
        # export; .drop() would raise otherwise — confirm against samples.
        .drop("count")
        # remove whitespace around values
        .select(pl.col(pl.String).str.strip_chars())
        # convert mark to enum, task to uint, and the rest to float
        .cast({"mark": pl.Enum(["0Z", "0", "1"]), "task": pl.UInt32})
        .cast(
            {pl.String: pl.Float64},
        )
        # scan_csv is lazy, need to collect
        .collect()
    )

    log.info(
        "Successfully read data table with %d rows and %d columns",
        len(data_table),
        len(data_table.columns),
    )

    ###########################################
    # Extract information needed for NIRS model
    ###########################################

    log.info("Extracting metadata, data, stimuli, and probe information")

    return model.Nirs(
        metadata=_extract_metadata(header),
        data=[_extract_data(data_table, columns)],
        stim=_extract_stims(data_table),
        probe=_extract_probes(sources, detectors, wavelengths),
    )

419 

420 

def read_probe_pairs(data_file: Path) -> str:  # noqa: F841
    """
    Return the header line listing the source/detector probe pairs.

    Parameters
    ----------
    data_file : Path
        Path to the LabNIRS data file, expected in the format exported by the
        LabNIRS software (35 header lines, version/header type 11.0).

    Returns
    -------
    str
        The probe-pair string with leading and trailing whitespace removed,
        e.g. "(1,1)(2,1)...".

    Raises
    ------
    LabNirsReadError
        If the data file does not exist or its header cannot be read.
    """
    log.info("Reading probe pairs from file: %s", data_file)
    if not data_file.exists():
        raise LabNirsReadError(f"Data file not found: {data_file}")

    # channel pairs sit on header line 33 (index 32); _read_header also
    # validates their presence and format
    pairs = _read_header(data_file)[32].strip()
    log.debug("Found probe pairs string: %s", pairs)
    return pairs

446 

447 

def _extract_data(data: pl.DataFrame, columns: pl.DataFrame) -> model.Data:
    """
    Compile data into a model.Data object.

    Parameters
    ----------
    data : pl.DataFrame
        DataFrame containing experimental time series data.
    columns : pl.DataFrame
        DataFrame with column metadata including category, subtype, source/detector indices.

    Returns
    -------
    model.Data
        Data object containing time, data time series, and measurement list.

    Raises
    ------
    LabNirsReadError
        If no data columns are found after filtering.

    Notes
    -----
    The i-th entry of ``measurementList`` describes the i-th column of
    ``dataTimeSeries``: both are derived from ``columns`` filtered to
    non-"meta" rows, in the same row order.
    """

    def get_label(subtype: str) -> str | None:
        """
        Map subtype to data type label.

        Parameters
        ----------
        subtype : str
            Data subtype identifier (e.g., "hbo", "hbr", "hbt").

        Returns
        -------
        str or None
            Corresponding label ("HbO", "HbR", "HbT") or None if no match.
            Wavelength subtypes (digit strings) therefore map to None.
        """
        match subtype:
            case "hbo":
                return "HbO"
            case "hbr":
                return "HbR"
            case "hbt":
                return "HbT"
            case _:
                return None

    log.info("Extracting experimental data")
    # one Measurement per non-meta column; dataTypeIndex is not used and fixed at 0
    measurementList = [
        model.Measurement(
            sourceIndex=row["source_index"],
            detectorIndex=row["detector_index"],
            dataType=row["datatype"],
            dataTypeIndex=0,
            dataTypeLabel=(get_label(row["subtype"])),
            wavelengthIndex=row["wavelength_index"],
        )
        for row in columns.rows(named=True)
        if row["category"] != "meta"
    ]
    data_columns = columns.filter(pl.col("category") != "meta")["name"].to_list()
    if len(data_columns) == 0:
        raise LabNirsReadError(
            "No data columns found after filtering; cannot extract data.",
        )
    extracted_data = model.Data(
        time=data["time"].to_numpy(),
        dataTimeSeries=data.select(data_columns).to_numpy(),
        measurementList=measurementList,
    )
    log.debug(
        "Extracted data has %d time points (range %.3f - %.3f), %d data channels, and %d MeasurementList entries",
        len(extracted_data.time),
        extracted_data.time[0],
        extracted_data.time[-1],
        extracted_data.dataTimeSeries.shape[1],
        len(extracted_data.measurementList),
    )
    log.debug(
        "Unique data type labels: %s, wavelength indices: %s",
        {m.dataTypeLabel for m in extracted_data.measurementList},
        {m.wavelengthIndex for m in extracted_data.measurementList},
    )
    return extracted_data

531 

532 

def _extract_probes(
    sources: pl.DataFrame,
    detectors: pl.DataFrame,
    wavelengths: pl.DataFrame,
) -> model.Probe:
    """
    Build a model.Probe from source, detector, and wavelength tables.

    Parameters
    ----------
    sources : pl.DataFrame
        Source indices and "S<number>" labels.
    detectors : pl.DataFrame
        Detector indices and "D<number>" labels.
    wavelengths : pl.DataFrame
        Wavelength values in nm.

    Returns
    -------
    model.Probe
        Probe with the wavelength list, zero-initialised 3D positions, and
        source/detector labels.

    Raises
    ------
    LabNirsReadError
        If any of the input dataframes are empty.

    Notes
    -----
    - All positions are set to 0. Locations can be read from file or guessed elsewhere.
    - Probe labels are set according to Si and Di (source, detector respectively),
      where the numbers are the same as the probe numbers in the labNIRS file.
    - Position matrices skip over missing probe numbers, so make sure you use the
      labels to associate actual positions with probes.
    """
    log.info("Extracting probe information")
    if any(frame.height == 0 for frame in (wavelengths, sources, detectors)):
        raise LabNirsReadError(
            "Cannot extract probe information: wavelength, source, or detector list is empty.",
        )

    n_sources = sources.height
    n_detectors = detectors.height
    result = model.Probe(
        wavelengths=wavelengths["wavelength"].to_numpy().astype(np.float64),
        # positions are unknown at this point; one zeroed (x, y, z) row per probe
        sourcePos3D=np.zeros((n_sources, 3), dtype=np.float64),
        detectorPos3D=np.zeros((n_detectors, 3), dtype=np.float64),
        sourceLabels=sources["label"].to_list(),
        detectorLabels=detectors["label"].to_list(),
    )
    log.debug(
        "Extracted probe information: %d wavelengths, %d sources, and %d detectors, %d source labels, %d detector labels",
        len(result.wavelengths),
        result.sourcePos3D.shape[0],
        result.detectorPos3D.shape[0],
        len(result.sourceLabels) if result.sourceLabels is not None else 0,
        len(result.detectorLabels) if result.detectorLabels is not None else 0,
    )
    return result

589 

590 

def _extract_metadata(header: list[str]) -> model.Metadata:
    """
    Compile metadata into a model.Metadata object.

    Parameters
    ----------
    header : list[str]
        List of header lines from the LabNIRS file.

    Returns
    -------
    model.Metadata
        Metadata object with subject ID, measurement date/time, and additional fields.

    Raises
    ------
    LabNirsReadError
        If the measurement date or time in the header is missing or malformed.

    Notes
    -----
    - Additional patient and study metadata are also stored in a .pat file,
      which is not exported by labNIRS. For now, reading this file is not supported.
    """
    # extract snirf metadata fields from the header
    log.info("Extracting metadata from header")
    # ID may be missing, in which case return empty string
    subject_id = _match_line(LINE_PATTERNS["id"], header).get("id", "")
    measurement_datetime = _match_line(LINE_PATTERNS["measurement_datetime"], header)
    # Use .get() with a default: a header lacking the datetime line only warns
    # during verification, so without the default this would raise a bare
    # KeyError instead of the module's LabNirsReadError below.
    date = measurement_datetime.get("date", "").split("/")
    if len(date) != 3:
        raise LabNirsReadError(
            f"Invalid measurement date format in header: {measurement_datetime.get('date', '')}",
        )
    # ISO-ish date with zero-padded month and day (year assumed 4 digits)
    measurement_date = f"{date[0]}-{date[1]:>02}-{date[2]:>02}"
    time = measurement_datetime.get("time", "").split(":")
    if len(time) != 3:
        raise LabNirsReadError(
            f"Invalid measurement time format in header: {measurement_datetime.get('time', '')}",
        )
    measurement_time = f"{time[0]:>02}:{time[1]:>02}:{time[2]:>02}"
    # only store optional fields that are present and non-empty
    additional_fields: dict[str, str] = {}
    if subject_name := _match_line(LINE_PATTERNS["name"], header).get("name", ""):
        additional_fields["SubjectName"] = subject_name
    if comment := _match_line(LINE_PATTERNS["comment"], header).get("comment", ""):
        additional_fields["comment"] = comment
    metadata = model.Metadata(
        SubjectID=subject_id,
        MeasurementDate=measurement_date,
        MeasurementTime=measurement_time,
        additional_fields=additional_fields,
    )
    log.debug(
        "Extracted metadata has subject ID: %s, has date: %s, has time: %s, and has additional fields: %s",
        metadata.SubjectID is not None and metadata.SubjectID != "",
        metadata.MeasurementDate is not None and metadata.MeasurementDate != "",
        metadata.MeasurementTime is not None and metadata.MeasurementTime != "",
        (
            metadata.additional_fields.keys()
            if len(metadata.additional_fields) > 0
            else "none"
        ),
    )
    return metadata

661 

662 

def _extract_stims(data: pl.DataFrame) -> list[model.Stim]:
    """
    Build one model.Stim per unique task/stimulus type found in the data.

    Parameters
    ----------
    data : pl.DataFrame
        DataFrame containing time, task, and mark columns.

    Returns
    -------
    list[model.Stim]
        One Stim per unique task name, each holding the onset times of its events.

    Notes
    -----
    - In case of event-marked tasks, mark is 1 for the event and task contains the task number.
      Event 0Z marks zeroing to baseline.
    - In the output, task name is a string, Z for zeroing and the task number for others.
    - Event-marked operation allows task 0 to be used as a normal event, whereas the .csv file
      saved by the labnirs software doesn't contain information for task 0; the timings are also
      different: timings in the .csv are 1 sample later than in the .txt.
    - I'm uncertain how tasks are marked in other modus operandi, e.g. when tasks are generated
      by the labnirs software.
    - LabNIRS also stores stim information in a .csv file (not exported), which includes duration,
      pre-rest and post-rest periods. For now, reading that file is not supported. This function
      only extracts event onsets from the .txt file.
    """
    log.info("Extracting stimulus information from data")
    # keep only rows that carry an event marker ("0" means no event)
    events = (
        data.lazy()
        .select(["time", "task", "mark"])
        .filter(pl.col("mark") != "0")
    )
    # "0Z" marks zeroing to baseline -> task name "Z"; otherwise the task number
    name_expr = (
        pl.when(pl.col("mark") == "0Z")
        .then(pl.lit("Z"))
        .otherwise(pl.col("task").cast(pl.String))
        .alias("task_name")
    )
    task_df = events.with_columns(name_expr).select(["time", "task_name"]).collect()
    n_rows, n_cols = task_df.shape
    log.debug(
        "Extracted task dataframe has %d rows and %d columns",
        n_rows,
        n_cols,
    )
    stims = []
    for task in task_df["task_name"].unique().sort():
        onsets = task_df["time"].filter(task_df["task_name"] == task).to_numpy()
        stims.append(model.Stim(name=task, data=onsets))
    log.debug("Found %d stimulus types", len(stims))
    for stim in stims:
        log.debug("Stimulus type '%s' has %d events", stim.name, len(stim.data))
    return stims

721 

722 

def _match_line(pattern: str, lines: list[str]) -> dict[str, str]:
    """
    Return the named groups of the first line that matches *pattern*.

    Parameters
    ----------
    pattern : str
        Regular expression pattern with named capture groups.
    lines : list[str]
        Candidate lines, searched in order.

    Returns
    -------
    dict[str, str]
        Captured groups of the first matching line; empty when no line matches.
    """
    log.debug("Matching pattern '%s' against header lines", pattern)
    compiled = re.compile(pattern)
    for candidate in lines:
        found = compiled.match(candidate)
        if found is None:
            continue
        log.debug("Found pattern in line: %s", candidate.strip())
        return found.groupdict()
    log.debug("Pattern not found in header")
    return {}

748 

749 

def _read_header(data_file: Path) -> list[str]:
    """
    Read the header lines of a LabNIRS export file.

    Parameters
    ----------
    data_file : Path
        Path to the LabNIRS data file.

    Returns
    -------
    list[str]
        The 35 header lines preceding the data table. Lines past EOF come back
        as empty strings, so the list length is always DATA_START_LINE - 1.

    Raises
    ------
    LabNirsReadError
        If the file cannot be read or the header format is invalid.
    """
    log.info("Reading header lines from file %s", data_file)
    n_header_lines = DATA_START_LINE - 1
    try:
        # readline() (rather than iteration) pads short files with "" so the
        # result always has n_header_lines entries
        with open(data_file, encoding="ASCII") as fh:
            header = [fh.readline() for _ in range(n_header_lines)]
        log.debug(
            "Read header lines: requested %d, read %d lines",
            n_header_lines,
            len(header),
        )
    except Exception as e:
        log.exception("Error reading the header of %s: %s", data_file, e)
        raise LabNirsReadError(f"Error reading the header of {data_file}") from e
    _verify_header_format(header)

    return header

784 

785 

def _verify_header_format(header: list[str]) -> None:
    """
    Verify that the header conforms to expected LabNIRS format.

    Parameters
    ----------
    header : list[str]
        List of header lines to verify.

    Raises
    ------
    LabNirsReadError
        If critical format errors are found (invalid top line or missing channel pairs).

    Notes
    -----
    - Critical errors (top line format, channel pairs) raise exceptions
    - Non-critical issues (version, metadata fields) only log warnings
    - Several patterns are intentionally checked against the *same* line:
      the regexes read different tab-separated fields of that line (e.g. line 3
      apparently holds both the ID fields and the Version field — the "version"
      pattern skips the first two fields that the "id" pattern captures).
    """

    log.info("Verifying header format with %d lines", len(header))

    # Critical errors
    # Check exact top line format
    log.debug("Checking for critical header format errors")
    if re.match(LINE_PATTERNS["top_line"], header[0]) is None:
        raise LabNirsReadError(
            f"Critical header format error: invalid top line in header: {header[0].strip()}",
        )
    # Channel pairs are on line 33
    if re.match(LINE_PATTERNS["channel_pairs"], header[32]) is None:
        raise LabNirsReadError(
            f"Critical header format error: channel pairs not found in line 33: {header[32].strip()}. "
            "Expected format: (source,detector)(source,detector)...",
        )

    # Non-critical warnings (may produce errors later)
    # Version number and header type should be "11.0"
    if re.match(LINE_PATTERNS["version"], header[2]) is None:
        log.warning(
            "Version number in line 3 must be '11.0'. Current: %s. Errors may occur.",
            header[2].strip(),
        )
    if re.match(LINE_PATTERNS["headertype"], header[3]) is None:
        log.warning(
            "HeaderType in line 4 must be '11.0/11.0'. Current: %s. Errors may occur.",
            header[3].strip(),
        )
    # line 3 is checked twice on purpose: ID occupies its first two tab fields
    if re.match(LINE_PATTERNS["id"], header[2]) is None:
        log.warning("Missing ID metadata in line 3: %s", header[2].strip())
    if re.match(LINE_PATTERNS["measurement_datetime"], header[1]) is None:
        log.warning(
            "Missing measurement datetime metadata in line 2: %s",
            header[1].strip(),
        )
    # line 4 likewise carries both the subject name and the HeaderType fields
    if re.match(LINE_PATTERNS["name"], header[3]) is None:
        log.warning("Missing subject name metadata in line 4: %s", header[3].strip())
    if re.match(LINE_PATTERNS["comment"], header[4]) is None:
        log.warning("Missing comment metadata in line 5: %s", header[4].strip())

    log.debug("Header format verification completed")