PHP рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдкреНрд░рдмрдВрдзрдХ

рдЫрд╡рд┐

рд╕рднреА рдХреЛ рдирдорд╕реНрдХрд╛рд░!

PHP рдореЗрдВ рдбреЗрдореЛрдВрд╕ рд▓рд┐рдЦрдиреЗ рдФрд░ рдЕрдиреНрдп рдЬрд╛рд▓реА рдЪреАрдЬреЛрдВ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ Habr рдкрд░ рдмрд╣реБрдд рд╕рд╛рд░реЗ рд▓реЗрдЦ рдереЗред рдореИрдВ рдЖрдкрдХреЗ рд╕рд╛рде рдЕрдкрдиреЗ рд╡рд┐рдХрд╛рд╕ рдХреЛ рдПрдХ рд╕рдорд╛рди рдкрд░ рд╕рд╛рдЭрд╛ рдХрд░рдирд╛ рдЪрд╛рд╣рддрд╛ рд╣реВрдВ, рд▓реЗрдХрд┐рди рдлрд┐рд░ рднреА рдХреБрдЫ рдЕрд▓рдЧ рд╡рд┐рд╖рдп рд╣реИ - рдХрдИ PHP рдкреНрд░рдХреНрд░рд┐рдпрд╛рдУрдВ рдХрд╛ рдкреНрд░рдмрдВрдзрди ред


рд╢реБрд░реБрдЖрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рд▓реЗрдЦ рдореЗрдВ рдкреНрд░рдпреБрдХреНрдд рд╢рдмреНрджреЛрдВ рдХреА рдПрдХ рдЫреЛрдЯреА рд╢рдмреНрджрд╛рд╡рд▓реА ред


рдХрд╛рд░реНрдп рдХрд╛ рд▓рдХреНрд╖реНрдп рдкрд╣рд▓реЗ рд╕реЗ рдЪрд▓ рд░рд╣реА рдФрд░ рдХрд╛рд░реНрдпрд╢реАрд▓ рдкреНрд░рдХреНрд░рд┐рдпрд╛рдУрдВ рдХреЛ рдкреНрд░рднрд╛рд╡рд┐рдд рдХрд░рдиреЗ рдФрд░ рдЙрдирдХреЗ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдХреА рдкреНрд░рдЧрддрд┐ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдЬрд╛рдирдХрд╛рд░реА рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рдореЗрдВ рд╕рдХреНрд╖рдо рд╣реЛрдирд╛ рд╣реИред

рдирдИ рдкреНрд░рдХреНрд░рд┐рдпрд╛рдУрдВ рдХреЛ рд╢реБрд░реВ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдореИрдВ proc_open рдлрд╝рдВрдХреНрд╢рди рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддрд╛ рд╣реВрдВ, рдЬреЛ рдЖрдкрдХреЛ рдирдИ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХреЗ рд▓рд┐рдП I / O рдбрд┐рд╕реНрдХреНрд░рд┐рдкреНрдЯрд░ рдХреЛ рдлрд┐рд░ рд╕реЗ рдкрд░рд┐рднрд╛рд╖рд┐рдд рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИред рдПрдХ рдПрдХрд▓ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХрд╛ рдкреНрд░рдмрдВрдзрди рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдиреМрдХрд░реА рд╡рд░реНрдЧ рд╡рд┐рдХрд╕рд┐рдд рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛ред рдХрд╛рд░реНрдп рдирд╛рдо рдФрд░ рдЯреАрдо рджреНрд╡рд╛рд░рд╛ рдкреНрд░рджрд░реНрд╢рди рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред

class Job { protected $_pid = 0; protected $_name; protected $_cmd = ''; protected $_stderr = '/dev/null'; private $_resource = NULL; private $_pipes = array(); private $_waitpid = TRUE; public function __construct($cmd, $name = 'job') { $this->_cmd = $cmd; $this->_name = $name; } public function __destruct() { //    if ($this->_resource) { if ($this->_waitpid && $this->isRunning()) { echo "Waiting for job to complete "; $status = NULL; pcntl_waitpid($this->_pid, $status); /*while ($this->isRunning()) { echo '.'; sleep(1); }*/ echo "\n"; } } //   if (isset($this->_pipes) && is_array($this->_pipes)) { foreach (array_keys($this->_pipes) as $index ) { if (is_resource($this->_pipes[$index])) { fflush($this->_pipes[$index]); fclose($this->_pipes[$index]); unset($this->_pipes[$index]); } } } //    if ($this->_resource) { proc_close($this->_resource); unset($this->_resource); } } public function pid() { return $this->_pid; } public function name() { return $this->_name; } //    "". $nohup      private function readPipe($index, $nohup = FALSE) { if (!isset($this->_pipes[$index])) return FALSE; if (!is_resource($this->_pipes[$index]) || feof($this->_pipes[$index])) return FALSE; if ($nohup) { $data = ''; while ($line = fgets($this->_pipes[$index])) { $data .= $line; } return $data; } while ($data = fgets($this->_pipes[$index])) { echo $data; } } public function pipeline($nohup = FALSE) { return $this->readPipe(1, $nohup); } public function stderr($nohup = FALSE) { return $this->readPipe(2, $nohup); } //      public function execute() { //         $descriptorspec = array( 0 => array('pipe', 'r'), // stdin 1 => array('pipe', 'w'), // stdout 2 => array('pipe', 'w') // stderr ); $this->_resource = proc_open('exec '.$this->_cmd, $descriptorspec, $this->_pipes); //      stream_set_blocking($this->_pipes[0], 0); stream_set_blocking($this->_pipes[1], 0); stream_set_blocking($this->_pipes[2], 0); if (!is_resource($this->_resource)) return FALSE; $proc_status = proc_get_status($this->_resource); $this->_pid = isset($proc_status['pid']) ? $proc_status['pid'] : 0; } public function getPipe() { return $this->_pipes[1]; } public function getStderr() { return $this->_pipes[2]; } public function isRunning() { if (!is_resource($this->_resource)) return FALSE; $proc_status = proc_get_status($this->_resource); return isset($proc_status['running']) && $proc_status['running']; } //    public function signal($sig) { if (!$this->isRunning()) return FALSE; posix_kill($this->_pid, $sig); } //    STDIN  public function message($msg) { if (!$this->isRunning()) return FALSE; fwrite($this->_pipes[0], $msg); } } 


рдиреМрдХрд░рд┐рдпреЛрдВ рдХрд╛ рдкреНрд░рдмрдВрдзрди рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, Job_Manager рд╡рд░реНрдЧ рдмрдирд╛рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ, рдЬреЛ рдЕрдирд┐рд╡рд╛рд░реНрдп рд░реВрдк рд╕реЗ рдкреВрд░реА рдпреЛрдЬрдирд╛ рдореЗрдВ рдорд╣рддреНрд╡рдкреВрд░реНрдг рд╣реИред

