CAF 0.17.6
Loading...
Searching...
No Matches
caf::scheduled_actor Class Reference

A cooperatively scheduled, event-based actor implementation. More...

#include <scheduled_actor.hpp>

Inheritance diagram for caf::scheduled_actor:
caf::resumable caf::non_blocking_actor_base caf::event_based_actor caf::io::abstract_broker caf::raw_event_based_actor caf::raw_event_based_actor caf::io::broker caf::io::basp_broker

Classes

struct  mailbox_policy
 Configures the FIFO inbox with four nested queues: More...
 
struct  mailbox_visitor
 Consumes messages from the mailbox. More...
 

Public Types

enum class  message_category {
  ordinary ,
  internal ,
  skipped
}
 Categorizes incoming messages. More...
 
enum class  activation_result {
  success ,
  terminated ,
  skipped ,
  dropped
}
 Result of one-shot activations. More...
 
using super = local_actor
 Base type.
 
using stream_manager_map = std::map<stream_slot, stream_manager_ptr>
 Maps slot IDs to stream managers.
 
using normal_queue = intrusive::drr_cached_queue<policy::normal_messages>
 Stores asynchronous messages with default priority.
 
using urgent_queue = intrusive::drr_cached_queue<policy::urgent_messages>
 Stores asynchronous messages with hifh priority.
 
using upstream_queue = intrusive::drr_queue<policy::upstream_messages>
 Stores upstream messages.
 
using downstream_queue
 Stores downstream messages.
 
using mailbox_type = intrusive::fifo_inbox<mailbox_policy>
 A queue optimized for single-reader-many-writers.
 
using pending_response = std::pair<const message_id, behavior>
 The message ID of an outstanding response with its callback.
 
using pointer = scheduled_actor*
 A pointer to a scheduled actor.
 
using default_handler
 Function object for handling unmatched messages.
 
using error_handler = std::function<void (pointer, error&)>
 Function object for handling error messages.
 
using down_handler = std::function<void (pointer, down_msg&)>
 Function object for handling down messages.
 
using exit_handler = std::function<void (pointer, exit_msg&)>
 Function object for handling exit messages.
 
using exception_handler = std::function<error (pointer, std::exception_ptr&)>
 Function object for handling exit messages.
 
- Public Types inherited from caf::resumable
enum  resume_result {
  resume_later ,
  awaiting_message ,
  done ,
  shutdown_execution_unit
}
 Denotes the state in which a resumable returned from its last call to resume.
 
enum  subtype_t {
  unspecified ,
  scheduled_actor ,
  io_actor ,
  function_object
}
 Denotes common subtypes of resumable. More...
 

Public Member Functions

 scheduled_actor (actor_config &cfg)
 
void enqueue (mailbox_element_ptr ptr, execution_unit *eu) override
 
mailbox_element * peek_at_next_mailbox_element () override
 
const char * name () const override
 
void launch (execution_unit *eu, bool lazy, bool hide) override
 
bool cleanup (error &&fail_state, execution_unit *host) override
 
subtype_t subtype () const override
 Returns a subtype hint for this object.
 
void intrusive_ptr_add_ref_impl () override
 Add a strong reference count to this object.
 
void intrusive_ptr_release_impl () override
 Remove a strong reference count from this object.
 
resume_result resume (execution_unit *, size_t) override
 Resume any pending computation until it is either finished or needs to be re-scheduled later.
 
virtual proxy_registryproxy_registry_ptr ()
 Returns a factory for proxies created and managed by this actor or nullptr.
 
void quit (error x=error{})
 Finishes execution of this actor after any currently running message handler is done.
 
mailbox_typemailbox () noexcept
 Returns the queue for storing incoming messages.
 
stream_manager_mapstream_managers () noexcept
 Returns map for all active streams.
 
stream_manager_mappending_stream_managers () noexcept
 Returns map for all pending streams.
 
void set_default_handler (default_handler fun)
 Sets a custom handler for unexpected messages.
 
template<class F >
std::enable_if< std::is_convertible< F, std::function< result< message >(type_erased_tuple &)> >::value >::type set_default_handler (F fun)
 Sets a custom handler for unexpected messages.
 
void set_error_handler (error_handler fun)
 Sets a custom handler for error messages.
 
template<class T >
auto set_error_handler (T fun) -> decltype(fun(std::declval< error & >()))
 Sets a custom handler for error messages.
 
