Parsing Functions¶
collect_metadata(filedicts)
¶
Helper function to collect the metadata from all reader files into a DataFrame.
Source code in rda_toolbox/parser.py
def collect_metadata(filedicts: list[dict]) -> pd.DataFrame:
    """
    Collect the metadata of all parsed reader files into one DataFrame.

    :param filedicts: Parsed reader-file dicts, each carrying a "metadata"
        mapping of scalar values and a "Barcode" entry.
    :return: One row per input dict; metadata keys become columns and the
        plate barcode is appended as a "Barcode" column.
    """
    # Build all per-file frames first and concatenate once at the end;
    # the previous pd.concat inside the loop was O(n^2) in file count.
    # The empty seed frame keeps the empty-input result identical to before.
    frames = [pd.DataFrame()]
    for filedict in filedicts:
        meta_df = pd.DataFrame(filedict["metadata"], index=[0])
        meta_df["Barcode"] = filedict["Barcode"]
        frames.append(meta_df)
    return pd.concat(frames, ignore_index=True)
collect_results(filedicts, resultmatrix_header_mapping)
¶
Collect and merge results from the readerfiles.
Source code in rda_toolbox/parser.py
def collect_results(filedicts: list[dict], resultmatrix_header_mapping: dict[str, str]) -> pd.DataFrame:
    """
    Collect and merge results from the reader files into one long-format DataFrame.

    :param filedicts: Parsed reader-file dicts, each carrying "plate_type",
        "Barcode", one DataFrame per result table header and a matching
        "Overflow <header>" boolean DataFrame.
    :param resultmatrix_header_mapping: Maps result table headers (keys in the
        filedicts) to the "Measurement Type" label used in the output.
    :return: Tidy DataFrame with Row_<platetype>/Col_<platetype>, Measurement,
        Measurement Type, Overflow and Barcode columns.
    :raises Exception: If the filedicts use more than one plate type
        (also triggered by an empty `filedicts` list).
    """
    # Seed frame fixes the column order of the final concatenation (and the
    # empty-input column set); per-file frames are collected and concatenated
    # once instead of the previous O(n^2) concat-in-loop.
    collected = [
        pd.DataFrame(
            {"Row": [], "Column": [], "Measurement": [], "Measurement Type": [], "Overflow": []}
        )
    ]
    platetype_s = list(set(fd["plate_type"] for fd in filedicts))
    if len(platetype_s) == 1:
        platetype = platetype_s[0]
    else:
        raise Exception(f"Different plate types used {platetype_s}")
    for filedict in filedicts:
        for resultmatrix_header, resultmatrix_label in resultmatrix_header_mapping.items():
            # Wide plate matrix -> long format, one row per well.
            long_rawdata_df = pd.melt(
                filedict[resultmatrix_header].reset_index(names="Row"),
                id_vars=["Row"],
                var_name="Column",
                value_name="Measurement",
            )
            long_overflow_df = pd.melt(
                filedict[f"Overflow {resultmatrix_header}"].reset_index(names="Row"),
                id_vars=["Row"],
                var_name="Column",
                value_name="Overflow",
            )
            # Both melts stack wells in the same order, so positional
            # assignment via .values is safe here.
            long_rawdata_df["Overflow"] = long_overflow_df["Overflow"].astype(bool).values
            long_rawdata_df["Barcode"] = filedict["Barcode"]
            # Use the loop value directly instead of re-looking-up the mapping.
            long_rawdata_df["Measurement Type"] = resultmatrix_label
            collected.append(long_rawdata_df)
    allresults_df = pd.concat(collected, axis=0)
    allresults_df.rename(
        columns={"Row": f"Row_{platetype}", "Column": f"Col_{platetype}"}, inplace=True
    )
    return allresults_df.reset_index(drop=True)
