Let Your Data Flow: Streams and Reactive Programming

Let Your Data Flow: Streams and Reactive Programming

What’s all this about ?

ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.
Using Rx, you can easily:
– Create event or data emitting streams from sources such as a file or a web service
– Compose and transform streams with query-like operators
– Subscribe to any observable stream and “react” to its emissions to perform side effects

Reactive programming has been gaining traction these past few years. Maybe you’ve only heard the term, maybe you’ve adopted the programming paradigm in your daily workflow.. I’m a bit of a late comer to the party as I was only exposed to it about a month ago during an Angular2 bootcamp. I was immediately drawn to the idea of reacting to events taking place in data streams.
We bioinformaticians love our pipes (the shell kind :)) We’re constantly piping the output of a program into another program..
Want to know how many entries a fasta file contains? Easy-peasy:

%> grep '>' my_fasta_file.txt | wc -l

So we’re already quite comfortable with the notion of data streams (stdout in this case) and consuming those streams (with stdin) to produce results.
By now, I obviously needed to play with reactive programming in order to get better acquainted. While looking for a pet project to implement I remembered this post from Patrick and I thought: Ooooohhh I’ll do something which processes a FastQ File !
As it turns out, a research project I’m working on needs me to identify the reads from a transcriptome which have the potential to code for a set of peptides… Perfect !
So I set off on a journey I thought would be fairly painless. The net is chock-full of examples such as “Subscribe to a realtime stockquote ticker and print a warning if the price of a stock has risen by more than x % within the last y seconds, and do it all with a 1-liner !”, so I should achieve my goal easily, right ? Well, turns out it was a bit more complicated than anticipated and, hopefully, others will benefit from the solutions I found along the way.

Context and Documentation sources

First things first, the ReactiveX library is available for many programming languages but since I’m a big Python fan, I chose to work with RxPY. (I’ll let you read the documentation on how to setup the environment, it’s pretty trivial).
You can find documentation in various places on the web but here are the sources which helped me the most:
RxPY on github
Rx Marbles
ReactiveX’s Home
Dag Brattli’s Documentation Pages
And let’s not forget:
The intro you’ve been missing

Let’s get to work !

So here’s the plan in Layman’s terms:
For each entry in a FastQ file, translate the DNA sequence of the entry according to all 6 reading frames and check to see if any of the 6 translations contain one the peptides stored in a separate text file. If one of the peptides is found, dump the entry in another FastQ file. Pretty straightforward stuff !

Reading a file

The reactive mantra is: “Everything is a Stream” and the RxPY offers quite a few factories to generate Observables (Streams) from various sources. Of interest to us here is the from_ factory which builds an Observable from any Python iterable.

import gzip
from rx import Observable

fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt'))

As you can see, FastQ files are usually gzipped because they are pretty weighty.
But this is beside the point, we’re off to a good start !

Let’s add a few operators to our Observable to remove newline characters (using the map transformation) and filter out any empty lines:

import gzip
from rx import Observable

fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt')) \
    .map(lambda line: line.strip()) \
    .filter(lambda line: line != '')

Good, good 🙂
Oh and by the way, we are going to use lambda functions a lot.

FastQ entries have a specific format: they are each composed of 4 lines which look like this:

@HWI-ST942:38:C2N4BACXX:5:1101:1228:2134 1:N:0:ATCACG
TTTTCAGCCTACATCAAGGAGGTGGAGGAACGGCCGGCACCCACCCCGTGGGGCTCCAAGATGCCCTTTGGGGAACTGATGTTCGAATCCAGCAGTAGCT
+
@CC?DDFFHHHHGJJIJJJCHIAFHEFBACGGHGIIGIIJIIIGHHDD?CCDDBDA@DDC@C@CDDD<ACC9;7>A:?ABDB@CCCAB::AC:

So let’s make sure our Observable only emits complete entries (ie: all 4 lines) using a buffer transformation:

import gzip
from rx import Observable

fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt')) \
    .map(lambda line: line.strip()) \
    .filter(lambda line: line != '') \
    .buffer_with_count(4)

I’m liking reactive programming more and more by the minute !

All right, onto the translation part !
Let me bring in a bit of kitchen sink code to handle the DNA translation part..

codonTable = {
    'TTT': 'F', 'TCT': 'S', 'TAT': 'Y', 'TGT': 'C',
    'TTC': 'F', 'TCC': 'S', 'TAC': 'Y', 'TGC': 'C',
    'TTA': 'L', 'TCA': 'S', 'TAA': '*', 'TGA': '*',
    'TTG': 'L', 'TCG': 'S', 'TAG': '*', 'TGG': 'W',

    'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
    'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
    'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
    'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',

    'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
    'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
    'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
    'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',

    'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
    'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
    'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
    'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
}


def reverse_complement(sequence):
    """Returns the reverse complement of a DNA sequence"""
    tb = str.maketrans('ACGT', 'TGCA')
    return sequence.translate(tb)[::-1]


def translate_dna(fastq_entry, frame='f1'):
    """Translates DNA to amino acids according specified reading frame"""
    starts = {'f1': 0, 'f2': 1, 'f3': 2,
              'r1': 0, 'r2': 1, 'r3': 2}

    dna = fastq_entry[1].strip('N').replace('N', 'A')

    if frame.startswith('r'):
        dna = reverse_complement(dna)

    sequence = dna[starts[frame]:]

    protein = ""

    for i in range(0, len(sequence),  3):
        if 3 == len(sequence[i:i+3]):
            protein += codonTable[sequence[i:i+3]]

    return protein


def translate_dna_6frames(fastq_entry):
    """Translates DNA sequence according all 6 reading frames"""
    frames = ['f1', 'f2', 'f3', 'r1', 'r2', 'r3']
    
    translations = []
    for frame in frames:
        translations.append(translate_dna(fastq_entry, frame))

    return translations

I won’t explain this code as I just need it to build a complete working example.
The only noteworthy aspect is that I trim the DNA sequence of any leading or trailing ‘N’s and replace any remaining ones inside the sequence with ‘A’s.

All right, so we can now use map again to translate every FastQ entry:

fastq_entries.map(lambda entry: translate_dna_6frames(entry))

How are we doing so far ?

Now is probably a good time to bring up the .subscribe() method to observe the status of our execution.
.subscribe() is a way to attach an Observer to our Observable to consume its emissions.
In this case, let’s just print the output to the console

fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \
    .subscribe(lambda translations: print(translations)

Output

['VHYDRSGRSLGTADVHFERKADALKAMKQYNGV', 'CTMIALVAA*EQQTCTLSGRQMP*RP*SSTTA', 
 'AL*SLWSQLRNSRRAL*AEGRCPEGHEAVQRR', 'DAVVLLHGLQGICLPLKVHVCCS*AATRAIIVH', 
 'TPLYCFMAFRASAFRSKCTSAVPKLRPERS*C', 'RRCTASWPSGHLPSAQSARLLFLSCDQSDHSA']...

First thing I realized when I started playing with Streams is that they can be somewhat hard to debug/observe mid process.
For instance: if you do not .subscribe() an Observable at some point in your code, NOTHING takes place when you run it and the script just returns.
You might also want to look into Schedulers at some point (especially if you make use on any Timed Observables like Observable.interval() or want to make your code asynchronous).

Finding the peptides

In order to find the peptides we’ll need this simple function:

def find_peptide(peptide, translations):
    for translation in translations:
        if peptide in translation:
            return True

    return False

And now for the tricky part :).
Getting an Observable from our peptides file is easy, we already know how to do that from our FastQ file exploration.

peptides = Observable.from_(open('peptides.txt', 'r')) \
    .map(lambda line: line.strip()).filter(lambda line: line != '')

But now we actually need to reproduce a nested loop in the context of reactive programming.
Remember, we want to find reads which code for ANYONE of our peptides. Hmmm..
Here’s the solution I came up with:

