Skip to content

vcf

Read and write vcf file.

IntoLazyFrameExtension

Bases: Enum

Enumeration used to control the behavior of into_lazyframe.

MANAGE_SV class-attribute instance-attribute

MANAGE_SV = 1

into_lazyframe tries to avoid structural variant id collisions; the SVTYPE/SVLEN INFO values must be present.

NOTHING class-attribute instance-attribute

NOTHING = 0

into_lazyframe does not have any specific behavior.

add_info_column

add_info_column(lf: LazyFrame, vcfinfo2parquet_name: list[tuple[str, str]]) -> LazyFrame

Construct an INFO column from multiple columns of lf.

Useful when you want to serialise a polars.LazyFrame in vcf file format.

Parameters:

Returns:

Source code in src/variantplaner/io/vcf.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
def add_info_column(lf: polars.LazyFrame, vcfinfo2parquet_name: list[tuple[str, str]]) -> polars.LazyFrame:
    """Build a vcf ``INFO`` column from several columns of ``lf``.

    Handy when a [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) has to be serialised in vcf file format.

    Note that the string conversion below is applied to every column of
    ``lf``, not only to the columns listed in ``vcfinfo2parquet_name``.

    Args:
        lf: A [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html).
        vcfinfo2parquet_name: Pairs of (vcf INFO key, column name in ``lf``).

    Returns:
        [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) with an ``INFO`` column added and the source columns listed in ``vcfinfo2parquet_name`` dropped.
    """
    # Step 1: flatten list columns into comma separated strings, null -> ".".
    list_columns = [column for column, dtype in zip(lf.columns, lf.dtypes) if isinstance(dtype, polars.List)]
    lf = lf.with_columns(
        [polars.col(column).list.join(",").fill_null(".").alias(column) for column in list_columns],
    )

    # Step 2: the schema is read again after step 1, then every column that is
    # not a list is cast to string with null -> ".".
    other_columns = [column for column, dtype in zip(lf.columns, lf.dtypes) if not isinstance(dtype, polars.List)]
    lf = lf.with_columns(
        [polars.col(column).cast(str).fill_null(".").alias(column) for column in other_columns],
    )

    # Step 3: one "KEY=value" expression per requested field, joined with ";".
    key_value_exprs = []
    for vcf_name, parquet_name in vcfinfo2parquet_name:
        key_value_exprs.append(
            polars.concat_str(
                [
                    polars.lit(vcf_name),
                    polars.lit("="),
                    polars.col(parquet_name),
                ],
            ),
        )

    info_expr = polars.concat_str(key_value_exprs, separator=";").alias("INFO")
    lf = lf.with_columns([info_expr])

    consumed_columns = [parquet_name for _, parquet_name in vcfinfo2parquet_name]
    return lf.drop(consumed_columns)

build_rename_column

build_rename_column(chromosome: str, pos: str, identifier: str, ref: str, alt: str, qual: str | None = '.', filter_col: str | None = '.', info: list[tuple[str, str]] | None = None, format_string: str | None = None, sample: dict[str, dict[str, str]] | None = None) -> RenameCol

A helper function to generate rename column dict for variantplaner.io.vcf.from_lazyframe function parameter.

Returns:

  • RenameCol

    A rename column dictionary.

Source code in src/variantplaner/io/vcf.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def build_rename_column(
    chromosome: str,
    pos: str,
    identifier: str,
    ref: str,
    alt: str,
    qual: str | None = ".",
    filter_col: str | None = ".",
    info: list[tuple[str, str]] | None = None,
    format_string: str | None = None,
    sample: dict[str, dict[str, str]] | None = None,
) -> RenameCol:
    """Assemble the rename mapping expected by [variantplaner.io.vcf.from_lazyframe][].

    Every optional argument left to ``None`` is replaced by its neutral value:
    ``"."`` for ``qual`` and ``filter_col``, an empty list for ``info``, an
    empty string for ``format_string`` and an empty dict for ``sample``.

    Args:
        chromosome: Value stored under the ``#CHROM`` key.
        pos: Value stored under the ``POS`` key.
        identifier: Value stored under the ``ID`` key.
        ref: Value stored under the ``REF`` key.
        alt: Value stored under the ``ALT`` key.
        qual: Value stored under the ``QUAL`` key.
        filter_col: Value stored under the ``FILTER`` key.
        info: Value stored under the ``INFO`` key.
        format_string: Value stored under the ``FORMAT`` key.
        sample: Value stored under the ``sample`` key.

    Returns:
        A rename column dictionary.
    """
    # Normalise the optional arguments first so the mapping below is a plain literal.
    if qual is None:
        qual = "."
    if filter_col is None:
        filter_col = "."
    if info is None:
        info = []
    if format_string is None:
        format_string = ""
    if sample is None:
        sample = {}

    return {
        "#CHROM": chromosome,
        "POS": pos,
        "ID": identifier,
        "REF": ref,
        "ALT": alt,
        "QUAL": qual,
        "FILTER": filter_col,
        "INFO": info,
        "FORMAT": format_string,
        "sample": sample,
    }

extract_header

extract_header(input_path: Path) -> list[str]

Extract all header information of vcf file.

Returns the lines between the start of the file and the first line that starts with '#CHROM' (included); a line that does not start with '#' before that point is an error.

Parameters:

  • input_path (Path) –

    Path to vcf file.

Returns:

  • list[str]

    List of header string.

Raises:

Source code in src/variantplaner/io/vcf.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def extract_header(input_path: pathlib.Path) -> list[str]:
    """Extract all header information of vcf file.

    Collects every line from the start of the file up to and including the
    first line that starts with '#CHROM'. Lines are stripped of surrounding
    whitespace before being stored.

    The file is always decoded as UTF-8 instead of the platform default
    encoding, so the result does not depend on the locale of the machine.

    Args:
        input_path: Path to vcf file.

    Returns:
        List of header string, the '#CHROM' line is the last element.

    Raises:
        NotAVCFError: If a line that does not start with '#' is met before the '#CHROM' line.
        NotAVCFError: If the end of file is reached without any '#CHROM' line.
        UnicodeDecodeError: If the header is not valid UTF-8.
    """
    headers: list[str] = []
    with open(input_path, encoding="utf-8") as fh:
        for line in map(str.strip, fh):
            if not line.startswith("#"):
                raise NotAVCFError(input_path)

            headers.append(line)

            # The column name line closes the header, nothing after it is read.
            if line.startswith("#CHROM"):
                return headers

    raise NotAVCFError(input_path)

format2expr

format2expr(header: list[str], input_path: Path, select_format: set[str] | None = None) -> dict[str, Callable[[Expr, str], Expr]]

Read vcf header to generate a list of polars.Expr to extract genotypes information.

Warning: Float values can't be converted for the moment; they are stored as String to preserve the information.

Parameters:

  • header (list[str]) –

    Line of vcf header.

  • input_path (Path) –

    Path to vcf file.

  • select_format (set[str] | None, default: None ) –

    List of target format field.

Returns:

  • dict[str, Callable[[Expr, str], Expr]]

    A dict to link format id to pipeable function with Polars.Expr

Raises:

Source code in src/variantplaner/io/vcf.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def format2expr(
    header: list[str],
    input_path: pathlib.Path,
    select_format: set[str] | None = None,
) -> dict[str, Callable[[polars.Expr, str], polars.Expr]]:
    """Map each FORMAT field declared in a vcf header to the function that parses it.

    **Warning**: Float values can't be converted for the moment; they are handled as String to keep information.

    Args:
        header: Line of vcf header.
        input_path: Path to vcf file, only used to build the error.
        select_format: Format ids to keep; when empty or None every declared id is kept.

    Returns:
        A dict to link format id to pipeable function with Polars.Expr

    Raises:
        NotAVCFError: If no line of header starts with '#CHROM'.
    """
    format_re = re.compile(
        "ID=(?P<id>[A-Za-z_][0-9A-Za-z_.]*),Number=(?P<number>[ARG0-9\\.]+),Type=(?P<type>Integer|Float|String|Character)",
    )

    expressions: dict[str, Callable[[polars.Expr, str], polars.Expr]] = {}

    for line in header:
        # The column name line marks the end of the header.
        if line.startswith("#CHROM"):
            return expressions

        if not line.startswith("##FORMAT"):
            continue

        search = format_re.search(line)
        if search is None:
            continue

        name = search["id"]
        if select_format and name not in select_format:
            continue

        # Genotype has its own parser whatever its declared number and type.
        if name == "GT":
            expressions["GT"] = __format_gt
            continue

        single_value = search["number"] == "1"

        # The regex only accepts Integer, Float, String and Character: everything
        # that is not Integer (Float included, for now) uses the string parsers.
        if search["type"] == "Integer":
            expressions[name] = __format_one_int if single_value else __format_list_int
        else:
            expressions[name] = __format_one_str if single_value else __format_list_str

    raise NotAVCFError(input_path)

from_lazyframe

from_lazyframe(lf: LazyFrame, output_path: Path, renaming: RenameCol = DEFAULT_RENAME) -> None

Write polars.LazyFrame in vcf format.

Chromosome name mapping table
  • 23: X
  • 24: Y
  • 25: MT

All other chromosome numbers are left unchanged.

Warning: This function calls polars.LazyFrame.collect before writing the vcf, which can have a significant impact on memory usage.

Parameters:

  • lf (LazyFrame) –

    LazyFrame contains information.

  • output_path (Path) –

    Path to where vcf to write.

Returns:

  • None

    None

Source code in src/variantplaner/io/vcf.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
def from_lazyframe(
    lf: polars.LazyFrame,
    output_path: pathlib.Path,
    renaming: RenameCol = DEFAULT_RENAME,
) -> None:
    """Write [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) in vcf format.

    Chromosome name mapping table:
      - 23: X
      - 24: Y
      - 25: MT

    All other chromosome values are left unchanged: the mapping only applies
    when the whole value is 23, 24 or 25, never to a value that merely
    contains these digits (for example ``123`` or ``GL000225.1``).

    Warning: This function performs [polars.LazyFrame.collect][] before write vcf, this can have a significant impact on memory usage.

    Args:
        lf: LazyFrame contains information.
        output_path: Path to where vcf to write.
        renaming: Mapping between vcf column names and ``lf`` column names.

    Returns:
        None
    """
    select_column: list[str] = []

    lf = lf.with_columns(
        [
            polars.col(renaming["#CHROM"])
            .cast(polars.Utf8)
            # str.replace interprets its pattern as a regex: anchor it so only
            # an exact chromosome number is translated.
            .str.replace("^23$", "X")
            .str.replace("^24$", "Y")
            .str.replace("^25$", "MT")
            .alias("#CHROM"),
            polars.col(renaming["POS"]).alias("POS"),
            polars.col(renaming["ID"]).alias("ID"),
            polars.col(renaming["REF"]).alias("REF"),
            polars.col(renaming["ALT"]).alias("ALT"),
        ],
    )

    select_column.extend(["#CHROM", "POS", "ID", "REF", "ALT"])

    # The header is generated before the INFO columns are merged and dropped below.
    header = __generate_header(lf, renaming["INFO"], list(renaming["sample"].keys()), renaming["FORMAT"])

    # "." means no source column: the vcf missing value is written instead.
    if renaming["QUAL"] != ".":
        lf = lf.with_columns([polars.col(renaming["QUAL"]).alias("QUAL")])
    else:
        lf = lf.with_columns([polars.lit(".").alias("QUAL")])

    select_column.append("QUAL")

    if renaming["FILTER"] != ".":
        lf = lf.with_columns([polars.col(renaming["FILTER"]).alias("FILTER")])
    else:
        lf = lf.with_columns([polars.lit(".").alias("FILTER")])

    select_column.append("FILTER")

    lf = add_info_column(lf, renaming["INFO"]) if renaming["INFO"] else lf.with_columns(polars.lit(".").alias("INFO"))

    select_column.append("INFO")

    if renaming["FORMAT"]:
        lf = lf.with_columns(polars.lit(renaming["FORMAT"]).alias("FORMAT"))
        select_column.append("FORMAT")

    if renaming["FORMAT"] and renaming["sample"]:
        for sample_name in renaming["sample"]:
            lf = lf.with_columns(
                [
                    __lazy2format(
                        sample_name,
                        renaming["FORMAT"],
                        dict(zip(lf.columns, lf.dtypes)),
                    ).alias(sample_name),
                ],
            )
            select_column.append(sample_name)

    lf = lf.select([polars.col(col) for col in select_column])

    with open(output_path, "wb") as fh:
        fh.write(header.encode())
        fh.write(lf.collect().write_csv(separator="\t").encode())

info2expr

info2expr(header: list[str], input_path: Path, select_info: set[str] | None = None) -> list[Expr]

Read vcf header to generate a list of polars.Expr to extract variants information.

Parameters:

  • header (list[str]) –

    Line of vcf header

  • input_path (Path) –

    Path to vcf file.

  • select_info (set[str] | None, default: None ) –

    List of target info field

Returns:

Raises:

Source code in src/variantplaner/io/vcf.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def info2expr(header: list[str], input_path: pathlib.Path, select_info: set[str] | None = None) -> list[polars.Expr]:
    """Read vcf header to generate a list of [polars.Expr](https://pola-rs.github.io/polars/py-polars/html/reference/expressions/index.html) to extract variants information.

    Each expression extracts one ``KEY=value`` entry from the ``info`` column.
    The key is matched as a whole field (at the start of the string or right
    after a ';') so that, for example, ``AF`` is never read from ``CAF=...``.

    Args:
        header: Line of vcf header
        input_path: Path to vcf file, only used to build the error.
        select_info: Info ids to keep; when empty or None every declared id is kept.

    Returns:
        List of [polars.Expr](https://pola-rs.github.io/polars/py-polars/html/reference/expressions/index.html) to parse info columns.

    Raises:
        NotAVCFError: If no line of header starts with '#CHROM'.
    """
    info_re = re.compile(
        r"ID=(?P<id>([A-Za-z_][0-9A-Za-z_.]*|1000G)),Number=(?P<number>[ARG0-9\.]+),Type=(?P<type>Integer|Float|String|Character)",
    )

    expressions: list[polars.Expr] = []

    for line in header:
        if line.startswith("#CHROM"):
            return expressions

        if not line.startswith("##INFO"):
            continue

        if (search := info_re.search(line)) and (not select_info or search["id"] in select_info):
            # Anchor the key on a field boundary and escape it: ids may contain
            # '.', and an unanchored key would also match the tail of a longer key.
            regex = rf"(?:^|;){re.escape(search['id'])}=([^;]+)"

            local_expr = polars.col("info").str.extract(regex, 1)

            if search["number"] == "1":
                if search["type"] == "Integer":
                    local_expr = local_expr.cast(polars.Int64)
                elif search["type"] == "Float":
                    local_expr = local_expr.cast(polars.Float64)
                # String and Character values are kept as extracted.

            else:
                # Any number other than exactly "1" is parsed as a comma separated list.
                local_expr = local_expr.str.split(",")
                if search["type"] == "Integer":
                    local_expr = local_expr.cast(polars.List(polars.Int64))
                elif search["type"] == "Float":
                    local_expr = local_expr.cast(polars.List(polars.Float64))
                # String and Character values are kept as extracted.

            expressions.append(local_expr.alias(search["id"]))

    raise NotAVCFError(input_path)

into_lazyframe

into_lazyframe(input_path: Path, chr2len_path: Path, extension: IntoLazyFrameExtension = IntoLazyFrameExtension.NOTHING) -> LazyFrame

Read a vcf file and convert it in polars.LazyFrame.

Parameters:

Returns:

  • LazyFrame

    A polars.LazyFrame that contains vcf information ('chr', 'pos', 'vid', 'ref', 'alt', 'qual', 'filter', 'info', ['format'], ['genotypes',…], 'id').

Source code in src/variantplaner/io/vcf.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def into_lazyframe(
    input_path: pathlib.Path,
    chr2len_path: pathlib.Path,
    extension: IntoLazyFrameExtension = IntoLazyFrameExtension.NOTHING,
) -> polars.LazyFrame:
    """Scan a vcf file as a [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) and add a variant id to it.

    With the ``MANAGE_SV`` extension the SVTYPE and SVLEN info values are
    extracted before the variant id is computed and removed afterwards; this
    path collects the frame in memory before returning it as lazy again.

    Args:
        input_path: Path to vcf file.
        chr2len_path: Path to chr2length csv.
        extension: Control behavior of into_lazyframe.

    Returns:
        A [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) that contains vcf information ('chr', 'pos', 'vid', 'ref', 'alt', 'qual', 'filter', 'info', ['format'], ['genotypes',…], 'id').
    """
    header = extract_header(input_path)
    manage_sv = extension == IntoLazyFrameExtension.MANAGE_SV

    # scan_csv is called without header, so columns get positional names
    # ("column_1", "column_2", ...) that are mapped to the real names here.
    col_name = {}
    for position, name in enumerate(__column_name(header, input_path), start=1):
        col_name[f"column_{position}"] = name

    raw_lf = polars.scan_csv(
        input_path,
        separator="\t",
        comment_prefix="#",
        has_header=False,
        dtypes={"column_1": polars.Utf8, "column_2": polars.UInt64},
        ignore_errors=True,
    )

    chr2len = io.csv.chr2length_into_lazyframe(chr2len_path)
    lf = raw_lf.rename(col_name)

    if manage_sv:
        lf = lf.with_columns(info2expr(header, input_path, {"SVTYPE", "SVLEN"}))

    lf = normalization.add_variant_id(lf, chr2len)

    if not manage_sv:
        return lf

    # The structural variant columns are not part of the output.
    drop_column = {"SVTYPE", "SVLEN"}
    collected = lf.collect()
    kept_columns = [col for col in lf.columns if col not in drop_column]
    return collected.select(kept_columns).lazy()

sample_index

sample_index(header: list[str], input_path: Path) -> dict[str, int] | None

Read vcf header to generate an association map between sample name and index.

Parameters:

  • header (list[str]) –

    Header string.

Returns:

  • dict[str, int] | None

    Map that associates a sample name with its sample index.

Raises:

Source code in src/variantplaner/io/vcf.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def sample_index(header: list[str], input_path: pathlib.Path) -> dict[str, int] | None:
    """Associate each sample name of a vcf header with its index.

    The header is searched from its end for a line that starts with '#CHR'.

    Args:
        header: Header string.
        input_path: Path to vcf file, only used to build the error.

    Returns:
        Map that associates a sample name with its sample index, or None when
        the column line has no more than MINIMAL_COL_NUMBER columns.

    Raises:
        NotAVCFError: If no line of header starts with '#CHR'.
    """
    # The column line is normally the last one, hence the reversed lookup.
    column_line = next((line for line in reversed(header) if line.startswith("#CHR")), None)
    if column_line is None:
        raise NotAVCFError(input_path)

    fields = column_line.strip().split("\t")
    if len(fields) <= MINIMAL_COL_NUMBER:
        return None

    sample2index = {}
    for index, sample in enumerate(fields[SAMPLE_COL_BEGIN:]):
        sample2index[sample] = index
    return sample2index