CAF 0.17.6
|
A broker implementation for the Binary Actor System Protocol (BASP). More...
#include <basp_broker.hpp>
Public Types | |
using | super = broker |
using | ctx_map = std::unordered_map<connection_handle, basp::endpoint_context> |
using | monitored_actor_map |
![]() | |
using | super |
using | signatures = none_t |
![]() | |
enum class | message_category { ordinary , internal , skipped } |
Categorizes incoming messages. More... | |
enum class | activation_result { success , terminated , skipped , dropped } |
Result of one-shot activations. More... | |
using | super = local_actor |
Base type. | |
using | stream_manager_map = std::map<stream_slot, stream_manager_ptr> |
Maps slot IDs to stream managers. | |
using | normal_queue = intrusive::drr_cached_queue<policy::normal_messages> |
Stores asynchronous messages with default priority. | |
using | urgent_queue = intrusive::drr_cached_queue<policy::urgent_messages> |
Stores asynchronous messages with high priority. | |
using | upstream_queue = intrusive::drr_queue<policy::upstream_messages> |
Stores upstream messages. | |
using | downstream_queue |
Stores downstream messages. | |
using | mailbox_type = intrusive::fifo_inbox<mailbox_policy> |
A queue optimized for single-reader-many-writers. | |
using | pending_response = std::pair<const message_id, behavior> |
The message ID of an outstanding response with its callback. | |
using | pointer = scheduled_actor* |
A pointer to a scheduled actor. | |
using | default_handler |
Function object for handling unmatched messages. | |
using | error_handler = std::function<void (pointer, error&)> |
Function object for handling error messages. | |
using | down_handler = std::function<void (pointer, down_msg&)> |
Function object for handling down messages. | |
using | exit_handler = std::function<void (pointer, exit_msg&)> |
Function object for handling exit messages. | |
using | exception_handler = std::function<error (pointer, std::exception_ptr&)> |
Function object for handling exceptions. | |
![]() | |
enum | resume_result { resume_later , awaiting_message , done , shutdown_execution_unit } |
Denotes the state in which a resumable returned from its last call to resume . | |
enum | subtype_t { unspecified , scheduled_actor , io_actor , function_object } |
Denotes common subtypes of resumable . More... | |
![]() | |
using | buffer_type = std::vector<char> |
Public Member Functions | |
basp_broker (actor_config &cfg) | |
void | on_exit () override |
const char * | name () const override |
behavior | make_behavior () override |
proxy_registry * | proxy_registry_ptr () override |
Returns a factory for proxies created and managed by this actor or nullptr . | |
resume_result | resume (execution_unit *, size_t) override |
Resume any pending computation until it is either finished or needs to be re-scheduled later. | |
strong_actor_ptr | make_proxy (node_id nid, actor_id aid) override |
Creates a new proxy instance. | |
void | set_last_hop (node_id *ptr) override |
Sets the thread-local last-hop pointer to detect indirect connections. | |
void | finalize_handshake (const node_id &nid, actor_id aid, std::set< std::string > &sigs) override |
Called if a server handshake was received and the connection to nid is established. | |
void | purge_state (const node_id &nid) override |
Called whenever a direct connection was closed or a node became unreachable for other reasons before this node gets erased from the routing table. | |
void | proxy_announced (const node_id &nid, actor_id aid) override |
Called whenever a remote node created a proxy for one of our local actors. | |
void | learned_new_node_directly (const node_id &nid, bool was_indirectly_before) override |
Called whenever BASP learns the ID of a remote node to which it has a direct connection. | |
void | learned_new_node_indirectly (const node_id &nid) override |
Called whenever BASP learns the ID of a remote node to which it does not have a direct connection. | |
buffer_type & | get_buffer (connection_handle hdl) override |
Returns a reference to the send buffer. | |
void | flush (connection_handle hdl) override |
Flushes the underlying write buffer of hdl . | |
void | handle_heartbeat () override |
Called if a heartbeat was received from a remote node. | |
execution_unit * | current_execution_unit () override |
Returns the current CAF scheduler context. | |
strong_actor_ptr | this_actor () override |
Returns a handle to the callee actor. | |
void | learned_new_node (const node_id &nid) |
Performs bookkeeping such as managing spawn_servers . | |
void | set_context (connection_handle hdl) |
Sets this_context by either creating or accessing state for hdl . | |
void | connection_cleanup (connection_handle hdl, sec code) |
Cleans up any state for hdl . | |
void | send_basp_down_message (const node_id &nid, actor_id aid, error err) |
Sends a basp::down_message message to a remote node. | |
void | handle_down_msg (down_msg &) |
actor_system & | system () |
const actor_system_config & | config () |
const node_id & | this_node () const |
Returns the node identifier of the underlying BASP instance. | |
![]() | |
template<class F , class... Ts> | |
infer_handle_from_fun< F >::type | fork (F fun, connection_handle hdl, Ts &&... xs) |
void | initialize () override |
broker (actor_config &cfg) | |
![]() | |
void | enqueue (mailbox_element_ptr, execution_unit *) override |
void | enqueue (strong_actor_ptr, message_id, message, execution_unit *) override |
void | launch (execution_unit *eu, bool lazy, bool hide) override |
bool | cleanup (error &&reason, execution_unit *host) override |
template<class Handle > | |
void | halt (Handle hdl) |
Suspends activities on hdl unconditionally. | |
template<class Handle > | |
void | trigger (Handle hdl) |
Allows activities on hdl unconditionally (default). | |
template<class Handle > | |
void | trigger (Handle hdl, size_t num_events) |
Allows num_events activities on hdl . | |
void | configure_read (connection_handle hdl, receive_policy::config cfg) |
Modifies the receive policy for a given connection. | |
void | ack_writes (connection_handle hdl, bool enable) |
Enables or disables write notifications for a given connection. | |
std::vector< char > & | wr_buf (connection_handle hdl) |
Returns the write buffer for a given connection. | |
void | write (connection_handle hdl, size_t bs, const void *buf) |
Writes data into the buffer for a given connection. | |
void | flush (connection_handle hdl) |
Sends the content of the buffer for a given connection. | |
void | ack_writes (datagram_handle hdl, bool enable) |
Enables or disables write notifications for a given datagram socket. | |
std::vector< char > & | wr_buf (datagram_handle hdl) |
Returns the write buffer for a given sink. | |
void | enqueue_datagram (datagram_handle, std::vector< char >) |
Enqueue a buffer to be sent as a datagram via a given endpoint. | |
void | write (datagram_handle hdl, size_t data_size, const void *data) |
Writes data into the buffer of a given sink. | |
void | flush (datagram_handle hdl) |
Sends the content of the buffer to a UDP endpoint. | |
middleman & | parent () |
Returns the middleman instance this broker belongs to. | |
void | add_scribe (scribe_ptr ptr) |
Adds the uninitialized scribe instance ptr to this broker. | |
connection_handle | add_scribe (network::native_socket fd) |
Creates and assigns a new scribe from given native socket fd . | |
expected< connection_handle > | add_tcp_scribe (const std::string &host, uint16_t port) |
Tries to connect to host on given port and creates a new scribe describing the connection afterwards. | |
void | move_scribe (scribe_ptr ptr) |
Moves the initialized scribe instance ptr from another broker to this broker. | |
void | add_doorman (doorman_ptr ptr) |
Adds a doorman instance to this broker. | |
accept_handle | add_doorman (network::native_socket fd) |
Creates and assigns a new doorman from given native socket fd . | |
void | move_doorman (doorman_ptr ptr) |
Moves the initialized doorman instance ptr from another broker to this broker. | |
expected< std::pair< accept_handle, uint16_t > > | add_tcp_doorman (uint16_t port=0, const char *in=nullptr, bool reuse_addr=false) |
Tries to open a local port and creates a doorman managing it on success. | |
void | add_datagram_servant (datagram_servant_ptr ptr) |
Adds a datagram_servant to this broker. | |
void | add_hdl_for_datagram_servant (datagram_servant_ptr ptr, datagram_handle hdl) |
Adds the datagram_servant under an additional hdl . | |
datagram_handle | add_datagram_servant (network::native_socket fd) |
Creates and assigns a new datagram_servant from a given socket fd . | |
datagram_handle | add_datagram_servant_for_endpoint (network::native_socket fd, const network::ip_endpoint &ep) |
Creates and assigns a new datagram_servant from a given socket fd for the remote endpoint ep . | |
expected< datagram_handle > | add_udp_datagram_servant (const std::string &host, uint16_t port) |
Creates a new datagram_servant for the remote endpoint host and port . | |
expected< std::pair< datagram_handle, uint16_t > > | add_udp_datagram_servant (uint16_t port=0, const char *in=nullptr, bool reuse_addr=false) |
Tries to open a local port and creates a datagram_servant managing it on success. | |
void | move_datagram_servant (datagram_servant_ptr ptr) |
Moves an initialized datagram_servant instance ptr from another broker to this one. | |
std::string | remote_addr (connection_handle hdl) |
Returns the remote address associated with hdl or empty string if hdl is invalid. | |
uint16_t | remote_port (connection_handle hdl) |
Returns the remote port associated with hdl or 0 if hdl is invalid. | |
std::string | local_addr (accept_handle hdl) |
Returns the local address associated with hdl or empty string if hdl is invalid. | |
uint16_t | local_port (accept_handle hdl) |
Returns the local port associated with hdl or 0 if hdl is invalid. | |
accept_handle | hdl_by_port (uint16_t port) |
Returns the handle associated with given local port or none . | |
datagram_handle | datagram_hdl_by_port (uint16_t port) |
Returns the dgram handle associated with given local port or none . | |
std::string | remote_addr (datagram_handle hdl) |
Returns the remote address associated with hdl or an empty string if hdl is invalid. | |
uint16_t | remote_port (datagram_handle hdl) |
Returns the remote port associated with hdl or 0 if hdl is invalid. | |
uint16_t | local_port (datagram_handle hdl) |
Returns the local port associated with hdl or 0 if hdl is invalid. | |
bool | remove_endpoint (datagram_handle hdl) |
Remove the endpoint hdl from the broker. | |
void | close_all () |
Closes all connections and acceptors. | |
template<class Handle > | |
bool | close (Handle hdl) |
Closes the connection or acceptor identified by handle . | |
template<class Handle > | |
bool | valid (Handle hdl) |
Checks whether hdl is assigned to broker. | |
const char * | name () const override |
subtype_t | subtype () const override |
Returns a subtype hint for this object. | |
size_t | num_connections () const |
Returns the number of open connections. | |
std::vector< connection_handle > | connections () const |
Returns all handles of all scribe instances attached to this broker. | |
network::multiplexer & | backend () |
Returns the multiplexer running this broker. | |
![]() | |
scheduled_actor (actor_config &cfg) | |
void | enqueue (mailbox_element_ptr ptr, execution_unit *eu) override |
mailbox_element * | peek_at_next_mailbox_element () override |
const char * | name () const override |
void | launch (execution_unit *eu, bool lazy, bool hide) override |
bool | cleanup (error &&fail_state, execution_unit *host) override |
subtype_t | subtype () const override |
Returns a subtype hint for this object. | |
void | intrusive_ptr_add_ref_impl () override |
Add a strong reference count to this object. | |
void | intrusive_ptr_release_impl () override |
Remove a strong reference count from this object. | |
resume_result | resume (execution_unit *, size_t) override |
Resume any pending computation until it is either finished or needs to be re-scheduled later. | |
void | quit (error x=error{}) |
Finishes execution of this actor after any currently running message handler is done. | |
mailbox_type & | mailbox () noexcept |
Returns the queue for storing incoming messages. | |
stream_manager_map & | stream_managers () noexcept |
Returns map for all active streams. | |
stream_manager_map & | pending_stream_managers () noexcept |
Returns map for all pending streams. | |
void | set_default_handler (default_handler fun) |
Sets a custom handler for unexpected messages. | |
template<class F > | |
std::enable_if< std::is_convertible< F, std::function< result< message >(type_erased_tuple &)> >::value >::type | set_default_handler (F fun) |
Sets a custom handler for unexpected messages. | |
void | set_error_handler (error_handler fun) |
Sets a custom handler for error messages. | |
template<class T > | |
auto | set_error_handler (T fun) -> decltype(fun(std::declval< error & >())) |
Sets a custom handler for error messages. | |
void | set_down_handler (down_handler fun) |
Sets a custom handler for down messages. | |
template<class T > | |
auto | set_down_handler (T fun) -> decltype(fun(std::declval< down_msg & >())) |
Sets a custom handler for down messages. | |
void | set_exit_handler (exit_handler fun) |
Sets a custom handler for exit messages. | |
template<class T > | |
auto | set_exit_handler (T fun) -> decltype(fun(std::declval< exit_msg & >())) |
Sets a custom handler for exit messages. | |
void | set_exception_handler (exception_handler fun) |
Sets a custom exception handler for this actor. | |
template<class F > | |
std::enable_if< std::is_convertible< F, std::function< error(std::exception_ptr &)> >::value >::type | set_exception_handler (F f) |
Sets a custom exception handler for this actor. | |
template<class Driver , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t> | |
make_source_result_t< typename Driver::downstream_manager_type, Ts... > | make_source (std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={}) |
template<class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>> | |
make_source_result_t< DownstreamManager, Ts... > | make_source (std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={}) |
template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>> | |
detail::enable_if_t<!is_actor_handle< Init >::value &&Trait::valid, make_source_result_t< DownstreamManager > > | make_source (Init init, Pull pull, Done done, Finalize finalize={}, policy::arg< DownstreamManager > token={}) |
template<class ActorHandle , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>> | |
detail::enable_if_t< is_actor_handle< ActorHandle >::value, make_source_result_t< DownstreamManager > > | make_source (const ActorHandle &dest, std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={}) |
template<class ActorHandle , class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>> | |
detail::enable_if_t< is_actor_handle< ActorHandle >::value &&Trait::valid, make_source_result_t< DownstreamManager > > | make_source (const ActorHandle &dest, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager > token={}) |
template<class Driver , class Init , class Pull , class Done , class Finalize = unit_t> | |
Driver::source_ptr_type | make_continuous_source (Init init, Pull pull, Done done, Finalize fin={}) |
template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>> | |
stream_source_ptr< DownstreamManager > | make_continuous_source (Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={}) |
template<class Driver , class... Ts> | |
make_sink_result< typename Driver::input_type > | make_sink (const stream< typename Driver::input_type > &src, Ts &&... xs) |
template<class In , class Init , class Fun , class Finalize = unit_t, class Trait = stream_sink_trait_t<Fun>> | |
make_sink_result< In > | make_sink (const stream< In > &in, Init init, Fun fun, Finalize fin={}) |
template<class Driver , class In , class... Ts, class... Us> | |
make_stage_result_t< In, typename Driver::downstream_manager_type, Ts... > | make_stage (const stream< In > &src, std::tuple< Ts... > xs, Us &&... ys) |
template<class In , class... Ts, class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>> | |
make_stage_result_t< In, DownstreamManager, Ts... > | make_stage (const stream< In > &in, std::tuple< Ts... > xs, Init init, Fun fun, Finalize fin={}, policy::arg< DownstreamManager > token={}) |
template<class In , class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>> | |
make_stage_result_t< In, DownstreamManager > | make_stage (const stream< In > &in, Init init, Fun fun, Finalize fin={}, policy::arg< DownstreamManager > token={}) |
template<class Driver , class... Ts> | |
Driver::stage_ptr_type | make_continuous_stage (Ts &&... xs) |
template<class Init , class Fun , class Cleanup , class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>> | |
stream_stage_ptr< typename Trait::input, DownstreamManager > | make_continuous_stage (Init init, Fun fun, Cleanup cleanup, policy::arg< DownstreamManager > token={}) |
void | enqueue (strong_actor_ptr sender, message_id mid, message msg, execution_unit *host) override |
![]() | |
![]() | |
![]() | |
callee (actor_system &sys, proxy_registry::backend &backend) | |
proxy_registry & | proxies () |
Returns the actor namespace associated with this BASP instance. | |
Public Attributes | ||
union { | ||
basp::instance instance | ||
}; | ||
Protocol instance of BASP. | ||
ctx_map | ctx | |
Keeps context information for all open connections. | ||
basp::endpoint_context * | this_context | |
Points to the current context for callbacks. | ||
std::unordered_map< node_id, actor > | spawn_servers | |
Stores handles to spawn servers for other nodes. | ||
bool | automatic_connections = false | |
Configures whether BASP automatically opens new connections to optimize routing paths by forming a mesh between all nodes. | ||
monitored_actor_map | monitored_actors | |
Keeps track of nodes that monitor local actors. | ||
Additional Inherited Members | |
![]() | |
static void | default_error_handler (pointer ptr, error &x) |
static void | default_down_handler (pointer ptr, down_msg &x) |
static void | default_exit_handler (pointer ptr, exit_msg &x) |
static error | default_exception_handler (pointer ptr, std::exception_ptr &x) |
![]() | |
static constexpr size_t | urgent_queue_index = 0 |
static constexpr size_t | normal_queue_index = 1 |
static constexpr size_t | upstream_queue_index = 2 |
static constexpr size_t | downstream_queue_index = 3 |
![]() | |
using | doorman_map = std::unordered_map<accept_handle, intrusive_ptr<doorman>> |
using | scribe_map |
using | datagram_servant_map = std::unordered_map<datagram_handle, intrusive_ptr<datagram_servant>> |
![]() | |
![]() | |
void | init_broker () |
abstract_broker (actor_config &cfg) | |
template<class Handle > | |
auto | by_id (Handle hdl) -> optional< decltype(*ptr_of(hdl))> |
Returns a scribe or doorman identified by hdl . | |
![]() | |
proxy_registry | namespace_ |
![]() | |
result< message > | reflect (scheduled_actor *, message_view &) |
Default handler function that sends the message back to the sender. | |
result< message > | reflect_and_quit (scheduled_actor *, message_view &) |
Default handler function that sends the message back to the sender and then quits. | |
result< message > | print_and_drop (scheduled_actor *, message_view &) |
Default handler function that prints messages via aout and drops them afterwards. | |
result< message > | drop (scheduled_actor *, message_view &) |
Default handler function that simply drops messages. | |
A broker implementation for the Binary Actor System Protocol (BASP).
using caf::io::basp_broker::monitored_actor_map |
|
overridevirtual |
Returns the current CAF scheduler context.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Called if a server handshake was received and the connection to nid is established.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Flushes the underlying write buffer of hdl.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Returns a reference to the send buffer.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Called if a heartbeat was received from a remote node.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Called whenever BASP learns the ID of a remote node to which it has a direct connection.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Called whenever BASP learns the ID of a remote node to which it does not have a direct connection.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Reimplemented from caf::io::broker.
|
overridevirtual |
Creates a new proxy instance.
Implements caf::proxy_registry::backend.
|
overridevirtual |
Called whenever a remote node created a proxy for one of our local actors.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Returns a factory for proxies created and managed by this actor or nullptr.
Reimplemented from caf::scheduled_actor.
|
overridevirtual |
Called whenever a direct connection was closed or a node became unreachable for other reasons before this node gets erased from the routing table.
Implements caf::io::basp::instance::callee.
|
overridevirtual |
Resume any pending computation until it is either finished or needs to be re-scheduled later.
Reimplemented from caf::io::abstract_broker.
void caf::io::basp_broker::set_context | ( | connection_handle | hdl | ) |
Sets this_context by either creating or accessing state for hdl.
Automatically sets endpoint_context::last_seen to clock().now().
|
overridevirtual |
Sets the thread-local last-hop pointer to detect indirect connections.
Implements caf::proxy_registry::backend.
|
overridevirtual |
Returns a handle to the callee actor.
Implements caf::io::basp::instance::callee.
Stores handles to spawn servers for other nodes.
These servers are spawned whenever the broker learns a new node ID and tries to get a 'SpawnServ' instance on the remote side.