AI_den
article thumbnail
Published 2025. 3. 19. 10:42
[ 코드 리뷰 ] OpenManus ( 3 ) Code Review

개요

  • 이전 리뷰 보기 : OpenManus 리뷰 1, OpenManus 리뷰 2
    • 리뷰 1,2에서 OpenManus 주요한 main Class들에 대해 다루었습니다.
    • main logic에 따라 참여하는 코드는 모두 다루었다고 할 수 있습니다.
    • 다만, OpenManus에서 Unstable version으로 공개한 runflow 코드와 Tool 관련 코드가 아직 정리되지 않았습니다. 따라서 이번에 Tool 관련 코드를 정리하고 다음 글에서, runflow 코드를 정리하고 마지막 글로 마무리를 하면 될 것 같습니다.

설명

BaseTool

  • BaseTool's 클래스 변수

      name: str
      description: str
      parameters: Optional[dict] = None
    • Tool의 부모 Class인 BaseTool로 보입니다.
    • name, description에서 Tool의 이름과 설명을 저장합니다.
    • parameters 에서 dict or None 값을 저장합니다.
      • dict를 사용한 이유는 아래 메서드 설명에 나옵니다.
  • __call__ ( magic method )

    ```python
    async def call(self, **kwargs) -> Any:

      """Execute the tool with given parameters."""
      return await self.execute(**kwargs)
     ```
    • magic method인 __call__메서드입니다. parameter와 함께 tool을 실행하는 메서드입니다.
    • __call__ method는 코드에서 보이듯이 class내에 있는 execute 메서드로 넘겨주는 것을 확인할 수 있습니다.
  • excute ( abstract method )

      @abstractmethod
      async def execute(self, **kwargs) -> Any:
          """Execute the tool with given parameters."""
    • 추상 메서드는 리뷰 2에서도 설명하였듯이 sub-class가 overriding하여 구현하는 method입니다.
  • to_param

      def to_param(self) -> Dict:
          """Convert tool to function call format."""
          return {
              "type": "function",
              "function": {
                  "name": self.name,
                  "description": self.description,
                  "parameters": self.parameters,
              },
          }
    • to_param 함수는 tool을 function call format(dictionary)로 변환해주는 메서드입니다.
    • 아까 execute__call__에서 받는 parameters가 이것입니다.
    • 또한, attributes 중 parameter도 이것입니다.

ToolResult

  • ToolResult

    • 해당 클래스는 execution의 결과를 보여주는 클래스입니다.
  • ToolResult's 클래스 변수

      output: Any = Field(default=None)
      error: Optional[str] = Field(default=None)
      system: Optional[str] = Field(default=None)
    • output : execution 결과를 담는 변수로 보입니다.
    • error : execution 결과가 error라면 error 내용을 담는 변수로 보입니다.
      • error가 없을 때는 None이 있어야 하므로 Optional 하게 설정합니다.
    • system : System Message로 유추됩니다.
  • __bool__

      def __bool__(self):
          return any(getattr(self, field) for field in self.__fields__)
    • return 부분을 분석하겠습니다.
      1. getattr(self,field)
        • self.field와 같은 의미입니다. field 값을 가져온다는 것입니다.
      2. for field in self.__fields__
        • __fields__로 field list를 가져와 반복합니다.
      3. any()
        • ()안의 값 중 하나라도 True면 True를 반환하는 함수입니다.
      • 따라서, return은 Field값이 전부 비어있지는 않은지 검사하는 것이라고 할 수 있습니다.
        • error가 발생하면 error가 채워지고, 정상 실행 시, output이 채워지기 떄문입니다.
  • __add__

      def __add__(self, other: "ToolResult"):
          def combine_fields(
              field: Optional[str], other_field: Optional[str], concatenate: bool = True
          ):
              if field and other_field:
                  if concatenate:
                      return field + other_field
                  raise ValueError("Cannot combine tool results")
              return field or other_field
    
          return ToolResult(
              output=combine_fields(self.output, other.output),
              error=combine_fields(self.error, other.error),
              system=combine_fields(self.system, other.system),
          )
    • 메서드 내부에서 combine_fields 함수를 정의합니다.
      • 해당 함수는 객체 1의 Field, 객체 2의 Field, 합칠지 말지 정하는 변수 이렇게 총 3개의 값을 전달 받습니다.
      • 만약 객체 1,2 field가 정상적으로 채워져있고, concatenate가 True라면 합쳐서 return하고 아니라면 각 객체 중 하나를 return합니다.
    • return 부분을 보면, 그 2개의 field 값을 합쳐서 하나의 ToolResult를 반환합니다.
  • __str__

      def __str__(self):
          return f"Error: {self.error}" if self.error else self.output
    • error가 있다면 error를 출력하고 아니라면 output을 출력합니다.
  • replace

      def replace(self, **kwargs):
          """Returns a new ToolResult with the given fields replaced."""
          # return self.copy(update=kwargs)
          return type(self)(**{**self.dict(), **kwargs})
    • 주어진 field로 교체된 새로운 ToolResult를 반환합니다.