 class Job_Manager { private $_pool_size = 20; private $_pool = array(); private $_streams = array(); private $_stderr = array(); private $_is_terminated = FALSE; protected $_dispatch_function = NULL; public function __construct() { // init pool // } public function __destruct() { // destroy pool foreach (array_keys($this->_pool) as $index) { $this->stopJob($index); } } //     private function checkJobs() { $running_jobs = 0; foreach ($this->_pool as $index => $job) { if (!$job->isRunning()) { echo "Stopping job ".$this->_pool[$index]->name()." ($index)" . PHP_EOL; $this->stopJob($index); } else { $running_jobs++; } } return $running_jobs; } private function getFreeIndex() { foreach ($this->_pool as $index => $job) { if (!isset($job)) return $index; } return count($this->_pool) < $this->_pool_size ? count($this->_pool) : -1; } //    public function startJob($cmd, $name = 'job') { // broadcast existing jobs $this->checkJobs(); $free_pool_slots = $this->_pool_size - count($this->_pool); if ($free_pool_slots <= 0) { // output error "no free slots in the pool" return -1; } $free_slot_index = $this->getFreeIndex(); if ($free_slot_index < 0) { return -1; } echo "Starting job $name ($free_slot_index)" . PHP_EOL; $this->_pool[$free_slot_index] = new Job($cmd, $name); $this->_pool[$free_slot_index]->execute(); $this->_streams[$free_slot_index] = $this->_pool[$free_slot_index]->getPipe(); $this->_stderr[$free_slot_index] = $this->_pool[$free_slot_index]->getStderr(); return $free_slot_index; } public function stopJob($index) { if (!isset($this->_pool[$index])) return FALSE; unset($this->_streams[$index]); unset($this->_stderr[$index]); unset($this->_pool[$index]); } public function name($index) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->name(); } public function pipeline($index, $nohup = FALSE) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->pipeline($nohup); } public function stderr($index, $nohup = FALSE) { if (!isset($this->_pool[$index])) return FALSE; return $this->_pool[$index]->stderr($nohup); } private function broadcastMessage($msg) { // sends selected signal to all child processes foreach ($this->_pool as $pool_index => $job) { $job->message($msg); } } private function broadcastSignal($sig) { // sends selected signal to all child processes foreach ($this->_pool as $pool_index => $job) { $job->signal($sig); } } //       -   protected function dispatch($cmd) { if (is_callable($this->_dispatch_function)) { call_user_func($this->_dispatch_function, $cmd); } } //      public function registerDispatch($callable) { if (is_callable($callable)) { $this->_dispatch_function = $callable; } else { trigger_error("$callable is not callable func", E_USER_WARNING); } } //    private function dispatchMain($cmd) { $parts = explode(' ', $cmd); $arg = isset($parts[0]) ? $parts[0] : ''; $val = isset($parts[1]) ? $parts[1] : ''; switch ($arg) { case "exit": $this->broadcastSignal(SIGTERM); $this->_is_terminated = TRUE; break; case "test": echo 'sending test' . PHP_EOL; $this->broadcastMessage('test'); $this->broadcastSignal(SIGUSR1); break; case 'kill': $pool_index = $val !== '' && (int)$val >= 0 ? (int)$val : -1; if ($pool_index >= 0 && isset($this->_pool[$pool_index])) { $this->_pool[$pool_index]->signal(SIGKILL); } break; default: $this->dispatch($cmd); break; } return FALSE; } public function process() { stream_set_blocking(STDIN, 0); $write = NULL; $except = NULL; while (!$this->_is_terminated) { /* -   stream_select        */ $read = $this->_streams; $except = $this->_stderr; $read[$this->_pool_size] = STDIN; if (is_array($read) && count($read) > 0) { if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) { // oops } elseif ($num_changed_streams > 0) { //    if (is_array($read) && count($read) > 0) { $cmp_array = $this->_streams; $cmp_array[$this->_pool_size] = STDIN; foreach ($read as $resource) { $pool_index = array_search($resource, $cmp_array, TRUE); if ($pool_index === FALSE) continue; if ($pool_index == $this->_pool_size) { // stdin $content = ''; while ($cmd = fgets(STDIN)) { if (!$cmd) break; $content .= $cmd; } $content = trim($content); if ($content) { //  Process Manager    -  -      $this->dispatchMain($content); } //echo "stdin> " . $cmd; } else { //    $pool_content = $this->pipeline($pool_index, TRUE); $job_name = $this->name($pool_index); if ($pool_content) { echo $job_name ." ($pool_index)" . ': ' . $pool_content; } $pool_content = $this->stderr($pool_index, TRUE); if ($pool_content) { echo $job_name ." ($pool_index)" . ' [STDERR]: ' . $pool_content; } } } } } } $this->checkJobs(); } } } 


рд╣рдордиреЗ рдкрд╣рд▓реЗ рд╣реА рд╕реАрдЦрд╛ рд╣реИ рдХрд┐ рдХреБрдЫ рдЕрдореВрд░реНрдд рдХрд╛рд░реНрдпреЛрдВ рдХрд╛ рдкреНрд░рдмрдВрдзрди рдХреИрд╕реЗ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ, рдпрд╣ рдирд┐рд╖реНрдкрд╛рджрди рдпреЛрдЧреНрдп рдкреНрд░рдХреНрд░рд┐рдпрд╛рдУрдВ рдХреЗ рд▓рд┐рдП рдЦреБрдж рдХреЛ рд▓рд╛рдЧреВ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдПрдХ рд╡рд░реНрдЧ рдХреЛ рд▓рд╛рдЧреВ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд░рд╣рддрд╛ рд╣реИред

