Демонизация на PHP

Breeze

goshogun
Команда форума
Партнер клуба
кмк вполне достаточная вещь для твоей задачи, проще форка для мастер-воркеров наверное ничего не придумаешь.
возможно для коммуникаций подойдут unix-сокеты.

Syra

ты за чужое быстродействие не переживай, пожалста.
 

cDLEON

Онанист РНРСlub
PHP:
<?php

/**
 * $Id: $
 * $Author: $
 * $LastChangedDate: $
 * $Revision: $
 */

abstract class manager extends thread {
    private $threads=Array();
    private $stopped = false;
    protected function fork (thread $thread) {
        $pid=pcntl_fork();
        if($pid == -1) {
            throw new Exception('Can\'t create new thread!');
        }
        if($pid) {
            $this->threads[$pid]=true;
        }
        else {
            //child execution
            $thread->exec();
            //normal exit for a child;
            exit;
        }
    }
    public function threads_count() {
        return count($this->threads);
    }
    protected function process_events() {
        while($pid = pcntl_wait($status, WNOHANG | WUNTRACED)) {
            if($pid == -1) {
                $this->threads=Array();
                break;
            }
            echo 'Thread with ID '.$pid.' closed'.PHP_EOL;
            unset($this->threads[$pid]);
        }
        usleep(500000);
    }
    public function daemonize() {
        $pid = pcntl_fork();
        if($pid) {
            exit;
        }
        posix_setsid();
        return $this->exec();
    }
}
abstract class thread {
    private $pid;
    private $stopped;
    public function is_stopped() {
        return $this->stopped;
    }
    public function sig_handler($sig) {
        switch ($sig) {
            case SIGTERM:
                $this->stopped=true;
                break;
        }
    }
    public function get_pid () {
        if($this->pid) {
            $this->pid=getmypid();
        }
        return $this->pid;
    }
    public function exec () {
        declare(ticks=1);
        pcntl_signal(SIGTERM, Array($this,"sig_handler"));
        return $this->run();
    }
    abstract protected function run ();
}
class shared_memory {
    protected $key;
    protected $sem_id;
    protected $shm;
    public function __construct($file_name,$size=null) {
        $this->key = ftok($file_name, 'g');
        if(!$this->sem_id=sem_get($this->key)) {
            throw new Exception('Can\'t attach semaphore!');
        }
        if(!$this->shm=shm_attach($this->key,$size)) {
            throw new Exception('Can\'t attach shared memory!');
        }
    }
    public function exclusive() {
        sem_acquire($this->sem_id);
        return $this;
    }
    public function get($var_id=1) {
        if(shm_has_var($this->shm,$var_id)) {
            return shm_get_var($this->shm,$var_id);
        }
        return false;
    }
    public function set($val,$var_id=1) {
        if(!shm_put_var($this->shm,$var_id,$val)) {
            throw new Exception('Can\'t set var with id'.$var_id.' in shared memory!');
        }
        return $this;
    }
    public function end_exclusive() {
        sem_release($this->sem_id);
        return $this;
    }
    public function remove() {
        if($this->sem_id) {
            sem_remove($this->sem_id);
        }
        if($this->shm) {
            shm_remove($this->shm);
            shm_detach($this->shm);
        }
    }
}
class email_sender extends thread {
    protected $renderer;
    protected $shared_memory;

    public function __construct($shared_memory, $renderer) {
        $this->renderer = $renderer;
        $this->shared_memory = $shared_memory;
    }

    protected function run() {
        $db = Yii::app()->db;
        $delivery_rules = Array(
            'other' => delivery_rule::model()->find('domain=\'other\'')
        );
        if (!$delivery_rules['other']) {
            throw new Exception('Please specify "other" rule!');
        }
        $mailer = new PHPMailer(true);
        $mailer->Mailer = "smtp";
        $mailer->Subject = "PHP Developer Newsletter";
        $mailer->IsHTML(true);
        do {
            #exclusive get last user id
            $shm = $this->shared_memory;
            $last_id = $shm->exclusive()->get();
            if (!$last_id) {
                $last_id = 0;
            }
            try {
                $user = user::model()->with_opt()->find(
                    'id>:id and last_submit<=:prev_week',
                    Array(
                        ':id' => $last_id,
                        ':prev_week'=>new CDbExpression('DATE_SUB(CURRENT_DATE,INTERVAL 7 DAY)')
                    )
                );

                if (!$user) {
                    //complete all jobs
                    $shm->end_exclusive();
                    echo 'Users not found! Break;' . PHP_EOL;
                    break;
                }
                $shm->set($user['id'])->end_exclusive();

                $domain = substr($user['email'], strpos($user['email'], '@'));
                if (!isset($delivery_rules[$domain])) {
                    $delivery_rules[$domain] = delivery_rule::model()->find('domain=:domain',
                                                                            array(':domain' => $domain));
                    if (!$delivery_rules[$domain]) {
                        $delivery_rules[$domain] = $delivery_rules['other'];
                    }
                }
                $rule = $delivery_rules[$domain];

                $mailer->Host = $rule->host;
                $mailer->Port = $rule->port;

                echo 'Sending mail to ' . $user['email'] . ' via host ' . $rule->host . ':' . $rule->port . PHP_EOL;

                $mailer->AddAddress($user['email']);
                $mailer->Body = $this->renderer->renderFile('mail_text.php', Array('user' => $user), true);
                $mailer->Send();
                $mailer->ClearAddresses();

                $user->last_submit=new CDbExpression('CURRENT_DATE');
                $user->save();
            } catch (phpmailerException $e) {
                echo 'Error sending email: '.$e->getMessage().PHP_EOL;
            } catch (Exception $e) {
                //Another error
                throw $e;
            }
        } while (!$this->is_stopped());
    }
}
class email_sender_manager extends manager {
    protected $renderer;
    protected $threads_max;
    public function __construct($renderer,$threads_max) {
        $this->renderer=$renderer;
        $this->threads_max=$threads_max;
    }
    protected function run () {
        echo 'Threads max: '.$this->threads_max.PHP_EOL;
        $shared_memory=new shared_memory(__FILE__,64);
        for($i=0;$i<$this->threads_max;$i++) {
            $this->fork(new email_sender($shared_memory,$this->renderer));
        }
        //wait for compete childs
        do {
            $this->process_events();
        } while(!$this->is_stopped() && $this->threads_count());
        $shared_memory->remove();
    }
}


