Graph of Thoughts Framework

An implementation of the Graph of Thoughts (GoT) reasoning framework from Besta et al. (2024), "Graph of Thoughts: Solving Elaborate Problems with Large Language Models" (https://arxiv.org/abs/2308.09687).

The implementation represents the reasoning process as a graph where nodes are thoughts and edges represent transformations. The flow of reasoning is controlled by a Graph of Operations (GoO).

Defining the Graph of Operations (GoO)

To use GraphOfThoughts, you must provide a graph_of_operations argument to the run_async method. This argument is a list of tuples, where each tuple defines an operation step:

graph_of_operations: List[Tuple[str, Dict]]

  • The first element of the tuple is the name of the operation (e.g., 'Generate', 'Score', 'KeepBest').
  • The second element is a dictionary containing the parameters specific to that operation.

Example GoO:

from cogitator import ThoughtExpansion

EXAMPLE_GOO = [
    # Step 1: Generate 3 new thoughts from the initial question (in 'frontier' set)
    #         Store results in the 'generated_thoughts' set. Use the 'expand' prompt. Expect ThoughtExpansion schema.
    ('Generate', {'k': 3, 'target_set': 'frontier', 'output_set': 'generated_thoughts', 'prompt_key': 'expand', 'response_schema': ThoughtExpansion}),

    # Step 2: Score the thoughts generated in the previous step. Use 'evaluate' prompt.
    ('Score', {'target_set': 'generated_thoughts', 'prompt_key': 'evaluate'}),

    # Step 3: Keep only the single best-scoring thought from the previous step.
    #         Put the result back into the 'frontier' set for potential further steps or final answer generation.
    ('KeepBest', {'N': 1, 'target_set': 'generated_thoughts', 'output_set': 'frontier'})
]
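
With a GoO in hand, running the strategy means constructing GraphOfThoughts and awaiting run_async. A minimal sketch, assuming llm is any configured cogitator BaseLLM implementation (the concrete wrapper class depends on your setup):

import asyncio

from cogitator.strategies.graph_of_thoughts import GraphOfThoughts

async def main(llm):  # llm: any configured BaseLLM implementation
    got = GraphOfThoughts(llm)
    answer = await got.run_async(
        "A farmer has 17 sheep and all but 9 run away. How many are left?",
        graph_of_operations=EXAMPLE_GOO,
        semaphore=asyncio.Semaphore(4),  # cap concurrent LLM calls
    )
    # Failures are returned as strings starting with "Error", not raised
    print(answer)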

Main Class (GraphOfThoughts)

cogitator.strategies.graph_of_thoughts.GraphOfThoughts

Implements the Graph of Thoughts (GoT) prompting framework.

GoT represents the reasoning process as a graph where nodes are partial solutions (thoughts) and edges represent dependencies or transformations between them. It allows for applying operations like generation, aggregation, scoring, and selection according to a defined Graph of Operations (GoO).

Reference

Besta et al. (v4; 2024) "Graph of Thoughts: Solving Elaborate Problems with Large Language Models". https://arxiv.org/abs/2308.09687

Source code in cogitator/strategies/graph_of_thoughts.py
class GraphOfThoughts:
    """Implements the Graph of Thoughts (GoT) prompting framework.

    GoT represents the reasoning process as a graph where nodes are partial solutions
    (thoughts) and edges represent dependencies or transformations between them.
    It allows for applying operations like generation, aggregation, scoring,
    and selection according to a defined Graph of Operations (GoO).

    Reference:
        Besta et al. (v4; 2024) "Graph of Thoughts: Solving Elaborate Problems with Large Language Models".
        https://arxiv.org/abs/2308.09687
    """

    def __init__(
        self,
        llm: BaseLLM,
        embedder: Optional[BaseEmbedder] = None,
        final_answer_format: Literal["text", "json"] = "text",
        prompts: Optional[Dict[str, str]] = None,
        max_tokens: Optional[int] = None,
        seed: Optional[int] = None,
    ) -> None:
        """Initializes the GraphOfThoughts strategy handler.

        Args:
            llm: The language model instance.
            embedder: Optional embedding model instance for similarity checks.
            final_answer_format: Whether to extract the final answer as raw text or JSON.
            prompts: A dictionary mapping operation types (e.g., 'expand', 'evaluate',
                     'aggregate', 'improve') to their prompt templates.
            max_tokens: Default maximum tokens for LLM generation calls.
            seed: Default random seed for LLM calls.
        """
        self.llm = llm
        self.embedder = embedder or SentenceTransformerEmbedder()
        self.final_answer_format = final_answer_format
        self.prompts = prompts or self._get_default_prompts()
        self.max_tokens = max_tokens
        self.seed = seed

    def _get_default_prompts(self) -> Dict[str, str]:
        """Provides default prompt templates."""
        return {
            "expand": (
                "Generate {k} distinct reasoning steps or thoughts to continue "
                "from the context below. Return ONLY a JSON object with a SINGLE KEY named 'thoughts' "
                "whose value is a list of strings.\n"
                "Context:\n{ctx}\n\nJSON Output:"
            ),
            "evaluate": (
                "Evaluate the quality of the reasoning path below on a scale of 1-10 "
                "(1=bad, 10=excellent). Return response as a JSON object with keys "
                '"score" (int) and "justification" (str).\n'
                "Path:\n{steps}\n\nJSON Evaluation:"
            ),
            "aggregate": (
                "Combine the information from the following inputs into {k} synthesized thought(s). "
                "Maximize coherence and completeness. Return as a JSON list of strings.\n"
                "Inputs:\n{context}\n\nJSON Output:"
            ),
            "improve": (
                "Improve the following thought based on the initial query and context. "
                "Return {k} improved versions as a JSON list of strings.\n"
                "Original Thought:\n{ctx}\n\nJSON Output:"
            ),
            # Add other prompts as needed
        }

    def _find_similar_node(
        self, new_node: GoTNode, nodes_to_check: List[GoTNode], threshold: float
    ) -> Optional[GoTNode]:
        """Finds an existing node similar to `new_node` based on embedding similarity.

        Args:
            new_node: The node to check for similarity.
            nodes_to_check: A list of existing nodes to compare against.
            threshold: The cosine similarity threshold for merging.

        Returns:
            The similar node if found above the threshold, otherwise None.
        """
        if not self.embedder or new_node.embed is None:
            logger.debug(
                f"Skipping similarity check for node {new_node.id} (no embedder or embedding)."
            )
            return None

        new_norm = np.linalg.norm(new_node.embed)
        if new_norm < 1e-9:
            logger.debug(f"Skipping similarity check for node {new_node.id} (zero norm embedding).")
            return None

        logger.debug(
            f"Checking similarity for node {new_node.id} against {len(nodes_to_check)} nodes."
        )
        for other in nodes_to_check:
            if other.id == new_node.id or other.embed is None:
                continue

            other_norm = np.linalg.norm(other.embed)
            if other_norm < 1e-9 or new_node.is_ancestor(other):
                continue

            try:
                embed1 = new_node.embed.ravel()
                embed2 = other.embed.ravel()
                if embed1.shape != embed2.shape:
                    logger.warning(f"Embedding shape mismatch: {embed1.shape} vs {embed2.shape}")
                    continue

                dot_product = np.dot(embed1, embed2)
                sim = float(dot_product / (new_norm * other_norm))
            except ValueError as e:
                logger.warning(
                    f"Error calculating similarity between node {new_node.id} ({embed1.shape}) and {other.id} ({embed2.shape}): {e}"
                )
                continue

            if sim > threshold:
                logger.info(
                    f"Potential merge: node {new_node.id} similar to node {other.id} (similarity: {sim:.3f})"
                )
                return other
        return None

    def _create_operation(self, op_name: str, params: Dict) -> GoTOperation:
        """Factory method to create operation instances."""
        if op_name == "Generate":
            return GenerateOp(**params)
        elif op_name == "Aggregate":
            return AggregateOp(**params)
        elif op_name == "Improve":
            raise NotImplementedError(f"Operation '{op_name}' not implemented yet.")
        elif op_name == "Score":
            return ScoreOp(**params)
        elif op_name == "KeepBest":
            return KeepBestOp(**params)
        # Add other operations like Validate, etc.
        else:
            raise ValueError(f"Unknown GoT operation: {op_name}")

    async def run_async(
        self,
        question: str,
        graph_of_operations: List[Tuple[str, Dict]],
        semaphore: Optional[asyncio.Semaphore] = None,
        **kwargs: Any,
    ) -> str:
        """Asynchronously executes the Graph of Thoughts reasoning process based on a GoO.

        Args:
            question: The initial question or problem statement.
            graph_of_operations: A list defining the sequence of operations and their parameters.
                                 Example: [('Generate', {'k': 5, 'output_set': 'thoughts1'}),
                                           ('Score', {'target_set': 'thoughts1'}),
                                           ('KeepBest', {'N': 3, 'target_set': 'thoughts1', 'output_set': 'frontier'}),
                                           ('Aggregate', {'target_sets': ['frontier'], 'k': 1, 'output_set': 'aggregated'}),
                                           ...]
            semaphore: Optional asyncio.Semaphore to limit concurrent LLM calls.
            **kwargs: Additional arguments passed to internal LLM calls (e.g., seed, max_tokens).

        Returns:
            The final answer string generated by the LLM based on the best reasoning path found.
            Returns an error message if no paths are generated or the final generation fails.
        """
        GoTNode._id_counter = 0
        root = GoTNode([question], embedder=None, text_content=question)  # Embed root optionally
        grs = GraphReasoningState(root)

        global_llm_params = {
            "seed": kwargs.pop("seed", self.seed),
            "max_tokens": kwargs.pop("max_tokens", self.max_tokens),
            **kwargs,  # Pass remaining kwargs
        }

        logger.info(f"Starting GoT run (async) with {len(graph_of_operations)} operations.")

        for op_name, op_params in graph_of_operations:
            logger.info(f"Executing GoO Step: {op_name} with params {op_params}")
            try:
                operation = self._create_operation(op_name, op_params)
                await operation.execute_async(
                    grs=grs,
                    llm=self.llm,
                    prompts=self.prompts,
                    embedder=self.embedder,
                    semaphore=semaphore,
                    **global_llm_params,
                )
            except Exception as e:
                logger.error(f"Error executing operation {op_name}: {e}", exc_info=True)
                return f"Error during operation {op_name}"

            # Optional: Add logging for GRS state after each step
            # logger.debug(f"GRS after {op_name}: {grs.active_sets}")

        # Determine final result - assumes the relevant result is in 'frontier' or last output set
        final_candidates = grs.get_active_set(
            "frontier"
        )  # Or use a specific output set name from GoO
        if not final_candidates:
            # Fallback if frontier is empty - check last known generated/aggregated set etc.
            # This needs robust handling based on GoO structure
            logger.warning("Frontier is empty, checking all nodes...")
            final_candidates = list(grs.all_nodes.values())

        if not final_candidates:
            logger.error("No candidate nodes found at the end of GoT run (async).")
            return "Error: No reasoning paths generated."

        # Select best node based on score (or other criteria if defined)
        best_node = max(final_candidates, key=lambda n: n.score)
        logger.info(f"Selected best node (async): {best_node}")

        # Use the best node's text_content for the final answer generation
        reasoning_context = best_node.text_content or "No final thought content available."
        # Or, reconstruct path if needed: numbered_reasoning = "\n".join(f"{i + 1}. {s}" for i, s in enumerate(best_node.steps))
        final_prompt = f"Based on the final reasoning or result:\n{reasoning_context}\n\nAnswer the original question: {question}"
        logger.debug(f"Final prompt (async):\n{final_prompt}")

        try:
            local_kwargs_final = global_llm_params.copy()
            final_seed = local_kwargs_final.pop("seed", self.seed)
            final_max_tokens = local_kwargs_final.pop("max_tokens", self.max_tokens)

            if self.final_answer_format == "json":
                json_req = (
                    final_prompt
                    + '\n\nReturn exactly one JSON object with a single key "final_answer" whose value is the answer string.\n\nJSON Answer:'
                )
                gen_args = {
                    "response_model": ExtractedAnswer,
                    "max_tokens": final_max_tokens,
                    "seed": final_seed,
                    **local_kwargs_final,
                }
                if semaphore:
                    async with semaphore:
                        parsed = await self.llm.generate_json_async(json_req, **gen_args)
                else:
                    parsed = await self.llm.generate_json_async(json_req, **gen_args)
                final_answer_value = parsed.final_answer
                if isinstance(final_answer_value, str):
                    return final_answer_value.strip()
                elif final_answer_value is not None:
                    return str(final_answer_value)
                else:
                    logger.warning("GoT final async JSON extraction returned None.")
                    return ""
            else:
                gen_args = {
                    "max_tokens": final_max_tokens,
                    "seed": final_seed,
                    **local_kwargs_final,
                }
                if semaphore:
                    async with semaphore:
                        return (await self.llm.generate_async(final_prompt, **gen_args)).strip()
                else:
                    return (await self.llm.generate_async(final_prompt, **gen_args)).strip()
        except Exception as e:
            logger.error("Final async answer generation failed: %s", e, exc_info=True)
            return "Error generating final async answer."

    def run(self, question: str, graph_of_operations: List[Tuple[str, Dict]], **kwargs: Any) -> str:
        """Synchronous execution is not supported for GraphOfThoughts."""
        raise NotImplementedError(
            "Synchronous execution (run()) is not supported for GraphOfThoughts due to its "
            "reliance on internal async operations and potential event loop conflicts. "
            "Please use the asynchronous run_async() method within an async context instead."
        )
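
The merge check in _find_similar_node is plain cosine similarity between node embeddings, compared against the caller-supplied threshold. Equivalently:

import numpy as np

def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    a, b = a.ravel(), b.ravel()
    # Matches the dot-product / norms computation in _find_similar_node
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))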

__init__(llm, embedder=None, final_answer_format='text', prompts=None, max_tokens=None, seed=None)

Initializes the GraphOfThoughts strategy handler.

Parameters:

  • llm (BaseLLM, required): The language model instance.
  • embedder (Optional[BaseEmbedder], default None): Optional embedding model instance for similarity checks.
  • final_answer_format (Literal['text', 'json'], default 'text'): Whether to extract the final answer as raw text or JSON.
  • prompts (Optional[Dict[str, str]], default None): A dictionary mapping operation types (e.g., 'expand', 'evaluate', 'aggregate', 'improve') to their prompt templates.
  • max_tokens (Optional[int], default None): Default maximum tokens for LLM generation calls.
  • seed (Optional[int], default None): Default random seed for LLM calls.
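
Note that a prompts dict passed to the constructor replaces the defaults wholesale (the constructor does prompts or self._get_default_prompts(), with no merging), so it must contain every key your GoO references. To tweak a single template it can be simpler to mutate the instance's prompts after construction; a sketch, keeping the {k} and {ctx} placeholders the Generate operation fills in:

got = GraphOfThoughts(llm)  # starts with the default templates
got.prompts["expand"] = (
    "Propose {k} alternative next steps for the problem below. "
    "Return ONLY a JSON object with a single key 'thoughts' "
    "whose value is a list of strings.\n"
    "Context:\n{ctx}\n\nJSON Output:"
)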

run_async(question, graph_of_operations, semaphore=None, **kwargs) async

Asynchronously executes the Graph of Thoughts reasoning process based on a GoO.

Parameters:

  • question (str, required): The initial question or problem statement.
  • graph_of_operations (List[Tuple[str, Dict]], required): A list defining the sequence of operations and their parameters, e.g. [('Generate', {'k': 5, 'output_set': 'thoughts1'}), ('Score', {'target_set': 'thoughts1'}), ('KeepBest', {'N': 3, 'target_set': 'thoughts1', 'output_set': 'frontier'}), ('Aggregate', {'target_sets': ['frontier'], 'k': 1, 'output_set': 'aggregated'}), ...].
  • semaphore (Optional[asyncio.Semaphore], default None): An asyncio.Semaphore to limit concurrent LLM calls.
  • **kwargs (Any): Additional arguments passed to internal LLM calls (e.g., seed, max_tokens).

Returns:

  • str: The final answer string generated by the LLM based on the best reasoning path found, or an error message if no paths are generated or the final generation fails.
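
A seed or max_tokens passed here is popped from **kwargs and takes precedence over the constructor defaults for every internal LLM call in the run; any remaining kwargs are forwarded as-is. For example:

answer = await got.run_async(
    question,
    graph_of_operations=EXAMPLE_GOO,
    seed=42,         # overrides the constructor-level default for this run
    max_tokens=512,  # likewise
)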

run(question, graph_of_operations, **kwargs)

Synchronous execution is not supported for GraphOfThoughts.

Available Operations

Here are the standard operations available. You can also create custom operations by subclassing GoTOperation; a sketch follows the base class description below.

Base Operation Class

cogitator.strategies.graph_of_thoughts.GoTOperation

Bases: ABC

Abstract base class for all Graph of Thoughts operations.

__init__(**params)

Initializes the operation with specific parameters.

execute(grs, llm, prompts, embedder=None, **global_kwargs) abstractmethod

Executes the operation, modifying the GraphReasoningState.

Parameters:

  • grs (GraphReasoningState, required): The current graph reasoning state.
  • llm (BaseLLM, required): The language model instance.
  • prompts (Dict[str, str], required): A dictionary of available prompt templates.
  • embedder (Optional[BaseEmbedder], default None): Optional embedder for operations needing similarity.
  • **global_kwargs (Any): Global arguments like seed and max_tokens passed to LLM calls.

execute_async(grs, llm, prompts, embedder=None, semaphore=None, **global_kwargs) abstractmethod async

Asynchronously executes the operation.
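
As a sketch of the subclassing hook mentioned above, here is a hypothetical PruneOp that drops scored nodes below a threshold. The name, parameters, and behavior are inventions for illustration, and because _create_operation only recognizes the built-in names, using it from run_async would also require overriding that factory in a GraphOfThoughts subclass:

from cogitator.strategies.graph_of_thoughts import GoTOperation

class PruneOp(GoTOperation):
    """Hypothetical operation: keep only nodes at or above a score threshold."""

    def __init__(self, **params):
        super().__init__(**params)
        self.target_set = params.get("target_set", "frontier")
        self.output_set = params.get("output_set", self.target_set)
        self.min_score = params.get("min_score", 5.0)

    def execute(self, grs, llm, prompts, embedder=None, **global_kwargs):
        nodes = grs.get_active_set(self.target_set)
        kept = [n for n in nodes if n.score >= self.min_score]
        grs.set_active_set(self.output_set, kept)

    async def execute_async(
        self, grs, llm, prompts, embedder=None, semaphore=None, **global_kwargs
    ):
        # No LLM calls involved, so the synchronous logic suffices (cf. KeepBestOp)
        self.execute(grs, llm, prompts, embedder=embedder, **global_kwargs)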

Generate Operation

cogitator.strategies.graph_of_thoughts.GenerateOp

Bases: GoTOperation

Generates new thoughts based on parent nodes.

execute_async(grs, llm, prompts, embedder, semaphore, **global_kwargs) async

Generates new thoughts asynchronously.
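
With the default 'expand' prompt, the LLM is asked for a JSON object with a single 'thoughts' key holding a list of strings, e.g. {"thoughts": ["Compute 17 * 20 first", "Then add 17 * 4"]}; this is the shape the ThoughtExpansion response_schema in the GoO example above is meant to parse.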

Score Operation

cogitator.strategies.graph_of_thoughts.ScoreOp

Bases: GoTOperation

Scores thoughts using the LLM.

execute_async(grs, llm, prompts, embedder, semaphore, **global_kwargs) async

Scores nodes asynchronously.
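
With the default 'evaluate' prompt, each node is expected to yield an object like {"score": 8, "justification": "Correct arithmetic, but the last step is unjustified."}; the integer score (1-10) is what a later KeepBest step selects on.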

KeepBest Operation

cogitator.strategies.graph_of_thoughts.KeepBestOp

Bases: GoTOperation

Selects the top N nodes based on score.

execute_async(grs, llm, prompts, embedder, semaphore, **global_kwargs) async

Selects best nodes (synchronous logic sufficient).

execute(grs, llm, prompts, embedder, **global_kwargs)

Selects best nodes.

Aggregate Operation

cogitator.strategies.graph_of_thoughts.AggregateOp

Bases: GoTOperation

Aggregates multiple thoughts into new ones.

execute_async(grs, llm, prompts, embedder, semaphore, **global_kwargs) async

Aggregates thoughts asynchronously.
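
A GoO fragment that fans several kept thoughts back into one, following the parameter names from the run_async docstring example (the 'prompt_key': 'aggregate' entry is an assumption by analogy with the other operations):

AGGREGATE_GOO = [
    ('Generate', {'k': 4, 'target_set': 'frontier', 'output_set': 'drafts', 'prompt_key': 'expand', 'response_schema': ThoughtExpansion}),
    ('Score', {'target_set': 'drafts', 'prompt_key': 'evaluate'}),
    ('KeepBest', {'N': 2, 'target_set': 'drafts', 'output_set': 'best'}),
    ('Aggregate', {'target_sets': ['best'], 'k': 1, 'output_set': 'frontier', 'prompt_key': 'aggregate'}),
]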

Internal State (Advanced)

These classes manage the internal graph structure.

GoTNode

cogitator.strategies.graph_of_thoughts.GoTNode

Represents a node (thought) in the Graph of Thoughts.

__init__(steps, embedder=None, parents=None, data=None, text_content=None)

Initializes a GoT node.

Parameters:

  • steps (List[str], required): The sequence of reasoning steps/operations leading to this node.
  • embedder (Optional[BaseEmbedder], default None): The embedding model used for calculating node similarity; can be None.
  • parents (Optional[List[GoTNode]], default None): A list of parent nodes.
  • data (Optional[Any], default None): Optional arbitrary data associated with the node.
  • text_content (Optional[str], default None): Optional string representing the actual thought content.

GraphReasoningState

cogitator.strategies.graph_of_thoughts.GraphReasoningState

Maintains the dynamic state of the GoT reasoning process.

__init__(root_node)

Initializes the Graph Reasoning State.

Parameters:

Name Type Description Default
root_node GoTNode

The initial node containing the problem input.

required

add_node(node)

Adds a new node to the state.

get_active_set(set_name)

Gets the list of nodes in a named active set.

get_nodes(node_ids)

Retrieves nodes by their IDs.

set_active_set(set_name, nodes)

Sets or replaces a named active set.

update_node(node)

Updates an existing node in the state (e.g., score, validity).
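
These are mostly of interest when writing custom operations, but the documented constructors and accessors are enough to assemble a small graph by hand; a sketch:

from cogitator.strategies.graph_of_thoughts import GoTNode, GraphReasoningState

root = GoTNode(["What is 2 + 2?"], text_content="What is 2 + 2?")
grs = GraphReasoningState(root)

child = GoTNode(["Add the numbers"], parents=[root], text_content="2 + 2 = 4")
grs.add_node(child)
grs.set_active_set("frontier", [child])

assert grs.get_active_set("frontier") == [child]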