genotypes

Functions related to genotype structuring.

NUMBER_OF_BITS module-attribute

NUMBER_OF_BITS = 8

Number of high-order bits of the variant id used to perform partitioning.
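
For illustration, a minimal sketch of how a partition key can be derived from the high bits of a 64-bit variant id; the id_part helper below is hypothetical and not part of the module:

NUMBER_OF_BITS = 8

def id_part(variant_id: int) -> int:
    # Assumption: the partition key is the highest NUMBER_OF_BITS bits of a
    # 64-bit variant id, giving 2**NUMBER_OF_BITS = 256 possible partitions.
    return variant_id >> (64 - NUMBER_OF_BITS)

assert id_part(0x0000000000000000) == 0
assert id_part(0xFF00000000000000) == 255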

hive

hive(paths: list[Path], output_prefix: Path, threads: int, file_per_thread: int) -> None

Read all genotype parquet files and use their information to generate a hive-like structure, based on bits 63 to 55 (inclusive) of the variant id together with the genotype information.

The real number of threads used is equal to \(\min(threads, len(paths))\).

The output layout looks like: {output_prefix}/id_part=[0..255]/0.parquet.

Parameters:

  • paths (list[Path]) –

    list of files you want to reorganize

  • output_prefix (Path) –

    prefix of the hive structure

  • threads (int) –

    number of multiprocessing threads to run

  • file_per_thread (int) –

    number of files managed per multiprocessing thread

Returns:

  • None

    None
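
A usage sketch (file names are hypothetical; the module path follows src/variantplaner/struct/genotypes.py):

from pathlib import Path

from variantplaner.struct import genotypes

# Reorganize three genotype parquet files into a hive structure under
# genotypes_hive/, using at most 2 worker processes and grouping
# 2 input files per worker.
genotypes.hive(
    paths=[Path("sample_1.parquet"), Path("sample_2.parquet"), Path("sample_3.parquet")],
    output_prefix=Path("genotypes_hive"),
    threads=2,
    file_per_thread=2,
)
# Expected layout: genotypes_hive/id_part=0/ ... genotypes_hive/id_part=255/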

Source code in src/variantplaner/struct/genotypes.py
def hive(
    paths: list[pathlib.Path],
    output_prefix: pathlib.Path,
    threads: int,
    file_per_thread: int,
) -> None:
    r"""Read all genotypes parquet file and use information to generate a hive like struct, based on 63rd and 55th bits included of variant id with genotype information.

    Real number of threads use are equal to $min(threads, len(paths))$.

    Output format look like: `{output_prefix}/id_part=[0..255]/0.parquet`.

    Args:
        paths: list of file you want reorganize
        output_prefix: prefix of hive
        threads: number of multiprocessing threads run
        file_per_thread: number of file manage per multiprocessing threads

    Returns:
        None
    """
    logger.info(f"{paths=} {output_prefix=}, {threads=}, {file_per_thread=}")

    if len(paths) == 0:
        return

    for i in range(pow(2, NUMBER_OF_BITS)):
        (output_prefix / f"id_part={i}").mkdir(parents=True, exist_ok=True)

    path_groups: typing.Iterable[typing.Iterable[pathlib.Path]] = list(
        [[path] for path in paths]
        if file_per_thread < 2  # noqa: PLR2004 if number of file is lower than 2 file grouping isn't required
        else itertools.zip_longest(
            *[iter(paths)] * file_per_thread,
        ),
    )

    basenames = ["_".join(p.stem for p in g_paths if p is not None) for g_paths in path_groups]

    lf_groups = [[polars.scan_parquet(p) for p in g_paths if p is not None] for g_paths in path_groups]

    logger.info(f"{path_groups=}, {basenames=}")

    with multiprocessing.get_context("spawn").Pool(threads) as pool:
        pool.starmap(
            __hive_worker,
            [(lf_group, basename, output_prefix) for lf_group, basename in zip(lf_groups, basenames)],
        )
        pool.starmap(
            __merge_file,
            [(output_prefix / f"id_part={id_part}", basenames) for id_part in range(pow(2, NUMBER_OF_BITS))],
        )