Skip to content

Email storage and processing

Store and process incoming emails using KV storage and queue systems for support tickets and workflow automation

Store incoming emails in KV, process them asynchronously with Cloudflare Queues, and create support tickets automatically to streamline your email workflows.

Store emails in KV

Store emails in a KV namespace for later processing. This example uses mimetext to build replies, which requires the nodejs_compat compatibility flag.

TypeScript
import { EmailMessage } from "cloudflare:email";
import { createMimeMessage } from "mimetext";
/** Bindings this example's Worker expects to find on `env`. */
interface Env {
  /** KV namespace where each incoming email is saved under its generated email ID. */
  EMAILS: KVNamespace;
  /** KV namespace where each support ticket is saved under its ticket ID. */
  SUPPORT_TICKETS: KVNamespace;
  /** Email-sending binding. Declared here, but not used by the code shown in this example. */
  EMAIL: SendEmail;
}
export default {
  /**
   * Email handler: saves every incoming message to KV, then routes it.
   *
   * Messages whose recipient contains `support@` (case-insensitive) are handed
   * to `handleSupportEmail`; everything else is forwarded to the general inbox.
   *
   * @param message - The incoming email being handled.
   * @param env - Worker bindings; `EMAILS` is the KV namespace used for storage.
   * @param ctx - Execution context (not used by this handler).
   */
  async email(message, env, ctx): Promise<void> {
    // `slice(2, 11)` yields the same characters as the deprecated `substr(2, 9)`.
    const emailId = `email-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;

    // Read email content as an ArrayBuffer, then decode to a string
    const rawBuffer = await new Response(message.raw).arrayBuffer();
    const rawContent = new TextDecoder().decode(rawBuffer);

    // Store email metadata and content before routing, so the message is
    // persisted even if a later step throws.
    const emailData = {
      id: emailId,
      from: message.from,
      to: message.to,
      subject: message.headers.get("subject"),
      timestamp: new Date().toISOString(),
      size: message.rawSize,
      rawContent: rawContent,
      processed: false,
    };
    await env.EMAILS.put(emailId, JSON.stringify(emailData));

    // Process based on recipient. Lowercase first so that an address such as
    // "Support@example.com" is routed the same way as "support@example.com".
    if (message.to.toLowerCase().includes("support@")) {
      await handleSupportEmail(message, env, emailId);
    } else {
      await message.forward("general@example.com");
    }
  },
};
/**
 * Creates a support ticket for an incoming email, sends the sender a threaded
 * auto-reply containing the ticket number, and forwards the original message
 * to the support team.
 *
 * @param message - The incoming email; its `from`, `to`, `headers`, `reply()`
 *   and `forward()` members are used.
 * @param env - Worker bindings; the ticket is written to `SUPPORT_TICKETS`.
 * @param emailId - ID under which the email was stored, recorded on the ticket.
 */
async function handleSupportEmail(message, env, emailId) {
  // The Subject header is controlled by the sender, so it must be escaped
  // before being placed inside the HTML auto-reply.
  const escapeHtml = (value: string): string =>
    value
      .replace(/&/g, "&amp;")
      .replace(/</g, "&lt;")
      .replace(/>/g, "&gt;")
      .replace(/"/g, "&quot;")
      .replace(/'/g, "&#39;");

  const subject = message.headers.get("subject");
  const ticketId = `TICKET-${Date.now()}`;
  // Single clock read so createdAt and updatedAt are identical on creation.
  const createdAt = new Date().toISOString();

  // Create support ticket (the subject is stored as received, unescaped).
  const ticketData = {
    id: ticketId,
    emailId: emailId,
    from: message.from,
    subject: subject,
    status: "open",
    priority: "normal",
    createdAt: createdAt,
    updatedAt: createdAt,
  };
  await env.SUPPORT_TICKETS.put(ticketId, JSON.stringify(ticketData));

  // Send threaded auto-reply with ticket number
  const messageId = message.headers.get("Message-ID");
  const reply = createMimeMessage();
  if (messageId) {
    // Threading headers are only set when the original carries a Message-ID.
    reply.setHeader("In-Reply-To", messageId);
    reply.setHeader("References", messageId);
  }
  reply.setSender(message.to);
  reply.setRecipient(message.from);
  reply.setSubject(`Support Ticket Created: ${ticketId}`);

  // Fall back to a placeholder instead of rendering the literal text "null".
  const displaySubject = escapeHtml(subject ?? "(no subject)");
  reply.addMessage({
    contentType: "text/html",
    data: `
<h1>Support Ticket Created</h1>
<p>Your support request has been received and assigned ticket number: <strong>${ticketId}</strong></p>
<p>We will respond within 2-4 hours during business hours.</p>
<hr>
<p><em>Original subject: ${displaySubject}</em></p>
`,
  });
  await message.reply(
    new EmailMessage(message.to, message.from, reply.asRaw()),
  );

  // Forward to support team
  await message.forward("support-team@example.com");
}

Queue-based processing

Process emails asynchronously using Cloudflare Queues. The email() handler sends a threaded acknowledgement with message.reply() and enqueues the message for later processing. The queue consumer runs in a separate invocation where the original EmailMessage is no longer available, so any follow-up emails it sends use env.EMAIL.send() instead.

TypeScript
import { EmailMessage } from "cloudflare:email";
import { createMimeMessage } from "mimetext";
/** Bindings the queue-based example expects to find on `env`. */
interface Env {
  /** KV namespace holding each stored email record, keyed by email ID. */
  EMAIL_STORAGE: KVNamespace;
  /** Queue the `email()` handler sends messages to and the `queue()` handler consumes. */
  EMAIL_QUEUE: Queue;
  /** Email-sending binding used by the queue consumer's follow-up messages. */
  EMAIL: SendEmail;
  /**
   * Analytics binding that receives one data point per processed email.
   * NOTE(review): the binding type in `@cloudflare/workers-types` appears to be
   * `AnalyticsEngineDataset`; confirm that `AnalyticsEngine` resolves in this project.
   */
  EMAIL_ANALYTICS: AnalyticsEngine;
  /** Optional KV namespace for support tickets; writes are skipped when it is absent. */
  SUPPORT_TICKETS?: KVNamespace;
  /** Optional KV namespace for sales leads; writes are skipped when it is absent. */
  SALES_LEADS?: KVNamespace;
}
/** Payload placed on the email queue for each received message. */
interface EmailQueueMessage {
  /** Key of the stored email record in KV. */
  emailId: string;
  /** Category assigned by `determineCategory`; selects the processing routine. */
  category: string;
  /** Priority assigned by `determinePriority`. */
  priority: "low" | "normal" | "high" | "urgent";
  /** Sender address of the original email. */
  from: string;
  /** Recipient address of the original email. */
  to: string;
  /** Subject of the original email (empty string when the header is missing). */
  subject: string;
  /** ISO-8601 time at which the message was enqueued. */
  timestamp: string;
}
export default {
  /**
   * Handles an incoming email: stores it in KV, enqueues it for asynchronous
   * processing, and sends the sender a threaded acknowledgement.
   *
   * @param message - The incoming email being handled.
   * @param env - Worker bindings (`EMAIL_STORAGE`, `EMAIL_QUEUE`).
   * @param ctx - Execution context (not used by this handler).
   */
  async email(message, env, ctx): Promise<void> {
    // `slice(2, 11)` yields the same characters as the deprecated `substr(2, 9)`.
    const emailId = `email-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
    const subject = message.headers.get("subject");

    // Read raw email content as an ArrayBuffer, then decode to a string
    const rawBuffer = await new Response(message.raw).arrayBuffer();
    const rawContent = new TextDecoder().decode(rawBuffer);

    // Store email with metadata
    const emailData = {
      id: emailId,
      from: message.from,
      to: message.to,
      subject: subject,
      timestamp: new Date().toISOString(),
      size: message.rawSize,
      rawContent: rawContent,
      processed: false,
      status: "queued",
    };
    await env.EMAIL_STORAGE.put(emailId, JSON.stringify(emailData));

    // Determine priority and category
    const priority = determinePriority(message);
    const category = determineCategory(message);

    // Queue email for processing
    const queueMessage: EmailQueueMessage = {
      emailId,
      from: message.from,
      to: message.to,
      subject: subject || "",
      timestamp: new Date().toISOString(),
      priority,
      category,
    };
    // Urgent mail is delivered immediately, high after 5 s, everything else after 30 s.
    await env.EMAIL_QUEUE.send(queueMessage, {
      delaySeconds: priority === "urgent" ? 0 : priority === "high" ? 5 : 30,
    });

    // Send threaded immediate auto-reply
    const messageId = message.headers.get("Message-ID");
    const reply = createMimeMessage();
    if (messageId) {
      reply.setHeader("In-Reply-To", messageId);
      reply.setHeader("References", messageId);
    }
    reply.setSender(message.to);
    reply.setRecipient(message.from);
    // Avoid producing the literal subject "Re: null" when the header is missing.
    reply.setSubject(`Re: ${subject ?? "(no subject)"}`);
    reply.addMessage({
      contentType: "text/plain",
      data: "Thank you for your message. It has been queued for processing.",
    });
    await message.reply(
      new EmailMessage(message.to, message.from, reply.asRaw()),
    );
  },

  /**
   * Queue consumer: loads each queued email from KV, runs the processing
   * routine for its category, records the outcome, and acknowledges the
   * message. A failure in the processing path causes that message to be retried.
   *
   * @param batch - Batch of queue messages whose bodies are `EmailQueueMessage`s.
   * @param env - Worker bindings (`EMAIL_STORAGE`, optional `EMAIL_ANALYTICS`, ...).
   * @param ctx - Execution context (not used by this handler).
   */
  async queue(batch, env, ctx): Promise<void> {
    console.log(`Processing ${batch.messages.length} queued emails`);
    for (const message of batch.messages) {
      try {
        const emailData = message.body as EmailQueueMessage;
        console.log(
          `Processing ${emailData.category} email from ${emailData.from}`,
        );

        // Get stored email content
        const storedEmailData = await env.EMAIL_STORAGE.get(emailData.emailId);
        if (!storedEmailData) {
          // Nothing to process; retrying would not help, so acknowledge.
          console.error(`Email data not found: ${emailData.emailId}`);
          message.ack();
          continue;
        }
        const emailContent = JSON.parse(storedEmailData);

        // A queue message can be delivered more than once. Skip records that
        // were already completed so the sender does not get duplicate
        // tickets or confirmation emails.
        if (emailContent.processed) {
          message.ack();
          continue;
        }

        // Process based on category
        let processResult;
        switch (emailData.category) {
          case "support":
            processResult = await processSupport(emailData, emailContent, env);
            break;
          case "sales":
            processResult = await processSales(emailData, emailContent, env);
            break;
          case "billing":
            processResult = await processBilling(emailData, emailContent, env);
            break;
          default:
            processResult = await processGeneral(emailData, emailContent, env);
        }

        // Update email status
        emailContent.processed = true;
        emailContent.status = "completed";
        emailContent.processedAt = new Date().toISOString();
        emailContent.processingResult = processResult;
        await env.EMAIL_STORAGE.put(
          emailData.emailId,
          JSON.stringify(emailContent),
        );

        // Track processing metrics. This is best-effort: a metrics failure
        // must not fall through to `message.retry()`, because the email has
        // already been processed and a retry would repeat customer-facing
        // side effects.
        try {
          env.EMAIL_ANALYTICS?.writeDataPoint({
            blobs: [
              "email_processed",
              emailData.from,
              emailData.to,
              emailData.category,
              emailData.priority,
            ],
            doubles: [1, emailContent.size],
            // Analytics Engine documents a single index per data point;
            // priority remains queryable through the blobs above.
            indexes: [`category:${emailData.category}`],
          });
        } catch (analyticsError) {
          console.error("Failed to record email metrics:", analyticsError);
        }

        message.ack();
      } catch (error) {
        console.error("Failed to process email:", error);
        message.retry();
      }
    }
  },
};
/**
 * Derives a processing priority from an email's subject and recipient.
 *
 * - "urgent": the subject mentions "urgent" or "emergency".
 * - "high": a support recipient with "down" or "error" in the subject, or any
 *   sales/billing recipient.
 * - "normal": everything else.
 *
 * Matching is case-insensitive; a missing subject is treated as empty.
 *
 * @param message - Email exposing `headers.get()` and `to`.
 * @returns The derived priority.
 */
