From e330ac1934516807757afe2d2760fa21b27006e6 Mon Sep 17 00:00:00 2001
From: Akihiko Odaki <nekomanma@pixiv.co.jp>
Date: Thu, 5 Apr 2018 01:04:44 +0900
Subject: [PATCH] Let unhandled rejection handler handle rejections in jobs

---
 .../processors/db/delete-post-dependents.ts   |  4 +-
 src/queue/processors/db/index.ts              |  2 +-
 src/queue/processors/http/deliver-post.ts     | 28 ++++---
 src/queue/processors/http/follow.ts           | 79 +++++++++----------
 src/queue/processors/http/index.ts            |  2 +-
 .../processors/http/perform-activitypub.ts    |  5 +-
 src/queue/processors/http/process-inbox.ts    | 55 +++++++------
 .../processors/http/report-github-failure.ts  | 39 +++++----
 src/queue/processors/http/unfollow.ts         | 31 +++++---
 9 files changed, 134 insertions(+), 111 deletions(-)

diff --git a/src/queue/processors/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts
index 6de21eb05..fb6617e95 100644
--- a/src/queue/processors/db/delete-post-dependents.ts
+++ b/src/queue/processors/db/delete-post-dependents.ts
@@ -5,7 +5,7 @@ import PostReaction from '../../../models/post-reaction';
 import PostWatching from '../../../models/post-watching';
 import Post from '../../../models/post';
 
-export default async ({ data }) => Promise.all([
+export default ({ data }, done) => Promise.all([
 	Favorite.remove({ postId: data._id }),
 	Notification.remove({ postId: data._id }),
 	PollVote.remove({ postId: data._id }),
@@ -19,4 +19,4 @@ export default async ({ data }) => Promise.all([
 		}),
 		Post.remove({ repostId: data._id })
 	]))
-]);
+]).then(() => done(), done);
diff --git a/src/queue/processors/db/index.ts b/src/queue/processors/db/index.ts
index 75838c099..468ec442a 100644
--- a/src/queue/processors/db/index.ts
+++ b/src/queue/processors/db/index.ts
@@ -4,4 +4,4 @@ const handlers = {
   deletePostDependents
 };
 
-export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
+export default (job, done) => handlers[job.data.type](job, done);
diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts
index e743fc5f6..8107c8bf7 100644
--- a/src/queue/processors/http/deliver-post.ts
+++ b/src/queue/processors/http/deliver-post.ts
@@ -5,17 +5,23 @@ import renderCreate from '../../../remote/activitypub/renderer/create';
 import renderNote from '../../../remote/activitypub/renderer/note';
 import request from '../../../remote/request';
 