ToolCollection

  • define된 tool의 collection class입니다.

  • ToolCollection's 클래스 변수

    • 없습니다.
  • __init__

      def __init__(self, *tools: BaseTool):
          self.tools = tools
          self.tool_map = {tool.name: tool for tool in tools}
    • tools, tool_map 인스턴스 변수 2개를 정의합니다.
    • tools는 BaseTool들로 정의하고, tool_map은 BaseTool들의 이름을 이용하여 이름 : tool 로 mapping합니다.
  • __iter__

      def __iter__(self):
          return iter(self.tools)
    • iter() 함수를 이용하여 tools를 iterable하게 만들어 return합니다.
  • to_params

      def to_params(self) -> List[Dict[str, Any]]:
          return [tool.to_param() for tool in self.tools]
    • 가진 tool을 모두 parameter화 하여 생긴 list를 return합니다.
  • execute

      async def execute(
          self, *, name: str, tool_input: Dict[str, Any] = None
      ) -> ToolResult:
          tool = self.tool_map.get(name)
          if not tool:
              return ToolFailure(error=f"Tool {name} is invalid")
          try:
              result = await tool(**tool_input)
              return result
          except ToolError as e:
              return ToolFailure(error=e.message)
    • 실행 함수입니다.
      • 이름, tool_input을 입력 받아 execute합니다.
      • 아까 위에서 BaseTool에 있던 Abstract Method를 Overriding하는 것을 볼 수 있습니다.
    • 로직은 아래와 같습니다.
      1. 전달받은 name을 이용하여 tool_map에서 tool을 get
      2. tool이 없다면 Error raise
      3. tool이 있다면, tool에 tool_input 전달하여 결과 받고 종료
  • execute_all

       async def execute_all(self) -> List[ToolResult]:
          """Execute all tools in the collection sequentially."""
          results = []
          for tool in self.tools:
              try:
                  result = await tool()
                  results.append(result)
              except ToolError as e:
                  results.append(ToolFailure(error=e.message))
          return results
    • tools에 있는 모든 tool을 execute하는 메서드입니다.
    • 중간에 error 발생 시, result에 error를 추가합니다.
  • get_tool

      def get_tool(self, name: str) -> BaseTool:
          return self.tool_map.get(name)
    • 이름으로 tool을 가져오는 메서드입니다. 이게 있으면 왜 위에서 get_tool을 안 썼는지 궁금해집니다.
  • add_tool

      def add_tool(self, tool: BaseTool):
          self.tools += (tool,)
          self.tool_map[tool.name] = tool
          return self
    • Instance가 가진 tool collection에 tool을 하나 더 추가합니다.
  • add_tools

      def add_tools(self, *tools: BaseTool):
          for tool in tools:
              self.add_tool(tool)
          return self
    • Instance가 가진 tool collection에 tool을 여러 개 추가합니다.
    • 여러 번, add_tool을 실행하는 방식으로 동작합니다.

  • 이제 OpenManus에서 이용하는 tool을 어떻게 구현하였는지 리뷰해보겠습니다.

