Read all genotype parquet files and reorganize them into a hive-like structure, partitioned on bits 55 to 63 (inclusive) of the variant id.

The actual number of threads used is \(\min(threads, len(paths))\).

The output layout looks like: `{output_prefix}/id_part=[0..255]/0.parquet`.
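As a rough sketch of the partitioning scheme (illustrative only; the shift value is an assumption inferred from the docstring and the `[0..255]` layout, not the library's actual code):

```python
NUMBER_OF_BITS = 8     # 2**8 = 256 partitions, matching id_part=[0..255]
PARTITION_SHIFT = 55   # assumed: the partition key starts at bit 55 of the variant id

def id_part(variant_id: int) -> int:
    """Extract the hive partition key from a variant id (hypothetical helper)."""
    return (variant_id >> PARTITION_SHIFT) & ((1 << NUMBER_OF_BITS) - 1)
```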
Parameters:

- `paths` (`list[Path]`) – list of files you want to reorganize
- `output_prefix` (`Path`) – prefix of the hive
- `threads` (`int`) – number of multiprocessing threads to run
- `file_per_thread` (`int`) – number of files managed per multiprocessing thread

Returns:

None
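A minimal usage sketch, assuming `hive` is importable from `variantplaner.struct.genotypes` (inferred from the source path below):

```python
import pathlib

from variantplaner.struct import genotypes  # assumed import path

paths = sorted(pathlib.Path("genotypes/").glob("*.parquet"))
genotypes.hive(
    paths=paths,
    output_prefix=pathlib.Path("hive/"),
    threads=4,
    file_per_thread=2,
)
# Expected layout: hive/id_part=0/0.parquet ... hive/id_part=255/0.parquet
```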
Source code in src/variantplaner/struct/genotypes.py
```python
def hive(
    paths: list[pathlib.Path],
    output_prefix: pathlib.Path,
    threads: int,
    file_per_thread: int,
) -> None:
    r"""Read all genotype parquet files and reorganize them into a hive-like structure, partitioned on bits 55 to 63 (inclusive) of the variant id.

    The actual number of threads used is $\min(threads, len(paths))$.

    The output layout looks like: `{output_prefix}/id_part=[0..255]/0.parquet`.

    Args:
        paths: list of files you want to reorganize
        output_prefix: prefix of the hive
        threads: number of multiprocessing threads to run
        file_per_thread: number of files managed per multiprocessing thread

    Returns:
        None
    """
    logger.info(f"{paths=} {output_prefix=}, {threads=}, {file_per_thread=}")

    if len(paths) == 0:
        return

    # Create one directory per partition key up front.
    for i in range(pow(2, NUMBER_OF_BITS)):
        (output_prefix / f"id_part={i}").mkdir(parents=True, exist_ok=True)

    # Batch the inputs: one file per group if grouping isn't required,
    # otherwise groups of `file_per_thread` files padded with None.
    path_groups: typing.Iterable[typing.Iterable[pathlib.Path]] = list(
        [[path] for path in paths]
        if file_per_thread < 2  # noqa: PLR2004 if number of files is lower than 2, file grouping isn't required
        else itertools.zip_longest(
            *[iter(paths)] * file_per_thread,
        ),
    )

    basenames = ["_".join(p.stem for p in g_paths if p is not None) for g_paths in path_groups]
    lf_groups = [[polars.scan_parquet(p) for p in g_paths if p is not None] for g_paths in path_groups]

    logger.info(f"{path_groups=}, {basenames=}")

    with multiprocessing.get_context("spawn").Pool(threads) as pool:
        # Partition each group of lazy frames (see __hive_worker).
        pool.starmap(
            __hive_worker,
            [(lf_group, basename, output_prefix) for lf_group, basename in zip(lf_groups, basenames)],
        )

        # Merge the per-group outputs inside each partition directory (see __merge_file).
        pool.starmap(
            __merge_file,
            [(output_prefix / f"id_part={id_part}", basenames) for id_part in range(pow(2, NUMBER_OF_BITS))],
        )
```
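The grouping step relies on the classic `zip_longest` "grouper" idiom: repeating the same iterator `file_per_thread` times makes each tuple consume consecutive items, with `None` padding the last group (which is why the downstream comprehensions filter on `p is not None`). A standalone illustration:

```python
import itertools

paths = ["a.parquet", "b.parquet", "c.parquet", "d.parquet", "e.parquet"]
file_per_thread = 2

# [iter(paths)] * n repeats the SAME iterator n times, so zip_longest
# pulls n consecutive items into each tuple and pads the last with None.
groups = list(itertools.zip_longest(*[iter(paths)] * file_per_thread))
print(groups)
# [('a.parquet', 'b.parquet'), ('c.parquet', 'd.parquet'), ('e.parquet', None)]
```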