# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# This Source Code Form is "Incompatible With Secondary Licenses", as
# defined by the Mozilla Public License, v. 2.0.

package Bugzilla::JobQueue;

10
use 5.10.1;
11
use strict;
12
use warnings;
13 14 15 16

use Bugzilla::Constants;
use Bugzilla::Error;
use Bugzilla::Install::Util qw(install_string);
17
use Bugzilla::Util qw(read_text);
18 19 20
use File::Basename;
use base qw(TheSchwartz);
use fields qw(_worker_pidfile);
21 22 23

# This maps job names for Bugzilla::JobQueue to the appropriate modules.
# If you add new types of jobs, you should add a mapping here.
24 25
# Keys are the job names that insert() accepts as a plain string; values are
# the class names passed to TheSchwartz as the job's funcname.
use constant JOB_MAP =>
  {send_mail => 'Bugzilla::Job::Mailer', bug_mail => 'Bugzilla::Job::BugMail',};
26

27 28 29
# Without a driver cache TheSchwartz opens a new database connection
# for each email it sends.  This cached connection doesn't persist
# across requests.
30
# Handed to the TheSchwartz constructor as driver_cache_expiration in new().
use constant DRIVER_CACHE_TIME => 300;    # 5 minutes
31

32 33 34 35
# To avoid memory leaks/fragmentation, a worker process won't process more
# than MAX_MESSAGES messages: work_until_done() stops after this many
# work_once() calls.
use constant MAX_MESSAGES => 1000;

36
# Returns the hashref that maps job names to the modules handling them.
#
# The map is built on first use and stored in the request cache under
# 'job_map'; later calls return the cached hashref. It starts from JOB_MAP and
# is then passed to the 'job_map' hook, which may modify it.
#
# The hook is given a shallow copy. JOB_MAP is a constant holding a hash
# reference, so every call yields the very same hash; passing it on directly
# would let hook modifications change the constant's data and persist after
# the request cache has been cleared.
sub job_map {
  if (!defined(Bugzilla->request_cache->{job_map})) {
    my $job_map = {%{JOB_MAP()}};
    Bugzilla::Hook::process('job_map', {job_map => $job_map});
    Bugzilla->request_cache->{job_map} = $job_map;
  }

  return Bugzilla->request_cache->{job_map};
}

46
# Constructor. Throws the 'feature_disabled' user error when the 'jobqueue'
# feature is not available; otherwise returns the object built by the parent
# class constructor, configured from localconfig.
sub new {
  my ($class) = @_;

  ThrowUserError('feature_disabled', {feature => 'jobqueue'})
    unless Bugzilla->feature('jobqueue');

  my $localconfig = Bugzilla->localconfig;

  # TheSchwartz is going to write to the database, so it has to be given
  # the main DB.
  my %main_db = (
    dsn    => Bugzilla->dbh_main->dsn,
    user   => $localconfig->{db_user},
    pass   => $localconfig->{db_pass},
    prefix => 'ts_',
  );

  my $client = $class->SUPER::new(
    databases               => [\%main_db],
    driver_cache_expiration => DRIVER_CACHE_TIME,
    prioritize              => 1,
  );

  return $client;
}

# Direct access to the underlying databases: returns the result of
# driver_for() for every key of the object's 'databases' hash.
sub bz_databases {
  my ($self) = @_;
  return map { $self->driver_for($_) } keys %{$self->{databases}};
}

# Inserts a job into the queue to be processed and returns immediately.
#
# Arguments:
#   $job      - either a job object (any reference), which is inserted as-is,
#               or a job name that is looked up in job_map().
#   $arg      - only used with a job name: the argument stored in the new job.
#   $priority - only used with a job name: the job priority; any false value
#               (including 0 and undef) selects the default of 5.
#
# Returns the value returned by the parent class insert().
# Throws 'jobqueue_no_job_mapping' when a job name has no mapping, and
# 'jobqueue_insert_failed' when the parent insert() returns a false value.
sub insert {
  my ($self, $job, $arg, $priority) = @_;

  if (!ref($job)) {
    my $mapped_job = Bugzilla::JobQueue->job_map()->{$job};
    ThrowCodeError('jobqueue_no_job_mapping', {job => $job}) if !$mapped_job;

    # Explicit Class->new call: this package defines its own new(), so the
    # indirect-object form ("new Class (...)") would rely on parser guesswork.
    $job = TheSchwartz::Job->new(
      funcname => $mapped_job,
      arg      => $arg,
      priority => $priority || 5,
    );
  }

  my $retval = $self->SUPER::insert($job);

  # XXX Need to get an error message here if insert fails, but
  # I don't see any way to do that in TheSchwartz.
  ThrowCodeError('jobqueue_insert_failed', {job => $job, errmsg => $@})
    if !$retval;

  return $retval;
}

