TITLE: E-commerce Order Fulfillment Route Handler (Upstash, Next.js)
DESCRIPTION: This TypeScript code defines a Next.js route handler that uses Upstash Workflow to automate e-commerce order fulfillment. It receives an order payload, creates an order ID, verifies stock availability, processes payment, dispatches the order, and sends confirmation and dispatch notifications. It depends on the `@upstash/workflow/nextjs` package and several utility functions.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"
import {
  createOrderId,
  checkStockAvailability,
  processPayment,
  dispatchOrder,
  sendOrderConfirmation,
  sendDispatchNotification,
} from "./utils"

type OrderPayload = {
  userId: string
  items: { productId: string, quantity: number }[]
}

export const { POST } = serve<OrderPayload>(async (context) => {
  const { userId, items } = context.requestPayload;

  // Step 1: Create Order Id
  const orderId = await context.run("create-order-id", async () => {
    return await createOrderId(userId);
  });

  // Step 2: Verify stock availability
  const stockAvailable = await context.run("check-stock", async () => {
    return await checkStockAvailability(items);
  });

  if (!stockAvailable) {
    console.warn("Some items are out of stock");
    return;
  }

  // Step 3: Process payment
  await context.run("process-payment", async () => {
    return await processPayment(orderId)
  })

  // Step 4: Dispatch the order
  await context.run("dispatch-order", async () => {
    return await dispatchOrder(orderId, items)
  })

  // Step 5: Send order confirmation email
  await context.run("send-confirmation", async () => {
    return await sendOrderConfirmation(userId, orderId)
  })

  // Step 6: Send dispatch notification
  await context.run("send-dispatch-notification", async () => {
    return await sendDispatchNotification(userId, orderId)
  })
})
```

----------------------------------------

TITLE: E-commerce Order Fulfillment API (Upstash, FastAPI)
DESCRIPTION: This Python code defines a FastAPI endpoint that uses Upstash Workflow to manage e-commerce order fulfillment.  It receives an order payload with user ID and items, then creates an order ID, checks stock, processes payment, dispatches the order, and sends notifications. It relies on the `upstash_workflow.fastapi` library and utility functions for core operations.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_1

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import List, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext
from utils import (
    create_order_id,
    check_stock_availability,
    process_payment,
    dispatch_order,
    send_order_confirmation,
    send_dispatch_notification,
)

app = FastAPI()
serve = Serve(app)


class OrderItem(TypedDict):
    product_id: str
    quantity: int


class OrderPayload(TypedDict):
    user_id: str
    items: List[OrderItem]


@serve.post("/order-fulfillment")
async def order_fulfillment(context: AsyncWorkflowContext[OrderPayload]) -> None:
    # Get the order payload from the request
    payload = context.request_payload
    user_id = payload["user_id"]
    items = payload["items"]

    # Step 1: Create Order Id
    async def _create_order_id():
        return await create_order_id(user_id)

    order_id: str = await context.run("create-order-id", _create_order_id)

    # Step 2: Verify stock availability
    async def _check_stock():
        return await check_stock_availability(items)

    stock_available: bool = await context.run("check-stock", _check_stock)

    if not stock_available:
        print("Some items are out of stock")
        return

    # Step 3: Process payment
    async def _process_payment():
        return await process_payment(order_id)

    await context.run("process-payment", _process_payment)

    # Step 4: Dispatch the order
    async def _dispatch_order():
        return await dispatch_order(order_id, items)

    await context.run("dispatch-order", _dispatch_order)

    # Step 5: Send order confirmation email
    async def _send_confirmation():
        return await send_order_confirmation(user_id, order_id)

    await context.run("send-confirmation", _send_confirmation)

    # Step 6: Send dispatch notification
    async def _send_dispatch_notification():
        return await send_dispatch_notification(user_id, order_id)

    await context.run("send-dispatch-notification", _send_dispatch_notification)
```

----------------------------------------

TITLE: Payment Retry Loop (Python)
DESCRIPTION: This Python code implements a loop that attempts to charge a customer up to 3 times. If the payment fails, it waits for 24 hours before retrying. If the payment succeeds, it executes logic to unsuspend the user, send an invoice email, and then ends the workflow.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_5

LANGUAGE: python
CODE:
```
for i in range(3):
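    # attempt to charge the customer
    # (`_charge_customer` is the step function defined in the full example)
    result = await context.run("charge customer", _charge_customer)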

    if not result:
        # Wait for a day
        await context.sleep("wait for retry", 24 * 60 * 60)
    else:
        # Payment succeeded
        # Unsuspend user, send invoice email
        # end the workflow:
        return

```
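
The retry loop above depends on the workflow context. As a rough, dependency-free sketch of the same control flow, the version below replaces the charge step and `context.sleep` with two hypothetical callables, `charge` and `wait`; neither name comes from the Upstash library, and the stubbed behaviour (two failures, then success) is only for illustration.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

DAY_SECONDS = 24 * 60 * 60


async def retry_charge(
    charge: Callable[[int], Awaitable[Optional[dict]]],  # hypothetical charge step
    wait: Callable[[int], Awaitable[None]],  # hypothetical stand-in for context.sleep
    max_attempts: int = 3,
) -> Optional[dict]:
    """Try `charge` up to `max_attempts` times, waiting a day after each failure."""
    for attempt in range(1, max_attempts + 1):
        result = await charge(attempt)
        if result:
            return result  # payment succeeded: stop retrying
        await wait(DAY_SECONDS)  # payment failed: wait before the next attempt
    return None  # every attempt failed


async def _demo() -> Tuple[Optional[dict], List[int]]:
    waits: List[int] = []

    async def fake_charge(attempt: int) -> Optional[dict]:
        # Illustrative stub: fails on attempts 1 and 2, succeeds on attempt 3.
        return {"invoice_id": "INV123"} if attempt > 2 else None

    async def fake_wait(seconds: int) -> None:
        waits.append(seconds)  # record the delay instead of sleeping

    return await retry_charge(fake_charge, fake_wait), waits


demo_result, demo_waits = asyncio.run(_demo())
```

Passing the delay function in as a parameter keeps the loop testable without real sleeping; in the workflow itself that role is played by `context.sleep`.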

----------------------------------------

TITLE: Webhook Handler with Upstash Workflow in FastAPI
DESCRIPTION: This Python snippet defines a FastAPI endpoint that serves as a webhook handler for authentication providers using Upstash Workflow. It mirrors the functionality of the TypeScript example, managing user creation, Stripe integration, and email notifications. The workflow uses `context.run` to execute asynchronous operations and `context.sleep` to pause execution. `TypedDict` type hints document the payload shapes.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_1

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import Dict, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


class UserCreatedPayload(TypedDict):
    name: str
    email: str


class UserStats(TypedDict):
    total_problems_solved: int
    most_interested_topic: str


async def create_user_in_database(name: str, email: str) -> Dict[str, str]:
    print("Creating a user in the database:", name, email)
    return {"userid": "12345"}


async def create_new_user_in_stripe(email: str) -> None:
    # Implement logic to create a new user in Stripe
    print("Creating a user in Stripe for", email)


async def start_trial_in_stripe(email: str) -> None:
    # Implement logic to start a trial in Stripe
    print("Starting a trial of 14 days in Stripe for", email)


async def get_user_stats(userid: str) -> UserStats:
    # Implement logic to get user stats
    print("Getting user stats for", userid)
    return {"total_problems_solved": 10000, "most_interested_topic": "Python"}


async def check_upgraded_plan(email: str) -> bool:
    # Implement logic to check if the user has upgraded the plan
    print("Checking if the user has upgraded the plan", email)
    return False


async def send_email(email: str, content: str) -> None:
    # Implement logic to send an email
    print("Sending email to", email, content)


async def send_problem_solved_email(
    context: AsyncWorkflowContext[UserCreatedPayload], email: str, stats: UserStats
) -> None:
    if stats["total_problems_solved"] == 0:

        async def _send_no_answers_email() -> None:
            await send_email(
                email, "Hey, you haven't solved any questions in the last 7 days..."
            )

        await context.run("send no answers email", _send_no_answers_email)
    else:

        async def _send_stats_email() -> None:
            await send_email(
                email,
                f"You have solved {stats['total_problems_solved']} problems in the last 7 days. Keep it up!",
            )

        await context.run("send stats email", _send_stats_email)


@serve.post("/auth-provider-webhook")
async def auth_provider_webhook(
    context: AsyncWorkflowContext[UserCreatedPayload],
) -> None:
    payload = context.request_payload
    name = payload["name"]
    email = payload["email"]

    async def _sync_user() -> Dict[str, str]:
        return await create_user_in_database(name, email)

    result = await context.run("sync user", _sync_user)
    userid = result["userid"]

    async def _create_new_user_in_stripe() -> None:
        await create_new_user_in_stripe(email)

    await context.run("create new user in stripe", _create_new_user_in_stripe)

    async def _start_trial_in_stripe() -> None:
        await start_trial_in_stripe(email)

    await context.run("start trial in Stripe", _start_trial_in_stripe)

    async def _send_welcome_email() -> None:
        await send_email(
            email, "Welcome to our platform! You have a 14-day free trial."
        )

    await context.run("send welcome email", _send_welcome_email)

    await context.sleep("wait", 7 * 24 * 60 * 60)

    # get user stats and send email with them

    async def _get_user_stats() -> UserStats:
        return await get_user_stats(userid)

    stats: UserStats = await context.run("get user stats", _get_user_stats)

    await send_problem_solved_email(context, email, stats)

    # wait until there are two days to the end of trial period and check upgrade status
    await context.sleep("wait for trial warning", 5 * 24 * 60 * 60)

    async def _check_upgraded_plan() -> bool:
        return await check_upgraded_plan(email)

    is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan)

    # end the workflow if upgraded
    if is_upgraded:
        return

    async def _send_trial_warning_email() -> None:
        await send_email(
            email,
            "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.",
        )

    await context.run("send trial warning email", _send_trial_warning_email)

    await context.sleep("wait for trial end", 2 * 24 * 60 * 60)

    async def _send_trial_end_email() -> None:
        await send_email(
            email,
            "Your trial has ended. Please upgrade your plan to keep using our platform.",
        )

    await context.run("send trial end email", _send_trial_end_email)

```
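
The three `context.sleep` calls in the handler above are meant to line up with the 14-day trial. A small arithmetic check, using only the durations that appear in the code (the variable names here are illustrative, not part of the example):

```python
DAY = 24 * 60 * 60  # seconds in a day

# Sleep durations copied from the workflow above, in execution order.
sleeps = {
    "wait": 7 * DAY,
    "wait for trial warning": 5 * DAY,
    "wait for trial end": 2 * DAY,
}

# Days elapsed since signup after each sleep completes.
elapsed_days = []
total_seconds = 0
for seconds in sleeps.values():
    total_seconds += seconds
    elapsed_days.append(total_seconds // DAY)
```

The stats email goes out on day 7, the trial warning on day 12 (two days before the end), and the trial-end email on day 14.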

----------------------------------------

TITLE: Evaluator-Optimizer Workflow with Upstash Agents (Next.js)
DESCRIPTION: This snippet defines a Next.js API route using Upstash Workflow to implement an evaluator-optimizer workflow. It uses two agents: a generator to create content based on a prompt, and an evaluator to provide feedback and corrections. The generator is called iteratively until the evaluator approves the generated text. The workflow repeats up to 3 times or until the evaluator returns a "PASS" result. It uses the `@upstash/workflow/nextjs` package to integrate with Next.js, and leverages OpenAI's `gpt-3.5-turbo` model.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/evaluator-optimizer.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo');

  // Generator agent that generates content
  const generator = context.agents.agent({
    model,
    name: 'generator',
    maxSteps: 1,
    background: 'You are an agent that generates text based on a prompt.',
    tools: {}
  });

  // Evaluator agent that evaluates the text and gives corrections
  const evaluator = context.agents.agent({
    model,
    name: 'evaluator',
    maxSteps: 1,
    background: 'You are an agent that evaluates the generated text and provides corrections if needed.',
    tools: {}
  });

  let generatedText = '';
  let evaluationResult = '';

  const prompt = "Generate a short explanation of quantum mechanics.";
  let nextPrompt = prompt;
  for (let i = 0; i < 3; i++) {
    // Construct prompt for generator: 
    // - If there's no evaluation, use the original prompt
    // - If there's an evaluation, provide the prompt, the last generated text, and the evaluator's feedback
    if (evaluationResult && evaluationResult !== "PASS") {
      nextPrompt = `Please revise the answer to the question "${prompt}". Previous answer was: "${generatedText}", which received this feedback: "${evaluationResult}".`;
    }

    // Generate content
    const generatedResponse = await context.agents.task({ agent: generator, prompt: nextPrompt }).run();
    generatedText = generatedResponse.text

    // Evaluate the generated content
    const evaluationResponse = await context.agents.task({ agent: evaluator, prompt: `Evaluate and provide feedback for the following text: ${generatedText}` }).run();
    evaluationResult = evaluationResponse.text

    // If the evaluator accepts the content (i.e., "PASS"), stop
    if (evaluationResult.includes("PASS")) {
      break;
    }
  }

  console.log(generatedText);
});
```
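
The control flow of the loop above can be sketched without any agent API. In the Python version below, `generate` and `evaluate` are hypothetical callables standing in for the generator and evaluator tasks, and the stubs at the bottom are made up purely to exercise the loop.

```python
from typing import Callable, Tuple


def evaluator_optimizer(
    prompt: str,
    generate: Callable[[str], str],  # hypothetical stand-in for the generator agent
    evaluate: Callable[[str], str],  # hypothetical stand-in for the evaluator agent
    max_rounds: int = 3,
) -> Tuple[str, int]:
    """Regenerate until the evaluator's feedback contains "PASS" or rounds run out."""
    text, feedback = "", ""
    next_prompt = prompt
    rounds = 0
    for _ in range(max_rounds):
        if feedback:
            # Feed the previous answer and its feedback back to the generator.
            next_prompt = (
                f'Please revise the answer to the question "{prompt}". '
                f'Previous answer was: "{text}", '
                f'which received this feedback: "{feedback}".'
            )
        text = generate(next_prompt)
        feedback = evaluate(text)
        rounds += 1
        if "PASS" in feedback:
            break
    return text, rounds


def fake_generate(p: str) -> str:
    # Illustrative stub: produces a better draft once it is asked to revise.
    return "revised draft" if p.startswith("Please revise") else "first draft"


def fake_evaluate(text: str) -> str:
    # Illustrative stub: only accepts revised drafts.
    return "PASS" if text.startswith("revised") else "Too vague, add an example."


final_text, rounds_used = evaluator_optimizer(
    "Explain quantum mechanics briefly.", fake_generate, fake_evaluate
)
```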

----------------------------------------

TITLE: Defining Upstash Workflow Route (Next.js)
DESCRIPTION: This TypeScript code defines a Next.js API route that implements a payment retry workflow using Upstash Workflow.  It handles charging a customer, retrying on failure with a delay, unsuspending the user if the payment succeeds, sending an invoice email, and suspending the user if all retries fail.  It depends on the `@upstash/workflow/nextjs` package.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

type ChargeUserPayload = {
  email: string;
};

export const { POST } = serve<ChargeUserPayload>(async (context) => {
  const { email } = context.requestPayload;

  for (let i = 0; i < 3; i++) {
    // attempt to charge the user
    const result = await context.run("charge customer", async () => {
      try {
        return await chargeCustomer(i + 1);
      } catch (e) {
        console.error(e);
        return
      }
    });

    if (!result) {
      // Wait for a day
      await context.sleep("wait for retry", 24 * 60 * 60);
    } else {
      // Unsuspend User
      const isSuspended = await context.run("check suspension", async () => {
        return await checkSuspension(email);
      });
      if (isSuspended) {
        await context.run("unsuspend user", async () => {
          await unsuspendUser(email);
        });
      }

      // send invoice email
      await context.run("send invoice email", async () => {
        await sendEmail(
          email,
          `Payment successful. Invoice: ${result.invoiceId}, Total cost: $${result.totalCost}`
        );
      });

      // by returning, we end the workflow run
      return;
    }
  }

  // suspend user if the user isn't suspended
  const isSuspended = await context.run("check suspension", async () => {
    return await checkSuspension(email);
  });

  if (!isSuspended) {
    await context.run("suspend user", async () => {
      await suspendUser(email);
    });

    await context.run("send suspended email", async () => {
      await sendEmail(
        email,
        "Your account has been suspended due to payment failure. Please update your payment method."
      );
    });
  }
});

async function sendEmail(email: string, content: string) {
  // Implement the logic to send an email
  console.log("Sending email to", email, "with content:", content);
}

async function checkSuspension(email: string) {
  // Implement the logic to check if the user is suspended
  console.log("Checking suspension status for", email);
  return true;
}

async function suspendUser(email: string) {
  // Implement the logic to suspend the user
  console.log("Suspending the user", email);
}

async function unsuspendUser(email: string) {
  // Implement the logic to unsuspend the user
  console.log("Unsuspending the user", email);
}

async function chargeCustomer(attempt: number) {
  // Implement the logic to charge the customer
  console.log("Charging the customer");

  if (attempt <= 2) {
    throw new Error("Payment failed");
  }

  return {
    invoiceId: "INV123",
    totalCost: 100,
  } as const;
}
```

----------------------------------------

TITLE: Prompt Chaining with Upstash Workflow and LangChain (TS)
DESCRIPTION: This code defines a prompt chaining workflow using Upstash Workflow, LangChain, and OpenAI's GPT-3.5-turbo model. It creates three agents: one to list famous physicists, another to describe their work using Wikipedia, and a third to summarize the findings. The output of each agent serves as the input for the next agent, forming a chain. It uses `serve` from `@upstash/workflow/nextjs` to define the API endpoint.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/prompt-chaining.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo');

  const agent1 = context.agents.agent({
    model,
    name: 'firstAgent',
    maxSteps: 1,
    background: 'You are an agent that lists famous physicists.',
    tools: {}
  });

  const agent2 = context.agents.agent({
    model,
    name: 'secondAgent',
    // set to 2 as this agent will first request tools
    // and then summarize them:
    maxSteps: 2,
    background:
      'You are an agent that describes the work of' +
      ' the physicists listed in the previous prompt.',
    tools: {
      wikiTool: new WikipediaQueryRun({
        topKResults: 1,
        maxDocContentLength: 500,
      })
    }
  });

  const agent3 = context.agents.agent({
    model,
    name: 'thirdAgent',
    maxSteps: 1,
    background:
      'You are an agent that summarizes the ' +
      'works of the physicists mentioned previously.',
    tools: {}
  });

  // Chaining agents
  const firstOutput = await context.agents.task({
    agent: agent1,
    prompt: "List 3 famous physicists."
  }).run();

  const secondOutput = await context.agents.task({
    agent: agent2,
    prompt: `Describe the work of: ${firstOutput.text}`
  }).run();
  

  const { text } = await context.agents.task({
    agent: agent3,
    prompt: `Summarize: ${secondOutput.text}`
  }).run();

  console.log(text);
});
```

----------------------------------------

TITLE: AI Generation Workflow (FastAPI)
DESCRIPTION: This Python code defines a FastAPI endpoint that uses Upstash Workflow to process a dataset with OpenAI.  It follows the same steps as the TypeScript version: downloading the dataset, splitting it into chunks, processing each chunk with GPT-4 via OpenAI API, aggregating the results, and generating and sending a final report. It uses `context.run` for asynchronous task execution and `context.call` to make external API calls.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_1

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
import json
import os
from typing import Dict, List, Any, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext, CallResponse
from utils import (
    aggregate_results,
    generate_report,
    send_report,
    get_dataset_url,
    split_into_chunks,
)


app = FastAPI()
serve = Serve(app)


class RequestPayload(TypedDict):
    dataset_id: str
    user_id: str


@serve.post("/ai-generation")
async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None:
    request = context.request_payload
    dataset_id = request["dataset_id"]
    user_id = request["user_id"]

    # Step 1: Download the dataset
    async def _get_dataset_url() -> str:
        return await get_dataset_url(dataset_id)

    dataset_url = await context.run("get-dataset-url", _get_dataset_url)

    # HTTP request with much longer timeout (2hrs)
    response: CallResponse[Any] = await context.call(
        "download-dataset", url=dataset_url, method="GET"
    )
    dataset = response.body

    # Step 2: Process data in chunks using OpenAI
    chunk_size = 1000
    chunks = split_into_chunks(dataset, chunk_size)
    processed_chunks: List[str] = []

    for i, chunk in enumerate(chunks):
        openai_response: CallResponse[Dict[str, Any]] = await context.call(
            f"process-chunk-{i}",
            url="https://api.openai.com/v1/chat/completions",
            method="POST",
            headers={
                "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
            },
            body={
                "model": "gpt-4",
                "messages": [
                    {
                        "role": "system",
                        "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
                    },
                    {
                        "role": "user",
                        "content": f"Analyze this data chunk: {json.dumps(chunk)}",
                    },
                ],
                "max_tokens": 150,
            },
        )

        processed_chunks.append(
            openai_response.body["choices"][0]["message"]["content"]
        )

        # Every 10 chunks, we'll aggregate intermediate results
        if i % 10 == 9 or i == len(chunks) - 1:

            async def _aggregate_results() -> None:
                await aggregate_results(processed_chunks)
                processed_chunks.clear()

            await context.run(f"aggregate-results{i}", _aggregate_results)

    # Step 3: Generate and send data report
    async def _generate_report() -> Any:
        return await generate_report(dataset_id)

    report = await context.run("generate-report", _generate_report)

    async def _send_report() -> None:
        await send_report(report, user_id)

    await context.run("send-report", _send_report)

```
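
The workflow above imports `split_into_chunks` from `utils` without showing it. One plausible implementation is sketched below; it assumes the dataset is a sequence (list or string), which the source does not confirm, so the real helper may differ. The `aggregate_at` line replays the `i % 10 == 9 or i == len(chunks) - 1` condition from the loop to show which chunk indices would trigger an aggregation step.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_into_chunks(data: Sequence[T], chunk_size: int) -> List[Sequence[T]]:
    """Hypothetical helper: split `data` into consecutive slices of `chunk_size` items."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]


# 25 items in chunks of 10 gives chunk sizes 10, 10 and 5.
chunks = split_into_chunks(list(range(25)), 10)
chunk_sizes = [len(c) for c in chunks]

# Chunk indices at which the loop above would run an aggregation step.
aggregate_at = [
    i for i in range(len(chunks)) if i % 10 == 9 or i == len(chunks) - 1
]
```

With only three chunks, aggregation runs once, after the last chunk; with 25 chunks it would run at indices 9, 19 and 24.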

----------------------------------------

TITLE: Customer Onboarding Workflow Handler - TypeScript
DESCRIPTION: This TypeScript code defines a Next.js API route that handles the customer onboarding workflow. It receives an email address, sends a welcome email, waits for 3 days, and then periodically checks the user's state, sending appropriate emails based on their activity. It relies on the `@upstash/workflow/nextjs` package for workflow management and `sendEmail` and `getUserState` functions for external actions.  The initial data for the workflow consists of the user's email.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"

type InitialData = {
  email: string
}

export const { POST } = serve<InitialData>(async (context) => {
  const { email } = context.requestPayload

  await context.run("new-signup", async () => {
    await sendEmail("Welcome to the platform", email)
  })

  await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3)

  while (true) {
    const state = await context.run("check-user-state", async () => {
      return await getUserState()
    })

    if (state === "non-active") {
      await context.run("send-email-non-active", async () => {
        await sendEmail("Email to non-active users", email)
      })
    } else if (state === "active") {
      await context.run("send-email-active", async () => {
        await sendEmail("Send newsletter to active users", email)
      })
    }

    await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)
  }
})

async function sendEmail(message: string, email: string) {
  // Implement email sending logic here
  console.log(`Sending ${message} email to ${email}`)
}

type UserState = "non-active" | "active"

const getUserState = async (): Promise<UserState> => {
  // Implement user state logic here
  return "non-active"
}
```

----------------------------------------

TITLE: Ensure Idempotency in context.run - Python
DESCRIPTION: This Python code snippet highlights the need for idempotency within `context.run` in Upstash Workflow. The `some_work(input)` function must be idempotent to ensure the workflow behaves consistently even if retried.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_15

LANGUAGE: python
CODE:
```
@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:
    input = context.request_payload

    async def _step_1() -> None:
        return await some_work(input)

    await context.run("step-1", _step_1)

```
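
One common way to make a step idempotent is to key its side effect on a stable identifier and skip the work when that key has already been handled. The sketch below illustrates that idea only; `some_work_idempotent`, the in-memory store, and the key format are all hypothetical, and a real system would persist the keys in a database or rely on the downstream provider's idempotency keys.

```python
from typing import Dict

# Hypothetical in-memory record of completed work, keyed by idempotency key.
_processed: Dict[str, str] = {}
side_effect_count = 0


def some_work_idempotent(idempotency_key: str, payload: str) -> str:
    """Perform the side effect at most once per key; repeat calls return the stored result."""
    global side_effect_count
    if idempotency_key in _processed:
        return _processed[idempotency_key]  # already done: no second side effect
    side_effect_count += 1  # stands in for the real side effect (charge, email, ...)
    result = f"processed:{payload}"
    _processed[idempotency_key] = result
    return result


# Simulate the same step being executed twice, as can happen on a retry.
first = some_work_idempotent("run-1:step-1", "hello")
second = some_work_idempotent("run-1:step-1", "hello")
```

Both calls return the same value, and the side effect counter stays at one.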

----------------------------------------

TITLE: Define a Workflow Endpoint with Upstash Agents API in Next.js
DESCRIPTION: Defines a Next.js route that serves as an Upstash Workflow endpoint. It uses the `serve` function from `@upstash/workflow/nextjs` to handle incoming requests. The code defines an agent with `context.agents.agent` and gives it a tool for communicating its inner thoughts. The prompt from the request payload is passed to the agent, and the final response is logged.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/getting-started.mdx#_snippet_5

LANGUAGE: typescript
CODE:
```
import { z } from "zod";
import { tool } from "ai";

import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<{ prompt: string }>(async (context) => {
  const prompt = context.requestPayload.prompt
  const model = context.agents.openai('gpt-3.5-turbo')

  const communicatorAgent = context.agents.agent({
    model,
    name: 'communicatorAgent',
    maxSteps: 2,
    tools: {
      communicationTool: tool({
        description: 'A tool for informing the caller about your inner thoughts',
        parameters: z.object({ message: z.string() }),
        execute: async ({ message }) => {
          console.log("Inner thought:", message)
          return "success"
        }
      })
    },
    background:
      'Answer questions directed towards you.' +
      ' You have access to a tool to share your inner thoughts' +
      ' with the caller. Utilize this tool at least once before' +
      ' answering the prompt. In your inner thoughts, briefly' +
      ' explain what you will talk about and why. Keep your' +
      ' answers brief.',
  })

  const task = context.agents.task({
    agent: communicatorAgent,
    prompt
  })

  const { text } = await task.run()

  console.log("Final response:", text);
})
```

----------------------------------------

TITLE: Defining Upstash Workflow Route (FastAPI)
DESCRIPTION: This Python code defines a FastAPI endpoint that implements a payment retry workflow using Upstash Workflow. It leverages `upstash_workflow.fastapi.Serve` to integrate with FastAPI. The workflow includes charging a customer, retrying upon failure with a delay, unsuspending the user if the payment succeeds, sending an invoice email, and suspending the user if all retries fail.  It uses `dataclasses`, `typing` and `fastapi`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_1

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import TypedDict, Optional
from dataclasses import dataclass
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


@dataclass
class ChargeResult:
    invoice_id: str
    total_cost: float


class ChargeUserPayload(TypedDict):
    email: str


async def send_email(email: str, content: str) -> None:
    # Implement the logic to send an email
    print("Sending email to", email, "with content:", content)


async def check_suspension(email: str) -> bool:
    # Implement the logic to check if the user is suspended
    print("Checking suspension status for", email)
    return True


async def suspend_user(email: str) -> None:
    # Implement the logic to suspend the user
    print("Suspending the user", email)


async def unsuspend_user(email: str) -> None:
    # Implement the logic to unsuspend the user
    print("Unsuspending the user", email)


async def charge_customer(attempt: int) -> Optional[ChargeResult]:
    # Implement the logic to charge the customer
    print("Charging the customer")
    if attempt <= 2:
        raise Exception("Payment failed")
    return ChargeResult(invoice_id="INV123", total_cost=100)


@serve.post("/payment-retries")
async def payment_retries(context: AsyncWorkflowContext[ChargeUserPayload]) -> None:
    email = context.request_payload["email"]

    async def _check_suspension() -> bool:
        return await check_suspension(email)

    for i in range(3):
        # attempt to charge the user
        async def _charge_customer() -> Optional[ChargeResult]:
            try:
                return await charge_customer(i + 1)
            except Exception as e:
                print(f"Error: {e}")
                return None

        result = await context.run("charge customer", _charge_customer)

        if not result:
            # Wait for a day
            await context.sleep("wait for retry", 24 * 60 * 60)
        else:
            # Unsuspend User
            is_suspended = await context.run("check suspension", _check_suspension)

            if is_suspended:

                async def _unsuspend_user() -> None:
                    await unsuspend_user(email)

                await context.run("unsuspend user", _unsuspend_user)

            # send invoice email
            async def _send_invoice_email() -> None:
                await send_email(
                    email,
                    f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}",
                )

            await context.run("send invoice email", _send_invoice_email)

            # by returning, we end the workflow run
            return

    # suspend user if the user isn't suspended
    is_suspended = await context.run("check suspension", _check_suspension)

    if not is_suspended:

        async def _suspend_user() -> None:
            await suspend_user(email)

        await context.run("suspend user", _suspend_user)

        async def _send_suspended_email() -> None:
            await send_email(
                email,
                "Your account has been suspended due to payment failure. Please update your payment method.",
            )

        await context.run("send suspended email", _send_suspended_email)

```

----------------------------------------

TITLE: Triggering Workflow Run with Client (TypeScript)
DESCRIPTION: This snippet demonstrates how to start a workflow using the `client.trigger` method from the `@upstash/workflow` package. It initializes a workflow client with a QStash token and then uses the `trigger` method to send a request to the workflow endpoint. The `workflowRunId` is returned in the result. The function accepts optional parameters for body, headers, a specific `workflowRunId` to assign, and retries for the initial request.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#_snippet_0

LANGUAGE: TypeScript
CODE:
```
// Using the workflow client
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });

const { workflowRunId } = await client.trigger({
  url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  body: "hello there!",         // Optional body
  headers: { ... },             // Optional headers
  workflowRunId: "my-workflow", // Optional workflow run ID
  retries: 3                    // Optional retries for the initial request
});
```

----------------------------------------

TITLE: Periodic State Check and Email Sending - Python
DESCRIPTION: This Python snippet implements a periodic state check within an infinite loop. It retrieves the user state using `get_user_state`, and sends different emails depending on whether the user is 'active' or 'non-active'. The workflow sleeps for one month (60 * 60 * 24 * 30 seconds) after each state check. Asynchronous functions are defined inline using `async def` to execute tasks.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_7

LANGUAGE: python
CODE:
```
while True:

    async def _check_user_state() -> UserState:
        return await get_user_state()

    state: UserState = await context.run("check-user-state", _check_user_state)

    if state == "non-active":

        async def _send_email_non_active() -> None:
            await send_email("Email to non-active users", email)

        await context.run("send-email-non-active", _send_email_non_active)
    else:

        async def _send_email_active() -> None:
            await send_email("Send newsletter to active users", email)

        await context.run("send-email-active", _send_email_active)

    await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)

```

----------------------------------------

TITLE: Send Welcome Email - Python
DESCRIPTION: This Python snippet sends a welcome email to the user using the `send_email` function within an Upstash workflow. An asynchronous function `_send_welcome_email` is defined, and then executed using `context.run`, passing the user's email and welcome message.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_9

LANGUAGE: python
CODE:
```
async def _send_welcome_email() -> None:
    await send_email(
        email, "Welcome to our platform!, You have 14 days of free trial."
    )

await context.run("send welcome email", _send_welcome_email)
```

----------------------------------------

TITLE: Trigger Workflow with TypeScript SDK
DESCRIPTION: Demonstrates how to trigger the workflow from a TypeScript application using the Upstash Workflow SDK.  It initializes a `Client` instance and calls the `trigger` method, passing the workflow endpoint URL, optional body, headers, workflow run ID, and retry settings.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });

const { workflowRunId } = await client.trigger({
  url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  body: "hello there!",         // Optional body
  headers: { ... },             // Optional headers
  workflowRunId: "my-workflow", // Optional workflow run ID
  retries: 3                    // Optional retries for the initial request
});
```

----------------------------------------

TITLE: Waiting for an Event using context.waitForEvent TypeScript
DESCRIPTION: This snippet demonstrates how to use `context.waitForEvent` to pause a workflow until a specific event occurs. It takes an event description, event ID, and an optional timeout in seconds. The workflow will resume when the event is received or when the timeout expires.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
const { eventData, timeout } = await context.waitForEvent(
  "description of event",
  "event-id",
  {
    timeout: timeoutInSeconds,
  }
);
```
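
The `{ eventData, timeout }` result can be illustrated with a small local analogy. The sketch below is not the Upstash implementation: it uses a plain `asyncio.Event` and a hypothetical `wait_for_event` helper to show the two outcomes, either data arrives before the deadline or the wait times out.

```python
import asyncio
from typing import Any, Optional, Tuple


async def wait_for_event(
    event: asyncio.Event,
    mailbox: dict,
    timeout_in_seconds: float,
) -> Tuple[Optional[Any], bool]:
    """Return (event_data, timeout), mirroring the two fields destructured above."""
    try:
        await asyncio.wait_for(event.wait(), timeout=timeout_in_seconds)
    except asyncio.TimeoutError:
        return None, True
    return mailbox.get("data"), False


async def demo():
    # Case 1: a notifier delivers data before the timeout expires.
    event, mailbox = asyncio.Event(), {}

    async def notify() -> None:
        await asyncio.sleep(0.01)
        mailbox["data"] = {"deliveryTime": "2 days"}
        event.set()

    notifier = asyncio.create_task(notify())
    delivered = await wait_for_event(event, mailbox, timeout_in_seconds=1.0)
    await notifier

    # Case 2: nobody notifies, so the wait ends with timeout=True.
    timed_out = await wait_for_event(asyncio.Event(), {}, timeout_in_seconds=0.05)
    return delivered, timed_out


delivered, timed_out = asyncio.run(demo())
print(delivered)
print(timed_out)
```

In a real workflow the wait is durable and the notification comes from outside the process; only the shape of the result is modelled here.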

----------------------------------------

TITLE: Secure Workflow with QStash Receiver (TypeScript)
DESCRIPTION: This TypeScript code snippet shows how to integrate QStash's `Receiver` into an Upstash Workflow to verify requests. It imports `Receiver` from `@upstash/qstash` and `serve` from `@upstash/workflow/nextjs`, and configures the receiver with the current and next signing keys from environment variables. This ensures that only requests with a valid QStash signature are processed.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
import { Receiver } from "@upstash/qstash";
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(
  async (context) => {
    // Your workflow steps...
  },
  {
    receiver: new Receiver({
      // Non-null assertions: both keys are expected to be set in the environment
      currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
      nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
    }),
  }
);
```
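
Why two signing keys are configured can be shown with a simplified analogue. The sketch below is an assumption-laden illustration, not the `Receiver` implementation: `sign` and `verify` are hypothetical helpers that use a plain hex HMAC-SHA256 over the body, and a request is accepted if it matches either the current or the next key, so a key rotation does not reject in-flight requests.

```python
import hashlib
import hmac


def sign(body: bytes, key: str) -> str:
    """Hex HMAC-SHA256 of the body (a stand-in for the real signature format)."""
    return hmac.new(key.encode(), body, hashlib.sha256).hexdigest()


def verify(body: bytes, signature: str, current_key: str, next_key: str) -> bool:
    """Accept a signature produced with either the current or the next key."""
    for key in (current_key, next_key):
        # compare_digest avoids leaking information through timing differences
        if hmac.compare_digest(sign(body, key), signature):
            return True
    return False


body = b'{"hello": "world"}'
print(verify(body, sign(body, "current-key"), "current-key", "next-key"))
print(verify(body, sign(body, "next-key"), "current-key", "next-key"))
print(verify(body, sign(body, "stale-key"), "current-key", "next-key"))
```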

----------------------------------------

TITLE: Trigger Workflow Run with Upstash Client (TS)
DESCRIPTION: This snippet demonstrates how to trigger a workflow run using the Upstash Workflow Client. It includes optional parameters for the request body, headers, workflow run ID, retries, and flow control.  A QSTASH_TOKEN is required for authentication.  The triggered workflowRunId is logged to the console.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" })
const { workflowRunId } = await client.trigger({
  url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  body: "hello there!",         // optional body
  headers: { ... },             // optional headers
  workflowRunId: "my-workflow", // optional workflow run id
  retries: 3,                   // optional retries in the initial request
  flowControl: {                // optional flow control
    key: "USER_GIVEN_KEY",
    ratePerSecond: 10,
    parallelism: 5
  }
})

console.log(workflowRunId)
// prints wfr_my-workflow
```

----------------------------------------

TITLE: Verifying Stock (Upstash Workflow)
DESCRIPTION: This code snippet demonstrates how to verify stock availability using Upstash Workflow in both TypeScript and Python. It creates an order ID and checks whether the items are in stock. If they are not, it logs a warning and ends the workflow, so payment is never processed for out-of-stock items.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
const orderId = await context.run("create-order-id", async () => {
  return await createOrderId(userId);
});

const stockAvailable = await context.run("check-stock", async () => {
  return await checkStockAvailability(items)
})

if (!stockAvailable) {
  console.warn("Some items are out of stock")
  return;
}
```

LANGUAGE: python
CODE:
```
async def _create_order_id():
    return await create_order_id(user_id)

order_id: str = await context.run("create-order-id", _create_order_id)

async def _check_stock():
    return await check_stock_availability(items)

stock_available: bool = await context.run("check-stock", _check_stock)

if not stock_available:
    print("Some items are out of stock")
    return
```
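
The snippet leaves `check_stock_availability` undefined. A minimal sketch of what such a helper might look like follows, assuming items shaped like the order payload (`productId`, `quantity`) and a hypothetical in-memory `INVENTORY` dictionary in place of a real stock service.

```python
import asyncio
from typing import Dict, List, TypedDict


class OrderItem(TypedDict):
    productId: str
    quantity: int


# Hypothetical in-memory inventory; a real system would query a database or API.
INVENTORY: Dict[str, int] = {"sku-1": 5, "sku-2": 0}


async def check_stock_availability(items: List[OrderItem]) -> bool:
    """Return True only if every requested item has enough units in stock."""
    return all(
        INVENTORY.get(item["productId"], 0) >= item["quantity"] for item in items
    )


in_stock = asyncio.run(check_stock_availability([{"productId": "sku-1", "quantity": 2}]))
out_of_stock = asyncio.run(check_stock_availability([{"productId": "sku-2", "quantity": 1}]))
print(in_stock, out_of_stock)
```

Unknown product IDs are treated as having zero stock, which is one possible design choice rather than something the source specifies.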

----------------------------------------

TITLE: Workflow Definition with Event Handling (Next.js)
DESCRIPTION: This code snippet defines a workflow using Upstash Workflow and Next.js to handle order processing. It receives an order request, sends an email to request order processing, waits for an external event indicating order completion, handles timeouts, and sends a confirmation email. The `serve` function integrates the workflow with a Next.js API route.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(async (context) => {
  const { orderId, userEmail } = context.requestPayload;

  // Step 1: request order processing
  await context.run("request order processing", async () => {
    await requestProcessing(orderId)
  })

  // Step 2: Wait for the order to be processed
  const { eventData, timeout } = await context.waitForEvent(
    "wait for order processing",
    `order-${orderId}`,
    {
      timeout: "10m" // 10 minutes timeout
    }
  );

  if (timeout) {
    // end workflow in case of timeout
    return;
  }

  const processedData = eventData;

  // Step 3: Log the processed order
  await context.run("process-order", async () => {
    console.log(`Order ${orderId} processed:`, processedData);
  });

  // Step 4: Send a confirmation email
  await context.run("send-confirmation-email", async () => {
    await sendEmail(
      userEmail,
      "Your order has been processed!",
      processedData
    );
  });
});
```

----------------------------------------

TITLE: Creating OpenAI Client with Workflow Context
DESCRIPTION: Creates an OpenAI client that integrates with Upstash Workflow using a custom `fetch` implementation. This allows Upstash Workflow to manage HTTP requests to the OpenAI API, ensuring durability and handling long-running operations.  It uses `context.call` to make HTTP requests, allowing Upstash Workflow to track and manage the request lifecycle.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#_snippet_3

LANGUAGE: typescript
CODE:
```
import { createOpenAI } from '@ai-sdk/openai';
import { HTTPMethods } from '@upstash/qstash';
import { WorkflowAbort, WorkflowContext } from '@upstash/workflow';

export const createWorkflowOpenAI = (context: WorkflowContext) => {
  return createOpenAI({
    compatibility: "strict",
    fetch: async (input, init) => {
      try {
        // Prepare headers from init.headers
        const headers = init?.headers
          ? Object.fromEntries(new Headers(init.headers).entries())
          : {};

        // Prepare body from init.body
        const body = init?.body ? JSON.parse(init.body as string) : undefined;

        // Make network call
        const responseInfo = await context.call("openai-call-step", {
          url: input.toString(),
          method: init?.method as HTTPMethods,
          headers,
          body,
        });

        // Construct headers for the response
        const responseHeaders = new Headers(
          Object.entries(responseInfo.header).reduce((acc, [key, values]) => {
            acc[key] = values.join(", ");
            return acc;
          }, {} as Record<string, string>)
        );

        // Return the constructed response
        return new Response(JSON.stringify(responseInfo.body), {
          status: responseInfo.status,
          headers: responseHeaders,
        });
      } catch (error) {
        if (error instanceof WorkflowAbort) {
          throw error
        } else {
          console.error("Error in fetch implementation:", error);
          throw error; // Rethrow error for further handling
        }
      }
    },
  });
};
```
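
The response-header step above collapses each header's list of values into one comma-separated string. The same transformation, sketched in Python with a hypothetical `flatten_headers` helper and made-up header values:

```python
from typing import Dict, List


def flatten_headers(raw: Dict[str, List[str]]) -> Dict[str, str]:
    """Join multi-valued headers into single comma-separated strings."""
    return {name: ", ".join(values) for name, values in raw.items()}


flat = flatten_headers(
    {"Content-Type": ["application/json"], "Vary": ["Accept", "Origin"]}
)
print(flat)
```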

----------------------------------------

TITLE: Send Trial Warning Email - TypeScript
DESCRIPTION: This TypeScript snippet sends a trial warning email 2 days before the trial ends. It sleeps for 5 days, checks whether the user has upgraded using `checkUpgradedPlan`, and sends the warning email only if they have not. If the user has upgraded, the workflow returns and ends. In the full webhook workflow this sleep follows an initial 7-day wait, so the check runs 12 days into the 14-day trial.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_14

LANGUAGE: typescript
CODE:
```
await context.sleep("wait for trial warning", 5 * 24 * 60 * 60);

const isUpgraded = await context.run("check upgraded plan", async () => {
  return await checkUpgradedPlan(email);
});

if (isUpgraded) return;

await context.run("send trial warning email", async () => {
  await sendEmail(
    email,
    "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform."
  );
});
```
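
The sleep durations are easier to check as a cumulative timeline. The sketch below assumes the 7-day `wait` and 2-day `wait for trial end` sleeps from the full auth webhook example in addition to the 5-day sleep shown here; `cumulative_days` is a hypothetical helper.

```python
DAY = 24 * 60 * 60  # seconds in one day

# (step name, sleep duration in seconds), in workflow order
SLEEPS = [
    ("wait", 7 * DAY),
    ("wait for trial warning", 5 * DAY),
    ("wait for trial end", 2 * DAY),
]


def cumulative_days(sleeps):
    """Return (step name, days elapsed since signup once the sleep finishes)."""
    elapsed = 0
    timeline = []
    for name, seconds in sleeps:
        elapsed += seconds
        timeline.append((name, elapsed // DAY))
    return timeline


timeline = cumulative_days(SLEEPS)
print(timeline)
```

The warning step therefore runs on day 12, two days before the 14-day trial ends.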

----------------------------------------

TITLE: Use Initial Payload Parser in Python
DESCRIPTION: Demonstrates how to define and use an `initial_payload_parser` to process the initial request's payload in Python. The parser transforms the raw initial payload into a structured object, defined using `dataclass`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_9

LANGUAGE: Python
CODE:
```
from dataclasses import dataclass

@dataclass
class InitialPayload:
    foo: str
    bar: int


def initial_payload_parser(initial_payload: str) -> InitialPayload:
    return parse_payload(initial_payload)


@serve.post("/api/example", initial_payload_parser=initial_payload_parser)
async def example(context: AsyncWorkflowContext[InitialPayload]) -> None:
    payload: InitialPayload = context.request_payload
```
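
`parse_payload` is not defined in the snippet. One possible implementation, assuming the initial payload is a JSON object with `foo` and `bar` keys, is sketched below; the JSON format is an assumption, not something the source states.

```python
import json
from dataclasses import dataclass


@dataclass
class InitialPayload:
    foo: str
    bar: int


def parse_payload(initial_payload: str) -> InitialPayload:
    """Parse a raw JSON string into the structured payload (hypothetical helper)."""
    data = json.loads(initial_payload)
    return InitialPayload(foo=str(data["foo"]), bar=int(data["bar"]))


payload = parse_payload('{"foo": "hello", "bar": 3}')
print(payload)
```

A missing key raises `KeyError` here; whether to fail or fall back to defaults is left to the application.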

----------------------------------------

TITLE: Workflow Handler with Upstash/FastAPI
DESCRIPTION: This Python snippet implements an image processing workflow using Upstash Workflow and FastAPI. It defines an endpoint `/process-image` that receives an image ID and user ID, retrieves the image, resizes it to multiple resolutions, applies filters, and stores the processed images. It uses `upstash_workflow.fastapi.Serve` to integrate with FastAPI and `context.run` and `context.call` to execute workflow steps.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_1

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import List, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext, CallResponse
from utils import store_image, get_image_url

app = FastAPI()
serve = Serve(app)


class ImageResult(TypedDict):
    imageUrl: str  # key name matches the "imageUrl" field read from response bodies below


class ImageProcessingPayload(TypedDict):
    image_id: str
    user_id: str


@serve.post("/process-image")
async def process_image(context: AsyncWorkflowContext[ImageProcessingPayload]) -> None:
    payload = context.request_payload
    image_id = payload["image_id"]
    user_id = payload["user_id"]

    # Step 1: Retrieve the uploaded image
    async def _get_image_url() -> str:
        return await get_image_url(image_id)

    image_url: str = await context.run("get-image-url", _get_image_url)

    # Step 2: Resize the image to multiple resolutions
    resolutions = [640, 1280, 1920]
    resize_responses = []

    for resolution in resolutions:
        response: CallResponse[ImageResult] = await context.call(
            f"resize-image-{resolution}",
            # endpoint which returns ImageResult type in response
            url="https://image-processing-service.com/resize",
            method="POST",
            body={"imageUrl": image_url, "width": resolution},
        )
        resize_responses.append(response)

    resized_images = [response.body for response in resize_responses]

    # Step 3: Apply filters to each resized image
    filters = ["grayscale", "sepia", "contrast"]
    filter_responses = []

    for resized_image in resized_images:
        for filter in filters:
            response: CallResponse[ImageResult] = await context.call(
                f"apply-filter-{filter}",
                # endpoint which returns ImageResult type in response
                url="https://image-processing-service.com/filter",
                method="POST",
                body={"imageUrl": resized_image["imageUrl"], "filter": filter},
            )
            filter_responses.append(response)

    processed_images = [response.body for response in filter_responses]

    # Step 4: Store processed images in cloud storage
    async def _store_image() -> str:
        return await store_image(processed_image["imageUrl"])

    stored_image_urls: List[str] = []
    for processed_image in processed_images:
        stored_image_url = await context.run("store-image", _store_image)
        stored_image_urls.append(stored_image_url)
```
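
The nested loop in Step 3 issues one call per (resolution, filter) pair, so three resolutions and three filters yield nine calls, and the step name `apply-filter-{filter}` repeats once per resolution. If distinct labels are wanted, for example to tell the calls apart in logs, the width can be included in the name. The naming scheme below is a hypothetical illustration; the source does not say whether distinct names are required.

```python
from itertools import product

resolutions = [640, 1280, 1920]
filters = ["grayscale", "sepia", "contrast"]


def filter_step_names(resolutions, filters):
    """Build one distinct step name per (resolution, filter) pair."""
    return [
        f"apply-filter-{name}-{width}" for width, name in product(resolutions, filters)
    ]


names = filter_step_names(resolutions, filters)
print(len(names))
print(names[0])
```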

----------------------------------------

TITLE: Processing Payment (Upstash Workflow)
DESCRIPTION: This code snippet shows how to process payments using Upstash Workflow in both TypeScript and Python. It calls a `processPayment` function with the order ID inside a workflow step, so the later steps run only after the payment step has completed.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_3

LANGUAGE: typescript
CODE:
```
await context.run("process-payment", async () => {
  return await processPayment(orderId)
})
```

LANGUAGE: python
CODE:
```
async def _process_payment():
    return await process_payment(order_id)

await context.run("process-payment", _process_payment)
```

----------------------------------------

TITLE: Customer Onboarding Workflow - Python
DESCRIPTION: This Python code implements the customer onboarding workflow using FastAPI and Upstash Workflow. It defines an endpoint `/customer-onboarding` that receives user email as input. The workflow sends a welcome email, waits for 3 days, and then periodically checks the user's state, sending targeted emails based on their engagement. It utilizes the `upstash_workflow.fastapi` module for integration. The initial data expected is a dictionary containing the user's email.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_1

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import Literal, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)

UserState = Literal["non-active", "active"]


class InitialData(TypedDict):
    email: str


async def send_email(message: str, email: str) -> None:
    # Implement email sending logic here
    print(f"Sending {message} email to {email}")


async def get_user_state() -> UserState:
    # Implement user state logic here
    return "non-active"


@serve.post("/customer-onboarding")
async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None:
    email = context.request_payload["email"]

    async def _new_signup() -> None:
        await send_email("Welcome to the platform", email)

    await context.run("new-signup", _new_signup)

    await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3)

    while True:

        async def _check_user_state() -> UserState:
            return await get_user_state()

        state: UserState = await context.run("check-user-state", _check_user_state)

        if state == "non-active":

            async def _send_email_non_active() -> None:
                await send_email("Email to non-active users", email)

            await context.run("send-email-non-active", _send_email_non_active)
        else:

            async def _send_email_active() -> None:
                await send_email("Send newsletter to active users", email)

            await context.run("send-email-active", _send_email_active)

        await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)

```

----------------------------------------

TITLE: Notifying Workflow using Client
DESCRIPTION: This code snippet shows how to use the `Client` class to notify the workflow about an event. It creates an instance of the `Client` with an authentication token and then calls the `notify` method with the event ID and event data.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#_snippet_6

LANGUAGE: typescript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });
const orderId = "1324";

await client.notify({
  eventId: `order-${orderId}`,
  eventData: { deliveryTime: "2 days" }
});
```

----------------------------------------

TITLE: Downloading Dataset (TypeScript)
DESCRIPTION: This TypeScript code snippet retrieves a dataset URL using `getDatasetUrl` and downloads the dataset using `context.call`. The `context.call` function is used to make an HTTP request with a longer timeout than a regular serverless function would allow.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
const datasetUrl = await context.run("get-dataset-url", async () => {
  return await getDatasetUrl(request.datasetId)
})

const { body: dataset } = await context.call("download-dataset", {
  url: datasetUrl,
  method: "GET"
})
```

----------------------------------------

TITLE: Processing Data with OpenAI (TypeScript)
DESCRIPTION: This TypeScript code snippet processes data in chunks using OpenAI's GPT-4 model. It iterates over the chunks, calling the OpenAI API to analyze each chunk. The `context.api.openai.call` method is used to interact with the OpenAI API, passing the API key, operation, and request body.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
for (let i = 0; i < chunks.length; i++) {
  const { body: processedChunk } = await context.api.openai.call<
    OpenAiResponse
  >(
    `process-chunk-${i}`,
    {
      token: process.env.OPENAI_API_KEY!,
      operation: "chat.completions.create",
      body: {
        model: "gpt-4",
        messages: [
          {
            role: "system",
            content:
              "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
          },
          {
            role: "user",
            content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
          },
        ],
        max_completion_tokens: 150,
      },
    }
  )
}
```

----------------------------------------

TITLE: Downloading Dataset (Python)
DESCRIPTION: This Python code snippet retrieves a dataset URL using `get_dataset_url` and downloads the dataset using `context.call`. The `context.call` function is used to make an HTTP request that can run for much longer than a regular serverless function normally would allow.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_3

LANGUAGE: python
CODE:
```
async def _get_dataset_url() -> str:
    return await get_dataset_url(dataset_id)

dataset_url = await context.run("get-dataset-url", _get_dataset_url)

response: CallResponse[Any] = await context.call(
    "download-dataset", url=dataset_url, method="GET"
)
dataset = response.body
```

----------------------------------------

TITLE: Run New User Signup Task - TypeScript
DESCRIPTION: This TypeScript snippet executes the 'new-signup' task within the Upstash workflow context.  It calls an async function that sends a welcome email to the user using the `sendEmail` function, passing the email address from the workflow context.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
await context.run("new-signup", async () => {
  await sendEmail("Welcome to the platform", email)
})
```

----------------------------------------

TITLE: Set Failure URL in TypeScript
DESCRIPTION: Shows how to specify a `failureUrl` option within the `serve` method. This URL is called if the workflow fails after exhausting all retries. The failure callback payload and the error message are included in the `body` field of the request to the `failureUrl`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_2

LANGUAGE: TypeScript
CODE:
```
export const { POST } = serve<string>(
  async (context) => { ... },
  {
    failureUrl: "https://<YOUR-FAILURE-ENDPOINT>/..."
  }
);
```

----------------------------------------

TITLE: Make POST Request to Workflow Endpoint
DESCRIPTION: Sends a POST request to the local workflow endpoint using `curl`.  This command triggers the execution of the workflow defined in the Cloudflare Worker and passes a JSON payload as input. The endpoint returns a workflow run ID.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/cloudflare-workers.mdx#_snippet_9

LANGUAGE: bash
CODE:
```
curl -X POST http://localhost:8787/ -d '{"text": "hello world!"}'

# result: {"workflowRunId":"wfr_xxxxxx"}
```

----------------------------------------

TITLE: Transforming Conditions to Explicit Steps - Python
DESCRIPTION: This Python code shows the recommended pattern for handling non-deterministic conditions in Upstash Workflow. It turns the condition into an explicit workflow step using `context.run`, which avoids the authentication failure that occurs when the handler returns early without executing any steps. Here, `some_condition()` is assumed to be a non-deterministic function.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#_snippet_9

LANGUAGE: python
CODE:
```
@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:

    async def _check_condition() -> bool:
        return some_condition()

    should_return = await context.run("check condition", _check_condition)
    if should_return:
        return

    # rest of the workflow

```
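
Why wrapping the condition in a step helps can be seen with a toy model. The sketch below is not the SDK: `ToyContext` is a hypothetical, synchronous, in-memory stand-in that records each step's result by name and reuses it when the handler runs again, so a random condition yields the same decision on every re-execution.

```python
import random
from typing import Callable, Dict


class ToyContext:
    """Toy stand-in for a workflow context: remembers each step's result by name."""

    def __init__(self, store: Dict[str, object]) -> None:
        self.store = store

    def run(self, step_name: str, fn: Callable[[], object]) -> object:
        if step_name not in self.store:
            self.store[step_name] = fn()  # executed only on the first encounter
        return self.store[step_name]  # later executions reuse the recorded result


def some_condition() -> bool:
    # Non-deterministic on purpose
    return random.random() < 0.5


def workflow(context: ToyContext) -> str:
    should_return = context.run("check condition", some_condition)
    return "returned early" if should_return else "continued"


store: Dict[str, object] = {}
# Re-run the handler many times against the same recorded state.
outcomes = {workflow(ToyContext(store)) for _ in range(50)}
print(len(outcomes))
```

Without the `run` wrapper, repeated executions could disagree about whether to return early; with it, all 50 runs take the same branch.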

----------------------------------------

TITLE: Webhook Handler with Upstash Workflow in Next.js
DESCRIPTION: This TypeScript snippet defines a Next.js route that serves as a webhook handler for authentication providers using Upstash Workflow. It orchestrates several tasks: creating a user in the database, creating a user and starting a trial in Stripe, sending welcome/reminder/warning/end emails. The workflow uses `context.run` to execute asynchronous operations and `context.sleep` to pause execution for specified durations, managing the user's trial lifecycle.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WorkflowContext } from "@upstash/workflow";

/**
 * This can be the payload of the user created webhook event coming from your
 * auth provider (e.g. Firebase, Auth0, Clerk etc.)
 */
type UserCreatedPayload = {
  name: string;
  email: string;
};

export const { POST } = serve<UserCreatedPayload>(async (context) => {
  const { name, email } = context.requestPayload;

  const { userid } = await context.run("sync user", async () => {
    return await createUserInDatabase({ name, email });
  });

  await context.run("create new user in stripe", async () => {
    await createNewUserInStripe(email);
  });

  await context.run("start trial in Stripe", async () => {
    await startTrialInStripe(email);
  });

  await context.run("send welcome email", async () => {
    await sendEmail(
      email,
      "Welcome to our platform!, You have 14 days of free trial."
    );
  });

  await context.sleep("wait", 7 * 24 * 60 * 60);

  // get user stats and send email with them
  const stats = await context.run("get user stats", async () => {
    return await getUserStats(userid);
  });
  await sendProblemSolvedEmail({context, email, stats});

  // wait until there are two days to the end of trial period
  // and check upgrade status
  await context.sleep("wait for trial warning", 5 * 24 * 60 * 60);
  const isUpgraded = await context.run("check upgraded plan", async () => {
    return await checkUpgradedPlan(email);
  });

  // end the workflow if upgraded
  if (isUpgraded) return;

  await context.run("send trial warning email", async () => {
    await sendEmail(
      email,
      "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform."
    );
  });

  await context.sleep("wait for trial end", 2 * 24 * 60 * 60);

  await context.run("send trial end email", async () => {
    await sendEmail(
      email,
      "Your trial has ended. Please upgrade your plan to keep using our platform."
    );
  });
});

async function sendProblemSolvedEmail({
  context,
  email,
  stats,
}: {
  context: WorkflowContext<UserCreatedPayload>;
  email: string;
  stats: { totalProblemsSolved: number };
}) {
  if (stats.totalProblemsSolved === 0) {
    await context.run("send no answers email", async () => {
      await sendEmail(
        email,
        "Hey, you haven't solved any questions in the last 7 days..."
      );
    });
  } else {
    await context.run("send stats email", async () => {
      await sendEmail(
        email,
        `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!`
      );
    });
  }
}

async function createUserInDatabase({
  name,
  email,
}: {
  name: string;
  email: string;
}) {
  console.log("Creating a user in the database:", name, email);
  return { userid: "12345" };
}

async function createNewUserInStripe(email: string) {
  // Implement logic to create a new user in Stripe
  console.log("Creating a user in Stripe for", email);
}

async function startTrialInStripe(email: string) {
  // Implement logic to start a trial in Stripe
  console.log("Starting a trial of 14 days in Stripe for", email);
}

async function getUserStats(userid: string) {
  // Implement logic to get user stats
  console.log("Getting user stats for", userid);
  return {
    totalProblemsSolved: 10_000,
    mostInterestedTopic: "JavaScript",
  };
}

async function checkUpgradedPlan(email: string) {
  // Implement logic to check if the user has upgraded the plan
  console.log("Checking if the user has upgraded the plan", email);
  return false;
}

async function sendEmail(email: string, content: string) {
  // Implement logic to send an email
  console.log("Sending email to", email, content);
}
```

----------------------------------------

TITLE: Scheduling a Per-User Workflow (Next.js)
DESCRIPTION: This TypeScript code demonstrates how to schedule a per-user workflow to send weekly summary reports using Upstash QStash. It calculates the date for the first summary (7 days after signup) and creates a cron expression to schedule weekly summaries. It uses `@upstash/qstash` to create the schedule.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
import { NextResponse } from "next/server";
import { signUp } from "@/utils/auth-utils";
import { Client } from "@upstash/qstash";

const client = new Client({ token: process.env.QSTASH_TOKEN! });

export async function POST(request: Request) {
  const userData: UserData = await request.json();

  // Simulate user registration
  const user = await signUp(userData);

  // Calculate the date for the first summary (7 days from now)
  const firstSummaryDate = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000);

  // Create cron expression for weekly summaries starting 7 days from signup
  const cron = `${firstSummaryDate.getMinutes()} ${firstSummaryDate.getHours()} * * ${firstSummaryDate.getDay()}`;

  // Schedule weekly account summary
  await client.schedules.create({
    scheduleId: `user-summary-${user.email}`,
    destination: "https://<YOUR_APP_URL>/api/send-weekly-summary",
    body: { userId: user.id },
    cron: cron,
  });

  return NextResponse.json(
    { success: true, message: "User registered and summary scheduled" },
    { status: 201 }
  );
}
```
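
The cron construction can be checked in isolation. The sketch below recomputes the weekly expression in Python with a hypothetical `weekly_cron` helper. It assumes the schedule should be expressed in UTC, whereas the TypeScript snippet uses local-time getters. Note that Python's `weekday()` counts from Monday as 0, while cron and JavaScript's `getDay()` count from Sunday as 0.

```python
from datetime import datetime, timedelta, timezone


def weekly_cron(signup: datetime, delay_days: int = 7) -> str:
    """Build 'minute hour * * weekday' for the first summary date, in UTC."""
    first = signup.astimezone(timezone.utc) + timedelta(days=delay_days)
    # Python: Monday=0 ... Sunday=6; cron: Sunday=0 ... Saturday=6.
    cron_weekday = (first.weekday() + 1) % 7
    return f"{first.minute} {first.hour} * * {cron_weekday}"


signup = datetime(2024, 1, 1, 10, 30, tzinfo=timezone.utc)  # a Monday
print(weekly_cron(signup))
```

Because the delay is exactly seven days, the weekday of the first summary equals the signup weekday, so the weekly schedule first fires one week after signup.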

----------------------------------------

TITLE: Workflow Endpoint with Call
DESCRIPTION: This Python code demonstrates a FastAPI endpoint integrated with Upstash Workflow, showcasing the `context.call` function to make HTTP requests to other endpoints. The workflow includes steps that process input, call an external `/get-data` endpoint, and then process the response.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_8

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import Dict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext, CallResponse

app = FastAPI()
serve = Serve(app)


def some_work(input: str) -> str:
    return f"processed '{input}'"


@app.post("/get-data")
async def get_data() -> Dict[str, str]:
    return {"message": "get data response"}


@serve.post("/call")
async def call(context: AsyncWorkflowContext[str]) -> None:
    input = context.request_payload

    async def _step1() -> str:
        output = some_work(input)
        print("step 1 input", input, "output", output)
        return output

    result1: str = await context.run("step1", _step1)

    response: CallResponse[Dict[str, str]] = await context.call(
        "get-data",
        url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data",
        method="POST",
        body={"message": result1},
    )

    async def _step2() -> str:
        output = some_work(response.body["message"])
        print("step 2 input", response, "output", output)
        return output

    await context.run("step2", _step2)

```

----------------------------------------

TITLE: Express.js Workflow Endpoint
DESCRIPTION: Defines a workflow endpoint using Express.js. It imports necessary modules, configures environment variables, sets up middleware to parse JSON, and defines a POST route that uses the `serve` function from `@upstash/workflow/express` to handle the workflow logic.  The workflow defines two steps: step1 retrieves a message from the request payload, and step2 logs the result of step1.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/express.mdx#_snippet_7

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/express";
import express from 'express';
import { config } from 'dotenv';

config();

const app = express();

app.use(
  express.json()
);

app.post(
  '/workflow',
  serve<{ message: string }>(async (context) => {
    const res1 = await context.run("step1", async () => {
      const message = context.requestPayload.message;
      return message;
    })

    await context.run("step2", async () => {
        console.log(res1);
    })
  })
);

app.listen(3000, () => {
  console.log('Server running on port 3000');
});

```

----------------------------------------

TITLE: Invoking a Workflow with Context in TypeScript
DESCRIPTION: This snippet shows how to invoke another workflow from within the current workflow using the `context.invoke` method. It destructures the response body and the failed and canceled states of the invoked workflow, and passes options such as `retries` and `flowControl`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
const {
  body,      // response from the invoked workflow
  isFailed,  // whether the invoked workflow failed
  isCanceled // whether the invoked workflow was canceled
} = await context.invoke(
  "analyze-content",
  {
    workflow: analyzeContent,
    body: "test",
    headers: {...}, // headers to pass to the invoked workflow (optional)
    retries,       // number of retries (optional, default: 3)
    flowControl,   // flow control settings (optional)
    workflowRunId  // workflowRunId to set (optional)
  }
)
```

----------------------------------------

TITLE: Executing Parallel Workflow Steps with Promise.all in TypeScript
DESCRIPTION: This code demonstrates how to run multiple workflow steps in parallel using `Promise.all` within an Upstash Workflow. Each `ctx.run` invocation initiates a separate asynchronous task. The `result1`, `result2`, and `result3` variables will contain the results of the respective parallel steps once they complete.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/parallel-runs.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
const [result1, result2, result3] =
  await Promise.all([
    ctx.run("parallel-step-1", async () => { ... }),
    ctx.run("parallel-step-2", async () => { ... }),
    ctx.run("parallel-step-3", async () => { ... }),
  ])
```

----------------------------------------

TITLE: Custom Authorization in Workflow (TypeScript)
DESCRIPTION: This TypeScript code snippet shows how to implement a custom authorization mechanism within an Upstash Workflow. It retrieves the `authorization` header from the request context, extracts the bearer token, and validates it using the `isValid` function. If the token is invalid, it logs an error and returns. The same validation is applied in the `failureFunction` to handle failures due to authentication issues.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(
  async (context) => {
    const authHeader = context.headers.get("authorization");
    const bearerToken = authHeader?.split(" ")[1];

    if (!isValid(bearerToken)) {
      console.error("Authentication failed.");
      return;
    }

    // Your workflow steps..
  },
  {
    failureFunction: async ({ context }) => {
      const authHeader = context.headers.get("authorization");
      const bearerToken = authHeader?.split(" ")[1];

      if (!isValid(bearerToken)) {
        // ...
      }
    },
  }
);
```
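
The snippet above leaves `isValid` undefined. The following is a minimal sketch of one way it could look, assuming a single shared secret token. `extractBearerToken`, `constantTimeEqual`, and `makeIsValid` are hypothetical helpers, not part of `@upstash/workflow`, and a production setup would more likely verify a signed token with a vetted library.

```typescript
// Hypothetical helper: pulls the token out of an "Authorization: Bearer <token>" header.
function extractBearerToken(header: string | null | undefined): string | undefined {
  if (!header) return undefined;
  const [scheme, token] = header.trim().split(/\s+/);
  if (!scheme || scheme.toLowerCase() !== "bearer" || !token) return undefined;
  return token;
}

// Compares two strings without returning early at the first mismatch,
// so the comparison time does not depend on where the strings differ.
function constantTimeEqual(a: string, b: string): boolean {
  let diff = a.length ^ b.length;
  const len = Math.max(a.length, b.length);
  for (let i = 0; i < len; i++) {
    diff |= (a.charCodeAt(i) || 0) ^ (b.charCodeAt(i) || 0);
  }
  return diff === 0;
}

// Hypothetical factory: returns an isValid(token) function bound to the expected secret.
function makeIsValid(expectedToken: string): (token: string | undefined) => boolean {
  return (token) =>
    token !== undefined &&
    expectedToken.length > 0 &&
    constantTimeEqual(token, expectedToken);
}

// Assumption: in a real deployment the secret would come from an environment variable.
const isValid = makeIsValid("secret-123");
console.log(isValid(extractBearerToken("Bearer secret-123")));
```

Checking the scheme as well as the token avoids treating headers such as `Basic ...` as bearer tokens, which a plain `split(" ")[1]` would accept.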

----------------------------------------

TITLE: Sending Confirmation and Notifications (Upstash Workflow)
DESCRIPTION: This code snippet demonstrates sending order confirmation and dispatch notifications to the customer using Upstash Workflow in both TypeScript and Python. It calls the `sendOrderConfirmation` and `sendDispatchNotification` functions with the user and order IDs so that customers are informed about order status.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#_snippet_5

LANGUAGE: typescript
CODE:
```
await context.run("send-confirmation", async () => {
  return await sendOrderConfirmation(userId, orderId)
})

await context.run("send-dispatch-notification", async () => {
  return await sendDispatchNotification(userId, orderId)
})
```

LANGUAGE: python
CODE:
```
async def _send_confirmation():
    return await send_order_confirmation(user_id, order_id)

await context.run("send-confirmation", _send_confirmation)

async def _send_dispatch_notification():
    return await send_dispatch_notification(user_id, order_id)

await context.run("send-dispatch-notification", _send_dispatch_notification)
```

----------------------------------------

TITLE: Aggregating Results (TypeScript)
DESCRIPTION: This TypeScript code snippet aggregates intermediate results after processing every 10 chunks or at the end of all chunks. It calls the `aggregateResults` function within a `context.run` block, ensuring the aggregation is executed as a separate, potentially long-running task. The processed chunks are then cleared for the next iteration.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_6

LANGUAGE: typescript
CODE:
```
if (i % 10 === 9 || i === chunks.length - 1) {
  await context.run(`aggregate-results${i}`, async () => {
    await aggregateResults(processedChunks)
    processedChunks.length = 0
  })
}
```
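
To see which loop indices satisfy the condition above, the check can be pulled into a pure function. This is an illustrative sketch only: `aggregationPoints` is a hypothetical helper that is not part of the original example.

```typescript
// Hypothetical helper: returns the chunk indices at which aggregation would run,
// using the same condition as the snippet (every `every`-th chunk, plus the last chunk).
function aggregationPoints(totalChunks: number, every: number = 10): number[] {
  const points: number[] = [];
  for (let i = 0; i < totalChunks; i++) {
    if (i % every === every - 1 || i === totalChunks - 1) {
      points.push(i);
    }
  }
  return points;
}

console.log(aggregationPoints(25)); // [ 9, 19, 24 ]
```

With 25 chunks, aggregation runs after chunks 9 and 19 (full batches of ten) and once more after chunk 24 for the remaining five. Including the index in the step name, as the snippet does with `aggregate-results${i}`, keeps each aggregation step name distinct.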

----------------------------------------

TITLE: Correct Result Handling - TypeScript
DESCRIPTION: This TypeScript code demonstrates the correct way to handle results within Upstash Workflow. The `someWork` function's return value is directly returned from `context.run("step-1", ...)` and assigned to the `result` variable. This ensures that `result` is always properly initialized with the correct value before `step-2` is executed.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_3

LANGUAGE: typescript
CODE:
```
export const { POST } = serve<string>(async (context) => {
  const input = context.requestPayload

  const result = await context.run("step-1", async () => {
    return await someWork(input)
  })

  await context.run("step-2", async () => {
    someOtherWork(result)
  })
})
```

----------------------------------------

TITLE: Image Storage - TypeScript
DESCRIPTION: This TypeScript snippet stores the processed images in cloud storage using the `storeImage` function. It iterates through the processed images and calls `context.run` for each image to store it. `Promise.all` is used to run the storage operations concurrently.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_8

LANGUAGE: typescript
CODE:
```
const storedImageUrls: string[] = await Promise.all(
  processedImages.map(
    processedImage => context.run(`store-image`, async () => {
      return await storeImage(processedImage.body.imageUrl)
    })
  )
)
```

----------------------------------------

TITLE: Make a POST Request to Workflow Endpoint
DESCRIPTION: Sends a POST request to the workflow endpoint to trigger the workflow. The request includes a JSON payload. The response contains a unique workflow run ID.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/astro.mdx#_snippet_7

LANGUAGE: bash
CODE:
```
curl -X POST http://localhost:3000/api/workflow \
    -H "Content-Type: application/json" \
    -d '{"message": "Hello from the workflow!"}'

# result: {"workflowRunId":"wfr_xxxxxx"}
```

----------------------------------------

TITLE: Workflow Handler with Upstash/NextJS
DESCRIPTION: This TypeScript snippet defines a Next.js route handler that processes images using Upstash Workflow. It receives an image ID and user ID, retrieves the image, resizes it to multiple resolutions, applies filters, and stores the processed images. It uses the `serve` function from `@upstash/workflow/nextjs` to create the handler and `context.run` and `context.call` to execute workflow steps.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"
import {
  resizeImage,
  applyFilters,
  storeImage,
  getImageUrl,
} from "./utils"

type ImageResult = {
  imageUrl: string
}

export const { POST } = serve<{ imageId: string; userId: string }>
  (async (context) => {
    const { imageId, userId } = context.requestPayload

    // Step 1: Retrieve the uploaded image
    const imageUrl = await context.run("get-image-url", async () => {
      return await getImageUrl(imageId)
    })

    // Step 2: Resize the image to multiple resolutions
    const resolutions = [640, 1280, 1920]

    const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map(
      resolution => context.call<ImageResult>(
        `resize-image-${resolution}`,
        {
          // endpoint which returns ImageResult type in response
          url: "https://image-processing-service.com/resize",
          method: "POST",
          body: {
            imageUrl,
            width: resolution,
          }
        }
      )
    ))

    // Step 3: Apply filters to each resized image
    const filters = ["grayscale", "sepia", "contrast"]
    const processedImagePromises: Promise<{ body: ImageResult }>[] = []

    for (const resizedImage of resizedImages) {
      for (const filter of filters) {
        const processedImagePromise = context.call<ImageResult>(
          `apply-filter-${filter}`,
          {
            // endpoint which returns ImageResult type in response
            url: "https://image-processing-service.com/filter",
            method: "POST",
            body: {
              imageUrl: resizedImage.body.imageUrl,
              filter,
            }
          }
        )
        processedImagePromises.push(processedImagePromise)
      }
    }
    const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises)

    // Step 4: Store processed images in cloud storage
    const storedImageUrls: string[] = await Promise.all(
      processedImages.map(
        processedImage => context.run(`store-image`, async () => {
          return await storeImage(processedImage.body.imageUrl)
        })
      )
    )
  }
)
```

----------------------------------------

TITLE: AI Generation Workflow (Next.js)
DESCRIPTION: This TypeScript code defines a Next.js route that leverages Upstash Workflow to process a dataset using OpenAI. It downloads the dataset, splits it into chunks, processes each chunk with GPT-4, aggregates the results, and generates a final report.  The code uses `context.run` for long-running tasks and `context.call` to interact with the OpenAI API.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"
import {
  downloadData,
  aggregateResults,
  generateReport,
  sendReport,
  getDatasetUrl,
  splitIntoChunks,
} from "./utils"

type OpenAiResponse = {
  choices: {
    message: {
      role: string,
      content: string
    }
  }[]
}

export const { POST } = serve<{ datasetId: string; userId: string }>
  (async (context) => {
    const request = context.requestPayload

    // Step 1: Download the dataset
    const datasetUrl = await context.run("get-dataset-url", async () => {
      return await getDatasetUrl(request.datasetId)
    })

    // HTTP request with much longer timeout (2hrs)
    const { body: dataset } = await context.call("download-dataset", {
      url: datasetUrl,
      method: "GET"
    })
      

    // Step 2: Process data in chunks using OpenAI
    const chunkSize = 1000
    const chunks = splitIntoChunks(dataset, chunkSize)
    const processedChunks: string[] = []

    for (let i = 0; i < chunks.length; i++) {
      const { body: processedChunk } = await context.api.openai.call<
        OpenAiResponse
      >(
        `process-chunk-${i}`,
        {
          token: process.env.OPENAI_API_KEY,
          operation: "chat.completions.create",
          body: {
            model: "gpt-4",
            messages: [
              {
                role: "system",
                content:
                  "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
              },
              {
                role: "user",
                content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
              },
            ],
            max_completion_tokens: 150,
          },
        }
      )

      processedChunks.push(processedChunk.choices[0].message.content!)

      // Every 10 chunks, we'll aggregate intermediate results
      if (i % 10 === 9 || i === chunks.length - 1) {
        await context.run(`aggregate-results${i}`, async () => {
          await aggregateResults(processedChunks)
          processedChunks.length = 0
        })
      }
    }

    // Step 3: Generate and send data report
    const report = await context.run("generate-report", async () => {
      return await generateReport(request.datasetId)
    })

    await context.run("send-report", async () => {
      await sendReport(report, request.userId)
    })
  }
)
```
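
The workflow above imports `splitIntoChunks` from `./utils` without showing it. A minimal sketch of such a helper follows, assuming the downloaded dataset is an array of records. The real utility in the example project may differ.

```typescript
// Hypothetical implementation of the `splitIntoChunks` utility:
// splits an array into consecutive chunks of at most `chunkSize` items.
function splitIntoChunks<T>(items: readonly T[], chunkSize: number): T[][] {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError("chunkSize must be a positive integer");
  }
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    chunks.push(items.slice(start, start + chunkSize));
  }
  return chunks;
}

console.log(splitIntoChunks([1, 2, 3, 4, 5], 2)); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```

The last chunk may be shorter than `chunkSize`, which is why the workflow also aggregates when `i === chunks.length - 1`.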

----------------------------------------

TITLE: Call Anthropic API with Type-Safe Context Call (TypeScript)
DESCRIPTION: This code snippet demonstrates how to call the Anthropic API's `/v1/messages` endpoint using the `context.api.anthropic.call` method in an Upstash workflow. It includes setting the API key, operation, and request body. The code then extracts and logs the generated text from the response.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/anthropic.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
const { status, body } = await context.api.anthropic.call(
  "Call Anthropic",
  {
    token: "<ANTHROPIC_API_KEY>",
    operation: "messages.create",
    body: {
      model: "claude-3-5-sonnet-20241022",
      max_tokens: 1024,
      messages: [
          {"role": "user", "content": "Hello, world"}
      ]
    },
  }
);

// get text:
console.log(body.content[0].text)
```

----------------------------------------

TITLE: Workflow Endpoint Example
DESCRIPTION: This Python code defines a basic FastAPI endpoint that uses Upstash Workflow. It initializes FastAPI and `Serve`, defines an `/api/workflow` endpoint, and defines two asynchronous steps that are executed within the workflow context. Each `context.run` call registers a step to be executed within the workflow.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_6

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve

app = FastAPI()
serve = Serve(app)


@serve.post("/api/workflow")
async def workflow(context) -> None:
    async def _step1() -> None:
        print("initial step ran")

    await context.run("initial-step", _step1)

    async def _step2() -> None:
        print("second step ran")

    await context.run("second-step", _step2)

```

----------------------------------------

TITLE: Creating a Workflow with createWorkflow in TypeScript
DESCRIPTION: This snippet demonstrates how to define a workflow using the `createWorkflow` function. It specifies the type of the request body and ensures type safety for request and response when invoking other workflows. The workflow logic is defined as an asynchronous function that receives a `WorkflowContext`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#_snippet_1

LANGUAGE: typescript
CODE:
```
import { WorkflowContext }  from "@upstash/workflow";
import { createWorkflow } from "@upstash/workflow/nextjs";

const anotherWorkflow = createWorkflow(
  // Define the workflow logic, specifying the type of the initial request body.
  // In this case, the body is a string:
  async (context: WorkflowContext<string>) => {

    await context.sleep("wait 1 second", 1)

    // Return a response from the workflow. The type of this
    // response will be available when `context.invoke` is
    // called with `anotherWorkflow`.
    return { message: "This is the data returned by the workflow" };
  }
);

const someWorkflow = createWorkflow(async (context) => {
  // Invoke anotherWorkflow with a string body and get the response
  // The types of the body parameter and the response are
  // typesafe and inferred from anotherWorkflow
  const { body } = await context.invoke(
    "invoke anotherWorkflow",
    {
      workflow: anotherWorkflow,
      body: "user-1"
    }
  );
});
```

----------------------------------------

TITLE: Processing Data with OpenAI (Python)
DESCRIPTION: This Python code snippet processes data in chunks using OpenAI's GPT-4 model via the OpenAI API. For each chunk, it calls the OpenAI API with the `context.call` method, including the API URL, method, headers (containing the API key), and request body. The chunk is analyzed, and the response is stored.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_5

LANGUAGE: python
CODE:
```
for i, chunk in enumerate(chunks):
    openai_response: CallResponse[Dict[str, str]] = await context.call(
        f"process-chunk-{i}",
        url="https://api.openai.com/v1/chat/completions",
        method="POST",
        headers={
            "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
        },
        body={
            "model": "gpt-4",
            "messages": [
                {
                    "role": "system",
                    "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
                },
                {
                    "role": "user",
                    "content": f"Analyze this data chunk: {json.dumps(chunk)}",
                },
            ],
            "max_tokens": 150,
        },
    )
```

----------------------------------------

TITLE: Configuring a Failure Function with TypeScript
DESCRIPTION: This snippet demonstrates how to configure a `failureFunction` within the `serve` method for handling workflow failures. The `failureFunction` allows for custom error handling logic, such as logging to Sentry or implementing specific cleanup actions. It receives the context, failure status, response, and headers as input.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/failures.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
export const { POST } = serve<string>(
  async (context) => {
    // Your workflow logic...
  },
  {
    failureFunction: async ({
      context,
      failStatus,
      failResponse,
      failHeaders,
    }) => {
      // Handle the error, e.g. log it to Sentry
    },
  }
);

```

----------------------------------------

TITLE: Calling Resend API for Batch Email in Upstash Workflow (TypeScript)
DESCRIPTION: This code snippet demonstrates how to call the Resend API to send a batch of emails using `context.api.resend.call` within an Upstash Workflow.  It requires a Resend API key and setting `batch: true`. The `status` and `body` of the response are captured.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#_snippet_1

LANGUAGE: typescript
CODE:
```
const { status, body } = await context.api.resend.call(
  "Call Resend",
  {
    batch: true,
    token: "<RESEND_API_KEY>",
    body: [
      {
        from: "Acme <onboarding@resend.dev>",
        to: ["delivered@resend.dev"],
        subject: "Hello World",
        html: "<p>It works!</p>",
      },
      {
        from: "Acme <onboarding@resend.dev>",
        to: ["delivered@resend.dev"],
        subject: "Hello World",
        html: "<p>It works!</p>",
      },
    ],
    headers: {
      "content-type": "application/json",
    },
  }
);
```

----------------------------------------

TITLE: Fetching Workflow Logs with Node.js
DESCRIPTION: This snippet demonstrates how to fetch workflow logs using Node.js and the `fetch` API. It makes a GET request to the `/v2/workflows/logs` endpoint and includes the Authorization header with a Bearer token.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/logs.mdx#_snippet_2

LANGUAGE: javascript
CODE:
```
const response = await fetch("https://qstash.upstash.io/v2/workflows/logs", {
  headers: {
    Authorization: "Bearer <token>",
  },
});
```

----------------------------------------

TITLE: Unsuspend User Logic (Python)
DESCRIPTION: This Python code checks if a user is suspended and, if so, unsuspends them using the `check_suspension` and `unsuspend_user` functions within an Upstash Workflow context. Asynchronous functions `_check_suspension` and `_unsuspend_user` are defined to encapsulate the calls to the underlying functions.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_7

LANGUAGE: python
CODE:
```
async def _check_suspension() -> bool:
    return await check_suspension(email)

is_suspended = await context.run("check suspension", _check_suspension)

if is_suspended:

    async def _unsuspend_user() -> None:
        await unsuspend_user(email)

    await context.run("unsuspend user", _unsuspend_user)

```

----------------------------------------

TITLE: Send Welcome Email - TypeScript
DESCRIPTION: This TypeScript snippet sends a welcome email to the user upon creation. It uses `context.run` to execute the `sendEmail` function with the user's email and a welcome message, indicating the start of their free trial.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_8

LANGUAGE: typescript
CODE:
```
await context.run("send welcome email", async () => {
  await sendEmail(
    email,
    "Welcome to our platform!, You have 14 days of free trial."
  );
});
```

----------------------------------------

TITLE: Run Non-Idempotent Functions in context.run - TypeScript
DESCRIPTION: This TypeScript code shows the correct way to call a non-idempotent function. `getResultFromDb` is called inside `context.run`, which prevents the `Failed to authenticate Workflow request` error.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_12

LANGUAGE: typescript
CODE:
```
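// "get-result-from-db" is an illustrative step name
const result = await context.run("get-result-from-db", async () => {
  // Return the value so it is stored as the step result
  return await getResultFromDb(entryId)
});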
if (result.return) {
  return;
}
```

----------------------------------------

TITLE: Unsuspend User Logic (TypeScript)
DESCRIPTION: This TypeScript code checks if a user is suspended and, if so, unsuspends them using the `checkSuspension` and `unsuspendUser` functions within an Upstash Workflow context. It first checks suspension status, and then conditionally runs the unsuspend user step.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_6

LANGUAGE: typescript
CODE:
```
const isSuspended = await context.run("check suspension", async () => {
  return await checkSuspension(email);
});
if (isSuspended) {
  await context.run("unsuspend user", async () => {
    await unsuspendUser(email);
  });
}
```

----------------------------------------

TITLE: Send Reminder Email - TypeScript
DESCRIPTION: This TypeScript snippet handles sending a reminder email after 7 days if the user hasn't solved any questions. It uses `context.sleep` to wait for 7 days, fetches user stats using `getUserStats`, and then calls `sendProblemSolvedEmail` to conditionally send the reminder email.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_10

LANGUAGE: typescript
CODE:
```
await context.sleep("wait", 7 * 24 * 60 * 60);

const stats = await context.run("get user stats", async () => {
  return await getUserStats(userid);
});
await sendProblemSolvedEmail({context, email, stats});
```
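
The snippet passes the sleep duration to `context.sleep` in seconds, so `7 * 24 * 60 * 60` is seven days. A small named helper can make that intent explicit. This is only a sketch: `daysToSeconds` is a hypothetical utility, not part of the workflow SDK.

```typescript
// Hypothetical helper: converts a number of days into seconds.
const SECONDS_PER_DAY = 24 * 60 * 60;

function daysToSeconds(days: number): number {
  return days * SECONDS_PER_DAY;
}

console.log(daysToSeconds(7)); // 604800

// Possible use inside a workflow (assumes a `context` object from `serve`):
// await context.sleep("wait", daysToSeconds(7));
```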

----------------------------------------

TITLE: Suspend User: TypeScript
DESCRIPTION: This TypeScript snippet checks if a user is suspended, suspends the user if not already suspended, and sends an email notification. It relies on `checkSuspension`, `suspendUser`, and `sendEmail` functions.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_10

LANGUAGE: TypeScript
CODE:
```
const isSuspended = await context.run("check suspension", async () => {
  return await checkSuspension(email);
});

if (!isSuspended) {
  await context.run("suspend user", async () => {
    await suspendUser(email);
  });

  await context.run("send suspended email", async () => {
    await sendEmail(
      context.requestPayload.email,
      "Your account has been suspended due to payment failure. Please update your payment method."
    );
  });
}
```

----------------------------------------

TITLE: Workflow Endpoint with Sleep
DESCRIPTION: This Python code defines a FastAPI endpoint that utilizes Upstash Workflow with sleep functionality. The workflow includes steps that perform some work using the `some_work` function, print input and output, and then pause execution using `context.sleep_until` and `context.sleep`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_7

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
import time
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


def some_work(input: str) -> str:
    return f"processed '{input}'"


@serve.post("/sleep")
async def sleep(context: AsyncWorkflowContext[str]) -> None:
    input = context.request_payload

    async def _step1() -> str:
        output = some_work(input)
        print("step 1 input", input, "output", output)
        return output

    result1: str = await context.run("step1", _step1)

    await context.sleep_until("sleep1", time.time() + 3)

    async def _step2() -> str:
        output = some_work(result1)
        print("step 2 input", result1, "output", output)
        return output

    result2: str = await context.run("step2", _step2)

    await context.sleep("sleep2", 2)

    async def _step3() -> None:
        output = some_work(result2)
        print("step 3 input", result2, "output", output)

    await context.run("step3", _step3)

```

----------------------------------------

TITLE: Handling Webhook Events - TypeScript
DESCRIPTION: This TypeScript code snippet demonstrates how to handle webhook events within an Upstash Workflow using the `context.run` method. It parses and validates the incoming request, and then uses `context.run` to process the event. If the event type is `user.created`, it extracts relevant user data and returns it; otherwise, it returns false.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_5

LANGUAGE: TypeScript
CODE:
```
export const { POST } = serve(async (context) => {
  // ... Parse and validate the incoming request

  const user = await context.run<false | UserPayload>(
    "handle-webhook-event",
    async () => {
      if (event.type === "user.created") {
        const { id: clerkUserId, email_addresses, first_name } = event.data;
        const primaryEmail = email_addresses.find(
          (email) => email.id === event.data.primary_email_address_id
        );

        if (!primaryEmail) {
          return false;
        }

        return {
          event: event.type,
          userId: clerkUserId,
          email: primaryEmail.email_address,
          firstName: first_name,
        } as UserPayload;
      }
      return false;
    }
  );
});
```
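
The event-to-payload mapping inside the step can also be written as a standalone pure function, which makes the primary email lookup easy to test. The sketch below is an assumption-laden illustration: the `WebhookEvent` shape is modeled only on the fields used in the snippet, and `toUserPayload` is a hypothetical name.

```typescript
// Assumed event shape, limited to the fields the snippet reads.
type WebhookEvent = {
  type: string;
  data: {
    id: string;
    first_name: string;
    primary_email_address_id: string;
    email_addresses: { id: string; email_address: string }[];
  };
};

type UserPayload = {
  event: string;
  userId: string;
  email: string;
  firstName: string;
};

// Hypothetical helper: returns the user payload for "user.created" events, otherwise false.
function toUserPayload(event: WebhookEvent): UserPayload | false {
  if (event.type !== "user.created") return false;

  const { id, first_name, primary_email_address_id, email_addresses } = event.data;
  // Strict comparison selects the address whose id matches the primary id.
  const primaryEmail = email_addresses.find((e) => e.id === primary_email_address_id);
  if (!primaryEmail) return false;

  return {
    event: event.type,
    userId: id,
    email: primaryEmail.email_address,
    firstName: first_name,
  };
}
```

Inside the workflow, the step body could then reduce to `return toUserPayload(event)`. The lookup uses `===` because an assignment inside the `find` callback would be truthy for the first address and would select it whether or not it is the primary one.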

----------------------------------------

TITLE: Requesting Order Processing in Workflow
DESCRIPTION: This snippet shows how to encapsulate the order processing request in a `context.run` call inside the workflow. It executes the `requestProcessing` function with the `orderId`.  It's a part of the workflow defined previously and demonstrates running a specific step.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
await context.run("request order processing", async () => {
  await requestProcessing(orderId)
})
```

----------------------------------------

TITLE: Conditional Email Based on Solved Problems - TypeScript
DESCRIPTION: This TypeScript function `sendProblemSolvedEmail` conditionally sends an email based on the number of problems the user has solved. If the user hasn't solved any problems, a 'no answers' email is sent; otherwise, a stats email is sent indicating the number of problems solved in the last 7 days.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_12

LANGUAGE: typescript
CODE:
```
async function sendProblemSolvedEmail({
  context,
  email,
  stats,
}: {
  context: WorkflowContext<UserCreatedPayload>
  email: string
  stats: { totalProblemsSolved: number }
}) {
  if (stats.totalProblemsSolved === 0) {
    await context.run("send no answers email", async () => {
      await sendEmail(
        email,
        "Hey, you haven't solved any questions in the last 7 days..."
      );
    });
  } else {
    await context.run("send stats email", async () => {
      await sendEmail(
        email,
        `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!`
      );
    });
  }
}
```

----------------------------------------

TITLE: Defining OpenAI Model
DESCRIPTION: This code snippet shows how to define an OpenAI model using the `context.agents.openai` method in a Next.js route. It imports the `serve` function from `@upstash/workflow/nextjs` and defines a POST handler that initializes the model. The model will be used for generating responses and deciding which tools to call.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo')

  // ...
})
```

----------------------------------------

TITLE: Custom Authorization in Workflow (Python)
DESCRIPTION: This Python code snippet illustrates a custom authorization implementation for an Upstash Workflow. It retrieves the `authorization` header from the request context, extracts the bearer token, and checks its validity using the `is_valid` function. If the token is invalid, it prints an error message and returns. It leverages FastAPI and `upstash_workflow.fastapi` for workflow integration.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#_snippet_5

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:
    auth_header = context.headers.get("authorization")
    bearer_token = auth_header.split(" ")[1] if auth_header else None

    if not is_valid(bearer_token):
        print("Authentication failed.")
        return

    # Your workflow steps...
```
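
The snippet above leaves `is_valid` undefined, and `auth_header.split(" ")[1]` raises `IndexError` when the header contains no space. A minimal sketch of one way to fill both gaps follows. `extract_bearer_token`, `EXPECTED_TOKEN`, and this body for `is_valid` are hypothetical helpers, not part of `upstash_workflow`; real code would load the secret from configuration.

```python
import hmac
from typing import Optional

# Hypothetical shared secret; load it from configuration in real code.
EXPECTED_TOKEN = "secret_password"


def extract_bearer_token(auth_header: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header, or None if malformed."""
    if not auth_header:
        return None
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


def is_valid(token: Optional[str]) -> bool:
    """Compare in constant time so timing does not reveal partial matches."""
    if token is None:
        return False
    return hmac.compare_digest(token.encode(), EXPECTED_TOKEN.encode())
```

With these helpers, the handler could call `is_valid(extract_bearer_token(context.headers.get("authorization")))` and return early when it is false.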

----------------------------------------

TITLE: Charging Customer (TypeScript)
DESCRIPTION: This TypeScript code attempts to charge a customer using the `chargeCustomer` function within an Upstash Workflow context. It includes a try-catch block to handle potential errors during the charging process, allowing for custom logic to be executed upon failure. The attempt number is passed to the `chargeCustomer` function.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
const result = await context.run("charge customer", async () => {
  try {
    return await chargeCustomer(i + 1);
  } catch (e) {
    console.error(e);
    return
  }
});
```

----------------------------------------

TITLE: Initialize Workflow Endpoint with Python
DESCRIPTION: Defines a workflow endpoint using FastAPI and the `Serve` class from `upstash_workflow.fastapi`. It defines an asynchronous function `example` that uses the `AsyncWorkflowContext` to run two steps. Each step is defined as an asynchronous function.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_1

LANGUAGE: Python
CODE:
```
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:
    async def _step1() -> str:
        # define a piece of business logic as step 1
        return "step 1 result"

    result = await context.run("step-1", _step1)

    async def _step2() -> None:
        # define another piece of business logic as step 2
        pass

    await context.run("step-2", _step2)
```

----------------------------------------

TITLE: Generating and Sending Report (TypeScript)
DESCRIPTION: This TypeScript code snippet generates a report based on the aggregated results and sends it to the user. It calls the `generateReport` and `sendReport` functions within `context.run` blocks to execute them as separate workflow steps.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_8

LANGUAGE: typescript
CODE:
```
const report = await context.run("generate-report", async () => {
  return await generateReport(request.datasetId)
})

await context.run("send-report", async () => {
  await sendReport(report, request.userId)
})
```

----------------------------------------

TITLE: Execute Business Logic in context.run - TypeScript
DESCRIPTION: This code snippet demonstrates the correct way to execute business logic within the `context.run` function in Upstash Workflow. The business logic (returning `{ success: true }` and logging) is placed inside `context.run`, ensuring it only executes once per step during a workflow run. Logging outside `context.run` can appear multiple times because the workflow endpoint can be called multiple times.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
export const { POST } = serve<string>(async (context) => {
  const input = context.requestPayload

  const result = await context.run("step-1", () => {
    return { success: true }
  })

  console.log("This log will appear multiple times")

  await context.run("step-2", () => {
    console.log("This log will appear just once")
    console.log("Step 1 status is:", result.success)
  })
})
```

----------------------------------------

TITLE: Generating and Sending Report (Python)
DESCRIPTION: This Python code snippet generates a report and sends it to the user. It defines asynchronous functions `_generate_report` and `_send_report` which call `generate_report` and `send_report` respectively. These functions are then executed using `context.run` to ensure they are executed as separate workflow steps.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_9

LANGUAGE: python
CODE:
```
async def _generate_report() -> Any:
    return await generate_report(dataset_id)

report = await context.run("generate-report", _generate_report)

async def _send_report() -> None:
    await send_report(report, user_id)

await context.run("send-report", _send_report)
```

----------------------------------------

TITLE: Calling Resend API for Single Email in Upstash Workflow (TypeScript)
DESCRIPTION: This code snippet demonstrates how to call the Resend API to send a single email using `context.api.resend.call` within an Upstash Workflow.  It requires a Resend API key. The `status` and `body` of the response are captured.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
const { status, body } = await context.api.resend.call(
  "Call Resend",
  {
    token: "<RESEND_API_KEY>",
    body: {
      from: "Acme <onboarding@resend.dev>",
      to: ["delivered@resend.dev"],
      subject: "Hello World",
      html: "<p>It works!</p>",
    },
    headers: {
      "content-type": "application/json",
    },
  }
);
```

----------------------------------------

TITLE: Generating Text with OpenAI Client in Workflow
DESCRIPTION: Creates a Next.js API route that uses Upstash Workflow to generate text with the OpenAI client. The workflow takes a prompt as input, calls `generateText` from the `ai` package, and logs the result. If `generateText` throws a `ToolExecutionError` whose cause is a `WorkflowAbort`, the handler rethrows the underlying `WorkflowAbort`; any other error is rethrown unchanged.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WorkflowAbort } from '@upstash/workflow';
import { generateText, ToolExecutionError } from 'ai';

import { createWorkflowOpenAI } from './model';

export const { POST } = serve<{ prompt: string }>(async (context) => {
  const openai = createWorkflowOpenAI(context);

  // Important: Must have a step before generateText
  const prompt = await context.run("get prompt", async () => {
    return context.requestPayload.prompt;
  });

  try {
    const result = await generateText({
      model: openai('gpt-3.5-turbo'),
      maxTokens: 2048,
      prompt,
    });

    await context.run("text", () => {
      console.log(`TEXT: ${result.text}`);
      return result.text;
    });
    
  } catch (error) {    
    if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) {
      throw error.cause;
    } else {
      throw error;
    }
  }
});
```

----------------------------------------

TITLE: Limiting External API Calls with context.call (TypeScript)
DESCRIPTION: This code snippet shows how to limit requests to an external API using `context.call` within the `serve` method. It configures the rate per second and parallelism limits for the API call, preventing excessive requests.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/flow-control.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<{ topic: string }>(async (context) => {
  const request = context.requestPayload;

  const response = await context.call(
    "generate-long-essay", 
    {
      url: "https://api.openai.com/v1/chat/completions",
      method: "POST",
      body: {/*****/},
      flowControl: { key: "openai-call", parallelism: 3, ratePerSecond: 10 }
    }
  );
});
```

----------------------------------------

TITLE: Defining an Agent
DESCRIPTION: This code defines an agent with a specified model, name, maximum number of steps, tools, and background using `context.agents.agent`. It leverages the `WikipediaQueryRun` tool for accessing Wikipedia information. The agent represents an LLM with access to tools and background knowledge, which it can use to perform tasks.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_10

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo')

  const researcherAgent = context.agents.agent({
    model,
    name: 'academic',
    maxSteps: 2,
    tools: {
      wikiTool: new WikipediaQueryRun({
        topKResults: 1,
        maxDocContentLength: 500,
      })
    },
    background:
      'You are a researcher agent with access to Wikipedia. '
      + 'Utilize Wikipedia as much as possible for correct information',
  })
})
```

----------------------------------------

TITLE: Sending HTTP Request with Requests (Python)
DESCRIPTION: This Python snippet demonstrates how to send a POST request to a workflow endpoint using the `requests` library. It sends a JSON payload and includes a custom header.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#_snippet_5

LANGUAGE: Python
CODE:
```
import requests

requests.post(
    "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
    json={"foo": "bar"},
    headers={"my-header": "foo"},
)

```

----------------------------------------

TITLE: Execute Business Logic in context.run - Python
DESCRIPTION: This code snippet demonstrates the correct way to execute business logic within the `context.run` function in Upstash Workflow using Python. The business logic (returning `{"success": True}` and logging) is placed inside `context.run`, ensuring it only executes once per step during a workflow run. Logging outside `context.run` can appear multiple times because the workflow endpoint can be called multiple times.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_1

LANGUAGE: python
CODE:
```
@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:
    input = context.request_payload

    async def _step_1() -> Dict:
        return {"success": True}

    result = await context.run("step-1", _step_1)

    print("This log will appear multiple times")

    async def _step_2() -> None:
        print("This log will appear just once")
        print("Step 1 status is:", result["success"])

    await context.run("step-2", _step_2)
```
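
To see why the log outside `context.run` repeats, it can help to model the endpoint as being invoked once per step, with earlier step results replayed from storage. The sketch below is a toy model under that assumption, not the Upstash implementation; `ToyContext`, `StepCompleted`, and `drive` are hypothetical names used only for illustration.

```python
import asyncio


class StepCompleted(Exception):
    """Ends the current invocation after one new step has run."""


class ToyContext:
    def __init__(self, results):
        self.results = results  # step name -> stored result

    async def run(self, name, fn):
        if name in self.results:
            # Replay: return the stored result without running fn again.
            return self.results[name]
        self.results[name] = await fn()
        raise StepCompleted(name)


async def drive(handler):
    """Re-invoke the handler until it finishes; return the invocation count."""
    results = {}
    invocations = 0
    while True:
        invocations += 1
        try:
            await handler(ToyContext(results))
            return invocations
        except StepCompleted:
            continue


logs = []


async def example(context):
    async def _step_1():
        return {"success": True}

    result = await context.run("step-1", _step_1)

    logs.append("outside")  # runs on every invocation that gets past step-1

    async def _step_2():
        logs.append("inside")  # runs exactly once

    await context.run("step-2", _step_2)


invocations = asyncio.run(drive(example))
```

In this model the handler is invoked three times, `"outside"` is logged twice, and `"inside"` is logged once, which mirrors the behavior the description warns about.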

----------------------------------------

TITLE: Configure Flow Control in TypeScript
DESCRIPTION: Illustrates how to configure flow control for a workflow using the `flowControl` option. This allows setting `ratePerSecond` to limit requests and `parallelism` to limit concurrent requests.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_6

LANGUAGE: TypeScript
CODE:
```
export const { POST } = serve<string>(
  async (context) => { ... },
  {
    flowControl: { key: "aFlowControlKey", ratePerSecond: 10, parallelism: 3 }
  }
);
```
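
As a local analogy for what a `parallelism` cap means, the sketch below limits concurrent work with an `asyncio.Semaphore`. It makes no claim about how Upstash enforces flow control, and it does not model `ratePerSecond`; `fake_request` and `PARALLELISM` are hypothetical names.

```python
import asyncio

PARALLELISM = 3  # mirrors the `parallelism: 3` setting above


async def main() -> int:
    sem = asyncio.Semaphore(PARALLELISM)
    active = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal active, peak
        async with sem:  # at most PARALLELISM bodies run at once
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the real work
            active -= 1

    # Launch ten requests; the semaphore holds the rest back.
    await asyncio.gather(*(fake_request() for _ in range(10)))
    return peak


peak = asyncio.run(main())
```

Ten requests are launched, but `peak` never exceeds the configured cap.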

----------------------------------------

TITLE: Configuring OpenAI Model with baseURL
DESCRIPTION: This code snippet shows how to configure the OpenAI model with a custom baseURL and apiKey, allowing the use of OpenAI-compatible providers like DeepSeek. The baseURL specifies the API endpoint for the provider.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_3

LANGUAGE: typescript
CODE:
```
const model = context.agents.openai('gpt-3.5-turbo', {
  baseURL: "https://api.deepseek.com",
  apiKey: process.env.DEEPSEEK_API_KEY
})
```

----------------------------------------

TITLE: Orchestrating Workers with Upstash Workflow and Langchain
DESCRIPTION: This snippet demonstrates how to use Upstash Workflow with Next.js to create an orchestrator that directs multiple worker agents using Langchain to handle different subtasks and synthesize their outputs. It defines worker agents with specific tools and backgrounds, and a task that orchestrates these agents to create a Q&A for advanced topics in physics.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/orchestrator-workers.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run";

const wikiTool = new WikipediaQueryRun({
  topKResults: 1,
  maxDocContentLength: 500,
})

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-4o');

  // Worker agents
  const worker1 = context.agents.agent({
    model,
    name: 'worker1',
    tools: { wikiTool },
    maxSteps: 3,
    background: 'You are a worker agent that answers general questions about advanced physics.'
  });

  const worker2 = context.agents.agent({
    model,
    name: 'worker2',
    tools: { wikiTool },
    maxSteps: 3,
    background: 'You are a worker agent that answers questions about quantum mechanics.'
  });

  const worker3 = context.agents.agent({
    model,
    name: 'worker3',
    tools: { wikiTool },
    maxSteps: 3,
    background: 'You are a worker agent that answers questions about relativity.'
  });

  // Synthesizing results
  const task = context.agents.task({
    model,
    prompt: `Create a Q&A for advanced topics in physics`,
    agents: [worker1, worker2, worker3],
    maxSteps: 3,
  })
  const { text } = await task.run();

  console.log(text);
});
```

----------------------------------------

TITLE: Cancel Workflow Run Programmatically - JavaScript
DESCRIPTION: This snippet demonstrates how to cancel a running workflow using the Upstash Workflow JavaScript client. It initializes the client with a QStash token and then calls the `cancel` method with the ID of the workflow run to be canceled.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/cancel.mdx#_snippet_0

LANGUAGE: JavaScript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });
await client.cancel({ ids: "<WORKFLOW_RUN_ID>" });
```

----------------------------------------

TITLE: Handling Webhook Events - Python
DESCRIPTION: This Python code snippet demonstrates how to handle webhook events within an Upstash Workflow.  It uses `context.run` to execute an asynchronous function `_handle_webhook_event`. Inside this function, it checks if the event type is `user.created`, extracts the data, and returns it as a dictionary.  If no primary email is found or the event type is different, it returns `False`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_6

LANGUAGE: Python
CODE:
```
@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:
    # ... Parse and validate the incoming request

    async def _handle_webhook_event():
        if event.type == "user.created":
            clerk_user_id = event.data["id"]
            email_addresses = event.data["email_addresses"]
            first_name = event.data["first_name"]

            primary_email = next(
                (
                    email
                    for email in email_addresses
                    if email["id"] == event.data["primary_email_address_id"]
                ),
                None,
            )

            if not primary_email:
                return False

            return {
                "event": event.type,
                "user_id": clerk_user_id,
                "email": primary_email["email_address"],
                "first_name": first_name,
            }

        return False

    user = await context.run("handle-webhook-event", _handle_webhook_event)

```

----------------------------------------

TITLE: Aggregating Results (Python)
DESCRIPTION: This Python code snippet aggregates the processed chunks after every 10th chunk and again at the end of the dataset. It defines an asynchronous function `_aggregate_results` that calls the `aggregate_results` function and clears the `processed_chunks` list. The function is executed with `context.run` under a step name that includes the chunk index, so each aggregation runs as its own workflow step.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#_snippet_7

LANGUAGE: python
CODE:
```
if i % 10 == 9 or i == len(chunks) - 1:

    async def _aggregate_results() -> None:
        await aggregate_results(processed_chunks)
        processed_chunks.clear()

    await context.run(f"aggregate-results{i}", _aggregate_results)
```
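
The condition `i % 10 == 9 or i == len(chunks) - 1` can be checked in isolation. The helper below is a hypothetical sketch that lists the chunk indices at which aggregation would run for a given dataset size.

```python
from typing import List


def aggregation_points(total_chunks: int, batch_size: int = 10) -> List[int]:
    """Indices where a batch fills up or the dataset ends."""
    return [
        i
        for i in range(total_chunks)
        if i % batch_size == batch_size - 1 or i == total_chunks - 1
    ]


points_25 = aggregation_points(25)
points_20 = aggregation_points(20)
```

For 25 chunks the aggregation runs at indices 9, 19, and 24. For 20 chunks it runs at 9 and 19 only, since the final chunk already closes a full batch.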

----------------------------------------

TITLE: Workflow Endpoint with Auth
DESCRIPTION: This Python code presents a FastAPI endpoint using Upstash Workflow with authentication. The workflow verifies an 'authentication' header with a specific token, and only proceeds if the authentication is successful.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_9

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


def some_work(input: str) -> str:
    return f"processed '{input}'"


@serve.post("/auth")
async def auth(context: AsyncWorkflowContext[str]) -> None:
    if context.headers.get("authentication") != "Bearer secret_password":
        print("Authentication failed.")
        return

    async def _step1() -> str:
        return "output 1"

    await context.run("step1", _step1)

    async def _step2() -> str:
        return "output 2"

    await context.run("step2", _step2)

```

----------------------------------------

TITLE: Creating Stripe Customer - TypeScript
DESCRIPTION: This TypeScript code snippet demonstrates how to create a Stripe customer using the Stripe API within an Upstash Workflow. It assumes that a user object has been previously extracted.  If a user object exists, it calls the Stripe API to create a customer with the user's email, name, and user ID.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_7

LANGUAGE: TypeScript
CODE:
```
export const { POST } = serve(async (context) => {
  // ... Previous validation and user data extraction

  if (!user) {
    return;
  }

  const customer = await context.run("create-stripe-customer", async () => {
    return await stripe.customers.create({
      email: user.email,
      name: `${user.firstName} ${user.lastName}`,
      metadata: {
        userId: user.userId,
      },
    });
  });

  /// ... Additional steps
});
```

----------------------------------------

TITLE: Notifying a Workflow with client.notify TypeScript
DESCRIPTION: This snippet shows how to use the `client.notify` method to notify a waiting workflow that a specific event has occurred. It sends the event ID and event data to the workflow, allowing it to resume execution. Requires the `@upstash/workflow` package and a QSTASH_TOKEN.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#_snippet_1

LANGUAGE: typescript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });

await client.notify({
  eventId: "event-id",
  eventData: { my: "data" },
});
```

----------------------------------------

TITLE: Image Resizing - Python
DESCRIPTION: This Python snippet resizes the image to multiple resolutions using an external image processing service. It iterates through a list of resolutions and calls `context.call` for each resolution to trigger an external service call. The results are collected and processed to extract the image data.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_5

LANGUAGE: python
CODE:
```
resolutions = [640, 1280, 1920]
resize_responses = []

for resolution in resolutions:
    response: CallResponse[ImageResult] = await context.call(
        f"resize-image-{resolution}",
        # endpoint which returns ImageResult type in response
        url="https://image-processing-service.com/resize",
        method="POST",
        body={"imageUrl": image_url, "width": resolution},
    )
    resize_responses.append(response)

resized_images = [response.body for response in resize_responses]
```

----------------------------------------

TITLE: Publishing Message with QStash (TypeScript)
DESCRIPTION: This snippet shows how to publish a message to a workflow endpoint using the QStash client in TypeScript. It initializes a QStash client and uses the `publishJSON` method to send a JSON payload to the specified URL. The function takes parameters such as the URL, body, headers, and the number of retries.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#_snippet_1

LANGUAGE: TypeScript
CODE:
```
// Using the QStash client
import { Client } from "@upstash/qstash";

const client = new Client({ token: "<QSTASH_TOKEN>" });

const { messageId } = await client.publishJSON({
  url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  body: { hello: "there!" },
  headers: { ... },    
  retries: 3           
});
```

----------------------------------------

TITLE: Transforming Conditions to Explicit Steps - TypeScript
DESCRIPTION: This TypeScript code provides the recommended pattern for handling non-deterministic conditions in Upstash Workflow. It transforms the condition into an explicit workflow step using `context.run`. This approach avoids the authentication failure that occurs when returning early without executing any steps. someCondition() is assumed to be a non-deterministic function.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#_snippet_8

LANGUAGE: typescript
CODE:
```
export const { POST } = serve(async (context) => {
  
  const shouldReturn = await context.run("check condition", () => someCondition())
  if (shouldReturn) {
    return;
  }

  // rest of the workflow
})
```

----------------------------------------

TITLE: Periodic State Check and Email Sending - TypeScript
DESCRIPTION: This TypeScript snippet implements a periodic state check within an infinite loop. It retrieves the user state using `getUserState`, and sends different emails depending on whether the user is 'active' or 'non-active'.  The workflow sleeps for one month (60 * 60 * 24 * 30 seconds) after each state check.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_6

LANGUAGE: typescript
CODE:
```
while (true) {
  const state = await context.run("check-user-state", async () => {
    return await getUserState()
  })

  if (state === "non-active") {
    await context.run("send-email-non-active", async () => {
      await sendEmail("Email to non-active users", email)
    })
  } else if (state === "active") {
    await context.run("send-email-active", async () => {
      await sendEmail("Send newsletter to active users", email)
    })
  }

  await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)
}
```

----------------------------------------

TITLE: Send Reminder Email - Python
DESCRIPTION: This Python snippet sends a reminder email after 7 days if the user hasn't solved any questions. It waits using `context.sleep`, retrieves user stats using `get_user_stats`, and then calls `send_problem_solved_email` to conditionally send the reminder based on the user's progress.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_11

LANGUAGE: python
CODE:
```
await context.sleep("wait", 7 * 24 * 60 * 60)

async def _get_user_stats() -> UserStats:
    return await get_user_stats(userid)

stats: UserStats = await context.run("get user stats", _get_user_stats)

await send_problem_solved_email(context, email, stats)
```
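
The sleep duration above is written as a product of seconds. As a readability sketch, the same values can be derived with `datetime.timedelta`; the `seconds` helper and the constant names are hypothetical.

```python
from datetime import timedelta


def seconds(**kwargs) -> int:
    """Convert timedelta keyword arguments (days=, hours=, ...) to whole seconds."""
    return int(timedelta(**kwargs).total_seconds())


ONE_WEEK = seconds(days=7)      # same value as 7 * 24 * 60 * 60
THIRTY_DAYS = seconds(days=30)  # same value as 60 * 60 * 24 * 30
```

`ONE_WEEK` could then be passed as the duration, for example `await context.sleep("wait", ONE_WEEK)`.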

----------------------------------------

TITLE: Define Cloudflare Workers Workflow Endpoint
DESCRIPTION: Defines a workflow endpoint using Cloudflare Workers.  This code imports the `serve` function from the `@upstash/workflow/cloudflare` library and defines a simple workflow with two steps, demonstrating how to access the request payload and execute asynchronous tasks within the workflow.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/cloudflare-workers.mdx#_snippet_7

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/cloudflare"

interface Env {
  ENVIRONMENT: "development" | "production"
}

export default serve<{ text: string }>(
  async (context) => {
    const initialPayload = context.requestPayload.text

    const result = await context.run("initial-step", async () => {
      console.log(`Step 1 running with payload: ${initialPayload}`)

      return { text: "initial step ran" }
    })

    await context.run("second-step", async () => {
      console.log(`Step 2 running with result from step 1: ${result.text}`)
    })
  }
)
```

----------------------------------------

TITLE: Get Workflow Logs with Upstash Client (TS)
DESCRIPTION: This snippet demonstrates how to retrieve workflow logs using the Upstash Workflow Client.  It allows filtering by workflow run ID, count, state, workflow URL, workflow creation timestamp, and cursor. A QSTASH_TOKEN is required for authentication.  All parameters are optional.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#_snippet_1

LANGUAGE: typescript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" })
const { runs, cursor } = await client.logs({
  // Id of the workflow run to get
  workflowRunId,
  // Number of workflows to get
  count,
  // Workflow state to filter for.
  // One of "RUN_STARTED", "RUN_SUCCESS", "RUN_FAILED", "RUN_CANCELED"
  state,
  // Workflow url to search for. should be an exact match
  workflowUrl,       
  // Unix timestamp when the run was created
  workflowCreatedAt, 
  // Cursor from a previous request to continue the search
  cursor             
})
```

----------------------------------------

TITLE: Creating a Weekly Summary Workflow (FastAPI)
DESCRIPTION: This Python code defines a FastAPI endpoint to send weekly summary emails using Upstash Workflow. It fetches user data, generates a summary, and sends an email with the summary, leveraging the `upstash_workflow.fastapi` module.  It utilizes `AsyncWorkflowContext` and asynchronous functions for each step.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#_snippet_5

LANGUAGE: python
CODE:
```
from dataclasses import dataclass

from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext
from utils import get_user_data, generate_summary, send_email

app = FastAPI()
serve = Serve(app)


@dataclass
class WeeklySummaryData:
    user_id: str


@serve.post("/api/send-weekly-summary")
async def send_weekly_summary(context: AsyncWorkflowContext[WeeklySummaryData]) -> None:
    user_id = context.request_payload.user_id

    # Step 1: Fetch user data
    async def _step1():
        return await get_user_data(user_id)

    user = await context.run("fetch_user_data", _step1)

    # Step 2: Generate weekly summary
    async def _step2():
        return await generate_summary(user_id)

    summary = await context.run("generate_summary", _step2)

    # Step 3: Send email with weekly summary
    async def _step3():
        await send_email(user.email, "Your Weekly Summary", summary)

    await context.run("send_summary_email", _step3)
```

----------------------------------------

TITLE: Single Agent Task
DESCRIPTION: This code demonstrates how to create and run a task assigned to a single agent. It initializes the agent, defines a prompt, and then calls the `task.run()` method to execute the task and print the result to the console.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_11

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo');

  const researcherAgent = context.agents.agent({
    model,
    name: 'academic',
    maxSteps: 2,
    tools: {
      wikiTool: new WikipediaQueryRun({
        topKResults: 1,
        maxDocContentLength: 500,
      })
    },
    background:
      'You are a researcher agent with access to Wikipedia. '
      + 'Utilize Wikipedia as much as possible for correct information',
  });

  const task = context.agents.task({
    agent: researcherAgent,
    prompt: "Tell me about 5 topics in advanced physics.",
  });
  const { text } = await task.run();
  console.log("result:", text)
})
```

----------------------------------------

TITLE: Minimal Workflow Endpoint with Hono
DESCRIPTION: This TypeScript code defines a minimal workflow endpoint using Hono and the Upstash Workflow SDK. It creates a POST route at `/workflow` that serves a workflow with two steps, each logging a message to the console.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/hono.mdx#_snippet_7

LANGUAGE: typescript
CODE:
```
import { Hono } from "hono"
import { serve } from "@upstash/workflow/hono"

const app = new Hono()

app.post("/workflow",
  serve(async (context) => {
    await context.run("initial-step", () => {
      console.log("initial step ran")
    })

    await context.run("second-step", () => {
      console.log("second step ran")
    })
  })
)

export default app
```

----------------------------------------

TITLE: Notifying a Workflow with context.notify TypeScript
DESCRIPTION: This snippet demonstrates how to use `context.notify` within a workflow to trigger another workflow's event listener. It takes a step name, event ID, and event data as parameters and returns a `notifyResponse`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
const { notifyResponse } = await context.notify(
  "notify step", // notify step name
  "event-Id", // event id
  { my: "data" } // event data
);
```

----------------------------------------

TITLE: Early Return - TypeScript
DESCRIPTION: This TypeScript code demonstrates the behavior of returning from a workflow function before any steps are executed when using Upstash Workflow. An early return without running any steps will result in an HTTP 400 status code and the error message 'Failed to authenticate Workflow request.', because the Workflow SDK interprets this scenario as an authentication failure. someCondition() is a placeholder for the authorization logic.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#_snippet_6

LANGUAGE: typescript
CODE:
```
export const { POST } = serve(async (context) => {
  
  if (someCondition()) {
    return;
  }

  // rest of the workflow
})
```

----------------------------------------

TITLE: Bulk Cancel Workflows with Python Requests
DESCRIPTION: Cancels workflow runs using the Python requests library. Sets the Authorization and Content-Type headers, and provides a dictionary with workflowRunIds as the request body.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/bulk-cancel.mdx#_snippet_4

LANGUAGE: Python
CODE:
```
import requests

headers = {
    'Authorization': 'Bearer <token>',
    'Content-Type': 'application/json',
}

data = {
  "workflowRunIds": [
    "run_id_1",
    "run_id_2",
    "run_id_3"
  ]
}

response = requests.delete(
  'https://qstash.upstash.io/v2/workflows/runs',
  headers=headers,
  json=data  # send the body as JSON to match the Content-Type header
)
```
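
Where `requests` is not available, the same call can be built with the standard library. The sketch below only constructs the request so that its method and body can be inspected. The run IDs and `<token>` are the same placeholders as above, and the send is left commented out.

```python
import json
import urllib.request

payload = {"workflowRunIds": ["run_id_1", "run_id_2", "run_id_3"]}

request = urllib.request.Request(
    "https://qstash.upstash.io/v2/workflows/runs",
    # The body is JSON text, matching the Content-Type header below.
    data=json.dumps(payload).encode("utf-8"),
    headers={
        "Authorization": "Bearer <token>",
        "Content-Type": "application/json",
    },
    method="DELETE",
)

# Sending is commented out so the sketch has no side effects:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Serializing with `json.dumps` keeps the `workflowRunIds` list intact in the body, which form encoding would not.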

----------------------------------------

TITLE: Suspend User: Python
DESCRIPTION: This Python snippet checks if a user is suspended, suspends the user if not already suspended, and sends an email notification. It uses asynchronous functions and depends on `check_suspension`, `suspend_user`, and `send_email` functions.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_11

LANGUAGE: Python
CODE:
```
async def _check_suspension() -> bool:
    return await check_suspension(email)

is_suspended = await context.run("check suspension", _check_suspension)

if not is_suspended:

    async def _suspend_user() -> None:
        await suspend_user(email)

    await context.run("suspend user", _suspend_user)

    async def _send_suspended_email() -> None:
        await send_email(
            email,
            "Your account has been suspended due to payment failure. Please update your payment method.",
        )

    await context.run("send suspended email", _send_suspended_email)
```

----------------------------------------

TITLE: Initialize Workflow Endpoint with TypeScript
DESCRIPTION: Defines a workflow endpoint using the `serve` method from `@upstash/workflow/nextjs`. It exports a POST handler that executes two steps using the provided context object. Each step contains a placeholder for business logic.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_0

LANGUAGE: TypeScript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(async (context) => {
  const result = await context.run("step-1", async () => {
    // define a piece of business logic as step 1
  });

  await context.run("step-2", async () => {
    // define another piece of business logic as step 2
  });
});
```

----------------------------------------

TITLE: Notify Workflow with Workflow SDK - Javascript
DESCRIPTION: Uses the Upstash Workflow SDK to notify a workflow. It initializes the client with the QSTASH_TOKEN and then calls the notify method with the event ID and event data.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/notify.mdx#_snippet_2

LANGUAGE: js Workflow SDK
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" })
await client.notify({
  eventId: "my-event-id",
  eventData: "Hello World!"
});
```

----------------------------------------

TITLE: Configuring Workflow Options in Typescript
DESCRIPTION: This snippet shows how to configure options for both `createWorkflow` and `serveMany`. Options specified in `createWorkflow` take precedence. Options defined in `createWorkflow` such as `retries`, `failureFunction`, `failureUrl`, and `flowControl` will be applied in invocation requests.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#_snippet_3

LANGUAGE: typescript
CODE:
```
const workflowOne = createWorkflow(
  async (context) => {
    // ...
  },
  {
    retries: 0
  }
)

export const { POST } = serveMany(
  {
    workflowOne
  },
  {
    failureUrl: "https://some-url"
  }
)
```
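
The precedence rule can be pictured as a dictionary merge in which per-workflow options overwrite the shared ones. The sketch below illustrates that rule only, in Python. `resolve_options` is a hypothetical helper, not part of the Upstash SDK, the shared `retries` value is made up for the example, and the SDK's actual merging logic may differ in detail.

```python
from typing import Any, Dict


def resolve_options(
    serve_many_options: Dict[str, Any], workflow_options: Dict[str, Any]
) -> Dict[str, Any]:
    """Merge shared options with per-workflow options; the latter win on conflict."""
    return {**serve_many_options, **workflow_options}


# Shared options (as passed to serveMany) and per-workflow options (createWorkflow).
shared = {"failureUrl": "https://some-url", "retries": 3}
workflow_one = {"retries": 0}

resolved = resolve_options(shared, workflow_one)
print(resolved)
```

Here `retries` resolves to the per-workflow value, while `failureUrl` falls through from the shared options.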

----------------------------------------

TITLE: Multi-Agent Workflow with Upstash and Langchain (TS)
DESCRIPTION: This code defines a Next.js API endpoint that utilizes Upstash Workflow to create a multi-agent system. It includes a researcher agent using Langchain's Wikipedia tool and a math agent that evaluates mathematical expressions.  The main task prompts the agents to research Japanese cities and calculate the sum of their populations. It uses the `@upstash/workflow/nextjs`, `@langchain/community/tools/wikipedia_query_run`, `mathjs`, `ai`, and `zod` packages.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_12

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run";
import * as mathjs from 'mathjs'
import { tool } from "ai";
import { z } from "zod";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-4o');

  const researcherAgent = context.agents.agent({
    model,
    name: 'academic',
    maxSteps: 2,
    tools: {
      wikiTool: new WikipediaQueryRun({
        topKResults: 1,
        maxDocContentLength: 500,
      })
    },
    background:
      'You are a researcher agent with access to Wikipedia. ' +
      'Use Wikipedia as much as possible to get correct information.',
  });

  const mathAgent = context.agents.agent({
    model,
    name: "mathematician",
    maxSteps: 2,
    tools: {
      calculate: tool({
        description:
          'A tool for evaluating mathematical expressions. ' +
          'Example expressions: ' +
          "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'. " +
          "Only call this tool if you need to calculate a mathematical expression. " +
          "When writing an expression, don't use words like 'thousand' or 'million'.",
        parameters: z.object({ expression: z.string() }),
        execute: async ({ expression }) => mathjs.evaluate(expression),
      }),
    },
    background:
      "You are a mathematician agent which can utilize " +
      "a calculator to compute expressions"
  })

  const task = context.agents.task({
    model,
    maxSteps: 3,
    agents: [researcherAgent, mathAgent],
    prompt: "Tell me about 3 cities in Japan and calculate the sum of their populations",
  });
  const { text } = await task.run();
  console.log("result:", text)
})
```

----------------------------------------

TITLE: Trigger workflow endpoint
DESCRIPTION: Sends a POST request to the local workflow endpoint using `curl`.  This triggers the workflow defined in the `server/api/workflow.ts` file. A successful request returns a JSON object containing a `workflowRunId`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/nuxt.mdx#_snippet_7

LANGUAGE: bash
CODE:
```
curl -X POST http://localhost:3000/api/workflow

# result: {"workflowRunId":"wfr_xxxxxx"}
```

----------------------------------------

TITLE: Cancel Workflow Run with Workflow SDK
DESCRIPTION: Cancels a workflow run using the Upstash Workflow SDK. Requires a QStash token and a workflow run ID.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/cancel.mdx#_snippet_1

LANGUAGE: js Workflow SDK
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" })
await client.cancel({ workflowRunId: "<WORKFLOW_RUN_ID>" })
```

----------------------------------------

TITLE: Webhook Request Validation Function - TypeScript
DESCRIPTION: This TypeScript code snippet provides a function, `validateRequest`, to validate webhook requests using the `svix` library. It retrieves the `svix-id`, `svix-timestamp`, and `svix-signature` headers, initializes a `Webhook` object with a secret, and then verifies the payload against the headers.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_4

LANGUAGE: TypeScript
CODE:
```
import { Webhook } from "svix";
import { WebhookEvent } from "@clerk/nextjs/server";

const webhookSecret = "YOUR_WEBHOOK_SECRET";

async function validateRequest(payloadString: string, headerPayload: Headers) {
  const svixHeaders = {
    "svix-id": headerPayload.get("svix-id") as string,
    "svix-timestamp": headerPayload.get("svix-timestamp") as string,
    "svix-signature": headerPayload.get("svix-signature") as string,
  };
  const wh = new Webhook(webhookSecret);
  return wh.verify(payloadString, svixHeaders) as WebhookEvent;
}
```
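
For intuition about what the verification step checks, the following is a simplified Python sketch of an HMAC-SHA256 signature scheme of the kind Svix documents for manual verification. It assumes the signed content is `id.timestamp.payload`, the key is the base64 part of a `whsec_`-prefixed secret, and the header holds space-separated `v1,<signature>` entries. It omits timestamp tolerance checks, the secret and IDs are made up, and in practice the library's `verify` method should be used instead.

```python
import base64
import hashlib
import hmac

PREFIX = "whsec_"


def sign(secret: str, msg_id: str, timestamp: str, payload: str) -> str:
    """Compute a 'v1,<base64 signature>' entry for the given message."""
    encoded_key = secret[len(PREFIX):] if secret.startswith(PREFIX) else secret
    key = base64.b64decode(encoded_key)
    signed_content = f"{msg_id}.{timestamp}.{payload}".encode("utf-8")
    digest = hmac.new(key, signed_content, hashlib.sha256).digest()
    return "v1," + base64.b64encode(digest).decode("ascii")


def verify(secret: str, msg_id: str, timestamp: str, payload: str, header: str) -> bool:
    """Return True if any signature in the header matches the expected one."""
    expected = sign(secret, msg_id, timestamp, payload)
    return any(
        hmac.compare_digest(expected, candidate) for candidate in header.split(" ")
    )


# Hypothetical values for illustration only.
demo_secret = PREFIX + base64.b64encode(b"demo-signing-key").decode("ascii")
header = sign(demo_secret, "msg_1", "1700000000", '{"type":"user.created"}')

print(verify(demo_secret, "msg_1", "1700000000", '{"type":"user.created"}', header))
print(verify(demo_secret, "msg_1", "1700000000", '{"type":"tampered"}', header))
```

The constant-time comparison via `hmac.compare_digest` avoids leaking how many leading characters of a forged signature were correct.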

----------------------------------------

TITLE: Ensure Idempotency in context.run - TypeScript
DESCRIPTION: This TypeScript code snippet emphasizes the importance of idempotency within `context.run` in Upstash Workflow. The `someWork(input)` function must be idempotent, ensuring that running the workflow multiple times with the same input results in the same end state.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_14

LANGUAGE: typescript
CODE:
```
export const { POST } = serve<string>(async (context) => {
  const input = context.requestPayload

  await context.run("step-1", async () => {
    return someWork(input)
  })
})
```
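
To make the idempotency requirement concrete, the Python sketch below contrasts an operation that reaches the same end state when repeated with one that does not. Both functions and the in-memory `store` and `log` are hypothetical stand-ins for whatever `someWork` does in a real workflow.

```python
from typing import Dict, List


def set_status(store: Dict[str, str], order_id: str) -> None:
    """Idempotent: repeating the call leaves the store in the same state."""
    store[order_id] = "processed"


def append_event(log: List[str], order_id: str) -> None:
    """Not idempotent: every call adds another entry."""
    log.append(f"processed {order_id}")


store: Dict[str, str] = {}
log: List[str] = []

for _ in range(2):  # simulate the same step being executed twice
    set_status(store, "order-1")
    append_event(log, "order-1")

print(store)
print(len(log))
```

After two executions the store still holds a single entry, whereas the log has grown by one entry per execution.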

----------------------------------------

TITLE: Avoid Non-Idempotent Functions - Python
DESCRIPTION: This Python code shows an incorrect implementation where a non-idempotent function, `get_result_from_db(entry_id)`, is called outside `context.run`. This can cause inconsistent behavior during workflow retries or multiple invocations of the workflow endpoint.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_9

LANGUAGE: python
CODE:
```
@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:
    entry_id = context.request_payload["entry_id"]

    # 👇 Problem: Non-idempotent function outside context.run:
    result = await get_result_from_db(entry_id)
    if result.should_return:
        return

    # ...

```

----------------------------------------

TITLE: Calling OpenAI Compatible Provider
DESCRIPTION: This snippet illustrates how to call an OpenAI-compatible provider (e.g., Deepseek) using the `context.api.openai.call` method. It sets the `baseURL` parameter to the provider's API endpoint and provides the API key as a token.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/openai.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
const { status, body } = await context.api.openai.call(
  "Call Deepseek",
  {
    baseURL: "https://api.deepseek.com",
    token: process.env.DEEPSEEK_API_KEY,
    operation: "chat.completions.create",
    body: {
      model: "gpt-4o",
      messages: [
        {
          role: "system",
          content: "Assistant says 'hello!'",
        },
        {
          role: "user",
          content: "User shouts back 'hi!'"
        }
      ],
    },
  }
);
```

----------------------------------------

TITLE: Run Non-Idempotent Functions in context.run - Python
DESCRIPTION: This Python code shows the correct way to call a non-idempotent function. `get_result_from_db` is called inside `context.run`, which prevents the `Failed to authenticate Workflow request` error.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_13

LANGUAGE: python
CODE:
```
async def _get_result_from_db():
    return await get_result_from_db(entry_id)

result = await context.run("get-result-from-db", _get_result_from_db)

if result.should_return:
    return

```
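
A toy model helps show why the placement matters. It assumes the endpoint is invoked several times for one workflow run and that results of completed steps are replayed rather than recomputed. `ToyContext` is a hypothetical stand-in written for this sketch, not the SDK's implementation.

```python
import itertools
from typing import Any, Callable, Dict, List


class ToyContext:
    """Hypothetical stand-in for a workflow context that remembers finished steps."""

    def __init__(self, saved: Dict[str, Any]) -> None:
        self.saved = saved

    def run(self, name: str, fn: Callable[[], Any]) -> Any:
        # Execute the step only once; later invocations reuse the stored result.
        if name not in self.saved:
            self.saved[name] = fn()
        return self.saved[name]


counter = itertools.count(1)


def read_from_db() -> int:
    """Non-idempotent: returns a different value on each call."""
    return next(counter)


saved_steps: Dict[str, Any] = {}
outside: List[int] = []
inside: List[int] = []

for _ in range(3):  # simulate three invocations of the same endpoint
    context = ToyContext(saved_steps)
    outside.append(read_from_db())  # called directly, runs every time
    inside.append(context.run("get-result-from-db", read_from_db))

print(outside)
print(inside)
```

The value read outside the step changes between invocations, so any branch that depends on it may take a different path each time, while the value read inside the step stays fixed.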

----------------------------------------

TITLE: QStash Client Initialization in TypeScript
DESCRIPTION: Illustrates the default initialization of a QStash Client in TypeScript, using environment variables for the base URL and token.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_16

LANGUAGE: TypeScript
CODE:
```
new Client({
  baseUrl: process.env.QSTASH_URL!,
  token: process.env.QSTASH_TOKEN!,
});
```

----------------------------------------

TITLE: Fetching Workflow Logs with Workflow JS SDK
DESCRIPTION: This snippet demonstrates how to retrieve workflow logs using the Upstash Workflow JS SDK. It initializes a client with a token and then uses the `logs` method to fetch logs, optionally filtering by workflow run ID, workflow URL or state.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/logs.mdx#_snippet_1

LANGUAGE: js
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });

// Filter by workflow run ID
const { runs: runsById } = await client.logs({ workflowRunId: "<WORKFLOW_RUN_ID>" });

// Filter by workflow server URL
const { runs: runsByUrl } = await client.logs({ workflowUrl: "<WORKFLOW_URL>" });

// Filter by state
const { runs: runsByState } = await client.logs({ state: "RUN_SUCCESS" });
```

----------------------------------------

TITLE: Run New User Signup Task - Python
DESCRIPTION: This Python snippet defines and executes the 'new-signup' task within the Upstash workflow context. It defines an asynchronous function `_new_signup` that calls the `send_email` function to send a welcome email to the user. It is then executed using `context.run`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_3

LANGUAGE: python
CODE:
```
async def _new_signup() -> None:
    await send_email("Welcome to the platform", email)

await context.run("new-signup", _new_signup)
```

----------------------------------------

TITLE: Parallel Agent Execution with Upstash Workflow (Next.js)
DESCRIPTION: This code snippet demonstrates how to run multiple Upstash agents in parallel using `Promise.all` within a Next.js environment. It defines three worker agents, each with a specific expertise, and an aggregator agent to summarize their results. The agents leverage the OpenAI GPT-3.5 Turbo model. It uses the `serve` function to create a Next.js API endpoint.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/parallelization.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo');

  // Define worker agents
  const worker1 = context.agents.agent({
    model,
    name: 'worker1',
    maxSteps: 1,
    background: 'You are an agent that explains quantum physics.',
    tools: {}
  });

  const worker2 = context.agents.agent({
    model,
    name: 'worker2',
    maxSteps: 1,
    background: 'You are an agent that explains relativity.',
    tools: {}
  });

  const worker3 = context.agents.agent({
    model,
    name: 'worker3',
    maxSteps: 1,
    background: 'You are an agent that explains string theory.',
    tools: {}
  });

  // Await results
  const [result1, result2, result3] = await Promise.all([
    context.agents.task({ agent: worker1, prompt: "Explain quantum physics." }).run(),
    context.agents.task({ agent: worker2, prompt: "Explain relativity." }).run(),
    context.agents.task({ agent: worker3, prompt: "Explain string theory." }).run(),
  ]);

  // Aggregating results
  const aggregator = context.agents.agent({
    model,
    name: 'aggregator',
    maxSteps: 1,
    background: 'You are an agent that summarizes multiple answers.',
    tools: {}
  });

  const task = context.agents.task({
    agent: aggregator,
    prompt: `Summarize these three explanations: ${result1.text}, ${result2.text}, ${result3.text}`
  })
  const finalSummary = await task.run();

  console.log(finalSummary.text);
});

```

----------------------------------------

TITLE: Charging Customer (Python)
DESCRIPTION: This Python code attempts to charge a customer using the `charge_customer` function within an Upstash Workflow context. It defines an asynchronous function `_charge_customer` that includes a try-except block to handle potential exceptions during the charging process, returning `None` if charging fails. The attempt number is passed to the `charge_customer` function.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_3

LANGUAGE: python
CODE:
```
async def _charge_customer() -> Optional[ChargeResult]:
    try:
        return await charge_customer(i + 1)
    except Exception as e:
        print(f"Error: {e}")
        return None

result = await context.run("charge customer", _charge_customer)
```

----------------------------------------

TITLE: Notify Workflow with Fetch API - Javascript
DESCRIPTION: Sends a POST request to the QStash notify endpoint using the fetch API in Node.js. The request includes the event ID in the URL, the authorization header, and the event data in the request body.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/notify.mdx#_snippet_3

LANGUAGE: js Node
CODE:
```
const response = await fetch('https://qstash.upstash.io/v2/notify/myEvent', {
  method: 'POST',
  body: "Hello world!",
  headers: {
    'Authorization': 'Bearer <token>'
  }
});
```

----------------------------------------

TITLE: Bulk Cancel Workflows with Upstash Workflow SDK
DESCRIPTION: Cancels workflow runs using the Upstash Workflow SDK. Demonstrates cancelling by IDs, URL prefix, and all workflows.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/bulk-cancel.mdx#_snippet_2

LANGUAGE: JavaScript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });

// cancel a set of workflow runs
await client.cancel({ ids: [
  "<WORKFLOW_RUN_ID_1>",
  "<WORKFLOW_RUN_ID_2>",
]})

// cancel workflows starting with a url
await client.cancel({ urlStartingWith: "https://your-endpoint.com" })

// cancel all workflows
await client.cancel({ all: true })
```

----------------------------------------

TITLE: Complete Workflow Example with Parallel Inventory Checks in TypeScript
DESCRIPTION: This is a complete workflow example in a Next.js application demonstrating how to use parallel runs. It fetches the availability of coffee beans, cups, and milk concurrently using `Promise.all` and the `ctx.run` function. If all ingredients are available, it proceeds to brew coffee and print a receipt.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/parallel-runs.mdx#_snippet_1

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { checkInventory, brewCoffee, printReceipt } from "@/utils";

export const { POST } = serve(async (ctx) => {
  const [coffeeBeansAvailable, cupsAvailable, milkAvailable] =
    await Promise.all([
      ctx.run("check-coffee-beans", () => checkInventory("coffee-beans")),
      ctx.run("check-cups", () => checkInventory("cups")),
      ctx.run("check-milk", () => checkInventory("milk")),
    ]);

  // If all ingredients are available, brew coffee
  if (coffeeBeansAvailable && cupsAvailable && milkAvailable) {
    const price = await ctx.run("brew-coffee", async () => {
      return await brewCoffee({ style: "cappuccino" });
    });

    await printReceipt(price);
  }
});
```
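
The same fan-out pattern can be sketched in plain Python with `asyncio.gather`, which awaits several coroutines concurrently and returns their results in order. This is an analogy to the `Promise.all` call above, not Workflow SDK code; the `STOCK` table and `check_inventory` function are hypothetical.

```python
import asyncio
from typing import Dict

# Hypothetical inventory data for the example.
STOCK: Dict[str, bool] = {"coffee-beans": True, "cups": True, "milk": False}


async def check_inventory(item: str) -> bool:
    """Pretend to query an inventory service."""
    await asyncio.sleep(0.01)
    return STOCK[item]


async def can_brew() -> bool:
    # Run the three checks concurrently; results come back in argument order.
    beans, cups, milk = await asyncio.gather(
        check_inventory("coffee-beans"),
        check_inventory("cups"),
        check_inventory("milk"),
    )
    return beans and cups and milk


result = asyncio.run(can_brew())
print(result)
```

With milk out of stock in the sample data, the combined check fails and brewing would be skipped.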

----------------------------------------

TITLE: Conditional Email Based on Solved Problems - Python
DESCRIPTION: This Python function `send_problem_solved_email` conditionally sends an email based on the number of problems the user has solved. If the user hasn't solved any problems, a 'no answers' email is sent; otherwise, a stats email is sent indicating the number of problems solved in the last 7 days.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_13

LANGUAGE: python
CODE:
```
async def send_problem_solved_email(
    context: AsyncWorkflowContext[UserCreatedPayload], email: str, stats: UserStats
) -> None:
    if stats["total_problems_solved"] == 0:

        async def _send_no_answers_email() -> None:
            await send_email(
                email, "Hey, you haven't solved any questions in the last 7 days..."
            )

        await context.run("send no answers email", _send_no_answers_email)
    else:

        async def _send_stats_email() -> None:
            await send_email(
                email,
                f"You have solved {stats['total_problems_solved']} problems in the last 7 days. Keep it up!",
            )

        await context.run("send stats email", _send_stats_email)
```

----------------------------------------

TITLE: Set Retries in Python
DESCRIPTION: Shows how to set the number of retries for a workflow endpoint using the `retries` parameter in the `@serve.post` decorator. This specifies how many times QStash will call the endpoint in case of errors.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_5

LANGUAGE: Python
CODE:
```
@serve.post("/api/example", retries=3)
async def example(context: AsyncWorkflowContext[str]) -> None: ...
```

----------------------------------------

TITLE: Workflow Endpoint with Request Object (Next.js Pages Router)
DESCRIPTION: This snippet demonstrates how to access the Next.js request and response objects within a workflow endpoint using the Pages Router. It imports `NextApiRequest` and `NextApiResponse` and uses them in the handler function. Requires `@upstash/workflow/nextjs` and `next` types.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#_snippet_10

LANGUAGE: typescript
CODE:
```
import type { NextApiRequest, NextApiResponse } from "next";
import { servePagesRouter } from "@upstash/workflow/nextjs";

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  // do something with the native request object
  const { handler } = servePagesRouter(
    async (context) => {
      // Your workflow steps
    }
  )

  await handler(req, res)
}
```

----------------------------------------

TITLE: Send Invoice Email (Python)
DESCRIPTION: This Python code sends an invoice email to the user after a successful payment using the `send_email` function within an Upstash Workflow context. An asynchronous function `_send_invoice_email` encapsulates the call to the underlying function. The email content includes the invoice ID and total cost retrieved from the payment result.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_9

LANGUAGE: python
CODE:
```
async def _send_invoice_email() -> None:
    await send_email(
        email,
        f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}",
    )

await context.run("send invoice email", _send_invoice_email)

```

----------------------------------------

TITLE: WorkflowTool with executeAsStep
DESCRIPTION: This example illustrates how to define a `WorkflowTool` and disable automatic step wrapping using the `executeAsStep` option. This allows using workflow context methods such as `context.call` inside the tool's invoke function.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_9

LANGUAGE: typescript
CODE:
```
import { WorkflowTool } from '@upstash/workflow'
import { serve } from '@upstash/workflow/nextjs'

export const { POST } = serve(async (context) => {
  const tool = new WorkflowTool({
    description: ...,
    schema: ...,
    invoke: async ( ... ) => {
      // make HTTP call inside the tool with context.call:
      await context.call( ... )
    },
    executeAsStep: false
  })

  // pass the tool to agent
})
```

----------------------------------------

TITLE: LangChain (custom)
DESCRIPTION: This code defines a custom tool using `DynamicStructuredTool` from `@langchain/core/tools`. This tool generates a random number between two given numbers (low and high), defined by a Zod schema.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_7

LANGUAGE: typescript
CODE:
```
import { DynamicStructuredTool } from "@langchain/core/tools";
import { z } from "zod";

const numberGenerator = new DynamicStructuredTool({
  name: "random-number-generator",
  description: "generates a random number between two input numbers",
  schema: z.object({
    low: z.number().describe("The lower bound of the generated number"),
    high: z.number().describe("The upper bound of the generated number"),
  }),
  func: async ({ low, high }) =>
    (Math.random() * (high - low) + low).toString(), // Outputs still must be strings
})
```

----------------------------------------

TITLE: Minimal Workflow Endpoint Example (Next.js App Router)
DESCRIPTION: This is a minimal example of a workflow endpoint using Next.js App Router.  It defines a POST request handler that executes two steps in a workflow.  Requires `@upstash/workflow/nextjs`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#_snippet_7

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"

export const { POST } = serve(
  async (context) => {
    await context.run("initial-step", () => {
      console.log("initial step ran")
    })

    await context.run("second-step", () => {
      console.log("second step ran")
    })
  }
)
```

----------------------------------------

TITLE: Trigger Workflow Endpoint with curl
DESCRIPTION: This command sends a POST request to the `/api/workflow` endpoint of the FastAPI application using curl. This triggers the workflow defined at that endpoint, initiating its execution. The endpoint will return a JSON response containing the unique workflow run ID.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_12

LANGUAGE: bash
CODE:
```
curl -X POST http://localhost:8000/api/workflow
```

----------------------------------------

TITLE: Workflow with Authentication in Flask
DESCRIPTION: Defines a workflow endpoint in a Flask application using Upstash Workflow that implements authentication. The function checks for a specific authentication header before running the workflow steps.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/flask.mdx#_snippet_9

LANGUAGE: python
CODE:
```
from flask import Flask
from upstash_workflow.flask import Serve
from upstash_workflow import WorkflowContext

app = Flask(__name__)
serve = Serve(app)


def some_work(input: str) -> str:
    return f"processed '{input}'"


@serve.route("/auth")
def auth(context: WorkflowContext[str]) -> None:
    if context.headers.get("Authentication") != "Bearer secret_password":
        print("Authentication failed.")
        return

    def _step1() -> str:
        return "output 1"

    context.run("step1", _step1)

    def _step2() -> str:
        return "output 2"

    context.run("step2", _step2)

```

----------------------------------------

TITLE: Image Filtering - Python
DESCRIPTION: This Python snippet applies various filters to the resized images. It iterates through a list of filters and resized images, calling `context.call` for each combination. The results are collected and processed to extract the image data.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_7

LANGUAGE: python
CODE:
```
filters = ["grayscale", "sepia", "contrast"]
filter_responses = []

for resized_image in resized_images:
    for filter in filters:
        response: CallResponse[ImageResult] = await context.call(
            f"apply-filter-{filter}",
            # endpoint which returns ImageResult type in response
            url="https://image-processing-service.com/filter",
            method="POST",
            body={"imageUrl": resized_image["imageUrl"], "filter": filter},
        )
        filter_responses.append(response)

processed_images = [response.body for response in filter_responses]
```

----------------------------------------

TITLE: Payment Retry Loop (TypeScript)
DESCRIPTION: This TypeScript code implements a loop that attempts to charge a customer up to 3 times. If the payment fails, it waits for 24 hours before retrying. If the payment succeeds, it executes logic to unsuspend the user, send an invoice email, and then ends the workflow.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
for (let i = 0; i < 3; i++) {
  // attempt to charge the customer

  if (!result) {
    // Wait for a day
    await context.sleep("wait for retry", 24 * 60 * 60);
  } else {
    // Payment succeeded
    // Unsuspend user, send invoice email
    // end the workflow:
    return;
  }
}
```
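
The control flow of the loop can be exercised outside a workflow with a small Python sketch. Charging and sleeping are passed in as functions so the logic can run instantly; `charge_with_retries`, `fake_charge`, and the invoice value are hypothetical and stand in for the real payment step and `context.sleep`.

```python
from typing import Callable, List, Optional

ONE_DAY_SECONDS = 24 * 60 * 60


def charge_with_retries(
    charge: Callable[[int], Optional[str]],
    sleep: Callable[[int], None],
    max_attempts: int = 3,
) -> Optional[str]:
    """Try to charge up to max_attempts times, waiting a day after each failure."""
    for attempt in range(1, max_attempts + 1):
        result = charge(attempt)
        if result is not None:
            return result  # payment succeeded, stop retrying
        sleep(ONE_DAY_SECONDS)
    return None  # every attempt failed


sleeps: List[int] = []


def fake_charge(attempt: int) -> Optional[str]:
    """Fail twice, then succeed on the third attempt."""
    return "invoice-42" if attempt == 3 else None


invoice = charge_with_retries(fake_charge, sleeps.append)
print(invoice, sleeps)
```

Injecting the sleep function keeps the retry policy testable without waiting for real time to pass.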

----------------------------------------

TITLE: Overriding OpenAI API Types
DESCRIPTION: This snippet shows how to override the default request and response types when calling the OpenAI API using `context.api.openai.call`. It defines custom types `ResponseBodyType` and `RequestBodyType` and passes them as generic type parameters to the call method.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/openai.mdx#_snippet_1

LANGUAGE: typescript
CODE:
```
type ResponseBodyType = { ... }; // Define your response body type
type RequestBodyType = { ... };  // Define your request body type

const { status, body } = await context.api.openai.call<
  ResponseBodyType,
  RequestBodyType
>(
  "Call OpenAI",
  {
    ...
  }
);
```

----------------------------------------

TITLE: Serve User Requests with Upstash Workflow
DESCRIPTION: This JavaScript code snippet defines a Next.js route using `@upstash/workflow/nextjs` to handle user requests. It uses the `serve` function to create an endpoint that processes a `UserRequest` payload, sleeps for 10 seconds, runs two asynchronous tasks (`retrieveEmail` and `fetchFromLLm`) in parallel, and waits for both tasks to complete. The `retrieveEmail` and `fetchFromLLm` functions are imported from a local util file.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/monitor.mdx#_snippet_0

LANGUAGE: javascript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { retrieveEmail, fetchFromLLm, UserRequest} from "../../../lib/util";


export const { POST } = serve<UserRequest>(
  async (context) => {
    const input = context.requestPayload;

    await context.sleep("sleep", 10);

    const p1 = context.run("retrieveEmail", async () => {
      return retrieveEmail(input.id);
    });

    const p2 = context.run("askllm", async () => {
      return fetchFromLLm(input.question);
    });

    await Promise.all([p1, p2])
  },
);
```

----------------------------------------

TITLE: Minimal Workflow Endpoint (TypeScript)
DESCRIPTION: Defines a minimal workflow endpoint using the Upstash Workflow SDK in an Astro project. This endpoint defines a workflow with two steps and returns a POST handler. The `env` configures environment variables for local development or deployment.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/astro.mdx#_snippet_5

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/astro";


export const { POST } = serve<string>(async (context) => {
  const result1 = await context.run("initial-step", () => {
    console.log("initial step ran")
    return "hello world!"
  })

  await context.run("second-step", () => {
    console.log(`second step ran with value ${result1}`)
  })
}, {
  // env must be passed in astro.
  // for local dev, we need import.meta.env.
  // For deployment, we need process.env:
  env: {
    ...process.env,
    ...import.meta.env
  }
})
```

----------------------------------------

TITLE: Image Resizing - TypeScript
DESCRIPTION: This TypeScript snippet resizes the image to multiple resolutions using an external image processing service. It iterates through a list of resolutions and calls `context.call` for each resolution to trigger an external service call. `Promise.all` is used to run the resizing operations concurrently.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
const resolutions = [640, 1280, 1920]

const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map(
  resolution => context.call<ImageResult>(
    `resize-image-${resolution}`,
    {
      // endpoint which returns ImageResult type in response
      url: "https://image-processing-service.com/resize",
      method: "POST",
      body: {
        imageUrl,
        width: resolution,
      }
    }
  )
))
```

----------------------------------------

TITLE: Set Failure Function in TypeScript
DESCRIPTION: Shows how to define a `failureFunction` option within the `serve` method. This function is executed if the workflow fails after all retries. If both `failureUrl` and `failureFunction` are defined, the function takes precedence.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_3

LANGUAGE: TypeScript
CODE:
```
export const { POST } = serve<string>(
  async (context) => { ... },
  {
    failureFunction: async ({
      context,      // context during failure
      failStatus,   // failure status
      failResponse, // failure message
      failHeaders   // failure headers
    }) => {
      // handle the failure
    }
  }
);
```

----------------------------------------

TITLE: Scheduling a Workflow with Upstash Workflow (FastAPI)
DESCRIPTION: This Python code defines a FastAPI endpoint that uses Upstash Workflow to create and upload a backup. It initializes the `Serve` class from `upstash_workflow.fastapi` to handle workflow execution and uses the `AsyncWorkflowContext` to manage the workflow steps.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#_snippet_1

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext
from utils import create_backup, upload_backup

app = FastAPI()
serve = Serve(app)


@serve.post("/api/workflow")
async def workflow(context: AsyncWorkflowContext[str]) -> None:
    async def _step1():
        return await create_backup()

    backup = await context.run("create_backup", _step1)

    async def _step2():
        await upload_backup(backup)

    await context.run("upload_backup", _step2)
```

----------------------------------------

TITLE: Generating Text with Tools and OpenAI Client
DESCRIPTION: Creates a Next.js API route with Upstash Workflow that uses the OpenAI client and Vercel AI SDK tools to generate text. Includes error handling for `ToolExecutionError` and `WorkflowAbort` exceptions.  Each tool execution is wrapped in a workflow step.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#_snippet_6

LANGUAGE: typescript
CODE:
```
import { z } from 'zod';
import { serve } from "@upstash/workflow/nextjs";
import { WorkflowAbort } from '@upstash/workflow';
import { generateText, ToolExecutionError, tool } from 'ai';

import { createWorkflowOpenAI } from './model';

export const { POST } = serve<{ prompt: string }>(async (context) => {
  const openai = createWorkflowOpenAI(context);

  const prompt = await context.run("get prompt", async () => {
    return context.requestPayload.prompt;
  });

  try {
    const result = await generateText({
      model: openai('gpt-3.5-turbo'),
      tools: {
        weather: tool({
          description: 'Get the weather in a location',
          parameters: z.object({
            location: z.string().describe('The location to get the weather for'),
          }),
          execute: ({ location }) => context.run("weather tool", () => {
            // Mock data, replace with actual weather API call
            return {
              location,
              temperature: 72 + Math.floor(Math.random() * 21) - 10,
            };
          })
        }),
      },
      maxSteps: 2,
      prompt,
    });
    
    await context.run("text", () => {
      console.log(`TEXT: ${result.text}`);
      return result.text;
    });
  } catch (error) {
    if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) {
      throw error.cause;
    } else {
      throw error;
    }
  }
});
```

----------------------------------------

TITLE: Install @upstash/workflow with npm
DESCRIPTION: Installs the @upstash/workflow package using npm. This is the first step in migrating to the new SDK.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#_snippet_0

LANGUAGE: bash
CODE:
```
npm install @upstash/workflow
```

----------------------------------------

TITLE: Override URL in Python
DESCRIPTION: Demonstrates how to override the URL inferred from `request.url` using the `url` parameter in the `@serve.post` decorator in Python. This is useful when using proxies or local tunnels.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_11

LANGUAGE: Python
CODE:
```
@serve.post("/api/example", url="https://<YOUR-DEPLOYED-APP>.com/api/workflow")
async def example(context: AsyncWorkflowContext[str]) -> None: ...
```

----------------------------------------

TITLE: Defining a Workflow Endpoint in SvelteKit
DESCRIPTION: This TypeScript code defines a workflow endpoint using the Upstash Workflow SDK for SvelteKit. It imports necessary modules, defines steps using `context.run`, and exports a POST handler to expose the workflow as an API endpoint.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/svelte.mdx#_snippet_7

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/svelte"
import { env } from "$env/dynamic/private"

export const { POST } = serve(
  async (context) => {
    await context.run("initial-step", () => {
      console.log("initial step ran")
    })

    await context.run("second-step", () => {
      console.log("second step ran")
    })
  },
  { env }
)
```

----------------------------------------

TITLE: Secure Workflow with QStash Receiver (Python)
DESCRIPTION: This Python code snippet demonstrates how to integrate QStash's `Receiver` with an Upstash Workflow using the `@serve.post` decorator. It imports `Receiver` from `qstash` and configures it with the current and next signing keys from environment variables. The decorator registers the `example` function as a handler for POST requests to the `/api/example` endpoint, requiring valid QStash signatures for all requests.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#_snippet_3

LANGUAGE: python
CODE:
```
from qstash import Receiver

@serve.post(
    "/api/example",
    receiver=Receiver(
        current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"],
        next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"],
    ),
)
async def example(context: AsyncWorkflowContext[str]) -> None:
    ...
```

----------------------------------------

TITLE: Minimal Workflow Endpoint Example (Next.js Pages Router)
DESCRIPTION: This is a minimal example of a workflow endpoint using Next.js Pages Router.  It defines a handler function that executes two steps in a workflow. Requires `@upstash/workflow/nextjs`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#_snippet_9

LANGUAGE: typescript
CODE:
```
import { servePagesRouter } from "@upstash/workflow/nextjs";

const { handler } = servePagesRouter<string>(
  async (context) => {
    await context.run("initial-step", () => {
      console.log("initial step ran")
    })

    await context.run("second-step", () => {
      console.log("second step ran")
    })
  }
)
export default handler;
```

----------------------------------------

TITLE: Run FastAPI Application
DESCRIPTION: This command starts the FastAPI application using Uvicorn, enabling hot-reloading. The `--reload` flag ensures that the server automatically restarts whenever changes are made to the code.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_11

LANGUAGE: bash
CODE:
```
uvicorn main:app --reload
```

----------------------------------------

TITLE: Correct Result Handling - Python
DESCRIPTION: This Python code demonstrates the correct way to handle results within Upstash Workflow. The return value from `some_work(input)` is directly returned from `_step_1` and assigned to `result` using `result = await context.run("step-1", _step_1)`.  This ensures that `result` is initialized properly before being used in `_step_2`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#_snippet_5

LANGUAGE: python
CODE:
```
@serve.post("/api/example")
async def example(context: AsyncWorkflowContext[str]) -> None:
    input = context.request_payload

    async def _step_1() -> Dict:
        return await some_work(input)

    result = await context.run("step-1", _step_1)

    async def _step_2() -> None:
        await some_other_work(result)

    await context.run("step-2", _step_2)
```
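
Why the return value matters can be seen with a toy model of step replay. The sketch below is not the Upstash SDK: `ToyContext`, `invoke`, and `stored_results` are hypothetical names, and the model assumes that a completed step is answered from a stored result on later invocations instead of being executed again. Under that assumption, a value assigned as a side effect inside the step is missing on the second invocation, while the returned value is still available.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class ToyContext:
    """Hypothetical stand-in for a workflow context (not the Upstash SDK).

    Assumption: once a step has completed, later invocations return its
    stored result without running the step body again.
    """

    def __init__(self, stored: Dict[str, Any]) -> None:
        self.stored = stored

    async def run(self, name: str, step: Callable[[], Awaitable[Any]]) -> Any:
        if name not in self.stored:
            self.stored[name] = await step()
        return self.stored[name]


async def invoke(stored: Dict[str, Any]) -> Dict[str, Any]:
    """One invocation of the endpoint; `stored` survives between invocations."""
    context = ToyContext(stored)
    side_effect = None  # assigned inside the step, so it is lost on replay

    async def _step_1() -> Dict[str, int]:
        nonlocal side_effect
        side_effect = {"value": 42}
        return {"value": 42}

    returned = await context.run("step-1", _step_1)
    return {"returned": returned, "side_effect": side_effect}


stored_results: Dict[str, Any] = {}
first = asyncio.run(invoke(stored_results))
second = asyncio.run(invoke(stored_results))  # step-1 is replayed, not re-run
```

On the second invocation `second["returned"]` still holds the step result, whereas `second["side_effect"]` is `None` because the step body never ran.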

----------------------------------------

TITLE: Calling OpenAI Chat Completion API
DESCRIPTION: This snippet demonstrates how to call the OpenAI chat completion API using `context.api.openai.call`. It sets the operation, OpenAI API key, model, and message content. The response status and body are returned.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/openai.mdx#_snippet_0

LANGUAGE: typescript
CODE:
```
const { status, body } = await context.api.openai.call(
  "Call OpenAI",
  {
    token: "<OPENAI_API_KEY>",
    operation: "chat.completions.create",
    body: {
      model: "gpt-4o",
      messages: [
        {
          role: "system",
          content: "Assistant says 'hello!'",
        },
        {
          role: "user",
          content: "User shouts back 'hi!'"
        }
      ],
    },
  }
);

// get text (chat completions return the message under `choices`):
console.log(body.choices[0].message.content)
```

----------------------------------------

TITLE: Customize Anthropic API Request/Response Types (TypeScript)
DESCRIPTION: This code snippet shows how to override the predefined request and response body types when calling the Anthropic API. It involves defining custom TypeScript types for `ResponseBodyType` and `RequestBodyType` and passing them as generic type arguments to the `context.api.anthropic.call` method.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/anthropic.mdx#_snippet_1

LANGUAGE: typescript
CODE:
```
type ResponseBodyType = { ... }; // Define your response body type
type RequestBodyType = { ... };  // Define your request body type

const { status, body } = await context.api.anthropic.call<
  ResponseBodyType,
  RequestBodyType
>(
  "Call Anthropic",
  {
    ...
  }
);
```

----------------------------------------

TITLE: Configuring a Failure URL with TypeScript
DESCRIPTION: This snippet illustrates how to set up a `failureUrl` within the `serve` method. The `failureUrl` is used to send failure notifications to a designated endpoint when the workflow URL is unreachable. This allows for external monitoring and handling of workflow failures.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/failures.mdx#_snippet_1

LANGUAGE: typescript
CODE:
```
export const { POST } = serve<string>(
  async (context) => {
    // Your workflow logic...
  },
  {
    failureUrl: "https://<YOUR_FAILURE_URL>/workflow-failure",
  }
);
```

----------------------------------------

TITLE: Install Dependencies
DESCRIPTION: This snippet installs the necessary dependencies for the project: FastAPI, Uvicorn (an ASGI server), and the Upstash Workflow SDK. These packages are required to create and run the FastAPI application with Upstash Workflow integration.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_1

LANGUAGE: bash
CODE:
```
pip install fastapi uvicorn upstash-workflow
```

----------------------------------------

TITLE: Image Filtering - TypeScript
DESCRIPTION: This TypeScript snippet applies various filters to the resized images. It iterates through a list of filters and resized images, calling `context.call` for each combination. `Promise.all` is used to run the filtering operations concurrently.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#_snippet_6

LANGUAGE: typescript
CODE:
```
const filters = ["grayscale", "sepia", "contrast"]
const processedImagePromises: Promise<{ body: ImageResult }>[] = []

for (const resizedImage of resizedImages) {
  for (const filter of filters) {
    const processedImagePromise = context.call<ImageResult>(
      `apply-filter-${filter}`,
      {
        // endpoint which returns ImageResult type in response
        url: "https://image-processing-service.com/filter",
        method: "POST",
        body: {
          imageUrl: resizedImage.body.imageUrl,
          filter,
        }
      }
    )
    processedImagePromises.push(processedImagePromise)
  }
}
const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises)
```
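
The nested loop above produces one call per (image, filter) pair and awaits them together. As a rough illustration of that fan-out pattern in plain Python, the sketch below uses `asyncio.gather`; it is not the Upstash Python SDK, and `apply_filter`, `filter_all`, and the image names are hypothetical stand-ins for the external filter service.

```python
import asyncio
from itertools import product
from typing import Dict, List

FILTERS = ["grayscale", "sepia", "contrast"]


async def apply_filter(image_url: str, filter_name: str) -> Dict[str, str]:
    """Hypothetical stand-in for the external filter service."""
    await asyncio.sleep(0)  # yield control, as a network call would
    return {"imageUrl": f"{image_url}?filter={filter_name}"}


async def filter_all(resized_urls: List[str]) -> List[Dict[str, str]]:
    # One coroutine per (image, filter) pair, awaited together.
    tasks = [
        apply_filter(url, name) for url, name in product(resized_urls, FILTERS)
    ]
    # gather returns results in the same order as the tasks were passed in.
    return await asyncio.gather(*tasks)


processed = asyncio.run(filter_all(["img-640", "img-1280", "img-1920"]))
```

Three images and three filters yield nine results, ordered image by image.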

----------------------------------------

TITLE: Customizing Resend API Request/Response Types in Upstash Workflow (TypeScript)
DESCRIPTION: This code snippet demonstrates how to override the default request and response types when calling the Resend API using `context.api.resend.call` within an Upstash Workflow. This allows for greater flexibility when dealing with non-standard Resend API responses.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#_snippet_2

LANGUAGE: typescript
CODE:
```
type IsBatch = true; // Set to either true or false
type ResponseBodyType = { ... }; // Define your response body type
type RequestBodyType = { ... };  // Define your request body type

const { status, body } = await context.api.resend.call<
  IsBatch,
  ResponseBodyType,
  RequestBodyType
>(
  "Call Resend",
  {
    ...
  }
);
```

----------------------------------------

TITLE: Install @upstash/workflow with bun
DESCRIPTION: Installs the @upstash/workflow package using bun. This is the first step in migrating to the new SDK.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#_snippet_2

LANGUAGE: bash
CODE:
```
bun add @upstash/workflow
```

----------------------------------------

TITLE: Workflow Run ID Example
DESCRIPTION: This shows the expected output from triggering the workflow endpoint. The `workflowRunId` is a unique identifier that can be used to track the workflow execution.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#_snippet_13

LANGUAGE: bash
CODE:
```
# result: {"workflowRunId":"wfr_xxxxxx"}
```

----------------------------------------

TITLE: Limiting Workflow Environment in trigger method (JS)
DESCRIPTION: This code snippet demonstrates how to limit the execution environment by configuring the `trigger` method using `flowControl`. It uses the Upstash Workflow Client to trigger a workflow with specified rate per second and parallelism limits.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/flow-control.mdx#_snippet_1

LANGUAGE: javascript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QStash_TOKEN>" });
const { workflowRunId } = await client.trigger({
  url: "https://workflow-endpoint.com",
  body: "hello there!",
  flowControl: { key: "app1", parallelism: 3, ratePerSecond: 10 }
});
```

----------------------------------------

TITLE: Use Initial Payload Parser in TypeScript
DESCRIPTION: Illustrates how to define and use an `initialPayloadParser` to process the initial request's payload.  The parser transforms the raw initial payload into a structured object.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_8

LANGUAGE: TypeScript
CODE:
```
type InitialPayload = {
  foo: string;
  bar: number;
};

// 👇 1: provide initial payload type
export const { POST } = serve<InitialPayload>(
  async (context) => {
    // 👇 3: parsing result is available as requestPayload
    const payload: InitialPayload = context.requestPayload;
  },
  {
    // 👇 2: custom parsing for initial payload
    initialPayloadParser: (initialPayload) => {
      const payload: InitialPayload = parsePayload(initialPayload);
      return payload;
    },
  }
);
```
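
The snippet above calls `parsePayload` without defining it. A minimal sketch of what such a parser might do is shown below in Python, assuming the raw payload arrives as JSON text; `parse_payload` and its validation rules are hypothetical, chosen only to mirror the `foo: string` / `bar: number` shape of `InitialPayload`.

```python
import json
from typing import TypedDict


class InitialPayload(TypedDict):
    foo: str
    bar: int


def parse_payload(raw: str) -> InitialPayload:
    """Hypothetical parser: JSON text in, validated InitialPayload out."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    foo, bar = data.get("foo"), data.get("bar")
    # bool is a subclass of int in Python, so reject it explicitly.
    if not isinstance(foo, str) or isinstance(bar, bool) or not isinstance(bar, int):
        raise ValueError("payload needs a string 'foo' and an integer 'bar'")
    return {"foo": foo, "bar": bar}


payload = parse_payload('{"foo": "hello", "bar": 3}')
```

Validating in the parser means the workflow body can rely on the payload shape instead of re-checking it in every step.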

----------------------------------------

TITLE: Setting QSTASH_URL and QSTASH_TOKEN
DESCRIPTION: Configures the `.env` file with the `QSTASH_URL` and `QSTASH_TOKEN` for local development using the QStash development server. These values are obtained from the output of `npx @upstash/qstash-cli dev`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/nextjs-fastapi.mdx#_snippet_4

LANGUAGE: bash
CODE:
```
export QSTASH_URL="http://127.0.0.1:8080"
export QSTASH_TOKEN=<QSTASH_TOKEN>
```
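
Once these variables are exported, an application can check them at startup rather than failing later on the first request. The helper below is a small sketch of that check; `load_qstash_config` is a hypothetical name, and in a real application the mapping passed in would typically be `os.environ`.

```python
from typing import Mapping, Tuple

REQUIRED_KEYS = ("QSTASH_URL", "QSTASH_TOKEN")


def load_qstash_config(env: Mapping[str, str]) -> Tuple[str, str]:
    """Return (url, token), or raise if either variable is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return env["QSTASH_URL"], env["QSTASH_TOKEN"]


# Example with an explicit mapping; the token value is a placeholder.
url, token = load_qstash_config(
    {"QSTASH_URL": "http://127.0.0.1:8080", "QSTASH_TOKEN": "example-token"}
)
```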

----------------------------------------

TITLE: Re-raising WorkflowAbort Error in Python
DESCRIPTION: This Python snippet shows how to re-raise `WorkflowAbort` inside a try/except block when using Upstash Workflow. Re-raising it is necessary to maintain the workflow's intended execution flow. The snippet imports `WorkflowAbort` from the `upstash_workflow` package, wraps a `context.run` call in try/except, checks whether the caught exception is an instance of `WorkflowAbort`, and re-raises it if so.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#_snippet_1

LANGUAGE: python
CODE:
```
from upstash_workflow import WorkflowAbort

try:
    await context.run( ... )
except Exception as e:
    if isinstance(e, WorkflowAbort):
        raise e
    else:
        # handle other errors
        pass
```
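
The effect of a broad `except` can be reproduced without the SDK. The sketch below defines a local `WorkflowAbort` class as a stand-in for the real exception and uses synchronous functions for brevity; `guarded`, `aborting_step`, and `failing_step` are hypothetical names. It shows ordinary errors being handled while the abort signal still propagates to the caller.

```python
class WorkflowAbort(Exception):
    """Local stand-in for the SDK's exception, so the sketch runs on its own."""


def guarded(step):
    """Run `step`, handling ordinary errors but letting WorkflowAbort through."""
    try:
        return step()
    except WorkflowAbort:
        raise  # control-flow signal: it must reach the caller
    except Exception as error:
        return f"handled: {error}"


def aborting_step():
    raise WorkflowAbort("stop this invocation")


def failing_step():
    raise ValueError("bad input")


handled = guarded(failing_step)

try:
    guarded(aborting_step)
    abort_propagated = False
except WorkflowAbort:
    abort_propagated = True
```

Listing `except WorkflowAbort` before `except Exception` is equivalent to the `isinstance` check in the snippet above; either form works as long as the abort is not swallowed.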

----------------------------------------

TITLE: Sleep Workflow Execution - TypeScript
DESCRIPTION: This TypeScript snippet pauses the Upstash workflow execution for 3 days using `context.sleep`.  The duration is specified in seconds (60 * 60 * 24 * 3).
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3)
```
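
The duration argument is plain arithmetic: 60 seconds × 60 minutes × 24 hours × 3 days = 259,200 seconds. If hand-written products become hard to read, a small helper can do the conversion. The Python sketch below uses `datetime.timedelta`; `sleep_seconds` is a hypothetical helper name, not part of the SDK.

```python
from datetime import timedelta


def sleep_seconds(days: int = 0, hours: int = 0, minutes: int = 0) -> int:
    """Convert a human-readable duration into whole seconds."""
    return int(timedelta(days=days, hours=hours, minutes=minutes).total_seconds())


three_days = sleep_seconds(days=3)
```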

----------------------------------------

TITLE: Implementing QStash Receiver in Next.js
DESCRIPTION: This snippet shows how to implement the QStash Receiver in a Next.js application using TypeScript. It uses the `Receiver` class from the `@upstash/qstash` package to verify that requests come from QStash. The signing keys are retrieved from environment variables.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_18

LANGUAGE: typescript
CODE:
```
import { Receiver } from "@upstash/qstash";
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<string>(
  async (context) => { ... },
  {
    receiver: new Receiver({
      // 👇 grab these variables from your QStash dashboard
      currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
      nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
    })
  }
);
```

----------------------------------------

TITLE: Use Explicit QStash Client in TypeScript
DESCRIPTION: Shows how to pass a QStash Client explicitly using the `qstashClient` option in TypeScript. This is useful when using multiple QStash clients with different environment variables.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_14

LANGUAGE: TypeScript
CODE:
```
import { Client } from "@upstash/qstash";
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve(
  async (context) => { ... },
  {
    qstashClient: new Client({ token: process.env.QSTASH_TOKEN! })
  }
);
```

----------------------------------------

TITLE: Setting Up Basic Webhook Endpoint - Python
DESCRIPTION: This code snippet demonstrates how to set up a basic webhook endpoint using FastAPI and `upstash_workflow.fastapi.Serve`.  It defines a POST endpoint `/api/example` that receives an `AsyncWorkflowContext` object. The `initial_payload_parser` function is used to process the raw payload.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#_snippet_1

LANGUAGE: Python
CODE:
```
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


def initial_payload_parser(payload):
    return payload


@serve.post("/api/example", initial_payload_parser=initial_payload_parser)
async def example(context: AsyncWorkflowContext[str]) -> None:
    # Your webhook handling logic here
    ...
```

----------------------------------------

TITLE: WorkflowTool (custom)
DESCRIPTION: This code defines a custom tool using the `WorkflowTool` class from `@upstash/workflow`. This tool evaluates mathematical expressions.  It takes an expression string as input, uses mathjs to evaluate it, and returns the result.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
import { WorkflowTool } from '@upstash/workflow'
import { z } from 'zod'
import * as mathjs from 'mathjs'

const tool = new WorkflowTool({
  description:
    'A tool for evaluating mathematical expressions. '
    + 'Example expressions: '
    + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.",
  schema: z.object({ expression: z.string() }),
  invoke: async ({ expression }) => mathjs.evaluate(expression),
})
```

----------------------------------------

TITLE: Installing Upstash Workflow SDK with bun
DESCRIPTION: This command installs the Upstash Workflow SDK as a dependency in your SvelteKit project using bun. This SDK provides the necessary functions to define and manage workflows.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/svelte.mdx#_snippet_2

LANGUAGE: bash
CODE:
```
bun add @upstash/workflow
```

----------------------------------------

TITLE: Configure Local Tunnel Environment Variables
DESCRIPTION: This snippet shows the environment variables that need to be configured in the `.env.local` file when using a local tunnel, such as ngrok, for local development with Upstash Workflow.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#_snippet_6

LANGUAGE: txt
CODE:
```
QSTASH_TOKEN="***"
UPSTASH_WORKFLOW_URL=<UPSTASH_WORKFLOW_URL>
```

----------------------------------------

TITLE: Configuring Environment Variables in Next.js
DESCRIPTION: This TypeScript snippet shows how a Next.js workflow reads environment variables from `context.env`, which is populated from the `env` option of `serve`. Note that the options object shown here sets only a `receiver`; it does not pass `env` itself.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#_snippet_20

LANGUAGE: typescript
CODE:
```
import { Receiver } from "@upstash/qstash";
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<string>(
  async (context) => {
    // the env option will be available in the env field of the context:
    const env = context.env;
  },
  {
    receiver: new Receiver({
      // 👇 grab these variables from your QStash dashboard
      currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
      nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
    }),
  }
);
```

----------------------------------------

TITLE: Creating a Weekly Summary Workflow (Next.js)
DESCRIPTION: This TypeScript code implements a Next.js API route to send weekly summary emails using Upstash Workflow. It fetches user data, generates a summary, and sends an email with the summary. The `serve` function from `@upstash/workflow/nextjs` is used to handle the workflow, with type safety provided for the workflow data.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#_snippet_4

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { getUserData, generateSummary } from "@/utils/user-utils";
import { sendEmail } from "@/utils/email-utils";

// Type-safety for starting our workflow
interface WeeklySummaryData {
  userId: string;
}

export const { POST } = serve<WeeklySummaryData>(async (context) => {
  const { userId } = context.requestPayload;

  // Step 1: Fetch user data
  const user = await context.run("fetch-user-data", async () => {
    return await getUserData(userId);
  });

  // Step 2: Generate weekly summary
  const summary = await context.run("generate-summary", async () => {
    return await generateSummary(userId);
  });

  // Step 3: Send email with weekly summary
  await context.run("send-summary-email", async () => {
    await sendEmail(user.email, "Your Weekly Summary", summary);
  });
});
```

----------------------------------------

TITLE: Sourcing the .env file
DESCRIPTION: Sources the .env file to load the environment variables into the current shell session. This is necessary for the application to access the configured environment variables.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/nextjs-flask.mdx#_snippet_6

LANGUAGE: bash
CODE:
```
source .env
```

----------------------------------------

TITLE: Bypassing Vercel Deployment Protection with curl
DESCRIPTION: This code snippet demonstrates how to bypass Vercel's deployment protection for preview deployments by adding a bypass secret to the workflow URL as a query parameter. It uses `curl` to send a POST request to the workflow endpoint.  The `<PROTECTION_BYPASS_SECRET>` placeholder should be replaced with the actual secret generated in the Vercel project settings.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/vercel.mdx#_snippet_0

LANGUAGE: bash
CODE:
```
curl -X POST \
  'https://vercel-preview.com/workflow?x-vercel-protection-bypass=<PROTECTION_BYPASS_SECRET>' \
  -d 'Hello world!'
```
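
When the workflow URL is built in code rather than typed by hand, the bypass parameter can be appended with a URL library so that any existing query string is preserved. The Python sketch below uses only the standard library; `with_bypass` is a hypothetical helper and `example-secret` is a placeholder, not a real secret.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_bypass(url: str, secret: str) -> str:
    """Append the x-vercel-protection-bypass query parameter to `url`."""
    parts = urlsplit(url)
    query = parse_qsl(parts.query, keep_blank_values=True)
    query.append(("x-vercel-protection-bypass", secret))
    return urlunsplit(parts._replace(query=urlencode(query)))


bypass_url = with_bypass("https://vercel-preview.com/workflow", "example-secret")
```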

----------------------------------------

TITLE: Trigger Workflow with Python SDK
DESCRIPTION: Illustrates how to trigger the workflow using the Python SDK. It initializes an `AsyncQStash` client and uses the `message.publish_json` method to send a JSON payload to the workflow endpoint, including headers and retry configurations.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#_snippet_5

LANGUAGE: python
CODE:
```
from qstash import AsyncQStash

client = AsyncQStash("<QSTASH_TOKEN>")

res = await client.message.publish_json(
    url="https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
    body={"hello": "there!"},
    headers={...},
    retries=3,
)
```

----------------------------------------

TITLE: Onboarding Workflow Definition
DESCRIPTION: Defines an asynchronous function `onboarding_workflow` that represents the main workflow logic. It receives initial data, sends a welcome email, waits for 3 days, uses an AI model to generate a follow-up message, and sends a personalized follow-up email.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#_snippet_3

LANGUAGE: python
CODE:
```
@serve.post("/api/onboarding")
async def onboarding_workflow(context: AsyncWorkflowContext[InitialData]) -> None:
    data = context.request_payload
    user_id = data["user_id"]
    email = data["email"]
    name = data["name"]

    # Step 1: Send welcome email
    async def _send_welcome_email() -> None:
        await send_email(email, "Welcome to our service!")

    await context.run("send-welcome-email", _send_welcome_email)

    # Step 2: Wait for 3 days (in seconds)
    await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3)

    # Step 3: AI-generate personalized follow-up message
    ai_response: CallResponse[Dict[str, str]] = await context.call(
        "generate-personalized-message",
        url="https://api.openai.com/v1/chat/completions",
        method="POST",
        headers={...},
        body={
            "model": "gpt-3.5-turbo",
            "messages": [
                {
                    "role": "system",
                    "content": "You are an assistant creating personalized follow-up messages.",
                },
                {
                    "role": "user",
                    "content": f"Create a short, friendly follow-up message for {name} who joined our service 3 days ago.",
                },
            ],
        },
    )

    personalized_message = ai_response.body["choices"][0]["message"]["content"]

    # Step 4: Send personalized follow-up email
    async def _send_follow_up_email() -> None:
        await send_email(email, personalized_message)

    await context.run("send-follow-up-email", _send_follow_up_email)
```

----------------------------------------

TITLE: Update context.call usage (JavaScript)
DESCRIPTION: Updates how `context.call` is used, including parameter passing and return value. The new version returns an object with `status`, `headers`, and `body`, and does not automatically fail the workflow on request failure.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#_snippet_5

LANGUAGE: javascript
CODE:
```
// old
const result = await context.call("call step", "<call-url>", "POST", ...)

// new
const {
  status,  // response status
  headers, // response headers
  body     // response body
} = await context.call("call step", {
  url: "<call-url>",
  method: "POST",
  ...
})
```

----------------------------------------

TITLE: Sync User in Database (Python)
DESCRIPTION: This Python snippet shows the workflow step that creates a new user in a database. `create_user_in_database` is a placeholder that logs the user details and returns a mock `userid`; a real implementation would write a user record to a database. The call is wrapped in an async function and executed with `context.run`.
SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#_snippet_3

LANGUAGE: python
CODE:
```
async def _sync_user() -> Dict[str, str]:
    return await create_user_in_database(name, email)

result = await context.run("sync user", _sync_user)
userid = result["userid"]
```
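
The snippet relies on `create_user_in_database`, which is not shown. A minimal sketch of such a placeholder follows, assuming it only logs the details and returns a dictionary with a mock `userid`; the returned value and the sample name and email are illustrative, and a real implementation would talk to a database instead.

```python
import asyncio
import logging
from typing import Dict

logger = logging.getLogger(__name__)


async def create_user_in_database(name: str, email: str) -> Dict[str, str]:
    """Hypothetical placeholder: logs the details, returns a mock user id."""
    logger.info("Creating user %s with email %s", name, email)
    return {"userid": "mock-user-id"}


# Called directly here; inside a workflow it would be wrapped in context.run.
result = asyncio.run(create_user_in_database("Ada", "ada@example.com"))
userid = result["userid"]
```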