Index: t/55filtered.t
===================================================================
--- t/55filtered.t	(revision 0)
+++ t/55filtered.t	(revision 0)
@@ -0,0 +1,60 @@
+#!/usr/bin/perl -w
+
+BEGIN { $COUNT = 10; }
+
+use Test; BEGIN { plan tests => 1 + $COUNT * (1 + 2) + 1 + $COUNT * 4; }
+
+use lib '../lib'; if (-d 't') { chdir 't'; }
+use IPC::DirQueue;
+
+mkdir ("log");
+mkdir ("log/qdir");
+my $bq = IPC::DirQueue->new({ dir => 'log/qdir', tag_sub => sub {
+  my($job) = @_;
+  ok($job);
+  my $id = $job->{metadata}->{id};
+  ok($id =~ /^(\d+|[+])$/);
+  print "queueing $id\n";
+  return $id;
+}});
+ok ($bq);
+
+start_writer();
+start_worker();
+exit;
+
+sub start_writer {
+  for my $j ('+', 1 .. $COUNT) {
+    ok ($bq->enqueue_string ("hello $j! $$", { id => $j }));
+  }
+}
+
+sub start_worker {
+  ok ($COUNT > 9);
+  my @k = ('+', 1 .. $COUNT);
+  while (@k) {
+    my $k = splice(@k, int(rand(scalar @k)), 1);
+
+    print "looking for $k...\n";
+    my $job;
+
+    $job = $bq->pickup_queued_job("not.$k");
+    ok(!$job);
+
+    $job = $bq->wait_for_queued_job(1, 1, "$k");
+    ok($job);
+    $job->return_to_queue() if $job;
+
+    $k = quotemeta($k);
+    $job = $bq->wait_for_queued_job(1, 1, qr/^$k$/);
+    ok($job);
+
+    my $data = $job ? $job->get_data() : 'no job';
+    ok ($data =~ /^hello $k! \d+$/)   
+      or warn "got: [$data]";
+
+    $job->finish() if $job;
+    print "finished $k\n";
+  }
+}
+

Property changes on: t/55filtered.t
___________________________________________________________________
Name: svn:executable
   + *

Index: t/10enq_string.t
===================================================================
--- t/10enq_string.t	(revision 8510)
+++ t/10enq_string.t	(working copy)
@@ -2,7 +2,7 @@
 
 use Test; BEGIN { plan tests => 801 };
 
-use lib '../lib'; if (-d 't') { chdir 't'; }
+BEGIN { if (-d 't') { chdir 't'; } unshift @INC, '../lib'; };
 use IPC::DirQueue;
 
 mkdir ("log");