filepaths_to_filedicts(filepaths, resulttable_headers=['Results'])
¶
Wrapper function to obtain a list of dictionaries containing the raw files' information, such as
- different entries of metadata
- Plate Type
- Barcode
- Date
- Time
- etc.
- Raw Optical Density (DataFrame)
- Concentration (DataFrame)
- Layout (DataFrame)
Source code in rda_toolbox/parser.py
def filepaths_to_filedicts(
filepaths: list[str], resulttable_headers: list[str] = ["Results"]
) -> list[dict]:
"""
Wrapper function to obtain a list of dictionaries which contain the raw files information like
- different entries of metadata
- Plate Type
- Barcode
- Date
- Time
- etc.
- Raw Optical Density (DataFrame)
- Concentration (DataFrame)
- Layout (DataFrame)
"""
filedicts = []
for path in filepaths:
try:
with open(path, encoding="utf-8", errors="ignore") as fh:
filedicts.append(
readerfile_parser(
basename(path), fh, resulttable_headers=resulttable_headers
)
)
except OSError as exc:
raise OSError(f"Failed to read {path!r}: {exc}") from exc
return filedicts
parse_mappingfile(filepath, motherplate_column='Origin Plate', childplate_column='AcD Barcode 384')
¶
Simple mappingfile parser function. Expects to start with a "Motherplate" line followed by corresponding "Childplates" in a single line.
Source code in rda_toolbox/parser.py
def parse_mappingfile(
    filepath: str,
    motherplate_column: str = "Origin Plate",
    childplate_column: str = "AcD Barcode 384",
):
    """
    Simple mappingfile parser function.
    Expects the file to start with a "Motherplate" line followed by the
    corresponding "Childplates" (semicolon-separated) in a single line,
    alternating in that pattern.

    :param filepath: Path to the semicolon-separated mapping file.
    :param motherplate_column: Output column name for the motherplate barcode.
    :param childplate_column: Output column name for the childplate barcode.
    :return: DataFrame with one row per childplate, carrying the motherplate,
        the 1-based replicate number (per motherplate line) and the 1-based
        rack number (position within the childplate line).
    :raises ValueError: If the first line does not hold a motherplate barcode.
    """
    filedict: dict[str, list[list[str]]] = {}
    with open(filepath) as file:
        filecontents = file.read().splitlines()
    key = None
    for i, raw_line in enumerate(filecontents):
        fields = raw_line.split(";")
        if i % 2 == 0:  # motherplates expected on even lines, childplates on odd
            key = fields[0]
        else:
            if not key:
                raise ValueError(
                    "Motherplate barcode expected on first line."
                )
            # setdefault replaces the previous if/else insert dance.
            filedict.setdefault(key, []).append(fields)
    mapping_df = pd.DataFrame(
        [
            (motherplate, childplate, rep_num, rack_nr)
            for motherplate, replicates in filedict.items()
            for rep_num, childplates in enumerate(replicates, start=1)
            for rack_nr, childplate in enumerate(childplates, start=1)
        ],
        columns=[motherplate_column, childplate_column, "Replicate", "Rack"],
    )
    return mapping_df
