# Source code for IoTPy.code.agents.element_agent

"""
This module consists of functions that operate on a single input stream
to produce a single output stream. These stream-functions encapsulate
functions that operate on standard data types such as integers. In this
module, the encapsulated function operates on a single element of the
input stream to produce a single element of the output stream.

Functions in the module:
   1. element_map_agent
   2. map_stream
   3. element_filter_agent
   4. filter_stream

Functions from stream to stream:
   1. map_stream is similar to map in Python except that it operates
      on streams rather than lists.
   2. filter_stream is similar to filter in Python except that it
      operates on streams rather than lists.

Agents:
   1. element_map_agent is the agent used by map_stream.
   2. element_filter_agent is the agent used by filter_stream
"""

import sys
import os

from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
from ..helper_functions.recent_values import recent_values

def element_map_agent(func, in_stream, out_stream, state=None,
                      call_streams=None, name=None, *args, **kwargs):
    """
    Create an agent that applies func to each element of its single
    input stream and places the results on its single output stream.

    Parameters
    ----------
        func: function
           Function from an element of in_stream to an element of
           out_stream. When state is None it is called as
           func(element, *args, **kwargs). Otherwise it is called as
           func(element, state, *args, **kwargs) and must return the
           pair (output_element, next_state).
        in_stream: Stream
           The single input stream of this agent.
        out_stream: Stream
           The single output stream of this agent.
        state: object
           The initial state of the agent. None selects the stateless
           calling convention for func.
        call_streams: list of Stream
           The list of call_streams. A new value in any stream in this
           list causes a state transition of this agent.
        name: str
           Name of the agent created by this function.
        *args, **kwargs:
           Extra positional and keyword arguments forwarded to func on
           every call.

    Returns
    -------
        Agent.
         The agent created by this function.

    Uses
    ----
       * Agent
       * check_map_agent_arguments
       * check_num_args_in_func

    Used by
    -------
       map_stream

    """
    check_map_agent_arguments(func, in_stream, out_stream, call_streams, name)
    check_num_args_in_func(state, name, func, args, kwargs)

    # The transition function for this agent.
    def transition(in_lists, state):
        num_in_streams = 1
        check_in_lists_type(name, in_lists, num_in_streams)
        in_list = in_lists[0]
        input_list = in_list.list[in_list.start:in_list.stop]

        # If the new input data is empty then return an empty list for
        # the single output stream, and leave the state and the starting
        # point for the single input stream unchanged.
        # The length is tested rather than the truth value so that the
        # check is also well defined for array-like slices (e.g. NumPy
        # arrays, whose truth value is ambiguous).
        # NOTE(review): presumably what a StreamArray input supplies;
        # confirm against the stream module.
        if len(input_list) == 0:
            return ([[]], state, [in_list.start])

        if state is None:
            output_list = [func(v, *args, **kwargs) for v in input_list]
        else:
            # Thread the state through successive calls, in input order.
            output_list = []
            for v in input_list:
                output_element, state = func(v, state, *args, **kwargs)
                output_list.append(output_element)

        # Every element of input_list was consumed, so advance the start
        # pointer of the input stream past all of them.
        return ([output_list], state, [in_list.start+len(input_list)])
    # Finished transition

    # Create agent
    return Agent([in_stream], [out_stream], transition, state,
                 call_streams, name)
def map_stream(function, in_stream, state=None, *args, **kwargs):
    """
    Return a new stream fed by applying function to each element of
    in_stream.

    Parameters
    ----------
        function: function
           Function from an element of in_stream to an element of the
           returned stream.
        in_stream: Stream
           The single input stream.
        state: object
           Initial state handed to element_map_agent.
        *args, **kwargs:
           Forwarded, through element_map_agent, to function.

    Returns
    -------
        out_stream: Stream
           The newly created output stream. Its name is the
           concatenation of function.__name__ and in_stream.name.

    Uses
    ----
       * element_map_agent

    """
    result_name = function.__name__ + in_stream.name
    out_stream = Stream(result_name)
    # No call streams and no agent name are supplied; everything after
    # those two slots is passed on to function.
    element_map_agent(
        function, in_stream, out_stream, state, None, None,
        *args, **kwargs)
    return out_stream
def element_filter_agent(func, in_stream, out_stream, state=None,
                         call_streams=None, name=None, *args, **kwargs):
    """
    Create an agent that copies to its single output stream those
    elements of its single input stream for which func is true.

    Parameters
    ----------
        func: function
           Function from an element of in_stream to Boolean. When
           state is None it is called as func(element, *args, **kwargs).
           Otherwise it is called as
           func(element, state, *args, **kwargs) and must return the
           pair (boolean, next_state).
        in_stream: Stream
           The single input stream of this agent.
        out_stream: Stream
           The single output stream of this agent.
        state: object
           The initial state of the agent. None selects the stateless
           calling convention for func.
        call_streams: list of Stream
           The list of call_streams. A new value in any stream in this
           list causes a state transition of this agent.
        name: str
           Name of the agent created by this function.
        *args, **kwargs:
           Extra positional and keyword arguments forwarded to func on
           every call.

    Returns
    -------
        Agent.
         The agent created by this function.

    Uses
    ----
       * Agent
       * check_map_agent_arguments
       * check_num_args_in_func

    Used by
    -------
       filter_stream

    """
    check_map_agent_arguments(func, in_stream, out_stream, call_streams, name)
    check_num_args_in_func(state, name, func, args, kwargs)

    # The transition function for this agent.
    def transition(in_lists, state):
        num_in_streams = 1
        check_in_lists_type(name, in_lists, num_in_streams)
        in_list = in_lists[0]
        input_list = in_list.list[in_list.start:in_list.stop]

        # If the new input data is empty then return an empty list for
        # the single output stream, and leave the state and the starting
        # point for the single input stream unchanged.
        # The length is tested rather than the truth value so that the
        # check is also well defined for array-like slices (e.g. NumPy
        # arrays, whose truth value is ambiguous).
        # NOTE(review): presumably what a StreamArray input supplies;
        # confirm against the stream module.
        if len(input_list) == 0:
            return ([[]], state, [in_list.start])

        if state is None:
            output_list = [v for v in input_list if func(v, *args, **kwargs)]
        else:
            # Thread the state through successive calls, in input order,
            # keeping only the elements that func accepts.
            output_list = []
            for v in input_list:
                keep, state = func(v, state, *args, **kwargs)
                if keep:
                    output_list.append(v)

        # Every element of input_list was examined, so advance the start
        # pointer of the input stream past all of them.
        return ([output_list], state, [in_list.start+len(input_list)])
    # Finished transition

    # Create agent
    return Agent([in_stream], [out_stream], transition, state,
                 call_streams, name)
def filter_stream(function, in_stream, state=None, *args, **kwargs):
    """
    Return a new stream containing the elements of in_stream that
    satisfy the filter function.

    Parameters
    ----------
        function: function
           Function from an element of in_stream to Boolean.
        in_stream: Stream
           The single input stream.
        state: object
           Initial state handed to element_filter_agent.
        *args, **kwargs:
           Forwarded, through element_filter_agent, to function.

    Returns
    -------
        out_stream: Stream
           The output stream generated by filter_stream. Its name is
           the concatenation of function.__name__ and in_stream.name.

    Uses
    ----
       * element_filter_agent

    """
    out_stream = Stream(function.__name__+in_stream.name)
    # call_streams and name must be filled explicitly (with None) so that
    # *args reaches function instead of being bound to those two
    # positional parameters of element_filter_agent.
    element_filter_agent(function, in_stream, out_stream, state,
                         None, None, *args, **kwargs)
    return out_stream
#------------------------------------------------------------------------------------------------ # ELEMENT AGENT TESTS #------------------------------------------------------------------------------------------------
def test_element_agent():
    """
    Exercise element_map_agent, map_stream and filter_stream.

    Covers: a stateless map, a filter, a map with state, a map driven
    by call streams, functions returning _no_value and _multivalue,
    and extra positional / keyword arguments forwarded to the mapped
    function, with and without state.

    Raises
    ------
        AssertionError
           If any observed stream content differs from the expected
           content.

    """
    v = Stream('v')
    w = Stream('w')
    x = Stream('x')
    y = Stream('y')
    z = Stream('z')

    # Test simple map using element_map_agent
    # func operates on an element of the input stream and returns an element of
    # the output stream.
    def double(v):
        return 2*v

    a = element_map_agent(func=double, in_stream=x, out_stream=y, name='a')
    x.extend(range(3))
    assert recent_values(y) == [0, 2, 4]
    x.extend(range(3, 5, 1))
    assert recent_values(y) == [0, 2, 4, 6, 8]
    ymap = map_stream(function=double, in_stream=x)
    assert recent_values(ymap) == recent_values(y)

    # Test filtering
    def filtering(v):
        return v > 2

    yfilter = filter_stream(function=filtering, in_stream=x)
    assert recent_values(yfilter) == [3, 4]

    # Test map with state using element_map_agent
    # func operates on an element of the input stream and state and returns an
    # element of the output stream and the new state.
    def f(u, state):
        return u+state, state+2

    b = element_map_agent(func=f, in_stream=x, out_stream=z, state=0, name='b')
    assert recent_values(z) == [0, 3, 6, 9, 12]
    bmap = map_stream(function=f, in_stream=x, state=0)
    assert recent_values(bmap) == recent_values(z)

    # Test map with call streams
    # The agent executes a state transition when a value is added to call_streams.
    c = element_map_agent(func=f, in_stream=x, out_stream=v, state=10,
                          call_streams=[w], name='c')
    assert v.stop == 0
    w.append(0)
    assert recent_values(v) == [10, 13, 16, 19, 22]

    # Test _no_value
    # func returns _no_value to indicate that no value
    # is placed on the output stream.
    def f_no_value(v):
        """ Filters out odd values
        """
        if v%2:
            return _no_value
        else:
            return v

    no_value_stream = Stream(name='no_value_stream')
    no_value_agent = element_map_agent(
        func=f_no_value, in_stream=x, out_stream=no_value_stream,
        name='no_value_agent')
    assert recent_values(no_value_stream) == [0, 2, 4]
    no_value_map = map_stream(function=f_no_value, in_stream=x)
    assert recent_values(no_value_map) == recent_values(no_value_stream)

    # Test _multivalue
    # func returns _multivalue(output_list) to indicate that
    # the list of elements in output_list should be placed in the
    # output stream.
    def f_multivalue(v):
        if v%2:
            return _no_value
        else:
            return _multivalue([v, v*2])

    multivalue_stream = Stream('multivalue_stream')
    multivalue_agent = element_map_agent(
        func=f_multivalue, in_stream=x, out_stream=multivalue_stream,
        name='multivalue_agent')
    assert recent_values(multivalue_stream) == [0, 0, 2, 4, 4, 8]
    multivalue_map = map_stream(function=f_multivalue, in_stream=x)
    assert recent_values(multivalue_map) == recent_values(multivalue_stream)

    # Test element_map_agent with args
    def f_args(x, multiplicand, addition):
        return x*multiplicand+addition

    in_args = Stream('in_args')
    out_args = Stream('out_args')
    a_args = element_map_agent(f_args, in_args, out_args,
                               None, None, 'a_args', 2, 10)
    in_args.extend(range(3))
    assert recent_values(out_args) == [10, 12, 14]
    in_args.extend(range(3, 5, 1))
    assert recent_values(out_args) == [10, 12, 14, 16, 18]
    args_map = map_stream(f_args, in_args, None, 2, 10)
    # map_stream with positional args must agree with the explicit agent.
    assert recent_values(args_map) == recent_values(out_args)

    # Test element_map_agent with kwargs
    in_kwargs = Stream('in_kwargs')
    out_kwargs = Stream('out_kwargs')
    a_kwargs = element_map_agent(func=f_args, in_stream=in_kwargs,
                                 out_stream=out_kwargs, name='a_args',
                                 multiplicand=2, addition=10)
    in_kwargs.extend(range(3))
    assert recent_values(out_kwargs) == [10, 12, 14]
    in_kwargs.extend(range(3, 5, 1))
    assert recent_values(out_kwargs) == [10, 12, 14, 16, 18]

    # Test element_map_agent with state and kwargs
    # func operates on an element of the input stream and state and returns an
    # element of the output stream and the new state.
    def f_map_args_kwargs(u, state, multiplicand, addend):
        return u*multiplicand+addend+state, state+2

    out_stream_map_kwargs = Stream('out_stream_map_kwargs')
    aa_map_kwargs_agent = element_map_agent(
        func=f_map_args_kwargs, in_stream=x,
        out_stream=out_stream_map_kwargs,
        state=0, name='aa_map_kwargs_agent',
        multiplicand=2, addend=10)
    assert recent_values(out_stream_map_kwargs) == [10, 14, 18, 22, 26]

    # Test element_map_agent with state and args
    out_stream_map_args = Stream('out_stream_map_args')
    aa_map_args_agent = element_map_agent(
        f_map_args_kwargs, x, out_stream_map_args,
        0, None, 'aa_map_args_agent',
        2, 10)
    # Check the stream written by THIS agent (not the kwargs one above).
    assert recent_values(out_stream_map_args) == [10, 14, 18, 22, 26]

    # Test element_map_agent with state and args and kwargs
    out_stream_map_args_kwargs = Stream('out_stream_map_args_kwargs')
    aa_map_args_kwargs_agent = element_map_agent(
        f_map_args_kwargs, x, out_stream_map_args_kwargs,
        0, None, 'aa_map_args_kwargs_agent',
        2, addend=10)
    assert recent_values(out_stream_map_args_kwargs) == [10, 14, 18, 22, 26]
# Run the module's self-test when this file is executed as a script.
if __name__ == '__main__':
    test_element_agent()