def lookup(peptides, translations):
    """Serves as the nested loop"""
    found = peptides.map(lambda p: find_peptide(p, translations)) \
        .reduce(lambda x, y: x or y, False)

    return found
lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \
    .map(lambda translations: lookup(peptides, translations))

So what’s going on here ?
First, we are mapping .lookup() on our translations which, in turn, maps .find_peptides() onto our peptides Observable and reduces the results of that stream to a single emission in the form of a boolean. Nice, we’re almost there !

But wait a second.. If you subscribe to lookups and print its results, it quickly becomes apparent that there are issues:
1- Our nested loop only runs once and not for every emission of translations
2- .lookup() actually returns a list of Observables and not the actual Boolean answers we were hoping to get back (even though .reduce() only emits the final value, it is still wrapped inside an Observable)

So let’s address these issues one by one:

Issue 1

As it turns out, when you create an observable from a file iterator using the from_ factory, you can only subscribe to it once. Any subsequent subscription will not emit any values as you have reached the end of the file during the first consumption. There a few options to work around that and one of them is to wrap the construction of the Observable with .defer().

peptides = Observable.defer(
    lambda: 
    Observable.from_(open('peptides.txt', 'r'))
        .map(lambda line: line.strip())
        .filter(lambda line: line != '')
)

That was a simple fix, but this aspect is not clearly documented and this issue tormented me for quite a while.. !

Issue 2

Contrary to .translate_dna_6frames(), .lookup() returns Observables and not a value so we need a way to turn that list of Observables into a single Observable which we can subscribe to get our values. ReactiveX exposes many combining operators but the one which we’ll use here is switch (or .switch_latest(), it’s RxPY implementation)

lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \
    .map(lambda translations: lookup(peptides, translations)) \
    .switch_latest()

Combining and Multicasting

The last thing we need to do is to combine fastq_entries and lookups into a single Observable which we can .filter() and .subscribe() in order to output only the FastQ entries which COULD code for one of our peptides.
For the combining part, I chose to go with .zip() (or .zip_array() in RxPY).

Observable.zip_array(found, fastq_entries)\
    .filter(lambda arr: arr[0]) \
    .subscribe(print)

Unfortunately we encounter another issue: lookups and our zipped array are both subscribing to the fastq_entries Observable and are thus getting distinct values from the Observable.. That’s not good. We want zip_array() to zip the same fastq_entry that got consumed and transformed by the lookups pipeline. This is called multicasting and we can achieve the desired result by adding the .publish() statement to the creation of our fastq_entries Observable like this:

fastq_entries = Observable.from_(gzip.open('small.fastq.gz', 'rt')) \
    .map(lambda line: line.strip()) \
    .buffer_with_count(4) \
    .filter(lambda line: line != '') \
    .publish()

This simple addition turns fastq_entries into a connectable Observable which will wait for the .connect() signal to start emitting to all it registered subscribers. We just need to add

fastq_entries.connect()

after we have registered all subscribers and we will then get multicasting.

Putting it all together

So here’s the entire, final version of our script:

import sys
import gzip

from rx import Observable

codonTable = {
    'TTT': 'F', 'TCT': 'S', 'TAT': 'Y', 'TGT': 'C',
    'TTC': 'F', 'TCC': 'S', 'TAC': 'Y', 'TGC': 'C',
    'TTA': 'L', 'TCA': 'S', 'TAA': '*', 'TGA': '*',
    'TTG': 'L', 'TCG': 'S', 'TAG': '*', 'TGG': 'W',

    'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
    'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
    'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
    'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',

    'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
    'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
    'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
    'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',

    'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
    'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
    'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
    'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
}


def reverse_complement(sequence):
    tb = str.maketrans('ACGT', 'TGCA')
    return sequence.translate(tb)[::-1]