parse_readerfiles(path, resultmatrix_header_mapping={'Results': 'Raw Optical Density'})
¶
Reads CytationC10 reader files (plain text files) and merges the results into two DataFrames (rawdata and metadata), which are returned. Wrapper for readerfiles_rawdf kept for backwards compatibility; improves on readerfiles_rawdf by accepting a single directory path for convenience.
Source code in rda_toolbox/parser.py
def parse_readerfiles(
path: str | None, resultmatrix_header_mapping: dict[str, str] = {"Results": "Raw Optical Density"}
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Reads CytationC10 readerfiles (plain text files) and merges the results into
two DataFrames (rawdata and metadata) which is returned.
Wrapper for readerfiles_rawdf to keep backwards compatibility.
Improves readerfiles_rawdf, provide a single path for convenience.
"""
resulttable_headers = list(resultmatrix_header_mapping.keys())
if not path:
return pd.DataFrame(), pd.DataFrame()
paths = [
os.path.join(path, f)
for f in os.listdir(path)
if os.path.isfile(os.path.join(path, f))
]
df_raw = readerfiles_rawdf(paths, resultmatrix_header_mapping=resultmatrix_header_mapping)
df_raw["Col_384"] = df_raw["Col_384"].astype(int)
df_meta = readerfiles_metadf(paths, resulttable_headers=resulttable_headers)
return df_raw, df_meta
process_inputfile(file_object)
¶
Read the input Excel file, which should have the following columns:
- Barcode
- Organism
- Row_384
- Col_384
- ID
Optional columns: - Concentration in mg/mL (or other units) - Cutoff
Source code in rda_toolbox/parser.py
def process_inputfile(file_object):
    """
    Read Input excel file which should have the following columns:
    - Barcode
    - Organism
    - Row_384
    - Col_384
    - ID
    Optional columns:
    - Concentration in mg/mL (or other units)
    - Cutoff

    :param file_object: Path or file-like object of the Excel workbook with
        "substances" and "layout" sheets; falsy input returns None.
    :return: Cross join of the layout and substances sheets with "ID" cast to
        str, or None for falsy input.
    """
    if not file_object:
        return None
    # Context manager ensures the workbook handle is closed (the original
    # leaked the pd.ExcelFile resource).
    with pd.ExcelFile(file_object) as excel_file:
        substance_df = pd.read_excel(excel_file, "substances")
        layout_df = pd.read_excel(excel_file, "layout")
    # Every substance is combined with every layout position.
    df = pd.merge(layout_df, substance_df, how="cross")
    df["ID"] = df["ID"].astype(str)
    return df
read_platemapping(filecontents, orig_barcodes)
¶
Reads a mappingfile generated by the barcode reader. We expect the mapping files to ALWAYS have a line with a single motherplate barcode followed by a single line with childplate barcode(s).
Source code in rda_toolbox/parser.py
def read_platemapping(filecontents: list, orig_barcodes: list[str]):
    """
    Reads a mappingfile generated by the barcode reader.
    We expect the mapping files to ALWAYS have a line with a single motherplate
    barcode followed by a single line with childplate barcode(s).

    :param filecontents: Lines of the mapping file (empty lines are ignored).
    :param orig_barcodes: Expected motherplate barcodes; used to recognize
        motherplate lines and to validate the parsed result.
    :return: Tuple of (mapping dict {motherplate: [childplate lists]},
        replicate counts {motherplate: occurrences}).
    :raises ValueError: If the motherplates found do not match orig_barcodes.
    """
    filedict: dict[str, list[list[str]]] = {}
    origin_barcode: str | None = None
    origin_replicates: list[str] = []
    for line_num, line in enumerate(filter(None, filecontents)):
        fields = line.split(";")
        # A singular entry on an even line that matches a known barcode
        # indicates an origin (motherplate) barcode.
        if line_num % 2 == 0 and len(fields) == 1 and fields[0] in orig_barcodes:
            origin_barcode = fields[0]
            origin_replicates.append(origin_barcode)
            filedict.setdefault(origin_barcode, [])
        if origin_barcode is None:
            continue
        if line_num % 2 == 1:  # childplates are on odd line numbers
            filedict[origin_barcode].append(fields)
            origin_barcode = None
    # Single counting pass instead of the O(n^2) list.count() comprehension.
    replicates_dict: dict[str, int] = {}
    for barcode in origin_replicates:
        replicates_dict[barcode] = replicates_dict.get(barcode, 0) + 1
    if sorted(filedict.keys()) != sorted(orig_barcodes):
        raise ValueError(
            "The origin barcodes from the mappingfile and MP barcodes in MIC_input.xlsx do not coincide."
        )
    return filedict, replicates_dict
readerfile_parser(filename, file_object, resulttable_headers=['Results'])
¶
Parser for files created by the BioTek Cytation C10 Confocal Imaging Reader.
Source code in rda_toolbox/parser.py
def readerfile_parser(
    filename: str, file_object: IO[str], resulttable_headers: list[str] = ["Results"]
) -> dict:
    """
    Parser for files created by the BioTek Cytation C10 Confocal Imaging Reader.

    :param filename: Name of the reader file; a barcode is extracted from it
        via regex when possible, otherwise the whole filename is used.
    :param file_object: Open text handle over the semicolon-separated file.
    :param resulttable_headers: Lines that mark the start of a result matrix
        (e.g. "Results"); one DataFrame per header is stored in the result.
    :return: Dict with "Reader Filename", "plate_type", "Barcode", one
        DataFrame per result table header plus a matching
        "Overflow <header>" boolean DataFrame, optionally "Layout" and
        "Concentration" DataFrames, plus "metadata" and "overflow_events".
    :raises ValueError: If the file contains no non-empty lines.

    NOTE(review): ``resulttable_headers`` is a mutable default argument; it is
    only read here, but a ``None`` sentinel would be safer.
    """
    lines = file_object.readlines()
    # Strip CR/LF and drop completely empty lines before parsing.
    lines = list(filter(None, map(lambda x: x.strip("\n").strip("\r"), lines)))
    if len(lines) == 0:
        raise ValueError(f"Empty raw file (unknown).")
    # search the file for plate type definition and use it to derive number of rows and columns
    found_plate_type = re.findall(r"Plate Type;[A-z ]*([0-9]*)", "".join(lines))
    plate_type = 96  # define default plate type and let it be 96-well plate as this is what we started with
    if found_plate_type:
        plate_type = int(found_plate_type[0])
    num_rows, num_columns = get_rows_cols(plate_type)
    filedict = dict()
    metadata = dict()
    filedict["Reader Filename"] = filename
    filedict["plate_type"] = plate_type
    # TODO: get barcode via regex
    barcode_found = re.findall(
        r"\d{3}[A-Z][a-z]?[a-zA-Z]\d{2}\d{3}", filedict["Reader Filename"]
    )
    if not barcode_found:
        # Fall back to the raw filename when no barcode pattern matches.
        filedict["Barcode"] = filedict["Reader Filename"]
    else:
        filedict["Barcode"] = barcode_found[0]
    # filedict["Barcode"] = Path(filedict["Reader Filename"]).stem.split("_")[-1]
    overflow_events = []  # passed to _safe_float; presumably collects OVRFLW event records — TODO confirm
    results = np.empty([num_rows, num_columns], dtype=float)
    overflow_results = np.zeros([num_rows, num_columns], dtype=bool)
    # using dtype=str results in unicode strings of length 1 ('U1'), therefore we use 'U25'
    layout = np.empty([num_rows, num_columns], dtype="U25")
    concentrations = np.empty([num_rows, num_columns], dtype=float)
    # Matches "key;value" / "key: value" style metadata pairs on a line.
    metadata_regex = r";?([a-zA-Z0-9 \/]*)[;:]+([a-zA-Z0-9 \/\\:_.-]*),?"
    line_num = 0
    # One full scan of the file per result table header.
    for resulttable_header in resulttable_headers:
        while line_num < len(lines):
            if lines[line_num] == resulttable_header:
                line_num += 1
                header = list(
                    map(int, lines[line_num].strip("\n").split(";")[1:])
                )  # get the header as a concrete list
                index = [""] * num_rows
                for _row_num in range(num_rows):  # for the next num_rows, read result data
                    line_num += 1
                    res_line = lines[line_num].split(";")
                    # Split at ; and slice off rowlabel and excitation/emission value:
                    row_name = res_line[0]
                    index[_row_num] = row_name
                    parsed_row = []
                    overflow_row = []
                    for _col_idx, token in enumerate(res_line[1:-1]):
                        col_value = header[_col_idx] if _col_idx < len(header) else None
                        parsed_value = _safe_float(
                            token,
                            filename,
                            overflow_events=overflow_events,
                            table=resulttable_header,  # TODO: Change "Raw Optical Density" to resulttable_header
                            row=row_name,
                            col=col_value,
                        )
                        parsed_row.append(parsed_value)
                        # Track overflow cells separately from their parsed value.
                        overflow_row.append(token.strip().upper() == "OVRFLW")
                    results[_row_num] = parsed_row
                    overflow_results[_row_num] = overflow_row
                # Initialize DataFrame from results and add it to filedict
                filedict[resulttable_header] = pd.DataFrame(
                    data=results, index=index, columns=header
                )
                filedict[f"Overflow {resulttable_header}"] = pd.DataFrame(
                    data=overflow_results, index=index, columns=header
                )
                line_num += 1
            elif lines[line_num] == "Layout":  # For the next num_rows, read layout data
                line_num += 1
                header = list(
                    map(int, lines[line_num].strip("\n").split(";")[1:])
                )  # Because we use header twice here, we collect it via list()
                index = [""] * num_rows
                for _row_num in range(num_rows):
                    line_num += 1
                    layout_line = lines[line_num].split(";")
                    index[_row_num] = layout_line[0]
                    layout[_row_num] = layout_line[1:-1]
                    # Each second line yields a concentration layout line
                    line_num += 1
                    conc_line = lines[line_num].split(";")
                    concentrations[_row_num] = [
                        _safe_float(
                            x,
                            filename,
                            overflow_events=overflow_events,
                            table="Concentration",
                            row=index[_row_num],
                            col=header[_col_idx] if _col_idx < len(header) else None,
                        )
                        for _col_idx, x in enumerate(conc_line[1:-1])
                    ]
                # Add layouts to filedict
                filedict["Layout"] = pd.DataFrame(data=layout, index=index, columns=header)
                filedict["Concentration"] = pd.DataFrame(
                    data=concentrations, index=index, columns=header
                )
                line_num += 1
            else:
                # Any other line is treated as metadata. If it is another result
                # table header (not the one currently searched), skip over its
                # matrix body so the data rows are not misread as metadata pairs.
                if lines[line_num] in resulttable_headers:
                    line_num += num_rows + 1
                metadata_pairs = re.findall(metadata_regex, lines[line_num])
                line_num += 1
                if not metadata_pairs:
                    continue
                else:
                    for key, value in metadata_pairs:
                        if not all(
                            [key, value]
                        ):  # if any of the keys or values are empty, skip
                            continue
                        else:
                            metadata[key.strip(" :")] = value.strip(" ")
        line_num = 0  # rescan the file from the top for the next header
    filedict["metadata"] = metadata
    filedict["overflow_events"] = overflow_events
    return filedict
readerfiles_metadf(paths, resulttable_headers=['Results'])
¶
Parses metadata from files declared by filepaths and merges the results into a DataFrame.
Source code in rda_toolbox/parser.py
def readerfiles_metadf(
    paths: list[str], resulttable_headers: list[str] | None = None
) -> pd.DataFrame:
    """
    Parses metadata from files declared by filepaths and merges the results into a DataFrame.

    :param paths: Paths of the raw reader files to parse.
    :param resulttable_headers: Section headers marking result matrices;
        defaults to ["Results"].
    :return: One metadata row per reader file (see collect_metadata).
    """
    # None sentinel instead of a mutable default list argument.
    if resulttable_headers is None:
        resulttable_headers = ["Results"]
    filedicts = filepaths_to_filedicts(paths, resulttable_headers=resulttable_headers)
    return collect_metadata(filedicts)
readerfiles_rawdf(paths, resultmatrix_header_mapping={'Results': 'Raw Optical Density'})
¶
Parses data from files declared by filepaths and merges the results into a DataFrame :param paths: A list of filepaths corresponding to the raw reader files generated by Cytation10 :type paths: list[str] :return: A DataFrame in tidy and long format with the raw readerfile contents :rtype: pd.DataFrame
:Example:
```Python
import glob
rawdata_df = readerfiles_rawdf(glob.glob('path/to/raw/files/*'))
```
Source code in rda_toolbox/parser.py
def readerfiles_rawdf(
    paths: list[str], resultmatrix_header_mapping: dict | None = None
) -> pd.DataFrame:
    """
    Parses data from files declared by filepaths and merges the results into a DataFrame
    :param paths: A list of filepaths corresponding to the raw reader files generated by Cytation10
    :type paths: list[str]
    :param resultmatrix_header_mapping: Maps result table headers to measurement
        labels; defaults to {"Results": "Raw Optical Density"}.
    :return: A DataFrame in tidy and long format with the raw readerfile contents
    :rtype: pd.DataFrame
    :Example:
    ```Python
    import glob
    rawdata_df = readerfiles_rawdf(glob.glob('path/to/raw/files/*'))
    ```
    """
    # None sentinel instead of a mutable dict default argument.
    if resultmatrix_header_mapping is None:
        resultmatrix_header_mapping = {"Results": "Raw Optical Density"}
    filedicts = filepaths_to_filedicts(
        paths, resulttable_headers=list(resultmatrix_header_mapping.keys())
    )
    rawdata = collect_results(filedicts, resultmatrix_header_mapping)
    # Warn once when any OVRFLW cell was encountered (values were set to NaN).
    overflow_count = int(rawdata["Overflow"].sum()) if "Overflow" in rawdata.columns else 0
    if overflow_count > 0:
        affected_files = {
            event["Reader Filename"]
            for filedict in filedicts
            for event in filedict.get("overflow_events", [])
        }
        # Join outside the f-string: reusing '"' inside an f-string expression
        # is a SyntaxError before Python 3.12.
        joined_files = ", ".join(affected_files)
        warnings.warn(
            f"Detected {overflow_count} OVRFLW value(s) in reader files "
            f"({joined_files}) "  # trailing space fixes ")(" run-on in the message
            f"({len(affected_files)} file(s)). Values were set to NaN. "
            "Inspect rows where rawdata['Overflow'] is True for details.",
            RuntimeWarning,
            stacklevel=2,
        )
    # Stringify the plate-specific "Col_<platetype>" column. The original
    # hard-coded "Col_384" and raised a KeyError for other plate types,
    # although collect_results names the column dynamically.
    for column in rawdata.columns:
        if re.fullmatch(r"Col_\d+", str(column)):
            rawdata[column] = rawdata[column].astype(str)
    rawdata.rename(columns={"Barcode": "AcD Barcode 384"}, inplace=True)
    return rawdata