
Бұл жазба автоматты түрде орыс тілінен аударылған. Орысша түпнұсқа мәтін төменде берілген.
Қазіргі деректер әлемінде пайдаланушылар дерекқорлармен жылдам және интуитивті жұмыс істеуді қажет етеді. Дегенмен, барлығы SQL-ге ие емес. Сондықтан біз ai көмекшісін құрдық:
- табиғи тілде сұрақтар қабылдайды,
- оларды түсіндіреді,
- дұрыс SQL сұрауларын жазады,
- оларды орындайды,
- және түсінікті жауап қалыптастырады.
💡 Бұл шешім LangChain, LangGraph және Ollama-ға негізделген және PostgreSQL немесе ClickHouse-пен жұмыс істейді.
- LLM: Qwen 3 (Ollama арқылы, жергілікті).
- LangChain: SQL генерациясы, сұраныстарды орындау.
- LangGraph: өңдеу қадамдарын басқару.
- PostgreSQL: деректер көзі.
- Python: желім коды және логика.
pip install langchain langgraph langchain-community langchain-ollama sqlalchemy
from langchain_community.utilities import SQLDatabase
from langchain_ollama import ChatOllama
import re
from typing_extensions import TypedDict, Annotated
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.tools.sql_database.tool import QuerySQLDatabaseTool
from langgraph.graph import START, StateGraph
import difflib
from sqlalchemy import text
from typing import Optional, List
🔐 Дерекқорға қосылу
# Database connection, restricted to the single `projects` table.
db = SQLDatabase.from_uri (  # Remember to keep the real URI in a .env file!
"postgresql://<username>:<password>@<host>:<port>/<dbname>",
include_tables=["projects"],
)
🧠 LLM-ге қосылу (Ollama арқылы)
# LLM client served locally through Ollama.
llm = ChatOllama(
model="qwen3:32b",  # local Qwen 3 32B model
base_url="http://<ollama-host>:11434",  # Ollama server endpoint
num_ctx=8192,  # context window size in tokens
temperature=0.1,  # low temperature: near-deterministic SQL generation
)
# NOTE: the original line carried a fused copy-paste artifact
# ("python / Copy / Edit" button labels) before the class keyword.
class State(TypedDict):
    """LangGraph pipeline state shared by all nodes.

    Fields are filled in progressively: the user question comes in first,
    then the generated SQL query, its raw result, and the final answer.
    """
    question: str
    query: str
    result: str
    answer: str
    # People-columns chosen by analyze_question_for_role; may be unset/None.
    columns_to_search: Optional[List[str]]
# Collect the distinct set of known person names from the people-columns of
# the `projects` table; used later for fuzzy name correction.
# NOTE(review): db._engine is a private SQLDatabase attribute — prefer a
# public accessor if the library version offers one.
KNOWN_NAMES: set[str] = set()
with db._engine.begin() as conn:
    for col in ("primary_executive", "product_managers", "collaborators"):
        rows = conn.execute(text(f"SELECT DISTINCT {col} FROM projects WHERE {col} IS NOT NULL"))
        for row in rows:
            if row[0]:
                # Split on bare commas so "A,B" and "A, B" both yield two names
                # (the original split on ", " missed the first form).
                parts = (p.strip() for p in str(row[0]).split(","))
                KNOWN_NAMES.update(name for name in parts if name)
