Daydreams Logo
Concepts

Tasks

Managing asynchronous operations and concurrency.

Daydreams agents often need to perform asynchronous operations, primarily when executing Actions that interact with external APIs, blockchains, or other time-consuming processes. The framework includes a TaskRunner to manage these operations efficiently.

Purpose

The TaskRunner serves two main purposes:

  1. Concurrency Control: It limits the number of asynchronous tasks (like action handlers) that run simultaneously. This prevents the agent from overwhelming external services with too many requests at once (rate limiting) or consuming excessive local resources.
  2. Prioritization (Future): While the core framework primarily uses default priority, the underlying queue supports prioritizing tasks, allowing more critical operations to potentially execute sooner.

Initialization and Configuration

A TaskRunner instance is automatically created within createDreams unless a custom one is provided in the Config. Its concurrency limit can be configured:

import { createDreams, TaskRunner } from "@daydreamsai/core";
 
// Default TaskRunner with concurrency based on environment or default (e.g., 3)
const agent1 = createDreams({
  /* ... */
});
 
// Explicitly configure concurrency
const agent2 = createDreams({
  // ... other config
  taskRunner: new TaskRunner(5), // Allow up to 5 tasks concurrently
});
 
// Access the runner later
const runner = agent2.taskRunner;

The concurrency limit determines how many tasks from the internal queue can be actively running at any given moment.

Key Implications for Users

While you typically don't interact directly with the TaskRunner or define new task types, understanding how it works is important for writing effective agent components, especially Actions:

  1. Actions are Queued & Run Concurrently: When the LLM calls an action via <action_call>, its handler is scheduled by the TaskRunner, not executed immediately. The runner processes tasks concurrently up to its configured limit (default is 3). If more actions are called than the limit allows, they wait in a queue.
    • Effect: Prevents overwhelming external services; actions might not start instantly if the runner is busy.
  2. async/await is Essential: Because action handlers are executed asynchronously via the Task Runner, any I/O operations (API calls, database queries, file system access) within your handler must use async/await correctly. The framework waits for the promise returned by the handler (managed by the Task Runner) to resolve before considering the action complete.
  3. Action retry Option & Idempotency: You can configure automatic retries for an action using the retry property in its definition. If enabled, the TaskRunner will re-execute the handler if it fails (throws an error).
    • Best Practice: If using retries, design your action handlers to be idempotent – meaning executing the handler multiple times with the same arguments yields the same final state without unwanted side effects (e.g., avoid creating duplicate database entries).
  4. Action Cancellation (ctx.abortSignal): Action handlers receive an AbortSignal within their ctx (ActionCallContext).
    • Requirement: For long-running handlers (e.g., complex loops, waiting for external processes), you should check ctx.abortSignal.aborted periodically or use ctx.abortSignal.throwIfAborted() and cease execution if the signal is aborted. This allows agent runs to be cancelled cleanly.
  5. queueKey Option (Advanced): Actions can specify a queueKey. This directs the TaskRunner to use a specific queue for that action. While the default is 'main', this advanced feature could be used with custom TaskRunner configurations to manage different concurrency limits for different types of actions (e.g., a separate queue with lower concurrency for a rate-limited API).

In summary, focus on writing robust, async-aware, and potentially idempotent action handlers, leveraging ctx.abortSignal for cancellation. The Task Runner handles the scheduling and concurrency control behind the scenes.

Internal Usage (runAction)

The most common use of the TaskRunner is internal to the framework. When the agent parses an <action_call> from the LLM, the handleActionCall function doesn't execute the action's handler directly. Instead, it uses taskRunner.enqueueTask to schedule the execution:

// Simplified logic within handleActionCall (handlers.ts)
 
// ... prepare context and arguments ...
 
result.data = await taskRunner.enqueueTask(
  runAction, // The task definition for running actions
  {
    // Parameters for runAction
    action,
    agent,
    logger,
    ctx: callCtx,
  },
  {
    // Options for the task enqueueing
    debug: agent.debugger,
    retry: action.retry,
    abortSignal,
    priority: 0, // Default priority
  }
);
 
// ... process result ...

This ensures that action executions respect the concurrency limits set for the agent.

Defining Tasks (task helper)

The framework uses a task helper function (from @daydreamsai/core/task) to define named, reusable asynchronous operations that can be managed by the TaskRunner. Key framework tasks like runAction (executing action handlers) and runGenerate (calling the LLM) are defined using this helper.

import { task, type TaskContext } from "@daydreamsai/core";
 
// Example task definition (similar to runAction)
const myCustomTask = task(
  // 1. Unique key/name for the task (used for logging/debugging)
  "agent:run:myCustomTask",
 
  // 2. The async function implementing the task logic
  async (params: { someData: string }, ctx: TaskContext) => {
    // 'params' are the arguments passed when enqueuing
    // 'ctx' provides task-specific context
 
    ctx.debug("myCustomTask", ["Executing"], params); // Use debug provided via options
    console.log(`Task ${ctx.callId} running with data:`, params.someData);
 
    // ... perform asynchronous work ...
    await new Promise((resolve) => setTimeout(resolve, 100));
 
    return { success: true, taskId: ctx.callId };
  },
 
  // 3. Optional default TaskOptions
  {
    retry: 1, // Default retry count
  }
);
 
// How it might be enqueued:
// agent.taskRunner.enqueueTask(myCustomTask, { someData: "hello" }, { priority: 1 });

The TaskContext passed to the task function includes:

  • callId: A unique ID generated for this specific task execution.
  • debug: A Debugger function instance (configured via TaskOptions or defaulting from agent config).

While you typically won't need to define new tasks often (most work happens in action handlers), understanding this pattern helps clarify how core operations like runAction are structured and managed.

Direct Usage

While primarily used internally for actions, you could access the TaskRunner via agent.taskRunner within your custom code (e.g., inside an action handler or context hook) if you need to manage additional complex, long-running, or resource-intensive asynchronous operations with concurrency control. However, simple async/await within action handlers is usually sufficient.

The TaskRunner provides robust management for the asynchronous operations essential to the agent's functioning, ensuring stability and controlled resource usage.

On this page