104 105 106
# To avoid the memory leaks/fragmentation that tend to happen in long-running
# perl processes, this loop does not process jobs itself. It checks for
# runnable jobs and, when one is found, runs this script again as a daemon
# ("-d -p <worker pidfile> onepass") to empty the queue, then waits for that
# process to exit before polling again. This sub never returns.
sub subprocess_worker {
  my $self = shift;

  # Kept as a list so that system() executes the script directly: the script
  # path and the pidfile are passed verbatim, with no shell re-parsing them
  # (paths containing quotes or spaces would break a quoted command string).
  my @command = ($0, '-d', '-p', $self->{_worker_pidfile}, 'onepass');

  while (1) {
    my $time = (time);
    my @jobs = $self->list_jobs({
      funcname      => $self->{all_abilities},
      run_after     => $time,
      grabbed_until => $time,
      limit         => 1,
    });
    if (@jobs) {
      $self->debug("Spawning queue worker process");

      # Run the worker as a daemon
      system(@command);

      # And poll the PID to detect when the worker has finished.
      # We do this instead of blocking in system() to allow for the INT
      # signal to interrupt us and trigger kill_worker().
      my $pid = read_text($self->{_worker_pidfile}, err_mode => 'quiet');
      if ($pid) {
        sleep(3) while (kill(0, $pid));
      }
      $self->debug("Queue worker process completed");
    }
    else {
      $self->debug("No jobs found");
    }
    sleep(5);
  }
}

# Stops the worker process recorded in the worker pidfile, if there is one.
#
# Takes no arguments: the queue object is fetched via Bugzilla->job_queue().
# Nothing happens unless the pidfile exists and names a process that answers
# kill(0, ...); in that case this script is run again with
# "-f -p <worker pidfile> stop".
sub kill_worker {
  my $self = Bugzilla->job_queue();
  if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) {
    my $worker_pid = read_text($self->{_worker_pidfile});
    if ($worker_pid && kill(0, $worker_pid)) {
      $self->debug("Stopping worker process");

      # List form of system(): the script path and pidfile are passed
      # verbatim instead of being quoted by hand for a shell.
      system($0, '-f', '-p', $self->{_worker_pidfile}, 'stop');
    }
  }
}

# Records the location of the worker's pidfile: a file in the data directory
# named 'worker-' followed by the base name of the given $pidfile.
# Returns the path that was stored.
sub set_pidfile {
  my ($self, $pidfile) = @_;
  my $datadir     = bz_locations()->{'datadir'};
  my $worker_name = 'worker-' . basename($pidfile);
  return $self->{_worker_pidfile} = join('/', $datadir, $worker_name);
}

158 159
# Wraps the parent class work_once(): the request cache is cleared first, so
# every run starts with a fresh cache, then the call is forwarded unchanged.
sub work_once {
  my ($self, @args) = @_;
  Bugzilla->clear_request_cache();
  return $self->SUPER::work_once(@args);
}

165 166 167
# Calls work_once() repeatedly until it returns a false value, but never more
# than MAX_MESSAGES times in one batch, to avoid memory leak/fragmentation
# issues.
sub work_until_done {
  my ($self) = @_;
  my $processed = 0;
  while ($processed < MAX_MESSAGES) {
    last unless $self->work_once;
    $processed++;
  }
}

175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
1;

__END__

=head1 NAME

Bugzilla::JobQueue - Interface between Bugzilla and TheSchwartz.

=head1 SYNOPSIS

 use Bugzilla;

 my $obj = Bugzilla->job_queue();
 $obj->insert('send_mail', { msg => $message });

=head1 DESCRIPTION

Certain tasks should be done asynchronously.  The job queue system allows
Bugzilla to use some sort of service to schedule jobs to happen asynchronously.

=head2 Inserting a Job

See the synopsis above for an easy to follow example on how to insert a
job into the queue.  Give it a name and some arguments and the job will
be sent away to be done later.
200 201 202 203 204 205 206 207 208 209 210

=head1 B<Methods in need of POD>

=over

=item insert

=item bz_databases

=item job_map

211 212 213 214
=item set_pidfile

=item kill_worker

215
=back