Qu’est-ce que tout cela ?

ReactiveX est la combinaison des meilleures idées du modèle observateur, du modèle itérateur et de la programmation fonctionnelle.
À l’aide des librairies Rx, vous pouvez aisément:
– Créer des flux de données ou d’évènements à partir de sources diverses comme des fichiers ou des services web
– Fusionner ou transformer ces flux grâce à divers opérateurs
– Souscrire aux flux et « réagir » à leurs émissions pour produire de nouvelles données

L’intérêt pour la programmation réactive a progressé ces dernières années. Peut-être avez-vous seulement entendu le terme employé ici et là, peut-être avez-vous adopté ce paradigme de programmation dans votre travail quotidien… Personnellement, je n’y ai été exposé qu’au cours du dernier mois lors d’un bootcamp Angular2. J’ai immédiatement été attiré par l’idée de réagir aux événements émis par des flux de données.
Nous autres, bioinformaticiens, aimons nos tuyaux (du type shell :)) Nous canalisons constamment la sortie d’un programme vers un autre programme.
Vous voulez savoir combien d’entrées un fichier fasta contient? Trop facile:

%> grep '>' my_fasta_file.txt | wc -l

Nous sommes donc déjà à l’aise avec la notion de flux de données (stdout dans ce cas) et leur consommation (avec stdin) pour produire des résultats.
À ce point-ci, j’avais évidemment besoin de jouer avec la programmation réactive pour mieux me familiariser avec ses concepts. En recherchant un petit projet à mettre en œuvre, je me suis souvenu de cette publication de Patrick et me suis dit: Ooooohhh je vais écrire quelque chose qui traite un fichier FastQ!
Justement, un projet de recherche sur lequel je travaille exige que j’identifie les fragments d’un transcriptome pouvant potentiellement coder pour un ensemble de peptides … Parfait!
J’ai donc débuté mon périple en me disant que celui-ci serait assez simple. Le web fourmille d’exemples tels que «Abonnez-vous à un flux d’indice boursiers en temps réel et affichez un avertissement si le cours d’une action a augmenté de plus de X % au cours des dernières Y secondes, le tout avec une seule ligne de code! ». Je devais donc atteindre mon objectif assez facilement, n’est-ce pas? Eh bien, il s’avère que ce fut un peu plus compliqué que prévu et j’espère d’autres pourront bénéficier des solutions que j’ai trouvées en cours de route.

Contexte et sources de documentation

Premièrement, je me dois de mentionner que la librairie ReactiveX est disponible pour de multiples langages de programmation mais puisque je suis un grand fan du langage Python, j’ai choisi de travailler avec RxPY. (Je vous laisse le soin de consulter la documentation afin de mettre en place un environnement de dévelopmenet fonctionnel, c’est plutôt trivial).
Il existe de nombreuses sources de documentation sur le web mais voici celles qui m’ont le plus aidé:
RxPY sur github
Rx Marbles
Le site officiel de ReactiveX
Les page de documentation de Dag Brattli’s
Et n’oublions pas:
L’introduction qu’il vous manquait (qui est en fait une traduction de The intro you’ve been missing)

Au travail !

Alors voici le plan d’action que je propose:
Pour chaque entrée d’un fichier FastQ, traduire la séquence d’ADN de l’entrée en fonction des 6 cadres de lecture et vérifier si l’une des 6 traductions contient un des peptides stockées dans un fichier texte séparé. Si l’un des peptides est trouvé, basculer l’entrée vers un autre fichier FastQ. Que des choses simples, quoi !

Lire un fichier

Le mantra de la programmation réactive est: « Tout est un flux » et RxPY offre de nombreuse fonctions utilitaires pour générer des objets Observables (Flux) depuis diverses sources. Celle qui m’intéresse dans ce cas-ci est la fonction from_ qui construit un Observable à partir de n’importe quel itérateur Python.

import gzip
from rx import Observable

fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt'))

Comme vous pouvez le constater, les fichiers FastQ sont généralement « gzippés » car ils sont plutôt lourds.
Mais tout ceci importe peu, nous sommes sur la bonne voie!

Ajoutons quelques opérateurs à notre flux (Observable) afin de retirer les retours de chariots en fin de ligne (à l’aide de la transformation map) et filtrons les lignes vides:

import gzip
from rx import Observable

fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt')) \
    .map(lambda line: line.strip()) \
    .filter(lambda line: line != '')

Bien, bien 🙂
Et soit dit en passant, nous ferons un grand usage des fonctions lambda.

Les entrées d’un fichier FastQ prennent un format particulier: elles sont toutes composées de 4 lignes et ressemblent à ceci:

