Writing Pipelines
Pipeline Function
To leverage this infrastructure for large-scale experiments, we will go step by step through how to write pipelines and their components so that custom pipelines can be instantiated and experimented with. First, we begin with an example of a very simple pipeline in the following code block to explain the high-level anatomy of a typical pipeline. In this code block, we create a pipeline with 4 components: an outer Reed-Solomon encoder, a Hedges inner encoder, and 2 DNA passes that prepend and append DNA fragments to the final encoded DNA strand. Every pipeline should have the same function signature, where the first argument `pf` refers to an object that feeds file data to the pipeline, and `kwargs` are the input parameters to the pipeline. The `kwargs` parameters can be anything you want them to be to instantiate your components, though there are some common parameters that are always necessary. They are:
- `blockSizeInBytes`: sets the number of bytes that a packet will be.
- `strandSizeInBytes`: sets the initial length of the Byte Strand; the final length may be longer depending on the encodings used. This can be interpreted as the number of file bytes that will be on a single Byte Strand.
- `barcode`: a tuple of integers (each should be >= 0 and < 256) that represents the total barcode of each strand in the pipeline.
- `upper_strand_length`: a value that helps debug pipelines. It sets the upper limit, in base pairs, that a strand should be, allowing designs that yield overly long DNA strands to be caught.
Besides these 4 parameters, it is up to the pipeline writer to include any additional parameters needed for other components. Once the component objects are instantiated, they must be passed to the `PipeLine` constructor as a tuple, in the same order they would occur in the pipeline, e.g. recall that the outer encoding should occur before the inner encoding passes. All remaining parameters of the `PipeLine` object should generally remain the same. Finally, the `PipeLine` object is returned, and this concludes the majority of the work needed to get a pipeline that encodes and decodes information between digital and DNA form. Note that each component used in a pipeline requires its related Python modules; the imports in this example cover the components shown.
```python
#imports necessary for the example
from dnastorage.codec import PipeLine as pipeline
from dnastorage.codec.hedges import *
from dnastorage.codec.phys import *
from dnastorage.codec.block import *

def BasicPipeline(pf,**kwargs):
    #basic pipeline parameters, should be a part of every pipeline
    blockSizeInBytes=kwargs.get("blockSizeInBytes",150*15)
    strandSizeInBytes=kwargs.get("strandSizeInBytes",15)
    barcode = kwargs.get("barcode",tuple())
    upper_strand_length = kwargs.get("upper_strand_length",2000)
    #parameters for specific components
    primer5 = kwargs.get("primer5","")
    primer3 = kwargs.get("primer3","")
    outerECCStrands = kwargs.get("outerECCStrands",50)
    outerECCDivisor = kwargs.get("outerECCDivisor",50)
    #instantiate components
    p5 = PrependSequencePipeline(primer5,handler="align",search_range=20)
    p3 = AppendSequencePipeline(reverse_complement(primer3),handler="align",search_range=20)
    consolidator = SimpleMajorityVote()
    hedges = FastHedgesPipeline()
    rsOuter = ReedSolomonOuterPipeline(outerECCDivisor,outerECCStrands)
    #put components together and return the PipeLine object
    return pipeline.PipeLine((rsOuter,hedges,p5,p3),consolidator,blockSizeInBytes,strandSizeInBytes,
                             upper_strand_length,1,packetizedfile=pf,barcode=barcode)
```
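For illustration, a pipeline function like this could be instantiated as follows. This is a hedged sketch: `pf` is assumed to be the packetized file object provided by the surrounding infrastructure, and all parameter values are made up for the example.

```python
#hypothetical usage: override the defaults of BasicPipeline through kwargs;
#pf comes from the surrounding infrastructure, values here are illustrative
pl = BasicPipeline(pf,
                   blockSizeInBytes=150*15,
                   strandSizeInBytes=15,
                   barcode=(3,7),
                   upper_strand_length=300,
                   primer5="CAGGTACGCA",
                   primer3="GTCCTGATGC",
                   outerECCStrands=20,
                   outerECCDivisor=130)
```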
Registering a New Pipeline Function
The final step in writing a new pipeline, before it can be used in experiments, is to register it in a data structure so that it is known to other parts of the infrastructure. This data structure is a dictionary named `FileSystemFormats`, which can be found in the formats.py file. Given the function signature of our `BasicPipeline` from before, we simply add an entry to the dictionary as follows.
```python
FileSystemFormats = {
    ...
    0x0704 : [0x0704,None,None,"BasicPipeline","Simple example of a new pipeline",BasicPipeline,BasicPipeline],
    ...
}
```
A description of each field, in the order shown:
- A unique integer identifying the pipeline. This is the same as the dictionary key (as shown), and should be different from every other key in the dictionary.
- Deprecated; can be left `None`.
- Deprecated; can be left `None`.
- A name for the pipeline; can be whatever the user wants it to be. Here it is chosen to be the same as the function name.
- A description of the pipeline.
- The encoding function reference.
- The decoding function reference. Note this is just the reference to the function written in the example, and for pipelines it will be the same as the encoding function, since a pipeline handles both encode and decode.
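Once registered, an entry can be retrieved by its id. The following hedged sketch relies only on the field order just described, not on any specific helper in the codebase.

```python
#illustrative lookup based on the field order described above
entry = FileSystemFormats[0x0704]
name, description = entry[3], entry[4]
encode_func, decode_func = entry[5], entry[6]
#for pipelines, both references point at the same function
assert encode_func is decode_func
```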
How to Write New Components
There are a number of components available for immediate use in the codec directory of this project, and a number of pipelines that can be used immediately in the builder.py file, but to further extend this infrastructure for your own needs you will likely need to write your own components. The following sections go over the required inheritance and signatures for a working pipeline component that can then be imported into your pipeline function.
Class Inheritance and Required Methods
Inner Components
Any inner pipeline component will have a structure identical to the following code block.
```python
#necessary imports
from dnastorage.codec.base import *

class MyPipelineComponent(BaseCodec, <ComponentType>):
    def __init__(self,...):
        <ComponentType>.__init__(self)
        BaseCodec.__init__(self)
        #implement component's initialization
    def _encode(self,strand):
        #implement component's encoding pass
    def _decode(self,strand):
        #implement component's decoding pass
    def _encode_header(self):
        data = []
        #implement component's header (optional)
        return data
    def _decode_header(self,buff):
        pos = 0
        #implement component's header decoding
        return buff[pos:]
```
In general, the component must inherit from `BaseCodec`, a base object that implements crucial infrastructure needed to connect components together, and it must inherit from a `<ComponentType>` which indicates what type of component is being implemented. The component's type helps the `PipeLine` object make sense of the ordering between components in the pipeline tuple shown before. The types of components, which can be found in the codec_types.py file, are as follows:
- `CWtoCW`: indicates that the pass converts a Byte Strand to another Byte Strand.
- `CWtoDNA`: indicates that the pass converts a Byte Strand to a DNA strand.
- `DNAtoDNA`: indicates that the pass converts a DNA strand to a DNA strand.
- `Probe`: should not perform any real encoding/decoding. This type is reserved for analysis passes.
Depending on which pass type is chosen, the behavior of the `_encode` and `_decode` methods will differ. A basic component pass modifies one of 2 attributes of the input `strand` variable. The `strand` variable will be some class derived from `BaseDNA`, as defined in the strand_representation.py file. The two attributes of interest to a given pass are the `codewords` and `dna_strand` properties. The model for doing work on the input strands is to modify these attributes rather than return a value. Representing strands as a class bundles together relevant information beyond just the Byte Strand or the DNA strand: it allows other attributes to be attached to the strand, making it easy to extend strands with more information, e.g. adding flags indicating whether the strand is RNA. The modification rules for each component type are as follows (a short sketch of a component following these rules appears after the list):
- `CWtoCW`: `codewords` should be modified on both encode and decode.
- `CWtoDNA`: `dna_strand` should be modified on encode, and `codewords` should be modified on decode.
- `DNAtoDNA`: `dna_strand` should be modified on both encode and decode.
- `Probe`: neither `codewords` nor `dna_strand` should be modified.
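As a concrete illustration of these rules, here is a minimal sketch of a hypothetical `DNAtoDNA` pass that prepends a fixed sequence on encode and strips it on decode. The class name, the `__init__` signature, and the import path for the component types are illustrative; they follow the template above rather than any component in the codebase.

```python
#hypothetical DNAtoDNA pass: modifies dna_strand in place on both
#encode and decode, per the modification rules above
from dnastorage.codec.base import *
from dnastorage.codec_types import *  #assumed home of the component type classes

class MyPrependPass(BaseCodec, DNAtoDNA):
    def __init__(self, seq):
        DNAtoDNA.__init__(self)
        BaseCodec.__init__(self)
        self._seq = seq
    def _encode(self,strand):
        #attach the fixed sequence to the front of the DNA strand
        strand.dna_strand = self._seq + strand.dna_strand
    def _decode(self,strand):
        #assumes the prefix survived intact; a real pass should tolerate
        #errors, e.g. by aligning to the sequence as existing handlers do
        strand.dna_strand = strand.dna_strand[len(self._seq):]
```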
With the encoding and decoding passes implemented, there are two other methods the programmer may be interested in, and these are related to creating the header for the pass. Creating a header is done in the `_encode_header` method. To implement this method properly, the programmer should fill the `data` list with integers in the valid byte range (0 to 255) that serialize the information they want to store in the header. Right now there is not much support for the serialization step itself: the user is responsible for choosing the bytes that represent their information, but what information they choose to store is arbitrary. On finishing, the `data` list should be returned. The `_decode_header` method reverses this process, taking in `buff`, which is a list of bytes. The user is responsible for decoding the pass's bytes by reversing the encoding process and then advancing the head of the `buff` list. Advancing the head is done by incrementing the `pos` variable as information is decoded from `buff`; ultimately, the value of `pos` at the end of the method should equal the length of the `data` list produced by `_encode_header`. The programmer must return a new buffer from `_decode_header` that slices out this pass's data, as shown in the code block.
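For instance, a pass that needs to persist a single 16-bit parameter could serialize it as two bytes. The following is a minimal sketch of the two header methods for such a pass; `self._length` is a hypothetical attribute used only for this example, and the methods are meant to live inside a component class like the template above.

```python
#hypothetical header for a pass storing one 16-bit value (self._length)
def _encode_header(self):
    data = []
    data.append(self._length & 0xFF)         #low byte
    data.append((self._length >> 8) & 0xFF)  #high byte
    return data

def _decode_header(self,buff):
    pos = 0
    self._length = buff[pos] | (buff[pos+1] << 8)
    pos += 2  #pos now equals the number of bytes written by _encode_header
    return buff[pos:]
```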
Rules for Outer Encodings
Implementing outer encodings is largely the same as implementing inner components: the same 4 methods need to be implemented. However, because the outer encoding process is coupled with the indexing process, and because a group of strands is operated on rather than a single strand, the `_encode` and `_decode` methods must be written accordingly. The following code block provides an outline of an outer encoding component:
```python
#necessary imports
from dnastorage.codec.base import *
from dnastorage.strand_representation import *

class MyOuterComponent(BaseOuterCodec):
    def __init__(self,packet_divisor,...):
        BaseOuterCodec.__init__(self,packet_divisor)
        #implement rest of initialization
    def _encode(self,packets):
        parity_packets = []
        #implement outer encoding
        return packets+parity_packets
    def _decode(self,packets):
        #implement outer decoding
        return packets
    def _encode_header(self):
        data = []
        #implement header encode
        return BaseOuterCodec._encode_header(self) + data
    def _decode_header(self,buff):
        b = BaseOuterCodec._decode_header(self,buff)
        pos = 0
        #implement header decode
        return b[pos:]
```
Starting with the initialization method, one difference in outer encodings compared to inner ones is that a packet divisor must be specified. This parameter dictates how a packet will be divided into sub-packets, which are the units of error correction encoding and decoding. Before getting to the `_encode` and `_decode` methods, it should be pointed out that the header methods need a slight change compared to the inner encoding. This is because the `BaseOuterCodec` object needs its own header information related to indexing strands. Thus, we must make sure to call the overridden super-class methods when both encoding and decoding the header. In general, if any component inherits from another that implements a header, this is the approach that should be taken so that all data is accounted for in the header.
Because outer encodings work with multiple Byte Strands, the `_encode` and `_decode` processes differ significantly from an inner encoding. First, the `_encode` method takes in a list named `packets`. This is a nested list where each `packets[i]` is an individual packet of strands and `packets[i][j]` is an individual strand of type `BaseDNA` with the attributes mentioned in the inner components section. The encoding process is up to the programmer, but the order of packets is important when returning from the `_encode` method. The ordering matters more for the added error correction packets than for the input data packets, since the data packets already have assigned indexes. The error correction packets are given indexes after returning from this method; thus, they must be placed where the infrastructure can infer their location, and the infrastructure assumes these packets occur after the data packets in the returned set. An example of how best to handle this is shown in the code block, where all packets are returned with the parity packets appended to the end of the list.
The `_decode` method input is similar in that it is a nested list of DNA strands, where `packets[i]` is the ith packet and `packets[i][j]` is the jth strand of the ith packet. Here, though, the packets include the parity packets as well, and since the infrastructure knows which strands are missing from the indexing information it book-keeps, there will be an object derived from `BaseDNA` at every position. If a strand is missing, all bytes in the strand will be set to `None`. Note that all strands and packets will be in the order defined by their index. Given this information, the decode process must correct any strands it deems necessary by modifying the strand objects within the packets, ultimately returning the packets with the corrected strands in the same order. The easiest way to accomplish this is by simply modifying the strand objects and returning the input argument.
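The following sketch illustrates the `_decode` contract just described: it scans for missing strands (all bytes `None`) and repairs strands in place, returning the input list. It is the body of an outer component's method, and the actual correction step is left as a placeholder since it depends on the error correction code being used.

```python
#illustrative outer-decode skeleton; the repair itself is a placeholder
def _decode(self,packets):
    for i, packet in enumerate(packets):
        for j, strand in enumerate(packet):
            if all(b is None for b in strand.codewords):
                #strand j of packet i was never recovered; an ECC such
                #as Reed-Solomon would reconstruct its codewords here
                #from the surviving strands of the same sub-packet
                pass
    #strands are corrected in place, so the (same-order) input is returned
    return packets
```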
Probes
The last type of pass that is important to know about is the `Probe`. Probes are meant to measure events that occur, especially during the decode process, so that different encoding schemes can be evaluated. They shall not modify key values in the strand reference, but may add attributes on the fly. Utilizing Python's ability to dynamically add attributes to an object via `setattr` makes it easy to add and remove analysis in the pipeline. Typically, probes only need to implement their `__init__`, `_encode`, and `_decode` methods, and they are mostly useful when studying fault injection or sequencing results, since analysis is usually done during the decode pass. To give a working example of a probe, the following code block is from probes.py and demonstrates a probe that takes a snapshot of a Byte Strand during encoding. This snapshot can then be used as the ground truth to compare against after the decode process of a possibly erroneous DNA strand. The probe calculates byte-by-byte errors and aggregates information in a `stats` object, which is dumped at the end of simulation and covered in the following section.
```python
class CodewordErrorRateProbe(DNAErrorProbe):
    #This probe should be placed after a single strand codec so that analysis can be performed
    probe_id=0
    def __init__(self,probe_name="",dna_hook=None,CodecObj=None):
        if probe_name=="":
            self.name = "codeword_probe_{}".format(CodewordErrorRateProbe.probe_id)
        else:
            self.name = probe_name
        DNAErrorProbe.__init__(self,self.name,CodecObj)
        self._initial_codeword_attr = "{}::init_codewords".format(self.name)
        self._total_error_rate_key = "{}::total_errors".format(self.name)
        self._strands_seen_key = "{}::total_strands".format(self.name)
        self._correct_key="{}::correct_strands".format(self.name)
        self._incorrect_key="{}::incorrect_strands".format(self.name)
        self._incorrect_not_none="{}::incorrect_strands_not_none".format(self.name)
        self._first_byte_error="{}::first_error".format(self.name)
        self.dna_attr = dna_hook
        CodewordErrorRateProbe.probe_id+=1
    def _encode(self,s):
        setattr(s,self._initial_codeword_attr,copy.copy(s.codewords)) #take a snapshot of the codewords under an attribute specific to this instantiation
        return s
    def _decode(self,s):
        if not hasattr(s,self._initial_codeword_attr): return s #might be left over from some other encoding/decoding pipeline, cannot rely on the attribute always existing, best compromise is to return
        base_codewords = getattr(s,self._initial_codeword_attr)
        fault_codewords = s.codewords
        #now analyze error rates between the base and fault codewords
        first=True
        for i in range(len(base_codewords)):
            if i>=len(fault_codewords) or base_codewords[i]!=fault_codewords[i]:
                stats.inc(self._total_error_rate_key,dflt=np.zeros((len(base_codewords),)),coords=i)
                if first:
                    first=False
                    stats.inc(self._first_byte_error,dflt=np.zeros((len(base_codewords),)),coords=i)
            if i<len(fault_codewords) and base_codewords[i]!=fault_codewords[i] and fault_codewords[i] is not None:
                stats.inc(self._incorrect_not_none,dflt=np.zeros((len(base_codewords),)),coords=i)
        if fault_codewords==base_codewords:
            stats.inc(self._correct_key)
            if self._initial_dna_attr:
                DNAErrorProbe._decode(self,s) #call to the dna error rate analysis
        else:
            stats.inc(self._incorrect_key)
        stats.inc(self._strands_seen_key)
        return s
```
A main point to note here is how the probe takes a snapshot of the Byte Strand at encode time. This snapshot is stored in an attribute defined in the `__init__` method. Although this attribute is not part of the original `BaseDNA` attributes, the fault injection infrastructure has built-in support for copying such attributes over when decoding strands from fault injection. Also note the use of the `stats` object, the main object for storing counters that track events; these counters can range from integers to arrays, histograms, and dictionaries, and are covered in the next section.
Keeping Statistics
To keep track of statistics, one needs the following import:

```python
from dnastorage.util.stats import *
```

This module provides a global instance of `dnastats` named `stats`, which is the main object for storing statistics. This object is global, so there should be only 1 per process. The following data structures can be used to gather statistics:
- numpy arrays
- basic scalars
- lists for histograms of statistics
- dictionaries for key-value histograms
In general, programmers should not need to worry about aggregating statistics from the parallel processes used during sequencing and fault injection analysis; the surrounding infrastructure aggregates the information as appropriate before dumping it to a file. A description of the key methods available to the user follows; for more detail, consult the code. Eventually there should be class-level documentation that accompanies this documentation.
- `register_file`: takes a statistic name and a file name; the given statistic will be dumped to the given file at persist time.
- `register_hist`: takes a statistic name; a numpy.histogram will be calculated before the data is persisted. Useful for avoiding storing huge arrays for distribution analysis.
- `inc`: increments a counter with a given name; the increment can be any integer, not just 1. `dflt` can be used to initialize the counter and its type, e.g. a numpy array. `coords` allows coordinates to be provided for incrementing arrays that have dimensions.
- `append`: useful for appending values to a list. This, in combination with `register_hist`, helps build distributions.
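The following is a hedged usage sketch of these methods, based only on how they are described above and used in the probe example; the statistic key names are made up.

```python
#illustrative use of the stats counters; key names are hypothetical
from dnastorage.util.stats import *
import numpy as np

stats.inc("my_pass::strands_seen")                               #simple scalar counter
stats.inc("my_pass::byte_errors",dflt=np.zeros((200,)),coords=5) #array counter, bump index 5
stats.append("my_pass::strand_lengths",187)                      #accumulate values in a list
stats.register_hist("my_pass::strand_lengths")                   #persist the list as a histogram
stats.register_file("my_pass::byte_errors","byte_errors.pickle") #dump this statistic to its own file
```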
DNA Strand Consolidation
The last components that need to be mentioned are those that consolidate multiple reads into 1 read. The general signature of a consolidation object is as follows:
```python
class ConsolidationComponent(BaseCodec, <ConsolidationType>):
    def __init__(self,...):
        <ConsolidationType>.__init__(self)
        BaseCodec.__init__(self)
        #implement component's initialization
    def _decode(self,strands):
        out_array = []
        #fill out_array with some strands
        return out_array
```
The general idea of consolidation is simple: the input argument `strands` is a list of strands, and the consolidation process must return a list of DNA strands representing a subset of the input strands, as illustrated by the `out_array` variable. The `<ConsolidationType>` should be either `DNAConsolidate` or `CWConsolidate`. The former consolidates DNA strands, and the latter consolidates identical Byte Strands. It is important to tag the consolidator with one of these types so that its placement in the decode pipeline is correct.
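To close, here is a minimal sketch of a hypothetical `DNAConsolidate` component that keeps one read per unique DNA sequence. Real consolidators, such as the majority-vote consolidator used earlier, are more sophisticated; the class name, deduplication strategy, and import path for the consolidation types are purely illustrative.

```python
#hypothetical consolidator: keep the first read of each unique sequence
from dnastorage.codec.base import *
from dnastorage.codec_types import *  #assumed home of DNAConsolidate

class UniqueSequenceConsolidator(BaseCodec, DNAConsolidate):
    def __init__(self):
        DNAConsolidate.__init__(self)
        BaseCodec.__init__(self)
    def _decode(self,strands):
        out_array = []
        seen = set()
        for s in strands:
            if s.dna_strand not in seen:
                seen.add(s.dna_strand)
                out_array.append(s)
        #out_array is a subset of the input strands, as required
        return out_array
```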