void set_down_handler (down_handler fun)
 Sets a custom handler for down messages.
 
template<class T >
auto set_down_handler (T fun) -> decltype(fun(std::declval< down_msg & >()))
 Sets a custom handler for down messages.
 
void set_exit_handler (exit_handler fun)
 Sets a custom handler for error messages.
 
template<class T >
auto set_exit_handler (T fun) -> decltype(fun(std::declval< exit_msg & >()))
 Sets a custom handler for exit messages.
 
void set_exception_handler (exception_handler fun)
 Sets a custom exception handler for this actor.
 
template<class F >
std::enable_if< std::is_convertible< F, std::function< error(std::exception_ptr &)> >::value >::type set_exception_handler (F f)
 Sets a custom exception handler for this actor.
 
template<class Driver , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t>
make_source_result_t< typename Driver::downstream_manager_type, Ts... > make_source (std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={})
 
template<class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>>
make_source_result_t< DownstreamManager, Ts... > make_source (std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={})
 
template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<!is_actor_handle< Init >::value &&Trait::valid, make_source_result_t< DownstreamManager > > make_source (Init init, Pull pull, Done done, Finalize finalize={}, policy::arg< DownstreamManager > token={})
 
template<class ActorHandle , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t< is_actor_handle< ActorHandle >::value, make_source_result_t< DownstreamManager > > make_source (const ActorHandle &dest, std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={})
 
template<class ActorHandle , class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t< is_actor_handle< ActorHandle >::value &&Trait::valid, make_source_result_t< DownstreamManager > > make_source (const ActorHandle &dest, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager > token={})
 
template<class Driver , class Init , class Pull , class Done , class Finalize = unit_t>
Driver::source_ptr_type make_continuous_source (Init init, Pull pull, Done done, Finalize fin={})
 
template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>>
stream_source_ptr< DownstreamManager > make_continuous_source (Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={})
 
template<class Driver , class... Ts>
make_sink_result< typename Driver::input_type > make_sink (const stream< typename Driver::input_type > &src, Ts &&... xs)
 
template<class In , class Init , class Fun , class Finalize = unit_t, class Trait = stream_sink_trait_t<Fun>>
make_sink_result< In > make_sink (const stream< In > &in, Init init, Fun fun, Finalize fin={})
 
template<class Driver , class In , class... Ts, class... Us>
make_stage_result_t< In, typename Driver::downstream_manager_type, Ts... > make_stage (const stream< In > &src, std::tuple< Ts... > xs, Us &&... ys)
 
template<class In , class... Ts, class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
make_stage_result_t< In, DownstreamManager, Ts... > make_stage (const stream< In > &in, std::tuple< Ts... > xs, Init init, Fun fun, Finalize fin={}, policy::arg< DownstreamManager > token={})
 
template<class In , class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
make_stage_result_t< In, DownstreamManager > make_stage (const stream< In > &in, Init init, Fun fun, Finalize fin={}, policy::arg< DownstreamManager > token={})
 
template<class Driver , class... Ts>
Driver::stage_ptr_type make_continuous_stage (Ts &&... xs)
 
template<class Init , class Fun , class Cleanup , class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
stream_stage_ptr< typename Trait::input, DownstreamManager > make_continuous_stage (Init init, Fun fun, Cleanup cleanup, policy::arg< DownstreamManager > token={})
 
void enqueue (strong_actor_ptr sender, message_id mid, message msg, execution_unit *host) override
 
- Public Member Functions inherited from caf::resumable

Static Public Member Functions

static void default_error_handler (pointer ptr, error &x)
 
static void default_down_handler (pointer ptr, down_msg &x)
 
static void default_exit_handler (pointer ptr, exit_msg &x)
 
static error default_exception_handler (pointer ptr, std::exception_ptr &x)
 

Static Public Attributes

static constexpr size_t urgent_queue_index = 0
 
static constexpr size_t normal_queue_index = 1
 
static constexpr size_t upstream_queue_index = 2
 
static constexpr size_t downstream_queue_index = 3
 

Related Symbols

(Note that these are not member symbols.)

result< messagereflect (scheduled_actor *, message_view &)
 Default handler function that sends the message back to the sender.
 
result< messagereflect_and_quit (scheduled_actor *, message_view &)
 Default handler function that sends the message back to the sender and then quits.
 
