"""
This module consists of functions that operate on a single input stream
to produce a single output stream. These stream-functions encapsulate
functions that operate on standard data types such as integers. In this
module, the encapsulated function operates on a single element of the
input stream to produce a single element of the output stream.
Functions in the module:
1. element_map_agent
2. map_stream
3. element_filter_agent
4. filter_stream
Functions from stream to stream:
1. map_stream is similar to map in Python except that it operates
on streams rather than lists.
2. filter_stream is similar to filter in Python except that it
operates on streams rather than lists.
Agents:
1. element_map_agent is the agent used by map_stream.
2. element_filter_agent is the agent used by filter_stream.
"""
import sys
import os
from ..agent import Agent, InList
from ..stream import Stream, StreamArray
from ..stream import _no_value, _multivalue, _close
from ..helper_functions.check_agent_parameter_types import *
from ..helper_functions.recent_values import recent_values
def element_map_agent(func, in_stream, out_stream, state=None, call_streams=None, name=None,
                      *args, **kwargs):
    """
    Create an agent that applies func to every element of its single
    input stream and places the results on its single output stream.

    Parameters
    ----------
    func: function
        When state is None, func is called as func(element, *args, **kwargs)
        and returns the output element. Otherwise func is called as
        func(element, state, *args, **kwargs) and must return the pair
        (output_element, next_state).
    in_stream: Stream
        The single input stream of this agent.
    out_stream: Stream
        The single output stream of this agent.
    state: object
        The initial state of the agent; None means func is stateless.
    call_streams: list of Stream
        The list of call_streams. A new value in any stream in this
        list causes a state transition of this agent.
    name: Str
        Name of the agent created by this function.
    *args, **kwargs:
        Extra positional and keyword arguments forwarded to func on
        every call.

    Returns
    -------
    Agent.
        The agent created by this function.

    Uses
    ----
    * Agent
    * check_map_agent_arguments
    * check_num_args_in_func

    Used by
    -------
    map_stream

    """
    check_map_agent_arguments(func, in_stream, out_stream, call_streams, name)
    check_num_args_in_func(state, name, func, args, kwargs)

    # The transition function for this agent.
    def transition(in_lists, state):
        num_in_streams = 1
        check_in_lists_type(name, in_lists, num_in_streams)
        in_list = in_lists[0]
        input_list = in_list.list[in_list.start:in_list.stop]
        # If the new input data is empty then return an empty list for
        # the single output stream, and leave the state and the starting
        # point for the single input stream unchanged.
        # Emptiness is tested with len() only: a truthiness test is
        # ambiguous if the slice is a NumPy array (presumably the case
        # when in_stream is a StreamArray -- confirm against Agent), and
        # would treat a one-element array holding 0 as "no input".
        if len(input_list) == 0:
            return ([[]], state, [in_list.start])

        if state is None:
            # Stateless: one output element per input element.
            output_list = [func(v, *args, **kwargs) for v in input_list]
        else:
            # Stateful: thread the state through successive calls.
            output_list = []
            for v in input_list:
                output, state = func(v, state, *args, **kwargs)
                output_list.append(output)
        # All of the new input has been consumed, so the starting point
        # advances by the number of elements read.
        return ([output_list], state, [in_list.start+len(input_list)])
    # Finished transition

    # Create agent
    return Agent([in_stream], [out_stream], transition, state, call_streams, name)
def map_stream(function, in_stream, state=None, *args, **kwargs):
    """
    Return a new stream whose elements are obtained by applying function
    to each element of in_stream.

    Parameters
    ----------
    function: function
        function from an element of the in_stream to an element of the
        returned stream.
    in_stream: Stream
        The single input stream that is mapped.
    state: object
        Initial state handed to the underlying agent; function operates
        on a state, args, and kwargs.

    Returns
    -------
    out_stream: Stream
        The output stream generated by map_stream. Its name is the
        function's __name__ followed by the name of in_stream.

    Uses
    ----
    * element_map_agent

    """
    mapped_name = function.__name__ + in_stream.name
    out_stream = Stream(mapped_name)
    # call_streams and name are passed explicitly as None so that the
    # extra positional args reach function rather than those parameters.
    element_map_agent(
        function, in_stream, out_stream, state, None, None, *args, **kwargs)
    return out_stream
