Agent-001 Part-1

Series

  1. Agent-001 Part-1
  2. Agent-001 Part-2
  3. Agent-001 Part-3

This three-part series will guide you through building an AI agent from the ground up. Our goal is to create a simple, autonomous, multi-hop agent.

  • Simple: The agent will be implemented in under 100 lines of code.
  • Autonomous: We'll specify only the task, leaving the agent free to determine its own steps to achieve the goal.
  • Multi-Hop: The agent will solve problems by executing multiple commands in sequence, checking the result of each step before deciding on the next - much like a human would.

By the end of this series, you'll understand the core components of an AI agent, how to implement and customize one, and how to approach complex, non-linear problems using agent-based solutions.

The name Agent-001 is chosen as a playful nod to James Bond's 007. Our agent is starting out as a humble "001," but with enough development and the right tools, it could one day evolve into a full-fledged "007"—complete with a license to kill -9.

Problem statement

AI shines when working with vast amounts of unstructured, natural language data—such as lengthy log files, technical articles, or legal documents. To illustrate this, let's consider a scenario where natural language content is involved. We have a public API that serves random programming jokes. Each time you call the API, it returns a new joke, providing a dynamic and unpredictable data source for our agent to process.

Programming Jokes API
$ curl --silent https://official-joke-api.appspot.com/jokes/programming/random | jq .
[
  {
    "type": "programming",
    "setup": "What's the best thing about a Boolean?",
    "punchline": "Even if you're wrong, you're only off by a bit.",
    "id": 15
  }
]
$

Let's define a practical use case for our agent. Suppose I want to help my team start each day with a bit of humor by sending out a programming joke via email. To do this, I'll use the programming jokes API and automate the process of sharing a joke with the team. Here are some important considerations for this workflow:

  • Not everyone may understand the joke, so the email should include a brief explanation.
  • The joke must be culturally appropriate and not offensive to anyone in the team.
  • If a joke is specific to a certain area of technology, it should be sent only to the relevant division. For example, a scheduling-related joke should go to the OS and Systems team, not the OOPS team.

A manual version of this workflow would look like this:

  1. After logging in each morning, I fetch a joke from the API.
  2. I use ChatGPT to generate an explanation for the joke.
  3. If the joke is appropriate and funny, I include both the joke and its explanation in an email and send it to the relevant division.

Ask AI

After repeating this manual workflow for several days, I realized it was becoming tedious and inefficient. Even though ChatGPT generates responses quickly, the wait time—however brief—adds unnecessary friction to my morning routine. As any programmer would, I decided to automate the process. Instead of manually fetching a joke and then asking ChatGPT for an explanation, I thought: why not create a script that does both tasks automatically and saves the results? This way, both the joke and its explanation would be ready for me as soon as I log in.

To start, I wrote a script that fetches a joke from the API and stores the JSON output in a file. The full file is given at the end of this blog; here is a snippet of the main logic.

producer.py `fetch_text`: API call to programming jokes
def fetch_text(url: str, timeout: int = 10) -> str:
    """Fetch text content from a URL and return it as a string.

    Uses the standard library so there are no extra dependencies.
   """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            raw = resp.read()
            try:
                return raw.decode("utf-8")
            except Exception:
                return raw.decode(errors="replace")
    except urllib.error.URLError as e:
        raise RuntimeError(f"Failed to fetch {url}: {e}")
producer.py: Call `fetch_text` and store output
            json_text = json.loads(fetch_text(url))[0]  # the API returns a single-element list
            target_file = out_dir / f"{json_text['id']}.txt"
            with target_file.open("w", encoding="utf-8") as fh:
                fh.write(json_text["setup"] + "\n" + json_text["punchline"] + "\n")
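
For the joke from the earlier curl example, this loop would write a file named 15.txt under /tmp/agent-001/ containing exactly these two lines:

/tmp/agent-001/15.txt
What's the best thing about a Boolean?
Even if you're wrong, you're only off by a bit.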

The script is named producer.py because it generates the content our agent will process. In a real-world context, this is similar to a CI/CD pipeline producing logs or artifacts. Keeping this script independent allows all three agents in our series to reuse it.

For our initial implementation, we'll make a simple LLM call: the script will pass the fetched joke to the LLM and request a brief explanation. Both the joke and its explanation will be saved to an output file. This way, each morning when I log in, I can quickly access the day's joke and its explanation, ready to include in the team email.

To interact with an LLM, we need an LLM service provider and secret keys to access it. In our case, Azure AI Foundry is used for deploying the LLMs. The secrets for accessing the Foundry models should be stored in a .env file.

.env
$ cat .env
AZURE_OPENAI_API_KEY='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
AZURE_OPENAI_ENDPOINT='https://ai-XXXXXXXXXXXXXXXXXXXXXXXXXX.openai.azure.com/'
AZURE_OPENAI_API_VERSION='2024-12-01-preview'
$

These environment variables are consumed by the program via the load_dotenv() library function; the program's configuration variables are then populated from them.
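
Here is how ask_ai.py picks them up (excerpted from the full listing at the end of this post):

ask_ai.py: Loading configuration
import os
from dotenv import load_dotenv
load_dotenv()  # reads .env into the process environment

AZURE_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")
AZURE_KEY = os.environ.get("AZURE_OPENAI_API_KEY")
AZURE_DEPLOYMENT = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4.1")
API_VERSION = os.environ.get("AZURE_OPENAI_API_VERSION", "2024-12-01-preview")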

The save_state and load_state functions store and load the record of processed jokes. Each time the program runs, it loads the already-processed jokes into an object and skips them, so no joke is processed twice.

ask_ai.py
### State Management ###

def save_state(state: Dict[str, Any]) -> None:
    STATE_FILE.write_text(json.dumps(state, indent=2), encoding="utf-8")


def load_state() -> Dict[str, Any]:
    if STATE_FILE.exists():
        try:
            return json.loads(STATE_FILE.read_text(encoding="utf-8"))
        except Exception:
            logger.exception("Failed to load state file, starting fresh")
    # default state
    return {"processed": {}, "last_sent": {}}

### State Management ###
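
After a few runs, the state file accumulates one entry per joke. A typical state.json might look like this (values illustrative; the fields are written by process_joke_file, shown later):

/tmp/agent-001/state.json
{
  "processed": {
    "15.txt": {
      "agent": "001",
      "joke": "What's the best thing about a Boolean?\nEven if you're wrong, you're only off by a bit.",
      "processed_at": "2025-06-01T06:00:00",
      "explanation": "The joke plays on the fact that a Boolean has only two values, so even a wrong answer is only 'off by a bit'."
    }
  },
  "last_sent": {}
}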

You can ignore the signal handlers; they exist to handle Ctrl+C gracefully. The main_loop function lists all the files in the producer's output directory and calls process_joke_file on each one.

ask_ai.py: main_loop
        txt_files = sorted(glob.glob(str(OUTPUT_DIR / "*.txt")))
        for f in txt_files:
            if shutdown_requested:
                break
            process_joke_file(Path(f), state)

The process_joke_file function constructs the prompt message and calls chat_completion.

ask_ai.py: process_joke_file
        joke_one_line = joke.replace("\n", " ")
        message = {
            "role": "user",
            "content": f"Explain the joke: '{joke_one_line}' in a short paragraph.",
        }
        explanation = chat_completion([message])["choices"][0]["message"]["content"]
ask_ai.py: chat_completion
    url = f"{AZURE_ENDPOINT}/openai/deployments/{AZURE_DEPLOYMENT}/chat/completions?api-version={API_VERSION}"
    headers = {
        "Content-Type": "application/json",
        "api-key": AZURE_KEY,
    }
    payload: Dict[str, Any] = {
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    if tools:
        payload["tools"] = tools
        payload["tool_choice"] = "auto"
    resp = requests.post(url, headers=headers, json=payload, timeout=90)

As shown above, the chat_completion function builds an HTTP request to the Azure OpenAI service and returns the parsed JSON response.
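
As a quick sanity check, chat_completion can be exercised on its own. Here is a minimal usage sketch, assuming the .env credentials are in place (the joke in the prompt is made up for illustration):

ask_ai.py: chat_completion usage sketch
reply = chat_completion([
    {"role": "user", "content": "Explain the joke: 'Why do programmers prefer dark mode? Because light attracts bugs.' in a short paragraph."}
])
print(reply["choices"][0]["message"]["content"])

The explanation extracted this way is then saved in the state file, making it easy to review each morning.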

ask_ai.py: process_joke_file
    # Mark processed
    state.setdefault("processed", {})[file_id] = {"agent": "001", "joke": joke, "processed_at": datetime.datetime.utcnow().isoformat(), "explanation": explanation}
    save_state(state)

Each day, I review the processed jokes and forward them to the relevant teams, ensuring they are culturally appropriate before sharing. In the upcoming blogs, we'll automate these tasks as well.

Conclusion

In this first part, we've defined the problem, explored a practical use case, and implemented the foundational scripts to fetch and process programming jokes using an LLM. By automating the initial steps, we've set the stage for building a more capable and autonomous agent. In the next part, we'll focus on enhancing our agent's decision-making abilities—filtering jokes for appropriateness, routing them to the right teams, and handling more complex workflows. Stay tuned as we continue to evolve Agent-001 into a smarter, more autonomous assistant!

The complete code can be found in the code/agent-001 directory of this blog's Git repo.

Complete Code

producer.py

producer.py
import json
import time
import datetime
import logging
import signal
import threading
import urllib.request
import urllib.error
from pathlib import Path

DEFAULT_URL = "https://official-joke-api.appspot.com/jokes/programming/random"
DEFAULT_OUTPUT_DIR = "/tmp/agent-001/"
DEFAULT_INTERVAL_SECONDS = 1 * 60  # 1 minute


def ensure_output_dir(path: str) -> Path:
    """Ensure the output directory exists and return a Path object."""
    p = Path(path)
    p.mkdir(parents=True, exist_ok=True)
    return p


def fetch_text(url: str, timeout: int = 10) -> str:
    """Fetch text content from a URL and return it as a string.

    Uses the standard library so there are no extra dependencies.
   """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            raw = resp.read()
            try:
                return raw.decode("utf-8")
            except Exception:
                return raw.decode(errors="replace")
    except urllib.error.URLError as e:
        raise RuntimeError(f"Failed to fetch {url}: {e}")


def run_loop(url: str = DEFAULT_URL, output_dir: str = DEFAULT_OUTPUT_DIR, interval_seconds: int = DEFAULT_INTERVAL_SECONDS, stop_event: threading.Event = None) -> None:
    """Run the fetch->write loop until stop_event is set.

    The loop is responsive to shutdown requests via the provided stop_event.
   """
    if stop_event is None:
        stop_event = threading.Event()

    out_dir = ensure_output_dir(output_dir)
    logging.info("Starting producer loop: url=%s interval=%s output=%s", url, interval_seconds, out_dir)

    while not stop_event.is_set():
        start = time.time()
        try:
            json_text = json.loads(fetch_text(url))[0]  # the API returns a single-element list
            target_file = out_dir / f"{json_text['id']}.txt"
            with target_file.open("w", encoding="utf-8") as fh:
                fh.write(json_text["setup"] + "\n" + json_text["punchline"] + "\n")
            logging.info("Wrote file %s", target_file)
        except Exception as exc:
            logging.exception("Error during fetch/write: %s", exc)

        # Wait for next cycle but be responsive to stop_event
        elapsed = time.time() - start
        wait_for = max(0, interval_seconds - elapsed)
        logging.debug("Sleeping for %.1f seconds", wait_for)
        # wait returns True if the event is set while waiting
        stop_event.wait(wait_for)

    logging.info("Producer loop exiting cleanly.")


def _setup_signal_handlers(stop_event: threading.Event) -> None:
    """Attach SIGINT and SIGTERM handlers to set the stop_event for graceful shutdown."""

    def _handler(signum, frame):
        logging.info("Received signal %s, shutting down...", signum)
        stop_event.set()

    signal.signal(signal.SIGINT, _handler)
    signal.signal(signal.SIGTERM, _handler)


def main() -> None:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    stop_event = threading.Event()
    _setup_signal_handlers(stop_event)

    try:
        run_loop(stop_event=stop_event)
    except Exception:
        logging.exception("Unexpected error in main loop")
    finally:
        logging.info("Shutdown complete")


if __name__ == "__main__":
    main()

ask_ai.py

ask_ai.py
import os
import sys
import json
import time
import logging
import datetime
import glob
import signal
from pathlib import Path
from typing import Dict, Any

from dotenv import load_dotenv
load_dotenv()

import requests

OUTPUT_DIR = Path("/tmp/agent-001/")
STATE_FILE = OUTPUT_DIR / "state.json"

# Azure OpenAI settings - must be provided as environment variables
AZURE_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")
AZURE_KEY = os.environ.get("AZURE_OPENAI_API_KEY")
AZURE_DEPLOYMENT = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4.1")
API_VERSION = os.environ.get("AZURE_OPENAI_API_VERSION", "2024-12-01-preview")

# Ensure directories exist
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("agent")

shutdown_requested = False

### State Management ###

def save_state(state: Dict[str, Any]) -> None:
    STATE_FILE.write_text(json.dumps(state, indent=2), encoding="utf-8")


def load_state() -> Dict[str, Any]:
    if STATE_FILE.exists():
        try:
            return json.loads(STATE_FILE.read_text(encoding="utf-8"))
        except Exception:
            logger.exception("Failed to load state file, starting fresh")
    # default state
    return {"processed": {}, "last_sent": {}}

### State Management ###


def _signal_handler(signum, frame):
    global shutdown_requested
    logger.info("Signal %s received, will shut down gracefully", signum)
    shutdown_requested = True


signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)


def chat_completion(messages, tools=None, temperature=0.0, max_tokens=800) -> Dict[str, Any]:
    """Call Azure OpenAI chat completion returning the full JSON, supporting tool (function) calls."""
    # Random jitter 3-5s to reduce rate spikes
    time.sleep(3 + (2 * os.urandom(1)[0] / 255.0))

    if not AZURE_ENDPOINT or not AZURE_KEY:
        raise RuntimeError("Azure OpenAI credentials (AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY) not set")

    url = f"{AZURE_ENDPOINT}/openai/deployments/{AZURE_DEPLOYMENT}/chat/completions?api-version={API_VERSION}"
    headers = {
        "Content-Type": "application/json",
        "api-key": AZURE_KEY,
    }
    payload: Dict[str, Any] = {
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    if tools:
        payload["tools"] = tools
        payload["tool_choice"] = "auto"
    resp = requests.post(url, headers=headers, json=payload, timeout=90)
    resp.raise_for_status()
    return resp.json()


def process_joke_file(path: Path, state: Dict[str, Any]) -> None:
    logger.info("Processing joke file: %s", path)
    joke = path.read_text(encoding="utf-8").strip()
    file_id = path.name

    if file_id in state.get("processed", {}):
        logger.info("Already processed %s, skipping", file_id)
        return

    try:
        joke_one_line = joke.replace("\n", " ")
        message = {
            "role": "user",
            "content": f"Explain the joke: '{joke_one_line}' in a short paragraph.",
        }
        explanation = chat_completion([message])["choices"][0]["message"]["content"]
    except Exception:
        logger.exception("LLM tool-driven processing failed for %s", file_id)
        sys.exit(1)
        # result = {"safe": False, "category": "Other", "explanation": "LLM error"}

    # Mark processed
    state.setdefault("processed", {})[file_id] = {"agent": "001", "joke": joke, "processed_at": datetime.datetime.utcnow().isoformat(), "explanation": explanation}
    save_state(state)


def main_loop(poll_interval: int = 60):
    state = load_state()
    logger.info("Agent started, watching %s", OUTPUT_DIR)

    while not shutdown_requested:
        txt_files = sorted(glob.glob(str(OUTPUT_DIR / "*.txt")))
        for f in txt_files:
            if shutdown_requested:
                break
            process_joke_file(Path(f), state)
        # Sleep and be responsive to shutdown
        for _ in range(int(poll_interval)):
            if shutdown_requested:
                break
            time.sleep(1)

    logger.info("Agent shutting down")


if __name__ == "__main__":
    main_loop()