提交饱和编辑的相关设计,及检验代码

This commit is contained in:
2026-02-26 14:02:42 +08:00
commit cb556b47c0
36 changed files with 5437 additions and 0 deletions

View File

@@ -0,0 +1,222 @@
import csv
import gzip
import logging
from typing import Set, Dict, Iterator
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def read_excluded_sequences(file_path: str, sequence_column: str = 'sequence_name') -> Set[str]:
"""
从CSV文件中读取需要排除的序列名称
Args:
file_path: CSV文件路径
sequence_column: 序列名称所在的列名
Returns:
排除序列名称的集合
"""
excluded = set()
try:
with gzip.open(file_path, 'rt', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
# 检查必要的列是否存在
if sequence_column not in reader.fieldnames:
raise ValueError(f"CSV文件中缺少'{sequence_column}'")
# 逐行读取,收集序列名称
for row in reader:
sequence_name = row.get(sequence_column)
if sequence_name:
excluded.add(sequence_name.strip())
logger.info(f"{file_path} 读取了 {len(excluded)} 个排除序列")
return excluded
except FileNotFoundError:
logger.error(f"文件不存在: {file_path}")
raise
except Exception as e:
logger.error(f"读取文件 {file_path} 时出错: {e}")
raise
def validate_csv_headers(file_path: str, expected_headers: Set[str], gzipped: bool = True) -> bool:
"""
验证CSV文件是否包含必需的列头
Args:
file_path: 文件路径
expected_headers: 必需的列头集合
gzipped: 是否为gzip压缩文件
Returns:
验证是否通过
"""
try:
if gzipped:
opener = gzip.open
mode = 'rt'
else:
opener = open
mode = 'r'
with opener(file_path, mode, newline='', encoding='utf-8') as f:
# 读取第一行作为表头
reader = csv.reader(f)
headers = next(reader, None)
if not headers:
logger.error(f"文件 {file_path} 没有表头或为空")
return False
# 检查必需列是否存在
headers_set = set(headers)
missing_headers = expected_headers - headers_set
if missing_headers:
logger.error(f"文件 {file_path} 缺少必需列: {missing_headers}")
return False
logger.info(f"文件 {file_path} 表头验证通过")
return True
except Exception as e:
logger.error(f"验证文件 {file_path} 表头时出错: {e}")
return False
def process_prime_design(primedesign_path: str, excluded_sequences: Set[str],
output_path: str, batch_size: int = 10000) -> int:
"""
处理PrimeDesign文件过滤排除序列
Args:
primedesign_path: PrimeDesign文件路径gzip压缩
excluded_sequences: 需要排除的序列集合
output_path: 输出文件路径
batch_size: 批量写入大小
Returns:
处理的行数
"""
processed_count = 0
written_count = 0
try:
with gzip.open(primedesign_path, 'rt', newline='', encoding='utf-8') as input_file, \
gzip.open(output_path, 'wt', newline='', encoding='utf-8') as output_file:
# 创建CSV读写器
reader = csv.DictReader(input_file)
writer = csv.DictWriter(output_file, fieldnames=reader.fieldnames)
# 写入表头
writer.writeheader()
# 逐行处理数据
for row in reader:
processed_count += 1
try:
gRNA_type = row.get('gRNA_type', '').strip()
target_name = row.get('Target_name', '').strip()
# 只处理pegRNA且不在排除列表中
if gRNA_type == "pegRNA" and target_name not in excluded_sequences:
writer.writerow(row)
written_count += 1
# 定期刷新缓冲区
if written_count % batch_size == 0:
output_file.flush()
except KeyError as e:
logger.warning(f"{processed_count} 行缺少字段 {e},跳过该行")
continue
except Exception as e:
logger.warning(f"处理第 {processed_count} 行时出错: {e},跳过该行")
continue
# 最终刷新缓冲区
output_file.flush()
logger.info(f"处理完成: 处理了 {processed_count} 行,写入了 {written_count}")
return written_count
except Exception as e:
logger.error(f"处理PrimeDesign文件时出错: {e}")
raise
def main(pegrna: str, primedesign: str, output: str) -> None:
"""
主函数处理PrimeDesign输出文件过滤排除序列
Args:
pegrna: 包含需要排除的序列的CSV文件
primedesign: PrimeDesign输出文件gzip压缩
output: 输出文件前缀
"""
logger.info("开始处理PrimeDesign文件")
# 步骤1: 验证输入文件格式
logger.info("验证输入文件格式...")
# 验证pegrna文件格式
if not validate_csv_headers(pegrna, {'sequence_name'}, gzipped=True):
raise ValueError("pegrna文件格式验证失败")
# 验证primedesign文件格式
if not validate_csv_headers(primedesign, {'gRNA_type', 'Target_name'}, gzipped=True):
raise ValueError("primedesign文件格式验证失败")
# 步骤2: 读取排除序列
logger.info("读取需要排除的序列...")
excluded_sequences = read_excluded_sequences(pegrna)
# 步骤3: 处理PrimeDesign文件
logger.info("开始处理PrimeDesign文件...")
output_path = f"{output}_PrimeDesign_pegRNA.csv.gz"
written_count = process_prime_design(
primedesign_path=primedesign,
excluded_sequences=excluded_sequences,
output_path=output_path,
batch_size=10000
)
logger.info(f"输出文件已保存: {output_path}")
logger.info(f"总共写入了 {written_count} 条pegRNA记录")
def safe_main(pegrna: str, primedesign: str, output: str) -> None:
"""
带错误处理的主函数包装器
Args:
pegrna: 包含需要排除的序列的CSV文件
primedesign: PrimeDesign输出文件gzip压缩
output: 输出文件前缀
"""
try:
main(pegrna, primedesign, output)
logger.info("程序执行成功!")
except Exception as e:
logger.error(f"程序执行失败: {e}")
raise
# 如果直接运行此脚本
if __name__ == "__main__":
import sys
pegrna_file = sys.argv[1]
primedesign_file = sys.argv[2]
output_prefix = sys.argv[3]
safe_main(pegrna_file, primedesign_file, output_prefix)