TITLE: E-commerce Order Fulfillment Route Handler (Upstash, Next.js)
DESCRIPTION: This TypeScript code defines a Next.js route handler that uses Upstash Workflow to automate e-commerce order fulfillment. It receives an order payload, creates an order ID, verifies stock availability, processes payment, dispatches the order, and sends confirmation and dispatch notifications. It depends on the `@upstash/workflow/nextjs` package and several utility functions.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_0
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"
import {
  createOrderId,
  checkStockAvailability,
  processPayment,
  dispatchOrder,
  sendOrderConfirmation,
  sendDispatchNotification,
} from "./utils"

type OrderPayload = {
  userId: string
  items: { productId: string, quantity: number }[]
}

export const { POST } = serve<OrderPayload>(async (context) => {
  const { userId, items } = context.requestPayload;

  // Step 1: Create Order Id
  const orderId = await context.run("create-order-id", async () => {
    return await createOrderId(userId);
  });

  // Step 2: Verify stock availability
  const stockAvailable = await context.run("check-stock", async () => {
    return await checkStockAvailability(items);
  });

  if (!stockAvailable) {
    console.warn("Some items are out of stock");
    return;
  }

  // Step 3: Process payment
  await context.run("process-payment", async () => {
    return await processPayment(orderId)
  })

  // Step 4: Dispatch the order
  await context.run("dispatch-order", async () => {
    return await dispatchOrder(orderId, items)
  })

  // Step 5: Send order confirmation email
  await context.run("send-confirmation", async () => {
    return await sendOrderConfirmation(userId, orderId)
  })

  // Step 6: Send dispatch notification
  await context.run("send-dispatch-notification", async () => {
    return await sendDispatchNotification(userId, orderId)
  })
})
```

----------------------------------------

TITLE: E-commerce Order Fulfillment API (Upstash, FastAPI)
DESCRIPTION: This Python code defines a FastAPI endpoint that uses Upstash Workflow to manage e-commerce order fulfillment. It receives an order payload with user ID and items, then creates an order ID, checks stock, processes payment, dispatches the order, and sends notifications. It relies on the `upstash_workflow.fastapi` library and utility functions for core operations.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_1
LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import List, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext
from utils import (
    create_order_id,
    check_stock_availability,
    process_payment,
    dispatch_order,
    send_order_confirmation,
    send_dispatch_notification,
)

app = FastAPI()
serve = Serve(app)


class OrderItem(TypedDict):
    product_id: str
    quantity: int


class OrderPayload(TypedDict):
    user_id: str
    items: List[OrderItem]


@serve.post("/order-fulfillment")
async def order_fulfillment(context: AsyncWorkflowContext[OrderPayload]) -> None:
    # Get the order payload from the request
    payload = context.request_payload
    user_id = payload["user_id"]
    items = payload["items"]

    # Step 1: Create Order Id
    async def _create_order_id():
        return await create_order_id(user_id)

    order_id: str = await context.run("create-order-id", _create_order_id)

    # Step 2: Verify stock availability
    async def _check_stock():
        return await check_stock_availability(items)

    stock_available: bool = await context.run("check-stock", _check_stock)

    if not stock_available:
        print("Some items are out of stock")
        return

    # Step 3: Process payment
    async def _process_payment():
        return await process_payment(order_id)

    await context.run("process-payment", _process_payment)

    # Step 4: Dispatch the order
    async def _dispatch_order():
        return await dispatch_order(order_id, items)

    await context.run("dispatch-order", _dispatch_order)

    # Step 5: Send order confirmation email
    async def _send_confirmation():
        return await send_order_confirmation(user_id, order_id)

    await context.run("send-confirmation", _send_confirmation)

    # Step 6: Send dispatch notification
    async def _send_dispatch_notification():
        return await send_dispatch_notification(user_id, order_id)

    await context.run("send-dispatch-notification", _send_dispatch_notification)
```

----------------------------------------

TITLE: Payment Retry Loop (Python)
DESCRIPTION: This Python code implements a loop that attempts to charge a customer up to 3 times. If the payment fails, it waits for 24 hours before retrying. If the payment succeeds, it executes logic to unsuspend the user, send an invoice email, and then ends the workflow.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_5
LANGUAGE: python
CODE:
```
for i in range(3):
    # attempt to charge the customer

    if not result:
        # Wait for a day
        await context.sleep("wait for retry", 24 * 60 * 60)
    else:
        # Payment succeeded
        # Unsuspend user, send invoice email
        # end the workflow:
        return
```

----------------------------------------

TITLE: Webhook Handler with Upstash Workflow in FastAPI
DESCRIPTION: This Python snippet defines a FastAPI endpoint that serves as a webhook handler for authentication providers using Upstash Workflow. It mirrors the functionality of the TypeScript example, managing user creation, Stripe integration, and email notifications. The workflow leverages `context.run` to execute asynchronous operations and `context.sleep` to pause execution. Type hints are used for data validation and clarity.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_1
LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import Dict, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


class UserCreatedPayload(TypedDict):
    name: str
    email: str


class UserStats(TypedDict):
    total_problems_solved: int
    most_interested_topic: str


async def create_user_in_database(name: str, email: str) -> Dict[str, str]:
    print("Creating a user in the database:", name, email)
    return {"userid": "12345"}


async def create_new_user_in_stripe(email: str) -> None:
    # Implement logic to create a new user in Stripe
    print("Creating a user in Stripe for", email)


async def start_trial_in_stripe(email: str) -> None:
    # Implement logic to start a trial in Stripe
    print("Starting a trial of 14 days in Stripe for", email)


async def get_user_stats(userid: str) -> UserStats:
    # Implement logic to get user stats
    print("Getting user stats for", userid)
    return {"total_problems_solved": 10000, "most_interested_topic": "Python"}


async def check_upgraded_plan(email: str) -> bool:
    # Implement logic to check if the user has upgraded the plan
    print("Checking if the user has upgraded the plan", email)
    return False


async def send_email(email: str, content: str) -> None:
    # Implement logic to send an email
    print("Sending email to", email, content)


async def send_problem_solved_email(
    context: AsyncWorkflowContext[UserCreatedPayload], email: str, stats: UserStats
) -> None:
    if stats["total_problems_solved"] == 0:

        async def _send_no_answers_email() -> None:
            await send_email(
                email, "Hey, you haven't solved any questions in the last 7 days..."
            )

        await context.run("send no answers email", _send_no_answers_email)
    else:

        async def _send_stats_email() -> None:
            await send_email(
                email,
                f"You have solved {stats['total_problems_solved']} problems in the last 7 days. Keep it up!",
            )

        await context.run("send stats email", _send_stats_email)


@serve.post("/auth-provider-webhook")
async def auth_provider_webhook(
    context: AsyncWorkflowContext[UserCreatedPayload],
) -> None:
    payload = context.request_payload
    name = payload["name"]
    email = payload["email"]

    # create_user_in_database returns a dict, so the step is annotated accordingly
    async def _sync_user() -> Dict[str, str]:
        return await create_user_in_database(name, email)

    result = await context.run("sync user", _sync_user)
    userid = result["userid"]

    async def _create_new_user_in_stripe() -> None:
        await create_new_user_in_stripe(email)

    await context.run("create new user in stripe", _create_new_user_in_stripe)

    async def _start_trial_in_stripe() -> None:
        await start_trial_in_stripe(email)

    await context.run("start trial in Stripe", _start_trial_in_stripe)

    async def _send_welcome_email() -> None:
        await send_email(
            email, "Welcome to our platform!, You have 14 days of free trial."
        )

    await context.run("send welcome email", _send_welcome_email)

    await context.sleep("wait", 7 * 24 * 60 * 60)

    # get user stats and send email with them
    async def _get_user_stats() -> UserStats:
        return await get_user_stats(userid)

    stats: UserStats = await context.run("get user stats", _get_user_stats)

    await send_problem_solved_email(context, email, stats)

    # wait until there are two days to the end of trial period and check upgrade status
    await context.sleep("wait for trial warning", 5 * 24 * 60 * 60)

    async def _check_upgraded_plan() -> bool:
        return await check_upgraded_plan(email)

    is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan)

    # end the workflow if upgraded
    if is_upgraded:
        return

    async def _send_trial_warning_email() -> None:
        await send_email(
            email,
            "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.",
        )

    await context.run("send trial warning email", _send_trial_warning_email)

    await context.sleep("wait for trial end", 2 * 24 * 60 * 60)

    async def _send_trial_end_email() -> None:
        await send_email(
            email,
            "Your trial has ended. Please upgrade your plan to keep using our platform.",
        )

    await context.run("send trial end email", _send_trial_end_email)
```

----------------------------------------

TITLE: Evaluator-Optimizer Workflow with Upstash Agents (Next.js)
DESCRIPTION: This snippet defines a Next.js API route using Upstash Workflow to implement an evaluator-optimizer workflow. It uses two agents: a generator to create content based on a prompt, and an evaluator to provide feedback and corrections. The generator is called iteratively until the evaluator approves the generated text. The workflow repeats up to 3 times or until the evaluator returns a "PASS" result. It uses the `@upstash/workflow/nextjs` package to integrate with Next.js, and leverages OpenAI's `gpt-3.5-turbo` model.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/evaluator-optimizer.mdx#_snippet_0
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo');

  // Generator agent that generates content
  const generator = context.agents.agent({
    model,
    name: 'generator',
    maxSteps: 1,
    background: 'You are an agent that generates text based on a prompt.',
    tools: {}
  });

  // Evaluator agent that evaluates the text and gives corrections
  const evaluator = context.agents.agent({
    model,
    name: 'evaluator',
    maxSteps: 1,
    background: 'You are an agent that evaluates the generated text and provides corrections if needed.',
    tools: {}
  });

  let generatedText = '';
  let evaluationResult = '';

  const prompt = "Generate a short explanation of quantum mechanics.";
  let nextPrompt = prompt;

  for (let i = 0; i < 3; i++) {
    // Construct prompt for generator:
    // - If there's no evaluation, use the original prompt
    // - If there's an evaluation, provide the prompt, the last generated text, and the evaluator's feedback
    if (evaluationResult && evaluationResult !== "PASS") {
      nextPrompt = `Please revise the answer to the question "${prompt}". Previous answer was: "${generatedText}", which received this feedback: "${evaluationResult}".`;
    }

    // Generate content
    const generatedResponse = await context.agents.task({ agent: generator, prompt: nextPrompt }).run();
    generatedText = generatedResponse.text

    // Evaluate the generated content
    const evaluationResponse = await context.agents.task({ agent: evaluator, prompt: `Evaluate and provide feedback for the following text: ${generatedText}` }).run();
    evaluationResult = evaluationResponse.text

    // If the evaluator accepts the content (i.e., "PASS"), stop
    if (evaluationResult.includes("PASS")) {
      break;
    }
  }

  console.log(generatedText);
});
```

----------------------------------------

TITLE: Defining Upstash Workflow Route (Next.js)
DESCRIPTION: This TypeScript code defines a Next.js API route that implements a payment retry workflow using Upstash Workflow. It handles charging a customer, retrying on failure with a delay, unsuspending the user if the payment succeeds, sending an invoice email, and suspending the user if all retries fail. It depends on the `@upstash/workflow/nextjs` package.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_0
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

type ChargeUserPayload = {
  email: string;
};

export const { POST } = serve<ChargeUserPayload>(async (context) => {
  const { email } = context.requestPayload;

  for (let i = 0; i < 3; i++) {
    // attempt to charge the user
    const result = await context.run("charge customer", async () => {
      try {
        return await chargeCustomer(i + 1);
      } catch (e) {
        console.error(e);
        return;
      }
    });

    if (!result) {
      // Wait for a day
      await context.sleep("wait for retry", 24 * 60 * 60);
    } else {
      // Unsuspend User
      const isSuspended = await context.run("check suspension", async () => {
        return await checkSuspension(email);
      });
      if (isSuspended) {
        await context.run("unsuspend user", async () => {
          await unsuspendUser(email);
        });
      }

      // send invoice email
      await context.run("send invoice email", async () => {
        await sendEmail(
          email,
          `Payment successful. Invoice: ${result.invoiceId}, Total cost: $${result.totalCost}`
        );
      });

      // by returning, we end the workflow run
      return;
    }
  }

  // suspend user if the user isn't suspended
  const isSuspended = await context.run("check suspension", async () => {
    return await checkSuspension(email);
  });

  if (!isSuspended) {
    await context.run("suspend user", async () => {
      await suspendUser(email);
    });

    await context.run("send suspended email", async () => {
      await sendEmail(
        email,
        "Your account has been suspended due to payment failure. Please update your payment method."
      );
    });
  }
});

async function sendEmail(email: string, content: string) {
  // Implement the logic to send an email
  console.log("Sending email to", email, "with content:", content);
}

async function checkSuspension(email: string) {
  // Implement the logic to check if the user is suspended
  console.log("Checking suspension status for", email);
  return true;
}

async function suspendUser(email: string) {
  // Implement the logic to suspend the user
  console.log("Suspending the user", email);
}

async function unsuspendUser(email: string) {
  // Implement the logic to unsuspend the user
  console.log("Unsuspending the user", email);
}

async function chargeCustomer(attempt: number) {
  // Implement the logic to charge the customer
  console.log("Charging the customer");

  if (attempt <= 2) {
    throw new Error("Payment failed");
  }

  return {
    invoiceId: "INV123",
    totalCost: 100,
  } as const;
}
```

----------------------------------------

TITLE: Prompt Chaining with Upstash Workflow and LangChain (TS)
DESCRIPTION: This code defines a prompt chaining workflow using Upstash Workflow, LangChain, and OpenAI's GPT-3.5-turbo model. It creates three agents: one to list famous physicists, another to describe their work using Wikipedia, and a third to summarize the findings. The output of each agent serves as the input for the subsequent agent, creating a chain of actions. It uses `serve` from `@upstash/workflow/nextjs` to define the API endpoint.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/prompt-chaining.mdx#_snippet_0
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo');

  const agent1 = context.agents.agent({
    model,
    name: 'firstAgent',
    maxSteps: 1,
    background: 'You are an agent that lists famous physicists.',
    tools: {}
  });

  const agent2 = context.agents.agent({
    model,
    name: 'secondAgent',
    // set to 2 as this agent will first request tools
    // and then summarize them:
    maxSteps: 2,
    background:
      'You are an agent that describes the work of' +
      ' the physicists listed in the previous prompt.',
    tools: {
      wikiTool: new WikipediaQueryRun({
        topKResults: 1,
        maxDocContentLength: 500,
      })
    }
  });

  const agent3 = context.agents.agent({
    model,
    name: 'thirdAgent',
    maxSteps: 1,
    background:
      'You are an agent that summarizes the ' +
      'works of the physicists mentioned previously.',
    tools: {}
  });

  // Chaining agents
  const firstOutput = await context.agents.task({
    agent: agent1,
    prompt: "List 3 famous physicists."
  }).run();

  const secondOutput = await context.agents.task({
    agent: agent2,
    prompt: `Describe the work of: ${firstOutput.text}`
  }).run();

  const { text } = await context.agents.task({
    agent: agent3,
    prompt: `Summarize: ${secondOutput.text}`
  }).run();

  console.log(text);
});
```

----------------------------------------

TITLE: AI Generation Workflow (FastAPI)
DESCRIPTION: This Python code defines a FastAPI endpoint that uses Upstash Workflow to process a dataset with OpenAI. It follows the same steps as the TypeScript version: downloading the dataset, splitting it into chunks, processing each chunk with GPT-4 via OpenAI API, aggregating the results, and generating and sending a final report. It uses `context.run` for asynchronous task execution and `context.call` to make external API calls.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_1
LANGUAGE: python
CODE:
```
from fastapi import FastAPI
import json
import os
from typing import Dict, List, Any, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext, CallResponse
from utils import (
    aggregate_results,
    generate_report,
    send_report,
    get_dataset_url,
    split_into_chunks,
)

app = FastAPI()
serve = Serve(app)


class RequestPayload(TypedDict):
    dataset_id: str
    user_id: str


@serve.post("/ai-generation")
async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None:
    request = context.request_payload
    dataset_id = request["dataset_id"]
    user_id = request["user_id"]

    # Step 1: Download the dataset
    async def _get_dataset_url() -> str:
        return await get_dataset_url(dataset_id)

    dataset_url = await context.run("get-dataset-url", _get_dataset_url)

    # HTTP request with much longer timeout (2hrs)
    response: CallResponse[Any] = await context.call(
        "download-dataset", url=dataset_url, method="GET"
    )

    dataset = response.body

    # Step 2: Process data in chunks using OpenAI
    chunk_size = 1000
    chunks = split_into_chunks(dataset, chunk_size)
    processed_chunks: List[str] = []

    for i, chunk in enumerate(chunks):
        openai_response: CallResponse[Dict[str, str]] = await context.call(
            f"process-chunk-{i}",
            url="https://api.openai.com/v1/chat/completions",
            method="POST",
            headers={
                "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
            },
            body={
                "model": "gpt-4",
                "messages": [
                    {
                        "role": "system",
                        "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
                    },
                    {
                        "role": "user",
                        "content": f"Analyze this data chunk: {json.dumps(chunk)}",
                    },
                ],
                "max_tokens": 150,
            },
        )

        processed_chunks.append(
            openai_response.body["choices"][0]["message"]["content"]
        )

        # Every 10 chunks, we'll aggregate intermediate results
        if i % 10 == 9 or i == len(chunks) - 1:

            async def _aggregate_results() -> None:
                await aggregate_results(processed_chunks)
                processed_chunks.clear()

            await context.run(f"aggregate-results{i}", _aggregate_results)

    # Step 3: Generate and send data report
    async def _generate_report() -> Any:
        return await generate_report(dataset_id)

    report = await context.run("generate-report", _generate_report)

    async def _send_report() -> None:
        await send_report(report, user_id)

    await context.run("send-report", _send_report)
```

----------------------------------------

TITLE: Customer Onboarding Workflow Handler - TypeScript
DESCRIPTION: This TypeScript code defines a Next.js API route that handles the customer onboarding workflow. It receives an email address, sends a welcome email, waits for 3 days, and then periodically checks the user's state, sending appropriate emails based on their activity. It relies on the `@upstash/workflow/nextjs` package for workflow management and `sendEmail` and `getUserState` functions for external actions. The initial data for the workflow consists of the user's email.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_0
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"

type InitialData = {
  email: string
}

export const { POST } = serve<InitialData>(async (context) => {
  const { email } = context.requestPayload

  await context.run("new-signup", async () => {
    await sendEmail("Welcome to the platform", email)
  })

  await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3)

  while (true) {
    const state = await context.run("check-user-state", async () => {
      return await getUserState()
    })

    if (state === "non-active") {
      await context.run("send-email-non-active", async () => {
        await sendEmail("Email to non-active users", email)
      })
    } else if (state === "active") {
      await context.run("send-email-active", async () => {
        await sendEmail("Send newsletter to active users", email)
      })
    }

    await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)
  }
})

async function sendEmail(message: string, email: string) {
  // Implement email sending logic here
  console.log(`Sending ${message} email to ${email}`)
}

type UserState = "non-active" | "active"

const getUserState = async (): Promise<UserState> => {
  // Implement user state logic here
  return "non-active"
}
```

----------------------------------------

TITLE: Ensure Idempotency in context.run - Python
DESCRIPTION: This Python code snippet highlights the need for idempotency within `context.run` in Upstash Workflow. The `some_work(input)` function must be idempotent to ensure the workflow behaves consistently even if retried.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_15
LANGUAGE: python
CODE:
```
@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:
    input = context.request_payload

    async def _step_1() -> None:
        return await some_work(input)

    await context.run("step-1", _step_1)
```

----------------------------------------

TITLE: Define a Workflow Endpoint with Upstash Agents API in Next.js
DESCRIPTION: Defines a Next.js route that serves as an Upstash Workflow endpoint. It uses the `@upstash/workflow/nextjs` library's `serve` function to handle incoming requests. The code defines an agent using the `context.agents.agent` function with a tool for communicating inner thoughts. The prompt from the request payload is passed to the agent, and the final response is logged.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/getting-started.mdx#_snippet_5
LANGUAGE: typescript
CODE:
```
import { z } from "zod";
import { tool } from "ai";
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<{ prompt: string }>(async (context) => {
  const prompt = context.requestPayload.prompt
  const model = context.agents.openai('gpt-3.5-turbo')

  const communicatorAgent = context.agents.agent({
    model,
    name: 'communicatorAgent',
    maxSteps: 2,
    tools: {
      communicationTool: tool({
        description: 'A tool for informing the caller about your inner thoughts',
        parameters: z.object({ message: z.string() }),
        execute: async ({ message }) => {
          console.log("Inner thought:", message)
          return "success"
        }
      })
    },
    background:
      'Answer questions directed towards you.' +
      ' You have access to a tool to share your inner thoughts' +
      ' with the caller. Utilize this tool at least once before' +
      ' answering the prompt. In your inner thoughts, briefly' +
      ' explain what you will talk about and why. Keep your' +
      ' answers brief.',
  })

  const task = context.agents.task({
    agent: communicatorAgent,
    prompt
  })

  const { text } = await task.run()

  console.log("Final response:", text);
})
```

----------------------------------------

TITLE: Defining Upstash Workflow Route (FastAPI)
DESCRIPTION: This Python code defines a FastAPI endpoint that implements a payment retry workflow using Upstash Workflow. It leverages `upstash_workflow.fastapi.Serve` to integrate with FastAPI. The workflow includes charging a customer, retrying upon failure with a delay, unsuspending the user if the payment succeeds, sending an invoice email, and suspending the user if all retries fail. It uses `dataclasses`, `typing` and `fastapi`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_1
LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import TypedDict, Optional
from dataclasses import dataclass
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


@dataclass
class ChargeResult:
    invoice_id: str
    total_cost: float


class ChargeUserPayload(TypedDict):
    email: str


async def send_email(email: str, content: str) -> None:
    # Implement the logic to send an email
    print("Sending email to", email, "with content:", content)


async def check_suspension(email: str) -> bool:
    # Implement the logic to check if the user is suspended
    print("Checking suspension status for", email)
    return True


async def suspend_user(email: str) -> None:
    # Implement the logic to suspend the user
    print("Suspending the user", email)


async def unsuspend_user(email: str) -> None:
    # Implement the logic to unsuspend the user
    print("Unsuspending the user", email)


async def charge_customer(attempt: int) -> Optional[ChargeResult]:
    # Implement the logic to charge the customer
    print("Charging the customer")
    if attempt <= 2:
        raise Exception("Payment failed")
    return ChargeResult(invoice_id="INV123", total_cost=100)


@serve.post("/payment-retries")
async def payment_retries(context: AsyncWorkflowContext[ChargeUserPayload]) -> None:
    email = context.request_payload["email"]

    async def _check_suspension() -> bool:
        return await check_suspension(email)

    for i in range(3):
        # attempt to charge the user
        async def _charge_customer() -> Optional[ChargeResult]:
            try:
                return await charge_customer(i + 1)
            except Exception as e:
                print(f"Error: {e}")
                return None

        result = await context.run("charge customer", _charge_customer)

        if not result:
            # Wait for a day
            await context.sleep("wait for retry", 24 * 60 * 60)
        else:
            # Unsuspend User
            is_suspended = await context.run("check suspension", _check_suspension)

            if is_suspended:

                async def _unsuspend_user() -> None:
                    await unsuspend_user(email)

                await context.run("unsuspend user", _unsuspend_user)

            # send invoice email
            async def _send_invoice_email() -> None:
                await send_email(
                    email,
                    f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}",
                )

            await context.run("send invoice email", _send_invoice_email)

            # by returning, we end the workflow run
            return

    # suspend user if the user isn't suspended
    is_suspended = await context.run("check suspension", _check_suspension)

    if not is_suspended:

        async def _suspend_user() -> None:
            await suspend_user(email)

        await context.run("suspend user", _suspend_user)

        async def _send_suspended_email() -> None:
            await send_email(
                email,
                "Your account has been suspended due to payment failure. Please update your payment method.",
            )

        await context.run("send suspended email", _send_suspended_email)
```

----------------------------------------

TITLE: Triggering Workflow Run with Client (TypeScript)
DESCRIPTION: This snippet demonstrates how to start a workflow using the `client.trigger` method from the `@upstash/workflow` package. It initializes a workflow client with a QStash token and then uses the `trigger` method to send a request to the workflow endpoint. The `workflowRunId` is returned in the result. The `trigger` method accepts optional parameters for body, headers, a specific `workflowRunId` to assign, and retries for the initial request.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#_snippet_0
LANGUAGE: TypeScript
CODE:
```
// Using the workflow client
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });

const { workflowRunId } = await client.trigger({
  url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  body: "hello there!", // Optional body
  headers: { ... }, // Optional headers
  workflowRunId: "my-workflow", // Optional workflow run ID
  retries: 3 // Optional retries for the initial request
});
```

----------------------------------------

TITLE: Periodic State Check and Email Sending - Python
DESCRIPTION: This Python snippet implements a periodic state check within an infinite loop. It retrieves the user state using `get_user_state`, and sends different emails depending on whether the user is 'active' or 'non-active'. The workflow sleeps for one month (60 * 60 * 24 * 30 seconds) after each state check. Asynchronous functions are defined inline using `async def` to execute tasks.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_7
LANGUAGE: python
CODE:
```
while True:

    async def _check_user_state() -> UserState:
        return await get_user_state()

    state: UserState = await context.run("check-user-state", _check_user_state)

    if state == "non-active":

        async def _send_email_non_active() -> None:
            await send_email("Email to non-active users", email)

        await context.run("send-email-non-active", _send_email_non_active)
    else:

        async def _send_email_active() -> None:
            await send_email("Send newsletter to active users", email)

        await context.run("send-email-active", _send_email_active)

    await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)
```

----------------------------------------

TITLE: Send Welcome Email - Python
DESCRIPTION: This Python snippet sends a welcome email to the user using the `send_email` function within an Upstash workflow. An asynchronous function `_send_welcome_email` is defined, and then executed using `context.run`, passing the user's email and welcome message.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_9
LANGUAGE: python
CODE:
```
async def _send_welcome_email() -> None:
    await send_email(
        email, "Welcome to our platform!, You have 14 days of free trial."
    )

await context.run("send welcome email", _send_welcome_email)
```

----------------------------------------

TITLE: Trigger Workflow with TypeScript SDK
DESCRIPTION: Demonstrates how to trigger the workflow from a TypeScript application using the Upstash Workflow SDK. It initializes a `Client` instance and calls the `trigger` method, passing the workflow endpoint URL, optional body, headers, workflow run ID, and retry settings.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#_snippet_4 LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }); const { workflowRunId } = await client.trigger({ url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>", body: "hello there!", // Optional body headers: { ... }, // Optional headers workflowRunId: "my-workflow", // Optional workflow run ID retries: 3 // Optional retries for the initial request }); ``` ---------------------------------------- TITLE: Waiting for an Event using context.waitForEvent TypeScript DESCRIPTION: This snippet demonstrates how to use `context.waitForEvent` to pause a workflow until a specific event occurs. It takes an event description, event ID, and an optional timeout in seconds. The workflow will resume when the event is received or when the timeout expires. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` const { eventData, timeout } = await context.waitForEvent( "description of event", "event-id", { timeout: timeoutInSeconds, } ); ``` ---------------------------------------- TITLE: Secure Workflow with QStash Receiver (TypeScript) DESCRIPTION: This TypeScript code snippet shows how to integrate QStash's `Receiver` into an Upstash Workflow to verify requests. It imports `Receiver` from `@upstash/qstash` and `serve` from `@upstash/workflow/nextjs`, and configures the receiver with the current and next signing keys from environment variables. This ensures that only requests with a valid QStash signature are processed. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your workflow steps... 
}, { receiver: new Receiver({ currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY, nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY, }), } ); ``` ---------------------------------------- TITLE: Trigger Workflow Run with Upstash Client (TS) DESCRIPTION: This snippet demonstrates how to trigger a workflow run using the Upstash Workflow Client. It includes optional parameters for the request body, headers, workflow run ID, retries, and flow control. A QSTASH_TOKEN is required for authentication. The returned `workflowRunId` is logged to the console. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }) const { workflowRunId } = await client.trigger({ url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>", body: "hello there!", // optional body headers: { ... }, // optional headers workflowRunId: "my-workflow", // optional workflow run id retries: 3, // optional retries in the initial request flowControl: { // optional flow control key: "USER_GIVEN_KEY", ratePerSecond: 10, parallelism: 5 } }) console.log(workflowRunId) // prints wfr_my-workflow ``` ---------------------------------------- TITLE: Verifying Stock (Upstash Workflow) DESCRIPTION: This code snippet demonstrates how to verify stock availability using Upstash Workflow in both TypeScript and Python. It creates an order ID and checks if the items are in stock. If not, it logs a warning and halts the process, ensuring that payments are not processed for out-of-stock items. 
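The stock check itself lives in a utility module that the example does not show. A minimal sketch of what such a helper might look like, assuming a hypothetical in-memory `inventory` map in place of a real inventory service (`inventory` and `hasStock` are illustrative names, not part of Upstash Workflow):

```typescript
type OrderItem = { productId: string; quantity: number };

// Hypothetical in-memory inventory (assumption, for illustration only).
// A real implementation would query a database or inventory service.
const inventory = new Map<string, number>([
  ["keyboard", 5],
  ["mouse", 0],
]);

// True only if every requested item is in stock in the requested quantity.
// Unknown product IDs are treated as having zero stock.
function hasStock(items: OrderItem[]): boolean {
  return items.every(
    ({ productId, quantity }) => (inventory.get(productId) ?? 0) >= quantity
  );
}

// Async wrapper, so the helper can be awaited inside a workflow step.
async function checkStockAvailability(items: OrderItem[]): Promise<boolean> {
  return hasStock(items);
}
```

Because the helper returns a plain boolean, the workflow can branch on the step result and return early before any payment step runs.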
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items) }) if (!stockAvailable) { console.warn("Some items are out of stock") return; } ``` LANGUAGE: python CODE: ``` async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return ``` ---------------------------------------- TITLE: Workflow Definition with Event Handling (Next.js) DESCRIPTION: This code snippet defines a workflow using Upstash Workflow and Next.js to handle order processing. It receives an order request, sends an email to request order processing, waits for an external event indicating order completion, handles timeouts, and sends a confirmation email. The `serve` function integrates the workflow with a Next.js API route. 
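The waiting workflow and the code that later notifies it have to agree on the event ID. One way to keep the two in sync is to derive the ID from a single helper. A minimal sketch, where `orderEventId` is a hypothetical helper and not part of the SDK:

```typescript
// Hypothetical helper: builds the event ID shared by the waiting workflow
// (context.waitForEvent) and the notifier (client.notify).
function orderEventId(orderId: string): string {
  return `order-${orderId}`;
}

// Both sides derive the ID from the same order ID, so they always match.
const waitingOn = orderEventId("1324");
const notifying = orderEventId("1324");
console.log(waitingOn === notifying);
```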
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const { orderId, userEmail } = context.requestPayload; // Step 1: request order processing await context.run("request order processing", async () => { await requestProcessing(orderId) }) // Step 2: Wait for the order to be processed const { eventData, timeout } = await context.waitForEvent( "wait for order processing", `order-${orderId}`, { timeout: "10m" // 10 minutes timeout } ); if (timeout) { // end workflow in case of timeout return; } const processedData = eventData; // Step 3: Log the processed order await context.run("process-order", async () => { console.log(`Order ${orderId} processed:`, processedData); }); // Step 4: Send a confirmation email await context.run("send-confirmation-email", async () => { await sendEmail( userEmail, "Your order has been processed!", processedData ); }); }); ``` ---------------------------------------- TITLE: Creating OpenAI Client with Workflow Context DESCRIPTION: Creates an OpenAI client that integrates with Upstash Workflow using a custom `fetch` implementation. This allows Upstash Workflow to manage HTTP requests to the OpenAI API, ensuring durability and handling long-running operations. It uses `context.call` to make HTTP requests, allowing Upstash Workflow to track and manage the request lifecycle. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#_snippet_3 LANGUAGE: typescript CODE: ``` import { createOpenAI } from '@ai-sdk/openai'; import { HTTPMethods } from '@upstash/qstash'; import { WorkflowAbort, WorkflowContext } from '@upstash/workflow'; export const createWorkflowOpenAI = (context: WorkflowContext) => { return createOpenAI({ compatibility: "strict", fetch: async (input, init) => { try { // Prepare headers from init.headers const headers = init?.headers ? Object.fromEntries(new Headers(init.headers).entries()) : {}; // Prepare body from init.body const body = init?.body ? JSON.parse(init.body as string) : undefined; // Make network call const responseInfo = await context.call("openai-call-step", { url: input.toString(), method: init?.method as HTTPMethods, headers, body, }); // Construct headers for the response const responseHeaders = new Headers( Object.entries(responseInfo.header).reduce((acc, [key, values]) => { acc[key] = values.join(", "); return acc; }, {} as Record<string, string>) ); // Return the constructed response return new Response(JSON.stringify(responseInfo.body), { status: responseInfo.status, headers: responseHeaders, }); } catch (error) { if (error instanceof WorkflowAbort) { throw error } else { console.error("Error in fetch implementation:", error); throw error; // Rethrow error for further handling } } }, }); }; ``` ---------------------------------------- TITLE: Send Trial Warning Email - TypeScript DESCRIPTION: This TypeScript snippet sends a trial warning email 2 days before the trial ends. It waits for 5 days (7 - 2), checks if the user has upgraded using `checkUpgradedPlan`, and if not, sends a trial warning email. If the user has upgraded, the workflow returns, ending the process. 
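The sleep durations are plain second counts, so the arithmetic behind them can be written out. A small sketch, assuming the 14-day trial and the initial 7-day wait used in the full webhook handler example (the constant names are illustrative):

```typescript
// Durations are in seconds, matching the values passed to context.sleep.
const DAY_IN_SECONDS = 24 * 60 * 60;

// Assumed timeline: a 14-day trial, a first wait of 7 days, and a warning
// sent 2 days before the trial ends.
const TRIAL_DAYS = 14;
const FIRST_WAIT_DAYS = 7;
const WARNING_LEAD_DAYS = 2;

// Days still to wait after the first 7-day sleep: (14 - 7) - 2 = 5.
const warningWaitDays = TRIAL_DAYS - FIRST_WAIT_DAYS - WARNING_LEAD_DAYS;
const warningWaitSeconds = warningWaitDays * DAY_IN_SECONDS;

// The final sleep covers the last 2 days of the trial.
const finalWaitSeconds = WARNING_LEAD_DAYS * DAY_IN_SECONDS;

console.log(warningWaitDays, warningWaitSeconds, finalWaitSeconds);
```

The three sleeps (7, 5, and 2 days) add up to the full 14-day trial.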
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_14 LANGUAGE: typescript CODE: ``` await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); ``` ---------------------------------------- TITLE: Use Initial Payload Parser in Python DESCRIPTION: Demonstrates how to define and use an `initial_payload_parser` to process the initial request's payload in Python. The parser transforms the raw initial payload into a structured object, defined using `dataclass`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_9 LANGUAGE: Python CODE: ``` from dataclasses import dataclass @dataclass class InitialPayload: foo: str bar: int def initial_payload_parser(initial_payload: str) -> InitialPayload: return parse_payload(initial_payload) @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[InitialPayload]) -> None: payload: InitialPayload = context.request_payload ``` ---------------------------------------- TITLE: Workflow Handler with Upstash/FastAPI DESCRIPTION: This Python snippet implements an image processing workflow using Upstash Workflow and FastAPI. It defines an endpoint `/process-image` that receives an image ID and user ID, retrieves the image, resizes it to multiple resolutions, applies filters, and stores the processed images. It uses `upstash_workflow.fastapi.Serve` to integrate with FastAPI and `context.run` and `context.call` to execute workflow steps. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_1 LANGUAGE: python CODE: ``` from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_image, get_image_url app = FastAPI() serve = Serve(app) class ImageResult(TypedDict): image_url: str class ImageProcessingPayload(TypedDict): image_id: str user_id: str @serve.post("/process-image") async def process_image(context: AsyncWorkflowContext[ImageProcessingPayload]) -> None: payload = context.request_payload image_id = payload["image_id"] user_id = payload["user_id"] # Step 1: Retrieve the uploaded image async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) # Step 2: Resize the image to multiple resolutions resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] # Step 3: Apply filters to each resized image filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] # Step 4: Store processed images in cloud storage async def _store_image() -> 
str: return await store_image(processed_image["imageUrl"]) stored_image_urls: List[str] = [] for processed_image in processed_images: stored_image_url = await context.run("store-image", _store_image) stored_image_urls.append(stored_image_url) ``` ---------------------------------------- TITLE: Processing Payment (Upstash Workflow) DESCRIPTION: This code snippet shows how to process payments using Upstash Workflow in both Typescript and Python. It calls a `processPayment` function with the order ID to handle the payment processing logic, ensuring the order proceeds only if the payment is successful. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_3 LANGUAGE: typescript CODE: ``` await context.run("process-payment", async () => { return await processPayment(orderId) }) ``` LANGUAGE: python CODE: ``` async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) ``` ---------------------------------------- TITLE: Customer Onboarding Workflow - Python DESCRIPTION: This Python code implements the customer onboarding workflow using FastAPI and Upstash Workflow. It defines an endpoint `/customer-onboarding` that receives user email as input. The workflow sends a welcome email, waits for 3 days, and then periodically checks the user's state, sending targeted emails based on their engagement. It utilizes the `upstash_workflow.fastapi` module for integration. The initial data expected is a dictionary containing the user's email. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_1 LANGUAGE: python CODE: ``` from fastapi import FastAPI from typing import Literal, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) UserState = Literal["non-active", "active"] class InitialData(TypedDict): email: str async def send_email(message: str, email: str) -> None: # Implement email sending logic here print(f"Sending {message} email to {email}") async def get_user_state() -> UserState: # Implement user state logic here return "non-active" @serve.post("/customer-onboarding") async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None: email = context.request_payload["email"] async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ``` ---------------------------------------- TITLE: Notifying Workflow using Client DESCRIPTION: This code snippet shows how to use the `Client` class to notify the workflow about an event. It creates an instance of the `Client` with an authentication token and then calls the `notify` method with the event ID and event data. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#_snippet_6 LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }); const orderId = "1324"; await client.notify({ eventId: `order-${orderId}`, eventData: { deliveryTime: "2 days" } }); ``` ---------------------------------------- TITLE: Downloading Dataset (TypeScript) DESCRIPTION: This TypeScript code snippet retrieves a dataset URL using `getDatasetUrl` and downloads the dataset using `context.call`. The `context.call` function is used to make an HTTP request with a longer timeout than a regular serverless function would allow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) ``` ---------------------------------------- TITLE: Processing Data with OpenAI (TypeScript) DESCRIPTION: This TypeScript code snippet processes data in chunks using OpenAI's GPT-4 model. It iterates over the chunks, calling the OpenAI API to analyze each chunk. The `context.api.openai.call` method is used to interact with the OpenAI API, passing the API key, operation, and request body. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_4 LANGUAGE: typescript CODE: ``` for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call< OpenAiResponse >( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY!, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. 
Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) } ``` ---------------------------------------- TITLE: Downloading Dataset (Python) DESCRIPTION: This Python code snippet retrieves a dataset URL using `get_dataset_url` and downloads the dataset using `context.call`. The `context.call` function is used to make an HTTP request that can run for much longer than a regular serverless function normally would allow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_3 LANGUAGE: python CODE: ``` async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body ``` ---------------------------------------- TITLE: Run New User Signup Task - TypeScript DESCRIPTION: This TypeScript snippet executes the 'new-signup' task within the Upstash workflow context. It calls an async function that sends a welcome email to the user using the `sendEmail` function, passing the email address from the workflow context. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) ``` ---------------------------------------- TITLE: Set Failure URL in TypeScript DESCRIPTION: Shows how to specify a `failureUrl` option within the `serve` method. This URL is called if the workflow fails after exhausting all retries. The failure callback payload and the error message are included in the `body` field of the request to the `failureUrl`. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_2 LANGUAGE: TypeScript CODE: ``` export const { POST } = serve<string>( async (context) => { ... }, { failureUrl: "https://<YOUR-FAILURE-ENDPOINT>/..." } ); ``` ---------------------------------------- TITLE: Make POST Request to Workflow Endpoint DESCRIPTION: Sends a POST request to the local workflow endpoint using `curl`. This command triggers the execution of the workflow defined in the Cloudflare Worker and passes a JSON payload as input. The endpoint returns a workflow run ID. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/cloudflare-workers.mdx#_snippet_9 LANGUAGE: bash CODE: ``` curl -X POST https://localhost:8787/ -d '{"text": "hello world!"}' # result: {"workflowRunId":"wfr_xxxxxx"} ``` ---------------------------------------- TITLE: Transforming Conditions to Explicit Steps - Python DESCRIPTION: This Python code provides the recommended pattern for handling non-deterministic conditions in Upstash Workflow. It transforms the condition into an explicit workflow step using `context.run`. This approach avoids the authentication failure that occurs when returning early without executing any steps. `some_condition()` is assumed to be a non-deterministic function. SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#_snippet_9 LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _check_condition() -> bool: return some_condition() should_return = await context.run("check condition", _check_condition) if should_return: return # rest of the workflow ``` ---------------------------------------- TITLE: Webhook Handler with Upstash Workflow in Next.js DESCRIPTION: This TypeScript snippet defines a Next.js route that serves as a webhook handler for authentication providers using Upstash Workflow. 
It orchestrates several tasks: creating a user in the database, creating a user and starting a trial in Stripe, and sending welcome, reminder, warning, and end-of-trial emails. The workflow uses `context.run` to execute asynchronous operations and `context.sleep` to pause execution for specified durations, managing the user's trial lifecycle. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WorkflowContext } from "@upstash/workflow"; /** * This can be the payload of the user created webhook event coming from your * auth provider (e.g. Firebase, Auth0, Clerk etc.) */ type UserCreatedPayload = { name: string; email: string; }; export const { POST } = serve<UserCreatedPayload>(async (context) => { const { name, email } = context.requestPayload; const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." 
); }); await context.sleep("wait", 7 * 24 * 60 * 60); // get user stats and send email with them const stats = await context.run("get user stats", async () => { return await getUserStats(userid); }); await sendProblemSolvedEmail({context, email, stats}); // wait until there are two days to the end of trial period // and check upgrade status await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); // end the workflow if upgraded if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); await context.sleep("wait for trial end", 2 * 24 * 60 * 60); await context.run("send trial end email", async () => { await sendEmail( email, "Your trial has ended. Please upgrade your plan to keep using our platform." ); }); }); async function sendProblemSolvedEmail({ context, email, stats, }: { context: WorkflowContext<UserCreatedPayload>; email: string; stats: { totalProblemsSolved: number }; }) { if (stats.totalProblemsSolved === 0) { await context.run("send no answers email", async () => { await sendEmail( email, "Hey, you haven't solved any questions in the last 7 days..." ); }); } else { await context.run("send stats email", async () => { await sendEmail( email, `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. 
Keep it up!` ); }); } } async function createUserInDatabase({ name, email, }: { name: string; email: string; }) { console.log("Creating a user in the database:", name, email); return { userid: "12345" }; } async function createNewUserInStripe(email: string) { // Implement logic to create a new user in Stripe console.log("Creating a user in Stripe for", email); } async function startTrialInStripe(email: string) { // Implement logic to start a trial in Stripe console.log("Starting a trial of 14 days in Stripe for", email); } async function getUserStats(userid: string) { // Implement logic to get user stats console.log("Getting user stats for", userid); return { totalProblemsSolved: 10_000, mostInterestedTopic: "JavaScript", }; } async function checkUpgradedPlan(email: string) { // Implement logic to check if the user has upgraded the plan console.log("Checking if the user has upgraded the plan", email); return false; } async function sendEmail(email: string, content: string) { // Implement logic to send an email console.log("Sending email to", email, content); } ``` ---------------------------------------- TITLE: Scheduling a Per-User Workflow (Next.js) DESCRIPTION: This TypeScript code demonstrates how to schedule a per-user workflow to send weekly summary reports using Upstash QStash. It calculates the date for the first summary (7 days after signup) and creates a cron expression to schedule weekly summaries. It uses `@upstash/qstash` to create the schedule. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` import { NextResponse } from "next/server"; import { signUp } from "@/utils/auth-utils"; import { Client } from "@upstash/qstash"; const client = new Client({ token: process.env.QSTASH_TOKEN! 
}); export async function POST(request: Request) { const userData: UserData = await request.json(); // Simulate user registration const user = await signUp(userData); // Calculate the date for the first summary (7 days from now) const firstSummaryDate = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000); // Create cron expression for weekly summaries starting 7 days from signup const cron = `${firstSummaryDate.getMinutes()} ${firstSummaryDate.getHours()} * * ${firstSummaryDate.getDay()}`; // Schedule weekly account summary await client.schedules.create({ scheduleId: `user-summary-${user.email}`, destination: "https://<YOUR_APP_URL>/api/send-weekly-summary", body: { userId: user.id }, cron: cron, }); return NextResponse.json( { success: true, message: "User registered and summary scheduled" }, { status: 201 } ); } ``` ---------------------------------------- TITLE: Workflow Endpoint with Call DESCRIPTION: This Python code demonstrates a FastAPI endpoint integrated with Upstash Workflow, showcasing the `context.call` function to make HTTP requests to other endpoints. The workflow includes steps that process input, call an external `/get-data` endpoint, and then process the response. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_8 LANGUAGE: python CODE: ``` from fastapi import FastAPI from typing import Dict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.post("/get-data") async def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.post("/call") async def call(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) response: CallResponse[Dict[str, str]] = await context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1}, ) async def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output await context.run("step2", _step2) ``` ---------------------------------------- TITLE: Express.js Workflow Endpoint DESCRIPTION: Defines a workflow endpoint using Express.js. It imports necessary modules, configures environment variables, sets up middleware to parse JSON, and defines a POST route that uses the `serve` function from `@upstash/workflow/express` to handle the workflow logic. The workflow defines two steps: step1 retrieves a message from the request payload, and step2 logs the result of step1. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/express.mdx#_snippet_7 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/express"; import express from 'express'; import { config } from 'dotenv'; config(); const app = express(); app.use( express.json() ); app.post( '/workflow', serve<{ message: string }>(async (context) => { const res1 = await context.run("step1", async () => { const message = context.requestPayload.message; return message; }) await context.run("step2", async () => { console.log(res1); }) }) ); app.listen(3000, () => { console.log('Server running on port 3000'); }); ``` ---------------------------------------- TITLE: Invoking a Workflow with Context in TypeScript DESCRIPTION: This snippet shows how to invoke another workflow from within the current workflow using the `context.invoke` method. It retrieves the response body and the failed and canceled states of the invoked workflow. It uses options such as `retries` and `flowControl`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` const { body, // response from the invoked workflow isFailed, // whether the invoked workflow failed isCanceled // whether the invoked workflow was canceled } = await context.invoke( "analyze-content", { workflow: analyzeContent, body: "test", header: {...}, // headers to pass to the invoked workflow (optional) retries, // number of retries (optional, default: 3) flowControl, // flow control settings (optional) workflowRunId // workflowRunId to set (optional) } ) ``` ---------------------------------------- TITLE: Executing Parallel Workflow Steps with Promise.all in TypeScript DESCRIPTION: This code demonstrates how to run multiple workflow steps in parallel using `Promise.all` within an Upstash Workflow. Each `ctx.run` invocation initiates a separate asynchronous task. 
The `result1`, `result2`, and `result3` variables will contain the results of the respective parallel steps once they complete. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/parallel-runs.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` const [result1, result2, result3] = await Promise.all([ ctx.run("parallel-step-1", async () => { ... }), ctx.run("parallel-step-2", async () => { ... }), ctx.run("parallel-step-3", async () => { ... }), ]) ``` ---------------------------------------- TITLE: Custom Authorization in Workflow (TypeScript) DESCRIPTION: This TypeScript code snippet shows how to implement a custom authorization mechanism within an Upstash Workflow. It retrieves the `authorization` header from the request context, extracts the bearer token, and validates it using the `isValid` function. If the token is invalid, it logs an error and returns. The same validation is applied in the `failureFunction` to handle failures due to authentication issues. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#_snippet_4 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { const authHeader = context.headers.get("authorization"); const bearerToken = authHeader?.split(" ")[1]; if (!isValid(bearerToken)) { console.error("Authentication failed."); return; } // Your workflow steps.. }, { failureFunction: async ({ context }) => { const authHeader = context.headers.get("authorization"); const bearerToken = authHeader?.split(" ")[1]; if (!isValid(bearerToken)) { // ... } }, } ); ``` ---------------------------------------- TITLE: Sending Confirmation and Notifications (Upstash Workflow) DESCRIPTION: This code snippet demonstrates sending order confirmation and dispatch notifications to the customer using Upstash Workflow in both TypeScript and Python. 
It calls `sendOrderConfirmation` and `sendDispatchNotification` functions with the user and order IDs to ensure customers are informed about order status. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_5 LANGUAGE: typescript CODE: ``` await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) ``` LANGUAGE: python CODE: ``` async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` ---------------------------------------- TITLE: Aggregating Results (TypeScript) DESCRIPTION: This TypeScript code snippet aggregates intermediate results after processing every 10 chunks or at the end of all chunks. It calls the `aggregateResults` function within a `context.run` block, ensuring the aggregation is executed as a separate, potentially long-running task. The processed chunks are then cleared for the next iteration. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_6 LANGUAGE: typescript CODE: ``` if (i % 10 === 9 || i === chunks.length - 1) { await context.run(`aggregate-results${i}`, async () => { await aggregateResults(processedChunks) processedChunks.length = 0 }) } ``` ---------------------------------------- TITLE: Correct Result Handling - TypeScript DESCRIPTION: This TypeScript code demonstrates the correct way to handle results within Upstash Workflow. The `someWork` function's return value is directly returned from `context.run("step-1", ...)` and assigned to the `result` variable. 
This ensures that `result` is always properly initialized with the correct value before `step-2` is executed. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_3 LANGUAGE: typescript CODE: ``` export const { POST } = serve<string>(async (context) => { const input = context.requestPayload const result = await context.run("step-1", async () => { return await someWork(input) }) await context.run("step-2", async () => { someOtherWork(result) }) }) ``` ---------------------------------------- TITLE: Image Storage - TypeScript DESCRIPTION: This TypeScript snippet stores the processed images in cloud storage using the `storeImage` function. It iterates through the processed images and calls `context.run` for each image to store it. `Promise.all` is used to run the storage operations concurrently. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_8 LANGUAGE: typescript CODE: ``` const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) ``` ---------------------------------------- TITLE: Make a POST Request to Workflow Endpoint DESCRIPTION: Sends a POST request to the workflow endpoint to trigger the workflow. The request includes a JSON payload. The response contains a unique workflow run ID. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/astro.mdx#_snippet_7 LANGUAGE: bash CODE: ``` curl -X POST http://localhost:3000/api/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' # result: {"workflowRunId":"wfr_xxxxxx"} ``` ---------------------------------------- TITLE: Workflow Handler with Upstash/NextJS DESCRIPTION: This TypeScript snippet defines a Next.js route handler that processes images using Upstash Workflow. 
It receives an image ID and user ID, retrieves the image, resizes it to multiple resolutions, applies filters, and stores the processed images. It uses the `serve` function from `@upstash/workflow/nextjs` to create the handler and `context.run` and `context.call` to execute workflow steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" import { resizeImage, applyFilters, storeImage, getImageUrl, } from "./utils" type ImageResult = { imageUrl: string } export const { POST } = serve<{ imageId: string; userId: string }> (async (context) => { const { imageId, userId } = context.requestPayload // Step 1: Retrieve the uploaded image const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) // Step 2: Resize the image to multiple resolutions const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call<ImageResult>( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: { imageUrl, width: resolution, } } ) )) // Step 3: Apply filters to each resized image const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise<{ body: ImageResult }>[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call<ImageResult>( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) // Step 4: Store processed images in cloud storage const storedImageUrls: 
string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) } ) ``` ---------------------------------------- TITLE: AI Generation Workflow (Next.js) DESCRIPTION: This TypeScript code defines a Next.js route that leverages Upstash Workflow to process a dataset using OpenAI. It downloads the dataset, splits it into chunks, processes each chunk with GPT-4, aggregates the results, and generates a final report. The code uses `context.run` for long-running tasks and `context.call` to interact with the OpenAI API. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" import { downloadData, aggregateResults, generateReport, sendReport, getDatasetUrl, splitIntoChunks, } from "./utils" type OpenAiResponse = { choices: { message: { role: string, content: string } }[] } export const { POST } = serve<{ datasetId: string; userId: string }> (async (context) => { const request = context.requestPayload // Step 1: Download the dataset const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) // HTTP request with much longer timeout (2hrs) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) // Step 2: Process data in chunks using OpenAI const chunkSize = 1000 const chunks = splitIntoChunks(dataset, chunkSize) const processedChunks: string[] = [] for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call< OpenAiResponse >( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. 
Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) processedChunks.push(processedChunk.choices[0].message.content!) // Every 10 chunks, we'll aggregate intermediate results if (i % 10 === 9 || i === chunks.length - 1) { await context.run(`aggregate-results${i}`, async () => { await aggregateResults(processedChunks) processedChunks.length = 0 }) } } // Step 3: Generate and send data report const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) } ) ``` ---------------------------------------- TITLE: Call Anthropic API with Type-Safe Context Call (TypeScript) DESCRIPTION: This code snippet demonstrates how to call the Anthropic API's `/v1/messages` endpoint using the `context.api.anthropic.call` method in an Upstash workflow. It includes setting the API key, operation, and request body. The code then extracts and logs the generated text from the response. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/anthropic.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.anthropic.call( "Call Anthropic", { token: "<ANTHROPIC_API_KEY>", operation: "messages.create", body: { model: "claude-3-5-sonnet-20241022", max_tokens: 1024, messages: [ {"role": "user", "content": "Hello, world"} ] }, } ); // get text: console.log(body.content[0].text) ``` ---------------------------------------- TITLE: Workflow Endpoint Example DESCRIPTION: This Python code defines a basic FastAPI endpoint that uses Upstash Workflow. It initializes FastAPI and Serve, defines a `/api/workflow` endpoint, and defines two asynchronous steps that are executed within the workflow context. The context.run function registers the steps to be executed within the workflow. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_6 LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context) -> None: async def _step1() -> None: print("initial step ran") await context.run("initial-step", _step1) async def _step2() -> None: print("second step ran") await context.run("second-step", _step2) ``` ---------------------------------------- TITLE: Creating a Workflow with createWorkflow in TypeScript DESCRIPTION: This snippet demonstrates how to define a workflow using the `createWorkflow` function. It specifies the type of the request body and ensures type safety for request and response when invoking other workflows. The workflow logic is defined as an asynchronous function that receives a `WorkflowContext`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#_snippet_1 LANGUAGE: typescript CODE: ``` import { WorkflowContext } from "@upstash/workflow"; import { createWorkflow } from "@upstash/workflow/nextjs"; const anotherWorkflow = createWorkflow( // Define the workflow logic, specifying the type of the initial request body. // In this case, the body is a string: async (context: WorkflowContext<string>) => { await context.sleep("wait 1 second", 1) // Return a response from the workflow. The type of this // response will be available when `context.invoke` is // called with `anotherWorkflow`. 
return { message: "This is the data returned by the workflow" }; } ); const someWorkflow = createWorkflow(async (context) => { // Invoke anotherWorkflow with a string body and get the response // The types of the body parameter and the response are // typesafe and inferred from anotherWorkflow const { body } = await context.invoke( "invoke anotherWorkflow", { workflow: anotherWorkflow, body: "user-1" } ); }); ``` ---------------------------------------- TITLE: Processing Data with OpenAI (Python) DESCRIPTION: This Python code snippet processes data in chunks using OpenAI's GPT-4 model via the OpenAI API. For each chunk, it calls the OpenAI API with the `context.call` method, including the API URL, method, headers (containing the API key), and request body. The chunk is analyzed, and the response is stored. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_5 LANGUAGE: python CODE: ``` for i, chunk in enumerate(chunks): openai_response: CallResponse[Dict[str, str]] = await context.call( f"process-chunk-{i}", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [ { "role": "system", "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { "role": "user", "content": f"Analyze this data chunk: {json.dumps(chunk)}", }, ], "max_tokens": 150, }, ) ``` ---------------------------------------- TITLE: Configuring a Failure Function with TypeScript DESCRIPTION: This snippet demonstrates how to configure a `failureFunction` within the `serve` method for handling workflow failures. The `failureFunction` allows for custom error handling logic, such as logging to Sentry or implementing specific cleanup actions. It receives the context, failure status, response, and headers as input. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/failures.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` export const { POST } = serve<string>( async (context) => { // Your workflow logic... }, { failureFunction: async ({ context, failStatus, failResponse, failHeaders, }) => { // Handle error, e.g. log to Sentry }, } ); ``` ---------------------------------------- TITLE: Calling Resend API for Batch Email in Upstash Workflow (TypeScript) DESCRIPTION: This code snippet demonstrates how to call the Resend API to send a batch of emails using `context.api.resend.call` within an Upstash Workflow. It requires a Resend API key and setting `batch: true`. The `status` and `body` of the response are captured. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#_snippet_1 LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.resend.call( "Call Resend", { batch: true, token: "<RESEND_API_KEY>", body: [ { from: "Acme <onboarding@resend.dev>", to: ["delivered@resend.dev"], subject: "Hello World", html: "<p>It works!</p>", }, { from: "Acme <onboarding@resend.dev>", to: ["delivered@resend.dev"], subject: "Hello World", html: "<p>It works!</p>", }, ], headers: { "content-type": "application/json", }, } ); ``` ---------------------------------------- TITLE: Fetching Workflow Logs with Node.js DESCRIPTION: This snippet demonstrates how to fetch workflow logs using Node.js and the `fetch` API. It makes a GET request to the `/v2/workflows/logs` endpoint and includes the Authorization header with a Bearer token. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/logs.mdx#_snippet_2 LANGUAGE: javascript CODE: ``` const response = await fetch("https://qstash.upstash.io/v2/workflows/logs", { headers: { Authorization: "Bearer <token>", }, }); ``` ---------------------------------------- TITLE: Unsuspend User Logic (Python) DESCRIPTION: This Python code checks if a user is suspended and, if so, unsuspends them using the `check_suspension` and `unsuspend_user` functions within an Upstash Workflow context. Asynchronous functions `_check_suspension` and `_unsuspend_user` are defined to encapsulate the calls to the underlying functions. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_7 LANGUAGE: python CODE: ``` async def _check_suspension() -> bool: return await check_suspension(email) is_suspended = await context.run("check suspension", _check_suspension) if is_suspended: async def _unsuspend_user() -> None: await unsuspend_user(email) await context.run("unsuspend user", _unsuspend_user) ``` ---------------------------------------- TITLE: Send Welcome Email - TypeScript DESCRIPTION: This TypeScript snippet sends a welcome email to the user upon creation. It uses `context.run` to execute the `sendEmail` function with the user's email and a welcome message, indicating the start of their free trial. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_8 LANGUAGE: typescript CODE: ``` await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); ``` ---------------------------------------- TITLE: Run Non-Idempotent Functions in context.run - TypeScript DESCRIPTION: This TypeScript code shows the correct way to call a non-idempotent function. `getResultFromDb` is called inside `context.run` to prevent the `Failed to authenticate Workflow request` error. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_12 LANGUAGE: typescript CODE: ``` const result = await context.run("get-result-from-db", async () => { return await getResultFromDb(entryId) }); if (result.return) { return; } ``` ---------------------------------------- TITLE: Unsuspend User Logic (TypeScript) DESCRIPTION: This TypeScript code checks if a user is suspended and, if so, unsuspends them using the `checkSuspension` and `unsuspendUser` functions within an Upstash Workflow context. It first checks suspension status, and then conditionally runs the unsuspend user step. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_6 LANGUAGE: typescript CODE: ``` const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (isSuspended) { await context.run("unsuspend user", async () => { await unsuspendUser(email); }); } ``` ---------------------------------------- TITLE: Send Reminder Email - TypeScript DESCRIPTION: This TypeScript snippet handles sending a reminder email after 7 days if the user hasn't solved any questions. It uses `context.sleep` to wait for 7 days, fetches user stats using `getUserStats`, and then calls `sendProblemSolvedEmail` to conditionally send the reminder email. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_10 LANGUAGE: typescript CODE: ``` await context.sleep("wait", 7 * 24 * 60 * 60); const stats = await context.run("get user stats", async () => { return await getUserStats(userid); }); await sendProblemSolvedEmail({context, email, stats}); ``` ---------------------------------------- TITLE: Suspend User: TypeScript DESCRIPTION: This TypeScript snippet checks if a user is suspended, suspends the user if not already suspended, and sends an email notification. It relies on `checkSuspension`, `suspendUser`, and `sendEmail` functions. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_10 LANGUAGE: TypeScript CODE: ``` const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (!isSuspended) { await context.run("suspend user", async () => { await suspendUser(email); }); await context.run("send suspended email", async () => { await sendEmail( context.requestPayload.email, "Your account has been suspended due to payment failure. Please update your payment method." ); }); } ``` ---------------------------------------- TITLE: Workflow Endpoint with Sleep DESCRIPTION: This Python code defines a FastAPI endpoint that utilizes Upstash Workflow with sleep functionality. The workflow includes steps that perform some work using the `some_work` function, print input and output, and then pause execution using `context.sleep_until` and `context.sleep`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_7 LANGUAGE: python CODE: ``` from fastapi import FastAPI import time from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/sleep") async def sleep(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) await context.sleep_until("sleep1", time.time() + 3) async def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = await context.run("step2", _step2) await context.sleep("sleep2", 2) async def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) await context.run("step3", _step3) ``` ---------------------------------------- TITLE: 
Handling Webhook Events - TypeScript DESCRIPTION: This TypeScript code snippet demonstrates how to handle webhook events within an Upstash Workflow using the `context.run` method. It parses and validates the incoming request, and then uses `context.run` to process the event. If the event type is `user.created`, it extracts relevant user data and returns it; otherwise, it returns false. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_5 LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { // ... Parse and validate the incoming request const user = await context.run<false | UserPayload>( "handle-webhook-event", async () => { if (event.type === "user.created") { const { id: clerkUserId, email_addresses, first_name } = event.data; const primaryEmail = email_addresses.find( (email) => email.id === event.data.primary_email_address_id ); if (!primaryEmail) { return false; } return { event: event.type, userId: clerkUserId, email: primaryEmail.email_address, firstName: first_name, } as UserPayload; } return false; } ); }); ``` ---------------------------------------- TITLE: Requesting Order Processing in Workflow DESCRIPTION: This snippet shows how to encapsulate the order processing request in a `context.run` call inside the workflow. It executes the `requestProcessing` function with the `orderId`. It's a part of the workflow defined previously and demonstrates running a specific step. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` await context.run("request order processing", async () => { await requestProcessing(orderId) }) ``` ---------------------------------------- TITLE: Conditional Email Based on Solved Problems - TypeScript DESCRIPTION: This TypeScript function `sendProblemSolvedEmail` conditionally sends an email based on the number of problems the user has solved. 
If the user hasn't solved any problems, a 'no answers' email is sent; otherwise, a stats email is sent indicating the number of problems solved in the last 7 days. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_12 LANGUAGE: typescript CODE: ``` async function sendProblemSolvedEmail({ context, email, stats, }: { context: WorkflowContext<UserCreatedPayload>; email: string; stats: { totalProblemsSolved: number }; }) { if (stats.totalProblemsSolved === 0) { await context.run("send no answers email", async () => { await sendEmail( email, "Hey, you haven't solved any questions in the last 7 days..." ); }); } else { await context.run("send stats email", async () => { await sendEmail( email, `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!` ); }); } } ``` ---------------------------------------- TITLE: Defining OpenAI Model DESCRIPTION: This code snippet shows how to define an OpenAI model using the `context.agents.openai` method in a Next.js route. It imports the `serve` function from `@upstash/workflow/nextjs` and defines a POST handler that initializes the model. The model will be used for generating responses and deciding which tools to call. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo') // ... }) ``` ---------------------------------------- TITLE: Custom Authorization in Workflow (Python) DESCRIPTION: This Python code snippet illustrates a custom authorization implementation for an Upstash Workflow. It retrieves the `authorization` header from the request context, extracts the bearer token, and checks its validity using the `is_valid` function. If the token is invalid, it prints an error message and returns. It leverages FastAPI and `upstash_workflow.fastapi` for workflow integration. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#_snippet_5 LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: auth_header = context.headers.get("authorization") bearer_token = auth_header.split(" ")[1] if auth_header else None if not is_valid(bearer_token): print("Authentication failed.") return # Your workflow steps... ``` ---------------------------------------- TITLE: Charging Customer (TypeScript) DESCRIPTION: This TypeScript code attempts to charge a customer using the `chargeCustomer` function within an Upstash Workflow context. It includes a try-catch block to handle potential errors during the charging process, allowing for custom logic to be executed upon failure. The attempt number is passed to the `chargeCustomer` function. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` const result = await context.run("charge customer", async () => { try { return await chargeCustomer(i + 1); } catch (e) { console.error(e); return; } }); ``` ---------------------------------------- TITLE: Initialize Workflow Endpoint with Python DESCRIPTION: Defines a workflow endpoint using FastAPI and the `Serve` class from `upstash_workflow.fastapi`. It defines an asynchronous function `example` that uses the `AsyncWorkflowContext` to run two steps. Each step is defined as an asynchronous function. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_1 LANGUAGE: Python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ``` ---------------------------------------- TITLE: Generating and Sending Report (TypeScript) DESCRIPTION: This TypeScript code snippet generates a report based on the aggregated results and sends it to the user. It calls the `generateReport` and `sendReport` functions within `context.run` blocks to execute them as separate workflow steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_8 LANGUAGE: typescript CODE: ``` const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) ``` ---------------------------------------- TITLE: Execute Business Logic in context.run - TypeScript DESCRIPTION: This code snippet demonstrates the correct way to execute business logic within the `context.run` function in Upstash Workflow. The business logic (returning `{ success: true }` and logging) is placed inside `context.run`, ensuring it only executes once per step during a workflow run. Logging outside `context.run` can appear multiple times because the workflow endpoint can be called multiple times. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` export const { POST } = serve<string>(async (context) => { const input = context.requestPayload const result = await context.run("step-1", () => { return { success: true } }) console.log("This log will appear multiple times") await context.run("step-2", () => { console.log("This log will appear just once") console.log("Step 1 status is:", result.success) }) }) ``` ---------------------------------------- TITLE: Generating and Sending Report (Python) DESCRIPTION: This Python code snippet generates a report and sends it to the user. It defines asynchronous functions `_generate_report` and `_send_report` which call `generate_report` and `send_report` respectively. These functions are then executed using `context.run` to ensure they are executed as separate workflow steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_9 LANGUAGE: python CODE: ``` async def _generate_report() -> Any: return await generate_report(dataset_id) report = await context.run("generate-report", _generate_report) async def _send_report() -> None: await send_report(report, user_id) await context.run("send-report", _send_report) ``` ---------------------------------------- TITLE: Calling Resend API for Single Email in Upstash Workflow (TypeScript) DESCRIPTION: This code snippet demonstrates how to call the Resend API to send a single email using `context.api.resend.call` within an Upstash Workflow. It requires a Resend API key. The `status` and `body` of the response are captured. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.resend.call( "Call Resend", { token: "<RESEND_API_KEY>", body: { from: "Acme <onboarding@resend.dev>", to: ["delivered@resend.dev"], subject: "Hello World", html: "<p>It works!</p>", }, headers: { "content-type": "application/json", }, } ); ``` ---------------------------------------- TITLE: Generating Text with OpenAI Client in Workflow DESCRIPTION: Creates a Next.js API route that uses the Upstash Workflow to generate text using the OpenAI client. It defines a workflow that takes a prompt as input, uses the `generateText` function from the `ai` package to generate text, and logs the result. The code includes error handling for `ToolExecutionError` and `WorkflowAbort` exceptions. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#_snippet_4 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Important: Must have a step before generateText const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), maxTokens: 2048, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` ---------------------------------------- TITLE: Limiting External API Calls with context.call (JS) DESCRIPTION: This code snippet shows how to limit requests to an external API using 
`context.call` within the `serve` method. It configures the rate per second and parallelism limits for the API call, preventing excessive requests. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/flow-control.mdx#_snippet_2 LANGUAGE: javascript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const request = context.requestPayload; const response = await context.call( "generate-long-essay", { url: "https://api.openai.com/v1/chat/completions", method: "POST", body: {/*****/}, flowControl: { key: "openai-call", parallelism: 3, ratePerSecond: 10 } } ); }); ``` ---------------------------------------- TITLE: Defining an Agent DESCRIPTION: This code defines an agent with a specified model, name, maximum number of steps, tools, and background using `context.agents.agent`. It leverages the `WikipediaQueryRun` tool for accessing Wikipedia information. The agent represents an LLM with access to tools and background knowledge, which it can use to perform tasks. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_10 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo') const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }) }) ``` ---------------------------------------- TITLE: Sending HTTP Request with Requests (Python) DESCRIPTION: This Python snippet demonstrates how to send a POST request to a workflow endpoint using the `requests` library. 
It sends a JSON payload and includes a custom header. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#_snippet_5 LANGUAGE: Python CODE: ``` import requests requests.post( "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>", json={"foo": "bar"}, headers={"my-header": "foo"} ) ``` ---------------------------------------- TITLE: Execute Business Logic in context.run - Python DESCRIPTION: This code snippet demonstrates the correct way to execute business logic within the `context.run` function in Upstash Workflow using Python. The business logic (returning `{"success": True}` and logging) is placed inside `context.run`, ensuring it only executes once per step during a workflow run. Logging outside `context.run` can appear multiple times because the workflow endpoint can be called multiple times. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_1 LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return {"success": True} result = await context.run("step-1", _step_1) print("This log will appear multiple times") async def _step_2() -> None: print("This log will appear just once") print("Step 1 status is:", result["success"]) await context.run("step-2", _step_2) ``` ---------------------------------------- TITLE: Configure Flow Control in TypeScript DESCRIPTION: Illustrates how to configure flow control for a workflow using the `flowControl` option. This allows setting `ratePerSecond` to limit requests and `parallelism` to limit concurrent requests. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_6 LANGUAGE: TypeScript CODE: ``` export const { POST } = serve<string>( async (context) => { ... 
}, { flowControl: { key: "aFlowControlKey", ratePerSecond: 10, parallelism: 3 } } ); ``` ---------------------------------------- TITLE: Configuring OpenAI Model with baseURL DESCRIPTION: This code snippet shows how to configure the OpenAI model with a custom baseURL and apiKey, allowing the use of OpenAI-compatible providers like DeepSeek. The baseURL specifies the API endpoint for the provider. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_3 LANGUAGE: typescript CODE: ``` const model = context.agents.openai('gpt-3.5-turbo', { baseURL: "https://api.deepseek.com", apiKey: process.env.DEEPSEEK_API_KEY }) ``` ---------------------------------------- TITLE: Orchestrating Workers with Upstash Workflow and Langchain DESCRIPTION: This snippet demonstrates how to use Upstash Workflow with Next.js to create an orchestrator that directs multiple worker agents using Langchain to handle different subtasks and synthesize their outputs. It defines worker agents with specific tools and backgrounds, and a task that orchestrates these agents to create a Q&A for advanced topics in physics. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/orchestrator-workers.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-4o'); // Worker agents const worker1 = context.agents.agent({ model, name: 'worker1', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers general questions about advanced physics.' }); const worker2 = context.agents.agent({ model, name: 'worker2', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about quantum mechanics.' 
}); const worker3 = context.agents.agent({ model, name: 'worker3', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about relativity.' }); // Synthesizing results const task = context.agents.task({ model, prompt: `Create a Q&A for advanced topics in physics`, agents: [worker1, worker2, worker3], maxSteps: 3, }) const { text } = await task.run(); console.log(text); }); ``` ---------------------------------------- TITLE: Cancel Workflow Run Programmatically - JavaScript DESCRIPTION: This snippet demonstrates how to cancel a running workflow using the Upstash Workflow JavaScript client. It initializes the client with a Qstash token and then calls the `cancel` method with the ID of the workflow run to be canceled. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/cancel.mdx#_snippet_0 LANGUAGE: JavaScript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }); await client.cancel({ ids: "<WORKFLOW_RUN_ID>" }); ``` ---------------------------------------- TITLE: Handling Webhook Events - Python DESCRIPTION: This Python code snippet demonstrates how to handle webhook events within an Upstash Workflow. It uses `context.run` to execute an asynchronous function `_handle_webhook_event`. Inside this function, it checks if the event type is `user.created`, extracts the data, and returns it as a dictionary. If no primary email is found or the event type is different, it returns `False`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_6 LANGUAGE: Python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: # ... 
Parse and validate the incoming request async def _handle_webhook_event(): if event.type == "user.created": clerk_user_id = event.data["id"] email_addresses = event.data["email_addresses"] first_name = event.data["first_name"] primary_email = next( ( email for email in email_addresses if email["id"] == event.data["primary_email_address_id"] ), None, ) if not primary_email: return False return { "event": event.type, "user_id": clerk_user_id, "email": primary_email["email_address"], "first_name": first_name, } return False user = await context.run("handle-webhook-event", _handle_webhook_event) ``` ---------------------------------------- TITLE: Aggregating Results (Python) DESCRIPTION: This Python code snippet aggregates the processed chunks every 10 chunks or at the end of the dataset. It defines an asynchronous function `_aggregate_results` that calls the `aggregate_results` function and clears the `processed_chunks` list. This function is executed within a `context.run` block, ensuring the aggregation task can run independently. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_7 LANGUAGE: python CODE: ``` if i % 10 == 9 or i == len(chunks) - 1: async def _aggregate_results() -> None: await aggregate_results(processed_chunks) processed_chunks.clear() await context.run(f"aggregate-results{i}", _aggregate_results) ``` ---------------------------------------- TITLE: Workflow Endpoint with Auth DESCRIPTION: This Python code presents a FastAPI endpoint using Upstash Workflow with authentication. The workflow verifies an 'authentication' header with a specific token, and only proceeds if the authentication is successful. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_9 LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/auth") async def auth(context: AsyncWorkflowContext[str]) -> None: if context.headers.get("authentication") != "Bearer secret_password": print("Authentication failed.") return async def _step1() -> str: return "output 1" await context.run("step1", _step1) async def _step2() -> str: return "output 2" await context.run("step2", _step2) ``` ---------------------------------------- TITLE: Creating Stripe Customer - TypeScript DESCRIPTION: This TypeScript code snippet demonstrates how to create a Stripe customer using the Stripe API within an Upstash Workflow. It assumes that a user object has been previously extracted. If a user object exists, it calls the Stripe API to create a customer with the user's email, name, and user ID. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_7 LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { // ... Previous validation and user data extraction if (!user) { return; } const customer = await context.run("create-stripe-customer", async () => { return await stripe.customers.create({ email: user.email, name: `${user.firstName} ${user.lastName}`, metadata: { userId: user.userId, }, }); }); /// ... Additional steps }); ``` ---------------------------------------- TITLE: Notifying a Workflow with client.notify TypeScript DESCRIPTION: This snippet shows how to use the `client.notify` method to notify a waiting workflow that a specific event has occurred. It sends the event ID and event data to the workflow, allowing it to resume execution. Requires the `@upstash/workflow` package and a QSTASH_TOKEN. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#_snippet_1 LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }); await client.notify({ eventId: "event-id", eventData: { my: "data" }, }); ``` ---------------------------------------- TITLE: Image Resizing - Python DESCRIPTION: This Python snippet resizes the image to multiple resolutions using an external image processing service. It iterates through a list of resolutions and calls `context.call` for each resolution to trigger an external service call. The results are collected and processed to extract the image data. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_5 LANGUAGE: python CODE: ``` resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] ``` ---------------------------------------- TITLE: Publishing Message with QStash (TypeScript) DESCRIPTION: This snippet shows how to publish a message to a workflow endpoint using the QStash client in TypeScript. It initializes a QStash client and uses the `publishJSON` method to send a JSON payload to the specified URL. The function takes parameters such as the URL, body, headers, and the number of retries. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#_snippet_1 LANGUAGE: TypeScript CODE: ``` // Using the QStash client import { Client } from "@upstash/qstash"; const client = new Client({ token: "<QSTASH_TOKEN>" }); const { messageId } = await client.publishJSON({ url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>", body: { hello: "there!" }, headers: { ... }, retries: 3 }); ``` ---------------------------------------- TITLE: Transforming Conditions to Explicit Steps - TypeScript DESCRIPTION: This TypeScript code provides the recommended pattern for handling non-deterministic conditions in Upstash Workflow. It transforms the condition into an explicit workflow step using `context.run`. This approach avoids the authentication failure that occurs when returning early without executing any steps. `someCondition()` is assumed to be a non-deterministic function. SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#_snippet_8 LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { const shouldReturn = await context.run("check condition", () => someCondition()) if (shouldReturn) { return; } // rest of the workflow }) ``` ---------------------------------------- TITLE: Periodic State Check and Email Sending - TypeScript DESCRIPTION: This TypeScript snippet implements a periodic state check within an infinite loop. It retrieves the user state using `getUserState`, and sends different emails depending on whether the user is 'active' or 'non-active'. The workflow sleeps for one month (60 * 60 * 24 * 30 seconds) after each state check. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_6 LANGUAGE: typescript CODE: ``` while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } ``` ---------------------------------------- TITLE: Send Reminder Email - Python DESCRIPTION: This Python snippet sends a reminder email after 7 days if the user hasn't solved any questions. It waits using `context.sleep`, retrieves user stats using `get_user_stats`, and then calls `send_problem_solved_email` to conditionally send the reminder based on the user's progress. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_11 LANGUAGE: python CODE: ``` await context.sleep("wait", 7 * 24 * 60 * 60) async def _get_user_stats() -> UserStats: return await get_user_stats(userid) stats: UserStats = await context.run("get user stats", _get_user_stats) await send_problem_solved_email(context, email, stats) ``` ---------------------------------------- TITLE: Define Cloudflare Workers Workflow Endpoint DESCRIPTION: Defines a workflow endpoint using Cloudflare Workers. This code imports the `serve` function from the `@upstash/workflow/cloudflare` library and defines a simple workflow with two steps, demonstrating how to access the request payload and execute asynchronous tasks within the workflow. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/cloudflare-workers.mdx#_snippet_7 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/cloudflare" interface Env { ENVIRONMENT: "development" | "production" } export default serve<{ text: string }>( async (context) => { const initialPayload = context.requestPayload.text const result = await context.run("initial-step", async () => { console.log(`Step 1 running with payload: ${initialPayload}`) return { text: "initial step ran" } } ) await context.run("second-step", async () => { console.log(`Step 2 running with result from step 1: ${result.text}`) }) } ) ``` ---------------------------------------- TITLE: Get Workflow Logs with Upstash Client (TS) DESCRIPTION: This snippet demonstrates how to retrieve workflow logs using the Upstash Workflow Client. It allows filtering by workflow run ID, count, state, workflow URL, workflow creation timestamp, and cursor. A QSTASH_TOKEN is required for authentication. All parameters are optional. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#_snippet_1 LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }) const { runs, cursor } = await client.logs({ // Id of the workflow run to get workflowRunId, // Number of workflows to get count, // Workflow state to filter for. // One of "RUN_STARTED", "RUN_SUCCESS", "RUN_FAILED", "RUN_CANCELED" state, // Workflow url to search for. should be an exact match workflowUrl, // Unix timestamp when the run was created workflowCreatedAt, // Cursor from a previous request to continue the search cursor }) ``` ---------------------------------------- TITLE: Creating a Weekly Summary Workflow (FastAPI) DESCRIPTION: This Python code defines a FastAPI endpoint to send weekly summary emails using Upstash Workflow. 
It fetches user data, generates a summary, and sends an email with the summary, leveraging the `upstash_workflow.fastapi` module. It utilizes `AsyncWorkflowContext` and asynchronous functions for each step. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#_snippet_5 LANGUAGE: python CODE: ``` from dataclasses import dataclass from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import get_user_data, generate_summary, send_email app = FastAPI() serve = Serve(app) @dataclass class WeeklySummaryData: user_id: str @serve.post("/api/send-weekly-summary") async def send_weekly_summary(context: AsyncWorkflowContext[WeeklySummaryData]) -> None: user_id = context.request_payload.user_id # Step 1: Fetch user data async def _step1(): return await get_user_data(user_id) user = await context.run("fetch_user_data", _step1) # Step 2: Generate weekly summary async def _step2(): return await generate_summary(user_id) summary = await context.run("generate_summary", _step2) # Step 3: Send email with weekly summary async def _step3(): await send_email(user.email, "Your Weekly Summary", summary) await context.run("send_summary_email", _step3) ``` ---------------------------------------- TITLE: Single Agent Task DESCRIPTION: This code demonstrates how to create and run a task assigned to a single agent. It initializes the agent, defines a prompt, and then calls the `task.run()` method to execute the task and print the result to the console. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_11 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const task = context.agents.task({ agent: researcherAgent, prompt: "Tell me about 5 topics in advanced physics.", }); const { text } = await task.run(); console.log("result:", text) }) ``` ---------------------------------------- TITLE: Minimal Workflow Endpoint with Hono DESCRIPTION: This TypeScript code defines a minimal workflow endpoint using Hono and the Upstash Workflow SDK. It creates a POST route at `/workflow` that serves a workflow with two steps, each logging a message to the console. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/hono.mdx#_snippet_7 LANGUAGE: typescript CODE: ``` import { Hono } from "hono" import { serve } from "@upstash/workflow/hono" const app = new Hono() app.post("/workflow", serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ) export default app ``` ---------------------------------------- TITLE: Notifying a Workflow with context.notify TypeScript DESCRIPTION: This snippet demonstrates how to use `context.notify` within a workflow to trigger another workflow's event listener. It takes a step name, event ID, and event data as parameters and returns a `notifyResponse`. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` const { notifyResponse } = await context.notify( "notify step", // notify step name "event-Id", // event id { my: "data" } // event data ); ``` ---------------------------------------- TITLE: Early Return - TypeScript DESCRIPTION: This TypeScript code demonstrates the behavior of returning from a workflow function before any steps are executed when using Upstash Workflow. An early return without running any steps will result in an HTTP 400 status code and the error message 'Failed to authenticate Workflow request.', because the Workflow SDK interprets this scenario as an authentication failure. `someCondition()` is a placeholder for the authorization logic. SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#_snippet_6 LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { if (someCondition()) { return; } // rest of the workflow }) ``` ---------------------------------------- TITLE: Bulk Cancel Workflows with Python Requests DESCRIPTION: Cancels workflow runs using the Python requests library. Sets the Authorization and Content-Type headers, and sends a dictionary with workflowRunIds as the JSON request body. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/bulk-cancel.mdx#_snippet_4 LANGUAGE: Python CODE: ``` import requests headers = { 'Authorization': 'Bearer <token>', 'Content-Type': 'application/json', } data = { "workflowRunIds": [ "run_id_1", "run_id_2", "run_id_3" ] } # json= serializes the dictionary to a JSON body; data= would form-encode it response = requests.delete( 'https://qstash.upstash.io/v2/workflows/runs', headers=headers, json=data ) ``` ---------------------------------------- TITLE: Suspend User: Python DESCRIPTION: This Python snippet checks if a user is suspended, suspends the user if not already suspended, and sends an email notification. 
It uses asynchronous functions and depends on `check_suspension`, `suspend_user`, and `send_email` functions. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_11 LANGUAGE: Python CODE: ``` async def _check_suspension() -> bool: return await check_suspension(email) is_suspended = await context.run("check suspension", _check_suspension) if not is_suspended: async def _suspend_user() -> None: await suspend_user(email) await context.run("suspend user", _suspend_user) async def _send_suspended_email() -> None: await send_email( email, "Your account has been suspended due to payment failure. Please update your payment method.", ) await context.run("send suspended email", _send_suspended_email) ``` ---------------------------------------- TITLE: Initialize Workflow Endpoint with TypeScript DESCRIPTION: Defines a workflow endpoint using the `serve` method from `@upstash/workflow/nextjs`. It exports a POST handler that executes two steps using the provided context object. Each step contains a placeholder for business logic. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_0 LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const result = await context.run("step-1", async () => { // define a piece of business logic as step 1 }); await context.run("step-2", async () => { // define another piece of business logic as step 2 }); }); ``` ---------------------------------------- TITLE: Notify Workflow with Workflow SDK - Javascript DESCRIPTION: Uses the Upstash Workflow SDK to notify a workflow. It initializes the client with the QSTASH_TOKEN and then calls the notify method with the event ID and event data. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/notify.mdx#_snippet_2 LANGUAGE: js Workflow SDK CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }) await client.notify({ eventId: "my-event-id", eventData: "Hello World!" }); ``` ---------------------------------------- TITLE: Configuring Workflow Options in Typescript DESCRIPTION: This snippet shows how to configure options for both `createWorkflow` and `serveMany`. Options specified in `createWorkflow` take precedence. Options defined in `createWorkflow` such as `retries`, `failureFunction`, `failureUrl`, and `flowControl` will be applied in invocation requests. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#_snippet_3 LANGUAGE: typescript CODE: ``` const workflowOne = createWorkflow( async (context) => { // ... }, { retries: 0 } ) export const { POST } = serveMany( { workflowOne }, { failureUrl: "https://some-url" } ) ``` ---------------------------------------- TITLE: Multi-Agent Workflow with Upstash and Langchain (TS) DESCRIPTION: This code defines a Next.js API endpoint that utilizes Upstash Workflow to create a multi-agent system. It includes a researcher agent using Langchain's Wikipedia tool and a math agent that evaluates mathematical expressions. The main task prompts the agents to research Japanese cities and calculate the sum of their populations. It uses the `@upstash/workflow/nextjs`, `@langchain/community/tools/wikipedia_query_run`, `mathjs`, `ai`, and `zod` packages. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_12 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; import * as mathjs from 'mathjs' import { tool } from "ai"; import { z } from "zod"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-4o'); const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const mathAgent = context.agents.agent({ model, name: "mathematician", maxSteps: 2, tools: { calculate: tool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'." + "only call this tool if you need to calculate a mathematical expression." + "when writing an expression, don't use words like 'thousand' or 'million'", parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }), }, background: "You are a mathematician agent which can utilize" + "a calculator to compute expressions" }) const task = context.agents.task({ model, maxSteps: 3, agents: [researcherAgent, mathAgent], prompt: "Tell me about 3 cities in Japan and calculate the sum of their populations", }); const { text } = await task.run(); console.log("result:", text) }) ``` ---------------------------------------- TITLE: Trigger workflow endpoint DESCRIPTION: Sends a POST request to the local workflow endpoint using `curl`. This triggers the workflow defined in the `server/api/workflow.ts` file. A successful request returns a JSON object containing a `workflowRunId`. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/nuxt.mdx#_snippet_7 LANGUAGE: bash CODE: ``` curl -X POST https://localhost:3000/api/workflow # result: {"workflowRunId":"wfr_xxxxxx"} ``` ---------------------------------------- TITLE: Cancel Workflow Run with Workflow SDK DESCRIPTION: Cancels a workflow run using the Upstash Workflow SDK. Requires a QStash token and a workflow run ID. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/cancel.mdx#_snippet_1 LANGUAGE: js Workflow SDK CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }) await client.cancel({ workflowRunId: "<WORKFLOW_RUN_ID>" }) ``` ---------------------------------------- TITLE: Webhook Request Validation Function - TypeScript DESCRIPTION: This TypeScript code snippet provides a function, `validateRequest`, to validate webhook requests using the `svix` library. It retrieves the `svix-id`, `svix-timestamp`, and `svix-signature` headers, initializes a `Webhook` object with a secret, and then verifies the payload against the headers. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_4 LANGUAGE: TypeScript CODE: ``` import { Webhook } from "svix"; import { WebhookEvent } from "@clerk/nextjs/server"; const webhookSecret = "YOUR_WEBHOOK_SECRET"; async function validateRequest(payloadString: string, headerPayload: Headers) { const svixHeaders = { "svix-id": headerPayload.get("svix-id") as string, "svix-timestamp": headerPayload.get("svix-timestamp") as string, "svix-signature": headerPayload.get("svix-signature") as string, }; const wh = new Webhook(webhookSecret); return wh.verify(payloadString, svixHeaders) as WebhookEvent; } ``` ---------------------------------------- TITLE: Ensure Idempotency in context.run - TypeScript DESCRIPTION: This TypeScript code snippet emphasizes the importance of idempotency within `context.run` in Upstash Workflow. 
The `someWork(input)` function must be idempotent, ensuring that running the workflow multiple times with the same input results in the same end state. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_14 LANGUAGE: typescript CODE: ``` export const { POST } = serve<string>(async (context) => { const input = context.requestPayload await context.run("step-1", async () => { return someWork(input) }) }) ``` ---------------------------------------- TITLE: Avoid Non-Idempotent Functions - Python DESCRIPTION: This Python code shows an incorrect implementation where a non-idempotent function, `get_result_from_db(entry_id)`, is called outside `context.run`. This can cause inconsistent behavior during workflow retries or multiple invocations of the workflow endpoint. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_9 LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: entry_id = context.request_payload["entry_id"] # 👇 Problem: Non-idempotent function outside context.run: result = await get_result_from_db(entry_id) if result.should_return: return # ... ``` ---------------------------------------- TITLE: Calling OpenAI Compatible Provider DESCRIPTION: This snippet illustrates how to call an OpenAI-compatible provider (e.g., Deepseek) using the `context.api.openai.call` method. It sets the `baseURL` parameter to the provider's API endpoint and provides the API key as a token. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/openai.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.openai.call( "Call Deepseek", { baseURL: "https://api.deepseek.com", token: process.env.DEEPSEEK_API_KEY, operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" } ], }, } ); ``` ---------------------------------------- TITLE: Run Non-Idempotent Functions in context.run - Python DESCRIPTION: This Python code shows the correct way to call a non-idempotent function: `get_result_from_db` is called inside `context.run`, which prevents the `Failed to authenticate Workflow request` error. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_13 LANGUAGE: python CODE: ``` async def _get_result_from_db(): return await get_result_from_db(entry_id) result = await context.run("get-result-from-db", _get_result_from_db) if result.should_return: return ``` ---------------------------------------- TITLE: QStash Client Initialization in TypeScript DESCRIPTION: Illustrates the default initialization of a QStash Client in TypeScript, using environment variables for the base URL and token. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_16 LANGUAGE: TypeScript CODE: ``` new Client({ baseUrl: process.env.QSTASH_URL!, token: process.env.QSTASH_TOKEN!, }); ``` ---------------------------------------- TITLE: Fetching Workflow Logs with Workflow JS SDK DESCRIPTION: This snippet demonstrates how to retrieve workflow logs using the Upstash Workflow JS SDK. It initializes a client with a token and then uses the `logs` method to fetch logs, optionally filtering by workflow run ID, workflow URL or state. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/logs.mdx#_snippet_1 LANGUAGE: js CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }); // Filter by workflow run ID const { runs } = await client.logs({ workflowRunId: "<WORKFLOW_RUN_ID>"}); // Filter by workflow server url const { runs } = await client.logs({ workflowUrl: "<WORKFLOW_URL>"}); // Filter by state const { runs } = await client.logs({ state: "RUN_SUCCESS"}); ``` ---------------------------------------- TITLE: Run New User Signup Task - Python DESCRIPTION: This Python snippet defines and executes the 'new-signup' task within the Upstash workflow context. It defines an asynchronous function `_new_signup` that calls the `send_email` function to send a welcome email to the user. It is then executed using `context.run`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_3 LANGUAGE: python CODE: ``` async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) ``` ---------------------------------------- TITLE: Parallel Agent Execution with Upstash Workflow (Next.js) DESCRIPTION: This code snippet demonstrates how to run multiple Upstash agents in parallel using `Promise.all` within a Next.js environment. It defines three worker agents, each with a specific expertise, and an aggregator agent to summarize their results. The agents leverage the OpenAI GPT-3.5 Turbo model. It uses the `serve` function to create a Next.js API endpoint. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/parallelization.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); // Define worker agents const worker1 = context.agents.agent({ model, name: 'worker1', maxSteps: 1, background: 'You are an agent that explains quantum physics.', tools: {} }); const worker2 = context.agents.agent({ model, name: 'worker2', maxSteps: 1, background: 'You are an agent that explains relativity.', tools: {} }); const worker3 = context.agents.agent({ model, name: 'worker3', maxSteps: 1, background: 'You are an agent that explains string theory.', tools: {} }); // Await results const [result1, result2, result3] = await Promise.all([ context.agents.task({ agent: worker1, prompt: "Explain quantum physics." }).run(), context.agents.task({ agent: worker2, prompt: "Explain relativity." }).run(), context.agents.task({ agent: worker3, prompt: "Explain string theory." }).run(), ]); // Aggregating results const aggregator = context.agents.agent({ model, name: 'aggregator', maxSteps: 1, background: 'You are an agent that summarizes multiple answers.', tools: {} }); const task = await context.agents.task({ agent: aggregator, prompt: `Summarize these three explanations: ${result1.text}, ${result2.text}, ${result3.text}` }) const finalSummary = await task.run(); console.log(finalSummary.text); }); ``` ---------------------------------------- TITLE: Charging Customer (Python) DESCRIPTION: This Python code attempts to charge a customer using the `charge_customer` function within an Upstash Workflow context. It defines an asynchronous function `_charge_customer` that includes a try-except block to handle potential exceptions during the charging process, returning `None` if charging fails. The attempt number is passed to the `charge_customer` function. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_3 LANGUAGE: python CODE: ``` async def _charge_customer() -> Optional[ChargeResult]: try: return await charge_customer(i + 1) except Exception as e: print(f"Error: {e}") return None result = await context.run("charge customer", _charge_customer) ``` ---------------------------------------- TITLE: Notify Workflow with Fetch API - JavaScript DESCRIPTION: Sends a POST request to the QStash notify endpoint using the fetch API in Node.js. The request includes the event ID in the URL, authorization header, and event data in the request body. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/notify.mdx#_snippet_3 LANGUAGE: js Node CODE: ``` const response = await fetch('https://qstash.upstash.io/v2/notify/myEvent', { method: 'POST', body: "Hello world!", headers: { 'Authorization': 'Bearer <token>' } }); ``` ---------------------------------------- TITLE: Bulk Cancel Workflows with Upstash Workflow SDK DESCRIPTION: Cancels workflow runs using the Upstash Workflow SDK. Demonstrates cancelling by IDs, URL prefix, and all workflows. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/bulk-cancel.mdx#_snippet_2 LANGUAGE: JavaScript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QSTASH_TOKEN>" }); // cancel a set of workflow runs await client.cancel({ ids: [ "<WORKFLOW_RUN_ID_1>", "<WORKFLOW_RUN_ID_2>", ]}) // cancel workflows starting with a url await client.cancel({ urlStartingWith: "https://your-endpoint.com" }) // cancel all workflows await client.cancel({ all: true }) ``` ---------------------------------------- TITLE: Complete Workflow Example with Parallel Inventory Checks in TypeScript DESCRIPTION: This is a complete workflow example in a Next.js application demonstrating how to use parallel runs. It fetches the availability of coffee beans, cups, and milk concurrently using `Promise.all` and the `ctx.run` function. 
If all ingredients are available, it proceeds to brew coffee and print a receipt. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/parallel-runs.mdx#_snippet_1 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (ctx) => { const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ ctx.run("check-coffee-beans", () => checkInventory("coffee-beans")), ctx.run("check-cups", () => checkInventory("cups")), ctx.run("check-milk", () => checkInventory("milk")), ]); // If all ingredients are available, brew coffee if (coffeeBeansAvailable && cupsAvailable && milkAvailable) { const price = await ctx.run("brew-coffee", async () => { return await brewCoffee({ style: "cappuccino" }); }); await printReceipt(price); } }); ``` ---------------------------------------- TITLE: Conditional Email Based on Solved Problems - Python DESCRIPTION: This Python function `send_problem_solved_email` conditionally sends an email based on the number of problems the user has solved. If the user hasn't solved any problems, a 'no answers' email is sent; otherwise, a stats email is sent indicating the number of problems solved in the last 7 days. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_13 LANGUAGE: python CODE: ``` async def send_problem_solved_email( context: AsyncWorkflowContext[UserCreatedPayload], email: str, stats: UserStats ) -> None: if stats["total_problems_solved"] == 0: async def _send_no_answers_email() -> None: await send_email( email, "Hey, you haven't solved any questions in the last 7 days..." ) await context.run("send no answers email", _send_no_answers_email) else: async def _send_stats_email() -> None: await send_email( email, f"You have solved {stats['total_problems_solved']} problems in the last 7 days. 
Keep it up!", ) await context.run("send stats email", _send_stats_email) ``` ---------------------------------------- TITLE: Set Retries in Python DESCRIPTION: Shows how to set the number of retries for a workflow endpoint using the `retries` parameter in the `@serve.post` decorator. This specifies how many times QStash will call the endpoint in case of errors. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_5 LANGUAGE: Python CODE: ``` @serve.post("/api/example", retries=3) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` ---------------------------------------- TITLE: Workflow Endpoint with Request Object (Next.js Pages Router) DESCRIPTION: This snippet demonstrates how to access the Next.js request and response objects within a workflow endpoint using the Pages Router. It imports `NextApiRequest` and `NextApiResponse` and uses them in the handler function. Requires `@upstash/workflow/nextjs` and `next` types. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#_snippet_10 LANGUAGE: typescript CODE: ``` import type { NextApiRequest, NextApiResponse } from "next"; import { servePagesRouter } from "@upstash/workflow/nextjs"; export default async function handler( req: NextApiRequest, res: NextApiResponse ) { // do something with the native request object const { handler } = servePagesRouter( async (context) => { // Your workflow steps } ) await handler(req, res) } ``` ---------------------------------------- TITLE: Send Invoice Email (Python) DESCRIPTION: This Python code sends an invoice email to the user after a successful payment using the `send_email` function within an Upstash Workflow context. An asynchronous function `_send_invoice_email` encapsulates the call to the underlying function. The email content includes the invoice ID and total cost retrieved from the payment result. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_9 LANGUAGE: python CODE: ``` async def _send_invoice_email() -> None: await send_email( email, f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}", ) await context.run("send invoice email", _send_invoice_email) ``` ---------------------------------------- TITLE: WorkflowTool with executeAsStep DESCRIPTION: This example illustrates how to define a `WorkflowTool` and disable automatic step wrapping using the `executeAsStep` option. This allows using workflow context methods such as `context.call` inside the tool's invoke function. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_9 LANGUAGE: typescript CODE: ``` import { WorkflowTool } from '@upstash/workflow' import { serve } from '@upstash/workflow/nextjs' export const { POST } = serve(async (context) => { const tool = new WorkflowTool({ description: ..., schema: ..., invoke: async ( ... ) => { // make HTTP call inside the tool with context.call: await context.call( ... ) }, executeAsStep: false }) // pass the tool to agent }) ``` ---------------------------------------- TITLE: LangChain (custom) DESCRIPTION: This code defines a custom tool using `DynamicStructuredTool` from `@langchain/core/tools`. This tool generates a random number between two given numbers (low and high), defined by a Zod schema. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_7 LANGUAGE: typescript CODE: ``` import { z } from "zod"; import { DynamicStructuredTool } from "@langchain/core/tools"; const numberGenerator = new DynamicStructuredTool({ name: "random-number-generator", description: "generates a random number between two input numbers", schema: z.object({ low: z.number().describe("The lower bound of the generated number"), high: z.number().describe("The upper bound of the generated number"), }), func: async ({ low, high }) => (Math.random() * (high - low) + low).toString(), // Outputs still must be strings }) ``` ---------------------------------------- TITLE: Minimal Workflow Endpoint Example (Next.js App Router) DESCRIPTION: This is a minimal example of a workflow endpoint using Next.js App Router. It defines a POST request handler that executes two steps in a workflow. Requires `@upstash/workflow/nextjs`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#_snippet_7 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) ``` ---------------------------------------- TITLE: Trigger Workflow Endpoint with curl DESCRIPTION: This command sends a POST request to the `/api/workflow` endpoint of the FastAPI application using curl. This triggers the workflow defined at that endpoint, initiating its execution. The endpoint will return a JSON response containing the unique workflow run ID. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_12 LANGUAGE: bash CODE: ``` curl -X POST http://localhost:8000/api/workflow ``` ---------------------------------------- TITLE: Workflow with Authentication in Flask DESCRIPTION: Defines a workflow endpoint in a Flask application using Upstash Workflow that implements authentication. The function checks for a specific authentication header before running the workflow steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/flask.mdx#_snippet_9 LANGUAGE: python CODE: ``` from flask import Flask from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/auth") def auth(context: WorkflowContext[str]) -> None: if context.headers.get("Authentication") != "Bearer secret_password": print("Authentication failed.") return def _step1() -> str: return "output 1" context.run("step1", _step1) def _step2() -> str: return "output 2" context.run("step2", _step2) ``` ---------------------------------------- TITLE: Image Filtering - Python DESCRIPTION: This Python snippet applies various filters to the resized images. It iterates through a list of filters and resized images, calling `context.call` for each combination. The results are collected and processed to extract the image data. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_7 LANGUAGE: python CODE: ``` filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] ``` ---------------------------------------- TITLE: Payment Retry Loop (TypeScript) DESCRIPTION: This TypeScript code implements a loop that attempts to charge a customer up to 3 times. If the payment fails, it waits for 24 hours before retrying. If the payment succeeds, it executes logic to unsuspend the user, send an invoice email, and then ends the workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_4 LANGUAGE: typescript CODE: ``` for (let i = 0; i < 3; i++) { // attempt to charge the customer if (!result) { // Wait for a day await context.sleep("wait for retry", 24 * 60 * 60); } else { // Payment succeeded // Unsuspend user, send invoice email // end the workflow: return; } } ``` ---------------------------------------- TITLE: Overriding OpenAI API Types DESCRIPTION: This snippet shows how to override the default request and response types when calling the OpenAI API using `context.api.openai.call`. It defines custom types `ResponseBodyType` and `RequestBodyType` and passes them as generic type parameters to the call method. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/openai.mdx#_snippet_1 LANGUAGE: typescript CODE: ``` type ResponseBodyType = { ... }; // Define your response body type type RequestBodyType = { ... 
}; // Define your request body type const { status, body } = await context.api.openai.call< ResponseBodyType, RequestBodyType >( "Call OpenAI", { ... } ); ``` ---------------------------------------- TITLE: Serve User Requests with Upstash Workflow DESCRIPTION: This code snippet defines a Next.js route using `@upstash/workflow/nextjs` to handle user requests. It uses the `serve` function to create an endpoint that processes a `UserRequest` payload, sleeps for 10 seconds, runs two asynchronous tasks (`retrieveEmail` and `fetchFromLLm`) in parallel, and waits for both tasks to complete. The `retrieveEmail` and `fetchFromLLm` functions are imported from a local util file. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/monitor.mdx#_snippet_0 LANGUAGE: javascript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { retrieveEmail, fetchFromLLm, UserRequest} from "../../../lib/util"; export const { POST } = serve<UserRequest>( async (context) => { const input = context.requestPayload; await context.sleep("sleep", 10); const p1 = context.run("retrieveEmail", async () => { return retrieveEmail(input.id); }); const p2 = context.run("askllm", async () => { return fetchFromLLm(input.question); }); await Promise.all([p1, p2]) }, ); ``` ---------------------------------------- TITLE: Minimal Workflow Endpoint (TypeScript) DESCRIPTION: Defines a minimal workflow endpoint using the Upstash Workflow SDK in an Astro project. This endpoint defines a workflow with two steps and returns a POST handler. The `env` configures environment variables for local development or deployment. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/astro.mdx#_snippet_5 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/astro"; export const { POST } = serve<string>(async (context) => { const result1 = await context.run("initial-step", () => { console.log("initial step ran") return "hello world!" 
}) await context.run("second-step", () => { console.log(`second step ran with value ${result1}`) }) }, { // env must be passed in astro. // for local dev, we need import.meta.env. // For deployment, we need process.env: env: { ...process.env, ...import.meta.env } }) ``` ---------------------------------------- TITLE: Image Resizing - TypeScript DESCRIPTION: This TypeScript snippet resizes the image to multiple resolutions using an external image processing service. It iterates through a list of resolutions and calls `context.call` for each resolution to trigger an external service call. `Promise.all` is used to run the resizing operations concurrently. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_4 LANGUAGE: typescript CODE: ``` const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call<ImageResult>( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: { imageUrl, width: resolution, } } ) )) ``` ---------------------------------------- TITLE: Set Failure Function in TypeScript DESCRIPTION: Shows how to define a `failureFunction` option within the `serve` method. This function is executed if the workflow fails after all retries. If both `failureUrl` and `failureFunction` are defined, the function takes precedence. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_3 LANGUAGE: TypeScript CODE: ``` export const { POST } = serve<string>( async (context) => { ... 
}, { failureFunction: async ({ context, // context during failure failStatus, // failure status failResponse, // failure message failHeaders // failure headers }) => { // handle the failure } } ); ``` ---------------------------------------- TITLE: Scheduling a Workflow with Upstash Workflow (FastAPI) DESCRIPTION: This Python code defines a FastAPI endpoint that uses Upstash Workflow to create and upload a backup. It initializes the `Serve` class from `upstash_workflow.fastapi` to handle workflow execution and uses the `AsyncWorkflowContext` to manage the workflow steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#_snippet_1 LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import create_backup, upload_backup app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context: AsyncWorkflowContext[str]) -> None: async def _step1(): return await create_backup() backup = await context.run("create_backup", _step1) async def _step2(): await upload_backup(backup) await context.run("upload_backup", _step2) ``` ---------------------------------------- TITLE: Generating Text with Tools and OpenAI Client DESCRIPTION: Creates a Next.js API route with Upstash Workflow that uses the OpenAI client and Vercel AI SDK tools to generate text. Includes error handling for `ToolExecutionError` and `WorkflowAbort` exceptions. Each tool execution is wrapped in a workflow step. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#_snippet_6 LANGUAGE: typescript CODE: ``` import { z } from 'zod'; import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError, tool } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), tools: { weather: tool({ description: 'Get the weather in a location', parameters: z.object({ location: z.string().describe('The location to get the weather for'), }), execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) }), }, maxSteps: 2, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` ---------------------------------------- TITLE: Install @upstash/workflow with npm DESCRIPTION: Installs the @upstash/workflow package using npm. This is the first step in migrating to the new SDK. SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#_snippet_0 LANGUAGE: bash CODE: ``` npm install @upstash/workflow ``` ---------------------------------------- TITLE: Override URL in Python DESCRIPTION: Demonstrates how to override the URL inferred from `request.url` using the `url` parameter in the `@serve.post` decorator in Python. This is useful when using proxies or local tunnels. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_11 LANGUAGE: Python CODE: ``` @serve.post("/api/example", url="https://<YOUR-DEPLOYED-APP>.com/api/workflow") async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` ---------------------------------------- TITLE: Defining a Workflow Endpoint in SvelteKit DESCRIPTION: This TypeScript code defines a workflow endpoint using the Upstash Workflow SDK for SvelteKit. It imports necessary modules, defines steps using `context.run`, and exports a POST handler to expose the workflow as an API endpoint. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/svelte.mdx#_snippet_7 LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/svelte" import { env } from "$env/dynamic/private" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, { env } ) ``` ---------------------------------------- TITLE: Secure Workflow with QStash Receiver (Python) DESCRIPTION: This Python code snippet demonstrates how to integrate QStash's `Receiver` with an Upstash Workflow using the `@serve.post` decorator. It imports `Receiver` from `qstash` and configures it with the current and next signing keys from environment variables. The decorator registers the `example` function as a handler for POST requests to the `/api/example` endpoint, requiring valid QStash signatures for all requests. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#_snippet_3 LANGUAGE: python CODE: ``` import os from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... 
``` ---------------------------------------- TITLE: Minimal Workflow Endpoint Example (Next.js Pages Router) DESCRIPTION: This is a minimal example of a workflow endpoint using Next.js Pages Router. It defines a handler function that executes two steps in a workflow. Requires `@upstash/workflow/nextjs`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#_snippet_9 LANGUAGE: typescript CODE: ``` import { servePagesRouter } from "@upstash/workflow/nextjs"; const { handler } = servePagesRouter<string>( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) export default handler; ``` ---------------------------------------- TITLE: Run FastAPI Application DESCRIPTION: This command starts the FastAPI application using Uvicorn, enabling hot-reloading. The `--reload` flag ensures that the server automatically restarts whenever changes are made to the code. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_11 LANGUAGE: bash CODE: ``` uvicorn main:app --reload ``` ---------------------------------------- TITLE: Correct Result Handling - Python DESCRIPTION: This Python code demonstrates the correct way to handle results within Upstash Workflow. The return value from `some_work(input)` is directly returned from `_step_1` and assigned to `result` using `result = await context.run("step-1", _step_1)`. This ensures that `result` is initialized properly before being used in `_step_2`. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_5 LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return await some_work(input) result = await context.run("step-1", _step_1) async def _step_2() -> None: await some_other_work(result) await context.run("step-2", _step_2) ``` ---------------------------------------- TITLE: Calling OpenAI Chat Completion API DESCRIPTION: This snippet demonstrates how to call the OpenAI chat completion API using `context.api.openai.call`. It sets the operation, OpenAI API key, model, and message content. The response status and body are returned. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/openai.mdx#_snippet_0 LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.openai.call( "Call OpenAI", { token: "<OPENAI_API_KEY>", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" } ], }, } ); // get text (chat completion responses expose it under choices): console.log(body.choices[0].message.content) ``` ---------------------------------------- TITLE: Customize Anthropic API Request/Response Types (TypeScript) DESCRIPTION: This code snippet shows how to override the predefined request and response body types when calling the Anthropic API. It involves defining custom TypeScript types for `ResponseBodyType` and `RequestBodyType` and passing them as generic type arguments to the `context.api.anthropic.call` method. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/anthropic.mdx#_snippet_1 LANGUAGE: typescript CODE: ``` type ResponseBodyType = { ... }; // Define your response body type type RequestBodyType = { ... 
}; // Define your request body type const { status, body } = await context.api.anthropic.call< ResponseBodyType, RequestBodyType >( "Call Anthropic", { ... } ); ``` ---------------------------------------- TITLE: Configuring a Failure URL with TypeScript DESCRIPTION: This snippet illustrates how to set up a `failureUrl` within the `serve` method. The `failureUrl` is used to send failure notifications to a designated endpoint when the workflow URL is unreachable. This allows for external monitoring and handling of workflow failures. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/failures.mdx#_snippet_1 LANGUAGE: typescript CODE: ``` export const { POST } = serve<string>( async (context) => { // Your workflow logic... }, { failureUrl: "https://<YOUR_FAILURE_URL>/workflow-failure", } ); ``` ---------------------------------------- TITLE: Install Dependencies DESCRIPTION: This snippet installs the necessary dependencies for the project: FastAPI, Uvicorn (an ASGI server), and the Upstash Workflow SDK. These packages are required to create and run the FastAPI application with Upstash Workflow integration. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_1 LANGUAGE: bash CODE: ``` pip install fastapi uvicorn upstash-workflow ``` ---------------------------------------- TITLE: Image Filtering - TypeScript DESCRIPTION: This TypeScript snippet applies various filters to the resized images. It iterates through a list of filters and resized images, calling `context.call` for each combination. `Promise.all` is used to run the filtering operations concurrently. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_6 LANGUAGE: typescript CODE: ``` const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise<{ body: ImageResult }>[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call<ImageResult>( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) ``` ---------------------------------------- TITLE: Customizing Resend API Request/Response Types in Upstash Workflow (TypeScript) DESCRIPTION: This code snippet demonstrates how to override the default request and response types when calling the Resend API using `context.api.resend.call` within an Upstash Workflow. This allows for greater flexibility when dealing with non-standard Resend API responses. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#_snippet_2 LANGUAGE: typescript CODE: ``` type IsBatch = true; // Set to either true or false type ResponseBodyType = { ... }; // Define your response body type type RequestBodyType = { ... }; // Define your request body type const { status, body } = await context.api.resend.call< IsBatch, ResponseBodyType, RequestBodyType >( "Call Resend", { ... } ); ``` ---------------------------------------- TITLE: Install @upstash/workflow with bun DESCRIPTION: Installs the @upstash/workflow package using bun. This is the first step in migrating to the new SDK. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#_snippet_2 LANGUAGE: bash CODE: ``` bun add @upstash/workflow ``` ---------------------------------------- TITLE: Workflow Run ID Example DESCRIPTION: This shows the expected output from triggering the workflow endpoint. The `workflowRunId` is a unique identifier that can be used to track the workflow execution. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_13 LANGUAGE: bash CODE: ``` # result: {"workflowRunId":"wfr_xxxxxx"} ``` ---------------------------------------- TITLE: Limiting Workflow Environment in trigger method (JS) DESCRIPTION: This code snippet demonstrates how to limit the execution environment by configuring the `trigger` method using `flowControl`. It uses the Upstash Workflow Client to trigger a workflow with specified rate per second and parallelism limits. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/flow-control.mdx#_snippet_1 LANGUAGE: javascript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "<QStash_TOKEN>" }); const { workflowRunId } = await client.trigger({ url: "https://workflow-endpoint.com", body: "hello there!", flowControl: { key: "app1", parallelism: 3, ratePerSecond: 10 } }); ``` ---------------------------------------- TITLE: Use Initial Payload Parser in TypeScript DESCRIPTION: Illustrates how to define and use an `initialPayloadParser` to process the initial request's payload. The parser transforms the raw initial payload into a structured object. 
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_8 LANGUAGE: TypeScript CODE: ``` type InitialPayload = { foo: string; bar: number; }; // 👇 1: provide initial payload type export const { POST } = serve<InitialPayload>( async (context) => { // 👇 3: parsing result is available as requestPayload const payload: InitialPayload = context.requestPayload; }, { // 👇 2: custom parsing for initial payload initialPayloadParser: (initialPayload) => { const payload: InitialPayload = parsePayload(initialPayload); return payload; }, } ); ``` ---------------------------------------- TITLE: Setting QSTASH_URL and QSTASH_TOKEN DESCRIPTION: Configures the `.env` file with the `QSTASH_URL` and `QSTASH_TOKEN` for local development using the QStash development server. These values are obtained from the output of `npx @upstash/qstash-cli dev`. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/nextjs-fastapi.mdx#_snippet_4 LANGUAGE: bash CODE: ``` export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN=<QSTASH_TOKEN> ``` ---------------------------------------- TITLE: Re-throwing WorkflowAbort Error in Python DESCRIPTION: This Python code snippet demonstrates how to re-throw the `WorkflowAbort` error within a try/except block when using Upstash Workflow. It's necessary to re-raise the `WorkflowAbort` to maintain the workflow's intended execution flow. The snippet imports `WorkflowAbort` from the `upstash_workflow` package, wraps a `context.run` call in a try/except block, verifies if the caught exception is an instance of `WorkflowAbort`, and re-raises it if it is. SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#_snippet_1 LANGUAGE: python CODE: ``` from upstash_workflow import WorkflowAbort try: await context.run( ... 
) except Exception as e: if isinstance(e, WorkflowAbort): raise e else: # handle other errors ``` ---------------------------------------- TITLE: Sleep Workflow Execution - TypeScript DESCRIPTION: This TypeScript snippet pauses the Upstash workflow execution for 3 days using `context.sleep`. The duration is specified in seconds (60 * 60 * 24 * 3). SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_4 LANGUAGE: typescript CODE: ``` await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ``` ---------------------------------------- TITLE: Implementing QStash Receiver in Next.js DESCRIPTION: This snippet shows how to implement the QStash Receiver in a Next.js application using TypeScript. It uses the `Receiver` class from the `@upstash/qstash` package to verify that requests come from QStash. The signing keys are retrieved from environment variables. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_18 LANGUAGE: typescript CODE: ``` import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<string>( async (context) => { ... }, { receiver: new Receiver({ // 👇 grab these variables from your QStash dashboard currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!, nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!, }) } ); ``` ---------------------------------------- TITLE: Use Explicit QStash Client in TypeScript DESCRIPTION: Shows how to pass a QStash Client explicitly using the `qstashClient` option in TypeScript. This is useful when using multiple QStash clients with different environment variables. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_14 LANGUAGE: TypeScript CODE: ``` import { Client } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... 
  },
  {
    qstashClient: new Client({ token: process.env.QSTASH_TOKEN })
  }
);
```

----------------------------------------

TITLE: Setting Up Basic Webhook Endpoint - Python
DESCRIPTION: This code snippet demonstrates how to set up a basic webhook endpoint using FastAPI and `upstash_workflow.fastapi.Serve`. It defines a POST endpoint `/api/example` that receives an `AsyncWorkflowContext` object. The `initial_payload_parser` function is used to process the raw payload.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_1

LANGUAGE: Python
CODE:
```
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


def initial_payload_parser(payload):
    return payload


@serve.post("/api/example", initial_payload_parser=initial_payload_parser)
async def example(context: AsyncWorkflowContext[str]) -> None:
    # Your webhook handling logic here
    pass
```

----------------------------------------

TITLE: WorkflowTool (custom)
DESCRIPTION: This code defines a custom tool using the `WorkflowTool` class from `@upstash/workflow`. This tool evaluates mathematical expressions. It takes an expression string as input, uses mathjs to evaluate it, and returns the result.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
import { WorkflowTool } from '@upstash/workflow'
import { z } from 'zod'
import * as mathjs from 'mathjs'

const tool = new WorkflowTool({
  description:
    'A tool for evaluating mathematical expressions. 
' +
    'Example expressions: ' +
    "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.",
  schema: z.object({
    expression: z.string()
  }),
  invoke: async ({ expression }) => mathjs.evaluate(expression),
})
```

----------------------------------------

TITLE: Installing Upstash Workflow SDK with bun
DESCRIPTION: This command installs the Upstash Workflow SDK as a dependency in your SvelteKit project using bun. This SDK provides the necessary functions to define and manage workflows.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/svelte.mdx#_snippet_2

LANGUAGE: bash
CODE:
```
bun add @upstash/workflow
```

----------------------------------------

TITLE: Configure Local Tunnel Environment Variables
DESCRIPTION: This snippet shows the environment variables that need to be configured in the `.env.local` file when using a local tunnel, such as ngrok, for local development with Upstash Workflow.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#_snippet_6

LANGUAGE: txt
CODE:
```
QSTASH_TOKEN="***"
UPSTASH_WORKFLOW_URL=<UPSTASH_WORKFLOW_URL>
```

----------------------------------------

TITLE: Configuring Environment Variables in Next.js
DESCRIPTION: This snippet illustrates how to configure environment variables using the `env` option within the Next.js context using TypeScript. It demonstrates how the environment variables will then be available within the `context` of the asynchronous function.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_20

LANGUAGE: typescript
CODE:
```
import { Receiver } from "@upstash/qstash";
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<string>(
  async (context) => {
    // the env option will be available in the env field of the context:
    const env = context.env;
  },
  {
    receiver: new Receiver({
      // 👇 grab these variables from your QStash dashboard
      currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
      nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
    }),
  }
);
```

----------------------------------------

TITLE: Creating a Weekly Summary Workflow (Next.js)
DESCRIPTION: This TypeScript code implements a Next.js API route to send weekly summary emails using Upstash Workflow. It fetches user data, generates a summary, and sends an email with the summary. The `serve` function from `@upstash/workflow/nextjs` is used to handle the workflow, with type safety provided for the workflow data.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { getUserData, generateSummary } from "@/utils/user-utils";
import { sendEmail } from "@/utils/email-utils";

// Type-safety for starting our workflow
interface WeeklySummaryData {
  userId: string;
}

export const { POST } = serve<WeeklySummaryData>(async (context) => {
  const { userId } = context.requestPayload;

  // Step 1: Fetch user data
  const user = await context.run("fetch-user-data", async () => {
    return await getUserData(userId);
  });

  // Step 2: Generate weekly summary
  const summary = await context.run("generate-summary", async () => {
    return await generateSummary(userId);
  });

  // Step 3: Send email with weekly summary
  await context.run("send-summary-email", async () => {
    await sendEmail(user.email, "Your Weekly Summary", summary);
  });
});
```

----------------------------------------

TITLE: Sourcing the .env file
DESCRIPTION: Sources the .env file to load the environment variables into the current shell session. This is necessary for the application to access the configured environment variables.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/nextjs-flask.mdx#_snippet_6

LANGUAGE: bash
CODE:
```
source .env
```

----------------------------------------

TITLE: Bypassing Vercel Deployment Protection with curl
DESCRIPTION: This code snippet demonstrates how to bypass Vercel's deployment protection for preview deployments by adding a bypass secret to the workflow URL as a query parameter. It uses `curl` to send a POST request to the workflow endpoint. The `<PROTECTION_BYPASS_SECRET>` placeholder should be replaced with the actual secret generated in the Vercel project settings.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/vercel.mdx#_snippet_0

LANGUAGE: bash
CODE:
```
curl -X POST \
  'https://vercel-preview.com/workflow?x-vercel-protection-bypass=<PROTECTION_BYPASS_SECRET>' \
  -d 'Hello world!'
```

----------------------------------------

TITLE: Trigger Workflow with Python SDK
DESCRIPTION: Illustrates how to trigger the workflow using the Python SDK. It initializes an `AsyncQStash` client and uses the `message.publish_json` method to send a JSON payload to the workflow endpoint, including headers and retry configurations.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#_snippet_5

LANGUAGE: python
CODE:
```
from qstash import AsyncQStash

client = AsyncQStash("<QSTASH_TOKEN>")

res = await client.message.publish_json(
    url="https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
    body={"hello": "there!"},
    headers={...},
    retries=3,
)
```

----------------------------------------

TITLE: Onboarding Workflow Definition
DESCRIPTION: Defines an asynchronous function `onboarding_workflow` that represents the main workflow logic. It receives initial data, sends a welcome email, waits for 3 days, uses an AI model to generate a follow-up message, and sends a personalized follow-up email.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#_snippet_3

LANGUAGE: python
CODE:
```
@serve.post("/api/onboarding")
async def onboarding_workflow(context: AsyncWorkflowContext[InitialData]) -> None:
    data = context.request_payload
    user_id = data["user_id"]
    email = data["email"]
    name = data["name"]

    # Step 1: Send welcome email
    async def _send_welcome_email() -> None:
        await send_email(email, "Welcome to our service!")

    await context.run("send-welcome-email", _send_welcome_email)

    # Step 2: Wait for 3 days (in seconds)
    await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3)

    # Step 3: AI-generate personalized follow-up message
    ai_response: CallResponse[Dict[str, str]] = await context.call(
        "generate-personalized-message",
        url="https://api.openai.com/v1/chat/completions",
        method="POST",
        headers={...},
        body={
            "model": "gpt-3.5-turbo",
            "messages": [
                {
                    "role": "system",
                    "content": "You are an assistant creating personalized follow-up messages.",
                },
                {
                    "role": "user",
                    "content": f"Create a short, friendly follow-up message for {name} who joined our service 3 days ago.",
                },
            ],
        },
    )

    personalized_message = ai_response.body["choices"][0]["message"]["content"]

    # Step 4: Send personalized follow-up email
    async def _send_follow_up_email() -> None:
        await send_email(email, personalized_message)

    await context.run("send-follow-up-email", _send_follow_up_email)
```

----------------------------------------

TITLE: Update context.call usage (JavaScript)
DESCRIPTION: Updates how `context.call` is used, including parameter passing and return value. The new version returns an object with `status`, `headers`, and `body`, and does not automatically fail the workflow on request failure.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#_snippet_5

LANGUAGE: javascript
CODE:
```
// old
const result = await context.call("call step", "<call-url>", "POST", ...)
// new
const {
  status, // response status
  headers, // response headers
  body // response body
} = await context.call("call step", {
  url: "<call-url>",
  method: "POST",
  ...
})
```

----------------------------------------

TITLE: Sync User in Database (Python)
DESCRIPTION: This Python snippet shows how to create a new user in a database. `create_user_in_database` is a placeholder that logs user details and returns a mock userid; in a real implementation, it would interact with a database to create a user record. The call is wrapped in an async function and executed as a workflow step using `context.run`.

SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_3

LANGUAGE: python
CODE:
```
async def _sync_user() -> dict[str, str]:
    return await create_user_in_database(name, email)

result = await context.run("sync user", _sync_user)
userid = result["userid"]
```
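
The sync-user step above relies on a `create_user_in_database` helper and a workflow `context` that the snippet does not show. The following is a minimal, self-contained sketch for trying the pattern locally. It makes several assumptions: the helper's body, the mock id `"12345"`, the `FakeContext` class, and the `sync_user_step` wrapper are all hypothetical stand-ins and are not part of the Upstash SDK.

```python
import asyncio
from typing import Awaitable, Callable, Dict, TypeVar

T = TypeVar("T")


async def create_user_in_database(name: str, email: str) -> Dict[str, str]:
    """Hypothetical placeholder: log the details and return a mock user id."""
    print(f"Creating user {name} with email {email}")
    return {"userid": "12345"}  # mock value, chosen only for illustration


class FakeContext:
    """Local stand-in for the workflow context (not the real SDK class)."""

    async def run(self, step_name: str, fn: Callable[[], Awaitable[T]]) -> T:
        # This stub only awaits the step function and returns its result.
        return await fn()


async def sync_user_step(context: FakeContext, name: str, email: str) -> str:
    # Same shape as the snippet above: wrap the call, then run it as a step.
    async def _sync_user() -> Dict[str, str]:
        return await create_user_in_database(name, email)

    result = await context.run("sync user", _sync_user)
    return result["userid"]


userid = asyncio.run(sync_user_step(FakeContext(), "Ada", "ada@example.com"))
```

In a real workflow, the `context` passed to the route handler would replace `FakeContext`, and the helper would write to an actual database.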