# Source code for IoTPy.code.agents.source

"""
This module consists of sources. A source is a function that generates
a single stream. The work of each source is executed in a separate
thread. Each source function returns that thread (created but not yet
started) together with the output stream that the thread fills.

Functions in the module:
   1. make_outstream_and_thread is a helper function used by the other
      functions in this module.   
   2. function_to_stream generates a stream whose values are specified
      by successive calls to a function.
   3. file_to_stream generates a stream from a file specified by a
      file name.
   4. list_to_stream generates a stream from a list
   5. queue_to_stream generates a stream by waiting for messages to
      arrive on a queue and then appending the messages to a stream.

"""


import sys
import os
# sys.path.append(os.path.abspath("../"))

from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
from ..helper_functions.recent_values import recent_values
import random
import time
import threading
import multiprocessing

def make_outstream_and_thread(
        function, time_interval, stream_length, state, args, kwargs,
        run_infinite_stateless, run_infinite_stateful,
        run_finite_stateless, run_finite_stateful,
        run_push_stateless=None, run_push_stateful=None,
        out_stream=None, push=False):
    """
    Helper function called by the other functions in this module.

    Creates (but does not start) the thread that feeds the output
    stream, choosing the thread's target among the run_* parameters.

    Parameters
    ----------
    function: function
       Passed through, unchanged, to the selected target function.
    time_interval: float or int, time in seconds
       Passed to the infinite/finite targets, which use it as the
       pause between successive elements.
    stream_length: int (optional)
       If None an "infinite" target is selected, otherwise a "finite"
       target is selected and stream_length is passed to it.
    state: object (optional)
       If None a "stateless" target is selected, otherwise a
       "stateful" target is selected and state is passed to it.
    args: list
       list of positional arguments of function
    kwargs: dict
       dict of keyword arguments of function
    run_infinite_stateless, run_infinite_stateful: functions
    run_finite_stateless, run_finite_stateful: functions
    run_push_stateless, run_push_stateful: functions (optional)
       Target functions of threads. The two push targets are only
       needed when push is True, so they may be omitted otherwise.
    out_stream: Stream (optional)
       The output stream. A new Stream is created if this is None.
    push: boolean
       Whether the source will push values to the stream
       (default is False)

    Returns: thread, out_stream
    -------
       thread: threading.Thread
          The thread, not yet started.
       out_stream: Stream
          The stream passed in, or the newly created one.

    Raises
    ------
       ValueError
          If push is True but the required push target was not given.

    The target function and its arguments for the 6 cases are:
    1. push is False, stream_length is None, state is None:
       run_infinite_stateless(
           out_stream, function, time_interval, args, kwargs)
    2. push is False, stream_length is None, state is not None:
       run_infinite_stateful(
           out_stream, function, time_interval, state, args, kwargs)
    3. push is False, stream_length is not None, state is None:
       run_finite_stateless(
           out_stream, function, time_interval, stream_length,
           args, kwargs)
    4. push is False, stream_length is not None, state is not None:
       run_finite_stateful(
           out_stream, function, time_interval, stream_length, state,
           args, kwargs)
    5. push is True and state is None:
       run_push_stateless(out_stream, function, args, kwargs)
    6. push is True and state is not None:
       run_push_stateful(out_stream, function, state, args, kwargs)

    """
    # Identity tests: "== None" and truthiness can misbehave for
    # objects that overload comparison or length.
    if out_stream is None:
        out_stream = Stream()
    stateless = state is None

    if push:
        target = run_push_stateless if stateless else run_push_stateful
        if target is None:
            # threading.Thread silently does nothing when target is
            # None, so fail loudly instead.
            raise ValueError(
                'push is True but no push target function was given')
        leading_args = [out_stream, function]
        make_daemon = False
    elif stream_length is None:
        target = (run_infinite_stateless if stateless
                  else run_infinite_stateful)
        leading_args = [out_stream, function, time_interval]
        make_daemon = True
    else:
        target = (run_finite_stateless if stateless
                  else run_finite_stateful)
        leading_args = [out_stream, function, time_interval,
                        stream_length]
        make_daemon = True

    # Stateful targets take state just before args and kwargs.
    if not stateless:
        leading_args.append(state)
    leading_args.extend([args, kwargs])

    thread = threading.Thread(target=target, args=tuple(leading_args))
    # NOTE(review): timed (non-push) sources are marked as daemon
    # threads; push threads keep the default inherited from the
    # creating thread -- confirm this matches the intended policy.
    if make_daemon:
        thread.daemon = True
    return thread, out_stream