function determinePriority(message): "low" | "normal" | "high" | "urgent" {
  const subjectLower = (message.headers.get("subject") || "").toLowerCase();
  const recipient = message.to.toLowerCase();
  const containsAny = (haystack: string, needles: string[]): boolean =>
    needles.some((needle) => haystack.includes(needle));

  if (containsAny(subjectLower, ["urgent", "emergency"])) {
    return "urgent";
  }

  const supportOutage =
    recipient.includes("support") && containsAny(subjectLower, ["down", "error"]);
  const revenueMailbox = containsAny(recipient, ["sales", "billing"]);

  return supportOutage || revenueMailbox ? "high" : "normal";
}
/**
 * Classifies an email as "support", "sales", "billing" or "general".
 *
 * Rules are checked in that order and the first match wins. A rule matches
 * when the recipient contains the mailbox name or the subject contains one of
 * the rule's keywords. Matching is case-insensitive; a missing subject is
 * treated as empty.
 *
 * @param message - Email exposing `to` and `headers.get()`.
 * @returns The category name.
 */
function determineCategory(message): string {
  const recipient = message.to.toLowerCase();
  const subjectLower = (message.headers.get("subject") || "").toLowerCase();

  // Ordered rule table: earlier entries take precedence.
  const rules = [
    { category: "support", mailbox: "support", keywords: ["help", "issue"] },
    { category: "sales", mailbox: "sales", keywords: ["quote", "pricing"] },
    { category: "billing", mailbox: "billing", keywords: ["invoice", "payment"] },
  ];

  for (const rule of rules) {
    const mailboxHit = recipient.includes(rule.mailbox);
    if (mailboxHit || rule.keywords.some((word) => subjectLower.includes(word))) {
      return rule.category;
    }
  }
  return "general";
}
/**
 * Processes a queued support email: creates a ticket (when the optional
 * `SUPPORT_TICKETS` namespace is bound) and emails the sender a confirmation
 * containing the ticket number and priority.
 *
 * @param emailData - Queue payload describing the email.
 * @param emailContent - Stored email record parsed from KV; `rawContent` is
 *   read when it is a string.
 * @param env - Worker bindings (`EMAIL`, optional `SUPPORT_TICKETS`).
 * @returns The new ticket ID and the action taken.
 */