result< messageprint_and_drop (scheduled_actor *, message_view &)
 Default handler function that prints messages message via aout and drops them afterwards.
 
result< messagedrop (scheduled_actor *, message_view &)
 Default handler function that simply drops messages.
 

Detailed Description

A cooperatively scheduled, event-based actor implementation.

Member Typedef Documentation

◆ default_handler

Initial value:
std::function<result<message>(pointer, message_view&)>
scheduled_actor * pointer
A pointer to a scheduled actor.
Definition scheduled_actor.hpp:195

Function object for handling unmatched messages.

◆ downstream_queue

Initial value:
intrusive::wdrr_dynamic_multiplexed_queue<policy::downstream_messages>

Stores downstream messages.

Member Enumeration Documentation

◆ activation_result

Result of one-shot activations.

Enumerator
success 

Actor is still alive and handled the activation message.

terminated 

Actor handled the activation message and terminated.

skipped 

Actor skipped the activation message.

dropped 

Actor dropped the activation message.

◆ message_category

Categorizes incoming messages.

Enumerator
ordinary 

Triggers the current behavior.

internal 

Triggers handlers for system messages such as exit_msg or down_msg.

skipped 

Delays processing.

Member Function Documentation

◆ intrusive_ptr_add_ref_impl()

void caf::scheduled_actor::intrusive_ptr_add_ref_impl ( )
overridevirtual

Add a strong reference count to this object.

Implements caf::resumable.

◆ intrusive_ptr_release_impl()

void caf::scheduled_actor::intrusive_ptr_release_impl ( )
overridevirtual

Remove a strong reference count from this object.

Implements caf::resumable.

◆ make_continuous_source() [1/2]

template<class Driver , class Init , class Pull , class Done , class Finalize = unit_t>
Driver::source_ptr_type caf::scheduled_actor::make_continuous_source ( Init init,
Pull pull,
Done done,
Finalize fin = {} )
Deprecated
Please use attach_continuous_stream_source instead.

◆ make_continuous_source() [2/2]

template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>>
stream_source_ptr< DownstreamManager > caf::scheduled_actor::make_continuous_source ( Init init,
Pull pull,
Done done,
Finalize fin = {},
policy::arg< DownstreamManager > = {} )
Deprecated
Please use attach_continuous_stream_source instead.

◆ make_continuous_stage() [1/2]

template<class Init , class Fun , class Cleanup , class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
stream_stage_ptr< typename Trait::input, DownstreamManager > caf::scheduled_actor::make_continuous_stage ( Init init,
Fun fun,
Cleanup cleanup,
policy::arg< DownstreamManager > token = {} )
Deprecated
Please use attach_continuous_stream_stage instead.

◆ make_continuous_stage() [2/2]

template<class Driver , class... Ts>
Driver::stage_ptr_type caf::scheduled_actor::make_continuous_stage ( Ts &&... xs)
Deprecated
Please use attach_continuous_stream_stage instead.

◆ make_sink() [1/2]

template<class In , class Init , class Fun , class Finalize = unit_t, class Trait = stream_sink_trait_t<Fun>>
make_sink_result< In > caf::scheduled_actor::make_sink ( const stream< In > & in,
Init init,
Fun fun,
Finalize fin = {} )
Deprecated
Please use attach_stream_sink instead.

◆ make_sink() [2/2]

template<class Driver , class... Ts>
make_sink_result< typename Driver::input_type > caf::scheduled_actor::make_sink ( const stream< typename Driver::input_type > & src,
Ts &&... xs )
Deprecated
Please use attach_stream_sink instead.

◆ make_source() [1/5]

template<class ActorHandle , class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t< is_actor_handle< ActorHandle >::value &&Trait::valid, make_source_result_t< DownstreamManager > > caf::scheduled_actor::make_source ( const ActorHandle & dest,
Init init,
Pull pull,
Done done,
Finalize fin = {},
policy::arg< DownstreamManager > token = {} )
Deprecated
Please use attach_stream_source instead.

◆ make_source() [2/5]

template<class ActorHandle , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t< is_actor_handle< ActorHandle >::value, make_source_result_t< DownstreamManager > > caf::scheduled_actor::make_source ( const ActorHandle & dest,
std::tuple< Ts... > xs,
Init init,
Pull pull,
Done done,
Finalize fin = {},
policy::arg< DownstreamManager > = {} )
Deprecated
Please use attach_stream_source instead.

