| 1 | <?php␊ |
| 2 | /* -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */␊ |
| 3 | /*␊ |
| 4 | # ***** BEGIN LICENSE BLOCK *****␊ |
| 5 | # This file is part of InDefero, an open source project management application.␊ |
| 6 | # Copyright (C) 2008-2011 CĂ©ondo Ltd and contributors.␊ |
| 7 | #␊ |
| 8 | # InDefero is free software; you can redistribute it and/or modify␊ |
| 9 | # it under the terms of the GNU General Public License as published by␊ |
| 10 | # the Free Software Foundation; either version 2 of the License, or␊ |
| 11 | # (at your option) any later version.␊ |
| 12 | #␊ |
| 13 | # InDefero is distributed in the hope that it will be useful,␊ |
| 14 | # but WITHOUT ANY WARRANTY; without even the implied warranty of␊ |
| 15 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the␊ |
| 16 | n# GNU General Public License for more details.␊ |
| 17 | #␊ |
| 18 | # You should have received a copy of the GNU General Public License␊ |
| 19 | # along with this program; if not, write to the Free Software␊ |
| 20 | # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA␊ |
| 21 | #␊ |
| 22 | # ***** END LICENSE BLOCK ***** */␊ |
| 23 | ␊ |
| 24 | ␊ |
| 25 | /**␊ |
| 26 | * Queue system for the management of asynchronous operations.␊ |
| 27 | *␊ |
| 28 | * Anybody can add an item to the queue and any application can␊ |
| 29 | * register itself to process an item from the queue.␊ |
| 30 | *␊ |
| 31 | * An item in the queue is considered as fully processed when all the␊ |
| 32 | * handlers have processed it successfully.␊ |
| 33 | *␊ |
| 34 | * To push a new item in the queue:␊ |
| 35 | *␊ |
| 36 | * <code>␊ |
| 37 | * $item = new IDF_Queue();␊ |
| 38 | * $item->type = 'new_commit';␊ |
| 39 | * $item->payload = array('what', 'ever', array('data'));␊ |
| 40 | * $item->create();␊ |
| 41 | * </code>␊ |
| 42 | *␊ |
| 43 | * To process one item from the queue, you first need to register an␊ |
| 44 | * handler, by adding the following in your relations.php file before␊ |
| 45 | * the return statement or in your config file.␊ |
| 46 | *␊ |
| 47 | * <code>␊ |
| 48 | * Pluf_Signal::connect('IDF_Queue::processItem', ␊ |
| 49 | * array('YourApp_Class', 'processItem'));␊ |
| 50 | * </code>␊ |
| 51 | *␊ |
| 52 | * The processItem method will be called with two arguments, the first␊ |
| 53 | * is the name of the signal ('IDF_Queue::processItem') and the second␊ |
| 54 | * is an array with:␊ |
| 55 | *␊ |
| 56 | * <code>␊ |
| 57 | * array('item' => $item,␊ |
| 58 | * 'res' => array('OtherApp_Class::handler' => false,␊ |
| 59 | * 'FooApp_Class::processItem' => true));␊ |
| 60 | * </code>␊ |
| 61 | *␊ |
| 62 | * When you process an item, you need first to check if the type is␊ |
| 63 | * corresponding to what you want to work with, then you need to check␊ |
| 64 | * in 'res' if you have not already processed successfully the item,␊ |
| 65 | * that is the key 'YourApp_Class::processItem' must be set to true,␊ |
| 66 | * and then you can process the item. At the end of your processing,␊ |
| 67 | * you need to modify by reference the 'res' key to add your status.␊ |
| 68 | *␊ |
| 69 | * All the data except for the type is in the payload, this makes the␊ |
| 70 | * queue flexible to manage many different kind of tasks.␊ |
| 71 | *␊ |
| 72 | */␊ |
| 73 | class IDF_Queue extends Pluf_Model␊ |
| 74 | {␊ |
| 75 | public $_model = __CLASS__;␊ |
| 76 | ␊ |
| 77 | function init()␊ |
| 78 | {␊ |
| 79 | $this->_a['table'] = 'idf_queue';␊ |
| 80 | $this->_a['model'] = __CLASS__;␊ |
| 81 | $this->_a['cols'] = array(␊ |
| 82 | // It is mandatory to have an "id" column.␊ |
| 83 | 'id' =>␊ |
| 84 | array(␊ |
| 85 | 'type' => 'Pluf_DB_Field_Sequence',␊ |
| 86 | 'blank' => true, ␊ |
| 87 | ),␊ |
| 88 | 'status' => ␊ |
| 89 | array(␊ |
| 90 | 'type' => 'Pluf_DB_Field_Integer',␊ |
| 91 | 'blank' => false,␊ |
| 92 | 'choices' => array(␊ |
| 93 | 'pending' => 0,␊ |
| 94 | 'in_progress' => 1,␊ |
| 95 | 'need_retry' => 2,␊ |
| 96 | 'done' => 3,␊ |
| 97 | 'error' => 4,␊ |
| 98 | ),␊ |
| 99 | 'default' => 0,␊ |
| 100 | ),␊ |
| 101 | 'trials' =>␊ |
| 102 | array(␊ |
| 103 | 'type' => 'Pluf_DB_Field_Integer',␊ |
| 104 | 'default' => 0,␊ |
| 105 | ),␊ |
| 106 | 'type' =>␊ |
| 107 | array(␊ |
| 108 | 'type' => 'Pluf_DB_Field_Varchar',␊ |
| 109 | 'blank' => false,␊ |
| 110 | 'size' => 50,␊ |
| 111 | ),␊ |
| 112 | 'payload' =>␊ |
| 113 | array(␊ |
| 114 | 'type' => 'Pluf_DB_Field_Serialized',␊ |
| 115 | 'blank' => false,␊ |
| 116 | ),␊ |
| 117 | 'results' =>␊ |
| 118 | array(␊ |
| 119 | 'type' => 'Pluf_DB_Field_Serialized',␊ |
| 120 | 'blank' => false,␊ |
| 121 | ),␊ |
| 122 | 'lasttry_dtime' =>␊ |
| 123 | array(␊ |
| 124 | 'type' => 'Pluf_DB_Field_Datetime',␊ |
| 125 | 'blank' => true,␊ |
| 126 | ),␊ |
| 127 | 'creation_dtime' =>␊ |
| 128 | array(␊ |
| 129 | 'type' => 'Pluf_DB_Field_Datetime',␊ |
| 130 | 'blank' => true,␊ |
| 131 | ),␊ |
| 132 | );␊ |
| 133 | }␊ |
| 134 | ␊ |
| 135 | function preSave($create=false)␊ |
| 136 | {␊ |
| 137 | if ($create) {␊ |
| 138 | $this->creation_dtime = gmdate('Y-m-d H:i:s');␊ |
| 139 | $this->lasttry_dtime = gmdate('Y-m-d H:i:s');␊ |
| 140 | $this->results = array();␊ |
| 141 | $this->trials = 0;␊ |
| 142 | $this->status = 0;␊ |
| 143 | }␊ |
| 144 | }␊ |
| 145 | ␊ |
| 146 | /**␊ |
| 147 | * The current item is going to be processed.␊ |
| 148 | */␊ |
| 149 | function processItem()␊ |
| 150 | {␊ |
| 151 | /**␊ |
| 152 | * [signal]␊ |
| 153 | *␊ |
| 154 | * IDF_Queue::processItem␊ |
| 155 | *␊ |
| 156 | * [sender]␊ |
| 157 | *␊ |
| 158 | * IDF_Queue␊ |
| 159 | *␊ |
| 160 | * [description]␊ |
| 161 | *␊ |
| 162 | * This signal allows an application to run an asynchronous␊ |
| 163 | * job. The handler gets the queue item and the results from␊ |
| 164 | * the previous run. If the handler key is not set, then the␊ |
| 165 | * job was not run. If set it can be either true (already done)␊ |
| 166 | * or false (error at last run).␊ |
| 167 | *␊ |
| 168 | * [parameters]␊ |
| 169 | *␊ |
| 170 | * array('item' => $item, 'res' => $res)␊ |
| 171 | *␊ |
| 172 | */␊ |
| 173 | $params = array('item' => $this, 'res' => $this->results);␊ |
| 174 | Pluf_Signal::send('IDF_Queue::processItem',␊ |
| 175 | 'IDF_Queue', $params);␊ |
| 176 | $this->status = 3; // Success␊ |
| 177 | foreach ($params['res'] as $handler=>$ok) {␊ |
| 178 | if (!$ok) {␊ |
| 179 | $this->status = 2; // Set to need retry␊ |
| 180 | $this->trials += 1;␊ |
| 181 | break;␊ |
| 182 | }␊ |
| 183 | }␊ |
| 184 | $this->results = $params['res'];␊ |
| 185 | $this->lasttry_dtime = gmdate('Y-m-d H:i:s');␊ |
| 186 | $this->update();␊ |
| 187 | }␊ |
| 188 | ␊ |
| 189 | /** ␊ |
| 190 | * Parse the queue.␊ |
| 191 | *␊ |
| 192 | * It is a signal handler to just hook itself at the right time in␊ |
| 193 | * the cron job performing the maintainance work.␊ |
| 194 | *␊ |
| 195 | * The processing relies on the fact that no other processing jobs␊ |
| 196 | * must run at the same time. That is, your cron job must use a␊ |
| 197 | * lock file or something like to not run in parallel.␊ |
| 198 | *␊ |
| 199 | * The processing is simple, first get 500 queue items, mark them␊ |
| 200 | * as being processed and for each of them call the processItem()␊ |
| 201 | * method which will trigger another event for processing.␊ |
| 202 | *␊ |
| 203 | * If you are processing more than 500 items per batch, you need␊ |
| 204 | * to switch to a different solution.␊ |
| 205 | *␊ |
| 206 | */␊ |
| 207 | public static function process($sender, &$params)␊ |
| 208 | {␊ |
| 209 | $where = 'status=0 OR status=2';␊ |
| 210 | $items = Pluf::factory('IDF_Queue')->getList(array('filter'=>$where,␊ |
| 211 | 'nb'=> 500));␊ |
| 212 | Pluf_Log::event(array('IDF_Queue::process', $items->count()));␊ |
| 213 | foreach ($items as $item) {␊ |
| 214 | $item->status = 1;␊ |
| 215 | $item->update();␊ |
| 216 | }␊ |
| 217 | foreach ($items as $item) {␊ |
| 218 | $item->status = 1;␊ |
| 219 | $item->processItem();␊ |
| 220 | }␊ |
| 221 | }␊ |
| 222 | }␊ |
| 223 | |