def element_filter_agent(func, in_stream, out_stream, state=None, call_streams=None, name=None,
                         *args, **kwargs):
    """
    Create an agent that uses the boolean function func to filter its
    single input stream to produce its single output stream. An input
    element is copied to the output exactly when func reports a true
    value for it.

    Parameters
    ----------
    func: function
        When state is None, func is called as func(element, *args, **kwargs)
        and returns a Boolean. Otherwise func is called as
        func(element, state, *args, **kwargs) and must return the pair
        (Boolean, next_state).
    in_stream: Stream
        The single input stream of this agent.
    out_stream: Stream
        The single output stream of this agent.
    state: object
        The initial state of the agent; None means func is stateless.
    call_streams: list of Stream
        The list of call_streams. A new value in any stream in this
        list causes a state transition of this agent.
    name: Str
        Name of the agent created by this function.
    *args, **kwargs:
        Extra positional and keyword arguments forwarded to func on
        every call.

    Returns
    -------
    Agent.
        The agent created by this function.

    Uses
    ----
    * Agent
    * check_map_agent_arguments
    * check_num_args_in_func

    Used by
    -------
    filter_stream

    """
    check_map_agent_arguments(func, in_stream, out_stream, call_streams, name)
    check_num_args_in_func(state, name, func, args, kwargs)

    # The transition function for this agent.
    def transition(in_lists, state):
        num_in_streams = 1
        check_in_lists_type(name, in_lists, num_in_streams)
        in_list = in_lists[0]
        input_list = in_list.list[in_list.start:in_list.stop]
        # If the new input data is empty then return an empty list for
        # the single output stream, and leave the state and the starting
        # point for the single input stream unchanged.
        # Emptiness is tested with len() only: a truthiness test is
        # ambiguous if the slice is a NumPy array (presumably the case
        # when in_stream is a StreamArray -- confirm against Agent), and
        # would treat a one-element array holding 0 as "no input".
        if len(input_list) == 0:
            return ([[]], state, [in_list.start])

        if state is None:
            # Stateless: keep the elements for which func is true.
            output_list = [v for v in input_list if func(v, *args, **kwargs)]
        else:
            # Stateful: thread the state through successive calls.
            output_list = []
            for v in input_list:
                keep, state = func(v, state, *args, **kwargs)
                if keep:
                    output_list.append(v)
        # All of the new input has been consumed, so the starting point
        # advances by the number of elements read.
        return ([output_list], state, [in_list.start+len(input_list)])
    # Finished transition

    # Create agent
    return Agent([in_stream], [out_stream], transition, state, call_streams, name)
def filter_stream(function, in_stream, state=None, *args, **kwargs):
    """
    Return a new stream containing the elements of in_stream for which
    the filter function reports a true value.

    Parameters
    ----------
    function: function
        function from an element of the in_stream to Boolean.
    in_stream: Stream
        The single input stream that is filtered.
    state: object
        Initial state handed to the underlying agent; function operates
        on a state, args, and kwargs.
    *args, **kwargs:
        Extra positional and keyword arguments forwarded to function.

    Returns
    -------
    out_stream: Stream
        The output stream generated by filter_stream. Its name is the
        function's __name__ followed by the name of in_stream.

    Uses
    ----
    * element_filter_agent

    """
    out_stream = Stream(function.__name__+in_stream.name)
    # call_streams and name must be passed explicitly as None; otherwise
    # the first two entries of *args would be bound to those parameters
    # of element_filter_agent instead of being forwarded to function.
    element_filter_agent(function, in_stream, out_stream, state,
                         None, None,
                         *args, **kwargs)
    return out_stream