def function_to_stream(function, time_interval=0, stream_length=None,
                       state=None, out_stream=None, push=False,
                       *args, **kwargs):
    """
    Calls a function and places returned values on a stream called
    out_stream.

    Parameters
    ----------
       function: function
          When push is False, function is called repeatedly and each
          result is appended to the output stream:
             stateless: output = function(*args, **kwargs)
             stateful:  output, state = function(state, *args, **kwargs)
          When push is True, function is called exactly once and is
          itself handed the output stream:
             stateless: function(out_stream, *args, **kwargs)
             stateful:  function(out_stream, state, *args, **kwargs)
       time_interval: float or int (optional), time in seconds
          The thread sleeps for time_interval seconds after each
          element is placed on the output stream.
       stream_length: int (optional)
          function_to_stream terminates after stream_length values are
          placed on the output stream.
          If stream_length is None then function_to_stream terminates
          only when function raises an exception.
       state: object (optional)
          The state of the function; an argument of function.
       out_stream: Stream (optional)
          The output stream
       push: boolean (optional)
          Whether the source will push values to the stream
       args: list
          Positional arguments of function
       kwargs: dict
          Keyword arguments of function

    Returns: thread, out_stream
    -------
       thread: threading.Thread
          The thread created by this function. The thread must
          be started and thread.join() may have to be called to
          ensure that the thread terminates execution.
       out_stream: Stream
          The stream used by this function.

    """
    # In every timed runner an exception raised by function ends the
    # source quietly: that is how an unbounded source terminates.
    # Only Exception is caught so interpreter-exit signals are not
    # swallowed.

    def run_infinite_stateless(out_stream, function, time_interval,
                               args, kwargs):
        while True:
            try:
                output_increment = function(*args, **kwargs)
            except Exception:
                break
            out_stream.append(output_increment)
            time.sleep(time_interval)

    def run_infinite_stateful(out_stream, function, time_interval,
                              state, args, kwargs):
        while True:
            try:
                output_increment, state = function(
                    state, *args, **kwargs)
            except Exception:
                break
            out_stream.append(output_increment)
            time.sleep(time_interval)

    def run_finite_stateless(out_stream, function, time_interval,
                             stream_length, args, kwargs):
        for _ in range(stream_length):
            try:
                output_increment = function(*args, **kwargs)
            except Exception:
                break
            out_stream.append(output_increment)
            time.sleep(time_interval)

    def run_finite_stateful(out_stream, function, time_interval,
                            stream_length, state, args, kwargs):
        for _ in range(stream_length):
            try:
                output_increment, state = function(
                    state, *args, **kwargs)
            except Exception:
                # Call form of print is valid on Python 2 and 3.
                print('run_finite_stateful exception')
                break
            out_stream.append(output_increment)
            time.sleep(time_interval)

    def run_push_stateful(out_stream, function, state, args, kwargs):
        function(out_stream, state, *args, **kwargs)

    def run_push_stateless(out_stream, function, args, kwargs):
        function(out_stream, *args, **kwargs)

    return make_outstream_and_thread(
        function, time_interval, stream_length, state, args, kwargs,
        run_infinite_stateless, run_infinite_stateful,
        run_finite_stateless, run_finite_stateful,
        run_push_stateless, run_push_stateful,
        out_stream, push)
def file_to_stream(function, filename, time_interval=None,
                   stream_length=None, state=None, *args, **kwargs):
    """
    Places lines in a file on a stream.

    Parameters
    ----------
       function: function
          This function is applied to each line read from the file and
          the result of this function is appended to the output stream.
          It is called as function(line, *args, **kwargs) when state is
          None and as function(line, state, *args, **kwargs) otherwise.
       filename: str
          The name of the file that is read.
       time_interval: float or int (optional)
          The thread sleeps for time_interval seconds between lines.
          None (the default) means no pause.
       stream_length: int (optional)
          file_to_stream terminates after stream_length values are
          placed on the output stream.
          If stream_length is None then the file_to_stream terminates
          when the entire file is read.
       state: object (optional)
          The state of the function; an argument of function.
       args: list
          Positional arguments of function
       kwargs: dict
          Keyword arguments of function

    Returns: thread, out_stream
    -------
       thread: threading.Thread
          The thread created by this function. The thread must
          be started and thread.join() may have to be called to
          ensure that the thread terminates execution.
       out_stream: Stream
          The stream created by this function.

    """
    # time.sleep(None) raises TypeError, so map the default to 0.
    if time_interval is None:
        time_interval = 0

    def append_lines(out_stream, transform, time_interval, limit):
        # Shared loop of the four runners: append transform(line) for
        # each line and stop once limit lines were appended (limit is
        # None for "read the whole file").
        with open(filename, 'r') as input_file:
            for num_lines_read, line in enumerate(input_file, 1):
                out_stream.append(transform(line))
                if limit is not None and num_lines_read >= limit:
                    break
                time.sleep(time_interval)

    def run_infinite_stateless(out_stream, function, time_interval,
                               args, kwargs):
        append_lines(
            out_stream,
            lambda line: function(line, *args, **kwargs),
            time_interval, None)

    # NOTE(review): the stateful runners hand the same initial state
    # to every call and append the whole return value; state is never
    # updated from the result, unlike queue_to_stream. Confirm that
    # this is intended.
    def run_infinite_stateful(out_stream, function, time_interval,
                              state, args, kwargs):
        append_lines(
            out_stream,
            lambda line: function(line, state, *args, **kwargs),
            time_interval, None)

    def run_finite_stateless(out_stream, function, time_interval,
                             stream_length, args, kwargs):
        append_lines(
            out_stream,
            lambda line: function(line, *args, **kwargs),
            time_interval, stream_length)

    def run_finite_stateful(out_stream, function, time_interval,
                            stream_length, state, args, kwargs):
        append_lines(
            out_stream,
            lambda line: function(line, state, *args, **kwargs),
            time_interval, stream_length)

    # The helper's signature lists the two push runners right after
    # the timed runners; a file source never pushes, so pass None.
    return make_outstream_and_thread(
        function, time_interval, stream_length, state, args, kwargs,
        run_infinite_stateless, run_infinite_stateful,
        run_finite_stateless, run_finite_stateful,
        None, None)
