root/trunk/midcom/midcom.helper.replicator/queuemanager.php

Revision 16796, 29.8 kB (checked in by rambo, 2 months ago)

forward port r16794 & r 16795 (merge -r 16793:16795)

Line 
1 <?php
2 /**
3 * @package midcom.helper.replicator
4 * @author The Midgard Project, http://www.midgard-project.org
5 * @version $Id: viewer.php 3975 2006-09-06 17:36:03Z bergie $
6 * @copyright The Midgard Project, http://www.midgard-project.org
7 * @license http://www.gnu.org/licenses/lgpl.html GNU Lesser General Public License
8 */
9
10 /**
11  * @package midcom.helper.replicator
12  */
13 class midcom_helper_replicator_queuemanager extends midcom_baseclasses_components_purecode
14 {
15     var $max_queues_per_pass = 10; // per subscription
16     var $max_items_per_pass = 3500; // per QM instance, not this is checked only *after* queue pass so more might be processed if single queue dirs are very large
17     var $_items_processed = 0;
18     var $exporters = array();
19     var $transporters = array();
20     var $started = 0;
21     /**
22      * file counts keyed by subscription guid
23      */
24     var $file_counts = array();
25
26     /**
27      * Initializes the class. The real startup is done by the initialize() call.
28      *
29      * @param midcom_helper_replicator_subscription_dba $subscription Subscription
30      */
31     function midcom_helper_replicator_queuemanager()
32     {
33          $this->_component = 'midcom.helper.replicator';
34          parent::midcom_baseclasses_components_purecode();
35          $this->started = date('YmdHis');
36     }
37
38     /**
39      * The correct way to get a queue manager, call this statically, returns reference
40      *
41      * @static
42      */
43     function &get()
44     {
45         if (!array_key_exists('midcom_helper_replicator_queuemanager_instance', $GLOBALS))
46         {
47             $GLOBALS['midcom_helper_replicator_queuemanager_instance'] = new midcom_helper_replicator_queuemanager();
48         }
49         return $GLOBALS['midcom_helper_replicator_queuemanager_instance'];
50     }
51
52     function sanity_check()
53     {
54         // TODO: Sanity check queue root etc
55         return true;
56     }
57
58     /**
59      * Check if we have any valid queues to add data to
60      *
61      * @return boolean indicating state
62      */
63     function can_add_to_queue()
64     {
65         $global_base = $this->_config->get('queue_root_dir');
66         if (   !is_dir($global_base)
67             || !is_writable($global_base))
68         {
69             return false;
70         }
71         $qb = midcom_helper_replicator_subscription_dba::new_query_builder();
72         $qb->add_constraint('status', '=', MIDCOM_REPLICATOR_AUTOMATIC);
73         $qb->set_limit(1);
74         $_MIDCOM->auth->request_sudo('midcom.helper.replicator');
75         $count = $qb->count();
76         $_MIDCOM->auth->drop_sudo();
77         if ($count > 0)
78         {
79             return true;
80         }
81         return false;
82     }
83
84     /**
85      * This method will check exporters of each subscription whether they are interested
86      * in exporting the particular object.
87      *
88      * If the exporters are interested, they will be asked to serialize the data. Queue manager
89      * will then store the serialized data for each queue.
90      * @todo refactor to smaller methods
91      * @todo query for subscriptions only once
92      * @todo raise UIMessage in style of 'N object queued to subscription X'
93      */
94     function add_to_queue(&$object, $rewrite_to_delete = false)
95     {
96         $_MIDCOM->auth->request_sudo('midcom.helper.replicator');
97         $GLOBALS['midcom_helper_replicator_logger']->push_prefix('Queue Manager');
98         debug_push_class(__CLASS__, __FUNCTION__);
99         $qb = midcom_helper_replicator_subscription_dba::new_query_builder();
100         $qb->add_constraint('sitegroup', '=', $object->sitegroup);
101         // NOTE: if this constraint is changed see can_add_to_queue()
102         $qb->add_constraint('status', '=', MIDCOM_REPLICATOR_AUTOMATIC);
103         $subscriptions = $qb->execute();
104         $queued_guids = array();
105         foreach ($subscriptions as $subscription)
106         {
107             debug_add("Processing subscription '{$subscription->title}' ({$subscription->guid})");
108         
109             $exporter = midcom_helper_replicator_exporter::create($subscription);
110
111             if (!$exporter->is_exportable($object))
112             {
113                 debug_add('exporter->is_exportable() returned false');
114                 continue;
115             }
116             
117             if ($rewrite_to_delete)
118             {
119                 $exporter->_serialize_rewrite_to_delete[$object->guid] = true;
120             }
121             $exporter_serializations = $exporter->serialize($object);
122             if ($exporter_serializations === false)
123             {
124                 debug_add('exporter->serialize() returned false', MIDCOM_LOG_ERROR);
125                 continue;
126             }
127             if (empty($exporter_serializations))
128             {
129                 debug_add('exporter->serialize() returned empty array', MIDCOM_LOG_WARN);
130                 continue;
131             }
132             
133             // Store the serialized XML data for the queue
134             $path = $this->_get_subscription_queue_basedir($subscription);
135             if (empty($path))
136             {
137                 // TODO: error handling
138                 debug_add('could not get queue dir for subscription', MIDCOM_LOG_ERROR);
139                 continue;
140             }
141             if (   !isset($this->file_counts[$subscription->guid])
142                 || !is_numeric($this->file_counts[$subscription->guid]))
143             {
144                 $this->file_counts[$subscription->guid] = 1;
145             }
146             $i =& $this->file_counts[$subscription->guid];
147             debug_add('about to queue ' . count($exporter_serializations) . ' keys');
148             reset($exporter_serializations);
149             foreach ($exporter_serializations as $key => $data)
150             {
151                 if (empty($data))
152                 {
153                     $msg = "Key {$key} has empty data, skipping";
154                     $GLOBALS['midcom_helper_replicator_logger']->log($msg, MIDCOM_LOG_WARN);
155                     debug_add($msg, MIDCOM_LOG_WARN);
156                     continue;
157                 }
158                 $file = "{$path}/" . sprintf('%010d', $i) . "_{$key}.xml";
159                 if (file_exists($file))
160                 {
161                     // PANIC: file is already there (it *really* should not be)
162                     unset($exporter_serializations, $key, $data);
163                     debug_add("file {$file} already exists", MIDCOM_LOG_ERROR);
164                     debug_pop();
165                     $GLOBALS['midcom_helper_replicator_logger']->pop_prefix();
166                     $_MIDCOM->auth->drop_sudo();
167                     return false;
168                 }
169                 $fp = fopen($file, 'w');
170                 if (!$fp)
171                 {
172                     // PANIC: Can't open file for writing
173                     debug_add("can't open file {$file} for writing", MIDCOM_LOG_ERROR);
174                     unset($exporter_serializations, $key, $data);
175                     debug_pop();
176                     $GLOBALS['midcom_helper_replicator_logger']->pop_prefix();
177                     $_MIDCOM->auth->drop_sudo();
178                     return false;
179                 }
180                 fwrite($fp, $data, strlen($data));
181                 fclose($fp);
182                 
183                 $msg = "Queued {$key} as {$file}";
184                 $GLOBALS['midcom_helper_replicator_logger']->log($msg);
185                 debug_add($msg);
186
187                 // TODO: How to call midgard_replicator::export() for all the objects exported ?? (and is this the correct place for that ?)
188                 if (($guid_end = strpos($key, '_metadata')) !== false)
189                 {
190                     // This key is attachment metadata
191                     $export_guid = substr($key, 0, $guid_end);
192                     unset($guid_end);
193                 }
194                 elseif (mgd_is_guid($key))
195                 {
196                     $export_guid = $key;
197                 }
198                 else
199                 {
200                     debug_add("could not determine GUID from key '{$key}'");
201                     $export_guid = false;
202                 }
203                 if ($export_guid)
204                 {
205                     $queued_guids[$export_guid] = true;
206                 }
207
208                 unset($key, $data);
209                 ++$i;
210             }
211             unset($exporter_serializations);
212             debug_add('all keys queued, now marking them exported');
213             foreach ($queued_guids as $export_guid => $bool)
214             {
215                 $marked_exported = false;
216                 if (version_compare(mgd_version(), '1.8.2', '>='))
217                 {
218                     // In Midgard 1.8.2 we can do this efficiently with the export_by_guid method
219                     $marked_exported = midgard_replicator::export_by_guid($export_guid);
220                 }
221                 else
222                 {
223                     $object = $_MIDCOM->dbfactory->get_object_by_guid($export_guid);
224                     $marked_exported = midgard_replicator::export($object);
225                 }
226                 
227                 if ($marked_exported)
228                 {
229                     $msg = "Marked GUID '{$export_guid}' as exported to queue \"{$subscription->title}\"";
230                     //$GLOBALS['midcom_helper_replicator_logger']->log($msg);
231                     debug_add($msg);
232                 }
233                 else
234                 {
235                     $msg = "Failed to mark GUID '{$export_guid}' as exported, errstr: " . mgd_errstr();
236                     $GLOBALS['midcom_helper_replicator_logger']->log($msg, MIDCOM_LOG_ERROR);
237                     debug_add($msg, MIDCOM_LOG_ERROR);
238                 }
239             }
240             debug_add("All done for subscription '{$subscription->title}' ({$subscription->guid})");
241         }
242         debug_add('All automatic subscriptions processed');
243         debug_pop();
244         $GLOBALS['midcom_helper_replicator_logger']->pop_prefix();
245         $_MIDCOM->auth->drop_sudo();
246         return true;
247     }
248
249     function list_path_items($path)
250     {
251         $path = preg_replace('%/{2,}|/$%', '', $path);
252         $ret = array();
253         $dp = opendir($path);
254         if (!$dp)
255         {
256             return false;
257         }
258         
259         while (($file_name = readdir($dp)) !== false)
260         {
261             if (   $file_name == '.'
262                 || $file_name == '..')
263             {
264                 continue;
265             }
266             $file_path = "{$path}/{$file_name}";
267             if (is_dir($file_path))
268             {
269                 $ret = array_merge($ret, midcom_helper_replicator_queuemanager::list_path_items($file_path));
270                 continue;
271             }
272             $ret[] = $file_path;
273         }
274         closedir($dp);
275
276         sort($ret);
277         return $ret;
278     }
279
280     function get_sg_basedir(&$subscription)
281     {
282         // Normalize basedir, no trailing slash and no consecutive slashes
283         $global_base = preg_replace('%/{2,}|/$%', '', $this->_config->get('queue_root_dir'));
284         if (!is_dir($global_base))
285         {
286             // The configuration key might have dynamic part to it
287             debug_push_class(__CLASS__, __FUNCTION__);   
288             debug_add("directory {$global_base} does not exist, creating", MIDCOM_LOG_DEBUG);
289             if (!mkdir($global_base))
290             {
291                 // TODO: Error reporting
292                 debug_add("could not create directory {$global_base}", MIDCOM_LOG_ERROR);
293                 debug_pop();
294                 return false;
295             }
296             debug_pop();
297         }
298         // Append sitegroup name
299         $sitegroup_base = "{$global_base}/" . $this->safe_sg_name($subscription->sitegroup);
300         if (!is_dir($sitegroup_base))
301         {
302             // The configuration key might have dynamic part to it
303             debug_push_class(__CLASS__, __FUNCTION__);   
304             debug_add("directory {$sitegroup_base} does not exist, creating", MIDCOM_LOG_DEBUG);
305             if (!mkdir($sitegroup_base))
306             {
307                 // TODO: Error reporting
308                 debug_add("could not create directory {$sitegroup_base}", MIDCOM_LOG_ERROR);
309                 debug_pop();
310                 return false;
311             }
312             debug_pop();
313         }
314         return $sitegroup_base;
315     }
316
317     /**
318      * Gets/creates the path for subscriptions spool dir
319      * @todo make a smarter recursive directory creator
320      */
321     function get_subscription_basedir(&$subscription)
322     {
323         $sitegroup_base = $this->get_sg_basedir($subscription);
324         if ($sitegroup_base === false)
325         {
326             return false;
327         }
328
329         $subscription_path = "{$sitegroup_base}/{$subscription->guid}";
330         if (!is_dir($subscription_path))
331         {
332             debug_push_class(__CLASS__, __FUNCTION__);   
333             debug_add("directory {$subscription_path} does not exist, creating", MIDCOM_LOG_DEBUG);
334             if (!mkdir($subscription_path))
335             {
336                 // TODO: Error reporting
337                 debug_add("could not create directory {$subscription_path}", MIDCOM_LOG_ERROR);
338                 debug_pop();
339                 return false;
340             }
341             debug_pop();
342         }
343         return $subscription_path;
344     }
345
346     function get_subscription_quarantine_basedir(&$subscription)
347     {
348         $sitegroup_base = $this->get_sg_basedir($subscription);
349         if ($sitegroup_base === false)
350         {
351             return false;
352         }
353
354         $subscription_path = "{$sitegroup_base}/{$subscription->guid}-quarantine";
355         if (!is_dir($subscription_path))
356         {
357             debug_push_class(__CLASS__, __FUNCTION__);   
358             debug_add("directory {$subscription_path} does not exist, creating", MIDCOM_LOG_DEBUG);
359             if (!mkdir($subscription_path))
360             {
361                 // TODO: Error reporting
362                 debug_add("could not create directory {$subscription_path}", MIDCOM_LOG_ERROR);
363                 debug_pop();
364                 return false;
365             }
366             debug_pop();
367         }
368         return $subscription_path;
369     }
370
371     function _get_subscription_quarantine_queuedir(&$subscription)
372     {
373         $quarantine_path = $this->get_subscription_quarantine_basedir($subscription);
374         if ($quarantine_path === false)
375         {
376             // TODO: Error reporting
377             return false;
378         }
379         $queue_path = "{$quarantine_path}/{$this->started}";
380         if (!is_dir($queue_path))
381         {
382             debug_push_class(__CLASS__, __FUNCTION__);
383             debug_add("directory {$queue_path} does not exist, creating", MIDCOM_LOG_DEBUG);
384             if (!mkdir($queue_path))
385             {
386                 // TODO: Error reporting
387                 debug_add("could not create directory {$queue_path}", MIDCOM_LOG_ERROR);
388                 debug_pop();
389                 return false;
390             }
391             debug_pop();
392         }
393         return $queue_path;
394     }
395
396
397     function _get_subscription_queue_basedir(&$subscription)
398     {
399         $subscription_path = $this->get_subscription_basedir($subscription);
400         if ($subscription_path === false)
401         {
402             // TODO: Error reporting
403             return false;
404         }
405         $queue_path = "{$subscription_path}/{$this->started}";
406         if (!is_dir($queue_path))
407         {
408             debug_push_class(__CLASS__, __FUNCTION__);
409             debug_add("directory {$queue_path} does not exist, creating", MIDCOM_LOG_DEBUG);
410             if (!mkdir($queue_path))
411             {
412                 // TODO: Error reporting
413                 debug_add("could not create directory {$queue_path}", MIDCOM_LOG_ERROR);
414                 debug_pop();
415                 return false;
416             }
417             debug_pop();
418         }
419         return $queue_path;
420     }
421
422     function _process_queue_queuepath_sanitychecks(&$queue_name, &$subscription_path)
423     {
424         if (   $queue_name == '.'
425             || $queue_name == '..')
426         {
427             // Skip the . and .. entries (which are always present)
428             return false;
429         }
430         // Sanity check the subdir name
431         if (!is_numeric($queue_name))
432         {
433             // Nonnumeric paths are not our queues
434             debug_add("Weird queue name '{$queue_name}' in path '{$subscription_path}'", MIDCOM_LOG_WARN);
435             return false;
436         }
437         // Sanity check path
438         $queue_path = "{$subscription_path}/{$queue_name}";
439         if (!is_dir($queue_path))
440         {
441             debug_add("Queue path '{$queue_path}' is not a directory, skipping", MIDCOM_LOG_ERROR);
442             return false;
443         }
444         return true;
445     }
446
447     /**
448      * Helper for process_queue, moves quarantined items to correct directory
449      */
450     function _quarantine_items(&$q_items, &$items_paths, &$subscription)
451     {
452         if (!is_array($q_items))
453         {
454             return false;
455         }
456         if (empty($q_items))
457         {
458             return true;
459         }
460         $quarantine_path = $this->_get_subscription_quarantine_queuedir($subscription);
461         if (!is_dir($quarantine_path))
462         {
463             // Could not get valid dir
464             return false;
465         }
466         foreach ($q_items as $item_key => $item_path)
467         {
468             // Reset time limit counter while processing files
469             set_time_limit(30);
470             $quarantine_filepath = $quarantine_path . '/' . basename($item_path);
471             debug_add("Quarantineing '{$item_key}' as '{$quarantine_filepath}'", MIDCOM_LOG_DEBUG);
472             $output = array();
473             $code = 0;
474             exec("mv {$item_path} {$quarantine_filepath}", $output, $code);
475             if ($code != 0)
476             {
477                 debug_add("Failed to quarantine '{$item_path}' as '{$quarantine_filepath}'", MIDCOM_LOG_ERROR);
478                 continue;
479             }
480
481             // TODO: write per-item error if available to some index file.
482
483             unset($q_items[$item_key], $items_paths[$item_key]);
484         }
485         return true;
486     }
487
488     /**
489      * Helper for process_queue, removes processed items, quarantines failed ones
490      */
491     function _process_queue_quarantines(&$items, &$items_paths, &$subscription)
492     {
493         debug_push_class(__CLASS__, __FUNCTION__);
494         $q_items = array();
495         foreach ($items_paths as $item_key => $item_path)
496         {
497             if (array_key_exists($item_key, $items))
498             {
499                 // Item still in array (transporter should remove each key as it's transported properly)
500                 debug_add("Transporter left key '{$item_key}' into items, marking for quarantineing", MIDCOM_LOG_INFO);
501                 $q_items[$item_key] = $item_path;
502                 continue;
503             }
504             if (!unlink($item_path))
505             {
506                 debug_add("Could not remove file '{$item_path}'", MIDCOM_LOG_ERROR);
507                 continue;
508             }
509             //$GLOBALS['midcom_helper_replicator_logger']->log("File {$item_path} removed from queue \"{$subscription->title}\"");
510             unset($items_paths[$item_key]);
511         }
512         $this->_quarantine_items(&$q_items, &$items_paths, &$subscription);
513         unset($q_items);
514         // Restore to original value
515         set_time_limit(ini_get('max_execution_time'));
516         debug_pop();
517     }
518
519
520
521