@HWI-ST942:38:C2N4BACXX:5:1101:1228:2134 1:N:0:ATCACG
TTTTCAGCCTACATCAAGGAGGTGGAGGAACGGCCGGCACCCACCCCGTGGGGCTCCAAGATGCCCTTTGGGGAACTGATGTTCGAATCCAGCAGTAGCT
+
@CC?DDFFHHHHGJJIJJJCHIAFHEFBACGGHGIIGIIJIIIGHHDD?CCDDBDA@DDC@C@CDDD<ACC9;7>A:?ABDB@CCCAB::AC:

Alors assurons-nous de n’émettre que des entrées complètes (ie: les 4 lignes) à l’aide d’une transformation de type buffer:

import gzip
from rx import Observable

fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt')) \
    .map(lambda line: line.strip()) \
    .filter(lambda line: line != '') \
    .buffer_with_count(4)

C’est si simple ! Je commence à tomber sous le charme des outils de cette librairie !

Attaquons-nous maintenant à la traduction des séquences d’ADN.
Permettez-moi d’introduire quelques bouts de codes pour gérer l’aspect traduction en tant que tel..

codonTable = {
    'TTT': 'F', 'TCT': 'S', 'TAT': 'Y', 'TGT': 'C',
    'TTC': 'F', 'TCC': 'S', 'TAC': 'Y', 'TGC': 'C',
    'TTA': 'L', 'TCA': 'S', 'TAA': '*', 'TGA': '*',
    'TTG': 'L', 'TCG': 'S', 'TAG': '*', 'TGG': 'W',

    'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
    'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
    'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
    'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',

    'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
    'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
    'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
    'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',

    'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
    'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
    'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
    'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
}


def reverse_complement(sequence):
    """Returns the reverse complement of a DNA sequence"""
    tb = str.maketrans('ACGT', 'TGCA')
    return sequence.translate(tb)[::-1]


def translate_dna(fastq_entry, frame='f1'):
    """Translates DNA to amino acids according specified reading frame"""
    starts = {'f1': 0, 'f2': 1, 'f3': 2,
              'r1': 0, 'r2': 1, 'r3': 2}

    dna = fastq_entry[1].strip('N').replace('N', 'A')

    if frame.startswith('r'):
        dna = reverse_complement(dna)

    sequence = dna[starts[frame]:]

    protein = ""

    for i in range(0, len(sequence),  3):
        if 3 == len(sequence[i:i+3]):
            protein += codonTable[sequence[i:i+3]]

    return protein


def translate_dna_6frames(fastq_entry):
    """Translates DNA sequence according all 6 reading frames"""
    frames = ['f1', 'f2', 'f3', 'r1', 'r2', 'r3']
    
    translations = []
    for frame in frames:
        translations.append(translate_dna(fastq_entry, frame))

    return translations

Je n’entrerai pas dans les détails de ce que font ces bouts de code, j’en ai seulement besoin pour faire un exemple focntionnel.
Le seul point à noter est que je retire les ‘N’s en début et fin de séquence et que je remplace les ‘N’ présents dans la séquence par des ‘A’.

Très bien, nous pouvons maintenant faire appel à map une fois de plus afin de traduire chaque entrée FastQ:

fastq_entries.map(lambda entry: translate_dna_6frames(entry))

Comment ça se passe, à date ?

Il est probablement temps pour moi d’introduire la méthode .subscribe() qui permet d’observer l’exécution de notre programme.
.subscribe() est une méthode permettant de souscrire un Observateur à notre Observable afin de consommer ses émissions et d’y réagir.
Pour le bien de cet exemple, nous ne ferons qu’imprimer la sortie à la ligne de commande

fastq_entries.map(lambda entry: translate_dna_6frames(entry)) 
    .subscribe(lambda translations: print(translations)

Output

['VHYDRSGRSLGTADVHFERKADALKAMKQYNGV', 'CTMIALVAA*EQQTCTLSGRQMP*RP*SSTTA', 
 'AL*SLWSQLRNSRRAL*AEGRCPEGHEAVQRR', 'DAVVLLHGLQGICLPLKVHVCCS*AATRAIIVH', 
 'TPLYCFMAFRASAFRSKCTSAVPKLRPERS*C', 'RRCTASWPSGHLPSAQSARLLFLSCDQSDHSA']...

La première chose qui m’a frappé lorsque j’ai commencé à bidouiller avec les flux c’est qu’ils peuvent être relativement difficiles à observer/débugger en cours d’utilisation.
Par exemple: si vous ne faites pas usage de la fonction .subscribe() pour souscrire à vos flux à un moment ou à un autre dans votre code, RIEN ne se produira à l’exécution du script et celui-ci fera preuve d’un mutisme parfait. Pas très convivial !
Dans un autre ordre d’idée, il serait aussi pertinent de vous familiariser avec la notion de Schedulers (surtout si vous comptez faire usage d’Observables « temporaux » tels que Observable.interval() ou désirez rendre votre code asynchrone).

Trouvons les peptides

Afin de trouver les peptides, nous aurons besoin de cette fonction simple:

def find_peptide(peptide, translations):
    for translation in translations:
        if peptide in translation:
            return True

    return False

C’est ici que les chose se corsent :).
Construire un flux à partir de notre fichier de peptides est simple, nous savons déjà comment nous y prendre grâce à nos tribulations avec le fichier FastQ.

