# BullMQ
# Feature
The BullMQ module for Ts.ED allows you to decorate a class with the @JobController decorator and implement the JobMethods interface provided by the module.
Repeatable jobs can also be defined using this decorator.
For more information about BullMQ, see the BullMQ documentation.
# Installation
To begin, install the BullMQ module for Ts.ED:
npm install @tsed/bullmq bullmq
# Configure the Server
Import the @tsed/bullmq module in your server:
import {Configuration} from "@tsed/common";
import "@tsed/bullmq"; // import bullmq ts.ed module

@Configuration({
  bullmq: {
    // Define queue names.
    // Note: since v7.60.0 this option is only required for queues that are not defined in a JobController decorator
    queues: ["default", "special"],
    connection: {
      // ioredis connection options
    },
    defaultQueueOptions: {
      // Default queue options which are applied to every queue
      // Can be extended/overridden by `queueOptions`
    },
    queueOptions: {
      special: {
        // Specify additional queue options by queue name
      }
    },
    // Disable the creation of any worker.
    // All other worker configuration will be ignored
    disableWorker: true,
    // Specify the queues for which a worker is started.
    // When undefined, falls back to all queues specified.
    workerQueues: ["default"],
    defaultWorkerOptions: {
      // Default worker options which are applied to every worker
      // Can be extended/overridden by `workerOptions`
    },
    workerOptions: {
      special: {
        // Specify additional worker options by queue name
      }
    }
  }
})
export class Server {}
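The comments in the configuration say that `defaultQueueOptions` apply to every queue and can be extended or overridden by `queueOptions`. The following is a minimal sketch of that layering in plain TypeScript. It assumes a shallow merge, since the module's actual merge strategy is not stated here. The `resolveQueueOptions` helper, the `LayeredConfig` type, and the `optionA`/`optionB` keys are hypothetical placeholders, not part of `@tsed/bullmq`.

```typescript
// Simplified stand-in for queue options; not the real BullMQ type.
type Options = Record<string, unknown>;

interface LayeredConfig {
  defaultQueueOptions?: Options;
  queueOptions?: Record<string, Options>;
}

// Hypothetical helper: per-queue options win over the defaults (shallow merge assumed).
function resolveQueueOptions(config: LayeredConfig, queueName: string): Options {
  return {
    ...(config.defaultQueueOptions ?? {}),
    ...(config.queueOptions?.[queueName] ?? {})
  };
}

// Placeholder option keys, used only to show which layer wins.
const layeredConfig: LayeredConfig = {
  defaultQueueOptions: {optionA: 1, optionB: "shared"},
  queueOptions: {special: {optionA: 2}}
};

const defaultResolved = resolveQueueOptions(layeredConfig, "default");
const specialResolved = resolveQueueOptions(layeredConfig, "special");
```

Under this assumption, the "default" queue only sees the defaults, while "special" keeps `optionB` from the defaults and overrides `optionA`. The same layering would apply to `defaultWorkerOptions` and `workerOptions`.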
# Define a Job
A job is defined as a class decorated with the @JobController decorator and implementing the JobMethods interface of the @tsed/bullmq package:
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("example")
class ExampleJob implements JobMethods {
  public handle(payload: {msg: string}) {
    console.info("New message incoming", payload.msg);
  }
}
You can also specify a non-default queue as the second argument of the decorator and add any other job-specific options as the third argument:
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("other-example", "other-queue", {
  attempts: 42
})
class OtherExampleJob implements JobMethods {
  public handle(payload: {num: number}) {
    console.info("look at my awesome number: ", payload.num);
  }
}
# Defining a custom job id within a job
The JobMethods interface has an optional jobId method. When it is implemented, the dispatcher uses it to define the ID of the job.
The method receives the job payload and, since it is defined within the job class, also has access to all injected services.
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("example-with-custom-id")
class ExampleJobWithCustomId implements JobMethods {
  public handle(payload: {num: number}) {
    console.info("look at my awesome number: ", payload.num);
  }

  public jobId(payload: {num: number}): string {
    return `very realistic job id #${payload.num}`;
  }
}
Keep in mind, though, that a job ID passed in the options when dispatching the job takes precedence over the one returned by the jobId method:
this.dispatcher.dispatch(ExampleJobWithCustomId, {num: 1}); // id: 'very realistic job id #1'
this.dispatcher.dispatch(ExampleJobWithCustomId, {num: 2}, {jobId: "I do my own thing!"}); // id: 'I do my own thing!'
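The precedence described above can be modelled in a few lines of plain TypeScript. This is only an illustrative sketch: `resolveJobId`, `JobLike`, and `DispatchOptions` are hypothetical names introduced here, and the module's dispatcher may implement the rule differently.

```typescript
interface DispatchOptions {
  jobId?: string;
}

// Minimal stand-in for a job instance that may implement the optional jobId method.
interface JobLike<P> {
  jobId?(payload: P): string;
}

// Hypothetical helper mirroring the documented precedence.
function resolveJobId<P>(job: JobLike<P>, payload: P, options: DispatchOptions = {}): string | undefined {
  if (options.jobId !== undefined) {
    return options.jobId; // an ID passed at dispatch time wins
  }
  return job.jobId?.(payload); // otherwise use the job's own jobId method, if any
}

const jobWithCustomId: JobLike<{num: number}> = {
  jobId: (payload) => `very realistic job id #${payload.num}`
};

const idFromJob = resolveJobId(jobWithCustomId, {num: 1});
const idFromDispatch = resolveJobId(jobWithCustomId, {num: 2}, {jobId: "I do my own thing!"});
const idMissing = resolveJobId({}, {num: 3});
```

In this model a job without a `jobId` method and without an explicit option yields `undefined`, leaving the ID to be generated elsewhere.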
# Defining a repeating job
Jobs that should run regularly on a schedule can also be defined using the @JobController decorator.
Such a job is dispatched automatically, without any data.
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("my-cron-job", "default", {
  repeat: {
    pattern: "* * * * *"
  }
})
class MyCronJob implements JobMethods {
  public handle() {
    console.info("I run every minute!");
  }
}
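The `pattern` value above is a standard five-field cron expression (minute, hour, day of month, month, day of week). As a small sketch, a helper can build such patterns for "every N minutes" schedules. The `everyNMinutes` function is hypothetical and not part of `@tsed/bullmq` or BullMQ; its result is simply a string that could be passed as `repeat.pattern`.

```typescript
// Hypothetical helper: build a five-field cron pattern that fires every `n` minutes.
function everyNMinutes(n: number): string {
  if (!Number.isInteger(n) || n < 1 || n > 59) {
    throw new RangeError("n must be an integer between 1 and 59");
  }
  // Fields: minute hour day-of-month month day-of-week
  return n === 1 ? "* * * * *" : `*/${n} * * * *`;
}

// Objects shaped like the `repeat` option used in the decorator above.
const repeatEveryMinute = {pattern: everyNMinutes(1)};
const repeatEveryFiveMinutes = {pattern: everyNMinutes(5)};
```

Note that a step such as `*/7` restarts at the top of each hour, so intervals are only perfectly even when `n` divides 60.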
To register the job, you now need to import it into the server so that it can be detected.
import {Configuration} from "@tsed/common";
import "@tsed/bullmq"; // import bullmq ts.ed module
import "./jobs/MyCronJob";
@Configuration()
// server configuration
export class Server {}
# Defining a fallback job
Because @JobController requires a name, you cannot use it when job names are dynamic.
For this use case there is the @FallbackJobController decorator, which allows you to define a fallback handler globally or per queue:
import {FallbackJobController, JobMethods} from "@tsed/bullmq";

@FallbackJobController("foo")
class FooFallbackController implements JobMethods {
  public handle() {
    console.info(`I run for every job within the "foo" queue, which doesn't have its own JobController`);
  }
}

@FallbackJobController()
class GlobalFallbackController implements JobMethods {
  public handle() {
    console.info(`I run for every job in every other queue, which doesn't have its own JobController`);
  }
}
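The lookup order implied by the two fallback controllers can be sketched as follows: a job's own controller first, then the fallback of its queue, then the global fallback. This is a simplified model in plain TypeScript. The maps and the `resolveHandler` function are hypothetical and do not reflect how the module resolves providers internally.

```typescript
type Handler = (payload: unknown) => string;

// Hypothetical registries standing in for the decorated classes.
const jobHandlers = new Map<string, Handler>(); // key: "queue/name"
const queueFallbacks = new Map<string, Handler>(); // key: queue name
let globalFallback: Handler | undefined;

// Specific controller first, then the queue fallback, then the global fallback.
function resolveHandler(queue: string, name: string): Handler | undefined {
  return jobHandlers.get(`${queue}/${name}`) ?? queueFallbacks.get(queue) ?? globalFallback;
}

jobHandlers.set("default/example", () => "ExampleJob");
queueFallbacks.set("foo", () => "FooFallbackController");
globalFallback = () => "GlobalFallbackController";

const handledBySpecific = resolveHandler("default", "example")?.(undefined);
const handledByFooFallback = resolveHandler("foo", "dynamic-name")?.(undefined);
const handledByGlobalFallback = resolveHandler("bar", "dynamic-name")?.(undefined);
```

In this model, a dynamically named job in the "foo" queue reaches the queue fallback, while the same name in any other queue without its own controller ends up at the global fallback.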
You also have to register the fallback job in the server:
import {Configuration} from "@tsed/common";
import "@tsed/bullmq"; // import bullmq ts.ed module
import "./jobs/MyFallbackJobs";
@Configuration()
// server configuration
export class Server {}
# Dispatching jobs
Jobs are dispatched via the JobDispatcher service, which takes the job to be dispatched and its payload:
import {Service} from "@tsed/di";
import {JobDispatcher} from "@tsed/bullmq";
import {ExampleJob} from "./jobs/ExampleJob";

@Service()
class MyService {
  constructor(private readonly dispatcher: JobDispatcher) {}

  public async doingSomething() {
    await this.dispatcher.dispatch(ExampleJob, {msg: "this message is part of the payload for the job"});
    console.info("I just dispatched a job!");
  }
}
In addition to statically defined job options when declaring the job, custom job options can also be set when dispatching the job. This allows, for example, the job to be delayed from its original dispatch time.
import {Service} from "@tsed/di";
import {JobDispatcher} from "@tsed/bullmq";
import {ExampleJob} from "./jobs/ExampleJob";

@Service()
class MyService {
  constructor(private readonly dispatcher: JobDispatcher) {}

  public async doingSomething() {
    await this.dispatcher.dispatch(
      ExampleJob,
      {msg: "this message is part of the payload for the job"},
      {
        delay: 600_000 // 10 minutes in milliseconds
      }
    );
    console.info("I just dispatched a job!");
  }
}
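Since `delay` is expressed in milliseconds relative to the dispatch time, running a job at a specific moment means converting a target date into a delay. The sketch below shows one way to do that. The `delayUntil` helper is hypothetical and not provided by the module; past dates are clamped to 0 so the job would run immediately.

```typescript
// Hypothetical helper: milliseconds from `now` until `runAt`, clamped at 0 for past dates.
function delayUntil(runAt: Date, now: Date = new Date()): number {
  return Math.max(0, runAt.getTime() - now.getTime());
}

// Fixed timestamps keep the example deterministic.
const dispatchTime = new Date("2024-01-01T12:00:00.000Z");
const tenMinutesLater = new Date("2024-01-01T12:10:00.000Z");
const oneHourEarlier = new Date("2024-01-01T11:00:00.000Z");

// Objects shaped like the third argument of dispatch() in the example above.
const delayedOptions = {delay: delayUntil(tenMinutesLater, dispatchTime)}; // 600_000 ms
const immediateOptions = {delay: delayUntil(oneHourEarlier, dispatchTime)}; // 0 ms
```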
In case you want to be more flexible, you can also dispatch a job via a name or a queue/name combination.
Note: This normally only makes sense when you have a Fallback Job configured.
import {Service} from "@tsed/di";
import {JobDispatcher} from "@tsed/bullmq";

@Service()
class MyService {
  constructor(private readonly dispatcher: JobDispatcher) {}

  public async doingSomething() {
    // When passing only a name, the job will be dispatched in a queue named "default"
    await this.dispatcher.dispatch("some-name", {msg: "this message is part of the payload for the job"});

    // You can also specify which queue to use
    await this.dispatcher.dispatch({queue: "some-queue", name: "some-name"}, {msg: "this message is part of the payload for the job"});
  }
}
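The two name-based forms accepted by the dispatcher in the example above can be thought of as one normalized shape. A minimal sketch, assuming only what the comments in the example state (a bare name goes to the queue named "default"); the `DispatchTarget` type and `normalizeTarget` function are hypothetical and not exported by the module:

```typescript
type DispatchTarget = string | {queue: string; name: string};

// Hypothetical helper: a bare name is routed to the "default" queue.
function normalizeTarget(target: DispatchTarget): {queue: string; name: string} {
  return typeof target === "string" ? {queue: "default", name: target} : target;
}

const byName = normalizeTarget("some-name");
const byQueueAndName = normalizeTarget({queue: "some-queue", name: "some-name"});
```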
# Inject a Queue (v7.60.0+)
While JobDispatcher is the recommended way to dispatch jobs using a class token, it can be useful in some cases to manipulate the queue directly.
You can inject a queue using the @InjectQueue decorator.
import {InjectQueue, JobController, JobMethods} from "@tsed/bullmq";
import {Queue} from "bullmq";

@JobController("example")
class ExampleJob implements JobMethods {
  @InjectQueue("default")
  private readonly queue?: Queue;

  $onInit() {
    if (this.queue) {
      // do something with the queue
      this.queue.add("some-job", {msg: "this message is part of the payload for the job"});
    }
  }

  public handle(payload: {msg: string}) {
    console.info("New message incoming", payload.msg);
  }
}
# Inject a Worker (v7.60.0+)
You can also inject a worker using the @InjectWorker decorator.
import {InjectWorker, JobController, JobMethods} from "@tsed/bullmq";
import {Worker} from "bullmq";

@JobController("example")
class ExampleJob implements JobMethods {
  @InjectWorker("default")
  private readonly worker?: Worker;

  $onInit() {
    if (this.worker) {
      // do something with the worker
      this.worker.on("completed", (job) => {
        console.log("Job completed", job);
      });
    }
  }

  public handle(payload: {msg: string}) {
    console.info("New message incoming", payload.msg);
  }
}