<?php
/**
* $Id: $
* $Author: $
* $LastChangedDate: $
* $Revision: $
*/
abstract class manager extends thread {
private $threads=Array();
private $stopped = false;
protected function fork (thread $thread) {
$pid=pcntl_fork();
if($pid == -1) {
throw new Exception('Can\'t create new thread!');
}
if($pid) {
$this->threads[$pid]=true;
}
else {
//child execution
$thread->exec();
//normal exit for a child;
exit;
}
}
public function threads_count() {
return count($this->threads);
}
protected function process_events() {
while($pid = pcntl_wait($status, WNOHANG | WUNTRACED)) {
if($pid == -1) {
$this->threads=Array();
break;
}
echo 'Thread with ID '.$pid.' closed'.PHP_EOL;
unset($this->threads[$pid]);
}
usleep(500000);
}
public function daemonize() {
$pid = pcntl_fork();
if($pid) {
exit;
}
posix_setsid();
return $this->exec();
}
}
abstract class thread {
private $pid;
private $stopped;
public function is_stopped() {
return $this->stopped;
}
public function sig_handler($sig) {
switch ($sig) {
case SIGTERM:
$this->stopped=true;
break;
}
}
public function get_pid () {
if($this->pid) {
$this->pid=getmypid();
}
return $this->pid;
}
public function exec () {
declare(ticks=1);
pcntl_signal(SIGTERM, Array($this,"sig_handler"));
return $this->run();
}
abstract protected function run ();
}
class shared_memory {
protected $key;
protected $sem_id;
protected $shm;
public function __construct($file_name,$size=null) {
$this->key = ftok($file_name, 'g');
if(!$this->sem_id=sem_get($this->key)) {
throw new Exception('Can\'t attach semaphore!');
}
if(!$this->shm=shm_attach($this->key,$size)) {
throw new Exception('Can\'t attach shared memory!');
}
}
public function exclusive() {
sem_acquire($this->sem_id);
return $this;
}
public function get($var_id=1) {
if(shm_has_var($this->shm,$var_id)) {
return shm_get_var($this->shm,$var_id);
}
return false;
}
public function set($val,$var_id=1) {
if(!shm_put_var($this->shm,$var_id,$val)) {
throw new Exception('Can\'t set var with id'.$var_id.' in shared memory!');
}
return $this;
}
public function end_exclusive() {
sem_release($this->sem_id);
return $this;
}
public function remove() {
if($this->sem_id) {
sem_remove($this->sem_id);
}
if($this->shm) {
shm_remove($this->shm);
shm_detach($this->shm);
}
}
}
class email_sender extends thread {
protected $renderer;
protected $shared_memory;
public function __construct($shared_memory, $renderer) {
$this->renderer = $renderer;
$this->shared_memory = $shared_memory;
}
protected function run() {
$db = Yii::app()->db;
$delivery_rules = Array(
'other' => delivery_rule::model()->find('domain=\'other\'')
);
if (!$delivery_rules['other']) {
throw new Exception('Please specify "other" rule!');
}
$mailer = new PHPMailer(true);
$mailer->Mailer = "smtp";
$mailer->Subject = "PHP Developer Newsletter";
$mailer->IsHTML(true);
do {
#exclusive get last user id
$shm = $this->shared_memory;
$last_id = $shm->exclusive()->get();
if (!$last_id) {
$last_id = 0;
}
try {
$user = user::model()->with_opt()->find(
'id>:id and last_submit<=:prev_week',
Array(
':id' => $last_id,
':prev_week'=>new CDbExpression('DATE_SUB(CURRENT_DATE,INTERVAL 7 DAY)')
)
);
if (!$user) {
//complete all jobs
$shm->end_exclusive();
echo 'Users not found! Break;' . PHP_EOL;
break;
}
$shm->set($user['id'])->end_exclusive();
$domain = substr($user['email'], strpos($user['email'], '@'));
if (!isset($delivery_rules[$domain])) {
$delivery_rules[$domain] = delivery_rule::model()->find('domain=:domain',
array(':domain' => $domain));
if (!$delivery_rules[$domain]) {
$delivery_rules[$domain] = $delivery_rules['other'];
}
}
$rule = $delivery_rules[$domain];
$mailer->Host = $rule->host;
$mailer->Port = $rule->port;
echo 'Sending mail to ' . $user['email'] . ' via host ' . $rule->host . ':' . $rule->port . PHP_EOL;
$mailer->AddAddress($user['email']);
$mailer->Body = $this->renderer->renderFile('mail_text.php', Array('user' => $user), true);
$mailer->Send();
$mailer->ClearAddresses();
$user->last_submit=new CDbExpression('CURRENT_DATE');
$user->save();
} catch (phpmailerException $e) {
echo 'Error sending email: '.$e->getMessage().PHP_EOL;
} catch (Exception $e) {
//Another error
throw $e;
}
} while (!$this->is_stopped());
}
}
class email_sender_manager extends manager {
protected $renderer;
protected $threads_max;
public function __construct($renderer,$threads_max) {
$this->renderer=$renderer;
$this->threads_max=$threads_max;
}
protected function run () {
echo 'Threads max: '.$this->threads_max.PHP_EOL;
$shared_memory=new shared_memory(__FILE__,64);
for($i=0;$i<$this->threads_max;$i++) {
$this->fork(new email_sender($shared_memory,$this->renderer));
}
//wait for compete childs
do {
$this->process_events();
} while(!$this->is_stopped() && $this->threads_count());
$shared_memory->remove();
}
}
$manager = new email_sender_manager($renderer,$thread_count);
$manager->exec();