Laravel 9 Queues manuell abarbeiten, aber wie?

 | 863 Wörter


Ahoi,

im Zuge eines Projektes auf der Arbeit war es nötig, manuell mit Laravels Queue zu interagieren, also Jobs abzurufen und auszuwerten, ohne das Laravels eigene Queue-Verwaltung dadurch beeinflusst wird.

Da der gesamte Prozess nicht offiziell dokumentiert ist, möchte ich hier die Möglichkeit nutzen und es festhalten.

Wie werden die Queues zur Laufzeit zur Verfügung gestellt

Zu aller erst ist die Implementierung als regulärer Service Provider verfügbar. Zum Verständnis ist es zwingend notwendig, die Funktionsweise folgende Komponenten verstehen: Service Provider, Service Container, Facades.

Das Laden des Service Providers übernimmt die Klasse Illuminate\Queue\QueueServiceProvider

Abhängig von der Konfiguration der Queues, entscheided dann die über den QueueServiceProvider bereitgestellte Klasse Illuminate\Queue\QueueManager welche exakte Implementierung verwendet werden soll (bspw. Queues in einer Db-Tabelle oder in Redis). Welche Backends unterstützt werden und wie diese zu Konfigurieren sind, würde den Umfang des Beitrages sprengen, weswegen ich lediglich auf die Dokumentation verweisen möchte: Laravel Queues.

Erzeugen eines Queue Jobs

Unabhängig von der globalen Konfiguration kann eine _Queueabl_e-Klasse auch eigenständig definieren, in welchem Queue und Queue-Backend diese “geschoben” werden soll. Zuständig dafür ist das Trait Illuminate\Bus\Queueable. Die bereitsgestellten Attribute connection und queue können respektieve angepasst werden, andernfalls wird die globale Konfiguration übernommen.

Wollen wir also jetzt ein Objekt in eine Queue für eine asynchrone Bearbeitung schieben, müssen wir lediglich dieselben Traits und Klassen wie ein Laravel Job erben und können dann über ein wenig Reverse-Engineering des Packets laravel/framework manuell Jobs aus der Queue abrufen und bearbeiten.

Für diesen Beitrag erzeugen wir uns einen neuen Job:

1
php artisan make:job TestJob

Und erhalten folgenden Boilerplate

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        //
    }
}

Interessanterweise wird die handle() Methode nicht durch ein Interface vorgeschrieben, aber ist zwingend notwending, wenn man auf Laravels eigene Queue-Verwaltung verwenden möchte. Da wir das aber explizit in diesem Fall nicht möchten, können wir diese entfernen oder bspw. alle Aufrufe loggen, um zu erfahren, wann ungewollterweise Laravel die Bearbeitung übernommen hat.

Abrufen von Inhalten in einer Queue

Der über die Klasse QueueServiceProvider bereitgestellte Service Provider bietet über die Methode

1
2
3
4
5
6
7
/**
* Pop the next job off of the queue.
*
* @param  string|null  $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null);

uns den nächsten Eintrag in der angegebenen Queue.

Nun müssen wir nur noch die Einträge aus der Queue abfragen:

1
2
3
4
$job = Queue::pop();
if(is_null($job)) {
    Log::info('No jobs available');
}

Falls der Queue nicht-leer ist, bekommen wir ein Objekt vom Typ_ Illuminate\Contracts\Queue\Job_ wessen Implementierung anhand des Illuminate\Queue\Jobs\DatabaseJobs nachfolgzogen werden kann.

Konkret interessiert uns nur die payload() Methode, welche schlicht und einfach die serialisierte Queueable-Klasse und ein paar Meta-Informationen enthält.

Wir können also ziemlich einfach auf den Inhalt zugreifen:

1
$payload = $job->payload();

Folgendes ist alles in dem Payload enthalten:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
array:10 [
  "uuid" => "d57f1d03-0a57-417f-966d-659bc043c03b"
  "displayName" => "App\Jobs\TestJob"
  "job" => "Illuminate\Queue\CallQueuedHandler@call"
  "maxTries" => null
  "maxExceptions" => null
  "failOnTimeout" => false
  "backoff" => null
  "timeout" => null
  "retryUntil" => null
  "data" => array:2 [
    "commandName" => "App\Jobs\TestJob"
    "command" => "O:16:"App\Jobs\TestJob":0:{}"
  ]
]

Da uns die Meta-Informationen am Anfang nicht interessieren, müssen wir nur sicherstellen, dass das Array data und der Key command vorhanden sind:

1
2
3
if(!is_array($payload) || !array_key_exists('data', $payload) || !array_key_exists('command', $payload['data'])) {
    Log::info('Unexpected payload format');
}

Dann deserialisieren wir das Objekt und können es verarbeiten:

1
$job = unserialize($payload['data']['command']);

Der Inhalt ist selbstverständlich identisch zu dem Objekt, welches auf den Queue geschoben wurde:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
App\Jobs\TestJob {#284 ▼
  -data: array:2 [
    0 => "some"
    1 => "data"
  ]
  +job: null
  +connection: null
  +queue: null
  +chainConnection: null
  +chainQueue: null
  +chainCatchCallbacks: null
  +delay: null
  +afterCommit: null
  +middleware: []
  +chained: []
}

Der gesamte Code zum Abrufen des Queues sieht dann so aus:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// add job to default queue
TestJob::dispatch([0 => 'some', 1 => 'data']);

// pop the next job from the default queue
$job = Queue::pop();
if (is_null($job)) {
    Log::info('No jobs available');
}

$payload = $job->payload();
// validate existence of necessary payload data
if (!is_array($payload) || !array_key_exists('data', $payload) || !array_key_exists('command', $payload['data'])) {
    Log::info('Unexpected payload format');
}

// delete job from database as we're going to work on it.
$job->delete();

// retrieve object
$job = unserialize($payload['data']['command']);

// print object and die
dd($job);

// Ausgabe:
App\Jobs\TestJob {#284 ▼
  -data: array:2 [
    0 => "some"
    1 => "data"
  ]
  +job: null
  +connection: null
  +queue: null
  +chainConnection: null
  +chainQueue: null
  +chainCatchCallbacks: null
  +delay: null
  +afterCommit: null
  +middleware: []
  +chained: []
}
#laravel #php