MCE::Candy - Sugar methods and output iterators
This document describes MCE::Candy version 1.901
This module provides a collection of sugar methods and helpful output iterators for preserving output order.
The sugar methods described below were created prior to the 1.5 release, which added MCE Models. This module is loaded automatically the first time a "for" method is called.
Forchunk, foreach, and forseq are sugar methods in MCE. Workers are spawned automatically, the code block is executed in parallel, and shutdown is called. Do not call these methods if workers must persist afterwards.
The options argument may be omitted. Valid options are the same as for the process method.
## Declare an MCE instance.
my $mce = MCE->new(
max_workers => $max_workers,
chunk_size => 20
);
## Arguments inside the code block are the same as passed to user_func.
$mce->forchunk(\@input_array, sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
foreach ( @{ $chunk_ref } ) {
MCE->print("$chunk_id: $_\n");
}
});
## Input hash, current API available since 1.828.
$mce->forchunk(\%input_hash, sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
for my $key ( keys %{ $chunk_ref } ) {
MCE->print("$chunk_id: [ $key ] ", $chunk_ref->{$key}, "\n");
}
});
## Passing chunk_size as an option.
$mce->forchunk(\@input_array, { chunk_size => 30 }, sub { ... });
$mce->forchunk(\%input_hash, { chunk_size => 30 }, sub { ... });
Foreach implies chunk_size => 1, and this cannot be overridden. Thus, looping is not necessary inside the block. Unlike forchunk above, a hash reference is not allowed as input data.
my $mce = MCE->new(
max_workers => $max_workers
);
$mce->foreach(\@input_data, sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
my $row = $chunk_ref->[0];
MCE->print("$chunk_id: $row\n");
});
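To picture the difference between forchunk and foreach, the sketch below splits an array into chunks in plain Perl, without MCE and without parallelism. The helper make_chunks is hypothetical and only mimics the shape of the arguments a code block receives; the 1-based chunk_id is an assumption of this sketch.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of MCE): split an array into chunks,
# pairing each chunk with a 1-based chunk_id (assumed numbering).
sub make_chunks {
    my ($input_ref, $chunk_size) = @_;
    my @chunks;
    my $chunk_id = 0;
    for (my $i = 0; $i < @{ $input_ref }; $i += $chunk_size) {
        my $last = $i + $chunk_size - 1;
        $last = $#{ $input_ref } if $last > $#{ $input_ref };
        push @chunks, [ ++$chunk_id, [ @{ $input_ref }[ $i .. $last ] ] ];
    }
    return @chunks;
}

my @input = ('a' .. 'e');

# chunk_size => 2: several items per call, so the block must loop.
for my $pair ( make_chunks(\@input, 2) ) {
    my ($chunk_id, $chunk_ref) = @{ $pair };
    print "$chunk_id: @{ $chunk_ref }\n";
}

# chunk_size => 1: the single item is always $chunk_ref->[0].
for my $pair ( make_chunks(\@input, 1) ) {
    my ($chunk_id, $chunk_ref) = @{ $pair };
    print "$chunk_id: $chunk_ref->[0]\n";
}
```

With a chunk size of 1, every chunk holds exactly one element, which is why the foreach block reads $chunk_ref->[0] directly.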
A sequence may be defined using either an array reference or a hash reference.
my $mce = MCE->new(
max_workers => 3
);
$mce->forseq([ 20, 40 ], sub {
my ($mce, $n, $chunk_id) = @_;
my $result = `ping 192.168.1.${n}`;
...
});
$mce->forseq({ begin => 15, end => 10, step => -1 }, sub {
my ($mce, $n, $chunk_id) = @_;
print $n, " from ", MCE->wid, "\n";
});
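When chunk_size is greater than 1, the second argument ($n_seq below) is an array reference holding the sequence numbers for that chunk. Chunk size defaults to 1 when not specified, in which case the argument is the number itself.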
$mce->forseq([ 20, 80 ], { chunk_size => 10 }, sub {
my ($mce, $n_seq, $chunk_id) = @_;
for my $n ( @{ $n_seq } ) {
my $result = `ping 192.168.1.${n}`;
...
}
});
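The two forms of forseq input can be pictured with a plain-Perl sketch that expands a sequence and groups it by chunk size. The helpers expand_seq and group_seq are hypothetical and not part of MCE; the default step chosen here (1 when ascending, -1 otherwise) is this sketch's own choice, and the grouping is only an approximation of how chunks might be formed.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of MCE): expand begin/end/step into a list.
sub expand_seq {
    my ($begin, $end, $step) = @_;
    # Default step is an assumption of this sketch.
    $step = ($begin <= $end) ? 1 : -1 unless defined $step;
    die "step must not be zero\n" if $step == 0;

    my @seq;
    if ($step > 0) {
        for (my $n = $begin; $n <= $end; $n += $step) { push @seq, $n }
    }
    else {
        for (my $n = $begin; $n >= $end; $n += $step) { push @seq, $n }
    }
    return @seq;
}

# Hypothetical helper: group a list into array refs of up to $chunk_size items.
sub group_seq {
    my ($chunk_size, @seq) = @_;
    my @groups;
    push @groups, [ splice(@seq, 0, $chunk_size) ] while @seq;
    return @groups;
}

# Hash-style sequence: begin => 15, end => 10, step => -1
print join(' ', expand_seq(15, 10, -1)), "\n";

# Array-style sequence [ 20, 80 ] grouped with chunk_size => 10
my @groups = group_seq(10, expand_seq(20, 80));
printf "%d chunks, first: %d..%d, last: %d\n",
    scalar @groups, $groups[0][0], $groups[0][-1], $groups[-1][0];
```

Under these assumptions, the 61 numbers from 20 through 80 form six full groups of ten plus one final group holding only 80.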
This module provides three output iterators useful for preserving output order while gathering data. The chunk_id value must be the first argument to gather. Gather must be called exactly once inside the block.
The following example uses the Core API with chunking disabled, that is, with chunk_size set to 1.
use MCE;
use MCE::Candy;
my @results;
my $mce = MCE->new(
chunk_size => 1, max_workers => 4,
gather => MCE::Candy::out_iter_array(\@results),
user_func => sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
$mce->gather($chunk_id, $chunk_ref->[0] * 2);
}
);
$mce->process([ 100 .. 109 ]);
$mce->shutdown();
print "@results", "\n";
-- Output
200 202 204 206 208 210 212 214 216 218
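The ordering seen in the output above can be pictured with a small plain-Perl sketch. It is an illustration only, not the module's actual source: the function ordered_array_iter is hypothetical, and it assumes chunk IDs start at 1 and arrive exactly once each. Data that arrives early is held back until every lower chunk_id has been released.

```perl
use strict;
use warnings;

# Hypothetical stand-in for an order-preserving gather iterator.
# This illustrates the idea; it is not MCE::Candy's implementation.
sub ordered_array_iter {
    my ($results_ref) = @_;
    my %pending;        # chunk_id => data held until its turn comes
    my $next_id = 1;    # next chunk_id allowed through (assumed 1-based)

    return sub {
        my ($chunk_id, @data) = @_;
        $pending{$chunk_id} = \@data;

        # Release every consecutive chunk that is now available.
        while (exists $pending{$next_id}) {
            push @{ $results_ref }, @{ delete $pending{$next_id} };
            $next_id++;
        }
        return;
    };
}

my @results;
my $gather = ordered_array_iter(\@results);

# Simulate workers finishing out of order.
$gather->(3, 204);
$gather->(1, 200);
$gather->(2, 202);

print "@results\n";    # prints "200 202 204"
```

A sketch like this also suggests why gather should be called exactly once per chunk: a chunk_id that never arrives would hold back everything queued behind it.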
Chunking may be desirable when processing thousands of items or more, in order to reduce the overhead placed on IPC.
use MCE;
use MCE::Candy;
my @results;
my $mce = MCE->new(
chunk_size => 100, max_workers => 4,
gather => MCE::Candy::out_iter_array(\@results),
user_func => sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
my @output;
foreach my $item (@{ $chunk_ref }) {
push @output, $item * 2;
}
$mce->gather($chunk_id, @output);
}
);
$mce->process([ 100_000 .. 200_000 - 1 ]);
$mce->shutdown();
print scalar @results, "\n";
-- Output
100000
MCE workers pass arguments to the callback function. The chunk_id argument to gather is used internally to call the callback function in order.
Current API available since 1.897.
use MCE;
use MCE::Candy;
my @results;
my $max_status = 0;
sub upd_vars {
push @results, @{ $_[0] };
$max_status = $_[1] if ($_[1] > $max_status);
}
my $mce = MCE->new(
chunk_size => 100, max_workers => 4,
gather => MCE::Candy::out_iter_callback(\&upd_vars),
user_func => sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
my @output;
foreach my $item (@{ $chunk_ref }) {
push @output, $item * 2;
}
my $status = $mce->chunk_id == 3 ? 2 : 0;
$mce->gather($chunk_id, [ @output ], $status);
}
);
$mce->process([ 100_000 .. 200_000 - 1 ]);
$mce->shutdown();
print scalar @results, "\n";
print $max_status, "\n";
-- Output
100000
2
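The callback variant differs in one respect: the chunk_id is consumed for ordering, and the remaining arguments are forwarded to the callback. A hedged plain-Perl sketch of that forwarding follows; ordered_callback_iter is a hypothetical name, and the code is not taken from the module.

```perl
use strict;
use warnings;

# Hypothetical sketch: hold arguments per chunk_id, then invoke the
# callback in chunk_id order with the chunk_id itself stripped off.
sub ordered_callback_iter {
    my ($callback) = @_;
    my %pending;
    my $next_id = 1;    # assumed 1-based chunk IDs

    return sub {
        my ($chunk_id, @args) = @_;
        $pending{$chunk_id} = \@args;
        while (my $args_ref = delete $pending{$next_id}) {
            $callback->(@{ $args_ref });
            $next_id++;
        }
        return;
    };
}

my @results;
my $max_status = 0;

my $gather = ordered_callback_iter(sub {
    my ($output_ref, $status) = @_;
    push @results, @{ $output_ref };
    $max_status = $status if $status > $max_status;
});

# Chunk 2 arrives first; its callback is deferred until chunk 1 is in.
$gather->(2, [ 30, 40 ], 2);
$gather->(1, [ 10, 20 ], 0);

print "@results\n";      # prints "10 20 30 40"
print "$max_status\n";   # prints "2"
```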
Let's change things a bit and use MCE::Flow for the next 2 examples. Chunking is not desired for the first example.
use MCE::Flow;
use MCE::Candy;
open my $fh, '>', '/tmp/foo.txt' or die "Cannot open /tmp/foo.txt: $!";
mce_flow {
chunk_size => 1, max_workers => 4,
gather => MCE::Candy::out_iter_fh($fh)
},
sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
$mce->gather($chunk_id, $chunk_ref->[0] * 2, "\n");
}, (100 .. 109);
close $fh;
-- Output sent to '/tmp/foo.txt'
200
202
204
206
208
210
212
214
216
218
The same approach works with an IO::* object that can print; this is supported since MCE 1.845.
use IO::All;
use MCE::Flow;
use MCE::Candy;
my $io = io('/tmp/foo.txt'); # i.e. $io->can('print')
mce_flow {
chunk_size => 1, max_workers => 4,
gather => MCE::Candy::out_iter_fh($io)
},
sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
$mce->gather($chunk_id, $chunk_ref->[0] * 2, "\n");
}, (100 .. 109);
$io->close;
-- Output sent to '/tmp/foo.txt'
200
202
204
206
208
210
212
214
216
218
Chunking is desired for the next example because it processes many thousands of items.
use MCE::Flow;
use MCE::Candy;
open my $fh, '>', '/tmp/foo.txt' or die "Cannot open /tmp/foo.txt: $!";
mce_flow {
chunk_size => 100, max_workers => 4,
gather => MCE::Candy::out_iter_fh( $fh )
},
sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
my @output;
foreach my $item (@{ $chunk_ref }) {
push @output, ($item * 2) . "\n";
}
$mce->gather($chunk_id, @output);
}, (100_000 .. 200_000 - 1);
close $fh;
print -s '/tmp/foo.txt', "\n";
-- Output
700000
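The file size reported above can be checked by hand: the doubled values run from 200000 through 399998, so each of the 100,000 lines holds six digits plus a newline. The short sketch below repeats that arithmetic in plain Perl, assuming a single-byte "\n" line ending (no CRLF translation on output).

```perl
use strict;
use warnings;

# Sum the byte length of every line the example writes:
# a six-digit number followed by one newline character.
my $bytes = 0;
$bytes += length( ($_ * 2) . "\n" ) for 100_000 .. 200_000 - 1;

print "$bytes\n";    # prints "700000" (100,000 lines x 7 bytes)
```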
Input data is not a requirement for using the output iterators. Gather still requires a unique 'chunk_id' argument; when no input data is processed, chunk_id is set to the same value as 'wid'.
use MCE::Flow;
use MCE::Candy;
my @results;
sub append_results {
push @results, $_[0];
}
mce_flow {
max_workers => 'auto', ## Note that 'auto' is never greater than 8
gather => MCE::Candy::out_iter_array(\@results),
# gather => MCE::Candy::out_iter_callback(\&append_results),
},
sub {
my ($mce) = @_; ## This line is not necessary
## Calling via module okay; e.g: MCE->method
## Do work
## Sending a complex data structure is allowed
## Output will become orderly by iterator
$mce->gather( $mce->wid, {
wid => $mce->wid, result => $mce->wid * 2
});
};
foreach my $href (@results) {
print $href->{wid} .": ". $href->{result} ."\n";
}
-- Output
1: 2
2: 4
3: 6
4: 8
5: 10
6: 12
7: 14
8: 16
use MCE::Flow;
use MCE::Candy;
open my $fh, '>', '/tmp/out.txt' or die "Cannot open /tmp/out.txt: $!";
mce_flow {
max_workers => 'auto', ## See get_ncpu in MCE::Util
gather => MCE::Candy::out_iter_fh($fh)
},
sub {
my $output = "# Worker ID: " . MCE->wid . "\n";
## Append results to $output string
$output .= (MCE->wid * 2) . "\n\n";
## Output will become orderly by iterator
MCE->gather( MCE->wid, $output );
};
close $fh;
-- Output
# Worker ID: 1
2
# Worker ID: 2
4
# Worker ID: 3
6
# Worker ID: 4
8
# Worker ID: 5
10
# Worker ID: 6
12
# Worker ID: 7
14
# Worker ID: 8
16
Mario E. Roy, <marioeroy AT gmail DOT com>