multithreading - Perl forking and IPC::Open2 exec pipes -


i'm running script via cron every 5 minutes. script collects large number of performance metrics across environment, , uses them update round robin databases using rrdtool.

at moment, i'm doing via threads , thread::queue. have 'collector' threads, , 'updater' threads:

#!/usr/bin/perl  use strict; use warnings;  use ipc::open2; use thread::queue; use english;  $update_q = thread::queue->new();  sub updater {     open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )         or warn $os_error;     while ( $item = $update_q->dequeue ) {         ( $rrd, $data ) = split( /,/, $item );         print {$rrdtool}             "update --daemon /tmp/rrdcached.sock $rrd $data\n";         $result = <$rrdtool_response>;         if ( not $result =~ m/^ok/ ) {             print "$rrd $data $result\n";             close($rrdtool_response);             close($rrdtool);             open2(                 $rrdtool_response,                 $rrdtool,                 "/usr/bin/rrdtool -"             ) or warn $os_error;         }     }     close($rrdtool_response);     close($rrdtool); }   ( 1 .. $update_threads ) {     $thr = threads->create( \&updater ); } 

this updater fed whole load of strings of type of:

"/path/to/data/file.rrd,n:1:4:3:2:234:3";  

(that's update format rrdtool - n being 'now` , colon separated values being things update).

because use queue, it's serialised, , can ensure i'm running appropriate number of instances of rrdtool.

i collect around 20,000 metrics in fashion every 5 minutes, , broadly works ok.

i'm in middle of bit of rewrite, see if can't work forking. had intended spawn multiple 'rrdtool' update instances, accidentally did open2 outside forked code.

e.g.:

    open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )         or warn $os_error;      $manager -> start , next      while ( $item = $update_q->dequeue ) {         ( $rrd, $data ) = split( /,/, $item );         print {$rrdtool}             "update --daemon /tmp/rrdcached.sock $rrd $data\n";         $result = <$rrdtool_response>;         if ( not $result =~ m/^ok/ ) {             print "$rrd $data $result\n";             close($rrdtool_response);             close($rrdtool);             open2(                 $rrdtool_response,                 $rrdtool,                 "/usr/bin/rrdtool -"             ) or warn $os_error;         }     }       close($rrdtool_response);     close($rrdtool); 

and because children inherit filehandles, almost worked. printing {$rrdtool} filehandle worked. re-opening on error worked. (you did end rrdtool instances, 'a few' based on error rate, rather 20,000 trying avoid).

#!/usr/bin/perl  use strict; use warnings;  use ipc::open2; use parallel::forkmanager; use english;  $max_concurrency = 100;  %metric_subs = (     "fetch_iops" => \&fetch_io_operations_on,      #... more here; );  sub fetch_io_operations_on {     ($hostname) = @_;     $rrd = "/path/to/data/$hostname/iops.rrd";      #do stuff fetch data     update_rrd( $rrd, $data ); }  {     $rrdtool;     $rrd_response;      sub start_updates {         open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )             or warn $os_error;     }      sub update_rrd {         ( $rrd, $data ) = @arg;         print {$rrdtool}             "update --daemon /tmp/rrdcached.sock $rrd $data\n";         $result = <$rrdtool_response>;         if ( not $result =~ m/^ok/ ) {             print "$rrd $data $result\n";             close($rrdtool_response);             close($rrdtool);             open2(                 $rrdtool_response,                 $rrdtool,                 "/usr/bin/rrdtool -"             ) or warn $os_error;         }     }      sub end_updates {         print {$rrdtool} "quit\n";         close($rrdtool);         close($rrdtool_response);     } }  ##main  $manager = parallel::forkmanager->new($max_concurrency);  start_updates(); foreach $host (@list_of_hosts) {     foreach $metric ( keys %metric_subs ) {         $manager->start , next;         ##parallel bit         print "fetching $metric on $host\n";         &{ $metric_subs{$metric} }($host);         $manager->finish();     } } end_updates();   $manager->wait_all_children(); 

this broadly satisfactory - because restart within forked child, end multiple rrdtool instances if there's error, they're updating via rrdcached anyway, that's acceptable. forkmanager limiting ensure don't absurd number

however had started getting few cases 'response' query blocking, , realised io process suffering race conditions. started trying match 'update' , 'responses' via debugging prints, ... well, i'm sure you'll have realised because of buffering, there's not way sure fork reads response filehandle 1 printed matching update update filehandle.

so - if i'm looking in way that's little less shoddy - what's right way handle exec pipes across forks in perl, such i'm not building race conditions code?

what's right way handle exec pipes across forks in perl, such i'm not building race conditions code?

it's easy ensure 1 child executes @ time, 1 child executing @ time. defy purpose of multitasking.


you reuse workers in threaded model, p::fm uses new worker each job.

as such, initialization code have outside loop need moved loop.

for $item (...) {     $pm->start , next;     open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" )         or warn $os_error;     ( $rrd, $data ) = split( /,/, $item );     print { $rrdtool } "update --daemon /tmp/rrdcached.sock $rrd $data\n";     $result = <$rrdtool_response>;     if ( not $result =~ m/^ok/ ) {         print "$rrd $data $result\n";         close($rrdtool_response);         close($rrdtool);         open2(             $rrdtool_response,             $rrdtool,             "/usr/bin/rrdtool -"         ) or warn $os_error;     }      close($rrdtool_response);     close($rrdtool)     $pm->finish(); } 

this needs cleanup.

  • the whole relaunching of rrdtool no longer useful.
  • open2 never returns false when provided command.
  • you aren't reaping rrdtool children. need call waitpid.

for $item (...) {     $pm->start , next;     $pid = open2( $rrdtool_response, $rrdtool, "/usr/bin/rrdtool -" );     ( $rrd, $data ) = split( /,/, $item );     print { $rrdtool } "update --daemon /tmp/rrdcached.sock $rrd $data\n";     $result = <$rrdtool_response>;     print "$rrd $data $result\n"         if $result !~ /^ok/;      close($rrdtool_response);     close($rrdtool)     waitpid($pid, 0);     $pm->finish(); } 

Comments

Popular posts from this blog

java - Oracle EBS .ClassNotFoundException: oracle.apps.fnd.formsClient.FormsLauncher.class ERROR -

c# - how to use buttonedit in devexpress gridcontrol -

How do you convert a timestamp into a datetime in python with the correct timezone? -