 class Executable { protected $_is_terminated = FALSE; protected $_cleanup_function = NULL; public function __construct() { //    pcntl_signal(SIGTERM, array('Executable', 'signalHandler')); pcntl_signal(SIGHUP, array('Executable', 'signalHandler')); pcntl_signal(SIGINT, array('Executable', 'signalHandler')); pcntl_signal(SIGUSR1, array('Executable', 'signalHandler')); pcntl_signal(SIGUSR2, array('Executable', 'signalHandler')); stream_set_blocking(STDIN, 0); stream_set_blocking(STDOUT, 0); stream_set_blocking(STDERR, 0); } public function __destruct() { //echo "destructor called in " . get_class($this) . PHP_EOL; if (!$this->_is_terminated) { $this->_is_terminated = TRUE; $this->isTerminated(); } } //   -    private function cleanup() { if (is_callable($this->_cleanup_function)) { call_user_func($this->_cleanup_function); } } protected function registerCleanup($callable) { if (is_callable($callable)) { $this->_cleanup_function = $callable; } else { trigger_error("$callable is not callable func", E_USER_WARNING); } } protected function isTerminated() { pcntl_signal_dispatch(); if ($this->_is_terminated) { $this->cleanup(); } return $this->_is_terminated; } protected function dispatch($cmd) { //      /* switch ($cmd) { } */ } protected function checkStdin() { $read = array(STDIN); $write = NULL; $except = NULL; if (is_array($read) && count($read) > 0) { if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) { // oops } elseif ($num_changed_streams > 0) { if (is_array($read) && count($read) > 0) { // stdin $content = ''; while ($cmd = fgets(STDIN)) { if (!$cmd) break; $content .= $cmd; } $this->dispatch($content); echo "recieved $content"; //echo "stdin> " . $cmd; } } } } //   protected function signalHandler ($signo) { switch ($signo) { case SIGTERM: case SIGHUP: case SIGINT: $this->_is_terminated = TRUE; //echo "exiting in ".get_class($this)."...\n"; break; case SIGUSR1: //echo "SIGUSR1 recieved\n"; $this->checkStdin(); break; case SIGUSR2: $this->_is_terminated = TRUE; echo "[SHUTDOWN] in " . get_class($this) . PHP_EOL; flush(); exit(1); break; default: // handle all other signals break; } } } 


рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдкреНрд░рдмрдВрдзрдХ рдХреЗ рдЙрдкрдпреЛрдЧ рдХреЗ рдПрдХ рдЙрджрд╛рд╣рд░рдг рдХреЗ рд░реВрдк рдореЗрдВ, рд╣рдо "рдиреАрдВрдж" рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХреЛ рд▓рд╛рдЧреВ рдХрд░рддреЗ рд╣реИрдВ - рдПрдХ рд╕реНрдХреНрд░рд┐рдкреНрдЯ рдЬреЛ рд╕реЛ рдЬрд╛рдПрдЧреА рдФрд░ STDOUT рдореЗрдВ рдЗрд╕ рдмрд╛рд░реЗ рдореЗрдВ рд╕рджрд╕реНрдпрддрд╛ рд╕рдорд╛рдкреНрдд рдХрд░ рджреЗрдЧреАред

sleep.php
 class SleeperTest extends Executable { public function sleep() { for($i = 0; !$this->isTerminated() && $i < 10; $i++) { ob_start(); echo $i . "\n"; ob_end_flush(); sleep(5); } } } $s = new SleeperTest; $s->sleep(); 


pm.php
 $pm = new Job_Manager; $pm->startJob('php sleep.php', 'sleeper1'); $pm->startJob('php sleep.php', 'sleeper2'); // $pm->process(); 


рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдореЗрдВ рдЙрдкрдпреЛрдЧ рдХрд┐рдП рдЬрд╛рдиреЗ рд╡рд╛рд▓реЗ рдЧреИрд░-рдЕрд╡рд░реЛрдзрдХ рд╡рд░реНрдгрдирдХрд░реНрддрд╛ рдФрд░ рд╕реНрдЯреНрд░реАрдо_рд╕реЗрд▓реЗрдХреНрдЯ рдлрд╝рдВрдХреНрд╢рди рд╕рднреА рдкреНрд░рдХрд╛рд░ рдХреЗ рдбреЗрдореЙрди рдХреА рд╕рдорд╕реНрдпрд╛ рд╕реЗ рдмрдЪрдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддреЗ рд╣реИрдВ - рдПрдХ рдирд┐рд╖реНрдХреНрд░рд┐рдп рдЪрдХреНрд░ рдореЗрдВ рдЙрдЪреНрдЪ рд╕реАрдкреАрдпреВ рдЙрдкрдпреЛрдЧред рдкреНрд░рд╕реНрддрд╛рд╡рд┐рдд рд╡рд┐рдзрд┐ рдЗрд╕ рдЦрд╛рдореА рд╕реЗ рд░рд╣рд┐рдд рд╣реИ, рд╕рдм рдХреБрдЫ рдЖрд╕рд╛рдиреА рд╕реЗ рдФрд░ рд╢рд╛рдВрддрд┐ рд╕реЗ рдХрд╛рдо рдХрд░рддрд╛ рд╣реИред

рдЕрджреНрдпрддрдиред рдореИрдВрдиреЗ рдХрдХреНрд╖рд╛ рдХреЗ рд╕реНрд░реЛрддреЛрдВ рдХреЛ github https://github.com/xzag/php-pm рдкрд░ рдкреЛрд╕реНрдЯ рдХрд┐рдпрд╛ рд╣реИ

Source: https://habr.com/ru/post/In148596/


All Articles