def list_to_stream(function, in_list, time_interval=None,
                   stream_length=None, state=None, *args, **kwargs):
    """
    Places elements in a list on a stream.

    Parameters
    ----------
       function: function
          This function is applied to each element in the list and
          the result of this function is appended to the output stream.
          It is called as function(element, *args, **kwargs) when state
          is None and as function(element, state, *args, **kwargs)
          otherwise.
       in_list: list
          The list that is read.
       time_interval: float or int (optional)
          The thread sleeps for time_interval seconds between
          elements. None (the default) means no pause.
       stream_length: int (optional)
          list_to_stream terminates after stream_length values are
          placed on the output stream.
          If stream_length is None then list_to_stream terminates
          when the entire list is read.
       state: object (optional)
          The state of the function; an argument of function.
       args: list
          Positional arguments of function
       kwargs: dict
          Keyword arguments of function

    Returns: thread, out_stream
    -------
       thread: threading.Thread
          The thread created by this function. The thread must
          be started and thread.join() may have to be called to
          ensure that the thread terminates execution.
       out_stream: Stream
          The stream created by this function.

    """
    # time.sleep(None) raises TypeError, so map the default to 0.
    if time_interval is None:
        time_interval = 0

    def append_elements(out_stream, transform, time_interval, limit):
        # Shared loop of the four runners: append transform(element)
        # for each element and stop once limit elements were appended
        # (limit is None for "read the whole list"). enumerate keeps
        # the running count that enforces stream_length.
        for num_elements_read, element in enumerate(in_list, 1):
            out_stream.append(transform(element))
            if limit is not None and num_elements_read >= limit:
                break
            time.sleep(time_interval)

    def run_infinite_stateless(out_stream, function, time_interval,
                               args, kwargs):
        append_elements(
            out_stream,
            lambda element: function(element, *args, **kwargs),
            time_interval, None)

    # NOTE(review): the stateful runners hand the same initial state
    # to every call and append the whole return value; state is never
    # updated from the result, unlike queue_to_stream. Confirm that
    # this is intended.
    def run_infinite_stateful(out_stream, function, time_interval,
                              state, args, kwargs):
        append_elements(
            out_stream,
            lambda element: function(element, state, *args, **kwargs),
            time_interval, None)

    def run_finite_stateless(out_stream, function, time_interval,
                             stream_length, args, kwargs):
        append_elements(
            out_stream,
            lambda element: function(element, *args, **kwargs),
            time_interval, stream_length)

    def run_finite_stateful(out_stream, function, time_interval,
                            stream_length, state, args, kwargs):
        append_elements(
            out_stream,
            lambda element: function(element, state, *args, **kwargs),
            time_interval, stream_length)

    # The helper's signature lists the two push runners right after
    # the timed runners; a list source never pushes, so pass None.
    return make_outstream_and_thread(
        function, time_interval, stream_length, state, args, kwargs,
        run_infinite_stateless, run_infinite_stateful,
        run_finite_stateless, run_finite_stateful,
        None, None)
