Skip to content

Parsing Functions

collect_metadata(filedicts)

Helper function to collect the metadata from all reader files into a dataframe.

Source code in rda_toolbox/parser.py
def collect_metadata(filedicts: list[dict]) -> pd.DataFrame:
    """
    Helper function to collect the metadata from all reader files into a dataframe.

    Parameters:
    - filedicts: parsed reader-file dictionaries, each providing a
      "metadata" mapping and a "Barcode" entry.

    Returns one metadata row per file; an empty DataFrame for empty input.
    """
    # Accumulate one frame per file and concatenate once at the end:
    # calling pd.concat inside the loop is quadratic in the number of files.
    frames = []
    for filedict in filedicts:
        meta_df = pd.DataFrame(filedict["metadata"], index=[0])
        meta_df["Barcode"] = filedict["Barcode"]
        frames.append(meta_df)
    if not frames:
        # pd.concat([]) raises; preserve the original empty-input behavior.
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

collect_results(filedicts, resultmatrix_header_mapping)

Collect and merge results from the readerfiles.

Source code in rda_toolbox/parser.py
def collect_results(filedicts: list[dict], resultmatrix_header_mapping: dict[str, str]) -> pd.DataFrame:
    """
    Collect and merge results from the readerfiles.

    Parameters:
    - filedicts: parsed reader-file dictionaries; each must provide
      "plate_type", "Barcode" and, for every mapped header, a result
      DataFrame plus its boolean "Overflow <header>" companion.
    - resultmatrix_header_mapping: maps a result-table header (key into the
      filedicts) to the "Measurement Type" label used in the output.

    Returns a long-format DataFrame with columns
    Row_<platetype>, Col_<platetype>, Measurement, Measurement Type,
    Overflow and Barcode.

    Raises Exception when the filedicts mix different plate types.
    """
    # The empty seed frame pins the leading column order of the result.
    frames = [
        pd.DataFrame(
            {"Row": [], "Column": [], "Measurement": [], "Measurement Type": [], "Overflow": []}
        )
    ]
    platetype_s = list(set(fd["plate_type"] for fd in filedicts))
    if len(platetype_s) == 1:
        platetype = platetype_s[0]
    else:
        raise Exception(f"Different plate types used {platetype_s}")

    for filedict in filedicts:
        for resultmatrix_header, measurement_label in resultmatrix_header_mapping.items():
            # Wide plate matrix -> long (Row, Column, Measurement) records.
            long_rawdata_df = pd.melt(
                filedict[resultmatrix_header].reset_index(names="Row"),
                id_vars=["Row"],
                var_name="Column",
                value_name="Measurement",
            )
            long_overflow_df = pd.melt(
                filedict[f"Overflow {resultmatrix_header}"].reset_index(names="Row"),
                id_vars=["Row"],
                var_name="Column",
                value_name="Overflow",
            )
            # Both melts come from identically-shaped frames, so positional
            # alignment via .values is safe here.
            long_rawdata_df["Overflow"] = long_overflow_df["Overflow"].astype(bool).values

            long_rawdata_df["Barcode"] = filedict["Barcode"]
            # Use the mapping value from the loop directly instead of
            # re-indexing the dict (the old loop variable went unused).
            long_rawdata_df["Measurement Type"] = measurement_label
            # Collect frames and concatenate once: concat-in-loop is quadratic.
            frames.append(long_rawdata_df)

    allresults_df = pd.concat(frames, axis=0)
    allresults_df.rename(
        columns={"Row": f"Row_{platetype}", "Column": f"Col_{platetype}"}, inplace=True
    )
    return allresults_df.reset_index(drop=True)

filepaths_to_filedicts(filepaths, resulttable_headers=['Results'])

Wrapper function to obtain a list of dictionaries which contain the raw files' information, such as

  • different entries of metadata
    • Plate Type
    • Barcode
    • Date
    • Time
    • etc.
  • Raw Optical Density (DataFrame)
  • Concentration (DataFrame)
  • Layout (DataFrame)
