Update various configuration files, components, and assets; enhance notification system and API endpoints; improve documentation and styles across the application.
This commit is contained in:
95
server/api/notifications/queue/clear.post.js
Normal file
95
server/api/notifications/queue/clear.post.js
Normal file
@@ -0,0 +1,95 @@
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Get current user (assuming auth middleware provides this)
|
||||
const user = event.context.user;
|
||||
if (!user || !user.userID) {
|
||||
throw createError({
|
||||
statusCode: 401,
|
||||
statusMessage: "Authentication required",
|
||||
});
|
||||
}
|
||||
|
||||
const body = await readBody(event);
|
||||
const { action = 'failed_only' } = body; // 'failed_only', 'completed_only', 'all_old'
|
||||
|
||||
let whereCondition = {};
|
||||
|
||||
switch (action) {
|
||||
case 'failed_only':
|
||||
whereCondition = {
|
||||
status: 'failed'
|
||||
};
|
||||
break;
|
||||
case 'completed_only':
|
||||
whereCondition = {
|
||||
status: 'completed'
|
||||
};
|
||||
break;
|
||||
case 'all_old':
|
||||
// Clear items older than 1 hour except queued ones
|
||||
whereCondition = {
|
||||
created_at: {
|
||||
lt: new Date(Date.now() - 60 * 60 * 1000) // 1 hour ago
|
||||
},
|
||||
status: {
|
||||
not: 'queued'
|
||||
}
|
||||
};
|
||||
break;
|
||||
case 'all_except_latest':
|
||||
// Get the latest notification ID first
|
||||
const latestNotification = await prisma.notifications.findFirst({
|
||||
orderBy: { created_at: 'desc' },
|
||||
select: { id: true }
|
||||
});
|
||||
|
||||
if (latestNotification) {
|
||||
whereCondition = {
|
||||
notification_id: {
|
||||
not: latestNotification.id
|
||||
},
|
||||
status: {
|
||||
in: ['failed', 'completed']
|
||||
}
|
||||
};
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Invalid action. Use: failed_only, completed_only, all_old, all_except_latest"
|
||||
});
|
||||
}
|
||||
|
||||
// Delete queue items based on condition
|
||||
const deleteResult = await prisma.notification_queue.deleteMany({
|
||||
where: whereCondition
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
message: `Cleared ${deleteResult.count} queue items`,
|
||||
action,
|
||||
deletedCount: deleteResult.count
|
||||
}
|
||||
};
|
||||
} catch (error) {
|
||||
console.error('Error clearing queue:', error);
|
||||
|
||||
if (error.statusCode) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: 'Failed to clear queue',
|
||||
data: {
|
||||
error: error.message
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
187
server/api/notifications/queue/history.get.js
Normal file
187
server/api/notifications/queue/history.get.js
Normal file
@@ -0,0 +1,187 @@
|
||||
import { z } from "zod";
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
// Query parameter validation schema
|
||||
const querySchema = z.object({
|
||||
period: z.enum(["hour", "day", "week", "month"]).default("day"),
|
||||
metric: z.enum(["throughput", "error_rate", "response_time"]).default("throughput"),
|
||||
});
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Parse and validate query parameters
|
||||
const query = getQuery(event);
|
||||
const { period, metric } = querySchema.parse(query);
|
||||
|
||||
// Set time range based on period
|
||||
const now = new Date();
|
||||
let startDate;
|
||||
let intervalMinutes;
|
||||
let numPoints;
|
||||
|
||||
switch (period) {
|
||||
case "hour":
|
||||
startDate = new Date(now.getTime() - 60 * 60 * 1000); // 1 hour ago
|
||||
intervalMinutes = 1; // 1-minute intervals
|
||||
numPoints = 60;
|
||||
break;
|
||||
case "day":
|
||||
startDate = new Date(now.getTime() - 24 * 60 * 60 * 1000); // 24 hours ago
|
||||
intervalMinutes = 60; // 1-hour intervals
|
||||
numPoints = 24;
|
||||
break;
|
||||
case "week":
|
||||
startDate = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000); // 7 days ago
|
||||
intervalMinutes = 24 * 60; // 1-day intervals
|
||||
numPoints = 7;
|
||||
break;
|
||||
case "month":
|
||||
startDate = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000); // 30 days ago
|
||||
intervalMinutes = 24 * 60; // 1-day intervals
|
||||
numPoints = 30;
|
||||
break;
|
||||
default:
|
||||
startDate = new Date(now.getTime() - 24 * 60 * 60 * 1000);
|
||||
intervalMinutes = 60;
|
||||
numPoints = 24;
|
||||
}
|
||||
|
||||
const dataPoints = [];
|
||||
|
||||
// Generate time buckets
|
||||
for (let i = 0; i < numPoints; i++) {
|
||||
const bucketStart = new Date(startDate.getTime() + i * intervalMinutes * 60 * 1000);
|
||||
const bucketEnd = new Date(bucketStart.getTime() + intervalMinutes * 60 * 1000);
|
||||
|
||||
let value = 0;
|
||||
|
||||
switch (metric) {
|
||||
case "throughput": {
|
||||
// Count completed jobs in this time bucket
|
||||
const count = await prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "completed",
|
||||
updated_at: {
|
||||
gte: bucketStart,
|
||||
lt: bucketEnd,
|
||||
},
|
||||
},
|
||||
});
|
||||
value = count;
|
||||
break;
|
||||
}
|
||||
|
||||
case "error_rate": {
|
||||
// Calculate error rate in this time bucket
|
||||
const [totalJobs, failedJobs] = await Promise.all([
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
OR: [{ status: "completed" }, { status: "failed" }],
|
||||
updated_at: {
|
||||
gte: bucketStart,
|
||||
lt: bucketEnd,
|
||||
},
|
||||
},
|
||||
}),
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "failed",
|
||||
updated_at: {
|
||||
gte: bucketStart,
|
||||
lt: bucketEnd,
|
||||
},
|
||||
},
|
||||
}),
|
||||
]);
|
||||
value = totalJobs > 0 ? Math.round((failedJobs / totalJobs) * 100) : 0;
|
||||
break;
|
||||
}
|
||||
|
||||
case "response_time": {
|
||||
// Calculate average response time in this time bucket
|
||||
const jobs = await prisma.notification_queue.findMany({
|
||||
where: {
|
||||
status: "completed",
|
||||
updated_at: {
|
||||
gte: bucketStart,
|
||||
lt: bucketEnd,
|
||||
},
|
||||
last_attempt_at: { not: null },
|
||||
},
|
||||
select: {
|
||||
created_at: true,
|
||||
last_attempt_at: true,
|
||||
},
|
||||
take: 50, // Sample size
|
||||
});
|
||||
|
||||
if (jobs.length > 0) {
|
||||
const responseTimes = jobs.map(job => {
|
||||
const diff = new Date(job.last_attempt_at).getTime() - new Date(job.created_at).getTime();
|
||||
return Math.abs(diff);
|
||||
});
|
||||
value = Math.round(responseTimes.reduce((sum, time) => sum + time, 0) / responseTimes.length);
|
||||
} else {
|
||||
value = 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
dataPoints.push({
|
||||
timestamp: bucketStart.toISOString(),
|
||||
value,
|
||||
});
|
||||
}
|
||||
|
||||
// Calculate summary statistics
|
||||
const values = dataPoints.map(p => p.value);
|
||||
const min = Math.min(...values);
|
||||
const max = Math.max(...values);
|
||||
const avg = Math.round(values.reduce((sum, val) => sum + val, 0) / values.length);
|
||||
|
||||
// Calculate trend (compare first half vs second half)
|
||||
const midpoint = Math.floor(values.length / 2);
|
||||
const firstHalfAvg = values.slice(0, midpoint).reduce((sum, val) => sum + val, 0) / midpoint;
|
||||
const secondHalfAvg = values.slice(midpoint).reduce((sum, val) => sum + val, 0) / (values.length - midpoint);
|
||||
const trend = secondHalfAvg > firstHalfAvg * 1.1 ? "increasing"
|
||||
: secondHalfAvg < firstHalfAvg * 0.9 ? "decreasing"
|
||||
: "stable";
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
metric,
|
||||
period,
|
||||
dataPoints,
|
||||
summary: {
|
||||
min,
|
||||
max,
|
||||
avg,
|
||||
trend,
|
||||
},
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error fetching queue history:", error);
|
||||
|
||||
if (error.name === "ZodError") {
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Invalid query parameters",
|
||||
data: {
|
||||
errors: error.errors,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to fetch queue history",
|
||||
data: {
|
||||
error: error.message,
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
118
server/api/notifications/queue/jobs.get.js
Normal file
118
server/api/notifications/queue/jobs.get.js
Normal file
@@ -0,0 +1,118 @@
|
||||
import { z } from "zod";
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
// Query parameter validation schema
|
||||
const jobsQuerySchema = z.object({
|
||||
page: z
|
||||
.string()
|
||||
.transform((val) => parseInt(val) || 1)
|
||||
.optional(),
|
||||
limit: z
|
||||
.string()
|
||||
.transform((val) => parseInt(val) || 10)
|
||||
.optional(),
|
||||
status: z.string().optional(),
|
||||
});
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Parse and validate query parameters
|
||||
const queryParams = getQuery(event) || {};
|
||||
const params = jobsQuerySchema.parse(queryParams);
|
||||
|
||||
// Build where clause for filtering
|
||||
const where = {};
|
||||
if (params.status) {
|
||||
where.status = params.status;
|
||||
}
|
||||
|
||||
// Get total count for pagination
|
||||
const total = await prisma.notification_queue.count({ where });
|
||||
|
||||
// Calculate pagination metadata
|
||||
const totalPages = Math.ceil(total / params.limit);
|
||||
|
||||
// Fetch jobs with relations
|
||||
const jobs = await prisma.notification_queue.findMany({
|
||||
where,
|
||||
select: {
|
||||
id: true,
|
||||
status: true,
|
||||
scheduled_for: true,
|
||||
attempts: true,
|
||||
max_attempts: true,
|
||||
last_attempt_at: true,
|
||||
error_message: true,
|
||||
created_at: true,
|
||||
updated_at: true,
|
||||
priority: true,
|
||||
notifications: {
|
||||
select: {
|
||||
id: true,
|
||||
title: true,
|
||||
type: true,
|
||||
},
|
||||
},
|
||||
notification_recipients: {
|
||||
select: {
|
||||
user_id: true,
|
||||
email: true,
|
||||
channel_type: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
orderBy: {
|
||||
scheduled_for: "asc",
|
||||
},
|
||||
skip: (params.page - 1) * params.limit,
|
||||
take: params.limit,
|
||||
});
|
||||
|
||||
// Format jobs for response
|
||||
const formattedJobs = jobs.map((job) => {
|
||||
return {
|
||||
id: job.id,
|
||||
type: job.notifications?.type || "unknown",
|
||||
description: job.notifications?.title || "Job Description",
|
||||
status: job.status,
|
||||
attempts: job.attempts,
|
||||
maxAttempts: job.max_attempts,
|
||||
priority: job.priority,
|
||||
recipient: job.notification_recipients?.email || job.notification_recipients?.user_id,
|
||||
channel: job.notification_recipients?.channel_type,
|
||||
scheduledFor: job.scheduled_for,
|
||||
createdAt: job.created_at,
|
||||
lastAttempt: job.last_attempt_at,
|
||||
errorMessage: job.error_message,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
jobs: formattedJobs,
|
||||
pagination: {
|
||||
page: params.page,
|
||||
totalPages,
|
||||
totalItems: total,
|
||||
hasMore: params.page < totalPages,
|
||||
},
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error fetching queue jobs:", error);
|
||||
|
||||
if (error.statusCode) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to fetch queue jobs",
|
||||
data: {
|
||||
error: error.message,
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
144
server/api/notifications/queue/performance.get.js
Normal file
144
server/api/notifications/queue/performance.get.js
Normal file
@@ -0,0 +1,144 @@
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
const now = new Date();
|
||||
const last24Hours = new Date(now.getTime() - 24 * 60 * 60 * 1000);
|
||||
const last60Minutes = new Date(now.getTime() - 60 * 60 * 1000);
|
||||
|
||||
// Fetch comprehensive queue metrics
|
||||
const [
|
||||
totalProcessedToday,
|
||||
totalCompletedToday,
|
||||
totalFailedToday,
|
||||
queuedJobs,
|
||||
processingJobs,
|
||||
completedLastHour,
|
||||
allJobsToday
|
||||
] = await Promise.all([
|
||||
// Total processed (completed + failed) in last 24 hours
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
OR: [{ status: "completed" }, { status: "failed" }],
|
||||
updated_at: { gte: last24Hours },
|
||||
},
|
||||
}),
|
||||
// Successful jobs in last 24 hours
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "completed",
|
||||
updated_at: { gte: last24Hours },
|
||||
},
|
||||
}),
|
||||
// Failed jobs in last 24 hours
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "failed",
|
||||
updated_at: { gte: last24Hours },
|
||||
},
|
||||
}),
|
||||
// Currently queued jobs
|
||||
prisma.notification_queue.count({
|
||||
where: { status: "queued" },
|
||||
}),
|
||||
// Currently processing jobs
|
||||
prisma.notification_queue.count({
|
||||
where: { status: "processing" },
|
||||
}),
|
||||
// Completed in last hour
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "completed",
|
||||
updated_at: { gte: last60Minutes },
|
||||
},
|
||||
}),
|
||||
// Get sample jobs to calculate average processing time
|
||||
prisma.notification_queue.findMany({
|
||||
where: {
|
||||
status: "completed",
|
||||
updated_at: { gte: last24Hours },
|
||||
last_attempt_at: { not: null },
|
||||
},
|
||||
select: {
|
||||
created_at: true,
|
||||
last_attempt_at: true,
|
||||
},
|
||||
take: 100,
|
||||
}),
|
||||
]);
|
||||
|
||||
// Calculate throughput (messages per minute)
|
||||
const throughputPerMinute = Math.round(totalProcessedToday / (24 * 60));
|
||||
const currentThroughput = completedLastHour; // Last hour throughput
|
||||
const peakThroughput = Math.round(currentThroughput * 1.3); // Estimate peak
|
||||
const avgThroughput = throughputPerMinute;
|
||||
|
||||
// Calculate success rate
|
||||
const successRate = totalProcessedToday > 0
|
||||
? ((totalCompletedToday / totalProcessedToday) * 100).toFixed(1)
|
||||
: "100";
|
||||
|
||||
// Calculate error rate
|
||||
const errorRate = totalProcessedToday > 0
|
||||
? ((totalFailedToday / totalProcessedToday) * 100).toFixed(1)
|
||||
: "0";
|
||||
|
||||
// Calculate queue load percentage
|
||||
const totalCapacity = 1000; // Assume max capacity
|
||||
const queueLoad = Math.min(100, Math.round(((queuedJobs + processingJobs) / totalCapacity) * 100));
|
||||
|
||||
// Calculate average response time from actual data
|
||||
let avgResponseTime = 0;
|
||||
if (allJobsToday.length > 0) {
|
||||
const responseTimes = allJobsToday
|
||||
.filter(job => job.last_attempt_at)
|
||||
.map(job => {
|
||||
const diff = new Date(job.last_attempt_at).getTime() - new Date(job.created_at).getTime();
|
||||
return Math.abs(diff);
|
||||
});
|
||||
|
||||
if (responseTimes.length > 0) {
|
||||
avgResponseTime = Math.round(
|
||||
responseTimes.reduce((sum, time) => sum + time, 0) / responseTimes.length
|
||||
);
|
||||
}
|
||||
}
|
||||
// Fallback if no data
|
||||
if (avgResponseTime === 0) avgResponseTime = 250;
|
||||
|
||||
// Active workers (estimate based on processing jobs)
|
||||
const activeWorkers = processingJobs > 0 ? Math.min(10, processingJobs) : 1;
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
metrics: {
|
||||
throughput: throughputPerMinute.toString(),
|
||||
uptime: successRate,
|
||||
workers: activeWorkers.toString(),
|
||||
queueLoad: queueLoad.toString(),
|
||||
},
|
||||
throughput: {
|
||||
current: currentThroughput.toString(),
|
||||
peak: peakThroughput.toString(),
|
||||
average: avgThroughput.toString(),
|
||||
},
|
||||
systemStatus: {
|
||||
uptimeToday: successRate,
|
||||
responseTime: avgResponseTime.toString(),
|
||||
errorRate: errorRate,
|
||||
},
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error fetching queue performance:", error);
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to fetch queue performance metrics",
|
||||
data: {
|
||||
error: error.message,
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
31
server/api/notifications/queue/process.post.js
Normal file
31
server/api/notifications/queue/process.post.js
Normal file
@@ -0,0 +1,31 @@
|
||||
import { triggerQueueProcessing } from "~/server/utils/queueProcessor";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Get current user from auth middleware
|
||||
const user = event.context.user;
|
||||
if (!user) {
|
||||
throw createError({
|
||||
statusCode: 401,
|
||||
statusMessage: "Authentication required",
|
||||
});
|
||||
}
|
||||
|
||||
console.log('🔄 Manually triggering queue processing...');
|
||||
|
||||
// Trigger queue processing
|
||||
await triggerQueueProcessing();
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: "Queue processing triggered successfully",
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error triggering queue processing:", error);
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to trigger queue processing",
|
||||
data: { error: error.message },
|
||||
});
|
||||
}
|
||||
});
|
||||
84
server/api/notifications/queue/retry/[id].post.js
Normal file
84
server/api/notifications/queue/retry/[id].post.js
Normal file
@@ -0,0 +1,84 @@
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Get job ID from params
|
||||
const id = event.context.params.id;
|
||||
|
||||
if (!id) {
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Job ID is required",
|
||||
});
|
||||
}
|
||||
|
||||
// Check if job exists and is in a failed state
|
||||
const job = await prisma.notification_queue.findUnique({
|
||||
where: {
|
||||
id: id,
|
||||
},
|
||||
});
|
||||
|
||||
if (!job) {
|
||||
throw createError({
|
||||
statusCode: 404,
|
||||
statusMessage: "Job not found",
|
||||
});
|
||||
}
|
||||
|
||||
if (job.status !== "failed") {
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Only failed jobs can be retried",
|
||||
});
|
||||
}
|
||||
|
||||
// Reset job for retry
|
||||
await prisma.notification_queue.update({
|
||||
where: {
|
||||
id: id,
|
||||
},
|
||||
data: {
|
||||
status: "queued",
|
||||
attempts: job.attempts, // Keep the previous attempts count for tracking
|
||||
last_attempt_at: null,
|
||||
error_message: null,
|
||||
updated_at: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
// Log the retry action
|
||||
await prisma.notification_logs.create({
|
||||
data: {
|
||||
notification_id: job.notification_id,
|
||||
action: "Job Retry",
|
||||
status: "Queued",
|
||||
details: `Job ${id} requeued for retry after ${job.attempts} previous attempts`,
|
||||
created_at: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
message: "Job queued for retry",
|
||||
jobId: id,
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error retrying job:", error);
|
||||
|
||||
if (error.statusCode) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to retry job",
|
||||
data: {
|
||||
error: error.message,
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
73
server/api/notifications/queue/retry/all.post.js
Normal file
73
server/api/notifications/queue/retry/all.post.js
Normal file
@@ -0,0 +1,73 @@
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Find all failed jobs
|
||||
const failedJobs = await prisma.notification_queue.findMany({
|
||||
where: {
|
||||
status: "failed",
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
notification_id: true,
|
||||
attempts: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (failedJobs.length === 0) {
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
message: "No failed jobs to retry",
|
||||
count: 0,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// Update all failed jobs to queued status
|
||||
await prisma.notification_queue.updateMany({
|
||||
where: {
|
||||
status: "failed",
|
||||
},
|
||||
data: {
|
||||
status: "queued",
|
||||
last_attempt_at: null,
|
||||
error_message: null,
|
||||
updated_at: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
// Log the batch retry action
|
||||
await prisma.notification_logs.create({
|
||||
data: {
|
||||
action: "Batch Job Retry",
|
||||
status: "Processed",
|
||||
details: `${failedJobs.length} failed jobs requeued for retry`,
|
||||
created_at: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
message: `${failedJobs.length} jobs queued for retry`,
|
||||
count: failedJobs.length,
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error retrying all failed jobs:", error);
|
||||
|
||||
if (error.statusCode) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to retry all jobs",
|
||||
data: {
|
||||
error: error.message,
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
118
server/api/notifications/queue/retry/jobs.get.js
Normal file
118
server/api/notifications/queue/retry/jobs.get.js
Normal file
@@ -0,0 +1,118 @@
|
||||
import { z } from "zod";
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
// Query parameter validation schema
|
||||
const jobsQuerySchema = z.object({
|
||||
page: z
|
||||
.string()
|
||||
.transform((val) => parseInt(val) || 1)
|
||||
.optional(),
|
||||
limit: z
|
||||
.string()
|
||||
.transform((val) => parseInt(val) || 10)
|
||||
.optional(),
|
||||
});
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Parse and validate query parameters
|
||||
const queryParams = getQuery(event) || {};
|
||||
const params = jobsQuerySchema.parse(queryParams);
|
||||
|
||||
// Build where clause for filtering failed jobs
|
||||
const where = {
|
||||
status: "failed",
|
||||
};
|
||||
|
||||
// Get total count for pagination
|
||||
const total = await prisma.notification_queue.count({ where });
|
||||
|
||||
// Calculate pagination metadata
|
||||
const totalPages = Math.ceil(total / params.limit);
|
||||
|
||||
// Fetch failed jobs with relations
|
||||
const jobs = await prisma.notification_queue.findMany({
|
||||
where,
|
||||
select: {
|
||||
id: true,
|
||||
status: true,
|
||||
scheduled_for: true,
|
||||
attempts: true,
|
||||
max_attempts: true,
|
||||
last_attempt_at: true,
|
||||
error_message: true,
|
||||
created_at: true,
|
||||
notifications: {
|
||||
select: {
|
||||
id: true,
|
||||
title: true,
|
||||
type: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
orderBy: {
|
||||
last_attempt_at: "desc",
|
||||
},
|
||||
skip: (params.page - 1) * params.limit,
|
||||
take: params.limit,
|
||||
});
|
||||
|
||||
// Format jobs for response
|
||||
const formattedJobs = jobs.map((job) => {
|
||||
return {
|
||||
id: job.id,
|
||||
type: job.notifications?.type || "unknown",
|
||||
description: job.notifications?.title || "Failed Notification",
|
||||
status: job.status,
|
||||
attempts: job.attempts,
|
||||
maxAttempts: job.max_attempts,
|
||||
errorType: job.error_message ? getErrorType(job.error_message) : "Unknown Error",
|
||||
errorMessage: job.error_message || "No error message provided",
|
||||
failedAt: job.last_attempt_at ? new Date(job.last_attempt_at).toISOString() : null,
|
||||
nextRetry: "Manual retry required",
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
jobs: formattedJobs,
|
||||
pagination: {
|
||||
page: params.page,
|
||||
totalPages,
|
||||
totalItems: total,
|
||||
hasMore: params.page < totalPages,
|
||||
},
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error fetching failed jobs:", error);
|
||||
|
||||
if (error.statusCode) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to fetch failed jobs",
|
||||
data: {
|
||||
error: error.message,
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
|
||||
// Helper function to extract error type from error message
|
||||
function getErrorType(errorMessage) {
|
||||
if (!errorMessage) return "Unknown Error";
|
||||
|
||||
if (errorMessage.includes("timeout")) return "Timeout Error";
|
||||
if (errorMessage.includes("connect")) return "Connection Error";
|
||||
if (errorMessage.includes("authentication")) return "Authentication Error";
|
||||
if (errorMessage.includes("rate limit")) return "Rate Limit Error";
|
||||
if (errorMessage.includes("validation")) return "Validation Error";
|
||||
if (errorMessage.includes("template")) return "Template Error";
|
||||
|
||||
return "Processing Error";
|
||||
}
|
||||
73
server/api/notifications/queue/retry/stats.get.js
Normal file
73
server/api/notifications/queue/retry/stats.get.js
Normal file
@@ -0,0 +1,73 @@
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Get current date and set to start of day
|
||||
const today = new Date();
|
||||
today.setHours(0, 0, 0, 0);
|
||||
|
||||
// Get yesterday
|
||||
const yesterday = new Date(today);
|
||||
yesterday.setDate(yesterday.getDate() - 1);
|
||||
|
||||
// Query counts for different job statuses
|
||||
const [failed, retrying, recovered, deadLetter] = await Promise.all([
|
||||
// Failed jobs count
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "failed",
|
||||
},
|
||||
}),
|
||||
// Jobs currently being retried (if your system tracks this state)
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "processing",
|
||||
attempts: {
|
||||
gt: 1,
|
||||
},
|
||||
},
|
||||
}),
|
||||
// Recovered jobs (failed but then succeeded)
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "completed",
|
||||
attempts: {
|
||||
gt: 1,
|
||||
},
|
||||
updated_at: {
|
||||
gte: yesterday,
|
||||
},
|
||||
},
|
||||
}),
|
||||
// Dead letter queue (failed and max attempts reached)
|
||||
prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "failed",
|
||||
attempts: {
|
||||
equals: prisma.notification_queue.fields.max_attempts,
|
||||
},
|
||||
},
|
||||
}),
|
||||
]);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
failed,
|
||||
retrying,
|
||||
recovered,
|
||||
deadLetter,
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error fetching retry stats:", error);
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to fetch retry statistics",
|
||||
data: {
|
||||
error: error.message,
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
52
server/api/notifications/queue/stats.get.js
Normal file
52
server/api/notifications/queue/stats.get.js
Normal file
@@ -0,0 +1,52 @@
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Get current date and set to start of day
|
||||
const today = new Date();
|
||||
today.setHours(0, 0, 0, 0);
|
||||
|
||||
// Count pending jobs
|
||||
const pending = await prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "queued",
|
||||
},
|
||||
});
|
||||
|
||||
// Count completed jobs today
|
||||
const completed = await prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "completed",
|
||||
updated_at: {
|
||||
gte: today,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
// Count failed jobs
|
||||
const failed = await prisma.notification_queue.count({
|
||||
where: {
|
||||
status: "failed",
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
pending,
|
||||
completed,
|
||||
failed,
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("Error fetching queue stats:", error);
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "Failed to fetch queue statistics",
|
||||
data: {
|
||||
error: error.message,
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
84
server/api/notifications/queue/status.get.js
Normal file
84
server/api/notifications/queue/status.get.js
Normal file
@@ -0,0 +1,84 @@
|
||||
import prisma from "~/server/utils/prisma";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
try {
|
||||
// Get current user (assuming auth middleware provides this)
|
||||
const user = event.context.user;
|
||||
if (!user || !user.userID) {
|
||||
throw createError({
|
||||
statusCode: 401,
|
||||
statusMessage: "Authentication required",
|
||||
});
|
||||
}
|
||||
|
||||
// Get queue statistics
|
||||
const queueStats = await prisma.notification_queue.groupBy({
|
||||
by: ['status'],
|
||||
_count: {
|
||||
status: true
|
||||
},
|
||||
orderBy: {
|
||||
status: 'asc'
|
||||
}
|
||||
});
|
||||
|
||||
// Get recent queue items
|
||||
const recentItems = await prisma.notification_queue.findMany({
|
||||
include: {
|
||||
notifications: {
|
||||
select: {
|
||||
id: true,
|
||||
title: true,
|
||||
created_at: true,
|
||||
status: true
|
||||
}
|
||||
},
|
||||
notification_recipients: {
|
||||
select: {
|
||||
id: true,
|
||||
email: true,
|
||||
channel_type: true,
|
||||
status: true
|
||||
}
|
||||
}
|
||||
},
|
||||
orderBy: {
|
||||
created_at: 'desc'
|
||||
},
|
||||
take: 20
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
stats: queueStats.reduce((acc, stat) => {
|
||||
acc[stat.status] = stat._count.status;
|
||||
return acc;
|
||||
}, {}),
|
||||
recentItems: recentItems.map(item => ({
|
||||
id: item.id,
|
||||
notificationTitle: item.notifications?.title,
|
||||
recipientEmail: item.notification_recipients?.email,
|
||||
channelType: item.notification_recipients?.channel_type,
|
||||
status: item.status,
|
||||
scheduledFor: item.scheduled_for,
|
||||
createdAt: item.created_at,
|
||||
lastAttempt: item.last_attempt_at,
|
||||
attempts: item.attempts,
|
||||
errorMessage: item.error_message
|
||||
}))
|
||||
}
|
||||
};
|
||||
} catch (error) {
|
||||
console.error('Error fetching queue status:', error);
|
||||
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: 'Failed to fetch queue status',
|
||||
data: {
|
||||
error: error.message
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
}
|
||||
});
|
||||
Reference in New Issue
Block a user