# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2012-2024 -- leonerd@leonerd.org.uk

package IO::Async::Routine 0.803;

use v5.14;
use warnings;

use base qw( IO::Async::Notifier );

use Carp;

use IO::Async::OS;
use IO::Async::Process;

use Struct::Dumb qw( readonly_struct );

=head1 NAME

C<IO::Async::Routine> - execute code in an independent sub-process or thread

=head1 SYNOPSIS

   use IO::Async::Routine;
   use IO::Async::Channel;

   use IO::Async::Loop;
   my $loop = IO::Async::Loop->new;

   my $nums_ch = IO::Async::Channel->new;
   my $ret_ch  = IO::Async::Channel->new;

   my $routine = IO::Async::Routine->new(
      channels_in  => [ $nums_ch ],
      channels_out => [ $ret_ch ],

      code => sub {
         my @nums = @{ $nums_ch->recv };
         my $ret = 0; $ret += $_ for @nums;

         # Can only send references
         $ret_ch->send( \$ret );
      },

      on_finish => sub {
         say "The routine aborted early - $_[-1]";
         $loop->stop;
      },
   );

   $loop->add( $routine );

   $nums_ch->send( [ 10, 20, 30 ] );
   $ret_ch->recv(
      on_recv => sub {
         my ( $ch, $totalref ) = @_;
         say "The total of 10, 20, 30 is: $$totalref";
         $loop->stop;
      }
   );

   $loop->run;

=head1 DESCRIPTION

This L<IO::Async::Notifier> contains a body of code and executes it in a
sub-process or thread, allowing it to act independently of the main program.
Once set up, all communication with the code happens by values passed into or
out of the Routine via L<IO::Async::Channel> objects.

The code contained within the Routine is free to make blocking calls without
stalling the rest of the program. This makes it useful for using existing code
which has no option not to block within an L<IO::Async>-based program.

To create asynchronous wrappers of functions that return a value based only on
their arguments, and do not generally maintain state within the process it may
be more convenient to use an L<IO::Async::Function> instead, which uses an
C<IO::Async::Routine> to contain the body of the function and manages the
Channels itself.

=head2 Models

A choice of detachment model is available.
Each has various advantages and disadvantages. Not all of them may be
available on a particular system.

=head3 The C<fork> model

The code in this model runs within its own process, created by calling
C<fork()> from the main process. It is isolated from the rest of the program
in terms of memory, CPU time, and other resources. Because it is started
using C<fork()>, the initial process state is a clone of the main process.

This model performs well on UNIX-like operating systems which possess a true
native C<fork()> system call, but is not available on C<MSWin32> for example,
because the operating system does not provide full fork-like semantics.

=head3 The C<thread> model

The code in this model runs inside a separate thread within the main process.
It therefore shares memory and other resources such as open filehandles with
the main thread. As with the C<fork> model, the initial thread state is cloned
from the main controlling thread.

This model is only available on perls built to support threading.

=head3 The C<spawn> model

The code in this model runs within its own freshly-created process running
another copy of the perl interpreter. Similar to the C<fork> model it
therefore has its own memory, CPU time, and other resources. However, since it
is started freshly rather than by cloning the main process, it starts up in a
clean state, without any shared resources from its parent.

Since this model creates a new fresh process rather than sharing existing
state, it cannot use the C<code> argument to specify the routine body; it must
instead use only the C<module> and C<func> arguments.

In the current implementation this model requires exactly one input channel
and exactly one output channel; both must be present, and there cannot be more
than one of either.

=cut

=head1 EVENTS

=head2 on_finish $exitcode

For C<fork()>-based Routines, this is invoked after the process has exited and
is passed the raw exitcode status.

=head2 on_finish $type, @result

For thread-based Routines, this is invoked after the thread has returned from
its code block and is passed the C<on_joined> result.

As the behaviour of these events differs per model, it may be more convenient
to use C<on_return> and C<on_die> instead.

=head2 on_return $result

Invoked if the code block returns normally. Note that C<fork()>-based Routines
can only transport an integer result between 0 and 255, as this is the actual
C<exit()> value.

=head2 on_die $exception

Invoked if the code block fails with an exception.

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 model => "fork" | "thread" | "spawn"

Optional. Defines how the routine will detach itself from the main process.
See the L</Models> section above for more detail.