$manager = new email_sender_manager($renderer,$thread_count);
$manager->exec();
Вот для примера, может быть, понадобится :)
 
  • Like
Реакции: AmdY

grigori

( ͡° ͜ʖ ͡°)
Команда форума
cDLEON мне кажется, или у тебя спутаны run() и exec() ?
года 4 назад у меня подобный менеджер внезапно падал в долгосрочной работе (дни, недели)
у тебя какое стабильное время жизни?


среднее время ожидания источников - 0.7 - 1 сек на 1 запрос
Выборки из БД идут 3-4 секунды
мастер запускает процесс загрузки (туда мульти курл), возвращает список местеру,
вокер получает новый список, как только один документы загрузился, передается мастером другому вокеру, который уже пошел работать с текстом (анализировать, строить цепочки), создавать PDF/RTF/DOC копии, индексы.
ты сумбурно написал, как обычно, хрен поймешь...
я правильно понял, что ты загружаешь пачку документов курлом, парсишь, делаешь запросы по базе, пишешь в базу результат?
 

cDLEON

Онанист РНРСlub
grigori
cDLEON мне кажется, или у тебя спутаны run() и exec() ?
года 4 назад у меня подобный менеджер внезапно падал в долгосрочной работе (дни, недели)
у тебя какое стабильное время жизни?
Ну вообще не знаю как их можно спутать :) По-моему, это практически синонимы :)
Но.. Моя логика была в том, что execute - выполнить, run - двигаться. Вооот :)
На счёт падений - не вкурсе. Это было тестовое задание =) Хотел, было, попробовать себя в роли удалённого программиста на постоянке =)
 

grigori

( ͡° ͜ʖ ͡°)
Команда форума
grigori
Как делал сигналы? Передача данных между процессами?
у меня мастер ничего не делает, только запускает/прибивает воркеров, никаких данных, иначе он будет падать
сигналы у меня только для проверки жизни воркеров
PHP:
$MONITOR_SCRIPTS = array(
	array(
		'script'=>'parse_mail_lists.php -v >>'._Config::$LOG_DIR.'/parser.log 2>&1 ',
		'pid'=>$PID_FILES_DIR.'/parse_mail_lists.pid',
	),
);
foreach ($MONITOR_SCRIPTS as $script_info){
    if (file_exists($script_info['pid'])){
        $pidmtime = filemtime($script_info['pid']);
        $pid = file_get_contents($script_info['pid']);
        if (time()-$pidmtime > DAEMON_TIMEOUT || !posix_kill($pid, SIGUSR1)){
            echo 'timed out: `'.$script_info['script'].'` - '.(time()-$pidmtime)." seconds\n";
            $restart_script[]=$script_info;
        }
    }else{
        $restart_script[]=$script_info;
    }
}
if (!$restart_script){
    exit;
}
sleep (CALLBACK_TIMEOUT);
clearstatcache();

foreach ($restart_script as $script_info){
	if (file_exists($script_info['pid'])){
        $pidmtime = filemtime($script_info['pid']);
        if (time()-$pidmtime > DAEMON_TIMEOUT){
            posix_kill($pid, SIGTERM);
        }else{
            continue;
        }
    }
    $command = _Config::PHP_BIN_PATH.' '.$script_info['script']
            .' --pid-file='.$script_info['pid']
            .' &';
    echo 'executing ',$command,"\n";
    exec ($command);
}
у каждого воркера свой pid-файл, который он обновляет
мастер шлет SIGUSR1 на воркера, у воркеров висит на него колбек, который обновляет pid-файл (влом было писать )
вся работа у меня только в воркерах, которые выполняют некий объем, выходят, и мастер их сразу запускает

универсальное решение - запускать через exec/proc_open, т.к. форк тащит в дочку все открытые порты и файлы, которые php при выходе дочки успешно закрывает за родителя

в твоем случае я бы выделил воркера, который получает данные, и пачку воркеров на обработку
как организовать обмен между ними - варианта обычно 2: через сокет или пайпы

а если надежность не критична - можно классически форкать воркеров из мастера, оставить в мастере получение данных, и через пайпы работать
 
Сверху