# IO::Poll event loop bridge for POE::Kernel. The theory is that this # will be faster for large scale applications. This file is # contributed by Matt Sergeant (baud). # Empty package to appease perl. package POE::Loop::IO_Poll; use vars qw($VERSION); $VERSION = '1.370'; # NOTE - Should be #.### (three decimal places) # Include common signal handling. use POE::Loop::PerlSignals; # Everything plugs into POE::Kernel; package POE::Kernel; =for poe_tests sub skip_tests { return "IO::Poll is not 100% compatible with $^O" if ( $^O eq "MSWin32" and not $ENV{POE_DANTIC} ); return "IO::Poll tests require the IO::Poll module" if ( do { eval "use IO::Poll"; $@ } ); } =cut use strict; # Be sure we're using a contemporary version of IO::Poll. There were # issues with certain versions of IO::Poll prior to 0.05. The latest # version is 0.01, however. use IO::Poll 0.01; # Hand off to POE::Loop::Select if we're running under ActivePerl. BEGIN { if ($^O eq "MSWin32" and not $ENV{POE_DANTIC}) { warn "IO::Poll is defective on $^O. Falling back to IO::Select.\n"; require POE::Loop::Select; POE::Loop::Select->import(); die "not really dying"; } } use Errno qw(EINPROGRESS EWOULDBLOCK EINTR); use IO::Poll qw( POLLRDNORM POLLWRNORM POLLRDBAND POLLERR POLLHUP POLLNVAL ); # Many systems' IO::Poll don't define POLLRDNORM. # Usually upgrading IO::Poll helps. BEGIN { my $x = eval { POLLRDNORM }; if ($@ or not defined $x) { warn( "Your IO::Poll doesn't define POLLRDNORM. Falling back to IO::Select.\n" ); require POE::Loop::Select; POE::Loop::Select->import(); die "not really dying"; } } my %poll_fd_masks; # Allow $^T to change without affecting our internals. my $start_time = monotime(); #------------------------------------------------------------------------------ # Loop construction and destruction. sub loop_initialize { my $self = shift; %poll_fd_masks = (); } sub loop_finalize { my $self = shift; $self->loop_ignore_all_signals(); } #------------------------------------------------------------------------------ # Signal handler maintenance functions. sub loop_attach_uidestroy { # does nothing } #------------------------------------------------------------------------------ # Maintain time watchers. For this loop, we simply save the next # event time in a scalar. loop_do_timeslice() will use the saved # value. A "paused" time watcher is just a timeout for some future # time. my $_next_event_time = monotime(); sub loop_resume_time_watcher { $_next_event_time = $_[1]; } sub loop_reset_time_watcher { $_next_event_time = $_[1]; } sub loop_pause_time_watcher { $_next_event_time = monotime() + 3600; } # A static function; not some object method. sub mode_to_poll { return POLLRDNORM if $_[0] == MODE_RD; return POLLWRNORM if $_[0] == MODE_WR; return POLLRDBAND if $_[0] == MODE_EX; croak "unknown I/O mode $_[0]"; } #------------------------------------------------------------------------------ # Maintain filehandle watchers. sub loop_watch_filehandle { my ($self, $handle, $mode) = @_; my $fileno = fileno($handle); my $type = mode_to_poll($mode); my $current = $poll_fd_masks{$fileno} || 0; my $new = $current | $type; if (TRACE_FILES) { POE::Kernel::_warn( sprintf( " Watch $fileno: " . "Current mask: 0x%02X - including 0x%02X = 0x%02X\n", $current, $type, $new ) ); } $poll_fd_masks{$fileno} = $new; } sub loop_ignore_filehandle { my ($self, $handle, $mode) = @_; my $fileno = fileno($handle); my $type = mode_to_poll($mode); my $current = $poll_fd_masks{$fileno} || 0; my $new = $current & ~$type; if (TRACE_FILES) { POE::Kernel::_warn( sprintf( " Ignore $fileno: " . ": Current mask: 0x%02X - removing 0x%02X = 0x%02X\n", $current, $type, $new ) ); } if ($new) { $poll_fd_masks{$fileno} = $new; } else { delete $poll_fd_masks{$fileno}; } } sub loop_pause_filehandle { my ($self, $handle, $mode) = @_; my $fileno = fileno($handle); my $type = mode_to_poll($mode); my $current = $poll_fd_masks{$fileno} || 0; my $new = $current & ~$type; if (TRACE_FILES) { POE::Kernel::_warn( sprintf( " Pause $fileno: " . ": Current mask: 0x%02X - removing 0x%02X = 0x%02X\n", $current, $type, $new ) ); } if ($new) { $poll_fd_masks{$fileno} = $new; } else { delete $poll_fd_masks{$fileno}; } } sub loop_resume_filehandle { my ($self, $handle, $mode) = @_; my $fileno = fileno($handle); my $type = mode_to_poll($mode); my $current = $poll_fd_masks{$fileno} || 0; my $new = $current | $type; if (TRACE_FILES) { POE::Kernel::_warn( sprintf( " Resume $fileno: " . "Current mask: 0x%02X - including 0x%02X = 0x%02X\n", $current, $type, $new ) ); } $poll_fd_masks{$fileno} = $new; } #------------------------------------------------------------------------------ # The event loop itself. sub loop_do_timeslice { my $self = shift; # Check for a hung kernel. $self->_test_if_kernel_is_idle(); # Set the poll timeout based on current queue conditions. If there # are FIFO events, then the poll timeout is zero and move on. # Otherwise set the poll timeout until the next pending event, if # there are any. If nothing is waiting, set the timeout for some # constant number of seconds. my $timeout = $_next_event_time; my $now = monotime(); if (defined $timeout) { $timeout -= $now; $timeout = 0 if $timeout < 0; } else { die "shouldn't happen" if ASSERT_DATA; $timeout = 3600; } if (TRACE_EVENTS) { POE::Kernel::_warn( ' Kernel::run() iterating. ' . sprintf( "now(%.4f) timeout(%.4f) then(%.4f)\n", $now-$start_time, $timeout, ($now-$start_time)+$timeout ) ); } if (TRACE_FILES) { foreach (sort { $a<=>$b} keys %poll_fd_masks) { my @types; push @types, "plain-file" if -f; push @types, "directory" if -d; push @types, "symlink" if -l; push @types, "pipe" if -p; push @types, "socket" if -S; push @types, "block-special" if -b; push @types, "character-special" if -c; push @types, "tty" if -t; my @modes; my $flags = $poll_fd_masks{$_}; push @modes, 'r' if $flags & (POLLRDNORM | POLLHUP | POLLERR); push @modes, 'w' if $flags & (POLLWRNORM | POLLHUP | POLLERR); push @modes, 'x' if $flags & (POLLRDBAND | POLLHUP | POLLERR); POE::Kernel::_warn( " file descriptor $_ = modes(@modes) types(@types)\n" ); } } # Avoid looking at filehandles if we don't need to. # TODO The added code to make this sleep is non-optimal. There is a # way to do this in fewer tests. if (scalar keys %poll_fd_masks) { # There are filehandles to poll, so do so. # Check filehandles, or wait for a period of time to elapse. my $hits = IO::Poll::_poll($timeout * 1000, my @results = %poll_fd_masks); if (ASSERT_FILES) { if ($hits < 0) { POE::Kernel::_trap(" poll returned $hits (error): $!") unless ( ($! == EINPROGRESS) or ($! == EWOULDBLOCK) or ($! == EINTR) or ($! == 0) # SIGNAL_PIPE strangeness ); } } if (TRACE_FILES) { if ($hits > 0) { POE::Kernel::_warn " poll hits = $hits\n"; } elsif ($hits == 0) { POE::Kernel::_warn " poll timed out...\n"; } } # If poll has seen filehandle activity, then gather up the # active filehandles and synchronously dispatch events to the # appropriate handlers. if ($hits > 0) { # This is where they're gathered. my (@rd_ready, @wr_ready, @ex_ready); my %poll_fd_results = @results; while (my ($fd, $got_mask) = each %poll_fd_results) { next unless $got_mask; my $watch_mask = $poll_fd_masks{$fd}; if ( $watch_mask & POLLRDNORM and $got_mask & (POLLRDNORM | POLLHUP | POLLERR | POLLNVAL) ) { if (TRACE_FILES) { POE::Kernel::_warn " enqueuing read for fileno $fd"; } push @rd_ready, $fd; } if ( $watch_mask & POLLWRNORM and $got_mask & (POLLWRNORM | POLLHUP | POLLERR | POLLNVAL) ) { if (TRACE_FILES) { POE::Kernel::_warn " enqueuing write for fileno $fd"; } push @wr_ready, $fd; } if ( $watch_mask & POLLRDBAND and $got_mask & (POLLRDBAND | POLLHUP | POLLERR | POLLNVAL) ) { if (TRACE_FILES) { POE::Kernel::_warn " enqueuing expedite for fileno $fd"; } push @ex_ready, $fd; } } @rd_ready and $self->_data_handle_enqueue_ready(MODE_RD, @rd_ready); @wr_ready and $self->_data_handle_enqueue_ready(MODE_WR, @wr_ready); @ex_ready and $self->_data_handle_enqueue_ready(MODE_EX, @ex_ready); } } elsif ($timeout) { # No filehandles to poll on. Try to sleep instead. Use sleep() # itself on MSWin32. Use a dummy four-argument select() everywhere # else. if ($^O eq 'MSWin32') { sleep($timeout); } else { CORE::select(undef, undef, undef, $timeout); } } # Dispatch whatever events are due. $self->_data_ev_dispatch_due(); } ### Run for as long as there are sessions to service. sub loop_run { my $self = shift; while ($self->_data_ses_count()) { $self->loop_do_timeslice(); } } sub loop_halt { # does nothing } 1; __END__ =head1 NAME POE::Loop::IO_Poll - a bridge that allows POE to be driven by IO::Poll =head1 SYNOPSIS See L. =head1 DESCRIPTION POE::Loop::IO_Poll implements the interface documented in L. Therefore it has no documentation of its own. Please see L for more details. =head1 SEE ALSO L, L, L, L =head1 AUTHORS & LICENSING Please see L for more information about authors, contributors, and POE's licensing. =cut # rocco // vim: ts=2 sw=2 expandtab # TODO - Edit.