<?php
/**
 * $Id: $
 * $Author: $
 * $LastChangedDate: $
 * $Revision: $
 */
abstract class manager extends thread {
    private $threads=Array();
    private $stopped = false;
    protected function fork (thread $thread) {
        $pid=pcntl_fork();
        if($pid == -1) {
            throw new Exception('Can\'t create new thread!');
        }
        if($pid) {
            $this->threads[$pid]=true;
        }
        else {
            //child execution
            $thread->exec();
            //normal exit for a child;
            exit;
        }
    }
    public function threads_count() {
        return count($this->threads);
    }
    protected function process_events() {
        while($pid = pcntl_wait($status, WNOHANG | WUNTRACED)) {
            if($pid == -1) {
                $this->threads=Array();
                break;
            }
            echo 'Thread with ID '.$pid.' closed'.PHP_EOL;
            unset($this->threads[$pid]);
        }
        usleep(500000);
    }
    public function daemonize() {
        $pid = pcntl_fork();
        if($pid) {
            exit;
        }
        posix_setsid();
        return $this->exec();
    }
}
abstract class thread {
    private $pid;
    private $stopped;
    public function is_stopped() {
        return $this->stopped;
    }
    public function sig_handler($sig) {
        switch ($sig) {
            case SIGTERM:
                $this->stopped=true;
                break;
        }
    }
    public function get_pid () {
        if($this->pid) {
            $this->pid=getmypid();
        }
        return $this->pid;
    }
    public function exec () {
        declare(ticks=1);
        pcntl_signal(SIGTERM, Array($this,"sig_handler"));
        return $this->run();
    }
    abstract protected function run ();
}
class shared_memory {
    protected $key;
    protected $sem_id;
    protected $shm;
    public function __construct($file_name,$size=null) {
        $this->key = ftok($file_name, 'g');
        if(!$this->sem_id=sem_get($this->key)) {
            throw new Exception('Can\'t attach semaphore!');
        }
        if(!$this->shm=shm_attach($this->key,$size)) {
            throw new Exception('Can\'t attach shared memory!');
        }
    }
    public function exclusive() {
        sem_acquire($this->sem_id);
        return $this;
    }
    public function get($var_id=1) {
        if(shm_has_var($this->shm,$var_id)) {
            return shm_get_var($this->shm,$var_id);
        }
        return false;
    }
    public function set($val,$var_id=1) {
        if(!shm_put_var($this->shm,$var_id,$val)) {
            throw new Exception('Can\'t set var with id'.$var_id.' in shared memory!');
        }
        return $this;
    }
    public function end_exclusive() {
        sem_release($this->sem_id);
        return $this;
    }
    public function remove() {
        if($this->sem_id) {
            sem_remove($this->sem_id);
        }
        if($this->shm) {
            shm_remove($this->shm);
            shm_detach($this->shm);
        }
    }
}
class email_sender extends thread {
    protected $renderer;
    protected $shared_memory;
    public function __construct($shared_memory, $renderer) {
        $this->renderer = $renderer;
        $this->shared_memory = $shared_memory;
    }
    protected function run() {
        $db = Yii::app()->db;
        $delivery_rules = Array(
            'other' => delivery_rule::model()->find('domain=\'other\'')
        );
        if (!$delivery_rules['other']) {
            throw new Exception('Please specify "other" rule!');
        }
        $mailer = new PHPMailer(true);
        $mailer->Mailer = "smtp";
        $mailer->Subject = "PHP Developer Newsletter";
        $mailer->IsHTML(true);
        do {
            #exclusive get last user id
            $shm = $this->shared_memory;
            $last_id = $shm->exclusive()->get();
            if (!$last_id) {
                $last_id = 0;
            }
            try {
                $user = user::model()->with_opt()->find(
                    'id>:id and last_submit<=:prev_week',
                    Array(
                        ':id' => $last_id,
                        ':prev_week'=>new CDbExpression('DATE_SUB(CURRENT_DATE,INTERVAL 7 DAY)')
                    )
                );
                if (!$user) {
                    //complete all jobs
                    $shm->end_exclusive();
                    echo 'Users not found! Break;' . PHP_EOL;
                    break;
                }
                $shm->set($user['id'])->end_exclusive();
                $domain = substr($user['email'], strpos($user['email'], '@'));
                if (!isset($delivery_rules[$domain])) {
                    $delivery_rules[$domain] = delivery_rule::model()->find('domain=:domain',
                                                                            array(':domain' => $domain));
                    if (!$delivery_rules[$domain]) {
                        $delivery_rules[$domain] = $delivery_rules['other'];
                    }
                }
                $rule = $delivery_rules[$domain];
                $mailer->Host = $rule->host;
                $mailer->Port = $rule->port;
                echo 'Sending mail to ' . $user['email'] . ' via host ' . $rule->host . ':' . $rule->port . PHP_EOL;
                $mailer->AddAddress($user['email']);
                $mailer->Body = $this->renderer->renderFile('mail_text.php', Array('user' => $user), true);
                $mailer->Send();
                $mailer->ClearAddresses();
                $user->last_submit=new CDbExpression('CURRENT_DATE');
                $user->save();
            } catch (phpmailerException $e) {
                echo 'Error sending email: '.$e->getMessage().PHP_EOL;
            } catch (Exception $e) {
                //Another error
                throw $e;
            }
        } while (!$this->is_stopped());
    }
}
class email_sender_manager extends manager {
    protected $renderer;
    protected $threads_max;
    public function __construct($renderer,$threads_max) {
        $this->renderer=$renderer;
        $this->threads_max=$threads_max;
    }
    protected function run () {
        echo 'Threads max: '.$this->threads_max.PHP_EOL;
        $shared_memory=new shared_memory(__FILE__,64);
        for($i=0;$i<$this->threads_max;$i++) {
            $this->fork(new email_sender($shared_memory,$this->renderer));
        }
        //wait for compete childs
        do {
            $this->process_events();
        } while(!$this->is_stopped() && $this->threads_count());
        $shared_memory->remove();
    }
}
$manager = new email_sender_manager($renderer,$thread_count);
$manager->exec();