IoTPy.code.agents package

Submodules

IoTPy.code.agents.element_agent module

This module consists of functions that operate on a single input stream to produce a single output stream. These stream-functions encapsulate functions that operate on standard data types such as integers. In this module, the encapsulated function operates on a single element of the input stream to produce a single element of the output stream.

Functions in the module:
  1. element_map_agent
  2. map_stream
  3. element_filter_agent
  4. filter_stream
Functions from stream to stream:
  1. map_stream is similar to map in Python except that it operates on streams rather than lists.
  2. filter_stream is similar to filter in Python except that it operates on streams rather than lists.
Agents:
  1. element_map_agent is the agent used by map_stream.
  2. element_filter_agent is the agent used by filter_stream.
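
The analogy with Python's built-in map and filter can be sketched without IoTPy. The following is a minimal illustration only, assuming a stream can be modelled as a plain list and that, as with Python's filter, elements for which the function returns True are kept. The names map_list and filter_list are hypothetical and are not part of this module.

```python
# Hypothetical stand-ins for map_stream and filter_stream.
# A stream is modelled as a plain list; IoTPy's Stream class is not used.

def map_list(function, in_list):
    """Apply function to each element of the input, one element at a time."""
    return [function(element) for element in in_list]


def filter_list(function, in_list):
    """Keep the elements for which the Boolean function returns True."""
    return [element for element in in_list if function(element)]


doubled = map_list(lambda v: 2 * v, [1, 2, 3, 4])
evens = filter_list(lambda v: v % 2 == 0, [1, 2, 3, 4])
```

A real stream grows over time, so an agent would apply the same per-element logic to each newly arrived element rather than to a complete list.
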
IoTPy.code.agents.element_agent.element_filter_agent(func, in_stream, out_stream, state=None, call_streams=None, name=None, *args, **kwargs)[source]

This agent uses the boolean function func to filter its single input stream to produce a single output stream.

Parameters:

func: function

function from an element of the in_stream to Boolean.

in_stream: Stream

The single input stream of this agent

out_stream: Stream

The single output stream of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

Uses

  • Agent
  • check_map_agent_arguments
  • check_num_args_in_func
IoTPy.code.agents.element_agent.element_map_agent(func, in_stream, out_stream, state=None, call_streams=None, name=None, *args, **kwargs)[source]

This agent maps the function func from its single input stream to its single output stream.

Parameters:

func: function

function from an element of the in_stream to an element of the out_stream.

in_stream: Stream

The single input stream of this agent

out_stream: Stream

The single output stream of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

Uses

  • Agent
  • check_map_agent_arguments
  • check_num_args_in_func

Used by

map_stream

IoTPy.code.agents.element_agent.filter_stream(function, in_stream, state=None, *args, **kwargs)[source]

filter_stream returns out_stream, a stream obtained by applying the filter function to the input stream, in_stream.

Parameters:

function: function

function from an element of the in_stream to Boolean.

in_stream: Stream

The single input stream of this agent

state: object

Optional state on which function operates, in addition to args and kwargs.

Returns:

out_stream: Stream

The output stream generated by filter_stream

Uses

element_filter_agent

Used by

map_stream

IoTPy.code.agents.element_agent.map_stream(function, in_stream, state=None, *args, **kwargs)[source]

map_stream returns out_stream, a stream obtained by applying function to each element of the input stream, in_stream.

Parameters:

function: function

function from an element of the in_stream to an element of the out_stream.

in_stream: Stream

The single input stream of this agent

state: object

Optional state on which function operates, in addition to args and kwargs.

Returns:

out_stream: Stream

The output stream generated by map_stream

Uses

  • element_map_agent
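
How function uses state is not spelled out on this page. The sketch below assumes one plausible convention: function takes (element, state) and returns (output, next_state). That convention, the helper stateful_map_list, and the use of a list in place of a Stream are all assumptions made for illustration.

```python
# Hypothetical sketch of a stateful element-wise map over a list.
# Assumed convention: function(element, state) -> (output, next_state).

def stateful_map_list(function, in_list, state):
    """Thread state through successive calls of function."""
    out = []
    for element in in_list:
        output, state = function(element, state)
        out.append(output)
    return out, state


def running_sum(element, total):
    """Emit the cumulative sum and carry it forward as the next state."""
    total = total + element
    return total, total


sums, final_state = stateful_map_list(running_sum, [1, 2, 3, 4], state=0)
```
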
IoTPy.code.agents.element_agent.test_element_agent()[source]

IoTPy.code.agents.gate_agent module

IoTPy.code.agents.gate_agent.gate_agent(in_stream, out_stream, call_streams, name=None)[source]
Parameters:

in_stream: Stream

The single input stream of the agent

out_stream: Stream

The single output stream of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.gate_agent.test_gate_agents()[source]

IoTPy.code.agents.list_agent module

The list agent is one of a collection of agents, each of which has a different type of transition function, func.

The list-agent func operates on a list input and returns a list. func may also operate on a state which can be of any type.

This file has the following agents:
  1. list_map_agent: Has a single input stream and a single output stream.
  2. list_sink_agent: Has a single input stream and no outputs.
  3. list_merge_agent: Has a list of input streams and a single output stream.
  4. list_split_agent: Has a single input stream and a list of output streams.
  5. list_many_agent: Has a list of input streams and a list of output streams.

A single input stream is in_stream and a single output stream is out_stream. A list of input streams is in_streams and a list of output streams is out_streams. In general, plurals refer to lists and singular to single elements.
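
The difference from the element agents is that func receives a whole list at once. A minimal sketch, assuming the agent is handed whatever slice of the input stream has arrived since its previous step; list_map_steps is a hypothetical helper and the slices are supplied by hand rather than by a Stream.

```python
# Hypothetical sketch: a list-to-list func applied to successive slices.

def list_map_steps(func, slices):
    """Apply func to each newly arrived slice and concatenate the results."""
    out = []
    for new_slice in slices:
        out.extend(func(new_slice))
    return out


def square_all(values):
    """A list-to-list function of the kind a list agent encapsulates."""
    return [v * v for v in values]


result = list_map_steps(square_all, [[1, 2], [3], [4, 5, 6]])
```
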

IoTPy.code.agents.list_agent.list_many_agent(func, in_streams, out_streams, state=None, call_streams=None, name=None, args=[], kwargs={})[source]
Parameters:

func: function

function from an input list to an output list

in_streams: list of Stream

The input streams of the agent

out_streams: list of Stream

The output streams of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.list_agent.list_map_agent(func, in_stream, out_stream, state=None, call_streams=None, name=None, args=[], kwargs={})[source]

This agent maps the function func from its single input stream to its single output stream.

Parameters:

func: function

function from a list (a slice of the in_stream) to a list (a slice of the out_stream).

in_stream: Stream

The single input stream of this agent

out_stream: Stream

The single output stream of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.list_agent.list_merge_agent(func, in_streams, out_stream, state=None, call_streams=None, name=None, args=[], kwargs={})[source]
Parameters:

func: function

function from a list of lists (one list per input stream) to an output list

in_streams: list of Stream

The list of input streams of the agent

out_stream: Stream

The single output stream of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.list_agent.list_sink_agent(func, in_stream, state=None, call_streams=None, name=None, args=[], kwargs={})[source]

This agent applies func to its single input stream. It has no output streams.

Parameters:

func: function

function from a list (a slice of the in_stream) to None (no output).

in_stream: Stream

The single input stream of this agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.list_agent.list_split_agent(func, in_stream, out_streams, state=None, call_streams=None, name=None, args=[], kwargs={})[source]
Parameters:

func: function

function from an input list to a list of lists (one per output stream).

in_stream: Stream

The single input stream of the agent

out_streams: list of Stream

The list of output streams of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.list_agent.test_list_agents()[source]

IoTPy.code.agents.merge module

This module consists of functions that merge multiple input streams into a single output stream.

Functions in the module:
  1. element_merge_agent
  2. zip_stream
  3. zip_map
  4. asynch_merge_agent
  5. mix
  6. blend_agent
  7. blend
Merge functions:
  1. zip_stream is similar to zip in Python except that it operates on streams rather than lists.
  2. zip_map is map_stream(zip_stream()), i.e., first zip then map the result.
  3. mix is an asynchronous merge of the input streams. The elements of the output stream identify the input streams that generated the elements.
  4. blend is a merge followed by a map.
Agents:
  1. element_merge_agent is the agent used by zip_stream and zip_map.
  2. asynch_merge_agent is the agent used by mix.
  3. blend_agent is the agent used by blend.
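
The difference between zipping and mixing can be sketched on plain lists. This is an illustration under assumptions, not the module's implementation: zip_lists and mix_arrivals are hypothetical names, streams are modelled as lists, and arrival order for mix is supplied explicitly as (stream_index, new_values) batches.

```python
# Hypothetical sketches of zip-style and mix-style merging on lists.

def zip_lists(in_lists):
    """Group the k-th elements of every input; stop at the shortest input."""
    return list(zip(*in_lists))


def mix_arrivals(arrivals):
    """Merge in arrival order, tagging each value with its stream index."""
    out = []
    for stream_index, new_values in arrivals:
        for value in new_values:
            out.append((stream_index, value))
    return out


zipped_pairs = zip_lists([[1, 2, 3], ['a', 'b']])
mixed = mix_arrivals([(0, [10, 11]), (1, ['x']), (0, [12])])
```

The zip-style merge waits until every input has supplied its k-th element, whereas the mix-style merge emits values as soon as they arrive.
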
IoTPy.code.agents.merge.asynch_merge(in_streams, out_stream)[source]
Parameters:

in_streams: list of input streams

out_stream: single output stream

IoTPy.code.agents.merge.blend(function, in_streams, state=None, *args, **kwargs)[source]
IoTPy.code.agents.merge.blend_agent(func, in_streams, out_stream, state=None, call_streams=None, name=None, *args, **kwargs)[source]
Parameters:

func: function

function from an input list and args and kwargs to an output list

in_streams: list of Stream

The list of input streams of the agent

out_stream: Stream

The single output stream of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.merge.element_many_agent(func, in_streams, out_streams, state=None, call_streams=None, name=None, *args, **kwargs)[source]
Parameters:

func: function

function from an input list and args and kwargs to an output list

in_streams: list of Stream

The input streams of the agent

out_streams: list of Stream

The output streams of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.merge.element_merge_agent(func, in_streams, out_stream, state=None, call_streams=None, name=None, *args, **kwargs)[source]
Parameters:

func: function

function from an input list and args and kwargs to an output list

in_streams: list of Stream

The list of input streams of the agent

out_stream: Stream

The single output stream of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.merge.merge_split(function, in_streams, num_out_streams, state=None, *args, **kwargs)[source]
IoTPy.code.agents.merge.mix(in_streams)[source]
IoTPy.code.agents.merge.tests()[source]
IoTPy.code.agents.merge.zip_agent(in_streams, out_stream)[source]

zip_agent zips the input streams and returns values in out_stream

Parameters:

in_streams: list of Stream

The list of input streams that are zipped

out_stream: Stream

The Stream to add values to

Uses

  • element_merge_agent
IoTPy.code.agents.merge.zip_map(function, in_streams, state=None, *args, **kwargs)[source]

zip_map returns out_stream, a stream obtained by applying function, with the specified state, args and kwargs, to the elements obtained by zipping the input streams.

Parameters:

in_streams: list of Stream

The list of input streams that are zipped

state: object

Optional state on which function operates, in addition to args and kwargs.

Returns:

out_stream: Stream

The output stream generated by zip_map

Uses

  • element_merge_agent
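
Since zip_map is described as a zip followed by a map, its stateless behaviour can be sketched in one line on plain lists. The helper zip_map_lists is hypothetical, and passing each zipped group to function as a single list is an assumption.

```python
# Hypothetical sketch: zip the inputs, then map function over the groups.

def zip_map_lists(function, in_lists):
    """Apply function to each group of same-position elements."""
    return [function(list(group)) for group in zip(*in_lists)]


totals = zip_map_lists(sum, [[1, 2, 3], [10, 20, 30]])
```
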
IoTPy.code.agents.merge.zip_stream(in_streams)[source]

zip_stream returns out_stream, a stream obtained by zipping the input streams. zip_stream is similar to zip.

Parameters:

in_streams: list of Stream

The list of input streams that are zipped

Returns:

out_stream: Stream

The output stream generated by zip_stream

IoTPy.code.agents.sink module

IoTPy.code.agents.sink.element_sink_agent(func, in_stream, state=None, call_streams=None, name=None, *args, **kwargs)[source]

This agent applies func to its single input stream. It has no output streams.

Parameters:

func: function

function from an element of the in_stream and args and kwargs to None (no return).

in_stream: Stream

The single input stream of this agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.
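
A sink is useful only for its side effects, because func returns nothing. A minimal sketch on a plain list, where sink_list and record are hypothetical names and forwarding args and kwargs to func on every call is an assumption:

```python
# Hypothetical sketch of a sink: func is called for its side effects only.

def sink_list(func, in_list, *args, **kwargs):
    """Call func on each element; nothing is returned or emitted."""
    for element in in_list:
        func(element, *args, **kwargs)


collected = []


def record(element, prefix):
    """Side effect: store a formatted copy of the element."""
    collected.append(prefix + str(element))


sink_list(record, [1, 2, 3], prefix='v=')
```
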

IoTPy.code.agents.sink.element_stream_to_queue_agent(stream, queue, name=None, descriptor=None)[source]
IoTPy.code.agents.sink.sink(function, in_stream, state=None, *args, **kwargs)[source]
IoTPy.code.agents.sink.test_element_agents()[source]

IoTPy.code.agents.source module

This module consists of sources. A source is a function that generates a single stream. Each source runs in a separate thread, and each function returns the thread it creates together with the output stream it generates.

Functions in the module:
  1. make_outstream_and_thread is a helper function used by the other functions in this module.
  2. function_to_stream generates a stream whose values are specified by successive calls to a function.
  3. file_to_stream generates a stream from a file specified by a file name.
  4. list_to_stream generates a stream from a list.
  5. queue_to_stream generates a stream by waiting for messages to arrive on a queue and then appending the messages to a stream.
IoTPy.code.agents.source.file_to_stream(function, filename, time_interval=None, stream_length=None, state=None, *args, **kwargs)[source]

Places lines in a file on a stream.

Parameters:

function: function

This function is applied to each line read from the file and the result of this function is appended to the output stream.

filename: str

The name of the file that is read.

time_interval: float or int (optional)

The next line of the file is read every time_interval seconds.

stream_length: int (optional)

file_to_stream terminates after stream_length values are placed on the output stream. If stream_length is None then the file_to_stream terminates when the entire file is read.

state: object (optional)

The state of the function; an argument of function.

args: list

Positional arguments of function

kwargs: dict

Keyword arguments of function

Returns: thread, out_stream

thread: threading.Thread

The thread created by this function. The thread must be started and thread.join() may have to be called to ensure that the thread terminates execution.

out_stream: Stream

The stream created by this function.

IoTPy.code.agents.source.function_to_stream(function, time_interval=0, stream_length=None, state=None, out_stream=None, push=False, *args, **kwargs)[source]

Calls a function and places returned values on a stream called out_stream.

Parameters:

function: function

This function is called and the result of this function is appended to the output stream.

time_interval: float or int (optional), time in seconds

An element is placed on the output stream every time_interval seconds.

stream_length: int (optional)

function_to_stream terminates after stream_length values are placed on the output stream. If stream_length is None then function_to_stream terminates only when an exception is raised.

state: object (optional)

The state of the function; an argument of function.

out_stream: Stream (optional)

The output stream

push: boolean (optional)

Whether the source will push values to the stream

args: list

Positional arguments of function

kwargs: dict

Keyword arguments of function

Returns: thread, out_stream

thread: threading.Thread

The thread created by this function. The thread must be started and thread.join() may have to be called to ensure that the thread terminates execution.

out_stream: Stream

The stream created by this function.
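
The start-then-join pattern described for the returned thread can be sketched with the standard library alone. This is an illustration under assumptions: function_to_list is a hypothetical helper, the output stream is modelled as a list, and the state, out_stream, and push parameters are omitted.

```python
import threading
import time

# Hypothetical sketch of a source: a thread that appends function() results.


def function_to_list(function, time_interval=0, stream_length=None):
    """Return (thread, out_list); the thread is created but not started."""
    out_list = []

    def target():
        count = 0
        # With stream_length=None the loop ends only when function raises.
        while stream_length is None or count < stream_length:
            out_list.append(function())
            count += 1
            time.sleep(time_interval)

    thread = threading.Thread(target=target)
    return thread, out_list


counter = iter(range(100))
thread, out_list = function_to_list(lambda: next(counter), stream_length=5)
thread.start()   # the caller starts the thread
thread.join()    # and waits for it to finish before reading the result
```
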

IoTPy.code.agents.source.list_to_stream(function, in_list, time_interval=None, stream_length=None, state=None, *args, **kwargs)[source]

Places elements in a list on a stream.

Parameters:

function: function

This function is applied to each element in the list and the result of this function is appended to the output stream.

in_list: list

The list that is read.

out_stream: Stream

The output stream on which messages are placed.

time_interval: float or int (optional)

The next element of the list is read every time_interval seconds.

stream_length: int (optional)

list_to_stream terminates after stream_length values are placed on the output stream. If stream_length is None then list_to_stream terminates when the entire list is read.

state: object (optional)

The state of the function; an argument of function.

args: list

Positional arguments of function

kwargs: dict

Keyword arguments of function

Returns: thread, out_stream

thread: threading.Thread

The thread created by this function. The thread must be started and thread.join() may have to be called to ensure that the thread terminates execution.

out_stream: Stream

The stream created by this function.

IoTPy.code.agents.source.make_outstream_and_thread(function, time_interval, stream_length, state, args, kwargs, run_infinite_stateless, run_infinite_stateful, run_finite_stateless, run_finite_stateful, run_push_stateless, run_push_stateful, out_stream=None, push=False)[source]

Helper function called by the other functions in this module.

Parameters:

function: function

This function is applied to each line read from the source and the result of this function is appended to the output stream.

time_interval: float or int, time in seconds

An element is placed on the output stream every time_interval seconds. (See the functions that call this one for out_stream.)

stream_length: int (optional)

The source terminates after stream_length values are placed on the output stream. If stream_length is None then the source function terminates when an exception is raised or the source has no more data.

state: object (optional)

The state of the function. This is a parameter of function.

args: list

List of positional arguments of function

kwargs: dict

Dict of keyword arguments of function

run_infinite_stateless, run_infinite_stateful: functions

run_finite_stateless, run_finite_stateful: functions

run_push_stateless, run_push_stateful: functions

Target functions of threads

out_stream: Stream

The output stream (default is None)

push: boolean

Whether the source will push values to the stream (default is False)

This function creates a thread and varies the target function of the thread for the following six cases:
  1. stream_length and state are None and push is False: target is run_infinite_stateless
  2. stream_length is None and state is not None and push is False: target is run_infinite_stateful
  3. stream_length is not None and state is None and push is False: target is run_finite_stateless
  4. stream_length is not None and state is not None and push is False: target is run_finite_stateful
  5. push is True and state is None: target is run_push_stateless
  6. push is True and state is not None: target is run_push_stateful
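
The case analysis above can be written as a small decision function. The sketch returns the name of the target rather than the function itself; select_target_name is a hypothetical helper and is not part of this module.

```python
# Hypothetical sketch of the six-way choice of thread target.

def select_target_name(stream_length, state, push):
    """Return the name of the target for the given argument combination."""
    if push:
        # When push is True, stream_length does not affect the choice.
        return 'run_push_stateless' if state is None else 'run_push_stateful'
    if stream_length is None:
        return ('run_infinite_stateless' if state is None
                else 'run_infinite_stateful')
    return ('run_finite_stateless' if state is None
            else 'run_finite_stateful')
```
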
IoTPy.code.agents.source.queue_to_stream(function, queue, stop_message=None, state=None, *args, **kwargs)[source]

Reads queue and places messages it receives from the queue on a stream called out_stream.

Parameters:

function: function

This function is applied to each element read from the queue and the result of this function is appended to the output stream.

queue: multiprocessing.Queue or Queue.Queue

The queue from which messages are obtained.

stop_message: object

When a stop_message is received from the queue the thread terminates execution.

state: object (optional)

The state of the function; an argument of function.

args: list

Positional arguments of function

kwargs: dict

Keyword arguments of function

Returns: thread, out_stream

thread: threading.Thread

The thread created by this function. The thread must be started and thread.join() may have to be called to ensure that the thread terminates execution.

out_stream: Stream

The stream created by this function.
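
The stop_message protocol can be sketched with the standard queue and threading modules. This is a minimal illustration under assumptions: queue_to_list is a hypothetical helper, the output stream is modelled as a list, and the state, args, and kwargs parameters are omitted.

```python
import queue
import threading

# Hypothetical sketch: drain a queue into a list until stop_message arrives.


def queue_to_list(function, q, stop_message=None):
    """Return (thread, out_list); the thread is created but not started."""
    out_list = []

    def target():
        while True:
            message = q.get()          # blocks until a message is available
            if message == stop_message:
                break                  # the stop message itself is not emitted
            out_list.append(function(message))

    thread = threading.Thread(target=target)
    return thread, out_list


q = queue.Queue()
thread, out_list = queue_to_list(lambda m: m.upper(), q, stop_message='STOP')
thread.start()
for message in ['a', 'b', 'STOP']:
    q.put(message)
thread.join()
```
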

IoTPy.code.agents.source.test()[source]

IoTPy.code.agents.split module

This module consists of functions that split a single input stream into multiple output streams.

Functions in the module:
  1. element_split_agent
  2. split_stream
  3. separate_agent
  4. separate
  5. unzip_agent
  6. unzip_stream
Split functions:
  1. split_stream: applies a function that returns a list to each element of the input stream; the i-th element of the returned list is placed on the i-th output stream.
  2. separate: separate is the inverse of mix (see merge.py). The elements of the input stream are pairs (i, v) and the value v is placed on the i-th output stream.
  3. unzip_stream: unzip_stream is the inverse of zip_stream (see merge.py). The elements of the input stream are lists and the i-th element of the list is placed on the i-th output stream.
Agents:
  1. element_split_agent: agent used by split_stream
  2. separate_agent: agent used by separate
  3. unzip_agent: not used by any function in this module. It is retained only for backward compatibility.
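
The inverse relationships described above can be sketched on plain lists. The helpers separate_list and unzip_list are hypothetical and model streams as lists; they illustrate the described behaviour and are not the module's code.

```python
# Hypothetical sketches of separate (inverse of mix) and unzip (inverse of zip).

def separate_list(in_list, num_out_streams):
    """Route each (i, v) pair: v goes to the i-th output."""
    out_lists = [[] for _ in range(num_out_streams)]
    for index, value in in_list:
        out_lists[index].append(value)
    return out_lists


def unzip_list(in_list, num_out_streams):
    """Route the i-th entry of every element to the i-th output."""
    out_lists = [[] for _ in range(num_out_streams)]
    for element in in_list:
        for index in range(num_out_streams):
            out_lists[index].append(element[index])
    return out_lists


separated = separate_list([(0, 10), (1, 'x'), (0, 11)], 2)
unzipped = unzip_list([[1, 'a'], [2, 'b']], 2)
```
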
IoTPy.code.agents.split.element_split_agent(func, in_stream, out_streams, state=None, call_streams=None, name=None, *args, **kwargs)[source]
Parameters:

func: function

function from an input list and args and kwargs to an output list

in_stream: Stream

The single input stream of the agent

out_streams: list of Stream

The list of output streams of the agent

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.split.separate(in_stream, num_out_streams)[source]

separate returns out_streams, a list of num_out_streams streams. separate is the inverse of mix (see merge.py). The elements of the input stream are pairs (i, v) and the value v is placed on the i-th output stream.

Parameters:

in_stream: Stream

The stream that will be split

num_out_streams: int

The number of output streams.

Returns:

out_streams: List of Stream

The output streams generated by separate

Uses

  • separate_agent
IoTPy.code.agents.split.separate_agent(in_stream, out_streams, name=None)[source]
Parameters:

in_stream: Stream

The single input stream of the agent

out_streams: list of Stream

The list of output streams of the agent

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.split.split_stream(function, in_stream, num_out_streams, state=None, *args, **kwargs)[source]

split_stream returns out_streams, a list of num_out_streams streams. The function, with the specified state, args and kwargs, is applied to the elements of the input stream. The return value of the function must be a list of length num_out_streams. The i-th value of the returned list is placed in the i-th output stream.

Parameters:

in_stream: Stream

The stream that will be split

num_out_streams: int

The number of output streams.

state: object

Optional state on which function operates, in addition to args and kwargs.

Returns:

out_streams: List of Stream

The output streams generated by split_stream

Uses

  • element_split_agent
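
The requirement that function return a list of length num_out_streams can be made concrete with a stateless sketch on a plain list. The helper split_list is hypothetical, and raising ValueError on a wrong-length result is an assumption made for the illustration.

```python
# Hypothetical sketch of split_stream without state, on a plain list.

def split_list(function, in_list, num_out_streams):
    """Place the i-th value returned by function on the i-th output."""
    out_lists = [[] for _ in range(num_out_streams)]
    for element in in_list:
        results = function(element)
        if len(results) != num_out_streams:
            raise ValueError('function must return num_out_streams values')
        for index, value in enumerate(results):
            out_lists[index].append(value)
    return out_lists


below_and_above = split_list(lambda v: [v - 1, v + 1], [1, 2, 3], 2)
```
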
IoTPy.code.agents.split.test_element_agents()[source]
IoTPy.code.agents.split.timed_unzip(in_stream, num_out_streams)[source]

timed_unzip returns out_streams, which is a list of num_out_streams streams. timed_unzip is the inverse of timed_zip (see merge.py). The elements of the input stream are pairs (t, v) where v is a list of length num_out_streams. The i-th element of the list, with the timestamp t, is placed on the i-th output stream if and only if v is not None.

Parameters:

in_stream: Stream

The stream that will be split

num_out_streams: int

The number of output streams.

Returns:

out_streams: List of Stream

The output streams generated by timed_unzip

Uses

  • split_stream
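
A sketch of timed_unzip on a plain list follows. It reads the "not None" condition as applying to the i-th entry of v, which is an interpretation rather than something this page states. The helper timed_unzip_list is hypothetical.

```python
# Hypothetical sketch of timed_unzip on a list of (t, v) pairs.
# Assumption: an entry of v that is None is skipped for that output.

def timed_unzip_list(in_list, num_out_streams):
    """Place (t, v[i]) on the i-th output whenever v[i] is not None."""
    out_lists = [[] for _ in range(num_out_streams)]
    for timestamp, values in in_list:
        for index in range(num_out_streams):
            if values[index] is not None:
                out_lists[index].append((timestamp, values[index]))
    return out_lists


timed = timed_unzip_list([(1, ['a', None]), (2, ['b', 'B'])], 2)
```
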
IoTPy.code.agents.split.unzip_agent(in_stream, out_streams, name=None)[source]
Parameters:

in_stream: Stream

The single input stream of the agent

out_streams: list of Stream

The list of output streams of the agent

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

Note

Not used by any function in this module. Used by external modules.

IoTPy.code.agents.split.unzip_stream(input_stream, num_out_streams)[source]

unzip_stream returns out_streams, a list of num_out_streams streams. unzip_stream is the inverse of zip_stream (see merge.py). The elements of the input stream are lists of length num_out_streams and the i-th element of the list is placed on the i-th output stream.

Parameters:

in_stream: Stream

The stream that will be split

num_out_streams: int

The number of output streams.

Returns:

out_streams: List of Stream

The output streams generated by unzip_stream

Uses

  • element_split_agent

IoTPy.code.agents.timed_agent module

This module has timed_zip and timed_window, which are described in the manual documentation.

IoTPy.code.agents.timed_agent.test_timed_zip_agents()[source]
IoTPy.code.agents.timed_agent.timed_window(function, in_stream, window_duration, step_time, state=None, args=[], kwargs={})[source]
IoTPy.code.agents.timed_agent.timed_window_agent(func, in_stream, out_stream, window_duration, step_time, window_start_time=0, state=None, call_streams=None, name=None, args=[], kwargs={})[source]
IoTPy.code.agents.timed_agent.timed_zip(list_of_streams)[source]
IoTPy.code.agents.timed_agent.timed_zip_agent(in_streams, out_stream, call_streams=None, name=None)[source]
Parameters:

in_streams: list of Stream

The list of input streams of the agent

out_stream: Stream

The single output stream of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

Notes

Each stream in in_streams must be a stream of tuples, lists, or NumPy arrays where element[0] is a time and where time is a total order. Each stream in in_streams must be strictly monotonically increasing in time.

out_stream merges the in_streams in order of time. An element of out_stream is a list where element[0] is a time T and element[1] is a list consisting of all elements of in_streams that have time T.
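
The grouping by time can be sketched on complete, finite lists. Several things here are assumptions: the helper timed_zip_lists is hypothetical, each input element is taken to be a (time, value) pair, and a stream with no element at time T contributes None (suggested by the timed_unzip description, not stated here). A real agent can emit time T only once every input has advanced past T; the sketch sidesteps that by seeing all data up front.

```python
# Hypothetical sketch of timed_zip on finite lists of (time, value) pairs.

def timed_zip_lists(in_lists):
    """Return [T, [value or None per input]] for every time T, in order."""
    times = sorted({element[0] for in_list in in_lists for element in in_list})
    # One time -> value lookup per input; times are unique within an input
    # because each input is strictly increasing in time.
    lookups = [{element[0]: element[1] for element in in_list}
               for in_list in in_lists]
    return [[t, [lookup.get(t) for lookup in lookups]] for t in times]


timed_zipped = timed_zip_lists([[(1, 'a'), (3, 'c')], [(1, 'A'), (2, 'B')]])
```
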

IoTPy.code.agents.timed_merge_agent module

IoTPy.code.agents.timed_merge_agent.test_timed_merge_agents()[source]
IoTPy.code.agents.timed_merge_agent.timed_merge_agent(in_streams, out_stream, call_streams=None, name=None)[source]
Parameters:

in_streams: list of Stream

The list of input streams of the agent

out_stream: Stream

The single output stream of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

Notes

Each stream in in_streams must be a stream of tuples, lists, or NumPy arrays where element[0] is a time and where time is a total order. Each stream in in_streams must be strictly monotonically increasing in time.

out_stream merges the in_streams in order of time. An element of out_stream is a list where element[0] is a time T and element[1] is a list consisting of all elements of in_streams that have time T.
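
A plain time-ordered merge of already sorted inputs can be sketched with the standard library's heapq.merge. Whether timed_merge_agent emits elements one at a time, as here, or grouped by time as the note above describes, is not clear from this page, so the output shape is an assumption. The helper timed_merge_lists is hypothetical.

```python
import heapq

# Hypothetical sketch: merge time-sorted lists into one time-sorted list.


def timed_merge_lists(in_lists):
    """Interleave elements so that element[0], the time, never decreases."""
    return list(heapq.merge(*in_lists, key=lambda element: element[0]))


merged = timed_merge_lists([[(1, 'a'), (4, 'd')], [(2, 'b'), (3, 'c')]])
```
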

IoTPy.code.agents.window_agent module

IoTPy.code.agents.window_agent.dynamic_window_agent(func, in_stream, out_stream, state, min_window_size, max_window_size, step_size, args={}, kwargs={})[source]
IoTPy.code.agents.window_agent.test_window_agents()[source]
IoTPy.code.agents.window_agent.window(function, in_stream, window_size, step_size, state=None, args=[], kwargs={})[source]
IoTPy.code.agents.window_agent.window_many_agent(func, in_streams, out_streams, window_size, step_size, state=None, call_streams=None, name=None, args=[], kwargs={})[source]
Parameters:

func: function

function from a list of windows with one window for each input stream to an output list containing a single element for each output stream.

in_streams: list of Stream

The list of input streams of the agent

out_streams: list of Stream

The list of output streams of the agent

window_size: int

Positive integer. The size of the moving window

step_size: int

Positive integer. The step size of the moving window

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.

IoTPy.code.agents.window_agent.window_map_agent(func, in_stream, out_stream, window_size, step_size, state=None, call_streams=None, name=None, args=[], kwargs={})[source]
Parameters:

func: function

function from a window (a list of window_size consecutive elements of the input stream) to a single element of the output stream

in_stream: Stream

The single input stream of this agent

out_stream: Stream

The single output stream of the agent

window_size: int

Positive integer. The size of the moving window

step_size: int

Positive integer. The step size of the moving window

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.
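
The roles of window_size and step_size can be sketched on a plain list. The helper window_map_list is hypothetical; it assumes each window holds window_size consecutive elements, that successive windows start step_size elements apart, and that incomplete trailing windows produce no output.

```python
# Hypothetical sketch of a moving-window map on a list.

def window_map_list(func, in_list, window_size, step_size):
    """Apply func to each full window; windows start step_size apart."""
    out = []
    start = 0
    while start + window_size <= len(in_list):
        out.append(func(in_list[start:start + window_size]))
        start += step_size
    return out


window_sums = window_map_list(sum, [1, 2, 3, 4, 5, 6],
                              window_size=3, step_size=2)
```

With step_size smaller than window_size the windows overlap; with the two equal, the windows tile the input without overlap.
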

IoTPy.code.agents.window_agent.window_merge_agent(func, in_streams, out_stream, window_size, step_size, state=None, call_streams=None, name=None, args=[], kwargs={})[source]
Parameters:

func: function

function from a list of windows with one window per input stream to a single element of the output stream.

in_streams: list of Stream

The list of input streams of the agent

out_stream: Stream

The single output stream of the agent

window_size: int

Positive integer. The size of the moving window

step_size: int

Positive integer. The step size of the moving window

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.
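
A window merge hands func one window per input stream. The sketch below works on plain lists and assumes the windows of all inputs are aligned at the same positions; window_merge_lists and max_gap are hypothetical names.

```python
# Hypothetical sketch of a moving-window merge on lists.

def window_merge_lists(func, in_lists, window_size, step_size):
    """Apply func to a list of aligned windows, one window per input."""
    out = []
    start = 0
    shortest = min(len(in_list) for in_list in in_lists)
    while start + window_size <= shortest:
        windows = [in_list[start:start + window_size] for in_list in in_lists]
        out.append(func(windows))
        start += step_size
    return out


def max_gap(windows):
    """Largest absolute difference between two aligned windows."""
    first, second = windows
    return max(abs(a - b) for a, b in zip(first, second))


gaps = window_merge_lists(max_gap, [[1, 5, 2, 8], [2, 2, 2, 2]],
                          window_size=2, step_size=2)
```
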

IoTPy.code.agents.window_agent.window_split_agent(func, in_stream, out_streams, window_size, step_size, state=None, call_streams=None, name=None, args=[], kwargs={})[source]
Parameters:

func: function

function from a window to a list containing a single element for each output stream.

in_stream: Stream

The single input stream of the agent

out_streams: list of Stream

The list of output streams of the agent

window_size: int

Positive integer. The size of the moving window

step_size: int

Positive integer. The step size of the moving window

state: object

The state of the agent

call_streams: list of Stream

The list of call_streams. A new value in any stream in this list causes a state transition of this agent.

name: str

Name of the agent created by this function.

Returns:

Agent.

The agent created by this function.
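
A window split is the mirror image: func turns one window into one value per output stream. A minimal sketch on a plain list, with hypothetical names window_split_list and low_high:

```python
# Hypothetical sketch of a moving-window split on a list.

def window_split_list(func, in_list, num_out_streams, window_size, step_size):
    """Place the i-th value func returns for each window on the i-th output."""
    out_lists = [[] for _ in range(num_out_streams)]
    start = 0
    while start + window_size <= len(in_list):
        results = func(in_list[start:start + window_size])
        for index in range(num_out_streams):
            out_lists[index].append(results[index])
        start += step_size
    return out_lists


def low_high(window):
    """One value per output: the window minimum and the window maximum."""
    return [min(window), max(window)]


lows, highs = window_split_list(low_high, [3, 1, 4, 1, 5, 9], 2,
                                window_size=3, step_size=3)
```
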

Module contents