4. Your first blocks
If you’re lucky, all the bits and pieces you need to build your pipeline
are already blocks in bifrost. More likely, you have a particular
algorithm you’d like to plug in. For this, you’ll need to create a new
block. This comes down to defining a class with two mandatory methods:
on_sequence(), which is called whenever a new sequence starts, and
on_data(), which is called whenever new data is ready in the ring buffer.
4.1. TransformBlock
A TransformBlock reads data from one ring buffer, does something to
it, and then writes it out. Take this example, which adds a
runtime-specified value to every element in the ring buffer:
from copy import deepcopy
import bifrost.pipeline as bfp

class UselessAddBlock(bfp.TransformBlock):
    def __init__(self, iring, n_to_add, *args, **kwargs):
        super(UselessAddBlock, self).__init__(iring, *args, **kwargs)
        self.n_to_add = n_to_add

    def on_sequence(self, iseq):
        # Copy the input header so the original is left untouched
        ohdr = deepcopy(iseq.header)
        ohdr["name"] += "_with_added_value"
        return ohdr

    def on_data(self, ispan, ospan):
        in_nframe = ispan.nframe
        out_nframe = in_nframe

        idata = ispan.data
        odata = ospan.data

        # Write into the output span in place
        odata[...] = idata + self.n_to_add
        return out_nframe
This is a new class that subclasses TransformBlock. First, let’s
look at the __init__ method. This takes two parameters:
iring - an input ring buffer. This argument is required, but bifrost handles the setup of the ring buffers.
n_to_add - this is a new argument that we’ve added ourselves.
The super(UselessAddBlock, self).__init__() call passes iring, and the
optional *args and **kwargs, on to the parent class for initialization.
Next, we have an on_sequence() method that is called whenever a new
sequence arrives. For example, reading a new file may trigger a new
sequence, with new metadata in the header. The on_sequence() method
takes an iseq argument and must return the header for the output
sequence. The deepcopy is (currently) required to make sure the
original dictionary isn’t modified or passed on by accident. Note that
all we are doing here is changing the name by appending a string.
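To see why the copy matters, here is a minimal sketch in plain Python (no bifrost needed). The header dictionary is a hypothetical stand-in with the same layout as the headers used later in this section. It shows that a shallow copy still shares the nested _tensor dictionary with the original, whereas a deepcopy does not:

```python
from copy import copy, deepcopy

# Hypothetical input header, laid out like the ones in this section.
ihdr = {"name": "sin_data_0.bin",
        "_tensor": {"dtype": "f32", "shape": [-1, 32768]}}

# A shallow copy duplicates only the top level: the nested dict is shared,
# so editing it through the copy also edits the original header.
shallow = copy(ihdr)
shallow["_tensor"]["dtype"] = "cf32"
leaked_dtype = ihdr["_tensor"]["dtype"]   # the original was changed too
ihdr["_tensor"]["dtype"] = "f32"          # undo the accidental change

# A deep copy duplicates every level, so the original stays untouched.
ohdr = deepcopy(ihdr)
ohdr["name"] += "_with_added_value"
ohdr["_tensor"]["shape"][1] = 1024
```

After this runs, ihdr still has its original name and shape, while ohdr carries the changes.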
Finally, there’s the on_data() method, which takes an ispan and an
ospan argument for reading data from the input ring buffer and writing
it to the output ring buffer. on_data() must return the number of
frames written to the output span.
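The on_data() contract can be tried out without a running pipeline. The sketch below uses a hypothetical FakeSpan class (not part of bifrost) that exposes only the data and nframe attributes used above, with numpy arrays standing in for the ring buffer memory. It is meant to illustrate why the block assigns through odata[...] rather than rebinding odata:

```python
import numpy as np


class FakeSpan(object):
    """Hypothetical stand-in for a span: exposes only `data` and `nframe`."""
    def __init__(self, data):
        self.data = data
        self.nframe = data.shape[0]


def on_data(ispan, ospan, n_to_add):
    # Assigning through [...] fills the existing output buffer in place;
    # a plain `odata = ...` would only rebind the local name.
    odata = ospan.data
    odata[...] = ispan.data + n_to_add
    return ispan.nframe   # number of frames written to the output span


ispan = FakeSpan(np.zeros((2, 4), dtype='float32'))
ospan = FakeSpan(np.empty((2, 4), dtype='float32'))
out_buffer = ospan.data                  # reference to the original buffer
out_nframe = on_data(ispan, ospan, n_to_add=100)
```

Here out_buffer is the same array object that on_data() filled, which is the behaviour a real output span relies on.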
4.2. SinkBlock
A SinkBlock also needs an on_sequence() and an on_data() method, but it
doesn’t output anything, so neither method should return anything.
Here is a simple block that prints the header and some of the data to
the screen:
from datetime import datetime
from pprint import pprint
import bifrost.pipeline as bfp

class PrintStuffBlock(bfp.SinkBlock):
    def __init__(self, iring, *args, **kwargs):
        super(PrintStuffBlock, self).__init__(iring, *args, **kwargs)
        self.n_iter = 0

    def on_sequence(self, iseq):
        print("[%s]" % datetime.now())
        print(iseq.name)
        pprint(iseq.header)
        self.n_iter = 0   # restart the counter for each new sequence

    def on_data(self, ispan):
        now = datetime.now()
        # Only print every 100th span to avoid flooding the screen
        if self.n_iter % 100 == 0:
            print("[%s] %s" % (now, ispan.data))
        self.n_iter += 1
Note that on_data() must not have an ospan argument!
4.3. SourceBlock
The SourceBlock is a little trickier to get up and running, as it
requires a reader object that works as a context manager.
The source block also has the important task of setting up all the
metadata required to make bifrost work. A little extra effort at
the start allows useful metadata to propagate through the full pipeline,
simplifying later blocks.
Here is the source code for the binary_io block, which reads data saved
using the useful numpy.ndarray.tofile():
import numpy as np

class BinaryFileRead(object):
    """ Simple file-like reading object for pipeline testing

    Args:
        filename (str): Name of file to open
        gulp_size (int): How much data to read per gulp, (i.e. sub-array size)
        dtype (np.dtype or str): datatype of data, e.g. float32. This should be a *numpy* dtype,
                                 not a bifrost.ndarray dtype (eg. float32, not f32)
    """
    def __init__(self, filename, gulp_size, dtype):
        super(BinaryFileRead, self).__init__()
        # Open in binary mode: the file holds raw samples, not text
        self.file_obj = open(filename, 'rb')
        self.dtype = dtype
        self.gulp_size = gulp_size

    def read(self):
        # Returns fewer than gulp_size elements once the end of the file is reached
        d = np.fromfile(self.file_obj, dtype=self.dtype, count=self.gulp_size)
        return d

    def __enter__(self):
        return self

    def close(self):
        self.file_obj.close()

    def __exit__(self, type, value, tb):
        self.close()
import bifrost.pipeline as bfp
from bifrost.dtype import name_nbit2numpy  # assumed location of this helper

class BinaryFileReadBlock(bfp.SourceBlock):
    """ Block for reading binary data from file and streaming it into a bifrost pipeline

    Args:
        filenames (list): A list of filenames to open
        gulp_size (int): Number of elements in a gulp (i.e. sub-array size)
        gulp_nframe (int): Number of frames in a gulp.
        dtype (bifrost dtype string): dtype, e.g. f32, cf32
    """
    def __init__(self, filenames, gulp_size, gulp_nframe, dtype, *args, **kwargs):
        super(BinaryFileReadBlock, self).__init__(filenames, gulp_nframe, *args, **kwargs)
        self.dtype = dtype
        self.gulp_size = gulp_size

    def create_reader(self, filename):
        print("Loading %s" % filename)
        # Do a lookup on bifrost datatype to numpy datatype
        dcode = self.dtype.rstrip('0123456789')
        nbits = int(self.dtype[len(dcode):])
        np_dtype = name_nbit2numpy(dcode, nbits)

        return BinaryFileRead(filename, self.gulp_size, np_dtype)

    def on_sequence(self, ireader, filename):
        ohdr = {'name': filename,
                '_tensor': {
                    'dtype': self.dtype,
                    'shape': [-1, self.gulp_size],
                },
                }
        return [ohdr]

    def on_data(self, reader, ospans):
        indata = reader.read()

        # Only pass on complete gulps; a short read means the file is exhausted
        if indata.shape[0] == self.gulp_size:
            ospans[0].data[0] = indata
            return [1]
        else:
            return [0]
As bifrost requires a reader with built-in context management, we
have explicitly created a BinaryFileRead class that has __enter__ and
__exit__ methods; these are mandatory. It also has a crucially
important read() method, which reads the data that is then written
into the ring.
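The reader protocol can be exercised on its own with a with statement. The following is a minimal sketch, not the bifrost implementation: GulpReader is a hypothetical class modelled on BinaryFileRead, and the file is a small scratch file written with tofile(). It shows that the final read() comes back short, which is the case on_data() has to detect:

```python
import os
import shutil
import tempfile
import numpy as np


class GulpReader(object):
    """Hypothetical reader with the same shape as BinaryFileRead."""
    def __init__(self, filename, gulp_size, dtype):
        self.file_obj = open(filename, 'rb')
        self.gulp_size = gulp_size
        self.dtype = dtype

    def read(self):
        return np.fromfile(self.file_obj, dtype=self.dtype, count=self.gulp_size)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.file_obj.close()   # runs even if the body of `with` raises


tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, 'example.bin')
np.arange(10, dtype='float32').tofile(path)   # 10 samples on disk

gulp_lengths = []
with GulpReader(path, gulp_size=4, dtype='float32') as reader:
    while True:
        gulp = reader.read()
        gulp_lengths.append(gulp.shape[0])
        if gulp.shape[0] < 4:   # short read: end of file reached
            break

file_closed = reader.file_obj.closed
shutil.rmtree(tmpdir)           # remove the scratch directory
```

With 10 samples and a gulp size of 4, the reads return 4, 4 and then 2 elements, and the file is closed as soon as the with block exits.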
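The second class, BinaryFileReadBlock, feeds that data into the
pipeline, and again has an on_sequence() and an on_data() method. Note
that for a SourceBlock these return lists (of headers and of frame
counts, respectively), with one entry per output. There is also a
mandatory create_reader() method that does some setup, in this case
opening the file and returning the reader.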
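The first two lines of the lookup in create_reader() split a bifrost dtype string into a type code and a bit width. Here is that step on its own, as a small sketch; split_dtype is a hypothetical helper name, and the final mapping to a numpy dtype (done by name_nbit2numpy in the block above) is deliberately left out:

```python
def split_dtype(dtype):
    """Split a dtype string such as 'cf32' into ('cf', 32)."""
    dcode = dtype.rstrip('0123456789')   # strip the trailing digits
    nbits = int(dtype[len(dcode):])      # the stripped part is the bit width
    return dcode, nbits


# The two dtype strings mentioned in the docstring above
parsed = [split_dtype(s) for s in ('f32', 'cf32')]
```

A string with no trailing digits would make int() raise a ValueError, so the dtype passed to the block needs to include its bit width.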
4.3.1. The _tensor dict
The on_sequence() method has the important job of setting up the header
metadata. This requires a mandatory (and unique) name and a _tensor
dictionary that describes the dimensions and datatype of the data in
each span:
ohdr = {'name': filename,
        '_tensor': {
            'dtype': self.dtype,
            'shape': [-1, self.gulp_size],
        },
        }
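As a rough illustration of what this dictionary encodes, the sketch below works out the size of one frame from a _tensor entry. It assumes that the -1 in shape marks the streaming (frame) axis and that the remaining dimensions describe a single frame; that reading, and the helper name frame_nelement, are assumptions rather than something stated above. The byte count is only computed for the real-valued 'f32' case:

```python
def frame_nelement(tensor):
    """Elements per frame: product of the dimensions after the -1 axis."""
    shape = tensor['shape']
    frame_axis = shape.index(-1)   # assumption: -1 marks the frame axis
    n = 1
    for dim in shape[frame_axis + 1:]:
        n *= dim
    return n


# Header as built by on_sequence() for gulp_size = 32768 and dtype 'f32'
ohdr = {'name': 'sin_data_0.bin',
        '_tensor': {'dtype': 'f32', 'shape': [-1, 32768]}}

nelement = frame_nelement(ohdr['_tensor'])
nbyte = nelement * 32 // 8         # 'f32': 32 bits per element
```

Under these assumptions each frame holds 32768 elements, or 131072 bytes.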
4.4. A complete pipeline
Putting it all together, we have the complete pipeline below, which
reads from a file, adds a value to the data with our UselessAddBlock,
and then prints out some diagnostic info with our PrintStuffBlock.
This is also available in the testbench directory in the repository.
"""
# your_first_block.py

This testbench initializes a simple bifrost pipeline that reads from a binary file,
adds a value to the data, and then prints diagnostic information to the screen.
"""
import os
import numpy as np
import bifrost.pipeline as bfp
from bifrost.blocks import BinaryFileReadBlock
import glob
from datetime import datetime
from copy import deepcopy
from pprint import pprint

class UselessAddBlock(bfp.TransformBlock):
    def __init__(self, iring, n_to_add, *args, **kwargs):
        super(UselessAddBlock, self).__init__(iring, *args, **kwargs)
        self.n_to_add = n_to_add

    def on_sequence(self, iseq):
        ohdr = deepcopy(iseq.header)
        ohdr["name"] += "_with_added_value"
        return ohdr

    def on_data(self, ispan, ospan):
        in_nframe = ispan.nframe
        out_nframe = in_nframe

        idata = ispan.data
        odata = ospan.data

        odata[...] = idata + self.n_to_add
        return out_nframe

class PrintStuffBlock(bfp.SinkBlock):
    def __init__(self, iring, *args, **kwargs):
        super(PrintStuffBlock, self).__init__(iring, *args, **kwargs)
        self.n_iter = 0

    def on_sequence(self, iseq):
        print("[%s]" % datetime.now())
        print(iseq.name)
        pprint(iseq.header)
        self.n_iter = 0

    def on_data(self, ispan):
        now = datetime.now()
        if self.n_iter % 100 == 0:
            print("[%s] %s" % (now, ispan.data))
        self.n_iter += 1

if __name__ == "__main__":
    # Setup pipeline
    filenames = sorted(glob.glob('testdata/sin_data*.bin'))

    # gulp_size = 32768 elements, gulp_nframe = 1, bifrost dtype 'f32'
    b_read = BinaryFileReadBlock(filenames, 32768, 1, 'f32')
    b_add = UselessAddBlock(b_read, n_to_add=100)
    b_print = PrintStuffBlock(b_read)    # prints the raw data
    b_print2 = PrintStuffBlock(b_add)    # prints the data after the addition

    # Run pipeline
    pipeline = bfp.get_default_pipeline()
    print(pipeline.dot_graph())
    pipeline.run()
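The pipeline expects input files matching testdata/sin_data*.bin. If you don’t have any, something like the sketch below could generate them. The function name make_sine_files, the number of files and the waveform are all illustrative choices, not taken from the repository; the only constraints carried over from the pipeline are the float32 sample type (to match 'f32') and a file length that is a whole number of 32768-element gulps. The example writes into a scratch directory so that it is safe to run anywhere:

```python
import os
import shutil
import tempfile
import numpy as np


def make_sine_files(dirname, n_file=2, n_gulp=4, gulp_size=32768):
    """Write n_file float32 files, each holding n_gulp whole gulps of a sine wave."""
    if not os.path.isdir(dirname):
        os.makedirs(dirname)
    filenames = []
    for ii in range(n_file):
        t = np.arange(n_gulp * gulp_size, dtype='float32')
        # One sine period per gulp; cast so the samples are 4 bytes each
        data = np.sin(2 * np.pi * t / gulp_size).astype('float32')
        filename = os.path.join(dirname, 'sin_data_%i.bin' % ii)
        data.tofile(filename)
        filenames.append(filename)
    return filenames


# Demonstrated in a scratch directory; pass 'testdata' to feed the pipeline above.
scratch = tempfile.mkdtemp()
filenames = make_sine_files(scratch)
sizes = [os.path.getsize(f) for f in filenames]
shutil.rmtree(scratch)
```

Each file then contains 4 x 32768 samples of 4 bytes, so the reader never hits a partial gulp before the end of the file.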