Daydreams Logo

Task Runner

Managing asynchronous operations and concurrency.

Daydreams agents often need to perform asynchronous operations, primarily when executing Actions that interact with external APIs, blockchains, or other time-consuming processes. The framework includes a TaskRunner to manage these operations efficiently.

Purpose

The TaskRunner serves two main purposes:

  1. Concurrency Control: It limits the number of asynchronous tasks (like action handlers) that run simultaneously. This prevents the agent from overwhelming external services with too many requests at once (rate limiting) or consuming excessive local resources.
  2. Prioritization (Future): While the core framework primarily uses default priority, the underlying queue supports prioritizing tasks, allowing more critical operations to potentially execute sooner.

Initialization and Configuration

A TaskRunner instance is automatically created within createDreams unless a custom one is provided in the Config. Its concurrency limit can be configured:

import { createDreams, TaskRunner } from "@daydreamsai/core";
 
// Default TaskRunner with concurrency based on environment or default (e.g., 3)
const agent1 = createDreams({
  /* ... */
});
 
// Explicitly configure concurrency
const agent2 = createDreams({
  // ... other config
  taskRunner: new TaskRunner(5), // Allow up to 5 tasks concurrently
});
 
// Access the runner later
const runner = agent2.taskRunner;

The concurrency limit determines how many tasks from the internal queue can be actively running at any given moment.

Internal Usage (runAction)

The most common use of the TaskRunner is internal to the framework. When the agent parses an <action_call> from the LLM (see Streaming & Parsing), the handleActionCall function doesn't execute the action's handler directly. Instead, it uses taskRunner.enqueueTask to schedule the execution:

// Simplified logic within handleActionCall (handlers.ts)
 
// ... prepare context and arguments ...
 
result.data = await taskRunner.enqueueTask(
  runAction, // The task definition for running actions
  {
    // Parameters for runAction
    action,
    agent,
    logger,
    ctx: callCtx,
  },
  {
    // Options for the task enqueueing
    debug: agent.debugger,
    retry: action.retry,
    abortSignal,
    priority: 0, // Default priority
  }
);
 
// ... process result ...

This ensures that action executions respect the concurrency limits set for the agent.

Defining Tasks (task helper)

The framework uses a task helper function (from @daydreamsai/core/task) to define named, reusable asynchronous operations that can be managed by the TaskRunner. Key framework tasks like runAction (executing action handlers) and runGenerate (calling the LLM) are defined using this helper.

import { task, type TaskContext } from "@daydreamsai/core";
 
// Example task definition (similar to runAction)
const myCustomTask = task(
  // 1. Unique key/name for the task (used for logging/debugging)
  "agent:run:myCustomTask",
 
  // 2. The async function implementing the task logic
  async (params: { someData: string }, ctx: TaskContext) => {
    // 'params' are the arguments passed when enqueuing
    // 'ctx' provides task-specific context
 
    ctx.debug("myCustomTask", ["Executing"], params); // Use debug provided via options
    console.log(`Task ${ctx.callId} running with data:`, params.someData);
 
    // ... perform asynchronous work ...
    await new Promise((resolve) => setTimeout(resolve, 100));
 
    return { success: true, taskId: ctx.callId };
  },
 
  // 3. Optional default TaskOptions
  {
    retry: 1, // Default retry count
  }
);
 
// How it might be enqueued:
// agent.taskRunner.enqueueTask(myCustomTask, { someData: "hello" }, { priority: 1 });

The TaskContext passed to the task function includes:

  • callId: A unique ID generated for this specific task execution.
  • debug: A Debugger function instance (configured via TaskOptions or defaulting from agent config).

While you typically won't need to define new tasks often (most work happens in action handlers), understanding this pattern helps clarify how core operations like runAction are structured and managed.

Direct Usage

While primarily used internally for actions, you could access the TaskRunner via agent.taskRunner within your custom code (e.g., inside an action handler or context hook) if you need to manage additional complex, long-running, or resource-intensive asynchronous operations with concurrency control. However, simple async/await within action handlers is usually sufficient.

The TaskRunner provides robust management for the asynchronous operations essential to the agent's functioning, ensuring stability and controlled resource usage.

On this page