-export default async ({ data }) => {
-	const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
-	const [from, post] = await Promise.all([
-		User.findOne({ _id: data.fromId }),
-		Post.findOne({ _id: data.postId })
-	]);
-	const note = await renderNote(from, post);
-	const to = await promisedTo;
-	const create = renderCreate(note);
+export default async ({ data }, done) => {
+	try {
+		const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>;
+		const [from, post] = await Promise.all([
+			User.findOne({ _id: data.fromId }),
+			Post.findOne({ _id: data.postId })
+		]);
+		const note = await renderNote(from, post);
+		const to = await promisedTo;
+		const create = renderCreate(note);
 
-	create['@context'] = context;
+		create['@context'] = context;
 
-	return request(from, to.account.inbox, create);
+		await request(from, to.account.inbox, create);
+	} catch (error) {
+		done(error);
+	}
+
+	done();
 };
diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts
index 4cb72828e..ba1cc3118 100644
--- a/src/queue/processors/http/follow.ts
+++ b/src/queue/processors/http/follow.ts
@@ -7,10 +7,8 @@ import notify from '../../../publishers/notify';
 import context from '../../../remote/activitypub/renderer/context';
 import render from '../../../remote/activitypub/renderer/follow';
 import request from '../../../remote/request';
-import Logger from '../../../utils/logger';
 
-export default async ({ data }) => {
-	const { followerId, followeeId } = await Following.findOne({ _id: data.following });
+export default ({ data }, done) => Following.findOne({ _id: data.following }).then(async ({ followerId, followeeId }) => {
 	const [follower, followee] = await Promise.all([
 		User.findOne({ _id: followerId }),
 		User.findOne({ _id: followeeId })
@@ -23,47 +21,46 @@ export default async ({ data }) => {
 		await request(follower, followee.account.inbox, rendered);
 	}
 
-	try {
-		await Promise.all([
-			// Increment following count
-			User.update(followerId, {
-				$inc: {
-					followingCount: 1
-				}
-			}),
+	return [follower, followee];
+}).then(([follower, followee]) => Promise.all([
+	// Increment following count
+	User.update(follower._id, {
+		$inc: {
+			followingCount: 1
+		}
+	}),
 
-			FollowingLog.insert({
-				createdAt: data.following.createdAt,
-				userId: followerId,
-				count: follower.followingCount + 1
-			}),
+	FollowingLog.insert({
+		createdAt: data.following.createdAt,
+		userId: follower._id,
+		count: follower.followingCount + 1
+	}),
 
-			// Increment followers count
-			User.update({ _id: followeeId }, {
-				$inc: {
-					followersCount: 1
-				}
-			}),
+	// Increment followers count
+	User.update({ _id: followee._id }, {
+		$inc: {
+			followersCount: 1
+		}
+	}),
 
-			FollowedLog.insert({
-				createdAt: data.following.createdAt,
-				userId: followerId,
-				count: followee.followersCount + 1
-			}),
+	FollowedLog.insert({
+		createdAt: data.following.createdAt,
+		userId: follower._id,
+		count: followee.followersCount + 1
+	}),
 
-			// Publish follow event
-			isLocalUser(follower) && packUser(followee, follower)
-				.then(packed => event(follower._id, 'follow', packed)),
+	// Publish follow event
+	isLocalUser(follower) && packUser(followee, follower)
+		.then(packed => event(follower._id, 'follow', packed)),
 
-			isLocalUser(followee) && Promise.all([
-				packUser(follower, followee)
-					.then(packed => event(followee._id, 'followed', packed)),
+	isLocalUser(followee) && Promise.all([
+		packUser(follower, followee)
+			.then(packed => event(followee._id, 'followed', packed)),
 
-				// Notify
-				isLocalUser(followee) && notify(followeeId, followerId, 'follow')
-			])
-		]);
-	} catch (error) {
-		Logger.error(error.toString());
-	}
-};
+		// Notify
+		isLocalUser(followee) && notify(followee._id, follower._id, 'follow')
+	])
+]).then(() => done(), error => {
+	done();
+	throw error;
+}), done);
diff --git a/src/queue/processors/http/index.ts b/src/queue/processors/http/index.ts
index 8f9aa717c..0ea79305c 100644
--- a/src/queue/processors/http/index.ts
+++ b/src/queue/processors/http/index.ts
@@ -14,4 +14,4 @@ const handlers = {
   unfollow
 };
 
-export default (job, done) => handlers[job.data.type](job).then(() => done(), done);
+export default (job, done) => handlers[job.data.type](job, done);
diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts
index 7b84400d5..ae70c0f0b 100644
--- a/src/queue/processors/http/perform-activitypub.ts
+++ b/src/queue/processors/http/perform-activitypub.ts
@@ -2,6 +2,7 @@ import User from '../../../models/user';
 import act from '../../../remote/activitypub/act';
 import Resolver from '../../../remote/activitypub/resolver';
 
-export default ({ data }) => User.findOne({ _id: data.actor })
+export default ({ data }, done) => User.findOne({ _id: data.actor })
 	.then(actor => act(new Resolver(), actor, data.outbox))
-	.then(Promise.all);
+	.then(Promise.all)
+	.then(() => done(), done);
diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts
index de1dbd2f9..88fbb9737 100644
--- a/src/queue/processors/http/process-inbox.ts
+++ b/src/queue/processors/http/process-inbox.ts
@@ -5,35 +5,40 @@ import act from '../../../remote/activitypub/act';
 import resolvePerson from '../../../remote/activitypub/resolve-person';
 import Resolver from '../../../remote/activitypub/resolver';
 
-export default async ({ data }): Promise<void> => {
-	const keyIdLower = data.signature.keyId.toLowerCase();
-	let user;
+export default async ({ data }, done) => {
+	try {
+		const keyIdLower = data.signature.keyId.toLowerCase();
+		let user;
 
-	if (keyIdLower.startsWith('acct:')) {
-		const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
-		if (host === null) {
-			throw 'request was made by local user';
+		if (keyIdLower.startsWith('acct:')) {
+			const { username, host } = parseAcct(keyIdLower.slice('acct:'.length));
+			if (host === null) {
+				done();
+				return;
+			}
+
+			user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
+		} else {
+			user = await User.findOne({
+				host: { $ne: null },
+				'account.publicKey.id': data.signature.keyId
+			}) as IRemoteUser;
+
+			if (user === null) {
+				user = await resolvePerson(data.signature.keyId);
+			}
 		}
 
-		user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser;
-	} else {
-		user = await User.findOne({
-			host: { $ne: null },
-			'account.publicKey.id': data.signature.keyId
-		}) as IRemoteUser;
-
-		if (user === null) {
-			user = await resolvePerson(data.signature.keyId);
+		if (user === null || !verifySignature(data.signature, user.account.publicKey.publicKeyPem)) {
+			done();
+			return;
 		}
+
+		await Promise.all(await act(new Resolver(), user, data.inbox, true));
+	} catch (error) {
+		done(error);
+		return;
 	}
 
-	if (user === null) {
-		throw 'failed to resolve user';
-	}
-
-	if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) {
-		throw 'signature verification failed';
-	}
-
-	await Promise.all(await act(new Resolver(), user, data.inbox, true));
+	done();
 };
diff --git a/src/queue/processors/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts
index 21683ba3c..af9659bda 100644
--- a/src/queue/processors/http/report-github-failure.ts
+++ b/src/queue/processors/http/report-github-failure.ts
@@ -2,23 +2,30 @@ import * as request from 'request-promise-native';
 import User from '../../../models/user';
 const createPost = require('../../../server/api/endpoints/posts/create');
 
-export default async ({ data }) => {
-	const asyncBot = User.findOne({ _id: data.userId });
+export default async ({ data }, done) => {
+	try {
+		const asyncBot = User.findOne({ _id: data.userId });
 
-	// Fetch parent status
-	const parentStatuses = await request({
-		url: `${data.parentUrl}/statuses`,
-		headers: {
-			'User-Agent': 'misskey'
-		},
-		json: true
-	});
+		// Fetch parent status
+		const parentStatuses = await request({
+			url: `${data.parentUrl}/statuses`,
+			headers: {
+				'User-Agent': 'misskey'
+			},
+			json: true
+		});
 
-	const parentState = parentStatuses[0].state;
-	const stillFailed = parentState == 'failure' || parentState == 'error';
-	const text = stillFailed ?
-		`**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
-		`**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
+		const parentState = parentStatuses[0].state;
+		const stillFailed = parentState == 'failure' || parentState == 'error';
+		const text = stillFailed ?
+			`**⚠️BUILD STILL FAILED⚠️**: ?[${data.message}](${data.htmlUrl})` :
+			`**🚨BUILD FAILED🚨**: →→→?[${data.message}](${data.htmlUrl})←←←`;
 
-	createPost({ text }, await asyncBot);
+		createPost({ text }, await asyncBot);
+	} catch (error) {
+		done(error);
+		return;
+	}
+
+	done();
 };
diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts
index 801a3612a..dc50e946c 100644
--- a/src/queue/processors/http/unfollow.ts
+++ b/src/queue/processors/http/unfollow.ts
@@ -7,24 +7,31 @@ import renderFollow from '../../../remote/activitypub/renderer/follow';
 import renderUndo from '../../../remote/activitypub/renderer/undo';
 import context from '../../../remote/activitypub/renderer/context';
 import request from '../../../remote/request';
-import Logger from '../../../utils/logger';
 
-export default async ({ data }) => {
+export default async ({ data }, done) => {
 	const following = await Following.findOne({ _id: data.id });
 	if (following === null) {
+		done();
 		return;
 	}
 
-	const [follower, followee] = await Promise.all([
-		User.findOne({ _id: following.followerId }),
-		User.findOne({ _id: following.followeeId })
-	]);
+	let follower, followee;
 
-	if (isLocalUser(follower) && isRemoteUser(followee)) {
-		const undo = renderUndo(renderFollow(follower, followee));
-		undo['@context'] = context;
+	try {
+		[follower, followee] = await Promise.all([
+			User.findOne({ _id: following.followerId }),
+			User.findOne({ _id: following.followeeId })
+		]);
 
-		await request(follower, followee.account.inbox, undo);
+		if (isLocalUser(follower) && isRemoteUser(followee)) {
+			const undo = renderUndo(renderFollow(follower, followee));
+			undo['@context'] = context;
+
+			await request(follower, followee.account.inbox, undo);
+		}
+	} catch (error) {
+		done(error);
+		return;
 	}
 
 	try {
@@ -57,7 +64,7 @@ export default async ({ data }) => {
 
 		// Publish follow event
 		stream(follower._id, 'unfollow', promisedPackedUser);
-	} catch (error) {
-		Logger.error(error.toString());
+	} finally {
+		done();
 	}
 };