◆ make_source() [3/5]

template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<!is_actor_handle< Init >::value &&Trait::valid, make_source_result_t< DownstreamManager > > caf::scheduled_actor::make_source ( Init init,
Pull pull,
Done done,
Finalize finalize = {},
policy::arg< DownstreamManager > token = {} )
Deprecated
Please use attach_stream_source instead.

◆ make_source() [4/5]

template<class Driver , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t>
make_source_result_t< typename Driver::downstream_manager_type, Ts... > caf::scheduled_actor::make_source ( std::tuple< Ts... > xs,
Init init,
Pull pull,
Done done,
Finalize fin = {} )
Deprecated
Please use attach_stream_source instead.

◆ make_source() [5/5]

template<class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>>
make_source_result_t< DownstreamManager, Ts... > caf::scheduled_actor::make_source ( std::tuple< Ts... > xs,
Init init,
Pull pull,
Done done,
Finalize fin = {},
policy::arg< DownstreamManager > = {} )
Deprecated
Please use attach_stream_source instead.

◆ make_stage() [1/3]

template<class In , class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
make_stage_result_t< In, DownstreamManager > caf::scheduled_actor::make_stage ( const stream< In > & in,
Init init,
Fun fun,
Finalize fin = {},
policy::arg< DownstreamManager > token = {} )
Deprecated
Please use attach_stream_stage instead.

◆ make_stage() [2/3]

template<class In , class... Ts, class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
make_stage_result_t< In, DownstreamManager, Ts... > caf::scheduled_actor::make_stage ( const stream< In > & in,
std::tuple< Ts... > xs,
Init init,
Fun fun,
Finalize fin = {},
policy::arg< DownstreamManager > token = {} )
Deprecated
Please use attach_stream_stage instead.

◆ make_stage() [3/3]

template<class Driver , class In , class... Ts, class... Us>
make_stage_result_t< In, typename Driver::downstream_manager_type, Ts... > caf::scheduled_actor::make_stage ( const stream< In > & src,
std::tuple< Ts... > xs,
Us &&... ys )
Deprecated
Please use attach_stream_stage instead.

◆ proxy_registry_ptr()

virtual proxy_registry * caf::scheduled_actor::proxy_registry_ptr ( )
virtual

Returns a factory for proxies created and managed by this actor or nullptr.

Reimplemented in caf::io::basp_broker.

◆ quit()

void caf::scheduled_actor::quit ( error x = error{})

Finishes execution of this actor after any currently running message handler is done.

This member function clears the behavior stack of the running actor and invokes on_exit(). The actors does not finish execution if the implementation of on_exit() sets a new behavior. When setting a new behavior in on_exit(), one has to make sure to not produce an infinite recursion.

If on_exit() did not set a new behavior, the actor sends an exit message to all of its linked actors, sets its state to exited and finishes execution.

In case this actor uses the blocking API, this member function unwinds the stack by throwing an actor_exited exception.

Warning
This member function throws immediately in thread-based actors that do not use the behavior stack, i.e., actors that use blocking API calls such as receive().
Examples
dancing_kirby.cpp.

◆ resume()

resume_result caf::scheduled_actor::resume ( execution_unit * ,
size_t max_throughput )
overridevirtual

Resume any pending computation until it is either finished or needs to be re-scheduled later.

Implements caf::resumable.

◆ set_exception_handler() [1/2]

void caf::scheduled_actor::set_exception_handler ( exception_handler fun)

Sets a custom exception handler for this actor.

If multiple handlers are defined, only the functor that was added last is being executed.

◆ set_exception_handler() [2/2]

template<class F >
std::enable_if< std::is_convertible< F, std::function< error(std::exception_ptr &)> >::value >::type caf::scheduled_actor::set_exception_handler ( F f)

Sets a custom exception handler for this actor.

If multiple handlers are defined, only the functor that was added last is being executed.

◆ subtype()

subtype_t caf::scheduled_actor::subtype ( ) const
overridevirtual

Returns a subtype hint for this object.

This allows an execution unit to limit processing to a specific set of resumables and delegate other subtypes to dedicated workers.

Reimplemented from caf::resumable.


The documentation for this class was generated from the following file: