
The taxondb submodule

This module contains functions to:

  1. download versions of the GBIF backbone taxonomy and NCBI taxonomy databases, and
  2. build and index SQLite3 databases of those datasets for use in local taxon validation.

The functions also store the timestamp of each database version in the SQLite3 files, so that datasets can include a taxonomy timestamp.
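
As a rough illustration of the intended two-step workflow (download, then build), the sketch below chains the GBIF functions documented on this page. The snapshot timestamp, URL and output paths are illustrative placeholders, not guaranteed live values.

from safedata_validator.taxondb import build_local_gbif, download_gbif_backbone

# Illustrative values only: pick a real snapshot timestamp and its download URL.
timestamp = "2023-08-28"
url = f"https://hosted-datasets.gbif.org/datasets/backbone/{timestamp}/"

# Step 1: download the backbone data files for that snapshot.
files = download_gbif_backbone(outdir="/tmp/gbif", timestamp=timestamp, url=url)

# Step 2: build the local SQLite3 database used for taxon validation.
build_local_gbif(
    outfile=f"/tmp/gbif/gbif_backbone_{timestamp}.sqlite",
    timestamp=files["timestamp"],
    simple=files["simple"],
    deleted=files.get("deleted"),
)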

download_gbif_backbone(outdir, timestamp, url)

Download the GBIF backbone database.

This function downloads the data for a GBIF backbone taxonomy version to a given location.

Parameters:

outdir (str, required): The location to download the files to
timestamp (str, required): The timestamp for an available GBIF backbone version.
url (str, required): The download URL for the provided version.

Returns:

dict: A dictionary giving the paths to the downloaded files and timestamp of the version downloaded.

Source code in safedata_validator/taxondb.py
def download_gbif_backbone(outdir: str, timestamp: str, url: str) -> dict:
    """Download the GBIF backbone database.

    This function downloads the data for a GBIF backbone taxonomy version to a given
    location.

    Args:
        outdir: The location to download the files to
        timestamp: The timestamp for an available GBIF backbone version.
        url: The download URL for the provided version.

    Returns:
        A dictionary giving the paths to the downloaded files and timestamp of the
        version downloaded.
    """

    LOGGER.info(f"Downloading {timestamp} GBIF data to: {outdir}")
    FORMATTER.push()
    return_dict = {"timestamp": timestamp}

    # Two possible names for the key backbone file: backbone in earlier snapshots.
    simple_head = requests.head(url + "simple.txt.gz")
    backbone_head = requests.head(url + "backbone.txt.gz")
    deleted_head = requests.head(url + "simple-deleted.txt.gz")

    if not (simple_head.ok or backbone_head.ok):
        log_and_raise(
            "Timestamp version does not provide simple.txt.gz or backbone.txt.gz",
            ValueError,
        )

    # Download files to target directory - alternative names for simple backbone dump
    if simple_head.ok:
        targets = [
            ("simple", "simple.txt.gz", int(simple_head.headers["Content-Length"]))
        ]
    elif backbone_head.ok:
        targets = [
            ("simple", "backbone.txt.gz", int(backbone_head.headers["Content-Length"]))
        ]

    if deleted_head.ok:
        targets += [
            (
                "deleted",
                "simple-deleted.txt.gz",
                int(deleted_head.headers["Content-Length"]),
            )
        ]

    for key, file, fsize in targets:
        # Download the file with a TQDM progress bar
        LOGGER.info(f"Downloading {file}")
        file_req = requests.get(url + file, stream=True)
        out_path = os.path.join(outdir, file)
        with tqdm.wrapattr(file_req.raw, "read", total=fsize) as r_raw:
            with open(out_path, "wb") as outf:
                shutil.copyfileobj(r_raw, outf)

        # store the file path
        return_dict[key] = out_path

    FORMATTER.pop()

    return return_dict
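
As a quick usage note (the timestamp and URL below are illustrative, not a guaranteed endpoint), the returned dictionary exposes the downloaded file paths under the keys expected by build_local_gbif:

result = download_gbif_backbone(
    outdir="/tmp/gbif",
    timestamp="2023-08-28",  # illustrative snapshot timestamp
    url="https://hosted-datasets.gbif.org/datasets/backbone/2023-08-28/",
)

# Expected keys: 'timestamp', 'simple' and, if the snapshot provides
# simple-deleted.txt.gz, 'deleted'.
print(result["timestamp"], result["simple"], result.get("deleted"))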

download_ncbi_taxonomy(outdir, timestamp, filename, filesize)

Download the NCBI taxonomy database.

This function downloads the data for an NCBI taxonomy version to a given location. The timestamp (e.g. '2021-11-26') selects a particular version from the archive at the following link.

https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump_archive/

Parameters:

outdir (str, required): The location to download the taxonomy dump file to
timestamp (str, required): The timestamp for an available NCBI taxonomy version.
filename (str, required): The name of the corresponding dump file.
filesize (int, required): The size in bytes of the dump file.

Returns:

dict: A dictionary giving the paths to the downloaded files and timestamp of the version downloaded.

Source code in safedata_validator/taxondb.py
def download_ncbi_taxonomy(
    outdir: str, timestamp: str, filename: str, filesize: int
) -> dict:
    """Download the NCBI taxonomy database.

    This function downloads the data for an NCBI taxonomy version to a given location.
    The timestamp (e.g. '2021-11-26') selects a particular version from the archive at
    the following link.

        https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump_archive/

    Args:
        outdir: The location to download the taxonomy dump file to
        timestamp: The timestamp for an available NCBI taxonomy version.
        filename: The name of the corresponding dump file.
        filesize: The size in bytes of the dump file.

    Returns:
        A dictionary giving the paths to the downloaded files and timestamp of the
        version downloaded.
    """

    LOGGER.info(f"Downloading {timestamp} NCBI data to: {outdir}")
    FORMATTER.push()
    return_dict = {}

    return_dict["timestamp"] = timestamp

    # Retrieve the requested file, using a callback wrapper to track progress
    out_file = os.path.join(outdir, filename)
    LOGGER.info(f"Downloading taxonomy to: {out_file}")

    with open(out_file, "wb") as outf:
        with tqdm(
            total=filesize,
            unit="B",
            unit_scale=True,
            unit_divisor=1024,
        ) as pbar:

            def _callback(data):
                data_len = len(data)
                pbar.update(data_len)
                outf.write(data)

            try:
                LOGGER.info("Connecting to NCBI FTP server")
                ftp = ftplib.FTP(host="ftp.ncbi.nlm.nih.gov")
                ftp.login()
                ftp.cwd("pub/taxonomy/taxdump_archive/")
            except ftplib.all_errors:
                log_and_raise("Could not connect to FTP site", IOError)

            ftp.retrbinary(f"RETR {filename}", _callback)
            ftp.close()

    # store the file path
    return_dict["taxdmp"] = out_file

    FORMATTER.pop()

    return return_dict
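
A hedged usage sketch: the filename and filesize would normally be obtained by listing the NCBI taxdump_archive directory; the values below are purely illustrative.

result = download_ncbi_taxonomy(
    outdir="/tmp/ncbi",
    timestamp="2021-11-26",            # illustrative archive timestamp
    filename="taxdmp_2021-11-26.zip",  # illustrative dump file name
    filesize=57_000_000,               # illustrative size in bytes
)

# The returned dictionary records the timestamp and the path to the archive.
print(result["timestamp"], result["taxdmp"])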

build_local_gbif(outfile, timestamp, simple, deleted=None, keep=False)

Create a local GBIF backbone database.

This function takes the paths to downloaded data files for the GBIF backbone taxonomy and builds a SQLite3 database file for use in local validation in the safedata_validator package. The location of this file then needs to be included in the package configuration to be used in validation.

The data files can be downloaded using the download_gbif_backbone function and two files can be used. The main data is in 'simple.txt.gz' but deleted taxa can also be included from 'simple-deleted.txt.gz'. Data is read automatically from the compressed files - they do not need to be extracted.

By default, the downloaded files are deleted after the database has been created, but the 'keep' argument can be used to retain them.

Parameters:

outfile (str, required): The filepath to use to create the SQLite file
timestamp (str, required): The timestamp of the downloaded version.
simple (str, required): The path to the simple.txt.gz file.
deleted (str | None, default None): The path to the simple-deleted.txt.gz file.
keep (bool, default False): Should the original datafiles be retained.

Source code in safedata_validator/taxondb.py
def build_local_gbif(
    outfile: str,
    timestamp: str,
    simple: str,
    deleted: str | None = None,
    keep: bool = False,
) -> None:
    """Create a local GBIF backbone database.

    This function takes the paths to downloaded data files for the GBIF backbone
    taxonomy and builds a SQLite3 database file for use in local validation in the
    safedata_validator package. The location of this file then needs to be included in
    the package configuration to be used in validation.

    The data files can be downloaded using the download_gbif_backbone function and two
    files can be used. The main data is in 'simple.txt.gz' but deleted taxa can also be
    included from 'simple-deleted.txt.gz'. Data is read automatically from the
    compressed files - they do not need to be extracted.

    By default, the downloaded files are deleted after the database has been created,
    but the 'keep' argument can be used to retain them.

    Args:
        outfile: The filepath to use to create the SQLite file
        timestamp: The timestamp of the downloaded version.
        simple: The path to the simple.txt.gz file.
        deleted: The path to the simple-deleted.txt.gz
        keep: Should the original datafiles be retained.
    """

    # Guard against madly long fields: these are typically the issues and published in
    # fields that are dropped in building the database itself, but they can be so long
    # that they exceed the default CSV field read limit which explodes the csv reader.
    # For example, the 2022-11-23 version has a row with 231Kb of crab literature. This
    # increases that limit to 512Kb.
    csv.field_size_limit(524288)

    # Create the output file and turn off safety features for speed
    LOGGER.info(f"Building GBIF backbone database in: {outfile}")
    FORMATTER.push()

    con = sqlite3.connect(outfile)
    con.execute("PRAGMA synchronous = OFF")

    # Write the timestamp into a table
    con.execute("CREATE TABLE timestamp (timestamp date);")
    con.execute(f"INSERT INTO timestamp VALUES ('{timestamp}');")
    con.commit()
    LOGGER.info("Timestamp table created")

    # Create the schema for the backbone table, using drop fields to remove unwanted
    # fields in the schema and the data tuples. The file_schema list describes the full
    # set of fields provided by GBIF
    file_schema = [
        ("id", "int PRIMARY KEY"),
        ("parent_key", "int"),
        ("basionym_key", "int"),
        ("is_synonym", "boolean"),
        ("status", "text"),
        ("rank", "text"),
        ("nom_status", "text[]"),
        ("constituent_key", "text"),
        ("origin", "text"),
        ("source_taxon_key", "int"),
        ("kingdom_key", "int"),
        ("phylum_key", "int"),
        ("class_key", "int"),
        ("order_key", "int"),
        ("family_key", "int"),
        ("genus_key", "int"),
        ("species_key", "int"),
        ("name_id", "int"),
        ("scientific_name", "text"),
        ("canonical_name", "text"),
        ("genus_or_above", "text"),
        ("specific_epithet", "text"),
        ("infra_specific_epithet", "text"),
        ("notho_type", "text"),
        ("authorship", "text"),
        ("year", "text"),
        ("bracket_authorship", "text"),
        ("bracket_year", "text"),
        ("name_published_in", "text"),
        ("issues", "text[]"),
    ]

    drop_fields = ["name_published_in", "issues"]

    # Get a logical index of which fields are being kept
    drop_index = [True if vl[0] in drop_fields else False for vl in file_schema]

    # Create the final schema for the backbone table and insert statement
    output_schema = ", ".join(
        [" ".join(val) for val, drop in zip(file_schema, drop_index) if not drop]
    )
    output_schema = f"CREATE TABLE backbone ({output_schema})"

    insert_placeholders = ",".join(["?"] * (len(drop_index) - sum(drop_index)))
    insert_statement = f"INSERT INTO backbone VALUES ({insert_placeholders})"

    # Create the table
    con.execute(output_schema)
    con.commit()
    LOGGER.info("Backbone table created")

    # Import data from the simple backbone and deleted taxa

    # The approach below is more efficient but makes it impossible to drop fields and
    # substitute \\N to None. Although converting \\N to None can be done later with an
    # update statement, you _cannot_ drop fields in sqlite3, so that has to be done up
    # front.
    #
    # con.executemany( insert_statement, bb_reader )

    LOGGER.info("Adding core backbone taxa")

    with gzip.open(simple, "rt", encoding="utf-8") as bbn:
        # The files are tab delimited but the quoting is sometimes unclosed,
        # so turning off quoting - includes quotes in the fields where present
        bb_reader = csv.reader(bbn, delimiter="\t", quoting=csv.QUOTE_NONE)

        # There is no obvious way of finding the number of rows in simple.txt without
        # reading the file and counting them. And that is a huge cost just to provide a
        # progress bar with real percentages, so just show a progress meter to show
        # things happening
        with tqdm(total=None) as pbar:
            # Loop over the lines in the file.
            for row in bb_reader:
                row_clean = [
                    None if val == "\\N" else val
                    for val, drp in zip(row, drop_index)
                    if not drp
                ]

                con.execute(insert_statement, row_clean)
                pbar.update()

        con.commit()

    if deleted is not None:
        LOGGER.info("Adding deleted taxa")

        with gzip.open(deleted, "rt", encoding="utf-8") as dlt:
            # The files are tab delimited but the quoting is sometimes unclosed,
            # so turning off quoting - includes quotes in the fields where present
            dl_reader = csv.reader(dlt, delimiter="\t", quoting=csv.QUOTE_NONE)

            with tqdm(total=None) as pbar:
                for row in dl_reader:
                    row_clean = [
                        None if val == "\\N" else val
                        for val, drp in zip(row, drop_index)
                        if not drp
                    ]

                    # replace the status with DELETED
                    row_clean[4] = "DELETED"

                    con.execute(insert_statement, row_clean)
                    pbar.update()

        con.commit()

    # Create the indices
    LOGGER.info("Creating database indexes")

    con.execute("CREATE INDEX backbone_name_rank ON backbone (canonical_name, rank);")
    con.execute("CREATE INDEX backbone_id ON backbone (id);")
    con.commit()

    # Delete the downloaded files
    if not keep:
        LOGGER.info("Removing downloaded files")
        os.remove(simple)
        if deleted is not None:
            os.remove(deleted)

    FORMATTER.pop()
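
For context, a minimal sketch (assuming the files were downloaded as above and kept on disk; all paths are illustrative) showing that the version timestamp written during the build can be read back from the database's timestamp table:

import sqlite3

from safedata_validator.taxondb import build_local_gbif

# Paths below are illustrative; they would come from download_gbif_backbone.
build_local_gbif(
    outfile="/tmp/gbif/gbif_backbone_2023-08-28.sqlite",
    timestamp="2023-08-28",
    simple="/tmp/gbif/simple.txt.gz",
    deleted="/tmp/gbif/simple-deleted.txt.gz",
    keep=True,  # retain the downloaded files
)

# The build stores the version timestamp in a single-row 'timestamp' table.
con = sqlite3.connect("/tmp/gbif/gbif_backbone_2023-08-28.sqlite")
print(con.execute("SELECT timestamp FROM timestamp").fetchone())
con.close()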

build_local_ncbi(outfile, timestamp, taxdmp, keep=False)

Create a local NCBI taxonomy database.

This function takes the path to a downloaded data file from the NCBI Taxonomy archive and builds a SQLite3 database file for use in local validation in the safedata_validator package. The database file is created at the provided outfile path. The location of this file then needs to be included in the package configuration to be used in validation.

The data file can be downloaded using the download_ncbi_taxonomy function, which will retrieve a taxdmp_timestamp.zip archive. Data is read automatically from the compressed file - it does not need to be decompressed.

By default, the downloaded files are deleted after the database has been created, but the 'keep' argument can be used to retain them.

Parameters:

outfile (str, required): The filepath to use to create the SQLite file
timestamp (str, required): The timestamp of the downloaded version.
taxdmp (str, required): The path to the taxdmp ZIP archive.
keep (bool, default False): Should the original archive be retained.

Source code in safedata_validator/taxondb.py
def build_local_ncbi(
    outfile: str, timestamp: str, taxdmp: str, keep: bool = False
) -> None:
    """Create a local NCBI taxonomy database.

    This function takes the path to a downloaded data file from the NCBI Taxonomy
    archive and builds a SQLite3 database file for use in local validation in the
    safedata_validator package. The database file is created at the provided outfile
    path. The location of this file then needs to be included in the package
    configuration to be used in validation.

    The data file can be downloaded using the download_ncbi_taxonomy function, which
    will retrieve a `taxdmp_timestamp.zip` archive. Data is read automatically from the
    compressed file - it does not need to be decompressed.

    By default, the downloaded files are deleted after the database has been created,
    but the 'keep' argument can be used to retain them.

    Args:
        outfile: The filepath to use to create the SQLite file
        timestamp: The timestamp of the downloaded version.
        taxdmp: The path to the taxdmp ZIP archive.
        keep: Should the original archive be retained.
    """

    # Create the output file
    LOGGER.info(f"Building GBIF backbone database in: {outfile}")
    FORMATTER.push()

    # Create the output file and turn off safety features for speed
    con = sqlite3.connect(outfile)
    con.execute("PRAGMA synchronous = OFF")

    # Write the timestamp into a table
    con.execute("CREATE TABLE timestamp (timestamp date);")
    con.execute(f"INSERT INTO timestamp VALUES ('{timestamp}');")
    con.commit()
    LOGGER.info("Timestamp table created")

    # Define the retained fields and schema for each required table
    tables = {
        "nodes": {
            "file": "nodes.dmp",
            "drop": [
                "embl_code",
                "division_id",
                "inherited_div_flag",
                "genetic_code_id",
                "inherited_GC_flag",
                "mito_code_id",
                "inherited_MGC_flag",
                "GenBank_hidden_flag",
                "hidden_subtree_root_flag",
            ],
            "schema": [
                ("tax_id", "int PRIMARY KEY"),
                ("parent_tax_id", "int"),
                ("rank", "text"),
                ("embl_code", "text"),
                ("division_id", "int"),
                ("inherited_div_flag", "boolean"),
                ("genetic_code_id", "int"),
                ("inherited_GC_flag", "boolean"),
                ("mito_code_id", "int"),
                ("inherited_MGC_flag", "boolean"),
                ("GenBank_hidden_flag", "boolean"),
                ("hidden_subtree_root_flag", "boolean"),
                ("comments", "text"),
            ],
        },
        "names": {
            "file": "names.dmp",
            "drop": [],  # ["unique_name"],
            "schema": [
                ("tax_id", "int"),
                ("name_txt", "text"),
                ("unique_name", "text"),
                ("name_class", "text"),
            ],
        },
        "merge": {
            "file": "merged.dmp",
            "drop": [],
            "schema": [("old_tax_id", "int"), ("new_tax_id", "int")],
        },
    }

    archive = zipfile.ZipFile(taxdmp)

    # Process each table
    for tbl, info in tables.items():
        # Get a logical index of which fields are being kept for this table
        drop_index = [True if vl[0] in info["drop"] else False for vl in info["schema"]]

        # Create the final schema for this table
        schema = ", ".join(
            [" ".join(val) for val, drop in zip(info["schema"], drop_index) if not drop]
        )

        schema = f"CREATE TABLE {tbl} ({schema})"

        LOGGER.info(f"Creating {tbl} table")
        con.execute(schema)
        con.commit()

        # Create the insert statement
        placeholders = ",".join(["?"] * (len(drop_index) - sum(drop_index)))
        insert_statement = f"INSERT INTO {tbl} VALUES ({placeholders})"

        # Import data from the archive
        with archive.open(str(info["file"]), "r") as data:
            LOGGER.info(f"Populating {tbl} table from {info['file']}")

            # Use TextIOWrapper to expose the binary data from the Zip as text for CSV
            data_text = TextIOWrapper(data)

            # The files are pipe delimited but the quoting is sometimes unclosed,
            # so turning off quoting - includes quotes in the fields where present
            data_reader = csv.reader(data_text, delimiter="|", quoting=csv.QUOTE_NONE)

            with tqdm(total=None) as pbar:
                for row in data_reader:
                    row = [val.strip() for val, drp in zip(row, drop_index) if not drp]

                    con.execute(insert_statement, row)
                    pbar.update()

        con.commit()

    archive.close()

    # Populate a unique ranks table for this database
    LOGGER.info("Creating unique ranks table")
    con.execute("CREATE TABLE unique_ncbi_ranks (rank_index int, rank str);")

    # Get the unique pairs of child + parent ranks
    cur = con.execute(
        """select distinct ch.rank, pr.rank
            from nodes ch inner join nodes pr
            on ch.parent_tax_id = pr.tax_id
        """
    )

    # Filter out no rank and clade which have variable position
    ranks = [
        rw for rw in cur.fetchall() if ("no rank" not in rw) and ("clade" not in rw)
    ]

    # Group by parent
    children_by_parents = {}
    ranks.sort(key=lambda x: x[1])
    grp_by_parent = groupby(ranks, lambda x: x[1])

    for ky, gp in grp_by_parent:
        children_by_parents[ky] = {ch for ch, _ in gp}

    # Remove a circular reference and then find the static order through the graph
    children_by_parents["forma specialis"].remove("forma specialis")
    sorter = graphlib.TopologicalSorter(children_by_parents)
    taxon_order = reversed(list(sorter.static_order()))

    # Add the ordered ranks to the unique_ncbi_ranks table
    con.executemany(
        "INSERT INTO unique_ncbi_ranks VALUES (?, ?)", enumerate(taxon_order)
    )
    con.commit()

    # Create the indices
    LOGGER.info("Creating database indexes")
    con.execute("CREATE INDEX node_id ON nodes (tax_id);")
    con.execute("CREATE INDEX all_names ON names (name_txt);")
    con.execute("CREATE INDEX id_name_class ON names (tax_id, name_class);")
    con.execute("CREATE INDEX merged_id ON merge (old_tax_id);")
    con.commit()

    # Delete the downloaded files
    if not keep:
        LOGGER.info("Removing downloaded archive")
        os.remove(taxdmp)

    FORMATTER.pop()
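
And a final hedged sketch for the NCBI build (paths and timestamp are illustrative, standing in for the output of a prior download_ncbi_taxonomy call), including a look at the derived unique_ncbi_ranks table that the build populates:

import sqlite3

from safedata_validator.taxondb import build_local_ncbi

# The taxdmp path is illustrative; it corresponds to the 'taxdmp' entry
# returned by download_ncbi_taxonomy.
build_local_ncbi(
    outfile="/tmp/ncbi/ncbi_taxonomy_2021-11-26.sqlite",
    timestamp="2021-11-26",
    taxdmp="/tmp/ncbi/taxdmp_2021-11-26.zip",
)

# The build also writes an ordered table of NCBI ranks derived from the nodes table.
con = sqlite3.connect("/tmp/ncbi/ncbi_taxonomy_2021-11-26.sqlite")
for rank_index, rank in con.execute(
    "SELECT rank_index, rank FROM unique_ncbi_ranks ORDER BY rank_index"
):
    print(rank_index, rank)
con.close()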