@@ -25,7 +25,9 @@
 sub start_worker {
   my $k = 0;
   while (1) {
+    warn "JMD todo fix";
     my $job = $bq->wait_for_queued_job();
+    warn "JMD never gets here";
     if (!$job) { next; }
 
     # Test the traditional way of getting job data
Index: lib/IPC/DirQueue.pm
===================================================================
--- lib/IPC/DirQueue.pm	(revision 8510)
+++ lib/IPC/DirQueue.pm	(working copy)
@@ -90,23 +90,40 @@
 
 Name the directory where the queue files are stored.  This is required.
 
-=item data_file_mode => $mode (default: 0666)
+=item file_mode => $mode (default: 0666)
 
-The C<chmod>-style file mode for data files.  This should be specified
-as a string with a leading 0.  It will be affected by the current
-process C<umask>.
+The C<chmod>-style file mode for data and queue control files.  This 
+should be specified as a string with a leading 0.  It will be affected 
+by the current process' C<umask>.
 
-=item queue_file_mode => $mode (default: 0666)
+=item data_file_mode => $mode (default: C<file_mode>)
 
-The C<chmod>-style file mode for queue control files.  This should be
-specified as a string with a leading 0.  It will be affected by the
-current process C<umask>.
+Override the file mode for data files.  See C<file_mode>.
 
+=item queue_file_mode => $mode (default: C<file_mode>)
+
+Override the file mode for queue control files.  See C<file_mode>.
+
 =item ordered => { 0 | 1 } (default: 1)
 
-Whether the jobs should be processed in order of submission, or
-in no particular order.
+Whether the jobs should be processed in order of submission (1), or
+in no particular order (0).
 
+=item tag => $string (default: undef)
+
+Sets the TAG part of the file name.  The default is the undefined
+string.  See the 'FILENAME TAGS' section, below.
+
+=item tag_sub => $subref (default: undef)
+
+The TAG can be generated dynamically by routine referenced by this
+parameter.  See the 'FILENAME TAGS' section, below.
+
+=item tag_max_length => int (default: 128)
+
+The tag is part of the file name, so it should not be too long.  The default
+limit is already quite generous, there shouldn't be any reason to increase it.
+
 =item queue_fanout => { 0 | 1 } (default: 0)
 
 Whether the queue directory should be 'fanned out'.  This allows better
@@ -142,9 +159,10 @@
   bless ($self, $class);
 
   die "no 'dir' specified" unless $self->{dir};
-  $self->{data_file_mode} ||= '0666';
+  $self->{file_mode} ||= '0666';
+  $self->{data_file_mode} ||= $self->{file_mode};
   $self->{data_file_mode} = oct ($self->{data_file_mode});
-  $self->{queue_file_mode} ||= '0666';
+  $self->{queue_file_mode} ||= $self->{file_mode};
   $self->{queue_file_mode} = oct ($self->{queue_file_mode});
 
   if ($self->{queue_fanout}) {
@@ -154,6 +172,15 @@
   elsif (!defined $self->{ordered}) {
     $self->{ordered} = 1;
   }
+  
+  $self->{hash} ||= hash_string_to_filename($self->gethostname().$$);
+  if (defined $self->{tag}) {
+    $self->{tag_sub} ||= sub { return $self->{tag}; };
+  }
+  $self->{tag_max_length} ||= 128;
+  if (!defined $self->{tag_warn}) {
+    $self->{tag_warn} = 1;
+  }
 
   $self->{buf_size} ||= 65536;
   $self->{active_file_lifetime} ||= 600;
@@ -427,10 +454,15 @@
 
 ###########################################################################
 
-=item $job = $dq->pickup_queued_job();
+=item $job = $dq->pickup_queued_job([ $filter ]);
 
 Pick up the next job in the queue, so that it can be processed.
 
+The parameter C<$filter> can be used to specify either a string or a regular 
+expression (with C<qr//>) which is compared (for strings) or matched 
+(for regexps) against the tag part of the queued filename.  All files 
+which don't match will be skipped.
+
 If no job is available for processing, either because the queue is
 empty or because other worker processes are already working on
 them, C<undef> is returned; otherwise, a new instance of C<IPC::DirQueue::Job>
@@ -442,13 +474,13 @@
 =cut
 
 sub pickup_queued_job {
-  my ($self) = @_;
+  my ($self, $filter) = @_;
 
   my $pathqueuedir = $self->q_subdir('queue');
   my $pathactivedir = $self->q_subdir('active');
   $self->ensure_dir_exists ($pathactivedir);
 
-  my $iter = $self->queue_iter_start($pathqueuedir);
+  my $iter = $self->queue_iter_start($pathqueuedir, $filter);
 
   while (1) {
     my $nextfile = $self->queue_iter_next($iter);
@@ -596,7 +628,7 @@
 
 ###########################################################################
 
-=item $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]);
+=item $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval [, $filter ]] ]);
 
 Wait for a job to be queued within the next C<$timeout> seconds.
 
@@ -619,10 +651,12 @@
 the nearest round multiple of C<$pollinterval> greater than C<$timeout>
 will be used instead.  Also note that C<$timeout> is used as an integer.
 
+The job can be filtered with C<$filter> as in C<pickup_queued_job()>.
+
 =cut
 
 sub wait_for_queued_job {
-  my ($self, $timeout, $pollintvl) = @_;
+  my ($self, $timeout, $pollintvl, $filter) = @_;
 
   my $finishtime;
   if ($timeout && $timeout > 0) {
@@ -651,7 +685,7 @@
     my @stat = stat ($pathqueuedir);
     my $qdirlaststat = $stat[9];
 
-    my $job = $self->pickup_queued_job();
+    my $job = $self->pickup_queued_job($filter);
     if ($job) { return $job; }
 
     # there's another semi-race condition here, brought about by a lack of
@@ -678,7 +712,7 @@
     while (time == $qdirlaststat) {
       Time::HiRes::usleep ($pollintvl);
       dbg "wait_for_queued_job: spinning until time != stat $qdirlaststat";
-      my $job = $self->pickup_queued_job();
+      my $job = $self->pickup_queued_job($filter);
       if ($job) { return $job; }
     }
 
@@ -708,7 +742,7 @@
 
 ###########################################################################
 
-=item $job = $dq->visit_all_jobs($visitor, $visitcontext);
+=item $job = $dq->visit_all_jobs($visitor, $visitcontext, $filter);
 
 Visit all the jobs in the queue, in a read-only mode.  Used to list
 the entire queue.
@@ -728,15 +762,17 @@
   'active_host': the hostname on which the job is active
   'active_pid': the process ID of the process which picked up the job
 
+The jobs can be filtered with C<$filter> as in C<pickup_queued_job()>.
+
 =cut
 
 sub visit_all_jobs {
-  my ($self, $visitor, $visitcontext) = @_;
+  my ($self, $visitor, $visitcontext, $filter) = @_;
 
   my $pathqueuedir = $self->q_subdir('queue');
   my $pathactivedir = $self->q_subdir('active');
 
-  my $iter = $self->queue_iter_start($pathqueuedir);
+  my $iter = $self->queue_iter_start($pathqueuedir, $filter);
 
   my $nextfile;
   while (1) {
@@ -825,20 +861,6 @@
 
 ###########################################################################
 
-sub get_dir_filelist_sorted {
-  my ($self, $dir) = @_;
-
-  if (!opendir (DIR, $dir)) {
-    return [];          # no dir?  nothing queued
-  }
-  # have to read the lot, to sort them.
-  my @files = sort grep { /^\d/ } readdir(DIR);
-  closedir DIR;
-  return \@files;
-}
-
-###########################################################################
-
 sub copy_in_to_out_fh {
   my ($self, $fhin, $callbackin, $fhout, $outfname) = @_;
 
@@ -1101,28 +1123,50 @@
 
   my @gmt = gmtime ($job->{time_submitted_secs});
 
-  # NN.20040718140300MMMM.hash(hostname.$$)[.rand]
+  # NN.20040718140300MMMM[.tag].hash[.rand]
   #
   # NN = priority, default 50
   # MMMM = microseconds from Time::HiRes::gettimeofday()
+  # hash = hash(hostname.$$)
+  # tag = some base64-ish string
   # hostname = current hostname
 
-  my $buf = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d.%s",
+  my $file = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d",
         $job->{pri},
         $gmt[5]+1900, $gmt[4]+1, $gmt[3], $gmt[2], $gmt[1], $gmt[0],
-        $job->{time_submitted_msecs},
-        hash_string_to_filename ($self->gethostname().$$));
+        $job->{time_submitted_msecs});
 
+  # add tag (including leading dot, only if wanted) and hash
+  $file .= $self->get_q_filename_tag($job).".".$self->{hash};
+
   # normally, this isn't used.  but if there's a collision,
   # all retries after that will do this; in this case, the
   # extra anti-collision stuff is useful
   if ($addextra) {
-    $buf .= ".".$$.".".$self->get_random_int();
+    $file .= ".".$$.".".$self->get_random_int();
   }
 
-  return $buf;
+  return $file;
 }
 
+sub get_q_filename_tag {
+  my($self, $job) = @_;
+  # return an empty string if there's nothing to do
+  return '' unless defined $self->{tag_sub};
+  # create a (new?) tag, possibly empty
+  my $str = $self->{tag_sub}->($job);
+  return '' unless defined $str;
+  # weed out all dangerous chars
+  my $tag = filter_unsafe_chars($str);
+  # limit the length
+  $tag = substr($tag, 0, $self->{tag_max_length});
+  # warn the user if it was filtered
+  if ($self->{tag_warn} && $tag ne $str) {
+    warn "IPC::DirQueue: the tag was filtered\n";
+  }
+  return ".$tag";
+}
+
 sub hash_string_to_filename {
   my ($str) = @_;
   # get a 16-bit checksum of the input, then uuencode that string
@@ -1130,6 +1174,12 @@
   # transcode from uuencode-space into safe, base64-ish space
   $str =~ y/ -_/A-Za-z0-9+_/;
   # and remove the stuff that wasn't in that "safe" range
+  return filter_unsafe_chars($str);
+}
+
+sub filter_unsafe_chars {
+  my ($str) = @_;
+  # remove any chars which aren't in the "safe" base64-ish range
   $str =~ y/A-Za-z0-9+_//cd;
   return $str;
 }
@@ -1205,62 +1255,107 @@
 ###########################################################################
 
 sub queue_iter_start {
-  my ($self, $pathqueuedir) = @_;
+  my ($self, $pathqueuedir, $filter, $type) = @_;
 
-  if ($self->{indexclient}) {
-    dbg ("queue iter: getting list for $pathqueuedir");
-    my @files = sort grep { /^\d/ } $self->{indexclient}->ls($pathqueuedir);
-
-    if (scalar @files <= 0) {
-      return if $self->queuedir_is_bad($pathqueuedir);
+  $filter = qr// unless defined $filter;
+  dbg ("queue iter: filter $filter in $pathqueuedir");
+  unless (ref $filter eq 'CODE') {
+    # we need to copy $filter here else the closure will get annoyed
+    my $re = $filter;
+    unless (ref ($re) eq 'Regexp') {
+      $re = quotemeta($re);
+      $re = qr/^${re}$/;
     }
-
-    return { files => \@files };
+    $filter = sub {
+                # we can't use grep because it is picky about 
+                # list context and this sub can be called for
+                # single files and grep would return the count
+                # in those cases
+                my @r;
+                while (@_) {
+                  my $f = shift;
+                  next unless defined $f;
+                  next unless $f =~ /^\d/;
+                  # we've got to go split here to make the ^
+                  # and $ anchors work 
+                  my @f = split(/\./, $f);
+                  # does this file have a tag (ie. even number
+                  # of elements)?
+                  next if scalar (@f) % 2;
+                  # apply the tag
+                  next unless $f[2] =~ $re;
+                  push(@r, $f);
+                }
+                return wantarray ? @r : $r[0];
+              };
   }
-  elsif ($self->{ordered}) {
-    dbg ("queue iter: opening $pathqueuedir (ordered)");
-    my $files = $self->get_dir_filelist_sorted($pathqueuedir);
-    if (scalar @$files <= 0) {
-      return if $self->queuedir_is_bad($pathqueuedir);
-    }
+  
+  # iterator always has:
+  #   dir    = directory processed
+  #   filter = filter routine applied to the files
+  # iterator can have:
+  #   files  = cached list of files (indexclient, ordered)
+  #   sub    = pointer to sub-iterator (fanout)
+  #   fh     = directory handle (unordered)
+  my $iter = {
+    dir    => $pathqueuedir,
+    filter => $filter,
+  };
 
-    return { files => $files };
+  unless ($type) {
+    $type = $self->{indexclient}  ? 'indexclient'
+          : $self->{ordered}      ? 'ordered'
+          : $self->{queue_fanout} ? 'fanout'
+          :                         'unordered';
   }
-  elsif ($self->{queue_fanout}) {
-    return $self->queue_iter_fanout_start($pathqueuedir);
+  if ($type eq 'ordered') {
+    $iter = $self->queue_iter_ordered_start($iter);
   }
-  else {
+  elsif ($type eq 'unordered') {
     my $dirfh;
-    dbg ("queue iter: opening $pathqueuedir");
-    if (!opendir ($dirfh, $pathqueuedir)) {
-      return if $self->queuedir_is_bad($pathqueuedir);
-      if (!opendir ($dirfh, $pathqueuedir)) {
+    dbg ("unordered: opening $iter->{dir}");
+    if (!opendir ($dirfh, $iter->{dir})) {
+      return if $self->queuedir_is_bad($iter->{dir});
+      if (!opendir ($dirfh, $iter->{dir})) {
         warn "oops? pathqueuedir bad";
         return;
       }
     }
 
-    return { fh => $dirfh };
+    $iter->{fh} = $dirfh;
   }
+  elsif ($type eq 'fanout') {
+    $iter = $self->queue_iter_fanout_start($iter);
+  }
+  elsif ($type eq 'indexclient') {
+    dbg ("indexclient: getting list for $iter->{dir}");
+    my @files = sort $iter->{filter}->($self->{indexclient}->ls($iter->{dir}));
 
-  die "cannot get here";
+    if (scalar @files <= 0) {
+      return if $self->queuedir_is_bad($iter->{dir});
+    }
+
+    $iter->{files} = \@files;
+  }
+  else {
+    die "unknown iterator type $type";
+  }
+
+  return $iter;
 }
 
 sub queue_iter_next {
   my ($self, $iter) = @_;
 
-  if ($self->{indexclient}) {
+  if ($iter->{files}) {
     return shift @{$iter->{files}};
   }
-  elsif ($self->{ordered}) {
-    return shift @{$iter->{files}};
+  elsif ($iter->{fh}) {
+    return $self->queue_iter_unordered_next($iter);
   }
-  elsif ($self->{queue_fanout}) {
+  elsif ($iter->{fanoutlist}) {
     return $self->queue_iter_fanout_next($iter);
   }
-  else {
-    return readdir($iter->{fh});
-  }
 
   return;
 }
@@ -1269,12 +1364,58 @@
   my ($self, $iter) = @_;
 
   return unless $iter;
-  if (defined $iter->{fanfh}) { closedir($iter->{fanfh}); }
+  $self->queue_iter_stop($iter->{sub});
+  dbg ("queue iter: closing $iter->{dir}");
   if (defined $iter->{fh}) { closedir($iter->{fh}); }
+  delete $iter->{fh};
+  delete $iter->{files};
+  delete $iter->{fanoutlist};
+  
+  return undef;
 }
+###########################################################################
 
+sub queue_iter_ordered_start {
+  my ($self, $iter) = @_;
+
+  my @files = ();
+
+  dbg ("ordered: opening $iter->{dir} (ordered)");
+  if (opendir (DIR, $iter->{dir})) {
+    # have to read the lot, to sort them.
+    @files = sort $iter->{filter}->(readdir(DIR));
+    closedir DIR;
+  }
+
+  if (scalar @files <= 0) {
+    return if $self->queuedir_is_bad($iter->{dir});
+  }
+
+  $iter->{type} = 'files';
+  $iter->{files} = \@files;
+  return $iter;
+}
+
 ###########################################################################
 
+sub queue_iter_unordered_next {
+  my ($self, $iter) = @_;
+  my $file = undef;
+
+  while (1) {
+    $file = readdir($iter->{fh});
+    last unless defined $file;
+    #dbg("unordered: candidate $file");
+    $file = $iter->{filter}->($file);
+    last if defined $file;
+    #dbg("unordered: candidate filtered");
+  }
+  
+  return $file;
+}
+
+###########################################################################
+
 sub queue_dir_fanout_create {
   my ($self, $pathqueuedir) = @_;
 
@@ -1323,26 +1464,23 @@
 }
 
 sub queue_iter_fanout_start {
-  my ($self, $pathqueuedir) = @_;
-  my $iter = { };
+  my ($self, $iter) = @_;
 
   {
     my @fanouts;
-    dbg ("queue iter: opening $pathqueuedir");
-    if (!opendir (DIR, $pathqueuedir)) {
+    dbg ("fanout: opening $iter->{dir}");
+    if (!opendir (DIR, $iter->{dir})) {
       @fanouts = ();          # no dir?  nothing queued
     }
     else {
       my %map = map {
-              $_ => (-M $pathqueuedir.SLASH.$_)
+              $_ => (-M $iter->{dir}.SLASH.$_)
             } grep { /^[a-z0-9]$/ } readdir(DIR);
       @fanouts = sort { $map{$a} <=> $map{$b} } keys %map;
-      dbg ("fanout: $pathqueuedir, order is ".join ' ', @fanouts);
+      dbg ("fanout: $iter->{dir}, order is ".join ' ', @fanouts);
     }
     closedir DIR;
     $iter->{fanoutlist} = \@fanouts;
-    $iter->{pathqueuedir} = $pathqueuedir;
-
   }
   return $iter;
 }
@@ -1350,40 +1488,34 @@
 sub queue_iter_fanout_next {
   my ($self, $iter) = @_;
 
-  # dir handles are:
-  # /path/to/queue     = $iter->{fh}
-  #               /f   = $iter->{fanfh}
-
 next_fanout:
 
-  # open the {fanfh} handle, if it isn't already going
-  if (!defined $iter->{fanfh}) {
+  # start the sub iterator, if it isn't already going
+  if (!defined $iter->{sub}) {
     my $nextfanout = shift @{$iter->{fanoutlist}};
     if (!defined $nextfanout) {
       dbg ("fanout: end of list");
       return;
     }
 
-    my $dirfh;
+    my $subiter;
     dbg ("fanout: opening next dir: $nextfanout");
-    if (!opendir ($dirfh, $iter->{pathqueuedir}.SLASH.$nextfanout)) {
-      warn "opendir failed $iter->{pathqueuedir}/$nextfanout: $!";
-      return;
-    }
+    $subiter = $self->queue_iter_start($iter->{dir}.SLASH.$nextfanout,
+                 $iter->{filter}, 'unordered');
+    return unless $subiter;
 
+    $iter->{sub} = $subiter;
     $iter->{fanstr} = $nextfanout;
-    $iter->{fanfh} = $dirfh;
   }
 
-  my $fname = readdir($iter->{fanfh});
+  my $fname = $self->queue_iter_next($iter->{sub});
   if (defined $fname) {
     return $iter->{fanstr}.SLASH.$fname;        # best-case scenario
   }
   
   dbg ("fanout: finished this dir, trying next one");
-  closedir($iter->{fanfh});
+  $iter->{sub} = $self->queue_iter_stop($iter->{sub});
   $iter->{fanstr} = undef;
-  $iter->{fanfh} = undef;
   goto next_fanout;
 }
 
@@ -1451,15 +1583,20 @@
 
 The filename format is as follows:
 
-    50.20040909232529941258.HASH[.PID.RAND]
+    50.20040909232529941258[.TAG].HASH[.PID.RAND]
 
 The first two digits (C<50>) are the priority of the job.  Lower priority
 numbers are run first.  C<20040909232529> is the current date and time when the
-enqueueing process was run, in C<YYYYMMDDHHMMSS> format.   C<941258> is the time in
-microseconds, as returned by C<gettimeofday()>.  And finally, C<HASH> is a
-variable-length hash of some semi-random data, used to increase the chance of
-uniqueness.
+enqueueing process was run, in C<YYYYMMDDHHMMSS> format.   C<941258> is the
+time in microseconds, as returned by C<gettimeofday()>.
 
+The C<TAG> is a short string.  By default it is not used, and is therefore not
+included in the filename; however it is possible to set this with the C<tag>
+parameter of the constructor, or have it generated dynamically by the
+subroutine set with C<tag_sub>.
+
+See the C<FILENAME TAGS> section for more details on this.
+
 If there is a collision, the timestamps are regenerated after a 250 msec sleep,
 and further randomness will be added at the end of the string (namely, the
 current process ID and a random integer value).   Up to 10 retries will be
@@ -1505,6 +1642,25 @@
 Atomic, NFS-safe renaming is used to avoid collisions, overwriting or
 other unsafe operations.
 
+=head1 FILENAME TAGS
+
+The C<TAG> is a short string.  By default it is not used, and is therefore not
+included in the filename; however it is possible to set this with the C<tag>
+parameter of the constructor, or have it generated dynamically by the
+subroutine set with C<tag_sub>.
+
+The C<tag_sub> subroutine is called with a single parameter C<$job>.  This is
+the job currently enqueued, as an C<IPC::DirQueue::Job> object, and thus allows
+access to the metadata. Note that only characters from the set C<[A-Za-z0-9+_]>
+are allowed in the tag string, and the length of the string is limited to
+C<tag_max_length> characters (default 128).  If any of these restrictions are
+violated, a warning will be issued.
+
+In combination with the C<$filter> parameter the retrieving methods offer, the
+C<TAG> can be used to filter the queued data more efficiently, as if the
+metadata was used.  (Since the tag appears in the filename, filtering
+by tag does not require that the files be opened and read.)
+
 =head1 SEE ALSO
 
 C<IPC::DirQueue::Job>
