DeepSeek-R1 Python SDK: Building Async Multi-Step Workflows


Synchronous API calls to large language models create a fundamental bottleneck in multi-step AI pipelines. This article walks through building a complete async multi-step research workflow using DeepSeek-R1's chain-of-thought reasoning model and Python's native asyncio primitives.
How to Build Async Multi-Step Workflows with the DeepSeek-R1 Python SDK
- Configure the AsyncOpenAI client with your DeepSeek API key, base URL, and timeout settings.
- Implement a resilient query wrapper with exponential backoff and jitter for retries on rate limits and timeouts.
- Decompose a complex topic into sub-questions by prompting DeepSeek-R1 to return structured JSON output.
- Bound concurrency with an asyncio.Semaphore matched to your API rate limits.
- Fan out parallel research calls using asyncio.gather with return_exceptions=True for graceful degradation.
- Synthesize collected results into a final report via a streaming DeepSeek-R1 call.
- Instrument each workflow phase with timing and structured logging for observability and debugging.
Table of Contents
- Why Async Matters for AI Workflows
- Understanding DeepSeek-R1 and the Python SDK
- Core Async Patterns with the DeepSeek-R1 SDK
- Designing Async Multi-Step Workflows
- Putting It All Together: The Complete Workflow
- Best Practices and Production Considerations
- Implementation Checklist
- What Comes Next
Why Async Matters for AI Workflows
Synchronous API calls to large language models create a fundamental bottleneck in multi-step AI pipelines. When a Python application sends a request to DeepSeek-R1 and blocks while waiting for the response, every subsequent task in the chain sits idle. For workflows that require multiple reasoning steps (decomposing a problem, researching sub-questions, synthesizing results), that blocking behavior costs you N sequential calls multiplied by per-call latency in total idle time. Async overlaps those calls. Building async multi-step workflows with the DeepSeek-R1 Python SDK eliminates this waste by running independent tasks concurrently.
DeepSeek-R1 brings a distinctive combination of capabilities to this problem. It exposes chain-of-thought reasoning with transparent reasoning traces, maintains compatibility with the OpenAI API specification (meaning developers can use the familiar openai Python SDK pointed at a different base URL), and offers competitive pricing per DeepSeek's published pricing. These properties make it well suited for orchestrated pipelines where multiple reasoning calls must run efficiently.
This article walks through building a complete async multi-step research workflow: given a topic, the system uses DeepSeek-R1 to generate sub-questions, researches each sub-question in parallel, and synthesizes a final report from the collected results. The implementation uses Python's native asyncio primitives alongside the OpenAI-compatible SDK.
Prerequisites: Python 3.10 or later, basic familiarity with asyncio concepts (coroutines, event loops, await), and an active DeepSeek API key.
Understanding DeepSeek-R1 and the Python SDK
What Is DeepSeek-R1?
DeepSeek-R1 is a chain-of-thought reasoning model that produces transparent reasoning traces alongside its final answers. Rather than emitting only a finished response, R1 exposes the intermediate thinking steps it follows to arrive at a conclusion. This transparency lets you log and diff intermediate reasoning across runs, which matters when you chain multiple reasoning steps and need to debug or validate intermediate outputs.
The model's key differentiator beyond reasoning transparency is cost efficiency. Check DeepSeek's pricing page for current per-million-token rates. This makes it practical to build workflows involving many sequential and parallel API calls without cost becoming prohibitive. Critically, the DeepSeek API implements the OpenAI API specification, which means developers use the standard openai Python package rather than a proprietary SDK. The only configuration difference is pointing the client at DeepSeek's API endpoint instead of OpenAI's.
Setting Up Your Environment
The setup requires two third-party packages: openai for the API client and python-dotenv for environment variable management. asyncio is part of Python's standard library and requires no installation.
The API key should never be hardcoded.
# requirements.txt
openai>=1.0.0,<2.0.0 # Pin to your tested version for reproducibility
python-dotenv>=1.0.0,<2.0.0
httpx>=0.24.0,<1.0.0
# .env
DEEPSEEK_API_KEY=your_api_key_here
Add .env to your .gitignore file to prevent accidental commit of your API key:
echo '.env' >> .gitignore
# config.py
import os
import httpx
from dotenv import load_dotenv
from openai import AsyncOpenAI
load_dotenv()
if not os.getenv("DEEPSEEK_API_KEY"):
raise EnvironmentError("DEEPSEEK_API_KEY is not set. Add it to your .env file.")
MODEL = "deepseek-reasoner" # Single definition; import this constant everywhere
client = AsyncOpenAI(
api_key=os.getenv("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com", # Verify this path at https://api-docs.deepseek.com
timeout=httpx.Timeout(30.0, connect=5.0), # 30 s total, 5 s connect
)
The AsyncOpenAI client is the async counterpart to the synchronous OpenAI client in the same package. Setting base_url to DeepSeek's endpoint redirects all API calls to DeepSeek's infrastructure while preserving the full OpenAI-compatible interface. The timeout parameter ensures that API calls do not hang indefinitely — a 30-second total timeout with a 5-second connect timeout provides a reasonable default for reasoning model responses. Verify the correct base URL (including whether a /v1 suffix is required) against the DeepSeek API documentation, as this may vary. Every subsequent code example in this article depends on this client configuration.
Core Async Patterns with the DeepSeek-R1 SDK
Making Your First Async API Call
The fundamental pattern is straightforward: define an async function, await the completion call, and extract the response content. DeepSeek-R1 responses can include both reasoning content (the chain-of-thought trace) and the final answer.
# Requires config.py from "Setting Up Your Environment" section
import asyncio
from config import client, MODEL
async def basic_query(prompt: str) -> dict:
response = await client.chat.completions.create(
model=MODEL,
messages=[
{"role": "user", "content": prompt}
],
max_tokens=1024
)
message = response.choices[0].message
return {
"reasoning": getattr(message, "reasoning_content", None),
"answer": message.content
}
async def main():
result = await basic_query("What are the three laws of thermodynamics?")
print("=== Reasoning Trace ===")
print(result["reasoning"])
print("\n=== Final Answer ===")
print(result["answer"])
if __name__ == "__main__":
asyncio.run(main())
The reasoning_content attribute on the message object contains the model's intermediate thinking steps, accessed here via getattr with a default of None to guard against the field being absent depending on model or SDK version. The content attribute holds the final synthesized answer. This separation is central to DeepSeek-R1's design and becomes important when building multi-step workflows where reasoning traces from one step inform prompts for the next.
Handling Streaming Responses
For long reasoning chains, streaming delivers tokens as they are generated rather than waiting for the complete response. This matters both for user experience (showing progress in real time) and for practical timeout management with lengthy chain-of-thought outputs.
# Requires config.py from "Setting Up Your Environment" section
import asyncio
from config import client, MODEL
async def streaming_query(prompt: str) -> None:
stream = await client.chat.completions.create(
model=MODEL,
messages=[
{"role": "user", "content": prompt}
],
stream=True,
max_tokens=2048
)
print("=== Reasoning ===")
answer_started = False
try:
async for chunk in stream:
if not chunk.choices:
continue
delta = chunk.choices[0].delta
if hasattr(delta, "reasoning_content") and delta.reasoning_content:
print(delta.reasoning_content, end="", flush=True)
elif delta.content:
if not answer_started:
print("\n=== Answer ===")
answer_started = True
print(delta.content, end="", flush=True)
finally:
await stream.close()
if __name__ == "__main__":
asyncio.run(streaming_query("Explain quantum entanglement step by step."))
Streamed chunks arrive with either reasoning_content or content populated on the delta object (per observed DeepSeek API behavior; verify in official docs). The code above uses this distinction to print reasoning tokens and answer tokens in separate labeled sections, tracking state with a local answer_started variable. The stream is wrapped in a try/finally block to ensure the underlying HTTP connection is always released, even if an exception occurs mid-iteration. A guard on chunk.choices being non-empty prevents IndexError on keep-alive or usage-only chunks that some API implementations send.
Error Handling and Retry Logic
API calls to any external service can fail due to rate limits, network timeouts, or malformed responses. A reusable retry wrapper with exponential backoff and jitter handles the most common failure modes without manual intervention at each call site.
# Requires config.py from "Setting Up Your Environment" section
import asyncio
import random
import logging
from openai import APIError, APITimeoutError, RateLimitError
from config import client, MODEL
logger = logging.getLogger(__name__)
async def resilient_query(
messages: list,
max_retries: int = 3,
base_delay: float = 1.0,
max_tokens: int = 1024,
) -> dict:
last_exc: Exception | None = None
for attempt in range(max_retries):
try:
response = await client.chat.completions.create(
model=MODEL,
messages=messages,
max_tokens=max_tokens
)
message = response.choices[0].message
return {
"reasoning": getattr(message, "reasoning_content", None),
"answer": message.content
}
except (RateLimitError, APITimeoutError) as e:
last_exc = e
delay = base_delay * (2 ** attempt) * (1 + random.random())
logger.warning(
"Retriable error %s (attempt %d/%d). Retrying in %.1fs.",
type(e).__name__, attempt + 1, max_retries, delay,
)
await asyncio.sleep(delay)
except APIError as e:
logger.error("Non-retriable API error (status %s).", getattr(e, "status_code", "unknown"))
raise
raise RuntimeError(f"Failed after {max_retries} attempts") from last_exc
The exponential backoff doubles the wait time on each retry, and random jitter (multiplying by 1 + random.random()) ensures that concurrent coroutines hitting the same rate limit do not all retry at the exact same instant, avoiding a thundering herd effect. The wrapper retries RateLimitError and APITimeoutError but does not retry other APIError subtypes, since they typically indicate a problem with the request itself. from last_exc preserves the original exception so that stack traces remain diagnosable. This is a simplified heuristic — review DeepSeek's error codes to determine which additional status codes may warrant retry in your use case. This function replaces the direct client.chat.completions.create() call throughout the workflow.
Designing Async Multi-Step Workflows
Workflow Architecture Overview
A multi-step AI workflow combines three structural patterns: sequential reasoning chains (where the output of one step feeds the next), parallel fan-out (where you run independent tasks concurrently), and aggregation (where you merge parallel results into a single downstream step). The mental model is a directed acyclic graph (DAG): nodes are API calls, edges are data dependencies.
The rule for choosing sequential versus parallel is straightforward. If step B depends on the output of step A, they must run sequentially. If steps B and C are independent of each other but both depend on A, run them in parallel after A completes. The research workflow in this article follows an A → [B₁ ‖ B₂ ‖ B₃] → C pattern (where ‖ denotes concurrency): decompose a topic (A), research sub-questions in parallel (B₁ through B₃), and synthesize results (C).
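The A → [B₁ ‖ B₂ ‖ B₃] → C shape can be simulated without any API calls by substituting asyncio.sleep for network latency; in this sketch the parallel middle stage takes roughly one task's latency instead of three:

```python
import asyncio
import time

async def step(name: str, seconds: float) -> str:
    await asyncio.sleep(seconds)  # stand-in for an API call's latency
    return name

async def run_dag() -> tuple[list[str], float]:
    t0 = time.perf_counter()
    a = await step("A", 0.1)                        # sequential: decompose
    bs = await asyncio.gather(step("B1", 0.1),
                              step("B2", 0.1),
                              step("B3", 0.1))      # parallel fan-out
    c = await step("C", 0.1)                        # sequential: synthesize
    return [a, *bs, c], time.perf_counter() - t0

order, elapsed = asyncio.run(run_dag())
print(order, f"{elapsed:.2f}s")  # roughly 0.3s total, not the 0.5s a sequential run would take
```

The same structure applies when step is a real DeepSeek-R1 call: only the edges of the DAG change, not the orchestration code.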
Step 1: Decomposing a Complex Task
The first step uses DeepSeek-R1 to break a broad research topic into discrete sub-questions. The prompt explicitly requests JSON output to enable reliable parsing downstream.
# Requires resilient_query from "Error Handling and Retry Logic" section
import json
import re
async def decompose_task(topic: str) -> list[str]:
messages = [
{
"role": "system",
"content": (
"You are a research planner. Given a topic, break it into "
"3 to 5 focused sub-questions. Respond with ONLY a JSON array "
"of strings, no other text."
)
},
{"role": "user", "content": f"Research topic: {topic}"}
]
result = await resilient_query(messages)
answer = result["answer"].strip()
# Strip optional markdown code fences, including language tags (e.g. ```json)
answer = re.sub(r"^```[a-zA-Z]*\n?", "", answer)
answer = re.sub(r"\n?```$", "", answer)
answer = answer.strip()
try:
sub_questions = json.loads(answer)
except json.JSONDecodeError as e:
raise ValueError(f"Model returned non-JSON output: {answer!r}") from e
if (
not isinstance(sub_questions, list)
or len(sub_questions) < 2
or not all(isinstance(q, str) for q in sub_questions)
):
raise ValueError(f"Invalid decomposition result: {sub_questions!r}")
return sub_questions
The code strips markdown code fences using a regex that handles both plain ``` fences and fences with a language tag like ```json, because models frequently wrap JSON output in triple backticks even when instructed not to. The json.JSONDecodeError handler catches malformed model output and raises a descriptive error rather than an opaque traceback. The validation check ensures the decomposition produced a usable list of strings before the pipeline continues. Catching a malformed decomposition here prevents wasted API calls in the subsequent parallel steps.
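The fence-stripping and validation logic is worth exercising in isolation. This standalone sketch uses a hypothetical parse_json_array helper built from the same regexes:

```python
import json
import re

def parse_json_array(raw: str) -> list[str]:
    # Strip an optional opening fence (``` or ```json) and closing fence
    cleaned = re.sub(r"^```[a-zA-Z]*\n?", "", raw.strip())
    cleaned = re.sub(r"\n?```$", "", cleaned).strip()
    items = json.loads(cleaned)
    if not isinstance(items, list) or not all(isinstance(q, str) for q in items):
        raise ValueError(f"Expected a JSON array of strings, got: {items!r}")
    return items

fenced = '```json\n["What is PQC?", "How does RSA rely on factoring?"]\n```'
bare = '["What is PQC?"]'
print(parse_json_array(fenced))
print(parse_json_array(bare))
```

Both the fenced and bare forms parse to the same list, which is exactly the tolerance decompose_task needs for real model output.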
Step 2: Parallel Execution with asyncio.gather
With sub-questions in hand, the workflow fans out into concurrent API calls. An asyncio.Semaphore bounds the concurrency to avoid exceeding rate limits.
# Requires resilient_query, logger, and the asyncio import from the "Error Handling and Retry Logic" section
async def research_question(question: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore:
messages = [
{
"role": "system",
"content": (
"You are a thorough researcher. Provide a detailed, "
"factual answer to the given question."
)
},
{"role": "user", "content": question}
]
result = await resilient_query(messages, max_tokens=1024)
return {"question": question, "answer": result["answer"]}
async def research_parallel(sub_questions: list[str], max_concurrent: int = 3) -> list[dict]:
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [research_question(q, semaphore) for q in sub_questions]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = []
for r in results:
if isinstance(r, Exception):
logger.warning("Research task failed: %s", r)
else:
successful.append(r)
return successful
The semaphore value of 3 is a conservative default; check your rate limits in the DeepSeek API dashboard or documentation before adjusting this value. return_exceptions=True prevents one failed task from canceling the entire gather, and the post-processing loop separates successful results from exceptions. This graceful degradation means the workflow can still synthesize a partial report if one sub-question fails.
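You can verify that the semaphore actually caps concurrency by counting coroutines inside the critical section. This self-contained sketch substitutes asyncio.sleep for the real API call and records the peak number of simultaneously active tasks:

```python
import asyncio

async def fake_research(i: int, sem: asyncio.Semaphore, stats: dict) -> int:
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.02)  # stand-in for resilient_query
        stats["active"] -= 1
    return i

async def demo(n_tasks: int = 7, limit: int = 3) -> int:
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fake_research(i, sem, stats)
                                     for i in range(n_tasks)))
    assert sorted(results) == list(range(n_tasks))  # all tasks completed
    return stats["peak"]

print("peak concurrency:", asyncio.run(demo()))  # never exceeds the limit of 3
```

Seven tasks run in three waves under a limit of 3, which is the same batching behavior the research phase exhibits against the real API.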
Step 3: Synthesizing Results
The final step feeds all collected research back into DeepSeek-R1 for synthesis, using streaming to provide immediate feedback.
# Requires config.py from "Setting Up Your Environment" section
from config import client, MODEL
async def synthesize(topic: str, research_results: list[dict]) -> None:
compiled_research = "\n".join(
f"### {r['question']}\n{r['answer']}" for r in research_results
)
messages = [
{
"role": "system",
"content": (
"You are a research synthesizer. Combine the provided research "
"into a coherent, well-structured report. Cite which sub-questions "
"informed each section."
)
},
{
"role": "user",
"content": (
f"Topic: {topic}\n"
f"Research findings:\n{compiled_research}\n"
"Write a comprehensive synthesis report."
)
}
]
stream = await client.chat.completions.create(
model=MODEL,
messages=messages,
stream=True,
max_tokens=2048
)
print("\n=== Synthesized Report ===\n")
try:
async for chunk in stream:
if not chunk.choices:
continue
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="", flush=True)
finally:
await stream.close()
print()
The synthesis prompt includes the full text of all research answers, formatted with their corresponding sub-questions as headers. This gives the model enough context to produce a coherent report that references specific sub-findings. Streaming the output here provides immediate visible progress on what is typically the longest single generation in the pipeline. The stream is wrapped in try/finally to ensure the HTTP connection is released even if an error occurs mid-stream, and empty choices chunks are skipped to avoid IndexError on keep-alive messages.
Putting It All Together: The Complete Workflow
The Orchestrator Function
The orchestrator wires the three phases together with timing instrumentation at each stage.
import asyncio
import time
import os
import json
import re
import random
import logging
import httpx
from dotenv import load_dotenv
from openai import AsyncOpenAI, APIError, APITimeoutError, RateLimitError
load_dotenv()
if not os.getenv("DEEPSEEK_API_KEY"):
raise EnvironmentError("DEEPSEEK_API_KEY is not set. Add it to your .env file.")
logger = logging.getLogger(__name__)
MODEL = "deepseek-reasoner"
client = AsyncOpenAI(
api_key=os.getenv("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com", # Verify this path at https://api-docs.deepseek.com
timeout=httpx.Timeout(30.0, connect=5.0),
)
async def resilient_query(
messages: list,
max_retries: int = 3,
base_delay: float = 1.0,
max_tokens: int = 1024,
) -> dict:
last_exc: Exception | None = None
for attempt in range(max_retries):
try:
response = await client.chat.completions.create(
model=MODEL,
messages=messages,
max_tokens=max_tokens
)
msg = response.choices[0].message
return {
"reasoning": getattr(msg, "reasoning_content", None),
"answer": msg.content
}
except (RateLimitError, APITimeoutError) as e:
last_exc = e
delay = base_delay * (2 ** attempt) * (1 + random.random())
logger.warning(
"Retriable error %s (attempt %d/%d). Retrying in %.1fs.",
type(e).__name__, attempt + 1, max_retries, delay,
)
await asyncio.sleep(delay)
except APIError as e:
logger.error("Non-retriable API error (status %s).", getattr(e, "status_code", "unknown"))
raise
raise RuntimeError(f"Failed after {max_retries} attempts") from last_exc
async def decompose_task(topic: str) -> list[str]:
messages = [
{"role": "system", "content": "Break this topic into 3-5 sub-questions. Respond ONLY with a JSON array of strings."},
{"role": "user", "content": f"Research topic: {topic}"}
]
result = await resilient_query(messages)
answer = result["answer"].strip()
answer = re.sub(r"^```[a-zA-Z]*\n?", "", answer)
answer = re.sub(r"\n?```$", "", answer)
answer = answer.strip()
try:
sub_questions = json.loads(answer)
except json.JSONDecodeError as e:
raise ValueError(f"Model returned non-JSON output: {answer!r}") from e
if (
not isinstance(sub_questions, list)
or len(sub_questions) < 2
or not all(isinstance(q, str) for q in sub_questions)
):
raise ValueError(f"Invalid decomposition result: {sub_questions!r}")
return sub_questions
async def research_question(question: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore:
messages = [
{"role": "system", "content": "Provide a detailed, factual answer."},
{"role": "user", "content": question}
]
result = await resilient_query(messages, max_tokens=1024)
return {"question": question, "answer": result["answer"]}
async def research_parallel(sub_questions: list[str], max_concurrent: int = 3) -> list[dict]:
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [research_question(q, semaphore) for q in sub_questions]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = []
for r in results:
if isinstance(r, Exception):
logger.warning("Research task failed: %s", r)
else:
successful.append(r)
return successful
async def synthesize(topic: str, research_results: list[dict]) -> None:
compiled = "\n".join(f"### {r['question']}\n{r['answer']}" for r in research_results)
messages = [
{"role": "system", "content": "Synthesize research into a coherent report."},
{"role": "user", "content": f"Topic: {topic}\nFindings:\n{compiled}\nWrite a report."}
]
stream = await client.chat.completions.create(
model=MODEL,
messages=messages,
stream=True,
max_tokens=2048
)
print("\n=== Synthesized Report ===\n")
try:
async for chunk in stream:
if not chunk.choices:
continue
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
finally:
await stream.close()
print()
async def main():
topic = "The impact of quantum computing on modern cryptography"
# Step 1: Decompose
t0 = time.perf_counter()
sub_questions = await decompose_task(topic)
t1 = time.perf_counter()
print(f"Decomposition ({t1 - t0:.1f}s): {sub_questions}\n")
# Step 2: Parallel research
results = await research_parallel(sub_questions)
t2 = time.perf_counter()
print(f"\nParallel research ({t2 - t1:.1f}s): {len(results)} of {len(sub_questions)} completed\n")
# Step 3: Synthesize
await synthesize(topic, results)
t3 = time.perf_counter()
print(f"\nSynthesis ({t3 - t2:.1f}s)")
print(f"Total workflow time: {t3 - t0:.1f}s")
if __name__ == "__main__":
asyncio.run(main())
Running and Testing the Workflow
Run the workflow with asyncio.run(main()), which creates the event loop and drives the coroutine to completion. A typical run with 5 sub-questions produces timing output showing the decomposition step taking anywhere from 3 to 8 seconds depending on prompt length, the parallel research phase completing in roughly the time of two sequential API calls (since the semaphore bounds concurrency to 3, requiring two batches for 5 tasks), and the synthesis step running last.
The performance gap between sequential and parallel is substantial. If each research call takes approximately 8 to 12 seconds, running 5 sequentially costs 40 to 60 seconds. Running them in parallel with a concurrency limit of 3 means two sequential batches, bringing the research phase to roughly 16 to 24 seconds — approximately a 2 to 2.5x speedup. The exact improvement depends on individual response latencies and the concurrency ceiling.
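That batching arithmetic generalizes: the parallel phase takes roughly ceil(tasks / limit) batches, each bounded by one call's latency. A rough estimator (a heuristic that ignores variance between individual calls):

```python
import math

def phase_estimate(n_tasks: int, per_call_s: float, max_concurrent: int) -> float:
    # Rough model: ceil(n / limit) batches, each about one call's latency
    batches = math.ceil(n_tasks / max_concurrent)
    return batches * per_call_s

sequential = phase_estimate(5, 10.0, 1)   # 5 batches of 1 -> 50 s
parallel = phase_estimate(5, 10.0, 3)     # 2 batches -> 20 s
print(f"sequential ~{sequential:.0f}s, parallel ~{parallel:.0f}s, "
      f"speedup ~{sequential / parallel:.1f}x")
```

Raising max_concurrent to 5 would collapse the phase to a single batch, but only if your rate limits allow it.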
Best Practices and Production Considerations
Prompt Engineering for Multi-Step Chains
Each step should receive only the information it needs, not the full history of every prior step. The decomposition step needs only the topic. Research steps need only their individual sub-question. The synthesis step needs the collected answers but not the reasoning traces from earlier steps. This keeps context windows tight and costs predictable.
Specify the exact output format in your system prompt for any step where downstream code must parse the result (like the decomposition step returning JSON). Temperature settings also matter: values between 0.0 and 0.3 generally suit structured output and factual research steps. For synthesis, values in the 0.5 to 0.7 range can encourage more connective prose, but consult the DeepSeek API documentation for supported temperature ranges for deepseek-reasoner, as reasoning models may have different or fixed temperature constraints compared to standard chat models.
Cost and Token Management
Multi-step workflows multiply token consumption: each step incurs input and output tokens, and the synthesis step's input includes all prior outputs. Estimating total token usage before running a workflow prevents cost surprises. Setting max_tokens per step constrains runaway generation; in the code above, research answers are capped at 1024 tokens while the synthesis step gets 2048. Always set max_tokens in production to prevent unbounded generation and unexpected costs.
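A crude pre-flight estimate can be sketched as follows. The 4-characters-per-token ratio is a rough heuristic for English text, and estimate_workflow_tokens is a hypothetical helper; use a real tokenizer for billing-grade numbers:

```python
def rough_tokens(text: str) -> int:
    # Heuristic: roughly 4 characters per token for English prose
    return max(1, len(text) // 4)

def estimate_workflow_tokens(topic: str, n_subquestions: int,
                             research_cap: int = 1024,
                             synthesis_cap: int = 2048) -> int:
    decompose = rough_tokens(topic) + 256                       # small JSON output
    research = n_subquestions * (rough_tokens(topic) + research_cap)
    # Synthesis input includes every research answer, plus its own output cap
    synthesis = n_subquestions * research_cap + synthesis_cap
    return decompose + research + synthesis

topic = "The impact of quantum computing on modern cryptography"
print(estimate_workflow_tokens(topic, n_subquestions=5))
```

Even this crude model makes the dominant cost visible: the synthesis step's input grows linearly with the number of sub-questions times their output cap.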
If the same sub-questions recur across workflow runs, cache the results. A simple dictionary cache keyed by the prompt hash avoids redundant API calls. For production systems, a Redis-backed cache with TTL expiration is more appropriate.
Scaling Beyond a Single Script
An asyncio.run() script will not hold up under production load. Task queues like Celery or Arq can distribute workflow steps across workers. Persistent state storage (a database or Redis) between steps enables recovery from partial failures without re-running the entire pipeline. Structured logging with correlation IDs per workflow run lets you trace individual requests through the multi-step chain, which is essential for debugging failures in production.
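One way to attach a per-run correlation ID without threading it through every function is a contextvars variable plus a logging filter. This is a sketch; the workflow_id field name and format are arbitrary choices:

```python
import contextvars
import logging
import uuid

workflow_id = contextvars.ContextVar("workflow_id", default="-")

class CorrelationFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.workflow_id = workflow_id.get()  # stamp every record with the run ID
        return True

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("[%(workflow_id)s] %(levelname)s %(message)s"))
handler.addFilter(CorrelationFilter())
log = logging.getLogger("workflow")
log.addHandler(handler)
log.setLevel(logging.INFO)

workflow_id.set(uuid.uuid4().hex[:8])  # set once at the start of each workflow run
log.info("decomposition complete")     # e.g. [3f2a9c1e] INFO decomposition complete
```

Because contextvars values are scoped per asyncio task tree, every log line emitted inside a run carries its ID, and log lines from concurrently running workflows remain distinguishable.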
Implementation Checklist
- Python 3.10+ installed
- DeepSeek API key obtained and stored in .env (and .env added to .gitignore)
- openai, python-dotenv, and httpx packages installed (do not install asyncio; it is part of the standard library)
- AsyncOpenAI client configured with DeepSeek base URL and timeout (verify at API docs)
- Basic async call tested and returning responses
- Streaming responses implemented and rendering correctly
- Retry logic with exponential backoff and jitter in place
- Task decomposition prompt returning valid JSON (with JSONDecodeError handling and element type validation)
- asyncio.Semaphore configured for rate-limit compliance
- Parallel research tested with asyncio.gather
- Synthesis step producing coherent final output
- End-to-end workflow timed and benchmarked
- Token usage monitored and max_tokens set per step (as shown in resilient_query and synthesize)
What Comes Next
This article built an async multi-step research workflow using DeepSeek-R1's chain-of-thought reasoning model and Python's native asyncio primitives. The pipeline decomposes a complex topic into sub-questions, researches them in parallel with bounded concurrency, and synthesizes results through a streaming final call.
The core takeaway is that DeepSeek-R1's OpenAI-compatible API and transparent reasoning traces, combined with Python's async capabilities, enable cost-effective AI pipelines that scale through concurrency rather than raw speed. The most natural next step is integrating tool use and function calling so the model can fetch live data during research steps. Beyond that, you can build agent-based architectures where the model decides its own workflow branching, or add structured evaluation steps that validate intermediate results before the pipeline proceeds. The DeepSeek API documentation at https://api-docs.deepseek.com covers the full specification, including model variants and parameter details.
Mark Harbottle is the co-founder of SitePoint, 99designs, and Flippa.