If the model is not specified, the environment variable
C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined,
C<fork> is preferred if it is available, otherwise C<thread>.

=head2 channels_in => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
in to the Routine.

=head2 channels_out => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
out of the Routine.

=head2 code => CODE

CODE reference to the body of the Routine, to execute once the channels are
set up.

When using the C<spawn> model, this is not permitted; you must use C<module>
and C<func> instead.

=head2 module => STRING

=head2 func => STRING

An alternative to the C<code> argument, which names a module to load and a
function to call within it. C<module> should give a perl module name (i.e.
C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
the basename of a function within that module (i.e. without the module name
prefixed). It will be invoked as the main code body of the object, and passed
in a list of all the channels; first the input ones then the output ones.

   module::func( @channels_in, @channels_out );

=head2 setup => ARRAY

Optional. For C<fork()>-based Routines, gives a reference to an array to pass
as the C<setup> parameter of the underlying L<IO::Async::Process>.
Ignored for thread-based Routines.

=cut

# Default detachment model for this platform, chosen once at compile time:
# "fork" where IO::Async::OS reports a POSIX fork, otherwise "thread" where
# threads are available. Loading the module dies if neither is usable.
use constant PREFERRED_MODEL =>
   IO::Async::OS->HAVE_POSIX_FORK ? "fork" :
   IO::Async::OS->HAVE_THREADS    ? "thread" :
      die "No viable Routine models";

# Initialisation hook; delegates to the superclass after making sure a model
# is always present in the parameters: an explicit 'model' wins, then
# $ENV{IO_ASYNC_ROUTINE_MODEL}, then PREFERRED_MODEL.
sub _init
{
   my $self = shift;
   my ( $params ) = @_;

   $params->{model} ||= $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL;

   $self->SUPER::_init( @_ );
}

# Cache of model name => CODE ref of the matching _setup_$model method
my %SETUP_CODE;

# Stores the Routine-specific parameters on the object, validates the
# code/module/func combination and the requested model, then passes any
# remaining parameters up to the superclass.
#
# Croaks if both 'code' and 'func' are set, if 'func' is given without
# 'module', or if the chosen model is not available on this platform. Dies if
# the model name has no corresponding _setup_* method.
sub configure
{
   my $self = shift;
   my %params = @_;

   # TODO: Can only reconfigure when not running
   foreach (qw( channels_in channels_out code module func setup on_finish on_return on_die )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   defined $self->{code} and defined $self->{func} and
      croak "Cannot ->configure both 'code' and 'func'";
   defined $self->{func} and !defined $self->{module} and
      croak "'func' parameter requires a 'module' as well";

   if( defined( my $model = delete $params{model} ) ) {
      ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
         or die "Unrecognised Routine model $model";

      # TODO: optional plugin "configure" check here?
      $model eq "fork" and !IO::Async::OS->HAVE_POSIX_FORK and
         croak "Cannot use 'fork' model as fork() is not available";
      $model eq "thread" and !IO::Async::OS->HAVE_THREADS and
         croak "Cannot use 'thread' model as threads are not available";

      $self->{model} = $model;
   }

   $self->SUPER::configure( %params );
}

# Called when the Routine is added to a loop; this is the point at which the
# routine is actually started, by dispatching to the _setup_$model method.
sub _add_to_loop
{
   my $self = shift;
   my ( $loop ) = @_;
   $self->SUPER::_add_to_loop( $loop );

   my $model = $self->{model};

   my $code = ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
      or die "Unrecognised Routine model $model";

   $self->$code();
}

# One record per channel while it is being wired up:
#   chan    - the channel object itself
#   myfd    - the handle later given to the channel's async mode by
#             _adopt_channels_*; may be undef when the channel supplied its
#             own handle for the routine side
#   otherfd - the handle given to the channel's sync mode inside the routine
readonly_struct ChannelSetup => [qw( chan myfd otherfd )];

# Builds a ChannelSetup for every input channel. The routine side reads, so
# otherfd is the read handle and myfd the write handle. A fresh pipe is only
# created when the channel does not already provide a read handle.
sub _create_channels_in
{
   my $self = shift;

   my @channels_in;

   foreach my $ch ( @{ $self->{channels_in} || [] } ) {
      my ( $rd, $wr );
      unless( $rd = $ch->_extract_read_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @channels_in, ChannelSetup( $ch, $wr, $rd );
   }

   return @channels_in;
}

# Builds a ChannelSetup for every output channel; the mirror image of
# _create_channels_in (otherfd is the write handle, myfd the read handle).
sub _create_channels_out
{
   my $self = shift;

   my @channels_out;

   foreach my $ch ( @{ $self->{channels_out} || [] } ) {
      my ( $rd, $wr );
      unless( $wr = $ch->_extract_write_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @channels_out, ChannelSetup( $ch, $rd, $wr );
   }

   return @channels_out;
}

# Puts each input channel into async mode, writing to its myfd handle, and
# makes it a child of this Routine unless it already has a parent.
sub _adopt_channels_in
{
   my $self = shift;
   my ( @channels_in ) = @_;

   foreach ( @channels_in ) {
      my $ch = $_->chan;
      $ch->setup_async_mode( write_handle => $_->myfd );
      $self->add_child( $ch ) unless $ch->parent;
   }
}

# Puts each output channel into async mode, reading from its myfd handle, and
# makes it a child of this Routine unless it already has a parent.
sub _adopt_channels_out
{
   my $self = shift;
   my ( @channels_out ) = @_;

   foreach ( @channels_out ) {
      my $ch = $_->chan;
      $ch->setup_async_mode( read_handle => $_->myfd );
      $self->add_child( $ch ) unless $ch->parent;
   }
}

# Starts the routine body in an IO::Async::Process running a code block.
sub _setup_fork
{
   my $self = shift;

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $code = $self->{code};

   my $module = $self->{module};
   my $func   = $self->{func};

   # Every routine-side handle must be kept open in the child; any
   # user-supplied setup entries are appended after those.
   my @setup = map { $_->otherfd => "keep" } @channels_in, @channels_out;

   my $setup = $self->{setup};
   push @setup, @$setup if $setup;

   my $process = IO::Async::Process->new(
      setup => \@setup,
      code => sub {
         foreach ( @channels_in, @channels_out ) {
            $_->chan->setup_sync_mode( $_->otherfd );
         }

         # Gate on 'func', exactly as _setup_thread does. configure() only
         # guarantees that 'func' implies 'module', not the reverse, so
         # testing 'module' here could call ->can( undef ) and would discard
         # a supplied 'code' body.
         if( defined $func ) {
            ( my $file = "$module.pm" ) =~ s{::}{/}g;
            require $file;

            $code = $module->can( $func ) or
               die "Module '$module' has no '$func'\n";
         }

         my $ret = $code->( map { $_->chan } @channels_in, @channels_out );

         foreach ( @channels_in, @channels_out ) {
            $_->chan->close;
         }

         return $ret;
      },
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         # Only report a return value when the low seven bits of the status
         # are clear; the value itself is in the next byte up.
         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception, $errno, $exitcode ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   $self->_adopt_channels_in ( @channels_in );
   $self->_adopt_channels_out( @channels_out );

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   # The routine-side handles are no longer needed in this process
   $_->otherfd->close for @channels_in, @channels_out;
}

# Starts the routine body in a thread created by the containing loop.
sub _setup_thread
{
   my $self = shift;

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $code = $self->{code};

   my $module = $self->{module};
   my $func   = $self->{func};

   my $tid = $self->loop->create_thread(
      code => sub {
         foreach ( @channels_in, @channels_out ) {
            $_->chan->setup_sync_mode( $_->otherfd );
            # Close this thread's view of the controlling side's handle, if
            # there is one (myfd may be undef)
            defined and $_->close for $_->myfd;
         }

         if( defined $func ) {
            ( my $file = "$module.pm" ) =~ s{::}{/}g;
            require $file;

            $code = $module->can( $func ) or
               die "Module '$module' has no '$func'\n";
         }

         my $ret = $code->( map { $_->chan } @channels_in, @channels_out );

         foreach ( @channels_in, @channels_out ) {
            $_->chan->close;
         }

         return $ret;
      },
      on_joined => $self->_capture_weakself( sub {
         my $self = shift or return;
         my ( $ev, @result ) = @_;
         $self->maybe_invoke_event( on_finish => @_ );

         if( $ev eq "return" ) {
            $self->maybe_invoke_event( on_return => @result );
            $self->result_future->done( @result );
         }
         if( $ev eq "died" ) {
            $self->maybe_invoke_event( on_die => $result[0] );
            $self->result_future->fail( $result[0], routine => );
         }

         delete $self->{tid};
      }),
   );

   $self->{tid} = $tid;
   $self->{id} = "T" . $tid;

   $self->_adopt_channels_in ( @channels_in );
   $self->_adopt_channels_out( @channels_out );

   $_->otherfd->close for @channels_in, @channels_out;
}

# The injected program that goes into spawn mode
use constant PERL_RUNNER => <<'EOF';
( my ( $module, $func ), @INC ) = @ARGV;
( my $file = "$module.pm" ) =~ s{::}{/}g;
require $file;
my $code = $module->can( $func ) or die "Module '$module' has no '$func'\n";
require IO::Async::Channel;
exit $code->( IO::Async::Channel->new_stdin, IO::Async::Channel->new_stdout );
EOF

# Starts the routine body in a fresh perl interpreter running PERL_RUNNER,
# with the single input channel on its STDIN and the single output channel on
# its STDOUT.
#
# Dies if 'code' was given, if 'module'/'func' are missing, or if there is not
# exactly one channel in each direction.
sub _setup_spawn
{
   my $self = shift;

   $self->{code} and
      die "Cannot run IO::Async::Routine in 'spawn' with code\n";

   # Without these the child would only fail later, trying to load ".pm"
   defined $self->{module} and defined $self->{func} or
      die "IO::Async::Routine in 'spawn' mode requires 'module' and 'func'\n";

   # Default to an empty list so that a missing parameter produces the
   # intended message rather than a strict-refs dereference error
   @{ $self->{channels_in} || [] } == 1 or
      die "IO::Async::Routine in 'spawn' mode requires exactly one input channel\n";
   @{ $self->{channels_out} || [] } == 1 or
      die "IO::Async::Routine in 'spawn' mode requires exactly one output channel\n";

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $module = $self->{module};
   my $func   = $self->{func};

   my $process = IO::Async::Process->new(
      setup => [
         stdin  => $channels_in[0]->otherfd,
         stdout => $channels_out[0]->otherfd,
      ],
      # Only plain-string @INC entries can be passed on a command line
      command => [ $^X, "-E", PERL_RUNNER, $module, $func, grep { !ref } @INC ],
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception, $errno, $exitcode ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   $self->_adopt_channels_in ( @channels_in );
   $self->_adopt_channels_out( @channels_out );

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   $_->otherfd->close for @channels_in, @channels_out;
}

=head1 METHODS

=cut

=head2 id

   $id = $routine->id;

Returns an ID string that uniquely identifies the Routine out of all the
currently-running ones. (The ID of already-exited Routines may be reused,
however.)

=cut

sub id
{
   my $self = shift;
   return $self->{id};
}

=head2 model

   $model = $routine->model;

Returns the detachment model in use by the Routine.

=cut

sub model
{
   my $self = shift;
   return $self->{model};
}

=head2 kill

   $routine->kill( $signal );

Sends the specified signal to the routine code. For the process-based models
(C<fork> and C<spawn>) the signal is sent to the child process; for the
C<thread> model it is delivered using C<< threads->kill >>.

Note that in the thread case this has the usual limits of signal delivery to
threads; namely, that it works at the Perl interpreter level, and cannot
actually interrupt blocking system calls.

=cut

sub kill
{
   my $self = shift;
   my ( $signal ) = @_;

   my $model = $self->{model};

   # Both "fork" and "spawn" run the routine inside $self->{process}
   if( $model eq "fork" or $model eq "spawn" ) {
      $self->{process}->kill( $signal );
   }
   elsif( $model eq "thread" ) {
      threads->object( $self->{tid} )->kill( $signal );
   }

   return;
}

=head2 result_future

   $f = $routine->result_future;

Returns a future, obtained from the containing loop, which will complete with
the eventual return value or exception when the routine finishes. The future
is created on the first call and the same one is returned thereafter.

If the routine finishes with a successful result then this will be the C<done>
result of the future. If the routine fails with an exception then this will be
the C<fail> result.

=cut

sub result_future
{
   my $self = shift;

   return $self->{result_future} //= do {
      my $f = $self->loop->new_future;
      # This future needs to strongly retain $self to ensure it definitely gets
      # notified
      $f->on_ready( sub { undef $self } );
      $f;
   };
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;