def queue_to_stream(function, queue, stop_message=None, state=None,
                    *args, **kwargs):
    """
    Reads queue and places messages it receives from the queue on a
    stream called out_stream.

    Parameters
    ----------
       function: function
          This function is applied to each element read from the queue
          and the result is appended to the output stream:
             stateless: output = function(message, *args, **kwargs)
             stateful:  output, state = function(
                            message, state, *args, **kwargs)
       queue: multiprocessing.Queue or Queue.Queue
          The queue from which messages are obtained.
       stop_message: object
          When a stop_message is received from the queue the thread
          terminates execution. If stop_message is None no message
          stops the thread.
       state: object (optional)
          The state of the function; an argument of function.
       args: list
          Positional arguments of function
       kwargs: dict
          Keyword arguments of function

    Returns: thread, out_stream
    -------
       thread: threading.Thread
          The thread created by this function. The thread must
          be started and thread.join() may have to be called to
          ensure that the thread terminates execution.
       out_stream: Stream
          The stream created by this function.

    """
    out_stream = Stream('queue to stream')

    def is_stop(message, stop_message):
        # With stop_message None every message, None included, is
        # treated as ordinary data.
        return stop_message is not None and message == stop_message

    # Both runners block in queue.get() while the queue is empty
    # instead of polling queue.empty() in a tight loop, so an idle
    # source does not burn CPU.

    def queue_to_stream_stateless_run(
            function, queue, out_stream, stop_message, args, kwargs):
        while True:
            message = queue.get()
            if is_stop(message, stop_message):
                break
            out_stream.append(function(message, *args, **kwargs))

    def queue_to_stream_stateful_run(
            function, queue, out_stream, stop_message, state,
            args, kwargs):
        while True:
            message = queue.get()
            if is_stop(message, stop_message):
                break
            output_element, state = function(
                message, state, *args, **kwargs)
            out_stream.append(output_element)

    if state is None:
        thread = threading.Thread(
            target=queue_to_stream_stateless_run,
            args=(function, queue, out_stream, stop_message,
                  args, kwargs))
    else:
        thread = threading.Thread(
            target=queue_to_stream_stateful_run,
            args=(function, queue, out_stream, stop_message, state,
                  args, kwargs))
    return thread, out_stream
#------------------------------------------------------------------------------------------------ # TESTS #------------------------------------------------------------------------------------------------
def test():
    """
    Self-test for the four source functions in this module.

    Each source thread is started and joined, then the values on its
    output stream are compared with the expected list. The file tests
    write the fixture 'test_file.txt' in the current directory and
    leave it empty afterwards.

    """
    test_filename = 'test_file.txt'

    def write_number_lines(count):
        # Mode 'w' so leftovers from an earlier, aborted run cannot
        # leak into the data being tested.
        with open(test_filename, 'w') as the_file:
            for i in range(count):
                the_file.write(str(i) + '\n')

    def empty_test_file():
        with open(test_filename, 'w'):
            pass

    # Test function_to_stream
    def clock_ticks(state, max_value=2**30):
        # Returns (output, next_state).
        return state, (state+1)%max_value
    # In the following call, state = 0 and max_value = 4
    clock_thread, clock = function_to_stream(
        clock_ticks, 0.001, 16, 0, max_value=4)
    clock_thread.start()
    clock_thread.join()
    # list(range(...)) rather than range(...): on Python 3 a range is
    # not a list and never compares equal to one.
    assert recent_values(clock) == list(range(4)) * 4

    # Test file_to_stream
    mm = 4
    write_number_lines(mm)
    file_thread, file_stream = file_to_stream(
        function=int, filename=test_filename, time_interval=0.01)
    file_thread.start()
    file_thread.join()
    assert recent_values(file_stream) == list(range(mm))
    empty_test_file()

    def f(v, addend):
        return int(v)+addend
    add_constant = 10
    write_number_lines(mm)
    file_addend_thread, file_addend_stream = file_to_stream(
        function=f, filename=test_filename, time_interval=0.01,
        addend=add_constant)
    file_addend_thread.start()
    file_addend_thread.join()
    assert recent_values(file_addend_stream) == [
        v+add_constant for v in range(mm)]
    empty_test_file()

    # Test list_to_stream
    list_thread, list_stream = list_to_stream(
        function=f, in_list=list(range(mm)), time_interval=0.01,
        addend=add_constant)
    list_thread.start()
    list_thread.join()
    assert recent_values(file_addend_stream) == recent_values(list_stream)

    # Test queue_to_stream
    try:
        import Queue as queue_module    # Python 2
    except ImportError:
        import queue as queue_module    # Python 3
    queue = queue_module.Queue()
    for i in range(mm):
        queue.put(i)
    queue.put('stop')
    queue_thread, queue_out_stream = queue_to_stream(
        function=f, queue=queue, stop_message='stop',
        addend=add_constant)
    queue_thread.start()
    queue_thread.join()
    assert recent_values(queue_out_stream) == recent_values(file_addend_stream)
    return
# Run the module's self-test when this file is executed as a script.
if __name__ == '__main__':
    test()