def translate_dna(fastq_entry, frame='f1'):
    """Translates DNA to amino acids according specified reading frame"""
    starts = {'f1': 0, 'f2': 1, 'f3': 2,
              'r1': 0, 'r2': 1, 'r3': 2}

    dna = fastq_entry[1].strip('N').replace('N', 'A')

    if frame.startswith('r'):
        dna = reverse_complement(dna)

    sequence = dna[starts[frame]:]

    protein = ""

    for i in range(0, len(sequence),  3):
        if 3 == len(sequence[i:i+3]):
            protein += codonTable[sequence[i:i+3]]

    return protein


def translate_dna_6frames(fastq_entry):
    frames = ['f1', 'f2', 'f3', 'r1', 'r2', 'r3']

    translations = []
    for frame in frames:
        translations.append(translate_dna(fastq_entry, frame))

    return translations


def find_peptide(peptide, translations):
    for translation in translations:
        if peptide in translation:
            return True

    return False


def lookup(peptides, translations):
    found = peptides.map(lambda p: find_peptide(p, translations)) \
        .reduce(lambda x, y: x or y, False)

    return found


def main():
    out = gzip.open('new.fastq.gz', 'wt')

    peptides = Observable.defer(
        lambda:
        Observable.from_(open('peptides.txt', 'r'))
            .map(lambda line: line.strip())
            .filter(lambda line: line != '')
    )

    fastq_entries = Observable.from_(gzip.open('small.fastq.gz', 'rt')) \
        .map(lambda line: line.strip()) \
        .filter(lambda line: line != '') \
        .buffer_with_count(4) \
        .publish()

    lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \
        .map(lambda translations: lookup(peptides, translations)) \
        .switch_latest()
    
    Observable.zip_array(lookups, fastq_entries) \
        .filter(lambda arr: arr[0]) \
        .subscribe(on_next=lambda arr: print('\n'.join(arr[1]), sep='', end='\n', file=out, flush=True),
                   on_completed=lambda: out.close())

    fastq_entries.connect()

if __name__ == '__main__':
    sys.exit(main())

Final thoughts

Reactive promises to make your code

  • more succinct
  • more readable
  • less prone to bugs

I think in this example, Reactive Programming delivered on its promises.
The code is certainly succinct and, once you get better acquainted with the operators, becomes very readable.
As for the bugs, well, time will tell but one thing’s for sure: less code equals less chances to make errors !

On the downside, I did find it harder to debug and the documentation is quite an issue to beginners right now.

I’ve really only scratched the surface of Reactive Programming but come out of this experience fairly enthusiastic.
And yes, I do realize that my example does not make use of Reactive’s greatest strength: handling asynchronicity like a pro, but you have to start somewhere !

Feel free to leave comments or suggest modifications !

By | 2017-05-03T09:19:14+00:00 May 2, 2017|Categories: Bioinformatics, Computer science, Data Analysis|Tags: , |2 Comments

About the Author:

Originally trained in molecular biology, I quickly realized my heart lied with bioinformatics ! (How can anyone be presented an HMM and not fall in love ?). While I spend most of my days writing Python code, I must admit I am starting to enjoy my occasional dip in R.

2 Comments

  1. Mathieu Courcelles May 5, 2017 at 08:44 - Reply

    Great article! First time I hear of ReactiveX. Do you know how it compares to Python asyncio?

    It would have been nice to see how much succinct code is without all this. Can you comment on memory consumption and speed?

    • Jean-Philippe May 8, 2017 at 11:48 - Reply

      Hi Mathieu.

      To be frank, I haven’t used asyncio a whole lot but I can say this:
      1- RxPY seems to offer a lot of stream operators (such as buffer, filter or debounce), which are not part of asyncio and would need to be implemented “by hand”.
      2- RxPY can actually make use of the asyncio scheduler, so I believe RxPY can be seen as complementary to asyncio..

      But of course, I could be mistaken here.

      As for comparing this script with a “non-RxPY” version, I thought there was already a lot of code here, so I decided to keep it for another post. I’ll make sure to include memory consumption and speed comparison parts as well. Stay tuned !

Leave A Comment