Source code in rda_toolbox/parser.py
def filepaths_to_filedicts(
    filepaths: list[str], resulttable_headers: list[str] = ["Results"]
) -> list[dict]:
    """
    Wrapper function to obtain a list of dictionaries which contain the raw files information like

    - different entries of metadata
        - Plate Type
        - Barcode
        - Date
        - Time
        - etc.
    - Raw Optical Density (DataFrame)
    - Concentration (DataFrame)
    - Layout (DataFrame)
    """
    filedicts = []
    for path in filepaths:
        try:
            with open(path, encoding="utf-8", errors="ignore") as fh:
                filedicts.append(
                    readerfile_parser(
                        basename(path), fh, resulttable_headers=resulttable_headers
                    )
                )
        except OSError as exc:
            raise OSError(f"Failed to read {path!r}: {exc}") from exc
    return filedicts

parse_mappingfile(filepath, motherplate_column='Origin Plate', childplate_column='AcD Barcode 384')

Simple mappingfile parser function. Expects to start with a "Motherplate" line followed by corresponding "Childplates" in a single line.

Source code in rda_toolbox/parser.py
def parse_mappingfile(
    filepath: str,
    motherplate_column: str = "Origin Plate",
    childplate_column: str = "AcD Barcode 384",
):
    """
    Simple mappingfile parser function.

    The file is expected to alternate between a line holding a single
    motherplate barcode (even lines) and a semicolon-separated line of its
    childplate barcodes (odd lines).

    Returns a DataFrame with one row per childplate, carrying the
    motherplate barcode, the childplate barcode, the replicate number and
    the rack position.
    """
    plates: dict = {}
    current = None
    with open(filepath) as handle:
        for lineno, raw in enumerate(handle.read().splitlines()):
            fields = raw.split(";")
            # Even lines carry the motherplate barcode, odd lines its children.
            if lineno % 2 == 0:
                current = fields[0]
                continue
            if not current:
                raise ValueError(
                    "Motherplate barcode expected on first line."
                )
            plates.setdefault(current, []).append(fields)
    records = [
        (mother, child, replicate, rack)
        for mother, replicate_lines in plates.items()
        for replicate, children in enumerate(replicate_lines, start=1)
        for rack, child in enumerate(children, start=1)
    ]
    return pd.DataFrame(
        records,
        columns=[motherplate_column, childplate_column, "Replicate", "Rack"],
    )

parse_readerfiles(path, resultmatrix_header_mapping={'Results': 'Raw Optical Density'})

Reads CytationC10 readerfiles (plain text files) and merges the results into two DataFrames (rawdata and metadata), which are returned. Wrapper around readerfiles_rawdf, kept for backwards compatibility. Improves on readerfiles_rawdf by accepting a single directory path for convenience.