# System prompt for the SQL-writing step. Placeholders:
#   {dialect}           - SQL dialect reported by the database
#   {top_k}             - default LIMIT for generated queries
#   {columns_to_search} - people-columns chosen by analyze_question_for_role
#   {table_info}        - schema description of the allowed tables
system_template = """
Given an input question, create a syntactically correct {dialect} query to run to help find the answer. \
Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results.
When searching for names, use the following columns: {columns_to_search}. \
Apply LIKE '%name fragment%' separately for each column, and combine multiple conditions using OR.
Never query for all the columns from a table, only select the few relevant columns needed for answering the question.
Pay attention to use only the column names you see in the schema description.
When searching by department or division (stored in the 'direction' column), use LIKE '%fragment%'. \
Interpret common abbreviations .
Only use the following tables:
{table_info}
"""
# Human message carries the raw user question.
human_template = "Question: {input}"
# Two-message chat prompt consumed by write_query.
custom_query_prompt = ChatPromptTemplate.from_messages([
("system", system_template),
("human", human_template),
])
def resolve_name(state: State):
    """Fuzzy-correct misspelled person names in the user's question.

    Tokens of 4+ letters that are not an exact (case-insensitive) known name
    are matched against KNOWN_NAMES with difflib; close matches (similarity
    >= 0.75) replace the original token in the question.
    """
    question = state["question"]
    # Words of at least 4 Latin/Cyrillic letters (incl. Kazakh-specific ones).
    # The original regex was destroyed by machine translation; restored from
    # the untranslated copy of this code.
    tokens = re.findall(r"[A-Za-zА-Яа-яЁёІҚҢӨҮҰӘіқңөүұә]{4,}", question)
    corrected = {}
    for tok in tokens:
        if any(tok.lower() == name.lower() for name in KNOWN_NAMES):
            continue  # already a known name; nothing to fix
        match = difflib.get_close_matches(tok, KNOWN_NAMES, n=1, cutoff=0.75)
        if match:
            corrected[tok] = match[0]
    if corrected:
        new_q = question
        for wrong, right in corrected.items():
            # re.escape guards against regex metacharacters in the token.
            new_q = re.sub(re.escape(wrong), right, new_q, flags=re.IGNORECASE)
        return {"question": new_q}
    return {"question": question}
def analyze_question_for_role(state: State) -> dict:
    """Choose which people-columns the SQL generator should search.

    Widens the search to `collaborators` when several names are mentioned or
    the question contains participation/role keywords; otherwise only the
    executive and manager columns are searched.
    """
    question = state["question"].lower()
    # Kazakh keywords hinting that the question is about participation/roles.
    # The stray leading space in " пайда болды" (a translation artifact) is
    # removed so the keyword can match normally.
    participation_words = [
        "қатысады", "орындаушылар", "менеджерлер", "жұмыс істеді", "пайда болды", "қызметкерлер"
    ]
    # Regex restored from the untranslated copy of this code.
    tokens = re.findall(r"[A-Za-zА-Яа-яЁёІҚҢӨҮҰӘіқңөүұә]{4,}", question)
    name_matches = [tok for tok in tokens if any(tok.lower() in name.lower() for name in KNOWN_NAMES)]
    if not name_matches:
        return state  # no person mentioned: leave the state untouched
    multiple_names = len(name_matches) > 1
    has_keywords = any(word in question for word in participation_words)
    if multiple_names or has_keywords:
        columns = ["primary_executive", "product_managers", "collaborators"]
    else:
        columns = ["primary_executive", "product_managers"]
    return {**state, "columns_to_search": columns}
class QueryOutput(TypedDict):
    # Structured-output schema handed to llm.with_structured_output():
    # the model must return exactly one field containing a valid SQL string.
    query: Annotated[str, ..., "Syntactically valid SQL query."]
def write_query(state: State):
    """Generate a SQL query for the user's question via the LLM.

    Returns a partial state update: {"query": <generated SQL>}.
    """
    # `columns_to_search` may be absent OR present-but-None (it is Optional);
    # the original `", ".join(state.get(...))` crashed on the None case.
    columns = state.get("columns_to_search") or []
    prompt_args = {
        "dialect": db.dialect,
        "top_k": 10,  # default row cap injected into the prompt
        "table_info": db.get_table_info(),
        "columns_to_search": ", ".join(columns),
        "input": state["question"],
    }
    prompt = custom_query_prompt.invoke(prompt_args)
    result = llm.with_structured_output(QueryOutput).invoke(prompt)
    print("write_query")  # lightweight progress trace
    return {"query": result["query"]}
def execute_query(state: State):
    """Run the generated SQL against the database and capture its raw output."""
    print("execute_query")  # progress trace
    sql_tool = QuerySQLDatabaseTool(db=db)
    return {"result": sql_tool.invoke(state["query"])}
