Indefero

Indefero Git Source Tree

Root/src/IDF/Queue.php

1<?php
2/* -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3/*
4# ***** BEGIN LICENSE BLOCK *****
5# This file is part of InDefero, an open source project management application.
6# Copyright (C) 2008-2011 Céondo Ltd and contributors.
7#
8# InDefero is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# InDefero is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16n# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21#
22# ***** END LICENSE BLOCK ***** */
23
24
25/**
26 * Queue system for the management of asynchronous operations.
27 *
28 * Anybody can add an item to the queue and any application can
29 * register itself to process an item from the queue.
30 *
31 * An item in the queue is considered as fully processed when all the
32 * handlers have processed it successfully.
33 *
34 * To push a new item in the queue:
35 *
36 * <code>
37 * $item = new IDF_Queue();
38 * $item->type = 'new_commit';
39 * $item->payload = array('what', 'ever', array('data'));
40 * $item->create();
41 * </code>
42 *
43 * To process one item from the queue, you first need to register an
44 * handler, by adding the following in your relations.php file before
45 * the return statement or in your config file.
46 *
47 * <code>
48 * Pluf_Signal::connect('IDF_Queue::processItem',
49 * array('YourApp_Class', 'processItem'));
50 * </code>
51 *
52 * The processItem method will be called with two arguments, the first
53 * is the name of the signal ('IDF_Queue::processItem') and the second
54 * is an array with:
55 *
56 * <code>
57 * array('item' => $item,
58 * 'res' => array('OtherApp_Class::handler' => false,
59 * 'FooApp_Class::processItem' => true));
60 * </code>
61 *
62 * When you process an item, you need first to check if the type is
63 * corresponding to what you want to work with, then you need to check
64 * in 'res' if you have not already processed successfully the item,
65 * that is the key 'YourApp_Class::processItem' must be set to true,
66 * and then you can process the item. At the end of your processing,
67 * you need to modify by reference the 'res' key to add your status.
68 *
69 * All the data except for the type is in the payload, this makes the
70 * queue flexible to manage many different kind of tasks.
71 *
72 */
73class IDF_Queue extends Pluf_Model
74{
75 public $_model = __CLASS__;
76
77 function init()
78 {
79 $this->_a['table'] = 'idf_queue';
80 $this->_a['model'] = __CLASS__;
81 $this->_a['cols'] = array(
82 // It is mandatory to have an "id" column.
83 'id' =>
84 array(
85 'type' => 'Pluf_DB_Field_Sequence',
86 'blank' => true,
87 ),
88 'status' =>
89 array(
90 'type' => 'Pluf_DB_Field_Integer',
91 'blank' => false,
92 'choices' => array(
93 'pending' => 0,
94 'in_progress' => 1,
95 'need_retry' => 2,
96 'done' => 3,
97 'error' => 4,
98 ),
99 'default' => 0,
100 ),
101 'trials' =>
102 array(
103 'type' => 'Pluf_DB_Field_Integer',
104 'default' => 0,
105 ),
106 'type' =>
107 array(
108 'type' => 'Pluf_DB_Field_Varchar',
109 'blank' => false,
110 'size' => 50,
111 ),
112 'payload' =>
113 array(
114 'type' => 'Pluf_DB_Field_Serialized',
115 'blank' => false,
116 ),
117 'results' =>
118 array(
119 'type' => 'Pluf_DB_Field_Serialized',
120 'blank' => false,
121 ),
122 'lasttry_dtime' =>
123 array(
124 'type' => 'Pluf_DB_Field_Datetime',
125 'blank' => true,
126 ),
127 'creation_dtime' =>
128 array(
129 'type' => 'Pluf_DB_Field_Datetime',
130 'blank' => true,
131 ),
132 );
133 }
134
135 function preSave($create=false)
136 {
137 if ($create) {
138 $this->creation_dtime = gmdate('Y-m-d H:i:s');
139 $this->lasttry_dtime = gmdate('Y-m-d H:i:s');
140 $this->results = array();
141 $this->trials = 0;
142 $this->status = 0;
143 }
144 }
145
146 /**
147 * The current item is going to be processed.
148 */
149 function processItem()
150 {
151 /**
152 * [signal]
153 *
154 * IDF_Queue::processItem
155 *
156 * [sender]
157 *
158 * IDF_Queue
159 *
160 * [description]
161 *
162 * This signal allows an application to run an asynchronous
163 * job. The handler gets the queue item and the results from
164 * the previous run. If the handler key is not set, then the
165 * job was not run. If set it can be either true (already done)
166 * or false (error at last run).
167 *
168 * [parameters]
169 *
170 * array('item' => $item, 'res' => $res)
171 *
172 */
173 $params = array('item' => $this, 'res' => $this->results);
174 Pluf_Signal::send('IDF_Queue::processItem',
175 'IDF_Queue', $params);
176 $this->status = 3; // Success
177 foreach ($params['res'] as $handler=>$ok) {
178 if (!$ok) {
179 $this->status = 2; // Set to need retry
180 $this->trials += 1;
181 break;
182 }
183 }
184 $this->results = $params['res'];
185 $this->lasttry_dtime = gmdate('Y-m-d H:i:s');
186 $this->update();
187 }
188
189 /**
190 * Parse the queue.
191 *
192 * It is a signal handler to just hook itself at the right time in
193 * the cron job performing the maintainance work.
194 *
195 * The processing relies on the fact that no other processing jobs
196 * must run at the same time. That is, your cron job must use a
197 * lock file or something like to not run in parallel.
198 *
199 * The processing is simple, first get 500 queue items, mark them
200 * as being processed and for each of them call the processItem()
201 * method which will trigger another event for processing.
202 *
203 * If you are processing more than 500 items per batch, you need
204 * to switch to a different solution.
205 *
206 */
207 public static function process($sender, &$params)
208 {
209 $where = 'status=0 OR status=2';
210 $items = Pluf::factory('IDF_Queue')->getList(array('filter'=>$where,
211 'nb'=> 500));
212 Pluf_Log::event(array('IDF_Queue::process', $items->count()));
213 foreach ($items as $item) {
214 $item->status = 1;
215 $item->update();
216 }
217 foreach ($items as $item) {
218 $item->status = 1;
219 $item->processItem();
220 }
221 }
222}
223

Archive Download this file