Your first blocks ================= If you're lucky, all the bits and pieces you need to build your pipeline are already blocks in bifrost. More likely, you have a particular algorithm you'd like to plug in. For this, you'll need to create a new block. This basically comes down to definining a class with two mandatory functions: ``on_sequence()``, which is called whenever a new sequence is started, and ``on_data()``, which is called whenever there is new data ready at the ring buffer. TransformBlock -------------- A ``TransformBlock`` reads data from one ring buffer, does something to it, and then writes it out. Take this example, which adds a runtime-specified value to every element in the ring buffer: .. code:: python class UselessAddBlock(bfp.TransformBlock): def __init__(self, iring, n_to_add, *args, **kwargs): super(UselessAddBlock, self).__init__(iring, *args, **kwargs) self.n_to_add = n_to_add def on_sequence(self, iseq): ohdr = deepcopy(iseq.header) ohdr["name"] += "_with_added_value" return ohdr def on_data(self, ispan, ospan): in_nframe = ispan.nframe out_nframe = in_nframe idata = ispan.data odata = ospan.data odata[...] = idata + self.n_to_add return out_nframe This is a new class that subclasses ``TransformBlock``. First, let's look at the ``__init__`` method. This takes two parameters: - ``iring`` - an input ring buffer. This argument is required, but bifrost handles the setup of the ring buffers. - ``n_to_add`` - this is a new argument that we've added ourselves. The ``super(UselessAddBlock, self)`` call passes the ``iring``, and optional ``*args`` and ``**kwargs`` on to the parent class for initialization. Next, we have an ``on_sequence()`` method that is called whenever a new sequence arrives. For example, reading a new file may trigger a new sequence, with new metadata in the header. The ``on_sequence()`` method requires an ``iseq`` argument, and needs to output its own sequence header. The ``deepcopy`` is (currently) required to make sure the original dictionary isn't passed on by accident. Note that all we are doing here is changing the name by appending a string. Finally, there's the ``on_data()`` method that requires an ``ispan`` and ``ospan`` argument, for reading and writing data in and out of the ring buffers. ``on_data()`` needs to return the number of frames in the output span. SinkBlock --------- A ``SinkBlock`` also needs an ``on_sequence()`` and ``on_data()`` method, but doesn't need to output anything, so neither method should return anything. Here is a simple block to print stuff to screen: .. code:: python class PrintStuffBlock(bfp.SinkBlock): def __init__(self, iring, *args, **kwargs): super(PrintStuffBlock, self).__init__(iring, *args, **kwargs) self.n_iter = 0 def on_sequence(self, iseq): print("[%s]" % datetime.now()) print(iseq.name) pprint(iseq.header) self.n_iter = 0 def on_data(self, ispan): now = datetime.now() if self.n_iter % 100 == 0: print("[%s] %s" % (now, ispan.data)) self.n_iter += 1 Note that ``on_data()`` shall not have an ``ospan`` argument! SourceBlock ----------- The ``SourceBlock`` is a little trickier to get up and going as it requires some fun with `context managers `__. The source block also has the important task of setting up all the metadata required to make ``bifrost`` work -- a little extra effort at the start allows useful metadata to propagate through the full pipeline, simplifying future blocks. Here is the source code for the `binary\_io `__ block to read from data saved using the useful ``numpy.tofile()``: .. code:: python class BinaryFileRead(object): """ Simple file-like reading object for pipeline testing Args: filename (str): Name of file to open dtype (np.dtype or str): datatype of data, e.g. float32. This should be a *numpy* dtype, not a bifrost.ndarray dtype (eg. float32, not f32) gulp_size (int): How much data to read per gulp, (i.e. sub-array size) """ def __init__(self, filename, gulp_size, dtype): super(BinaryFileRead, self).__init__() self.file_obj = open(filename, 'r') self.dtype = dtype self.gulp_size = gulp_size def read(self): d = np.fromfile(self.file_obj, dtype=self.dtype, count=self.gulp_size) return d def __enter__(self): return self def close(self): pass def __exit__(self, type, value, tb): self.close() class BinaryFileReadBlock(bfp.SourceBlock): """ Block for reading binary data from file and streaming it into a bifrost pipeline Args: filenames (list): A list of filenames to open gulp_size (int): Number of elements in a gulp (i.e. sub-array size) gulp_nframe (int): Number of frames in a gulp. (Ask Ben / Miles for good explanation) dtype (bifrost dtype string): dtype, e.g. f32, cf32 """ def __init__(self, filenames, gulp_size, gulp_nframe, dtype, *args, **kwargs): super(BinaryFileReadBlock, self).__init__(filenames, gulp_nframe, *args, **kwargs) self.dtype = dtype self.gulp_size = gulp_size def create_reader(self, filename): print "Loading %s" % filename # Do a lookup on bifrost datatype to numpy datatype dcode = self.dtype.rstrip('0123456789') nbits = int(self.dtype[len(dcode):]) np_dtype = name_nbit2numpy(dcode, nbits) return BinaryFileRead(filename, self.gulp_size, np_dtype) def on_sequence(self, ireader, filename): ohdr = {'name': filename, '_tensor': { 'dtype': self.dtype, 'shape': [-1, self.gulp_size], }, } return [ohdr] def on_data(self, reader, ospans): indata = reader.read() if indata.shape[0] == self.gulp_size: ospans[0].data[0] = indata return [1] else: return [0] As ``bifrost`` requires a reader with baked-in context management, we have explicitly created a ``BinaryFileRead`` object that has an ``__enter__`` and ``__exit__`` method; these are *mandatory*. This also has a crucially important ``read()`` function, to read data into the ring. The second class, ``BinaryFileReadBlock`` is doing the reading, and again has an ``on_sequence()`` and ``on_data()`` method. There is also a mandatory ``create_reader`` method, that does some setup, in this case of the file handler. The \_tensor dict ~~~~~~~~~~~~~~~~~ The ``on_sequence()`` method has an important job to setup the header metadata. This requires a mandatory (and *unique* ``name``) and making a ``_tensor`` dictionary that describes the dimensions and datatype of the data in each span: .. code:: python ohdr = {'name': filename, '_tensor': { 'dtype': self.dtype, 'shape': [-1, self.gulp_size], }, } A complete pipeline ------------------- Putting it all together, we have this complete pipeline below, which reads from a file, adds something to it with out ``UselessAddBlock``, and then prints out some diagnostic info with our ``PrintStuffBlock``. This is also available in the `testbench `__ directory in the repository. .. code:: python """ # your_first_block.py This testbench initializes a simple bifrost pipeline that reads from a binary file, and then writes the data to an output file. """ import os import numpy as np import bifrost.pipeline as bfp from bifrost.blocks import BinaryFileReadBlock import glob from datetime import datetime from copy import deepcopy from pprint import pprint class UselessAddBlock(bfp.TransformBlock): def __init__(self, iring, n_to_add, *args, **kwargs): super(UselessAddBlock, self).__init__(iring, *args, **kwargs) self.n_to_add = n_to_add def on_sequence(self, iseq): ohdr = deepcopy(iseq.header) ohdr["name"] += "_with_added_value" return ohdr def on_data(self, ispan, ospan): in_nframe = ispan.nframe out_nframe = in_nframe idata = ispan.data odata = ospan.data odata[...] = idata + self.n_to_add return out_nframe class PrintStuffBlock(bfp.SinkBlock): def __init__(self, iring, *args, **kwargs): super(PrintStuffBlock, self).__init__(iring, *args, **kwargs) self.n_iter = 0 def on_sequence(self, iseq): print("[%s]" % datetime.now()) print(iseq.name) pprint(iseq.header) self.n_iter = 0 def on_data(self, ispan): now = datetime.now() if self.n_iter % 100 == 0: print("[%s] %s" % (now, ispan.data)) self.n_iter += 1 if __name__ == "__main__": # Setup pipeline filenames = sorted(glob.glob('testdata/sin_data*.bin')) b_read = BinaryFileReadBlock(filenames, 32768, 1, 'f32') b_add = UselessAddBlock(b_read, n_to_add=100) b_print = PrintStuffBlock(b_read) b_print2 = PrintStuffBlock(b_add) # Run pipeline pipeline = bfp.get_default_pipeline() print pipeline.dot_graph() pipeline.run()