FileSaver

  • 클래스 선언

      class FileSaver(BaseTool):
          name: str = "file_saver"
          description: str = """Save content to a local file at a specified path.
          Use this tool when you need to save text, code, or generated content to a file on the local filesystem.
          The tool accepts content and a file path, and saves the content to that location.
          """
    • 앞서, BaseTool 부분에서 확인하였듯 name과 discription으로 해당 클래스 이름과 설명을 작성해두었습니다.
      • 설명을 읽어보면, 특정 경로에 content를 저장하는 것이라고 합니다.
  • parameters

      parameters: dict = {
          "type": "object",
          "properties": {
              "content": {
                  "type": "string",
                  "description": "(required) The content to save to the file.",
              },
              "file_path": {
                  "type": "string",
                  "description": "(required) The path where the file should be saved, including filename and extension.",
              },
              "mode": {
                  "type": "string",
                  "description": "(optional) The file opening mode. Default is 'w' for write. Use 'a' for append.",
                  "enum": ["w", "a"],
                  "default": "w",
              },
          },
          "required": ["content", "file_path"],
      }
    • excute에서 사용될 parameter 부분입니다.
      • 3가지 Key를 가지며 일반적으로 파일을 저장하기 위해 필요하다고 생각할만한 정보만 들어있습니다.
        1. content - 저장할 내용
        2. file_path - 저장할 위치
        3. mode - 기존 내용에 추가인지 새로 작성하는 것인지 ( 기본은 새로 작성입니다. )
  • execute

      async def execute(self, content: str, file_path: str, mode: str = "w") -> str:
          """
          Save content to a file at the specified path.
    
          Args:
              content (str): The content to save to the file.
              file_path (str): The path where the file should be saved.
              mode (str, optional): The file opening mode. Default is 'w' for write. Use 'a' for append.
    
          Returns:
              str: A message indicating the result of the operation.
          """
          try:
              # Ensure the directory exists
              directory = os.path.dirname(file_path)
              if directory and not os.path.exists(directory):
                  os.makedirs(directory)
    
              # Write directly to the file
              async with aiofiles.open(file_path, mode, encoding="utf-8") as file:
                  await file.write(content)
    
              return f"Content successfully saved to {file_path}"
          except Exception as e:
              return f"Error saving file: {str(e)}"
    • tool 실행을 위한 함수입니다.
      • parameter에 있는 key 3가지를 인자로 받고, 결과를 msg로 return합니다.
    • logic은 아래와 같습니다.
      1. file_path로 받은 위치에 폴더가 존재하는지 확인합니다.
        • 폴더가 없다면 제작합니다.
      2. 비동기로 content를 담은 파일을 작성합니다.
      3. 수행 결과를 return합니다.
    • logic은 단순하게 저희가 생각하는 것과 동일합니다. 파일을 python 코드로 작성한다면 어떻게 할까를 떠올려 보았을 때, 구현할 방식 그대로 execute에 작성한 것이라고 보아도 됩니다.

GoogleSearch

  • 클래스 선언

      class GoogleSearch(BaseTool):
      name: str = "google_search"
      description: str = """Perform a Google search and return a list of relevant links.
      Use this tool when you need to find information on the web, get up-to-date data, or research specific topics.
      The tool returns a list of URLs that match the search query.
      """
    • BaseTool을 상속하며, name과 description을 작성합니다.
      • 설명대로, Google Search를 수행하고, 관련된 link list를 return합니다.
  • parameters

    • execute에서 사용할 parameter입니다.

      parameters: dict = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "(required) The search query to submit to Google.",
            },
            "num_results": {
                "type": "integer",
                "description": "(optional) The number of search results to return. Default is 10.",
                "default": 10,
            },
        },
        "required": ["query"],
      }
    • query, num_results를 key로 가지며 각각 아래와 같은 역할을 합니다.

      1. query - google에 submit할 search query (질문이라고 할 수 있습니다.)
      2. num_results - search result의 수를 가집니다. (default = 10)
  • execute

      async def execute(self, query: str, num_results: int = 10) -> List[str]:
          """
          Execute a Google search and return a list of URLs.
    
          Args:
              query (str): The search query to submit to Google.
              num_results (int, optional): The number of search results to return. Default is 10.
    
          Returns:
              List[str]: A list of URLs matching the search query.
          """
          # Run the search in a thread pool to prevent blocking
          loop = asyncio.get_event_loop()
          links = await loop.run_in_executor(
              None, lambda: list(search(query, num_results=num_results))
          )
    
          return links
    • parameter의 query와 num_results를 가집니다.

    • logic은 아래와 같습니다.

      1. 현재 OS thread에서 실행중인 event loop를 가져옵니다.

        • asyncio.get_event_loop에 대한 설명은 아래와 같습니다. (공식문서 참고)

          asyncio.get_running_loop()
          Return the running event loop in the current OS thread.
          
          Raise a RuntimeError if there is no running event loop.
          
          This function can only be called from a coroutine or a callback.
          
          Added in version 3.7.
      2. 가져온 loop를 이용하여 google의 search 함수를 실행합니다.

      3. 실행 결과를 반환합니다.

    • Google Search에서도 execute는 loop를 가져와서 실행하는 것 외에 저희가 직접 구현하는 것과 별반 다르지 않습니다.

