Files
procas12f/combine_pridict2_primedesign.py

222 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import csv
import gzip
import logging
from typing import Set, Dict, Iterator
# Configure logging once at import time: timestamped INFO-level records
# for the module-level logger used throughout this script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def read_excluded_sequences(file_path: str, sequence_column: str = 'sequence_name') -> Set[str]:
    """
    Read the set of sequence names to exclude from a gzip-compressed CSV file.

    Args:
        file_path: Path to the gzip-compressed CSV file.
        sequence_column: Name of the column holding the sequence names.

    Returns:
        Set of whitespace-stripped, non-empty sequence names.

    Raises:
        ValueError: If the file has no header row or lacks ``sequence_column``.
        FileNotFoundError: If the file does not exist.
    """
    excluded: Set[str] = set()
    try:
        with gzip.open(file_path, 'rt', newline='', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            # An empty/headerless file yields fieldnames == None; treat that the
            # same as a missing column instead of letting the `in` test raise an
            # opaque TypeError.
            if not reader.fieldnames or sequence_column not in reader.fieldnames:
                raise ValueError(f"CSV文件中缺少'{sequence_column}'")
            # Stream row by row, collecting non-empty sequence names.
            for row in reader:
                sequence_name = row.get(sequence_column)
                if sequence_name:
                    excluded.add(sequence_name.strip())
        logger.info(f"{file_path} 读取了 {len(excluded)} 个排除序列")
        return excluded
    except FileNotFoundError:
        logger.error(f"文件不存在: {file_path}")
        raise
    except Exception as e:
        logger.error(f"读取文件 {file_path} 时出错: {e}")
        raise
def validate_csv_headers(file_path: str, expected_headers: Set[str], gzipped: bool = True) -> bool:
    """
    Check that a CSV file's header row contains every required column.

    Args:
        file_path: Path to the CSV file.
        expected_headers: Column names that must all be present.
        gzipped: Whether the file is gzip-compressed.

    Returns:
        True when every expected header is present; False otherwise
        (missing headers, empty file, or any read error — all logged).
    """
    try:
        handle = (gzip.open(file_path, 'rt', newline='', encoding='utf-8')
                  if gzipped
                  else open(file_path, 'r', newline='', encoding='utf-8'))
        # Only the first (header) row is needed.
        with handle as f:
            header_row = next(csv.reader(f), None)
        if not header_row:
            logger.error(f"文件 {file_path} 没有表头或为空")
            return False
        missing_headers = expected_headers - set(header_row)
        if missing_headers:
            logger.error(f"文件 {file_path} 缺少必需列: {missing_headers}")
            return False
        logger.info(f"文件 {file_path} 表头验证通过")
        return True
    except Exception as e:
        logger.error(f"验证文件 {file_path} 表头时出错: {e}")
        return False
def process_prime_design(primedesign_path: str, excluded_sequences: Set[str],
output_path: str, batch_size: int = 10000) -> int:
"""
处理PrimeDesign文件过滤排除序列
Args:
primedesign_path: PrimeDesign文件路径gzip压缩
excluded_sequences: 需要排除的序列集合
output_path: 输出文件路径
batch_size: 批量写入大小
Returns:
处理的行数
"""
processed_count = 0
written_count = 0
try:
with gzip.open(primedesign_path, 'rt', newline='', encoding='utf-8') as input_file, \
gzip.open(output_path, 'wt', newline='', encoding='utf-8') as output_file:
# 创建CSV读写器
reader = csv.DictReader(input_file)
writer = csv.DictWriter(output_file, fieldnames=reader.fieldnames)
# 写入表头
writer.writeheader()
# 逐行处理数据
for row in reader:
processed_count += 1
try:
gRNA_type = row.get('gRNA_type', '').strip()
target_name = row.get('Target_name', '').strip()
# 只处理pegRNA且不在排除列表中
if gRNA_type == "pegRNA" and target_name not in excluded_sequences:
writer.writerow(row)
written_count += 1
# 定期刷新缓冲区
if written_count % batch_size == 0:
output_file.flush()
except KeyError as e:
logger.warning(f"{processed_count} 行缺少字段 {e},跳过该行")
continue
except Exception as e:
logger.warning(f"处理第 {processed_count} 行时出错: {e},跳过该行")
continue
# 最终刷新缓冲区
output_file.flush()
logger.info(f"处理完成: 处理了 {processed_count} 行,写入了 {written_count}")
return written_count
except Exception as e:
logger.error(f"处理PrimeDesign文件时出错: {e}")
raise
def main(pegrna: str, primedesign: str, output: str) -> None:
    """
    Filter a PrimeDesign output file against a set of excluded sequences.

    Args:
        pegrna: CSV file (gzip) listing the sequence names to exclude.
        primedesign: PrimeDesign output file (gzip-compressed CSV).
        output: Prefix used to build the output file name.
    """
    logger.info("开始处理PrimeDesign文件")
    # Step 1: validate both input files up front, before doing any work.
    logger.info("验证输入文件格式...")
    validations = (
        (pegrna, {'sequence_name'}, "pegrna文件格式验证失败"),
        (primedesign, {'gRNA_type', 'Target_name'}, "primedesign文件格式验证失败"),
    )
    for path, required_columns, failure_message in validations:
        if not validate_csv_headers(path, required_columns, gzipped=True):
            raise ValueError(failure_message)
    # Step 2: load the exclusion set into memory.
    logger.info("读取需要排除的序列...")
    excluded_sequences = read_excluded_sequences(pegrna)
    # Step 3: stream-filter the PrimeDesign file into the output.
    logger.info("开始处理PrimeDesign文件...")
    output_path = f"{output}_PrimeDesign_pegRNA.csv.gz"
    written_count = process_prime_design(
        primedesign_path=primedesign,
        excluded_sequences=excluded_sequences,
        output_path=output_path,
        batch_size=10000,
    )
    logger.info(f"输出文件已保存: {output_path}")
    logger.info(f"总共写入了 {written_count} 条pegRNA记录")
def safe_main(pegrna: str, primedesign: str, output: str) -> None:
    """
    Error-handling wrapper around :func:`main`.

    Args:
        pegrna: CSV file (gzip) listing the sequence names to exclude.
        primedesign: PrimeDesign output file (gzip-compressed CSV).
        output: Prefix used to build the output file name.
    """
    try:
        main(pegrna, primedesign, output)
    except Exception as exc:
        logger.error(f"程序执行失败: {exc}")
        raise
    else:
        logger.info("程序执行成功!")
# Entry point when run directly as a script.
if __name__ == "__main__":
    import sys

    # Validate the argument count up front: without this check a missing
    # argument surfaces as a bare IndexError instead of a usage message.
    if len(sys.argv) != 4:
        sys.exit(f"用法: {sys.argv[0]} <pegrna.csv.gz> <primedesign.csv.gz> <output_prefix>")
    pegrna_file, primedesign_file, output_prefix = sys.argv[1:4]
    safe_main(pegrna_file, primedesign_file, output_prefix)