Skip to content

variants

Function relate to vcf structuration.

merge

merge(paths: list[Path], output: Path, memory_limit: int = 10000000000, polars_threads: int = 4) -> None

Merge multiple parquet variants files into one file.

This function generates temporary files; by default they are written in /tmp, but you can control where they are written by setting the TMPDIR, TEMP or TMP environment variables.

Parameters:

  • paths (list[Path]) –

    List of files you want merged.

  • output (Path) –

    Path where the merged variants are written.

  • memory_limit (int, default: 10000000000 ) –

    Maximum cumulative size of each merge chunk, in bytes.

Returns:

  • None

    None

Source code in src/variantplaner/struct/variants.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def merge(
    paths: list[pathlib.Path],
    output: pathlib.Path,
    memory_limit: int = 10_000_000_000,
    polars_threads: int = 4,
) -> None:
    """Merge multiple parquet variants files into one file.

    This function generates temporary files; by default they are written in
    `/tmp`, but you can control where they are written by setting the TMPDIR,
    TEMP or TMP environment variables.

    Args:
        paths: List of files you want merged.
        output: Path where the merged variants are written.
        memory_limit: Maximum cumulative size of each merge chunk, in bytes.
        polars_threads: Number of polars threads each worker process may use.

    Returns:
        None

    Raises:
        ValueError: If `paths` is empty.
    """
    if not paths:
        # An empty input would otherwise spin forever in the merge loop below.
        raise ValueError("paths must contain at least one file")

    # Split the thread budget between worker processes: each worker gets
    # `polars_threads`, and we spawn enough workers to use the remainder.
    # Fall back to cpu_count when POLARS_MAX_THREADS is unset instead of
    # raising KeyError.
    all_threads = int(os.environ.get("POLARS_MAX_THREADS", os.cpu_count() or 1))
    multi_threads = max(all_threads // polars_threads, 1)
    # Spawned worker processes inherit this environment variable.
    os.environ["POLARS_MAX_THREADS"] = str(polars_threads)

    inputs = list(paths)
    temp_directory = tempfile.TemporaryDirectory()
    temp_prefix = pathlib.Path(temp_directory.name)
    logger.debug(f"{temp_prefix=}")

    try:
        # Repeatedly merge chunks of files until a single file remains.
        while len(inputs) != 1:
            new_inputs: list[pathlib.Path] = []
            inputs_outputs = []

            for input_chunk in __chunk_by_memory(inputs, bytes_limit=memory_limit):
                logger.debug(f"{len(input_chunk)=}")
                if len(input_chunk) > 1:
                    # General case: merge this chunk into a fresh temp file.
                    temp_output = temp_prefix / __random_string()
                    new_inputs.append(temp_output)
                    inputs_outputs.append((input_chunk, temp_output))
                elif len(input_chunk) == 1:
                    # A single-file chunk is the tail of inputs; carry it over
                    # unchanged to the next round.
                    new_inputs.append(input_chunk[0])

            logger.debug(f"{new_inputs=}")
            # Update inputs once per round, after the whole generator is
            # consumed (the original reassigned inside the for loop, which
            # only worked because the generator captured the old list).
            inputs = new_inputs

            with multiprocessing.get_context("spawn").Pool(multi_threads) as pool:
                pool.starmap(__concat_uniq, inputs_outputs)

        # One file remains with everything merged.
        if inputs[0] in paths:
            # A single input never went through the merge loop: copy instead
            # of move so the caller's original file survives.
            shutil.copy(inputs[0], output)
        else:
            shutil.move(inputs[0], output)
    finally:
        # Restore the caller's polars thread budget and remove all temporary
        # files generated during merging, even if the merge failed part-way.
        os.environ["POLARS_MAX_THREADS"] = str(all_threads)
        temp_directory.cleanup()