Source code for src.data_access.via_DMS.FileOperations

import os
import fnmatch
import requests
import logging
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from utility.utils import timeit

class FileOperations:
    '''Grab locations of the MSGF+ & MASIC analysis tools using the
    analysis_jobs object.'''
    # TODO: Move it under the viaDMS directory.

    def __init__(self, analysis_jobs=None, parent_folder=None, job_info=None):
        '''
        :param analysis_jobs: DataFrame of analysis jobs to download.
        :param parent_folder: root folder that downloads are written under.
        :param job_info: DataFrame with OrganismDBName and ParameterFileName columns.
        '''
        self.Input = analysis_jobs
        self.parent_folder = parent_folder
        self.job_info = job_info
        self.url = None
        self.started_from = parent_folder
        self.file_pattern_types = ["*syn.txt",
                                   "*SeqToProteinMap.txt",
                                   "*ResultToSeqMap.txt",
                                   "*_SICstats.txt",
                                   "*.raw"]
        self.row = None
    def create_dir(self, folder):
        '''Create ``folder`` if it does not exist, then make it the working directory.
        :param folder: directory path to create and switch into.
        :return:
        '''
        if not os.path.exists(folder):
            os.makedirs(folder)
        os.chdir(folder)
    def write_to_disk(self, url: str):
        '''
        :param url: Job's file path on DMS.
        :return:
        '''
        if not os.path.isfile(url.split('/')[-1]):
            # os.system does not raise on a failed download, so check wget's
            # exit status instead of wrapping the call in try/except.
            status = os.system('wget %s' % url)
            if status != 0:
                logging.error("FAILED to download file: %s", url)
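    # Illustrative alternative (not part of the original module): the same
    # download without shelling out to wget, streamed via requests. The helper
    # name ``_write_to_disk_requests`` is hypothetical.
    def _write_to_disk_requests(self, url: str):
        filename = url.split('/')[-1]
        if not os.path.isfile(filename):
            try:
                with requests.get(url, stream=True) as response:
                    response.raise_for_status()
                    with open(filename, 'wb') as fh:
                        for chunk in response.iter_content(chunk_size=8192):
                            fh.write(chunk)
            except requests.exceptions.RequestException:
                logging.error("FAILED to download file: %s", url)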
    def check_url(self, url):
        '''
        :param url: URL to probe.
        :return: True if the URL is reachable, False otherwise.
        '''
        # requests.get itself can raise (e.g. on connection errors), so it
        # belongs inside the try block.
        try:
            response = requests.get(url)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print("Error: " + str(e) + ", if .fasta, searching on another location!")
            return False
        return True
    def download_over_http(self):
        '''Given a URL, copy files from DMS to disk.
        :return:
        '''
        if self.check_url(self.url):
            response = requests.get(self.url)
            soup = BeautifulSoup(response.text, 'html.parser')
            filenames = [link.get('href') for link in soup.find_all('a')]
            for file in filenames:
                for p in self.file_pattern_types:
                    if fnmatch.fnmatch(file, p):
                        parsed_uri = urlparse(self.url)
                        domain_name = '{uri.scheme}://{uri.netloc}/'.format(uri=parsed_uri)
                        file_url = domain_name + file
                        self.write_to_disk(file_url)
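    # Illustrative note (not in the original source): the glob patterns in
    # ``file_pattern_types`` are matched with fnmatch, e.g.
    #     fnmatch.fnmatch('Job123_syn.txt', '*syn.txt')            -> True
    #     fnmatch.fnmatch('Job123_SICstats.txt', '*_SICstats.txt') -> True
    # where the file names are hypothetical examples.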
    def parse_fileserverpath_to_web_url(self, file_server_path):
        '''Converts a Windows file-server path to a web URL.
        :param file_server_path: Windows server file path.
        :return:
        '''
        folders = file_server_path.split('\\')
        self.url = 'http://' + folders[2] + '.pnl.gov/' + '/'.join(folders[3:])
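    # Worked example (hypothetical path, not from the original source):
    #     \\gigasax\DMS_Jobs\2015_2\SomeDataset\Job_123
    # splits on '\\' so folders[2] == 'gigasax', producing
    #     http://gigasax.pnl.gov/DMS_Jobs/2015_2/SomeDataset/Job_123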
    def download_msgf_jobs(self, df):
        '''
        :param df: row of the analysis_jobs DataFrame.
        :return: the MSGF+ job's path or URL, reused for the .raw download.
        '''
        self.create_dir(self.parent_folder + '/' + 'DMS_MSGFjobs' + '/'
                        + str(df['MSGFPlusJob']))
        path_or_url = str(df['MSGFplus_loc'])
        if not path_or_url.startswith("http"):
            self.parse_fileserverpath_to_web_url(path_or_url)
        else:
            self.url = path_or_url
        self.download_over_http()
        return path_or_url
    def download_masic_jobs(self, df):
        '''
        :param df: row of the analysis_jobs DataFrame.
        :return:
        '''
        self.create_dir(self.parent_folder + '/' + 'DMS_MASICjob' + '/'
                        + str(df['NewestMasicJob']))
        path_or_url = str(df['MASIC_loc'])
        if not path_or_url.startswith("http"):
            self.parse_fileserverpath_to_web_url(path_or_url)
        else:
            self.url = path_or_url
        self.download_over_http()
    def download_raw_files(self, df, path_or_url):
        '''Downloads the instrument .raw files that sit alongside the job folders.
        :param df: row of the analysis_jobs DataFrame.
        :param path_or_url: MSGF+ job location returned by download_msgf_jobs.
        :return:
        '''
        os.chdir(self.parent_folder)
        if not path_or_url.startswith("http"):
            # Works with datasets | jobs: drop the job folder to reach the
            # dataset folder.
            split_path = path_or_url.split("\\")
            path_or_url = '\\'.join(split_path[:-1])
            self.parse_fileserverpath_to_web_url(path_or_url)
        else:
            split_path = path_or_url.split("/")
            path_or_url = '/'.join(split_path[:-2])
            self.url = path_or_url
        self.download_over_http()
    def download_fasta_param_files(self):
        '''
        :return:
        '''
        self.create_dir(self.started_from + '/' + 'DMS_fasta_param')
        fasta_file = list(set(self.job_info["OrganismDBName"]))
        param_file = list(set(self.job_info["ParameterFileName"]))
        for file in fasta_file:
            # Try the Microbial_Communities location first, then fall back to
            # the forward FASTA archive.
            url = "http://gigasax/DMS_Organism_Files/Microbial_Communities/FASTA/" + file
            if self.check_url(url):
                self.write_to_disk(url)
            else:
                url = "http://gigasax/DMS_FASTA_File_Archive/dynamic/forward/" + file
                if self.check_url(url):
                    self.write_to_disk(url)
                else:
                    print("Can't find FASTA!")
        for file in param_file:
            url = "http://gigasax/DMS_Parameter_Files/MSGFPlus/" + file
            if self.check_url(url):
                self.write_to_disk(url)
            else:
                print("Failed to grab params file")
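    # Worked example (hypothetical FASTA name, not from the original source):
    # for an OrganismDBName of 'Soil_Comm_2015.fasta' the loop above first probes
    #     http://gigasax/DMS_Organism_Files/Microbial_Communities/FASTA/Soil_Comm_2015.fasta
    # and only falls back, if that is unreachable, to
    #     http://gigasax/DMS_FASTA_File_Archive/dynamic/forward/Soil_Comm_2015.fasta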
    def use_df(self, df):
        '''Called for each dataset (row) in the DataFrame.
        :param df: row of the analysis_jobs DataFrame.
        :return:
        '''
        self.row = df
        self.parent_folder = self.started_from + '/' + str(self.row['Dataset_ID'])
        self.create_dir(self.parent_folder)
        path_or_url = self.download_msgf_jobs(self.row)
        self.download_masic_jobs(self.row)
        self.download_raw_files(self.row, path_or_url)
    @timeit
    def get_files(self):
        '''Starts all file operations.
        :return:
        '''
        if not os.path.exists(self.started_from):
            # os.chdir on a nonexistent path raises, so create and enter the
            # folder in one step.
            self.create_dir(self.started_from)
            self.download_fasta_param_files()
            self.Input.apply(lambda x: self.use_df(x), axis=1)
            print("`" * 5)
            print("Finished downloading data at loc:{}".format(self.started_from))
            print("`" * 5)
        else:
            print("Data already exist at loc:{}".format(self.started_from))
    # def download_over_ftp(self):
    #     '''
    #     TODO: Directly from Proto-X windows file-server.
    #     Eg. folder_path: "\\proto-6\QExactHF03\2015_2\MinT_Kans_No_Gly_pool_19_Qexactive_22May15_Arwen_14-12-03\SIC201505251246_Auto1197920"
    #     :return:
    #     '''
    #
    # def download_using_DMS_api(self):
    #     '''
    #     TODO: need to explore!
    #     Source: https://prismwiki.pnl.gov/wiki/DMS_Data_Export#Advanced_DMS_Data_Export
    #     url = https://dms2.pnl.gov/data/ax/tsv/aux_info_categories/aux_info_def/501
    #     :return:
    #     '''
    #
    # def handle_workflow_failure(self):
    #     # TODO: NMDC-10, add logic to check which files were downloaded before
    #     # pipeline failure. It should only download the ones which weren't
    #     # successful and start from there. Using an MD5 checksum.
    #     pass
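
# Usage sketch (not part of the original module). Assumes ``analysis_jobs`` and
# ``job_info`` are pandas DataFrames carrying the columns this class reads
# (Dataset_ID, MSGFPlusJob, MSGFplus_loc, NewestMasicJob, MASIC_loc,
# OrganismDBName, ParameterFileName); the CSV file names and the download
# folder are hypothetical.
if __name__ == '__main__':
    import pandas as pd

    analysis_jobs = pd.read_csv('analysis_jobs.csv')  # hypothetical input
    job_info = pd.read_csv('job_info.csv')            # hypothetical input
    file_ops = FileOperations(analysis_jobs=analysis_jobs,
                              parent_folder=os.getcwd() + '/dms_downloads',
                              job_info=job_info)
    file_ops.get_files()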