async function processSupport(
  emailData: EmailQueueMessage,
  emailContent: any,
  env: Env,
) {
  // The subject comes from the sender, so it must be escaped before being
  // placed inside the HTML confirmation.
  const escapeHtml = (value: string): string =>
    value
      .replace(/&/g, "&amp;")
      .replace(/</g, "&lt;")
      .replace(/>/g, "&gt;")
      .replace(/"/g, "&quot;")
      .replace(/'/g, "&#39;");

  const ticketId = `TICKET-${Date.now()}`;
  // Single clock read so createdAt and updatedAt are identical on creation.
  const createdAt = new Date().toISOString();
  // The record comes from JSON.parse; tolerate a missing or non-string body
  // instead of throwing.
  const rawContent =
    typeof emailContent?.rawContent === "string" ? emailContent.rawContent : "";

  // Create support ticket (the subject is stored as received, unescaped).
  const ticketData = {
    id: ticketId,
    emailId: emailData.emailId,
    from: emailData.from,
    subject: emailData.subject,
    priority: emailData.priority,
    status: "open",
    category: "support",
    createdAt: createdAt,
    updatedAt: createdAt,
    content: rawContent.slice(0, 5000), // Limit stored content
  };
  await env.SUPPORT_TICKETS?.put(ticketId, JSON.stringify(ticketData));

  // Send confirmation email
  const safeSubject = escapeHtml(emailData.subject ?? "");
  await env.EMAIL.send({
    to: emailData.from,
    from: "support@yourdomain.com",
    subject: `Support Ticket Created: ${ticketId}`,
    html: `
<h2>Support Ticket Created</h2>
<p>Your support request has been received and assigned ticket number: <strong>${ticketId}</strong></p>
<p><strong>Priority:</strong> ${emailData.priority}</p>
<p>We will respond based on the priority level:</p>
<ul>
<li><strong>Urgent:</strong> Within 1 hour</li>
<li><strong>High:</strong> Within 4 hours</li>
<li><strong>Normal:</strong> Within 24 hours</li>
</ul>
<hr>
<p><em>Original subject: ${safeSubject}</em></p>
`,
  });
  return { ticketId, action: "ticket_created" };
}
/**
 * Processes a queued sales email: records a lead (when the optional
 * `SALES_LEADS` namespace is bound) and emails the sender an acknowledgement
 * containing the lead reference.
 *
 * @param emailData - Queue payload describing the email.
 * @param emailContent - Stored email record (not read by this function).
 * @param env - Worker bindings (`EMAIL`, optional `SALES_LEADS`).
 * @returns The new lead ID and the action taken.
 */
async function processSales(
  emailData: EmailQueueMessage,
  emailContent: any,
  env: Env,
) {
  const { emailId, from: contact, subject, priority } = emailData;
  const leadId = `LEAD-${Date.now()}`;

  // Persist the lead; skipped when the namespace is not bound.
  const serializedLead = JSON.stringify({
    id: leadId,
    emailId,
    contact,
    subject,
    priority,
    status: "new",
    source: "email",
    createdAt: new Date().toISOString(),
  });
  await env.SALES_LEADS?.put(leadId, serializedLead);

  // Acknowledgement body, one entry per line; the empty first and last
  // entries produce the leading and trailing newline.
  const acknowledgementHtml = [
    "",
    "<h2>Thank you for your interest!</h2>",
    `<p>We've received your sales inquiry and assigned it reference: <strong>${leadId}</strong></p>`,
    "<p>A member of our sales team will contact you within 24 hours.</p>",
    "<p>Best regards,<br>Sales Team</p>",
    "",
  ].join("\n");

  await env.EMAIL.send({
    to: contact,
    from: "sales@yourdomain.com",
    subject: `Re: ${subject}`,
    html: acknowledgementHtml,
  });

  return { leadId, action: "lead_created" };
}
/**
 * Processes a queued billing email by sending the sender an acknowledgement
 * from the billing address.
 *
 * @param emailData - Queue payload describing the email.
 * @param emailContent - Stored email record (not read by this function).
 * @param env - Worker bindings; only `EMAIL` is used.
 * @returns An object naming the action taken.
 */
async function processBilling(
  emailData: EmailQueueMessage,
  emailContent: any,
  env: Env,
) {
  // Acknowledgement body, one entry per line; the empty first and last
  // entries produce the leading and trailing newline.
  const bodyLines = [
    "",
    "<h2>Billing Inquiry Received</h2>",
    "<p>Thank you for contacting our billing department.</p>",
    "<p>Your inquiry has been forwarded to our billing specialists who will respond within 2 business hours.</p>",
    "<p>For immediate assistance, please call: +1-800-555-0123</p>",
    "",
  ];

  const outgoing = {
    to: emailData.from,
    from: "billing@yourdomain.com",
    subject: `Re: ${emailData.subject}`,
    html: bodyLines.join("\n"),
  };
  await env.EMAIL.send(outgoing);

  return { action: "billing_forwarded" };
}
/**
 * Processes a queued general-inquiry email by sending the sender a plain-text
 * acknowledgement from the info address.
 *
 * @param emailData - Queue payload describing the email.
 * @param emailContent - Stored email record (not read by this function).
 * @param env - Worker bindings; only `EMAIL` is used.
 * @returns An object naming the action taken.
 */
async function processGeneral(
  emailData: EmailQueueMessage,
  emailContent: any,
  env: Env,
) {
  // Acknowledgement body, one entry per line; the empty first and last
  // entries produce the leading and trailing newline.
  const bodyLines = [
    "",
    "Thank you for contacting us.",
    "We have received your message and will respond within 48 hours.",
    "For urgent matters, please contact our support team at support@yourdomain.com.",
    "Best regards,",
    "Customer Service Team",
    "",
  ];

  const outgoing = {
    to: emailData.from,
    from: "info@yourdomain.com",
    subject: `Re: ${emailData.subject}`,
    text: bodyLines.join("\n"),
  };
  await env.EMAIL.send(outgoing);

  return { action: "general_acknowledged" };
}

Next steps