Source code for IoTPy.tools.multicore.sink

import IoTPy.tools.assemble


[docs]def addToQueue(value, queue, part_name, name, index=None): """ Adds a value to a queue Parameters ---------- value : object Value to send queue : multiprocessing.Queue Queue to add value part_name : str Name of the part to send to name : str Name of the parameter index : int, optional Index of the parameter (the default is None) """ if index is None: print "Adding {0} to queue for {1} for {2}".format( value, part_name, name) queue.put([name, value]) else: print "Adding {0} to queue for {1} for {2},{3}".format( value, part_name, name, index) queue.put([name, index, value])
[docs]def sink(queues, fields, dict_parts): """ Create sinks for each stream in fields Parameters ---------- queues : dict Dict of queues for each part fields : list List of field names dict_parts : dict Dict containing values for fields """ for field in fields: # Typed field if isinstance(field, list): field_type, field_name = field if field_type == "stream": # List of streams if isinstance(dict_parts[field_name], list): for stream in dict_parts[field_name]: # Iterate over stream destinations for des in stream.dest: queue = queues[des.name] parameters = [queue] + des.to_list() args = { "in": stream.value, "function": addToQueue, "parameters": parameters} IoTPy.tools.assemble.assemble( "sink", "sink", __name__, **args) else: # Iterate over stream destinations for des in dict_parts[field_name].dest: queue = queues[des.name] parameters = [queue] + des.to_list() args = { "in": dict_parts[field_name].value, "function": addToQueue, "parameters": parameters} IoTPy.tools.assemble.assemble( "sink", "sink", __name__, **args)