async def generate_answer(state: State):
    """Ask the LLM to phrase a final Russian-language answer from the SQL result."""
    instructions = (
        "Given the following user question, executed SQL query, and SQL result, "
        "answer the user's question based only on the SQL result.\n\n"
        "You must answer in Russian language.\n\n"
    )
    context = (
        f"User Question:\n{state['question']}\n\n"
        f"Executed SQL Query:\n{state['query']}\n\n"
        f"SQL Result:\n{state['result']}\n\n"
    )
    closing = "Now write a complete and understandable answer in Russian."
    print("generate_answer")  # progress trace
    reply = await llm.ainvoke(instructions + context + closing)
    return {"answer": reply.content}
graph_builder = StateGraph(State)

# Register every pipeline node.
for node_name, node_fn in (
    ("resolve_name", resolve_name),
    ("analyze_question_for_role", analyze_question_for_role),
    ("write_query", write_query),
    ("execute_query", execute_query),
    ("generate_answer", generate_answer),
):
    graph_builder.add_node(node_name, node_fn)

# Wire a strictly linear flow: START -> resolve_name -> ... -> generate_answer.
pipeline = [
    START,
    "resolve_name",
    "analyze_question_for_role",
    "write_query",
    "execute_query",
    "generate_answer",
]
for src, dst in zip(pipeline, pipeline[1:]):
    graph_builder.add_edge(src, dst)

graph = graph_builder.compile()
import asyncio

