81 changes: 81 additions & 0 deletions uniprot_utils/uinprot_loader.py
@@ -0,0 +1,81 @@
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from minio import Minio
from minio.error import S3Error

#Janaka E
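
# get_spark_session() and get_minio_client() are called below but are not defined in
# this file. The definitions that follow are only a minimal sketch of what such helpers
# might look like; the app name, MinIO endpoint, and credentials are placeholder
# assumptions, and the Delta Lake / S3A packages are assumed to be on the Spark classpath.
def get_spark_session():
    """Return a SparkSession configured for Delta Lake and S3A (MinIO) access."""
    return (
        SparkSession.builder
        .appName("uniprot_delta_loader")  # placeholder app name
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")      # placeholder
        .config("spark.hadoop.fs.s3a.access.key", "<minio-access-key>")   # placeholder
        .config("spark.hadoop.fs.s3a.secret.key", "<minio-secret-key>")   # placeholder
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .getOrCreate()
    )

def get_minio_client():
    """Return a MinIO client for direct object-store checks."""
    return Minio(
        "minio:9000",                      # placeholder endpoint
        access_key="<minio-access-key>",   # placeholder
        secret_key="<minio-secret-key>",   # placeholder
        secure=False,
    )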

# Initialize Spark session and MinIO client
spark = get_spark_session()
minio_client = get_minio_client()

# File-to-table mapping
file_table_mapping = {
"janaka_db-source/sp_proteins_Jan8_parquet/feature_x_protein.parquet": "feature_x_protein",
"janaka_db-source/sp_proteins_Jan8_parquet/protein_table.parquet": "protein",
"janaka_db-source/sp_proteins_Jan8_parquet/name_table.parquet": "name",
"janaka_db-source/sp_proteins_Jan8_parquet/identifier_table.parquet": "identifier",
"janaka_db-source/sp_proteins_Jan8_parquet/association_table.parquet": "association",
}

# Bucket and namespace information
bucket_name = "cdm-lake"
namespace = "janaka_db"

# Existing Delta table locations (aligned with the metastore)
existing_delta_locations = {
"protein": "janaka_db-deltalake/protein_table_delta",
"feature_x_protein": "janaka_db-deltalake/feature_x_protein_delta",
"name": "janaka_db-deltalake/name_table_delta",
"identifier": "janaka_db-deltalake/identifier_table_delta",
"association": "janaka_db-deltalake/association_table_delta",
}

# Process each file and upload/update to its respective Delta table
for file_name, table_name in file_table_mapping.items():
try:
# Derive paths
parquet_file_path = f"s3a://{bucket_name}/{file_name}"
delta_table_path = existing_delta_locations[table_name]
delta_table_s3_path = f"s3a://{bucket_name}/{delta_table_path}"
spark_table = f"{namespace}.{table_name}"

print(f"Processing file: {file_name} -> Table: {table_name}")

# Load the Parquet file into Spark
df_spark = spark.read.parquet(parquet_file_path)
print(f"Loaded Parquet file from {parquet_file_path} into Spark DataFrame.")

# Check if Delta table exists
if DeltaTable.isDeltaTable(spark, delta_table_s3_path):
# Perform upsert (merge) into the Delta table
delta_table = DeltaTable.forPath(spark, delta_table_s3_path)
# Define merge condition based on primary keys (adjust based on schema)
if table_name == "feature_x_protein":
merge_condition = "existing.protein_id = updates.protein_id AND existing.feature_id = updates.feature_id"
else:
merge_condition = "existing.protein_id = updates.protein_id"

# Perform the merge
delta_table.alias("existing").merge(
df_spark.alias("updates"),
merge_condition
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()
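# Note: with whenMatchedUpdateAll / whenNotMatchedInsertAll, re-running the same
# input file updates matching rows instead of inserting duplicates, so the load is
# effectively idempotent for a given set of merge keys.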
print(f"Table '{table_name}' updated successfully with new data.")
else:
# Create a new Delta table
(df_spark.write.mode("overwrite")
.format("delta")
.option("path", delta_table_s3_path)
.saveAsTable(spark_table))
print(f"New table '{table_name}' created successfully in namespace '{namespace}'.")

# Verify the table contents
result = spark.sql(f"SELECT * FROM {spark_table} LIMIT 5")
result.show(truncate=False)
print(f"Verification successful for table: {table_name}.")

except Exception as e:
print(f"Error processing file {file_name} for table {table_name}: {e}")
122 changes: 122 additions & 0 deletions uniprot_utils/uniProtDatFileParser.py
@@ -0,0 +1,122 @@
import pandas as pd
from Bio import SwissProt
from io import StringIO

# This code parses UniProt .dat dumps into a tab-delimited file
# Janaka E

def parse_swissprot_file_in_chunks(file_path, output_file, chunk_size=5000000):
def write_to_file(records, output_file, write_mode):
"""Write processed records to the output file."""
df = pd.DataFrame(records)
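# Only the first chunk (write_mode 'w') writes the header row; appended chunks skip it.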
df.to_csv(output_file, sep="\t", index=False, mode=write_mode, header=(write_mode == 'w'))

def process_buffer(buffer, output_file, chunk_number):
"""Process lines in the buffer and write to the output file."""
records = []
try:
buffer_as_string = "".join(buffer)
buffer_stream = StringIO(buffer_as_string)
for record in SwissProt.parse(buffer_stream):
try:
# Extract evidence codes from features
evidence_codes = []
for feature in record.features:
if isinstance(feature.qualifiers, dict) and "evidence" in feature.qualifiers:
evidence_values = feature.qualifiers.get("evidence", [])
evidence_codes.extend(evidence_values if isinstance(evidence_values, list) else [evidence_values])
evidence_codes = "; ".join(map(str, evidence_codes)) if evidence_codes else "NULL"

# Extract publication details
publications = []
for ref in record.references:
authors = ref.authors.strip() if ref.authors else "Unknown Authors"
title = ref.title.strip() if ref.title else "No Title"
location = ref.location.strip() if ref.location else "No Journal Info"
pubmed = ""
for db, id_ in ref.references:
if db.lower() == "pubmed":
pubmed = f"PubMed:{id_}"
publication = f"{authors}. {title}. {location}. {pubmed}".strip()
publications.append(publication)
publications = "; ".join(publications) if publications else "NULL"

# Extract GO terms
go_terms = [
f"{xref[1]} ({xref[2]})"
for xref in record.cross_references
if len(xref) >= 3 and xref[0] == "GO"
]
go_terms = "; ".join(map(str, go_terms)) if go_terms else "NULL"

# Consolidate all key data
entry_data = {
"Entry": record.accessions[0] if record.accessions else "NULL",
"Entry Name": record.entry_name if record.entry_name else "NULL",
"Reviewed": "Reviewed" if record.data_class == "Reviewed" else "Unreviewed",
"Protein names": record.description if record.description else "NULL",
"Gene Names": "; ".join(map(str, record.gene_name)) if record.gene_name else "NULL",
"Organism": record.organism if record.organism else "NULL",
"Taxonomy": "; ".join(map(str, record.organism_classification)) if record.organism_classification else "NULL",
"Length": len(record.sequence) if record.sequence else "NULL",
"Sequence": record.sequence if record.sequence else "NULL",
"PE": record.protein_existence if hasattr(record, "protein_existence") and record.protein_existence else "NULL",
"EMBL": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "EMBL") or "NULL",
"RefSeq": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "RefSeq") or "NULL",
"GeneID": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "GeneID") or "NULL",
"PDB": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "PDB") or "NULL",
"KEGG": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "KEGG") or "NULL",
"Reactome": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "Reactome") or "NULL",
"HGNC": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "HGNC") or "NULL",
"STRING": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "STRING") or "NULL",
"BioCyc": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "BioCyc") or "NULL",
"Pfam": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "Pfam") or "NULL",
"InterPro": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "InterPro") or "NULL",
"GO": go_terms,
"Proteomes": "; ".join(id_ for db, id_, *_ in record.cross_references if db == "Proteomes") or "NULL",
"Keywords": "; ".join(map(str, record.keywords)) if record.keywords else "NULL",
"Evidence Codes": evidence_codes,
"Publications": publications,
}
records.append(entry_data)
except Exception as e:
print(f"Error parsing record: {e}")
continue
except Exception as e:
print(f"Error processing buffer: {e}")

# Write to file
if records:
write_mode = 'a' if chunk_number > 1 else 'w'
write_to_file(records, output_file, write_mode)

# Main logic to read in chunks
chunk_number = 1
line_buffer = []
with open(file_path, "r") as handle:
for line in handle:
line_buffer.append(line)
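# "//" terminates a SwissProt record, so the buffer is only flushed at a record
# boundary once the size threshold is reached; this keeps every record intact
# within a single chunk.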
if line.startswith("//") and len(line_buffer) >= chunk_size:
print(f"Processing chunk {chunk_number} ({len(line_buffer)} lines in this chunk)")
process_buffer(line_buffer, output_file, chunk_number)
line_buffer = [] # Clear buffer
chunk_number += 1

# Process any remaining lines
if line_buffer:
print(f"Processing final chunk ({len(line_buffer)} remaining lines)")
process_buffer(line_buffer, output_file, chunk_number)

print(f"Parsed data saved to {output_file}")

# Input and output file paths
# These file paths are written to match the file layout on Sequoia.
#input_file = "/home/janakae/scratch/Uniprot/Trembl/uniprot_trembl.dat" # Path to your UniProt text file
#output_file = "/home/janakae/scratch/Uniprot/Trembl/Full_parsed_trembl_data.tsv"

#Test files in the repo
input_file = "/uniprotTest/uniprotTest.dat" # Path to your UniProt text file
output_file = "/uniprotTest/Full_parsed_swissprot_data_test.tsv"

# Parse the UniProt/SwissProt file in chunks
parse_swissprot_file_in_chunks(input_file, output_file)
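
# Optional sanity check of the parsed output (not part of the original script; the
# column names referenced below are the ones written by parse_swissprot_file_in_chunks):
import os
if os.path.exists(output_file):
    preview = pd.read_csv(output_file, sep="\t", nrows=5)
    print(preview[["Entry", "Entry Name", "Organism", "Length"]])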
139 changes: 139 additions & 0 deletions uniprot_utils/uniProtDatFileParserTrembl.py
@@ -0,0 +1,139 @@
import pandas as pd
import hashlib
import ast
import uuid
import os

# This code was written specifically for the TrEMBL dataset: it takes the tab-delimited
# file produced by the .dat parser and splits it into the normalized Parquet tables.
# Given the large size of the TrEMBL file, the data are processed in chunks, with one
# set of Parquet table files written per chunk.
# Janaka E

# Function to calculate SHA256 hash for sequences
def calculate_hash(sequence):
return hashlib.sha256(sequence.encode('utf-8')).hexdigest() if pd.notnull(sequence) else None

# Function to generate UUID
def generate_uuid():
return str(uuid.uuid4())

# Function to process each chunk
def process_chunk(chunk, hash_to_uuid_map, output_dir, chunk_number):
# Treat missing values as NULL
chunk = chunk.fillna("NULL")

# Calculate SHA256 hash for sequences
chunk['hash'] = chunk['Sequence'].apply(calculate_hash)

# Remove duplicate sequences based on the hash
unique_chunk = chunk.drop_duplicates(subset=['hash'])

# Map UUIDs to each unique hash
new_uuids = {hash_val: generate_uuid() for hash_val in unique_chunk['hash'] if hash_val not in hash_to_uuid_map}
hash_to_uuid_map.update(new_uuids)
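# hash_to_uuid_map is shared across chunks, so an identical sequence seen in a later
# chunk reuses the protein_id assigned when the sequence was first encountered.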

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Generate and save the 'protein' table for the chunk
protein_data = pd.DataFrame({
'protein_id': unique_chunk['hash'].map(hash_to_uuid_map),
'name': unique_chunk['Entry'],
'length': pd.to_numeric(unique_chunk['Length'], errors='coerce').fillna(0).astype(int),
'sequence': unique_chunk['Sequence'],
'hash': unique_chunk['hash'],
'description': unique_chunk['Protein names']
})
protein_data.to_parquet(os.path.join(output_dir, f"protein_table_chunk_{chunk_number}.parquet"), index=False)

# Generate and save the 'name' table for the chunk
def extract_source(gene_names):
if gene_names == "NULL" or not gene_names.strip():
return "NULL"
try:
parsed_data = ast.literal_eval(gene_names)
if isinstance(parsed_data, dict) and 'ORFNames' in parsed_data:
orf_names = parsed_data.get('ORFNames', [])
if isinstance(orf_names, list) and len(orf_names) > 0:
return orf_names[0]
return "NULL"
except (ValueError, SyntaxError):
return "NULL"

name_data = pd.DataFrame({
'protein_id': unique_chunk['hash'].map(hash_to_uuid_map),
'name': unique_chunk['Entry'],
'entry': unique_chunk['Entry Name'],
'source': unique_chunk['Gene Names'].apply(extract_source),
'description': unique_chunk['Protein names']
})
name_data.to_parquet(os.path.join(output_dir, f"name_table_chunk_{chunk_number}.parquet"), index=False)

# Generate and save the 'identifier' table for the chunk
identifier_data = pd.DataFrame({
'protein_id': unique_chunk['hash'].map(hash_to_uuid_map),
'identifier': unique_chunk['Entry Name'],
'source': unique_chunk['Entry'].apply(lambda x: f"https://www.uniprot.org/uniprotkb/{x}/entry"),
'description': unique_chunk['Protein names']
})
identifier_data.to_parquet(os.path.join(output_dir, f"identifier_table_chunk_{chunk_number}.parquet"), index=False)

# Generate and save the 'association' table for the chunk
def parse_ontologies(row):
ontologies = []
if row['KEGG'] != "NULL":
ontologies.append(f"KEGG: {row['KEGG']}")
if row['GO'] != "NULL":
go_terms = [term.split(' ')[0] for term in row['GO'].split('; ') if term]
ontologies.extend(go_terms)
return ontologies

association_data = []
for _, row in unique_chunk.iterrows():
protein_id = hash_to_uuid_map[row['hash']]
ontologies = parse_ontologies(row)
for ontology in ontologies:
association_data.append({
'subject': protein_id,
'ontology_id': ontology,
'publications': row['Publications'] if row['Publications'] != "NULL" else "NULL",
'evidence_type': row['Evidence Codes'] if row['Evidence Codes'] != "NULL" else "NULL"
})
pd.DataFrame(association_data).to_parquet(os.path.join(output_dir, f"association_table_chunk_{chunk_number}.parquet"), index=False)

# Generate and save the 'feature_x_protein' table for the chunk
feature_x_protein_data = []
for _, row in unique_chunk.iterrows():
protein_id = hash_to_uuid_map[row['hash']]
gene_ids = row['GeneID'] if row['GeneID'] != "NULL" else None

if gene_ids:
for gene_id in gene_ids.split("; "): # Handle multiple GeneIDs
feature_x_protein_data.append({
'protein_id': protein_id,
'feature_id': gene_id.strip(),
'protocol_id': "SwissProt/NCBI"
})
else:
feature_x_protein_data.append({
'protein_id': protein_id,
'feature_id': "NULL",
'protocol_id': "SwissProt/NCBI"
})
pd.DataFrame(feature_x_protein_data).to_parquet(os.path.join(output_dir, f"feature_x_protein_chunk_{chunk_number}.parquet"), index=False)

return len(new_uuids)

# Input and output file paths
input_file = "Full_parsed_trembl_data.tsv"
chunk_size = 1000000  # Process 1,000,000 rows at a time
output_dir = "output_parquet_files"

# Initialize hash-to-UUID mapping
hash_to_uuid_map = {}

# Process the input file in chunks
for chunk_number, chunk in enumerate(pd.read_csv(input_file, sep='\t', chunksize=chunk_size, dtype=str), start=1):
print(f"Processing chunk {chunk_number} ({len(chunk)} rows)")
process_chunk(chunk, hash_to_uuid_map, output_dir, chunk_number)

print(f"Tables generated and saved in {output_dir}.")
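
# Optional follow-up (a minimal sketch, not part of the original script): the Delta
# loader in uinprot_loader.py reads a single Parquet file per table, so the per-chunk
# files written above can be concatenated first. The table prefixes below match the
# chunk file names written by process_chunk; the consolidated file names are assumptions.
import glob

def consolidate_chunks(table_prefix):
    """Concatenate all per-chunk Parquet files for one table into a single file."""
    chunk_paths = sorted(glob.glob(os.path.join(output_dir, f"{table_prefix}_chunk_*.parquet")))
    if chunk_paths:  # chunk order does not matter for the concatenation
        combined = pd.concat((pd.read_parquet(p) for p in chunk_paths), ignore_index=True)
        combined.to_parquet(os.path.join(output_dir, f"{table_prefix}.parquet"), index=False)

# Example usage:
# for prefix in ["protein_table", "name_table", "identifier_table",
#                "association_table", "feature_x_protein"]:
#     consolidate_chunks(prefix)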