What’s all this about ?
ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.
Using Rx, you can easily:
– Create event or data emitting streams from sources such as a file or a web service
– Compose and transform streams with query-like operators
– Subscribe to any observable stream and “react” to its emissions to perform side effects
Reactive programming has been gaining traction these past few years. Maybe you’ve only heard the term, maybe you’ve adopted the programming paradigm in your daily workflow.. I’m a bit of a late comer to the party as I was only exposed to it about a month ago during an Angular2 bootcamp. I was immediately drawn to the idea of reacting to events taking place in data streams.
We bioinformaticians love our pipes (the shell kind :)) We’re constantly piping the output of a program into another program..
Want to know how many entries a fasta file contains? Easy-peasy:
%> grep '>' my_fasta_file.txt | wc -l
So we’re already quite comfortable with the notion of data streams (stdout in this case) and consuming those streams (with stdin) to produce results.
By now, I obviously needed to play with reactive programming in order to get better acquainted. While looking for a pet project to implement I remembered this post from Patrick and I thought: Ooooohhh I’ll do something which processes a FastQ File !
As it turns out, a research project I’m working on needs me to identify the reads from a transcriptome which have the potential to code for a set of peptides… Perfect !
So I set off on a journey I thought would be fairly painless. The net is chock-full of examples such as “Subscribe to a realtime stockquote ticker and print a warning if the price of a stock has risen by more than x % within the last y seconds, and do it all with a 1-liner !”, so I should achieve my goal easily, right ? Well, turns out it was a bit more complicated than anticipated and, hopefully, others will benefit from the solutions I found along the way.
Context and Documentation sources
First things first, the ReactiveX library is available for many programming languages but since I’m a big Python fan, I chose to work with RxPY. (I’ll let you read the documentation on how to setup the environment, it’s pretty trivial).
You can find documentation in various places on the web but here are the sources which helped me the most:
RxPY on github
Rx Marbles
ReactiveX’s Home
Dag Brattli’s Documentation Pages
And let’s not forget:
The intro you’ve been missing
Let’s get to work !
So here’s the plan in Layman’s terms:
For each entry in a FastQ file, translate the DNA sequence of the entry according to all 6 reading frames and check to see if any of the 6 translations contain one the peptides stored in a separate text file. If one of the peptides is found, dump the entry in another FastQ file. Pretty straightforward stuff !
Reading a file
The reactive mantra is: “Everything is a Stream” and the RxPY offers quite a few factories to generate Observables (Streams) from various sources. Of interest to us here is the from_
factory which builds an Observable
from any Python iterable.
import gzip
from rx import Observable
fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt'))
As you can see, FastQ files are usually gzipped because they are pretty weighty.
But this is beside the point, we’re off to a good start !
Let’s add a few operators to our Observable to remove newline characters (using the map transformation) and filter out any empty lines:
import gzip
from rx import Observable
fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt')) \
.map(lambda line: line.strip()) \
.filter(lambda line: line != '')
Good, good 🙂
Oh and by the way, we are going to use lambda functions a lot.
FastQ entries have a specific format: they are each composed of 4 lines which look like this:
@HWI-ST942:38:C2N4BACXX:5:1101:1228:2134 1:N:0:ATCACG TTTTCAGCCTACATCAAGGAGGTGGAGGAACGGCCGGCACCCACCCCGTGGGGCTCCAAGATGCCCTTTGGGGAACTGATGTTCGAATCCAGCAGTAGCT + @CC?DDFFHHHHGJJIJJJCHIAFHEFBACGGHGIIGIIJIIIGHHDD?CCDDBDA@DDC@C@CDDD<ACC9;7>A:?ABDB@CCCAB::AC:
So let’s make sure our Observable only emits complete entries (ie: all 4 lines) using a buffer
transformation:
import gzip
from rx import Observable
fastq_entries = Observable.from_(gzip.open('my.fastq.gz', 'rt')) \
.map(lambda line: line.strip()) \
.filter(lambda line: line != '') \
.buffer_with_count(4)
I’m liking reactive programming more and more by the minute !
All right, onto the translation part !
Let me bring in a bit of kitchen sink code to handle the DNA translation part..
codonTable = {
'TTT': 'F', 'TCT': 'S', 'TAT': 'Y', 'TGT': 'C',
'TTC': 'F', 'TCC': 'S', 'TAC': 'Y', 'TGC': 'C',
'TTA': 'L', 'TCA': 'S', 'TAA': '*', 'TGA': '*',
'TTG': 'L', 'TCG': 'S', 'TAG': '*', 'TGG': 'W',
'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',
'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',
'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
}
def reverse_complement(sequence):
"""Returns the reverse complement of a DNA sequence"""
tb = str.maketrans('ACGT', 'TGCA')
return sequence.translate(tb)[::-1]
def translate_dna(fastq_entry, frame='f1'):
"""Translates DNA to amino acids according specified reading frame"""
starts = {'f1': 0, 'f2': 1, 'f3': 2,
'r1': 0, 'r2': 1, 'r3': 2}
dna = fastq_entry[1].strip('N').replace('N', 'A')
if frame.startswith('r'):
dna = reverse_complement(dna)
sequence = dna[starts[frame]:]
protein = ""
for i in range(0, len(sequence), 3):
if 3 == len(sequence[i:i+3]):
protein += codonTable[sequence[i:i+3]]
return protein
def translate_dna_6frames(fastq_entry):
"""Translates DNA sequence according all 6 reading frames"""
frames = ['f1', 'f2', 'f3', 'r1', 'r2', 'r3']
translations = []
for frame in frames:
translations.append(translate_dna(fastq_entry, frame))
return translations
I won’t explain this code as I just need it to build a complete working example.
The only noteworthy aspect is that I trim the DNA sequence of any leading or trailing ‘N’s and replace any remaining ones inside the sequence with ‘A’s.
All right, so we can now use map
again to translate every FastQ entry:
fastq_entries.map(lambda entry: translate_dna_6frames(entry))
How are we doing so far ?
Now is probably a good time to bring up the .subscribe()
method to observe the status of our execution.
.subscribe()
is a way to attach an Observer
to our Observable
to consume its emissions.
In this case, let’s just print the output to the console
fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \
.subscribe(lambda translations: print(translations)
Output
['VHYDRSGRSLGTADVHFERKADALKAMKQYNGV', 'CTMIALVAA*EQQTCTLSGRQMP*RP*SSTTA', 'AL*SLWSQLRNSRRAL*AEGRCPEGHEAVQRR', 'DAVVLLHGLQGICLPLKVHVCCS*AATRAIIVH', 'TPLYCFMAFRASAFRSKCTSAVPKLRPERS*C', 'RRCTASWPSGHLPSAQSARLLFLSCDQSDHSA']...
First thing I realized when I started playing with Streams is that they can be somewhat hard to debug/observe mid process.
For instance: if you do not .subscribe()
an Observable
at some point in your code, NOTHING takes place when you run it and the script just returns.
You might also want to look into Schedulers at some point (especially if you make use on any Timed Observables like Observable.interval()
or want to make your code asynchronous).
Finding the peptides
In order to find the peptides we’ll need this simple function:
def find_peptide(peptide, translations):
for translation in translations:
if peptide in translation:
return True
return False
And now for the tricky part :).
Getting an Observable from our peptides file is easy, we already know how to do that from our FastQ file exploration.
peptides = Observable.from_(open('peptides.txt', 'r')) \
.map(lambda line: line.strip()).filter(lambda line: line != '')
But now we actually need to reproduce a nested loop in the context of reactive programming.
Remember, we want to find reads which code for ANYONE of our peptides. Hmmm..
Here’s the solution I came up with:
def lookup(peptides, translations):
"""Serves as the nested loop"""
found = peptides.map(lambda p: find_peptide(p, translations)) \
.reduce(lambda x, y: x or y, False)
return found
lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \
.map(lambda translations: lookup(peptides, translations))
So what’s going on here ?
First, we are mapping .lookup()
on our translations which, in turn, maps .find_peptides()
onto our peptides Observable
and reduces the results of that stream to a single emission in the form of a boolean. Nice, we’re almost there !
But wait a second.. If you subscribe to lookups and print its results, it quickly becomes apparent that there are issues:
1- Our nested loop only runs once and not for every emission of translations
2- .lookup()
actually returns a list of Observables
and not the actual Boolean answers we were hoping to get back (even though .reduce()
only emits the final value, it is still wrapped inside an Observable
)
So let’s address these issues one by one:
Issue 1
As it turns out, when you create an observable from a file iterator using the from_
factory, you can only subscribe to it once. Any subsequent subscription will not emit any values as you have reached the end of the file during the first consumption. There a few options to work around that and one of them is to wrap the construction of the Observable with .defer()
.
peptides = Observable.defer(
lambda:
Observable.from_(open('peptides.txt', 'r'))
.map(lambda line: line.strip())
.filter(lambda line: line != '')
)
That was a simple fix, but this aspect is not clearly documented and this issue tormented me for quite a while.. !
Issue 2
Contrary to .translate_dna_6frames()
, .lookup()
returns Observables and not a value so we need a way to turn that list of Observables into a single Observable which we can subscribe to get our values. ReactiveX exposes many combining operators but the one which we’ll use here is switch (or .switch_latest(), it’s RxPY implementation)
lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \
.map(lambda translations: lookup(peptides, translations)) \
.switch_latest()
Combining and Multicasting
The last thing we need to do is to combine fastq_entries and lookups into a single Observable
which we can .filter()
and .subscribe()
in order to output only the FastQ entries which COULD code for one of our peptides.
For the combining part, I chose to go with .zip() (or .zip_array() in RxPY).
Observable.zip_array(found, fastq_entries)\
.filter(lambda arr: arr[0]) \
.subscribe(print)
Unfortunately we encounter another issue: lookups and our zipped array
are both subscribing to the fastq_entries Observable
and are thus getting distinct values from the Observable.. That’s not good. We want zip_array()
to zip the same fastq_entry that got consumed and transformed by the lookups pipeline. This is called multicasting and we can achieve the desired result by adding the .publish()
statement to the creation of our fastq_entries Observable
like this:
fastq_entries = Observable.from_(gzip.open('small.fastq.gz', 'rt')) \
.map(lambda line: line.strip()) \
.buffer_with_count(4) \
.filter(lambda line: line != '') \
.publish()
This simple addition turns fastq_entries into a connectable Observable
which will wait for the .connect()
signal to start emitting to all it registered subscribers. We just need to add
fastq_entries.connect()
after we have registered all subscribers and we will then get multicasting.
Putting it all together
So here’s the entire, final version of our script:
import sys
import gzip
from rx import Observable
codonTable = {
'TTT': 'F', 'TCT': 'S', 'TAT': 'Y', 'TGT': 'C',
'TTC': 'F', 'TCC': 'S', 'TAC': 'Y', 'TGC': 'C',
'TTA': 'L', 'TCA': 'S', 'TAA': '*', 'TGA': '*',
'TTG': 'L', 'TCG': 'S', 'TAG': '*', 'TGG': 'W',
'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',
'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',
'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G'
}
def reverse_complement(sequence):
tb = str.maketrans('ACGT', 'TGCA')
return sequence.translate(tb)[::-1]
def translate_dna(fastq_entry, frame='f1'):
"""Translates DNA to amino acids according specified reading frame"""
starts = {'f1': 0, 'f2': 1, 'f3': 2,
'r1': 0, 'r2': 1, 'r3': 2}
dna = fastq_entry[1].strip('N').replace('N', 'A')
if frame.startswith('r'):
dna = reverse_complement(dna)
sequence = dna[starts[frame]:]
protein = ""
for i in range(0, len(sequence), 3):
if 3 == len(sequence[i:i+3]):
protein += codonTable[sequence[i:i+3]]
return protein
def translate_dna_6frames(fastq_entry):
frames = ['f1', 'f2', 'f3', 'r1', 'r2', 'r3']
translations = []
for frame in frames:
translations.append(translate_dna(fastq_entry, frame))
return translations
def find_peptide(peptide, translations):
for translation in translations:
if peptide in translation:
return True
return False
def lookup(peptides, translations):
found = peptides.map(lambda p: find_peptide(p, translations)) \
.reduce(lambda x, y: x or y, False)
return found
def main():
out = gzip.open('new.fastq.gz', 'wt')
peptides = Observable.defer(
lambda:
Observable.from_(open('peptides.txt', 'r'))
.map(lambda line: line.strip())
.filter(lambda line: line != '')
)
fastq_entries = Observable.from_(gzip.open('small.fastq.gz', 'rt')) \
.map(lambda line: line.strip()) \
.filter(lambda line: line != '') \
.buffer_with_count(4) \
.publish()
lookups = fastq_entries.map(lambda entry: translate_dna_6frames(entry)) \
.map(lambda translations: lookup(peptides, translations)) \
.switch_latest()
Observable.zip_array(lookups, fastq_entries) \
.filter(lambda arr: arr[0]) \
.subscribe(on_next=lambda arr: print('\n'.join(arr[1]), sep='', end='\n', file=out, flush=True),
on_completed=lambda: out.close())
fastq_entries.connect()
if __name__ == '__main__':
sys.exit(main())
Final thoughts
Reactive promises to make your code
- more succinct
- more readable
- less prone to bugs
I think in this example, Reactive Programming delivered on its promises.
The code is certainly succinct and, once you get better acquainted with the operators, becomes very readable.
As for the bugs, well, time will tell but one thing’s for sure: less code equals less chances to make errors !
On the downside, I did find it harder to debug and the documentation is quite an issue to beginners right now.
I’ve really only scratched the surface of Reactive Programming but come out of this experience fairly enthusiastic.
And yes, I do realize that my example does not make use of Reactive’s greatest strength: handling asynchronicity like a pro, but you have to start somewhere !
Feel free to leave comments or suggest modifications !
Great article! First time I hear of ReactiveX. Do you know how it compares to Python asyncio?
It would have been nice to see how much succinct code is without all this. Can you comment on memory consumption and speed?
Hi Mathieu.
To be frank, I haven’t used asyncio a whole lot but I can say this:
1- RxPY seems to offer a lot of stream operators (such as buffer, filter or debounce), which are not part of asyncio and would need to be implemented “by hand”.
2- RxPY can actually make use of the asyncio scheduler, so I believe RxPY can be seen as complementary to asyncio..
But of course, I could be mistaken here.
As for comparing this script with a “non-RxPY” version, I thought there was already a lot of code here, so I decided to keep it for another post. I’ll make sure to include memory consumption and speed comparison parts as well. Stay tuned !
Hi Jean-Philippe,
thanks or the awesome article! I wonder though if the approach to directly pass the result of `open` to `Observable.from_` generates a resource leak, since the file won’t be closed when the stream completes.
Wouldn’t it be better to use the `using` method for file handling?
Hello Dr Leue,
I think you’re right !
We would, indeed, be accumulating file handles over the process of running through the entries of the FASTQ.
For large files (hundreds of millions of reads, for example), they could certainly add up.
Thanks for pointing it out, I’ll update the code example shortly.