peptides = Observable.from_(open('peptides.txt', 'r')) 
    .map(lambda line: line.strip()).filter(lambda line: line != '')

Mais nous devons maintenant reproduire l’équivalent d’une boucle imbriquée dans le contexte de la programmation réactive.
Rappelez-vous, nous souhaitons identifier les fragments qui codent pour n’importe lequel de nos peptides. Hmmm..
Voici la solution que j’ai trouvée:

def lookup(peptides, translations):
    """Serves as the nested loop"""
    found = peptides.map(lambda p: find_peptide(p, translations)) 
        .reduce(lambda x, y: x or y, False)

    return found
lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) 
    .map(lambda translations: lookup(peptides, translations))

Alors, qu’est-ce qui se passe ici ?
Premièrement, nous « mappons » .lookup() sur nos chacune de nos traductions qui, à leur tour, « mappent » .find_peptides() sur notre flux de peptides. Nous reduisons ensuite le résultat de ce flux à une émission unique prenant la forme d’un booléen. Super, nous y sommes presque !

Mais, attendez une minute.. Si nous « souscrivons » (avec .subscribe()) au flux lookups et affichons ses émissions, il devient vite apparent que nous avons des soucis:
1- Notre boucle imbriquée ne s’exécute qu’une seule fois et non pour pour chaque émission du flux translations
2- .lookup() retourne une liste d’Observables plutôt que de retourner le booléen auquel nous nous attendions (et ce, même si .reduce() n’émet que la valeur finale puisque cette appel est toujours encapsulé dans un Observable)

Attaquons-nous à résoudre ces problèmes un à la fois:

Problème 1

L’explication de ce comportement se trouve dans le fait que lorsque l’on crée un Observable à partir d’un itérateur de fichier à l’aide de l’utilitaire from_, nous ne pouvons y souscrire qu’une seule fois. Toute souscription subséquente n’émettra aucune valeur puisque l’itérateur a atteint la fin du fichier lors de la première consommation du flux. Il existe différentes façons de contourner ce problème et l’une d’entre elle consiste à encapsuler la construction de l’Observable dans un appel à .defer().

peptides = Observable.defer(
    lambda: 
    Observable.from_(open('peptides.txt', 'r'))
        .map(lambda line: line.strip())
        .filter(lambda line: line != '')
)

Assez simple à régler me direz-vous, mais encore faut-il comprendre ce qui se passe! Et comme cet aspect n’est pas très bien documenté, j’ai tourné en rond un bon moment avant de trouver la solution.

Problème 2

Contrairement à .translate_dna_6frames(), la fonction .lookup() retourne des flux (Observables) plutôt qu’une valeur alors nous devons trouver une façon de combiner cette liste de flux en un seul flux auquel nous pourrons alors souscrire pour récupérer nos booléens. Il nous faut « croiser les effluves » ! ReactiveX expose bon nombre d’opérateurs de combinaison mais celui que nous utiliserons ici est switch (ou .switch_latest(), l’implémentation dans RxPY)

lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) 
    .map(lambda translations: lookup(peptides, translations)) 
    .switch_latest()

Combinaison et Multicasting

Le seule chose qui nous reste à faire est de combiner fastq_entries and lookups en un seul flux que nous pourrons filtrer (avec .filter()) and souscrire (avec .subscribe()) afin de n’exporter que les entrées FastQ qui présentent le potentiel de coder pour l’un de nos peptides.
Pour l’étape de combinaison, j’ai choisi .zip() (ou .zip_array() dans RxPY).

Observable.zip_array(found, fastq_entries)
    .filter(lambda arr: arr[0]) 
    .subscribe(print)

Malheureusement un nouveau problème se présente: lookups et le résultat de zip_array souscrivent tous les deux au flux fastq_entries et reçoivent donc des entrées FastQ distinctes, en alternance.. Pas bon ça. Nous voudrions plutôt que zip_array() « zip » le même fastq_entry qui a été consommé et transformé par la chaîne d’action lookups. Cette mécanique existe et porte le nom poétique de multicasting et nous pouvons obtenir le résultat escompté en ajoutant un appel à .publish() lors de la création de notre flux fastq_entries:

fastq_entries = Observable.from_(gzip.open('small.fastq.gz', 'rt')) 
    .map(lambda line: line.strip()) 
    .buffer_with_count(4) 
    .filter(lambda line: line != '') 
    .publish()

Cet ajout tout simple transforme fastq_entries en un flux connectable qui attendra un appel à .connect() pour débuter ses émissions à tous ses souscripteurs enregistrés.

La bête dans toute sa splendeur

Voici finalement la version finale de notre script:

import sys
import gzip

from rx import Observable

codonTable = {
    'TTT': 'F', 'TCT': 'S', 'TAT': 'Y', 'TGT': 'C',
    'TTC': 'F', 'TCC': 'S', 'TAC': 'Y', 'TGC': 'C',
    'TTA': 'L', 'TCA': 'S', 'TAA': '*', 'TGA': '*',
    'TTG': 'L', 'TCG': 'S', 'TAG': '*', 'TGG': 'W',

    'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
    'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
    'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
    'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',

    'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
    'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
    'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
    'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',

    'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
    'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
    'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
    'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
}


def reverse_complement(sequence):
    tb = str.maketrans('ACGT', 'TGCA')
    return sequence.translate(tb)[::-1]


def translate_dna(fastq_entry, frame='f1'):
    """Translates DNA to amino acids according specified reading frame"""
    starts = {'f1': 0, 'f2': 1, 'f3': 2,
              'r1': 0, 'r2': 1, 'r3': 2}

    dna = fastq_entry[1].strip('N').replace('N', 'A')

    if frame.startswith('r'):
        dna = reverse_complement(dna)

    sequence = dna[starts[frame]:]

    protein = ""

    for i in range(0, len(sequence),  3):
        if 3 == len(sequence[i:i+3]):
            protein += codonTable[sequence[i:i+3]]

    return protein


def translate_dna_6frames(fastq_entry):
    frames = ['f1', 'f2', 'f3', 'r1', 'r2', 'r3']

    translations = []
    for frame in frames:
        translations.append(translate_dna(fastq_entry, frame))

    return translations


def find_peptide(peptide, translations):
    for translation in translations:
        if peptide in translation:
            return True

    return False


def lookup(peptides, translations):
    found = peptides.map(lambda p: find_peptide(p, translations)) 
        .reduce(lambda x, y: x or y, False)

    return found


def main():
    out = gzip.open('new.fastq.gz', 'wt')

    peptides = Observable.defer(
        lambda:
        Observable.from_(open('peptides.txt', 'r'))
            .map(lambda line: line.strip())
            .filter(lambda line: line != '')
    )

    fastq_entries = Observable.from_(gzip.open('small.fastq.gz', 'rt')) 
        .map(lambda line: line.strip()) 
        .filter(lambda line: line != '') 
        .buffer_with_count(4) 
        .publish()

    lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) 
        .map(lambda translations: lookup(peptides, translations)) 
        .switch_latest()
    
    Observable.zip_array(lookups, fastq_entries) 
        .filter(lambda arr: arr[0]) 
        .subscribe(on_next=lambda arr: print('n'.join(arr[1]), sep='', end='n', file=out, flush=True),
                   on_completed=lambda: out.close())

    fastq_entries.connect()

if __name__ == '__main__':
    sys.exit(main())

Le mot de la fin

La Programmation Réactive nous promet de rendre notre code

  • plus succinct
  • plus lisible
  • moins sujet au bugs

Je crois que, pour cet exemple, la programmation réactive livre la marchandise.
Le code est certainement succinct et, une fois qu’on s’est familiarisé avec ses opérateurs, devient effectivement très lisible.
Pour ce qui est des bogues, on verra à l’usage, mais une chose est sûre, moins de code égale moins de chances de faire des erreurs !

Les points négatifs que je soulignerais sont la difficulté à debugger le code et l’état actuel de la documentation qui est carrément restrictive pour le néophyte.

Nous n’avons fait qu’effleurer la surface des possibilités qu’offre la programmation réactive mais je dois avouer être sorti de l’expérience avec une attitude plutôt enthousiaste.
Et oui, je suis pleinement conscient que mon exemple ne permet pas vraiment de faire état d’une des grandes forces de la programmation réactive, à savoir: sa capacité à gérer les traitements asynchrones comme un pro. Mais il fallait bien commencer quelque part !

Laissez-moi des commentaires ou des suggestion de modifications !