#------------------------------------------------------------------------------------------------
# ELEMENT AGENT TESTS
#------------------------------------------------------------------------------------------------
def test_element_agent():
    """
    Exercise element_map_agent, map_stream and filter_stream.

    Covers stateless and stateful maps, filtering, call streams, the
    _no_value and _multivalue return conventions, and forwarding of extra
    positional and keyword arguments to the encapsulated function.
    Raises AssertionError if any expectation fails.

    """
    v = Stream('v')
    w = Stream('w')
    x = Stream('x')
    y = Stream('y')
    z = Stream('z')

    # Test simple map using element_map_agent
    # func operates on an element of the input stream and returns an element of
    # the output stream.
    def double(v): return 2*v
    a = element_map_agent(func=double, in_stream=x, out_stream=y, name='a')
    x.extend(range(3))
    assert recent_values(y) == [0, 2, 4]
    x.extend(range(3, 5, 1))
    assert recent_values(y) == [0, 2, 4, 6, 8]
    ymap = map_stream(function=double, in_stream=x)
    assert recent_values(ymap) == recent_values(y)

    # Test filtering
    def filtering(v): return v > 2
    yfilter = filter_stream(function=filtering, in_stream=x)
    assert recent_values(yfilter) == [3, 4]

    # Test map with state using element_map_agent
    # func operates on an element of the input stream and state and returns an
    # element of the output stream and the new state.
    def f(u, state):
        return u+state, state+2
    b = element_map_agent(func=f, in_stream=x, out_stream=z, state=0, name='b')
    assert recent_values(z) == [0, 3, 6, 9, 12]
    bmap = map_stream(function=f, in_stream=x, state=0)
    assert recent_values(bmap) == recent_values(z)

    # Test map with call streams
    # The agent executes a state transition when a value is added to call_streams.
    c = element_map_agent(func=f, in_stream=x, out_stream=v, state=10,
                          call_streams=[w], name='c')
    assert v.stop == 0
    w.append(0)
    assert recent_values(v) == [10, 13, 16, 19, 22]

    # Test _no_value
    # func returns _no_value to indicate that no value
    # is placed on the output stream.
    def f_no_value(v):
        """ Filters out odd values
        """
        if v%2:
            return _no_value
        else:
            return v
    no_value_stream = Stream(name='no_value_stream')
    no_value_agent = element_map_agent(
        func=f_no_value, in_stream=x, out_stream=no_value_stream,
        name='no_value_agent')
    assert recent_values(no_value_stream) == [0, 2, 4]
    no_value_map = map_stream(function=f_no_value, in_stream=x)
    assert recent_values(no_value_map) == recent_values(no_value_stream)

    # Test _multivalue
    # func returns _multivalue(output_list) to indicate that
    # the list of elements in output_list should be placed in the
    # output stream.
    def f_multivalue(v):
        if v%2:
            return _no_value
        else:
            return _multivalue([v, v*2])
    multivalue_stream = Stream('multivalue_stream')
    multivalue_agent = element_map_agent(
        func=f_multivalue, in_stream=x, out_stream=multivalue_stream,
        name='multivalue_agent')
    assert recent_values(multivalue_stream) == [0, 0, 2, 4, 4, 8]
    multivalue_map = map_stream(function=f_multivalue, in_stream=x)
    assert recent_values(multivalue_map) == recent_values(multivalue_stream)

    # Test element_map_agent with args
    def f_args(x, multiplicand, addition):
        return x*multiplicand+addition
    in_args = Stream('in_args')
    out_args = Stream('out_args')
    a_args = element_map_agent(f_args, in_args, out_args, None, None, 'a_args', 2, 10)
    in_args.extend(range(3))
    assert recent_values(out_args) == [10, 12, 14]
    in_args.extend(range(3, 5, 1))
    assert recent_values(out_args) == [10, 12, 14, 16, 18]
    # map_stream must forward the same positional args to f_args.
    args_map = map_stream(f_args, in_args, None, 2, 10)
    assert recent_values(args_map) == recent_values(out_args)

    # Test element_map_agent with kwargs
    in_kwargs = Stream('in_kwargs')
    out_kwargs = Stream('out_kwargs')
    a_kwargs = element_map_agent(func=f_args, in_stream=in_kwargs,
                                 out_stream=out_kwargs, name='a_kwargs',
                                 multiplicand=2, addition=10)
    in_kwargs.extend(range(3))
    assert recent_values(out_kwargs) == [10, 12, 14]
    in_kwargs.extend(range(3, 5, 1))
    assert recent_values(out_kwargs) == [10, 12, 14, 16, 18]

    # Test element_map_agent with state and kwargs
    # func operates on an element of the input stream and state and returns an
    # element of the output stream and the new state.
    def f_map_args_kwargs(u, state, multiplicand, addend):
        return u*multiplicand+addend+state, state+2
    out_stream_map_kwargs = Stream('out_stream_map_kwargs')
    aa_map_kwargs_agent = element_map_agent(
        func=f_map_args_kwargs, in_stream=x, out_stream=out_stream_map_kwargs,
        state=0, name='aa_map_kwargs_agent',
        multiplicand=2, addend=10)
    assert recent_values(out_stream_map_kwargs) == [10, 14, 18, 22, 26]

    # Test element_map_agent with state and args
    out_stream_map_args = Stream('out_stream_map_args')
    aa_map_args_agent = element_map_agent(
        f_map_args_kwargs, x, out_stream_map_args,
        0, None, 'aa_map_args_agent',
        2, 10)
    # Check the stream fed by the positional-args agent created just above.
    assert recent_values(out_stream_map_args) == [10, 14, 18, 22, 26]

    # Test element_map_agent with state and args and kwargs
    out_stream_map_args_kwargs = Stream('out_stream_map_args_kwargs')
    aa_map_args_kwargs_agent = element_map_agent(
        f_map_args_kwargs, x, out_stream_map_args_kwargs,
        0, None, 'aa_map_args_kwargs_agent',
        2, addend=10)
    assert recent_values(out_stream_map_args_kwargs) == [10, 14, 18, 22, 26]

    return
# Run the self-test only when this module is executed as a script.
if __name__ == '__main__':
    test_element_agent()