async def ask_stream(question: str):
    """Stream the assistant's answer to stdout as graph events arrive.

    LLM tokens are printed as they are generated; tool (SQL) outputs are
    printed as separate annotated lines.
    """
    async for event in graph.astream_events({"question": question}, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # Stream LLM tokens as they arrive, without newlines.
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind == "on_tool_end":
            output = event["data"].get("output")
            if output:
                print(f"\n[Tool Output]: {output}\n")
    # The original line was machine-translated ("басып шығару" is Kazakh for
    # "print"), which would raise NameError; restored to print().
    print("\n\n✅ жауап аяқталды.")
LangChain және LangGraph көмегімен сіз:
- теңшелетін LLM көмекшілерін жылдам әзірлеу,
- SQL деректеріне қол жеткізуді автоматтандыру,
- деректерді бұлтқа жібермей жеке LLM-дерді пайдалану.
Бұл жүйе корпоративтік BI экожүйесінде ақылды чатботты енгізудің негізі болып табылады.
В современном мире данных пользователи всё чаще нуждаются в быстрой и интуитивной работе с базами данных. Однако далеко не каждый владеет SQL. Поэтому мы создали AI-помощника, который:
- принимает вопросы на естественном языке,
- интерпретирует их,
- пишет корректные SQL-запросы,
- исполняет их,
- и формирует понятный ответ.
💡 Это решение построено на LangChain, LangGraph и Ollama, и работает с PostgreSQL или ClickHouse.
- LLM: Qwen 3 (через Ollama, локально).
- LangChain: генерация SQL, выполнение запросов.
- LangGraph: управление шагами обработки.
- PostgreSQL: источник данных.
- Python: glue-код и логика.
pip install langchain langgraph langchain-community langchain-ollama sqlalchemy
from langchain_community.utilities import SQLDatabase
from langchain_ollama import ChatOllama
import re
from typing_extensions import TypedDict, Annotated
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.tools.sql_database.tool import QuerySQLDatabaseTool
from langgraph.graph import START, StateGraph
import difflib
from sqlalchemy import text
from typing import Optional, List
🔐 Подключение к базе данных
# Database connection, restricted to the single `projects` table.
db = SQLDatabase.from_uri(  # Be sure to keep the real URI in a .env file!
"postgresql://<username>:<password>@<host>:<port>/<dbname>",
include_tables=["projects"],
)
🧠 Подключение к LLM (через Ollama)
# LLM client served locally through Ollama.
llm = ChatOllama(
model="qwen3:32b",  # local Qwen 3 32B model
base_url="http://<ollama-host>:11434",  # Ollama server endpoint
num_ctx=8192,  # context window size in tokens
temperature=0.1,  # low temperature: near-deterministic SQL generation
)
# NOTE: the original line carried a fused copy-paste artifact
# ("python Копировать Редактировать" button labels) before the class keyword.
class State(TypedDict):
    """LangGraph pipeline state shared by all nodes.

    Fields are filled in progressively: the user question comes in first,
    then the generated SQL query, its raw result, and the final answer.
    """
    question: str
    query: str
    result: str
    answer: str
    # People-columns chosen by analyze_question_for_role; may be unset/None.
    columns_to_search: Optional[List[str]]
# Collect the distinct set of known person names from the people-columns of
# the `projects` table; used later for fuzzy name correction.
# NOTE(review): db._engine is a private SQLDatabase attribute — prefer a
# public accessor if the library version offers one.
KNOWN_NAMES: set[str] = set()
with db._engine.begin() as conn:
    for col in ("primary_executive", "product_managers", "collaborators"):
        rows = conn.execute(text(f"SELECT DISTINCT {col} FROM projects WHERE {col} IS NOT NULL"))
        for row in rows:
            if row[0]:
                # Split on bare commas so "A,B" and "A, B" both yield two names
                # (the original split on ", " missed the first form).
                parts = (p.strip() for p in str(row[0]).split(","))
                KNOWN_NAMES.update(name for name in parts if name)
# System prompt for the SQL-writing step. Placeholders:
#   {dialect}           - SQL dialect reported by the database
#   {top_k}             - default LIMIT for generated queries
#   {columns_to_search} - people-columns chosen by analyze_question_for_role
#   {table_info}        - schema description of the allowed tables
system_template = """
Given an input question, create a syntactically correct {dialect} query to run to help find the answer. \
Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results.
When searching for names, use the following columns: {columns_to_search}. \
Apply LIKE '%name fragment%' separately for each column, and combine multiple conditions using OR.
Never query for all the columns from a table, only select the few relevant columns needed for answering the question.
Pay attention to use only the column names you see in the schema description.
When searching by department or division (stored in the 'direction' column), use LIKE '%fragment%'. \
Interpret common abbreviations .
Only use the following tables:
{table_info}
"""
# Human message carries the raw user question.
human_template = "Question: {input}"
# Two-message chat prompt consumed by write_query.
custom_query_prompt = ChatPromptTemplate.from_messages([
("system", system_template),
("human", human_template),
])
def resolve_name(state: State):
    """Fuzzy-correct misspelled person names in the user's question.

    Tokens of 4+ letters that are not an exact (case-insensitive) known name
    are matched against KNOWN_NAMES with difflib; close matches (similarity
    >= 0.75) replace the original token in the question.
    """
    question = state["question"]
    # Words of at least 4 Latin/Cyrillic letters (incl. Kazakh-specific ones).
    tokens = re.findall(r"[A-Za-zА-Яа-яЁёІҚҢӨҮҰӘіқңөүұә]{4,}", question)
    corrected = {}
    for tok in tokens:
        if any(tok.lower() == name.lower() for name in KNOWN_NAMES):
            continue  # already a known name; nothing to fix
        match = difflib.get_close_matches(tok, KNOWN_NAMES, n=1, cutoff=0.75)
        if match:
            corrected[tok] = match[0]
    if corrected:
        new_q = question
        for wrong, right in corrected.items():
            # Fix: escape the token before using it as a regex pattern
            # (rf"{wrong}" would misbehave on any metacharacter).
            new_q = re.sub(re.escape(wrong), right, new_q, flags=re.IGNORECASE)
        return {"question": new_q}
    return {"question": question}
def analyze_question_for_role(state: State) -> dict:
    """Choose the people-columns to search, widening when roles are implied."""
    question = state["question"].lower()
    # Russian keywords hinting that the question is about participation/roles.
    participation_words = [
        "участвует", "исполнители", "менеджеров", "работал", "фигурирует", "сотрудников"
    ]
    tokens = re.findall(r"[A-Za-zА-Яа-яЁёІҚҢӨҮҰӘіқңөүұә]{4,}", question)
    name_matches = [
        tok for tok in tokens
        if any(tok.lower() in known.lower() for known in KNOWN_NAMES)
    ]
    if not name_matches:
        # No person mentioned: leave the state untouched.
        return state
    widen = len(name_matches) > 1 or any(word in question for word in participation_words)
    if widen:
        columns = ["primary_executive", "product_managers", "collaborators"]
    else:
        columns = ["primary_executive", "product_managers"]
    return {**state, "columns_to_search": columns}
class QueryOutput(TypedDict):
    # Structured-output schema handed to llm.with_structured_output():
    # the model must return exactly one field containing a valid SQL string.
    query: Annotated[str, ..., "Syntactically valid SQL query."]
def write_query(state: State):
    """Generate a SQL query for the user's question via the LLM.

    Returns a partial state update: {"query": <generated SQL>}.
    """
    # `columns_to_search` may be absent OR present-but-None (it is Optional);
    # the original `", ".join(state.get(...))` crashed on the None case.
    columns = state.get("columns_to_search") or []
    prompt_args = {
        "dialect": db.dialect,
        "top_k": 10,  # default row cap injected into the prompt
        "table_info": db.get_table_info(),
        "columns_to_search": ", ".join(columns),
        "input": state["question"],
    }
    prompt = custom_query_prompt.invoke(prompt_args)
    result = llm.with_structured_output(QueryOutput).invoke(prompt)
    print("write_query")  # lightweight progress trace
    return {"query": result["query"]}
def execute_query(state: State):
    """Run the generated SQL against the database and capture its raw output."""
    print("execute_query")  # progress trace
    sql_tool = QuerySQLDatabaseTool(db=db)
    return {"result": sql_tool.invoke(state["query"])}
async def generate_answer(state: State):
    """Ask the LLM to phrase a final Russian-language answer from the SQL result."""
    instructions = (
        "Given the following user question, executed SQL query, and SQL result, "
        "answer the user's question based only on the SQL result.\n\n"
        "You must answer in Russian language.\n\n"
    )
    context = (
        f"User Question:\n{state['question']}\n\n"
        f"Executed SQL Query:\n{state['query']}\n\n"
        f"SQL Result:\n{state['result']}\n\n"
    )
    closing = "Now write a complete and understandable answer in Russian."
    print("generate_answer")  # progress trace
    reply = await llm.ainvoke(instructions + context + closing)
    return {"answer": reply.content}
graph_builder = StateGraph(State)

# Register every pipeline node.
for node_name, node_fn in (
    ("resolve_name", resolve_name),
    ("analyze_question_for_role", analyze_question_for_role),
    ("write_query", write_query),
    ("execute_query", execute_query),
    ("generate_answer", generate_answer),
):
    graph_builder.add_node(node_name, node_fn)

# Wire a strictly linear flow: START -> resolve_name -> ... -> generate_answer.
pipeline = [
    START,
    "resolve_name",
    "analyze_question_for_role",
    "write_query",
    "execute_query",
    "generate_answer",
]
for src, dst in zip(pipeline, pipeline[1:]):
    graph_builder.add_edge(src, dst)

graph = graph_builder.compile()
import asyncio

async def ask_stream(question: str):
    """Stream the assistant's answer to stdout as graph events arrive.

    LLM tokens are printed as they are generated; tool (SQL) outputs are
    printed as separate annotated lines.
    """
    async for ev in graph.astream_events({"question": question}, version="v2"):
        event_kind = ev["event"]
        if event_kind == "on_chat_model_stream":
            # Stream LLM tokens as they arrive, without newlines.
            print(ev["data"]["chunk"].content, end="", flush=True)
            continue
        if event_kind == "on_tool_end":
            tool_output = ev["data"].get("output")
            if tool_output:
                print(f"\n[Tool Output]: {tool_output}\n")
    print("\n\n✅ Ответ завершён.")
С помощью LangChain и LangGraph вы можете:
- быстро разрабатывать кастомных LLM-ассистентов,
- автоматизировать SQL-доступ к данным,
- использовать приватные LLM без передачи данных в облако.
Эта система — основа для внедрения умного чат-бота в корпоративной BI-экосистеме.