PythonExecute

  • 클래스 선언

      class PythonExecute(BaseTool):
      """A tool for executing Python code with timeout and safety restrictions."""
    
      name: str = "python_execute"
      description: str = "Executes Python code string. Note: Only print outputs are visible, function return values are not captured. Use print statements to see results."
    • BaseTool을 상속하고, name과 description을 작성합니다.
      • 설명의 내용은 Python code를 실행하는 함수라고 되어 있습니다.
  • parameter

      parameters: dict = {
          "type": "object",
          "properties": {
              "code": {
                  "type": "string",
                  "description": "The Python code to execute.",
              },
          },
          "required": ["code"],
      }
    • 다른 기능과 다르게 python execute에서는 key가 code 1개만 존재합니다.
      • 이 코드는 실행하려하는 코드를 말합니다.
  • execute

      async def execute(
          self,
          code: str,
          timeout: int = 5,
      ) -> Dict:
          """
          Executes the provided Python code with a timeout.
    
          Args:
              code (str): The Python code to execute.
              timeout (int): Execution timeout in seconds.
    
          Returns:
              Dict: Contains 'output' with execution output or error message and 'success' status.
          """
          result = {"observation": ""}
    
          def run_code():
              try:
                  safe_globals = {"__builtins__": dict(__builtins__)}
    
                  import sys
                  from io import StringIO
    
                  output_buffer = StringIO()
                  sys.stdout = output_buffer
    
                  exec(code, safe_globals, {})
    
                  sys.stdout = sys.__stdout__
    
                  result["observation"] = output_buffer.getvalue()
    
              except Exception as e:
                  result["observation"] = str(e)
                  result["success"] = False
    
          thread = threading.Thread(target=run_code)
          thread.start()
          thread.join(timeout)
    
          if thread.is_alive():
              return {
                  "observation": f"Execution timeout after {timeout} seconds",
                  "success": False,
              }
    
          return result
    • code와 timeout을 전달받아 실행합니다.

      • code는 parameters에 있고, timeout은 따로 받습니다.
    • 해당 함수에는 run_code라는 지역메서드가 존재합니다.

      • 해당 코드는 아래와 같습니다.
        1. 출력을 capture하기 위한 버퍼를 StringIO를 이용하여 생성합니다.
        2. exec로 코드를 실행하고 sys.stdout에 결과를 캡처합니다.
        3. 결과를 dictionary에 저장합니다.
    • run_code 함수를 포함한 전제 logic은 아래와 같습니다.

      1. 코드 실행을 위한 별도의 Thread 생성
      2. Thread시작
      3. 지정된 timeout까지 thread 대기
      4. timeout 초과시, error message
      5. 초과하지 않을 시, 정상 결과 return
      • is_alive() 체크를 하는 이유는 timeout이 초과했다면, thread가 살아있을 것이고, 그때 에러를 발생시키기 위함입니다.
        • 만약 정상 종료했다면, timeout은 초과되지 않습니다.

결론 및 다음 글 예고

결론

  • Tool 코드는 대부분 일반적으로 Python에서 저희가 작성하는 코드와 동일합니다.
    • 동일한 코드를 함수화하여 작성하고, 비동기적으로 처리해야 하므로, 쓰레드를 이용하거나, 비동기 모듈을 사용하여 구현한다는 점에서 차이가 있습니다.
    • Agent를 구현하시는 분들은 이 점을 기억하셔서 구현하면 좋을 것 같습니다.

예고

  • 마지막 남은 Brower tool이 길이가 길어 다음 글에서 정리 하겠습니다.
  • 또한, 다음번에 run flow코드를 정리하고 길이가 감당 된다면, 실제 실행 결과 및 실행 로직 정리를 함께 하도록 하겠습니다.

'Code Review' 카테고리의 다른 글

[ 코드 리뷰 ] OpenManus (4)  (0) 2025.04.02
[ 코드 리뷰 ] OpenManus ( 2 )  (0) 2025.03.11
[ 코드 리뷰 ] OpenManus (1)  (0) 2025.03.10
profile

AI_den

@전길원

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!