Source code in rda_toolbox/parser.py
def parse_readerfiles(
    path: str | None, resultmatrix_header_mapping: dict[str, str] = {"Results": "Raw Optical Density"}
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Reads CytationC10 readerfiles (plain text files) and merges the results into
    two DataFrames (rawdata and metadata) which is returned.
    Wrapper for readerfiles_rawdf to keep backwards compatibility.
    Improves readerfiles_rawdf, provide a single path for convenience.
    """
    resulttable_headers = list(resultmatrix_header_mapping.keys())
    if not path:
        return pd.DataFrame(), pd.DataFrame()
    paths = [
            os.path.join(path, f)
            for f in os.listdir(path)
            if os.path.isfile(os.path.join(path, f))
    ]
    df_raw = readerfiles_rawdf(paths, resultmatrix_header_mapping=resultmatrix_header_mapping)
    df_raw["Col_384"] = df_raw["Col_384"].astype(int)
    df_meta = readerfiles_metadf(paths, resulttable_headers=resulttable_headers)
    return df_raw, df_meta

process_inputfile(file_object)

Read Input excel file which should have the following columns
  • Barcode
  • Organism
  • Row_384
  • Col_384
  • ID

Optional columns: - Concentration in mg/mL (or other units) - Cutoff

Source code in rda_toolbox/parser.py
def process_inputfile(file_object):
    """
    Read Input excel file which should have the following columns:
        - Barcode
        - Organism
        - Row_384
        - Col_384
        - ID
    Optional columns:
        - Concentration in mg/mL (or other units)
        - Cutoff
    """
    # A falsy file object (e.g. None) yields no result.
    if not file_object:
        return None
    workbook = pd.ExcelFile(file_object)
    substances = pd.read_excel(workbook, "substances")
    layout = pd.read_excel(workbook, "layout")
    # Cross join: every layout position paired with every substance.
    merged = layout.merge(substances, how="cross")
    merged["ID"] = merged["ID"].astype(str)
    return merged

read_platemapping(filecontents, orig_barcodes)

Reads a mappingfile generated by the barcode reader. We expect the mapping files to ALWAYS have a line with a single motherplate barcode followed by a single line with childplate barcode(s).

Source code in rda_toolbox/parser.py
def read_platemapping(filecontents: list, orig_barcodes: list[str]):
    """
    Reads a mappingfile generated by the barcode reader.
    We expect the mapping files to ALWAYS have a line with a single motherplate barcode followed by a single line with childplate barcode(s).

    Parameters:
    - filecontents: the mappingfile's lines (empty lines are ignored).
    - orig_barcodes: the motherplate barcodes expected in the file.

    Returns a tuple of
    - a dict mapping each motherplate barcode to its lists of childplate barcodes,
    - a dict mapping each motherplate barcode to its replicate count.

    Raises ValueError when the motherplate barcodes found in the file do not
    match orig_barcodes exactly.
    """
    from collections import Counter

    filedict: dict = dict()
    origin_barcode: str | None = None
    origin_replicates: list[str] = []
    for line_num, line in enumerate(filter(None, filecontents)):
        fields = line.split(";")
        # A singular entry on an even line indicates an origin (mother) barcode.
        if line_num % 2 == 0 and len(fields) == 1 and fields[0] in orig_barcodes:
            origin_barcode = fields[0]
            origin_replicates.append(origin_barcode)
            filedict.setdefault(origin_barcode, [])
        if origin_barcode is None:
            continue
        if line_num % 2 == 1:  # childplates are on odd line numbers
            filedict[origin_barcode].append(fields)
            origin_barcode = None
    # Counter replaces the original O(n^2) list.count comprehension;
    # insertion (first-occurrence) order is preserved either way.
    replicates_dict = dict(Counter(origin_replicates))
    if sorted(filedict) != sorted(orig_barcodes):
        raise ValueError(
            "The origin barcodes from the mappingfile and MP barcodes in MIC_input.xlsx do not coincide."
        )
    return filedict, replicates_dict

readerfile_parser(filename, file_object, resulttable_headers=['Results'])

Parser for files created by the BioTek Cytation C10 Confocal Imaging Reader.

Source code in rda_toolbox/parser.py
def readerfile_parser(
    filename: str, file_object: IO[str], resulttable_headers: list[str] = ["Results"]
) -> dict:
    """
    Parser for files created by the BioTek Cytation C10 Confocal Imaging Reader.

    Scans the semicolon-separated reader file once per entry in
    *resulttable_headers*, collecting result matrices, an optional
    "Layout"/"Concentration" section and any remaining "key;value"
    metadata pairs.

    Parameters:
    - filename: basename of the reader file; the plate barcode is extracted
      from it (falls back to the full filename when no barcode matches).
    - file_object: open text handle of the reader file.
    - resulttable_headers: section headers that introduce result matrices.
      NOTE(review): mutable list default — never mutated here, but a None
      sentinel would be safer.

    Returns a dict with "Reader Filename", "plate_type", "Barcode",
    "metadata", "overflow_events", one DataFrame per result-table header
    plus a boolean "Overflow <header>" companion and, when present in the
    file, "Layout" and "Concentration" DataFrames.

    Raises ValueError if the file contains no non-empty lines.
    """
    lines = file_object.readlines()
    # Strip CR/LF and drop lines that become empty.
    lines = list(filter(None, map(lambda x: x.strip("\n").strip("\r"), lines)))
    if len(lines) == 0:
        raise ValueError(f"Empty raw file (unknown).")

    # search the file for plate type definition and use it to derive number of rows and columns
    # NOTE(review): the class [A-z ] also matches the ASCII characters between
    # 'Z' and 'a' ("[", "\\", "]", "^", "_", "`") — confirm [A-Za-z ] was meant.
    found_plate_type = re.findall(r"Plate Type;[A-z ]*([0-9]*)", "".join(lines))
    plate_type = 96  # define default plate type and let it be 96-well plate as this is what we started with
    if found_plate_type:
        plate_type = int(found_plate_type[0])

    num_rows, num_columns = get_rows_cols(plate_type)

    filedict = dict()
    metadata = dict()
    filedict["Reader Filename"] = filename
    filedict["plate_type"] = plate_type
    # TODO: get barcode via regex
    # Barcode shape per this regex: 3 digits, 2-3 letters, 5 digits.
    barcode_found = re.findall(
        r"\d{3}[A-Z][a-z]?[a-zA-Z]\d{2}\d{3}", filedict["Reader Filename"]
    )
    if not barcode_found:
        # No barcode pattern in the filename: fall back to the filename itself.
        filedict["Barcode"] = filedict["Reader Filename"]
    else:
        filedict["Barcode"] = barcode_found[0]
    # filedict["Barcode"] = Path(filedict["Reader Filename"]).stem.split("_")[-1]

    overflow_events = []
    # NOTE(review): these arrays are reused for every result table, and
    # pd.DataFrame over a NumPy array may not copy — earlier tables'
    # DataFrames could share (and see overwritten) data; verify.
    results = np.empty([num_rows, num_columns], dtype=float)
    overflow_results = np.zeros([num_rows, num_columns], dtype=bool)
    # using dtype=str results in unicode strings of length 1 ('U1'), therefore we use 'U25'
    layout = np.empty([num_rows, num_columns], dtype="U25")
    concentrations = np.empty([num_rows, num_columns], dtype=float)

    # Matches "key;value" / "key: value" pairs on a metadata line.
    metadata_regex = r";?([a-zA-Z0-9 \/]*)[;:]+([a-zA-Z0-9 \/\\:_.-]*),?"
    line_num = 0
    # One full pass over the file per result-table header; line_num is reset
    # to 0 at the end of each pass.
    for resulttable_header in resulttable_headers:
        while line_num < len(lines):
            if lines[line_num] == resulttable_header:
                line_num += 1
                header = list(
                    map(int, lines[line_num].strip("\n").split(";")[1:])
                )  # get the header as a concrete list
                index = [""] * num_rows
                for _row_num in range(num_rows):  # for the next num_rows, read result data
                    line_num += 1
                    res_line = lines[line_num].split(";")
                    # Split at ; and slice off rowlabel and excitation/emission value:
                    row_name = res_line[0]
                    index[_row_num] = row_name
                    parsed_row = []
                    overflow_row = []
                    for _col_idx, token in enumerate(res_line[1:-1]):
                        col_value = header[_col_idx] if _col_idx < len(header) else None
                        # _safe_float presumably converts the token and records
                        # OVRFLW cells into overflow_events — confirm in parser.py.
                        parsed_value = _safe_float(
                            token,
                            filename,
                            overflow_events=overflow_events,
                            table=resulttable_header, # TODO: Change "Raw Optical Density" to resulttable_header
                            row=row_name,
                            col=col_value,
                        )
                        parsed_row.append(parsed_value)
                        # Track overflow cells separately as booleans.
                        overflow_row.append(token.strip().upper() == "OVRFLW")
                    results[_row_num] = parsed_row
                    overflow_results[_row_num] = overflow_row
                # Initialize DataFrame from results and add it to filedict
                filedict[resulttable_header] = pd.DataFrame(
                    data=results, index=index, columns=header
                )
                filedict[f"Overflow {resulttable_header}"] = pd.DataFrame(
                    data=overflow_results, index=index, columns=header
                )
                line_num += 1
            elif lines[line_num] == "Layout":  # For the next num_rows, read layout data
                line_num += 1
                header = list(
                    map(int, lines[line_num].strip("\n").split(";")[1:])
                )  # Because we use header twice here, we collect it via list()
                index = [""] * num_rows
                for _row_num in range(num_rows):
                    line_num += 1
                    layout_line = lines[line_num].split(";")
                    index[_row_num] = layout_line[0]
                    layout[_row_num] = layout_line[1:-1]
                    # Each second line yields a concentration layout line
                    line_num += 1
                    conc_line = lines[line_num].split(";")
                    concentrations[_row_num] = [
                        _safe_float(
                            x,
                            filename,
                            overflow_events=overflow_events,
                            table="Concentration",
                            row=index[_row_num],
                            col=header[_col_idx] if _col_idx < len(header) else None,
                        )
                        for _col_idx, x in enumerate(conc_line[1:-1])
                    ]
                # Add layouts to filedict
                filedict["Layout"] = pd.DataFrame(data=layout, index=index, columns=header)
                filedict["Concentration"] = pd.DataFrame(
                    data=concentrations, index=index, columns=header
                )
                line_num += 1
            else:
                # A header belonging to a different result table: skip over
                # its matrix so its cells are not parsed as metadata.
                # NOTE(review): the table spans num_rows + 2 lines (title,
                # column row, data rows) but only num_rows + 1 are skipped —
                # confirm the off-by-one is intended.
                if lines[line_num] in resulttable_headers:
                    line_num += num_rows + 1
                metadata_pairs = re.findall(metadata_regex, lines[line_num])
                line_num += 1
                if not metadata_pairs:
                    continue
                else:
                    for key, value in metadata_pairs:
                        if not all(
                            [key, value]
                        ):  # if any of the keys or values are empty, skip
                            continue
                        else:
                            metadata[key.strip(" :")] = value.strip(" ")
        # Restart from the top of the file for the next result-table header.
        line_num = 0
    filedict["metadata"] = metadata
    filedict["overflow_events"] = overflow_events
    return filedict

readerfiles_metadf(paths, resulttable_headers=['Results'])

Parses metadata from files declared by filepaths and merges the results into a DataFrame.

Source code in rda_toolbox/parser.py
def readerfiles_metadf(
    paths: list[str], resulttable_headers: list[str] | None = None
) -> pd.DataFrame:
    """
    Parses metadata from files declared by filepaths and merges the results into a DataFrame.

    Parameters:
    - paths: filepaths of the raw reader files.
    - resulttable_headers: result-table section headers forwarded to the
      parser; defaults to ["Results"].
    """
    # None sentinel instead of a mutable list default, which would be
    # shared across calls.
    if resulttable_headers is None:
        resulttable_headers = ["Results"]
    filedicts = filepaths_to_filedicts(paths, resulttable_headers=resulttable_headers)
    return collect_metadata(filedicts)

readerfiles_rawdf(paths, resultmatrix_header_mapping={'Results': 'Raw Optical Density'})

Parses data from files declared by filepaths and merges the results into a DataFrame. Parameters: paths (list[str]) — a list of filepaths corresponding to the raw reader files generated by Cytation10. Returns: pd.DataFrame — a DataFrame in tidy and long format with the raw readerfile contents.

:Example:

```Python
import glob

rawdata_df = readerfiles_rawdf(glob.glob('path/to/raw/files/*'))
```
Source code in rda_toolbox/parser.py
def readerfiles_rawdf(
    paths: list[str], resultmatrix_header_mapping: dict | None = None
) -> pd.DataFrame:
    """
    Parses data from files declared by filepaths and merges the results into a DataFrame
    :param paths: A list of filepaths corresponding to the raw reader files generated by Cytation10
    :type paths: list[str]
    :param resultmatrix_header_mapping: result-table header -> "Measurement Type" label;
        defaults to {"Results": "Raw Optical Density"}
    :type resultmatrix_header_mapping: dict
    :return: A DataFrame in tidy and long format with the raw readerfile contents
    :rtype: pd.DataFrame

    :Example:

        ```Python
        import glob

        rawdata_df = readerfiles_rawdf(glob.glob('path/to/raw/files/*'))
        ```
    """
    # None sentinel instead of a mutable dict default, which would be
    # shared across calls.
    if resultmatrix_header_mapping is None:
        resultmatrix_header_mapping = {"Results": "Raw Optical Density"}
    filedicts = filepaths_to_filedicts(
        paths, resulttable_headers=list(resultmatrix_header_mapping.keys())
    )
    rawdata = collect_results(filedicts, resultmatrix_header_mapping)
    overflow_count = int(rawdata["Overflow"].sum()) if "Overflow" in rawdata.columns else 0
    if overflow_count > 0:
        # presumably each overflow event dict carries the source file under
        # "Reader Filename" — recorded by the parser's _safe_float; verify.
        affected_files = {
            event["Reader Filename"]
            for filedict in filedicts
            for event in filedict.get("overflow_events", [])
        }
        # Join outside the f-string: re-using double quotes inside an
        # f-string expression is a SyntaxError before Python 3.12.
        affected_list = ", ".join(affected_files)
        warnings.warn(
            f"Detected {overflow_count} OVRFLW value(s) in reader files "
            f"({affected_list})"
            f"({len(affected_files)} file(s)). Values were set to NaN. "
            "Inspect rows where rawdata['Overflow'] is True for details.",
            RuntimeWarning,
            stacklevel=2,
        )
    rawdata["Col_384"] = rawdata["Col_384"].astype(str)
    rawdata.rename(columns={"Barcode": "AcD Barcode 384"}, inplace=True)
    # The single-element accumulator list + pd.concat of the original was a
    # leftover; the frame can be returned directly.
    return rawdata