multithreading - Perl forking and IPC::Open2 exec pipes
I'm running a script via cron every 5 minutes. The script collects a large number of performance metrics across the environment, and uses them to update round robin databases using rrdtool.
At the moment, I'm doing this via threads and Thread::Queue. I have 'collector' threads and 'updater' threads:
    #!/usr/bin/perl

    use strict;
    use warnings;

    use threads;
    use IPC::Open2;
    use Thread::Queue;
    use English;

    my $update_q = Thread::Queue->new();

    sub updater {
        # One rrdtool process per updater thread, driven over a pipe pair.
        open2( my $rrdtool_response, my $rrdtool, "/usr/bin/rrdtool -" )
            or warn $OS_ERROR;
        while ( my $item = $update_q->dequeue ) {
            my ( $rrd, $data ) = split( /,/, $item );
            print {$rrdtool} "update --daemon /tmp/rrdcached.sock $rrd $data\n";
            my $result = <$rrdtool_response>;
            if ( not $result =~ m/^OK/ ) {
                # On error, report it and restart rrdtool.
                print "$rrd $data $result\n";
                close($rrdtool_response);
                close($rrdtool);
                open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )
                    or warn $OS_ERROR;
            }
        }
        close($rrdtool_response);
        close($rrdtool);
    }

    # $update_threads is set elsewhere in the full script.
    for ( 1 .. $update_threads ) {
        my $thr = threads->create( \&updater );
    }
This updater is fed a whole load of strings of the form:
    "/path/to/data/file.rrd,N:1:4:3:2:234:3";
(That's the update format for rrdtool: N being 'now', and the colon-separated values being the things to update.)
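As a small illustration of that item format, the sketch below splits one queue string into the RRD path and the update string, then separates the timestamp field from the values. The helper name `parse_update_item` is made up for this example, and the limit of 2 on the first `split` is an addition (not in the original code) so that only the first comma is treated as the separator.

```perl
use strict;
use warnings;

# Hypothetical helper: split "path,N:v1:v2..." into its parts.
sub parse_update_item {
    my ($item) = @_;

    # Limit of 2 so only the first comma separates path from data.
    my ( $rrd, $data ) = split /,/, $item, 2;

    # The first colon-separated field is the timestamp ("N" means now).
    my ( $timestamp, @values ) = split /:/, $data;

    return ( $rrd, $data, $timestamp, \@values );
}

my ( $rrd, $data, $timestamp, $values ) =
    parse_update_item("/path/to/data/file.rrd,N:1:4:3:2:234:3");
print "rrd=$rrd timestamp=$timestamp values=@{$values}\n";
```

The `$rrd` and `$data` parts are what the updater interpolates into the `update` command line.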
Because I use a queue, it's serialised, and I can ensure I'm running an appropriate number of instances of rrdtool.
I collect around 20,000 metrics in this fashion every 5 minutes, and it broadly works OK.
I'm in the middle of a bit of a rewrite, to see if I can make it work with forking. I had intended to spawn multiple 'rrdtool' update instances, but accidentally did the open2 outside the forked code.
E.g.:
    # Opened once in the parent, so every forked child inherits the same pipes.
    open2( my $rrdtool_response, my $rrdtool, "/usr/bin/rrdtool -" )
        or warn $OS_ERROR;

    $manager->start and next;
    while ( my $item = $update_q->dequeue ) {
        my ( $rrd, $data ) = split( /,/, $item );
        print {$rrdtool} "update --daemon /tmp/rrdcached.sock $rrd $data\n";
        my $result = <$rrdtool_response>;
        if ( not $result =~ m/^OK/ ) {
            print "$rrd $data $result\n";
            close($rrdtool_response);
            close($rrdtool);
            open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )
                or warn $OS_ERROR;
        }
    }
    close($rrdtool_response);
    close($rrdtool);
And because children inherit filehandles, this almost worked. Printing to the {$rrdtool} filehandle worked. Re-opening on error worked. (You did end up with more rrdtool instances, but 'a few' based on the error rate, rather than the 20,000 I was trying to avoid.)
    #!/usr/bin/perl

    use strict;
    use warnings;

    use IPC::Open2;
    use Parallel::ForkManager;
    use English;

    my $max_concurrency = 100;

    my %metric_subs = (
        "fetch_iops" => \&fetch_io_operations_on,
        #... more here;
    );

    sub fetch_io_operations_on {
        my ($hostname) = @_;
        my $rrd = "/path/to/data/$hostname/iops.rrd";
        # do stuff to fetch data
        update_rrd( $rrd, $data );
    }

    {
        # Pipe pair shared by the three subs below (and by every forked child).
        my $rrdtool;
        my $rrdtool_response;

        sub start_updates {
            open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )
                or warn $OS_ERROR;
        }

        sub update_rrd {
            my ( $rrd, $data ) = @ARG;
            print {$rrdtool} "update --daemon /tmp/rrdcached.sock $rrd $data\n";
            my $result = <$rrdtool_response>;
            if ( not $result =~ m/^OK/ ) {
                print "$rrd $data $result\n";
                close($rrdtool_response);
                close($rrdtool);
                open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )
                    or warn $OS_ERROR;
            }
        }

        sub end_updates {
            print {$rrdtool} "quit\n";
            close($rrdtool);
            close($rrdtool_response);
        }
    }

    ##main
    my $manager = Parallel::ForkManager->new($max_concurrency);

    start_updates();
    foreach my $host (@list_of_hosts) {
        foreach my $metric ( keys %metric_subs ) {
            $manager->start and next;
            ##parallel bit
            print "Fetching $metric on $host\n";
            &{ $metric_subs{$metric} }($host);
            $manager->finish();
        }
    }
    end_updates();
    $manager->wait_all_children();
This is broadly satisfactory: because the restart happens within a forked child, I end up with multiple rrdtool instances if there's an error, but they're updating via rrdcached anyway, so that's acceptable. The ForkManager limiting ensures I don't get an absurd number.
However, I had started getting a few cases of the 'response' query blocking, and realised the IO process was suffering from race conditions. I started trying to match 'update' and 'responses' via debugging prints, and ... well, I'm sure you'll have realised that, because of buffering, there's no way to be sure that the fork that reads from the response filehandle is the one that printed the matching update to the update filehandle.
So, if I'm looking to do this in a way that's a little less shoddy: what's the right way to handle exec pipes across forks in Perl, such that I'm not building race conditions into my code?
"What's the right way to handle exec pipes across forks in Perl, such that I'm not building race conditions into my code?"
It's easy to ensure that only one child executes at a time, but having only one child executing at a time would defy the purpose of multitasking.
You reuse workers in your threaded model, but P::FM uses a new worker for each job.
As such, the initialization code you have outside of the loop will need to be moved into the loop.
    for my $item (...) {
        $pm->start and next;

        # Each child now opens its own rrdtool, so no pipes are shared.
        open2( my $rrdtool_response, my $rrdtool, "/usr/bin/rrdtool -" )
            or warn $OS_ERROR;

        my ( $rrd, $data ) = split( /,/, $item );
        print { $rrdtool } "update --daemon /tmp/rrdcached.sock $rrd $data\n";
        my $result = <$rrdtool_response>;
        if ( not $result =~ m/^OK/ ) {
            print "$rrd $data $result\n";
            close($rrdtool_response);
            close($rrdtool);
            open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )
                or warn $OS_ERROR;
        }

        close($rrdtool_response);
        close($rrdtool);

        $pm->finish();
    }
This needs some cleanup.
- The whole relaunching of rrdtool is no longer useful.
- open2 never returns false when provided a command.
- You aren't reaping the rrdtool children. You need to call waitpid.
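The last two points can be sketched in isolation. This is a minimal illustration, not the answer's code: the helper name `ask_once` is hypothetical, and `cat` stands in for `/usr/bin/rrdtool -` so the example runs without rrdtool installed. It traps the exception that open2 raises on failure (instead of testing for a false return value) and reaps the child with waitpid.

```perl
use strict;
use warnings;
use IPC::Open2;

# Hypothetical helper: send one line to a command and return
# ( first line of its reply, exit status of the command ).
sub ask_once {
    my ( $line, @cmd ) = @_;

    # open2 raises an exception on failure rather than returning false.
    my ( $from_child, $to_child );
    my $pid = eval { open2( $from_child, $to_child, @cmd ) };
    die "could not start @cmd: $@" if !defined $pid;

    print {$to_child} "$line\n";
    close($to_child);    # flush the line and send EOF to the child
    my $reply = <$from_child>;
    close($from_child);

    waitpid( $pid, 0 );  # reap the child so it does not linger as a zombie
    return ( $reply, $? >> 8 );
}

# "cat" is an assumption standing in for "/usr/bin/rrdtool -".
my ( $reply, $status ) = ask_once( "hello", "cat" );
print "reply=$reply", "status=$status\n";
```

Because each call starts and reaps its own process, nothing is shared between forks, which is the property the per-job version relies on.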
    for my $item (...) {
        $pm->start and next;

        # open2 returns the PID of the rrdtool child; keep it for waitpid.
        my $pid = open2( my $rrdtool_response, my $rrdtool, "/usr/bin/rrdtool -" );

        my ( $rrd, $data ) = split( /,/, $item );
        print { $rrdtool } "update --daemon /tmp/rrdcached.sock $rrd $data\n";
        my $result = <$rrdtool_response>;
        print "$rrd $data $result\n"
            if $result !~ /^OK/;

        close($rrdtool_response);
        close($rrdtool);

        # Reap the rrdtool child before this worker exits.
        waitpid($pid, 0);

        $pm->finish();
    }