#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Mutation-design pipeline.

Builds per-codon mutation targets from UniProt genomic coordinates
(`design_by_aa`), from curated SNP spreadsheets (`design_by_snp`), and from
an HGMD-style CSV (`design_by_hmgd`). Coordinate TSVs are fetched from the
EBI Proteins API and cached on disk.
"""
import itertools
import json
import math
import os
import re
import sys
from glob import glob

import numpy as np
import pandas as pd
import requests as rq
from loguru import logger

from src.editseq import run_analysis
from src.liftover import convert_interval
from src.mutation import design_mutations_for_orf
from src.reader import (Region, extract_orf_sequence, get_cds_for_gene,
                        load_uniprot_region, read_gtf)
from src.snp import decode_snp, generate_sequences_with_combinations

# Remove loguru's default handler so only the sink configured below is active.
logger.remove()
# Emit INFO-and-above records to stderr.
logger.add(
    sys.stderr,
    colorize=True,
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",  # {name}: {function}: {line}
    level="INFO",
)


def split_regions(cds):
    u"""Split CDS intervals into consecutive 3-bp amino-acid (codon) regions.

    Codons that straddle an exon boundary are emitted as two partial regions
    (kinds "end" then "start"); the leftover width is carried between
    iterations as a negative ``start`` sentinel.

    Test case:
        14:103698801-103699017
        14:103699133-103699179
        14:103699364-103699576
        14:103703173-103703327
        14:103707003-103707215
        14:103708522-103708659
        14:103711033-103711087
    """
    regions = []
    cds = sorted(cds, key=lambda x: (x.chrom, x.start, x.end))
    aa_codon_len = 3
    start = 0
    for x in cds:
        if start == 0:
            # Fresh codon frame: begin at the current interval's start.
            start = x.start
        elif start < 0:
            # Negative start means the previous CDS could not be split evenly
            # into codons, so the small leading remainder of this interval is
            # written out as its own "start" region.
            regions.append(Region(x.chrom, x.start, x.start - start, kind="start"))
            regions[-1].addition = x
            start = x.start - start
        while start + aa_codon_len <= x.end:
            # Record whether the codon touches an interval border, and which one.
            code = "regular"
            if start == x.start:
                code = "start"
            elif start + aa_codon_len == x.end:
                code = "end"
            regions.append(Region(x.chrom, start, start + aa_codon_len, kind=code))
            regions[-1].addition = x
            start += aa_codon_len
        if start < x.end:
            # Codon spills over the end border: emit the partial tail region
            # and carry the remaining width as a negative sentinel.
            regions.append(Region(x.chrom, start, x.end, kind="end"))
            regions[-1].addition = x
            start = start - x.end + 1
        else:
            # Interval split evenly: reset the frame pointer.
            start = 0
    return regions


def download_uniprot_region(protein, output):
    """Download genomic/protein coordinate mappings for a UniProt accession.

    The raw API response is cached as JSON next to *output*; *output* itself
    is written as a TSV of ``chrom:begin-end`` genome spans and protein
    positions. Only the first genomic coordinate set on a recognised
    chromosome is written.

    Raises:
        ValueError: if the accession does not resolve to a human entry.
    """
    cache = output.replace(".tsv", ".json")
    url = f"https://www.ebi.ac.uk/proteins/api/coordinates?accession={protein}"
    if os.path.exists(cache):
        with open(cache, "r") as r:
            resp = json.load(r)
    else:
        resp = rq.get(url, headers={"Accept": "application/json"})
        resp = resp.json()
        with open(output.replace(".tsv", ".json"), "w+") as w:
            json.dump(resp, w, indent=4)
    if not resp[0]["name"].endswith("HUMAN"):
        raise ValueError("protein is not human")
    __chroms__ = (
        [str(x) for x in range(1, 23)]
        + ["chr" + str(x) for x in range(1, 23)]
        + ["X", "Y", "chrX", "chrY"]
    )
    with open(output, "w+") as w:
        w.write(f"#{url}\n")
        for coord in resp[0]["gnCoordinate"]:
            chromosome = coord["genomicLocation"]["chromosome"]
            if chromosome not in __chroms__:
                continue
            for row in coord["genomicLocation"]["exon"]:
                genome = row["genomeLocation"]
                genome = str(genome["begin"]["position"]) + "-" + str(genome["end"]["position"])
                protein_loc = row["proteinLocation"]
                if "end" not in protein_loc and "position" in protein_loc:
                    # Single-residue exon: repeat the one position as begin/end.
                    protein_loc = [
                        str(protein_loc["position"]["position"]),
                        "-",
                        str(protein_loc["position"]["position"]),
                    ]
                else:
                    protein_loc = [
                        str(protein_loc["begin"]["position"]),
                        "-",
                        str(protein_loc["end"]["position"]),
                    ]
                # Join hoisted out of the f-string: a backslash inside an
                # f-string expression is a SyntaxError before Python 3.12.
                protein_cols = "\t".join(protein_loc)
                w.write(f"{chromosome}:{genome}\t{protein_cols}\n")
            # Only the first usable genomic coordinate set is kept.
            break


def get_aa_coords(genes, output):
    """Resolve each gene in an Excel sheet to its reviewed human UniProt
    accession and download its coordinate TSV into *output*."""
    os.makedirs(output, exist_ok=True)
    df = pd.read_excel(genes)
    # df = df.loc[df["Batch"] == 1, :]
    for _, row in df.iterrows():
        gene_name = row[1]
        url = f"https://rest.uniprot.org/uniprotkb/search?query=gene:{gene_name}+AND+organism_id:9606+AND+reviewed:true&format=json"
        resp = rq.get(url)
        for entry in resp.json().get("results", []):
            if "HUMAN" in entry["uniProtkbId"]:
                accession = entry["primaryAccession"]
                download_uniprot_region(
                    accession, os.path.join(output, f"{gene_name}_{accession}.tsv")
                )
                break


def adjust_cross_border_region(row):
    """Widen a sub-codon cross-border region to a full 3-bp span.

    For a region shorter than 3 bp whose kind marks a CDS border crossing,
    extend it (into the intron) so the reported span is exactly one codon;
    otherwise return the region's normal string form.
    """
    start = row.start
    end = row.end
    if len(row) < 3 and "cross" in row.kind:
        if row.kind == "cross_start":
            start = end - 3
        else:
            end = start + 3
        return f"{row.chrom}:{start}-{end}"
    return str(row)


def design_by_aa(genes, fasta, output, stop_codon=False):
    u"""Design mismatch sites and mismatch rules per amino acid.

    Args:
        genes: directory of per-gene UniProt coordinate TSVs.
        fasta: reference genome FASTA used to extract codon sequences.
        output: CSV path for the designed mutations.
        stop_codon: if True, keep only variants that are stop codons.
    """
    df = []
    for gene in glob(os.path.join(genes, "*.tsv")):
        logger.info(f"开始设计突变 {gene}...")
        key = os.path.basename(gene).split(".")[0]
        # Load the recorded regions for this gene.
        cds = load_uniprot_region(gene)
        # Split into per-amino-acid codon regions.
        cds = split_regions(cds)
        if not cds:
            continue
        # Extract the codon sequences from the reference.
        cds = extract_orf_sequence(fasta, cds, half_open=True)
        for idx, x in enumerate(cds):
            for strategy in ["3N"]:
                results = design_mutations_for_orf(x.sequence, strategy=strategy)
                for res in results:
                    for var in res["variants"]:
                        if var == res["original_codon"]:
                            continue
                        # 1 bp at the start => 2 bp lie in the preceding
                        # intron; the variant's first 2 bp must match them.
                        if "cross_start" == x.kind and len(x) == 1 and var[:2] != x.sequence[:2]:
                            continue
                        # 2 bp at the start => 1 bp lies before; the variant's
                        # first base must match the recorded intron base.
                        elif "cross_start" == x.kind and len(x) == 2 and var[0] != x.sequence[0]:
                            continue
                        # 1 bp at the end => 2 bp lie after; the variant's
                        # last 2 bp must match the intron.
                        elif "cross_end" == x.kind and len(x) == 1 and var[1:] != x.sequence[1:]:
                            continue
                        # 2 bp at the end => 1 bp lies after; that base must
                        # be C or G.
                        elif "cross_end" == x.kind and len(x) == 2 and var[-1] not in ["C", "G"]:
                            continue
                        row = [
                            key,
                            str(x.addition),
                            idx + 1,
                            str(x),
                            adjust_cross_border_region(x),
                            x.kind,
                            strategy,
                            res["original_codon"],
                            var,
                        ]
                        df.append(row)
    df = pd.DataFrame(df)
    # NOTE: "origial_code" typo is kept intentionally — downstream consumers
    # (including the loop below) reference this exact column name.
    df.columns = [
        "gene", "cds_region", "aa_index", "aa_region", "region_with_intron",
        "cross_cds_border", "strategy", "origial_code", "mutation_code",
    ]
    # Recompute the strategy label from the actual number of mismatched bases.
    strategy = []
    for _, row in df.iterrows():
        match = np.sum([x == y for x, y in zip(row["origial_code"], row["mutation_code"])])
        strategy.append(f"{3 - match}N")
    df["strategy"] = strategy
    if stop_codon:
        df = df[df["mutation_code"].isin(["TAA", "TAG", "TGA"])]
    df.to_csv(output, index=False)


def design_by_snp(snp_info, targets, genes, fasta, fasta_hg38, output):
    """Design mutation sequences from curated SNP spreadsheets.

    Args:
        snp_info: Excel workbook mapping cDNA changes to genomic (hg19) changes,
            one sheet per gene (sheet names may carry aliases in parentheses).
        targets: Excel workbook (sheet 2) listing codes to process per gene column.
        genes: directory of per-gene UniProt coordinate TSVs (chrom/start lookup).
        fasta: hg19 reference FASTA; fasta_hg38: hg38 reference FASTA.
        output: CSV path written incrementally.
    """
    logger.info("读取染色体")
    chroms = {}
    starts = {}
    for gene in glob(os.path.join(genes, "*.tsv")):
        key = os.path.basename(gene).split(".")[0]
        cds = load_uniprot_region(gene)
        cds = sorted(cds, key=lambda x: [x.chrom, x.start, x.end])
        chroms[key] = cds[0].chrom
        starts[key] = cds[0].start
    logger.info(f"读取snp的信息:{snp_info}")
    all_sheets = pd.read_excel(snp_info, sheet_name=None)
    # Walk every worksheet; register the cDNA->genomic map under each token of
    # the sheet name so aliases in parentheses also resolve.
    res = {}
    for sheet_name, df in all_sheets.items():
        temp = {}
        for _, row in df.iterrows():
            # Column names carry trailing spaces in the source workbook.
            cdna = row["DNA change (cDNA) "]
            hg38 = row["DNA change (genomic) (hg19)     "]
            temp[cdna] = hg38
        for sheet in re.split(r"[\((\s\))]", sheet_name):
            res[sheet] = temp
    print(res.keys())
    logger.info(f"读取目标:{targets}")
    df = pd.read_excel(targets, sheet_name=2)
    with open(output, "w+") as w:
        w.write(",".join([
            "gene", "cdna code", "genomic code", "mutation_region",
            "version", "original_codon", "mutation_code",
        ]) + "\n")
        for column in df.columns:
            if "Unnamed" in column:
                continue
            for code in df[column]:
                # Skip empty cells (read as NaN floats).
                if not isinstance(code, str) and math.isnan(code):
                    continue
                genomic_code = res.get(column, {}).get(code)
                if genomic_code:
                    sites, rule = decode_snp(genomic_code)
                elif str(code).startswith("c."):
                    # "FAND2" is a known misspelling of FANCD2 in the sheets.
                    sites, rule = decode_snp(
                        code, ref_start=starts["FANCD2" if column == "FAND2" else column]
                    )
                else:
                    continue
                region = Region(
                    chroms["FANCD2" if column == "FAND2" else column],
                    start=sites[0],
                    end=sites[-1],
                )
                hg38 = False
                if genomic_code:
                    region = extract_orf_sequence(fasta, [region])[0]
                elif str(code).startswith("c."):
                    hg38 = True
                    region = extract_orf_sequence(fasta_hg38, [region])[0]
                # Shared helper: identical branch order to the original inline
                # if-chain (>, dup, del, ins, delins, *ins*).
                original, replacement = decode_mutation(rule, region.sequence)
                if not genomic_code:
                    genomic_code = ""
                # Write every combination after expanding N bases in either side.
                for o, r in itertools.product(
                    generate_sequences_with_combinations(original),
                    generate_sequences_with_combinations(replacement),
                ):
                    w.write(",".join([
                        column,
                        code.strip(),
                        str(genomic_code).strip(),
                        str(region),
                        "hg38" if hg38 else "hg19",
                        o,
                        r,
                    ]) + "\n")
    # data = pd.DataFrame(data)
    # data.columns = ["gene", "cdna code", "genomic code", "mutation_region", "original_codon", "mutation_code"]
    # data.to_csv(output, index = False)


def extract_fastq_seq(fastq: str, chrom, start, end):
    """Fetch the [start, end) sequence for *chrom* from an indexed FASTA file."""
    import pysam

    with pysam.FastaFile(fastq) as fh:
        rec = fh.fetch(str(chrom), start, end)
        # print(rec)
    return rec


def decode_mutation(rule: str, sequence):
    """Translate an HGVS-style rule plus reference sequence into
    ``(original, replacement)`` strings.

    Supported rules: substitutions ("A>G"), "dup", "del", "ins",
    "delins<SEQ>", and "<...>ins<SEQ>". Unknown rules yield ("", "").
    """
    original, replacement = "", ""
    if ">" in rule:
        original, replacement = rule.split(">")
        # Trust the reference over the rule's stated original base(s).
        original = sequence
    elif rule == "dup":
        original = sequence
        replacement = original * 2
    elif rule == "del":
        original = sequence
        replacement = ""
    elif rule == "ins":
        replacement = sequence
    elif "delins" in rule:
        original = sequence
        replacement = rule.replace("delins", "")
    elif "ins" in rule:
        original = sequence
        replacement = rule.replace("ins", "")
    return original, replacement


def design_by_hmgd(data, fasta, outfile):
    """Build edit-sequence strings ("before(ref/alt)after") from an HGMD-style
    CSV with columns: gene, hgvs, chromosome, startCoord, endCoord."""
    res = pd.read_csv(data)
    # print(res.head())
    rows = []
    for idx, row in res.iterrows():
        key = row["gene"] + "_" + str(idx)
        try:
            seq = extract_fastq_seq(
                fasta, int(row["chromosome"]), row["startCoord"] - 1, row["endCoord"]
            )
            seq, replace = decode_mutation(row["hgvs"], seq)
            if not seq:
                continue
            # Strip residual position digits/underscores from the rule text.
            replace = re.sub(r"[\d_]", "", replace)
            if "del" in replace:
                replace = ""
            print(key, seq, replace)
            # 100 bp of flanking context on each side.
            before = extract_fastq_seq(
                fasta, int(row["chromosome"]), row["startCoord"] - 1 - 100, row["startCoord"]
            )
            after = extract_fastq_seq(
                fasta, int(row["chromosome"]), row["endCoord"], row["endCoord"] + 100
            )
            seq = f"{before}({seq}/{replace}){after}"
            rows.append({"sequence_name": key, "editseq": seq})
        except Exception:
            # Best-effort: skip unparseable rows, but leave a trace instead of
            # swallowing the failure silently.
            logger.warning(f"skip {key}: failed to build edit sequence")
            continue
    pd.DataFrame(rows).to_csv(outfile, index=False)


if __name__ == "__main__":
    from fire import Fire

    # get_aa_coords(
    #     "../metainfo/Cancer and blood disorder panels_v2.xlsx",
    #     "../gene_coords/batch2"
    # )
    # get_aa_coords(
    #     "../metainfo/DDR gene library in 2021 Cell.xlsx",
    #     "../gene_coords/positive"
    # )
    # Fire({"aa": design_by_aa})
    # design_by_aa(
    #     "../gene_coords/batch2",
    #     fasta="../ref/ensembl_115/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz",
    #     output="../gene_aa_target_batch2.csv.gz"
    # )
    # design_by_aa(
    #     "../gene_coords/positive",
    #     fasta="../ref/ensembl_115/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz",
    #     output="../gene_aa_target_positive.csv.gz",
    #     stop_codon = True
    # )
    # run_analysis(
    #     "../gene_aa_target_batch2.csv.gz",
    #     reference="../ref/ensembl_115/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz",
    #     outdir="../../prediction/input/batch2"
    # )
    # run_analysis(
    #     "../gene_aa_target_positive.csv.gz",
    #     reference="../ref/ensembl_115/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz",
    #     outdir="../../prediction/input/positive"
    # )
    # Generate SNP targets: snp_info is the curated SNP workbook,
    # targets lists the genes to process.
    # design_by_snp(
    #     snp_info="../metainfo/副本FA家族基因-20250829-DJJ_XD.xlsx",
    #     targets="../metainfo/实验计划.xlsx",
    #     output="gene_snp_target.csv",
    #     fasta="../ref/gencode/GRCh37.p13.genome.fa.gz",
    #     fasta_hg38="../ref/ensembl_115/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz",
    #     genes="../gene_coords"
    # )
    # design_by_hmgd(
    #     "../metainfo/allmut.csv",
    #     fasta="../ref/ensembl_115/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz",
    #     outfile="../../prediction/input/pos_v2.csv.gz"
    # )
    # url = "https://www.ebi.ac.uk/proteins/api/coordinates?accession=P21359-1"
    # download_uniprot_region("Test", "P21359")