{"id":3357,"date":"2017-05-02T15:06:19","date_gmt":"2017-05-02T19:06:19","guid":{"rendered":"http:\/\/bioinfo.iric.ca\/?p=3357\/"},"modified":"2017-05-03T09:21:41","modified_gmt":"2017-05-03T13:21:41","slug":"flux-de-donnees-et-programmation-reactive","status":"publish","type":"post","link":"https:\/\/bioinfo.iric.ca\/fr\/flux-de-donnees-et-programmation-reactive\/","title":{"rendered":"Flux de donn\u00e9es et programmation r\u00e9active"},"content":{"rendered":"<h2>Qu&rsquo;est-ce que tout cela ?<\/h2>\n<blockquote><p>ReactiveX est la combinaison des meilleures id\u00e9es du mod\u00e8le observateur, du mod\u00e8le it\u00e9rateur et de la programmation fonctionnelle.<br \/>\n\u00c0 l&rsquo;aide des librairies Rx, vous pouvez ais\u00e9ment:<br \/>\n&#8211; Cr\u00e9er des flux de donn\u00e9es ou d&rsquo;\u00e9v\u00e8nements \u00e0 partir de sources diverses comme des fichiers ou des services web<br \/>\n&#8211; Fusionner ou transformer ces flux gr\u00e2ce \u00e0 divers op\u00e9rateurs<br \/>\n&#8211; Souscrire aux flux et \u00ab\u00a0r\u00e9agir\u00a0\u00bb \u00e0 leurs \u00e9missions pour produire de nouvelles donn\u00e9es<\/p><\/blockquote>\n<p>L&rsquo;int\u00e9r\u00eat pour la programmation r\u00e9active a progress\u00e9 ces derni\u00e8res ann\u00e9es. Peut-\u00eatre avez-vous seulement entendu le terme employ\u00e9 ici et l\u00e0, peut-\u00eatre avez-vous adopt\u00e9 ce paradigme de programmation dans votre travail quotidien&#8230; Personnellement, je n&rsquo;y ai \u00e9t\u00e9 expos\u00e9 qu&rsquo;au cours du dernier mois lors d&rsquo;un bootcamp Angular2. J&rsquo;ai imm\u00e9diatement \u00e9t\u00e9 attir\u00e9 par l&rsquo;id\u00e9e de r\u00e9agir aux \u00e9v\u00e9nements \u00e9mis par des flux de donn\u00e9es.<br \/>\nNous autres, bioinformaticiens, aimons nos tuyaux (du type <em>shell<\/em> :)) Nous canalisons constamment la sortie d&rsquo;un programme vers un autre programme.<br \/>\nVous voulez savoir combien d&rsquo;entr\u00e9es un fichier fasta contient? Trop facile:<\/p>\n<pre><code class=\"bash\">%&gt; grep '&gt;' my_fasta_file.txt | wc -l<\/code><\/pre>\n<p>Nous sommes donc d\u00e9j\u00e0 \u00e0 l&rsquo;aise avec la notion de flux de donn\u00e9es (stdout dans ce cas) et leur consommation (avec stdin) pour produire des r\u00e9sultats.<br \/>\n\u00c0 ce point-ci, j&rsquo;avais \u00e9videmment besoin de jouer avec la programmation r\u00e9active pour mieux me familiariser avec ses concepts. En recherchant un petit projet \u00e0 mettre en \u0153uvre, je me suis souvenu de cette <a href=\"https:\/\/bioinfo.iric.ca\/working-with-large-files\/\">publication de Patrick<\/a> et me suis dit: Ooooohhh je vais \u00e9crire quelque chose qui traite un fichier FastQ!<br \/>\nJustement, un projet de recherche sur lequel je travaille exige que j&rsquo;identifie les fragments d&rsquo;un transcriptome pouvant potentiellement coder pour un ensemble de peptides &#8230; Parfait!<br \/>\nJ&rsquo;ai donc d\u00e9but\u00e9 mon p\u00e9riple en me disant que celui-ci serait assez simple. Le web fourmille d&rsquo;exemples tels que \u00abAbonnez-vous \u00e0 un flux d&rsquo;indice boursiers en temps r\u00e9el et affichez un avertissement si le cours d&rsquo;une action a augment\u00e9 de plus de X % au cours des derni\u00e8res Y secondes, le tout avec une seule ligne de code!\u00a0\u00bb. Je devais donc atteindre mon objectif assez facilement, n&rsquo;est-ce pas? Eh bien, il s&rsquo;av\u00e8re que ce fut un peu plus compliqu\u00e9 que pr\u00e9vu et j&rsquo;esp\u00e8re d&rsquo;autres pourront b\u00e9n\u00e9ficier des solutions que j&rsquo;ai trouv\u00e9es en cours de route.<\/p>\n<h2>Contexte et sources de documentation<\/h2>\n<p>Premi\u00e8rement, je me dois de mentionner que la librairie ReactiveX est disponible pour de multiples langages de programmation mais puisque je suis un grand fan du langage Python, j&rsquo;ai choisi de travailler avec RxPY. (Je vous laisse le soin de consulter la <a href=\"https:\/\/github.com\/ReactiveX\/RxPY#install\">documentation<\/a> afin de mettre en place un environnement de d\u00e9velopmenet fonctionnel, c&rsquo;est plut\u00f4t trivial).<br \/>\nIl existe de nombreuses sources de documentation sur le web mais voici celles qui m&rsquo;ont le plus aid\u00e9:<br \/>\n<a href=\"https:\/\/github.com\/ReactiveX\/RxPY#the-basics\">RxPY sur github<\/a><br \/>\n<a href=\"http:\/\/rxmarbles.com\/\">Rx Marbles<\/a><br \/>\n<a href=\"http:\/\/reactivex.io\/\">Le site officiel de ReactiveX<\/a><br \/>\n<a href=\"https:\/\/ninmesara.github.io\/RxPY\/index.html\">Les page de documentation de Dag Brattli&rsquo;s<\/a><br \/>\nEt n&rsquo;oublions pas:<br \/>\n<a href=\"http:\/\/home.heeere.com\/tech-intro-programmation-reactive.html\">L&rsquo;introduction qu&rsquo;il vous manquait<\/a> (qui est en fait une traduction de <a href=\"https:\/\/xgrommx.github.io\/rx-book\/content\/guidelines\/introduction\/index.html\">The intro you&rsquo;ve been missing<\/a>)<\/p>\n<h2>Au travail !<\/h2>\n<p>Alors voici le plan d&rsquo;action que je propose:<br \/>\nPour chaque entr\u00e9e d&rsquo;un fichier FastQ, traduire la s\u00e9quence d&rsquo;ADN de l&rsquo;entr\u00e9e en fonction des 6 cadres de lecture et v\u00e9rifier si l&rsquo;une des 6 traductions contient un des peptides stock\u00e9es dans un fichier texte s\u00e9par\u00e9. Si l&rsquo;un des peptides est trouv\u00e9, basculer l&rsquo;entr\u00e9e vers un autre fichier FastQ. Que des choses simples, quoi !<\/p>\n<h3>Lire un fichier<\/h3>\n<p>Le mantra de la programmation r\u00e9active est: \u00ab\u00a0<strong>Tout est un flux<\/strong>\u00a0\u00bb et RxPY offre de nombreuse fonctions utilitaires pour g\u00e9n\u00e9rer des objets <a href=\"http:\/\/reactivex.io\/documentation\/observable.html\">Observables<\/a> (Flux) depuis diverses sources. Celle qui m&rsquo;int\u00e9resse dans ce cas-ci est la fonction <code class=\"python\">from_<\/code> qui construit un <code>Observable<\/code> \u00e0 partir de n&rsquo;importe quel it\u00e9rateur Python.<\/p>\n<pre><code class=\"python\">import gzip\r\nfrom rx import Observable\r\n\r\nfastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt'))\r\n<\/code><\/pre>\n<p>Comme vous pouvez le constater, les fichiers FastQ sont g\u00e9n\u00e9ralement \u00ab\u00a0gzipp\u00e9s\u00a0\u00bb car ils sont plut\u00f4t lourds.<br \/>\nMais tout ceci importe peu, nous sommes sur la bonne voie!<\/p>\n<p>Ajoutons quelques op\u00e9rateurs \u00e0 notre flux (<code>Observable<\/code>) afin de retirer les retours de chariots en fin de ligne (\u00e0 l&rsquo;aide de la transformation\u00a0<a href=\"http:\/\/reactivex.io\/documentation\/operators\/map.html\">map<\/a>) et <a href=\"http:\/\/reactivex.io\/documentation\/operators\/filter.html\">filtrons<\/a> les lignes vides:<\/p>\n<pre><code class=\"python\">import gzip\r\nfrom rx import Observable\r\n\r\nfastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt')) \\\r\n    .map(lambda line: line.strip()) \\\r\n    .filter(lambda line: line != '')\r\n<\/code><\/pre>\n<p>Bien, bien \ud83d\ude42<br \/>\nEt soit dit en passant, nous ferons un grand usage des fonctions lambda.<\/p>\n<p>Les entr\u00e9es d&rsquo;un fichier FastQ prennent un format particulier: elles sont toutes compos\u00e9es de 4 lignes et ressemblent \u00e0 ceci:<\/p>\n<pre class=\"text\">@HWI-ST942:38:C2N4BACXX:5:1101:1228:2134 1:N:0:ATCACG\r\nTTTTCAGCCTACATCAAGGAGGTGGAGGAACGGCCGGCACCCACCCCGTGGGGCTCCAAGATGCCCTTTGGGGAACTGATGTTCGAATCCAGCAGTAGCT\r\n+\r\n@CC?DDFFHHHHGJJIJJJCHIAFHEFBACGGHGIIGIIJIIIGHHDD?CCDDBDA@DDC@C@CDDD&lt;ACC9;7&gt;A:?ABDB@CCCAB::AC:\r\n<\/pre>\n<p>Alors assurons-nous de n&rsquo;\u00e9mettre que des entr\u00e9es compl\u00e8tes (ie: les 4 lignes) \u00e0 l&rsquo;aide d&rsquo;une transformation de type <a href=\"http:\/\/reactivex.io\/documentation\/operators\/buffer.html\"><code>buffer<\/code><\/a>:<\/p>\n<pre><code class=\"python\">import gzip\r\nfrom rx import Observable\r\n\r\nfastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt')) \\\r\n    .map(lambda line: line.strip()) \\\r\n    .filter(lambda line: line != '') \\\r\n    .buffer_with_count(4)\r\n<\/code><\/pre>\n<p>C&rsquo;est si simple ! Je commence \u00e0 tomber sous le charme des outils de cette librairie !<\/p>\n<p>Attaquons-nous maintenant \u00e0 la traduction des s\u00e9quences d&rsquo;ADN.<br \/>\nPermettez-moi d&rsquo;introduire quelques bouts de codes pour g\u00e9rer l&rsquo;aspect <em>traduction<\/em> en tant que tel..<\/p>\n<pre><code class=\"python\">codonTable = {\r\n    'TTT': 'F', 'TCT': 'S', 'TAT': 'Y', 'TGT': 'C',\r\n    'TTC': 'F', 'TCC': 'S', 'TAC': 'Y', 'TGC': 'C',\r\n    'TTA': 'L', 'TCA': 'S', 'TAA': '*', 'TGA': '*',\r\n    'TTG': 'L', 'TCG': 'S', 'TAG': '*', 'TGG': 'W',\r\n\r\n    'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',\r\n    'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',\r\n    'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',\r\n    'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',\r\n\r\n    'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',\r\n    'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',\r\n    'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',\r\n    'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',\r\n\r\n    'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',\r\n    'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',\r\n    'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',\r\n    'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'\r\n}\r\n\r\n\r\ndef reverse_complement(sequence):\r\n    \"\"\"Returns the reverse complement of a DNA sequence\"\"\"\r\n    tb = str.maketrans('ACGT', 'TGCA')\r\n    return sequence.translate(tb)[::-1]\r\n\r\n\r\ndef translate_dna(fastq_entry, frame='f1'):\r\n    \"\"\"Translates DNA to amino acids according specified reading frame\"\"\"\r\n    starts = {'f1': 0, 'f2': 1, 'f3': 2,\r\n              'r1': 0, 'r2': 1, 'r3': 2}\r\n\r\n    dna = fastq_entry[1].strip('N').replace('N', 'A')\r\n\r\n    if frame.startswith('r'):\r\n        dna = reverse_complement(dna)\r\n\r\n    sequence = dna[starts[frame]:]\r\n\r\n    protein = \"\"\r\n\r\n    for i in range(0, len(sequence),  3):\r\n        if 3 == len(sequence[i:i+3]):\r\n            protein += codonTable[sequence[i:i+3]]\r\n\r\n    return protein\r\n\r\n\r\ndef translate_dna_6frames(fastq_entry):\r\n    \"\"\"Translates DNA sequence according all 6 reading frames\"\"\"\r\n    frames = ['f1', 'f2', 'f3', 'r1', 'r2', 'r3']\r\n    \r\n    translations = []\r\n    for frame in frames:\r\n        translations.append(translate_dna(fastq_entry, frame))\r\n\r\n    return translations\r\n<\/code><\/pre>\n<p>Je n&rsquo;entrerai pas dans les d\u00e9tails de ce que font ces bouts de code, j&rsquo;en ai seulement besoin pour faire un exemple focntionnel.<br \/>\nLe seul point \u00e0 noter est que je retire les &lsquo;N&rsquo;s en d\u00e9but et fin de s\u00e9quence et que je remplace les &lsquo;N&rsquo; pr\u00e9sents dans la s\u00e9quence par des &lsquo;A&rsquo;.<\/p>\n<p>Tr\u00e8s bien, nous pouvons maintenant faire appel \u00e0 <code>map<\/code> une fois de plus afin de traduire chaque entr\u00e9e FastQ:<\/p>\n<pre><code class=\"python\">fastq_entries.map(lambda entry: translate_dna_6frames(entry))<\/code><\/pre>\n<h3>Comment \u00e7a se passe, \u00e0 date ?<\/h3>\n<p>Il est probablement temps pour moi d&rsquo;introduire la m\u00e9thode <code>.subscribe()<\/code> qui permet d&rsquo;observer l&rsquo;ex\u00e9cution de notre programme.<br \/>\n<code>.subscribe()<\/code> est une m\u00e9thode permettant de <em>souscrire<\/em> un <code>Observateur<\/code> \u00e0 notre <code>Observable<\/code> afin de <strong>consommer ses \u00e9missions et d&rsquo;y r\u00e9agir<\/strong>.<br \/>\nPour le bien de cet exemple, nous ne ferons qu&rsquo;imprimer la sortie \u00e0 la ligne de commande<\/p>\n<pre><code class=\"python\">fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \r\n    .subscribe(lambda translations: print(translations)\r\n<\/code><\/pre>\n<h4>Output<\/h4>\n<pre>['VHYDRSGRSLGTADVHFERKADALKAMKQYNGV', 'CTMIALVAA*EQQTCTLSGRQMP*RP*SSTTA', \r\n 'AL*SLWSQLRNSRRAL*AEGRCPEGHEAVQRR', 'DAVVLLHGLQGICLPLKVHVCCS*AATRAIIVH', \r\n 'TPLYCFMAFRASAFRSKCTSAVPKLRPERS*C', 'RRCTASWPSGHLPSAQSARLLFLSCDQSDHSA']...\r\n<\/pre>\n<p>La premi\u00e8re chose qui m&rsquo;a frapp\u00e9 lorsque j&rsquo;ai commenc\u00e9 \u00e0 bidouiller avec les flux c&rsquo;est qu&rsquo;ils peuvent \u00eatre relativement difficiles \u00e0 observer\/d\u00e9bugger en cours d&rsquo;utilisation.<br \/>\nPar exemple:\u00a0si vous ne faites pas usage de la fonction <code>.subscribe()<\/code> pour souscrire \u00e0 vos flux \u00e0 un moment ou \u00e0 un autre dans votre code, RIEN ne se produira \u00e0 l&rsquo;ex\u00e9cution du script et celui-ci fera preuve d&rsquo;un mutisme parfait. Pas tr\u00e8s convivial !<br \/>\nDans un autre ordre d&rsquo;id\u00e9e, il serait aussi pertinent de vous familiariser avec la notion de <a href=\"https:\/\/github.com\/ReactiveX\/RxPY#schedulers\">Schedulers<\/a> (surtout si vous comptez faire usage d&rsquo;Observables \u00ab\u00a0temporaux\u00a0\u00bb tels que <code>Observable.interval()<\/code> ou d\u00e9sirez rendre votre code asynchrone).<\/p>\n<h3>Trouvons les peptides<\/h3>\n<p>Afin de trouver les peptides, nous aurons besoin de cette fonction simple:<\/p>\n<pre><code class=\"python\">def find_peptide(peptide, translations):\r\n    for translation in translations:\r\n        if peptide in translation:\r\n            return True\r\n\r\n    return False\r\n<\/code><\/pre>\n<p>C&rsquo;est ici que les chose se corsent :).<br \/>\nConstruire un flux \u00e0 partir de notre fichier de peptides est simple, nous savons d\u00e9j\u00e0 comment nous y prendre gr\u00e2ce \u00e0 nos tribulations avec le fichier FastQ.<\/p>\n<pre><code class=\"python\">peptides = Observable.from_(open('peptides.txt', 'r')) \r\n    .map(lambda line: line.strip()).filter(lambda line: line != '')\r\n<\/code><\/pre>\n<p>Mais nous devons maintenant reproduire l&rsquo;\u00e9quivalent d&rsquo;une boucle imbriqu\u00e9e dans le contexte de la programmation r\u00e9active.<br \/>\nRappelez-vous, nous souhaitons identifier les fragments qui codent pour n&rsquo;importe lequel de nos peptides. Hmmm..<br \/>\nVoici la solution que j&rsquo;ai trouv\u00e9e:<\/p>\n<pre><code class=\"python\">def lookup(peptides, translations):\r\n    \"\"\"Serves as the nested loop\"\"\"\r\n    found = peptides.map(lambda p: find_peptide(p, translations)) \r\n        .reduce(lambda x, y: x or y, False)\r\n\r\n    return found\r\n<\/code><\/pre>\n<pre><code class=\"python\">lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \r\n    .map(lambda translations: lookup(peptides, translations))\r\n<\/code><\/pre>\n<p>Alors, qu&rsquo;est-ce qui se passe ici ?<br \/>\nPremi\u00e8rement, nous \u00ab\u00a0mappons\u00a0\u00bb <code>.lookup()<\/code> sur nos chacune de nos traductions qui, \u00e0 leur tour, \u00ab\u00a0mappent\u00a0\u00bb <code>.find_peptides()<\/code> sur notre flux de peptides. Nous <a href=\"http:\/\/reactivex.io\/documentation\/operators\/reduce.html\">reduisons<\/a> ensuite le r\u00e9sultat de ce flux \u00e0 une \u00e9mission unique prenant la forme d&rsquo;un bool\u00e9en. Super, nous y sommes presque !<\/p>\n<p>Mais, attendez une minute.. Si nous \u00ab\u00a0souscrivons\u00a0\u00bb (avec <em>.subscribe()<\/em>) au flux <strong>lookups<\/strong> et affichons ses \u00e9missions, il devient vite apparent que nous avons des soucis:<br \/>\n1- Notre boucle imbriqu\u00e9e ne s&rsquo;ex\u00e9cute qu&rsquo;une seule fois et non pour pour chaque \u00e9mission du flux <strong>translations<\/strong><br \/>\n2- <code>.lookup()<\/code> retourne une <strong>liste d&rsquo;<code>Observables<\/code><\/strong> plut\u00f4t que de retourner le bool\u00e9en auquel nous nous attendions (et ce, m\u00eame si <code>.reduce()<\/code> n&rsquo;\u00e9met que la valeur finale puisque cette appel est toujours encapsul\u00e9 dans un <code>Observable<\/code>)<\/p>\n<p>Attaquons-nous \u00e0 r\u00e9soudre ces probl\u00e8mes un \u00e0 la fois:<\/p>\n<h4>Probl\u00e8me 1<\/h4>\n<p>L&rsquo;explication de ce comportement se trouve dans le fait que lorsque l&rsquo;on cr\u00e9e un <code>Observable<\/code> \u00e0 partir d&rsquo;un it\u00e9rateur de fichier \u00e0 l&rsquo;aide de l&rsquo;utilitaire <code>from_<\/code>, nous ne pouvons y souscrire qu&rsquo;une seule fois. Toute souscription subs\u00e9quente n&rsquo;\u00e9mettra aucune valeur puisque l&rsquo;it\u00e9rateur a atteint la fin du fichier lors de la premi\u00e8re consommation du flux. Il existe diff\u00e9rentes fa\u00e7ons de contourner ce probl\u00e8me et l&rsquo;une d&rsquo;entre elle consiste \u00e0 encapsuler la construction de l&rsquo;Observable dans un appel \u00e0 <code>.defer()<\/code>.<\/p>\n<pre><code class=\"python\">peptides = Observable.defer(\r\n    lambda: \r\n    Observable.from_(open('peptides.txt', 'r'))\r\n        .map(lambda line: line.strip())\r\n        .filter(lambda line: line != '')\r\n)\r\n<\/code><\/pre>\n<p>Assez simple \u00e0 r\u00e9gler me direz-vous, mais encore faut-il comprendre ce qui se passe! Et comme cet aspect n&rsquo;est pas tr\u00e8s bien document\u00e9, j&rsquo;ai tourn\u00e9 en rond un bon moment avant de trouver la solution.<\/p>\n<h4>Probl\u00e8me 2<\/h4>\n<p>Contrairement \u00e0 <code>.translate_dna_6frames()<\/code>, la fonction <code>.lookup()<\/code> retourne des flux (<em>Observables<\/em>) plut\u00f4t qu&rsquo;une valeur alors nous devons trouver une fa\u00e7on de combiner cette liste de flux en <strong>un seul<\/strong> flux auquel nous pourrons alors souscrire pour r\u00e9cup\u00e9rer nos bool\u00e9ens. Il nous faut \u00ab\u00a0croiser les effluves\u00a0\u00bb ! ReactiveX expose bon nombre d&rsquo;<a href=\"http:\/\/reactivex.io\/documentation\/operators.html#combining\">op\u00e9rateurs de combinaison<\/a> mais celui que nous utiliserons ici est <a href=\"http:\/\/reactivex.io\/documentation\/operators\/switch.html\">switch<\/a> (ou .switch_latest(), l&rsquo;impl\u00e9mentation dans RxPY)<\/p>\n<pre><code class=\"python\">lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \r\n    .map(lambda translations: lookup(peptides, translations)) \r\n    .switch_latest()\r\n<\/code><\/pre>\n<h3>Combinaison et <em>Multicasting<\/em><\/h3>\n<p>Le seule chose qui nous reste \u00e0 faire est de combiner <strong>fastq_entries<\/strong> and <strong>lookups<\/strong> en un seul flux que nous pourrons filtrer (avec <code>.filter()<\/code>) and souscrire (avec <code>.subscribe()<\/code>) afin de n&rsquo;exporter que les entr\u00e9es FastQ qui pr\u00e9sentent le potentiel de coder pour l&rsquo;un de nos peptides.<br \/>\nPour l&rsquo;\u00e9tape de combinaison, j&rsquo;ai choisi <a href=\"http:\/\/reactivex.io\/documentation\/operators\/zip.html\">.zip()<\/a> (ou <a href=\"https:\/\/ninmesara.github.io\/RxPY\/api\/operators\/zip.html?highlight=zip_array#rx.Observable.zip_array\">.zip_array()<\/a> dans RxPY).<\/p>\n<pre><code>Observable.zip_array(found, fastq_entries)\r\n    .filter(lambda arr: arr[0]) \r\n    .subscribe(print)\r\n<\/code><\/pre>\n<p>Malheureusement un nouveau probl\u00e8me se pr\u00e9sente: <strong>lookups<\/strong> et le r\u00e9sultat de <code>zip_array<\/code> souscrivent tous les deux au flux <strong>fastq_entries<\/strong> et re\u00e7oivent donc des entr\u00e9es FastQ distinctes, en alternance.. Pas bon \u00e7a. Nous voudrions plut\u00f4t que  <code>zip_array()<\/code> \u00ab\u00a0<em>zip<\/em>\u00a0\u00bb le <strong>m\u00eame<\/strong> fastq_entry qui a \u00e9t\u00e9 consomm\u00e9 et transform\u00e9 par la cha\u00eene d&rsquo;action <em>lookups<\/em>. Cette m\u00e9canique existe et porte le nom po\u00e9tique de <em>multicasting<\/em> et nous pouvons obtenir le r\u00e9sultat escompt\u00e9 en ajoutant un appel \u00e0 <code>.publish()<\/code> lors de la cr\u00e9ation de notre flux fastq_entries:<\/p>\n<pre><code class=\"python\">fastq_entries = Observable.from_(gzip.open('small.fastq.gz', 'rt')) \r\n    .map(lambda line: line.strip()) \r\n    .buffer_with_count(4) \r\n    .filter(lambda line: line != '') \r\n    .publish()\r\n<\/code><\/pre>\n<p>Cet ajout tout simple transforme fastq_entries en un flux <strong>connectable<\/strong> qui attendra un appel \u00e0 <code>.connect()<\/code> pour d\u00e9buter ses \u00e9missions \u00e0 tous ses souscripteurs enregistr\u00e9s.<\/p>\n<h3>La b\u00eate dans toute sa splendeur<\/h3>\n<p>Voici finalement la version finale de notre script:<\/p>\n<pre><code class=\"python\">import sys\r\nimport gzip\r\n\r\nfrom rx import Observable\r\n\r\ncodonTable = {\r\n    'TTT': 'F', 'TCT': 'S', 'TAT': 'Y', 'TGT': 'C',\r\n    'TTC': 'F', 'TCC': 'S', 'TAC': 'Y', 'TGC': 'C',\r\n    'TTA': 'L', 'TCA': 'S', 'TAA': '*', 'TGA': '*',\r\n    'TTG': 'L', 'TCG': 'S', 'TAG': '*', 'TGG': 'W',\r\n\r\n    'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',\r\n    'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',\r\n    'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',\r\n    'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',\r\n\r\n    'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',\r\n    'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',\r\n    'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',\r\n    'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',\r\n\r\n    'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',\r\n    'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',\r\n    'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',\r\n    'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'\r\n}\r\n\r\n\r\ndef reverse_complement(sequence):\r\n    tb = str.maketrans('ACGT', 'TGCA')\r\n    return sequence.translate(tb)[::-1]\r\n\r\n\r\ndef translate_dna(fastq_entry, frame='f1'):\r\n    \"\"\"Translates DNA to amino acids according specified reading frame\"\"\"\r\n    starts = {'f1': 0, 'f2': 1, 'f3': 2,\r\n              'r1': 0, 'r2': 1, 'r3': 2}\r\n\r\n    dna = fastq_entry[1].strip('N').replace('N', 'A')\r\n\r\n    if frame.startswith('r'):\r\n        dna = reverse_complement(dna)\r\n\r\n    sequence = dna[starts[frame]:]\r\n\r\n    protein = \"\"\r\n\r\n    for i in range(0, len(sequence),  3):\r\n        if 3 == len(sequence[i:i+3]):\r\n            protein += codonTable[sequence[i:i+3]]\r\n\r\n    return protein\r\n\r\n\r\ndef translate_dna_6frames(fastq_entry):\r\n    frames = ['f1', 'f2', 'f3', 'r1', 'r2', 'r3']\r\n\r\n    translations = []\r\n    for frame in frames:\r\n        translations.append(translate_dna(fastq_entry, frame))\r\n\r\n    return translations\r\n\r\n\r\ndef find_peptide(peptide, translations):\r\n    for translation in translations:\r\n        if peptide in translation:\r\n            return True\r\n\r\n    return False\r\n\r\n\r\ndef lookup(peptides, translations):\r\n    found = peptides.map(lambda p: find_peptide(p, translations)) \r\n        .reduce(lambda x, y: x or y, False)\r\n\r\n    return found\r\n\r\n\r\ndef main():\r\n    out = gzip.open('new.fastq.gz', 'wt')\r\n\r\n    peptides = Observable.defer(\r\n        lambda:\r\n        Observable.from_(open('peptides.txt', 'r'))\r\n            .map(lambda line: line.strip())\r\n            .filter(lambda line: line != '')\r\n    )\r\n\r\n    fastq_entries = Observable.from_(gzip.open('small.fastq.gz', 'rt')) \r\n        .map(lambda line: line.strip()) \r\n        .filter(lambda line: line != '') \r\n        .buffer_with_count(4) \r\n        .publish()\r\n\r\n    lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \r\n        .map(lambda translations: lookup(peptides, translations)) \r\n        .switch_latest()\r\n    \r\n    Observable.zip_array(lookups, fastq_entries) \r\n        .filter(lambda arr: arr[0]) \r\n        .subscribe(on_next=lambda arr: print('n'.join(arr[1]), sep='', end='n', file=out, flush=True),\r\n                   on_completed=lambda: out.close())\r\n\r\n    fastq_entries.connect()\r\n\r\nif __name__ == '__main__':\r\n    sys.exit(main())\r\n<\/code><\/pre>\n<h2>Le mot de la fin<\/h2>\n<p>La Programmation R\u00e9active nous promet de rendre notre code<\/p>\n<ul>\n<li>plus succinct<\/li>\n<li>plus lisible<\/li>\n<li>moins sujet au <em>bugs<\/em><\/li>\n<\/ul>\n<p>Je crois que, pour cet exemple, la programmation r\u00e9active livre la marchandise.<br \/>\nLe code est certainement succinct et, une fois qu&rsquo;on s&rsquo;est familiaris\u00e9 avec ses op\u00e9rateurs, devient effectivement tr\u00e8s lisible.<br \/>\nPour ce qui est des bogues, on verra \u00e0 l&rsquo;usage, mais une chose est s\u00fbre, moins de code \u00e9gale moins de chances de faire des erreurs !<\/p>\n<p>Les points n\u00e9gatifs que je soulignerais sont la difficult\u00e9 \u00e0 debugger le code et l&rsquo;\u00e9tat actuel de la documentation qui est carr\u00e9ment restrictive pour le n\u00e9ophyte.<\/p>\n<p>Nous n&rsquo;avons fait qu&rsquo;effleurer la surface des possibilit\u00e9s qu&rsquo;offre la programmation r\u00e9active mais je dois avouer \u00eatre sorti de l&rsquo;exp\u00e9rience avec une attitude plut\u00f4t enthousiaste.<br \/>\nEt oui, je suis pleinement conscient que mon exemple ne permet pas vraiment de faire \u00e9tat d&rsquo;une des grandes forces de la programmation r\u00e9active, \u00e0 savoir: sa capacit\u00e9 \u00e0 g\u00e9rer les traitements asynchrones comme un pro. Mais il fallait bien commencer quelque part !<\/p>\n<p>Laissez-moi des commentaires ou des suggestion de modifications !<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Qu&rsquo;est-ce que tout cela ? ReactiveX est la combinaison des meilleures id\u00e9es du mod\u00e8le observateur, du mod\u00e8le it\u00e9rateur et de la programmation fonctionnelle. \u00c0 l&rsquo;aide des librairies Rx, vous pouvez ais\u00e9ment: &#8211; Cr\u00e9er des flux de donn\u00e9es ou d&rsquo;\u00e9v\u00e8nements \u00e0 partir de sources diverses comme des fichiers ou des services web &#8211; Fusionner ou transformer ces flux gr\u00e2ce \u00e0 divers op\u00e9rateurs &#8211; Souscrire aux flux et \u00ab\u00a0r\u00e9agir\u00a0\u00bb \u00e0 leurs \u00e9missions pour produire de nouvelles donn\u00e9es L&rsquo;int\u00e9r\u00eat pour la programmation r\u00e9active <a href=\"https:\/\/bioinfo.iric.ca\/fr\/flux-de-donnees-et-programmation-reactive\/\"> [&#8230;]<\/a><\/p>\n","protected":false},"author":1,"featured_media":3372,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"jetpack_post_was_ever_published":false,"footnotes":"","jetpack_publicize_message":"","jetpack_publicize_feature_enabled":true,"jetpack_social_post_already_shared":true,"jetpack_social_options":{"image_generator_settings":{"template":"highway","default_image_id":0,"font":"","enabled":false},"version":2}},"categories":[161,41,160],"tags":[159,158],"class_list":["post-3357","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-analyse-de-donnees-fr","category-bioinformatique","category-informatique-fr","tag-programmation-reactive","tag-rxpy-fr"],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"https:\/\/bioinfo.iric.ca\/wpbioinfo\/wp-content\/uploads\/2017\/05\/buffer_with_count3.py_.png","jetpack_sharing_enabled":true,"_links":{"self":[{"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/posts\/3357","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/comments?post=3357"}],"version-history":[{"count":11,"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/posts\/3357\/revisions"}],"predecessor-version":[{"id":3371,"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/posts\/3357\/revisions\/3371"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/media\/3372"}],"wp:attachment":[{"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/media?parent=3357"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/categories?post=3357"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/bioinfo.iric.ca\/fr\/wp-json\/wp\/v2\/tags?post=3357"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}