Автоматты аударма пайдаланылды

LangGraph, LangChain және Ollama көмегімен AI SQL көмекшісін құру

Қазіргі деректер әлемінде пайдаланушылар дерекқорлармен жылдам және интуитивті жұмыс істеуді қажет етеді. Дегенмен, барлығы SQL білмейді. Сондықтан біз AI көмекшісін құрдық:

  • табиғи тілде сұрақтар қабылдайды,
  • оларды түсіндіреді,
  • дұрыс SQL сұрауларын жазады,
  • оларды орындайды,
  • және түсінікті жауап қалыптастырады.

💡 Бұл шешім LangChain, LangGraph және Ollama-ға негізделген және PostgreSQL немесе ClickHouse-пен жұмыс істейді.

  • LLM: Qwen 3 (Ollama арқылы, жергілікті).
  • LangChain: SQL генерациясы, сұраныстарды орындау.
  • LangGraph: өңдеу қадамдарын басқару.
  • PostgreSQL: деректер көзі.
  • Python: желім коды және логика.
pip install langchain langgraph langchain-community langchain-ollama sqlalchemy
from langchain_community.utilities import SQLDatabase
from langchain_ollama import ChatOllama
import re
from typing_extensions import TypedDict, Annotated
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.tools.sql_database.tool import QuerySQLDatabaseTool
from langgraph.graph import START, StateGraph
import difflib
from sqlalchemy import text
from typing import Optional, List

🔐 Дерекқорға қосылу

# Connect LangChain's SQLDatabase wrapper to PostgreSQL.
# Only the "projects" table is exposed, which limits the schema the LLM sees.
db = SQLDatabase.from_uri (# NOTE: keep the URI in a .env file, not in source!

    "postgresql://<username>:<password>@<host>:<port>/<dbname>",

    include_tables=["projects"],

)

🧠 LLM-ге қосылу (Ollama арқылы)

# LLM served locally through Ollama.
llm = ChatOllama(
    model="qwen3:32b",
    base_url="http://<ollama-host>:11434",
    num_ctx=8192,     # context window size in tokens
    temperature=0.1,  # near-deterministic output for SQL generation
)
# BUG FIX: line carried copy-paste scrape residue ("pythonкөшіруөңдеу")
# glued to the class keyword — a syntax error. Removed.
class State(TypedDict):
    """Shared state passed between the LangGraph pipeline nodes."""

    question: str                           # user question (possibly name-corrected)
    query: str                              # generated SQL query
    result: str                             # raw SQL execution output
    answer: str                             # final natural-language answer
    columns_to_search: Optional[List[str]]  # name columns chosen by the role analyzer

# Preload all known person names from the DB so that misspelled names in
# user questions can be fuzzy-corrected before SQL generation.
KNOWN_NAMES: set[str] = set()
# NOTE(review): db._engine is a private SQLDatabase attribute and may break
# across langchain-community versions — confirm there is no public accessor.
with db._engine.begin() as conn:
    for col in ("primary_executive", "product_managers", "collaborators"):
        # Column names come from the fixed tuple above, so the f-string
        # interpolation is not an injection risk here.
        result = conn.execute(text(f"SELECT DISTINCT {col} FROM projects WHERE {col} IS NOT NULL"))
        for row in result:
            if row[0]:
                # Cell values are comma-separated lists of names; split and trim.
                names = str(row[0]).strip().split(', ')
                KNOWN_NAMES.update(name.strip() for name in names if name)
# System prompt for SQL generation. Placeholders ({dialect}, {top_k},
# {columns_to_search}, {table_info}) are filled in by write_query().
# FIX: removed a stray space before the period in "abbreviations .".
system_template = """
Given an input question, create a syntactically correct {dialect} query to run to help find the answer. \
Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results.

When searching for names, use the following columns: {columns_to_search}. \
Apply LIKE '%name fragment%' separately for each column, and combine multiple conditions using OR.

Never query for all the columns from a table, only select the few relevant columns needed for answering the question.

Pay attention to use only the column names you see in the schema description.

When searching by department or division (stored in the 'direction' column), use LIKE '%fragment%'. \
Interpret common abbreviations.
Only use the following tables:
{table_info}
"""
# Human message carries only the raw user question.
human_template = "Question: {input}"

# Assemble the two-message chat prompt used for SQL generation.
_prompt_messages = [
    ("system", system_template),
    ("human", human_template),
]
custom_query_prompt = ChatPromptTemplate.from_messages(_prompt_messages)
def resolve_name(state: State):
    """Fuzzy-correct misspelled person names in the user's question.

    Tokenizes the question into words of 4+ letters, and for each token
    that is not already an exactly-known name, looks for a close match in
    KNOWN_NAMES. Matched tokens are replaced in the question so downstream
    SQL generation uses the canonical spelling.

    Returns a partial state update: {"question": possibly corrected text}.
    """
    question = state["question"]
    # BUG FIX: the original regex was garbled by machine translation into
    # invalid syntax. Reconstructed: words of 4+ Latin, Russian Cyrillic or
    # Kazakh-specific Cyrillic letters — TODO confirm against the intended
    # character class in the original source.
    tokens = re.findall(r"[A-Za-zА-Яа-яЁёӘәҒғҚқҢңӨөҰұҮүҺһІі]{4,}", question)
    corrected = {}

    for tok in tokens:
        # Exact (case-insensitive) known name — nothing to correct.
        if any(tok.lower() == name.lower() for name in KNOWN_NAMES):
            continue
        match = difflib.get_close_matches(tok, KNOWN_NAMES, n=1, cutoff=0.75)
        if match:
            corrected[tok] = match[0]

    if corrected:
        new_q = question
        for wrong, right in corrected.items():
            # BUG FIX: escape the token — it is literal text, not a pattern.
            new_q = re.sub(re.escape(wrong), right, new_q, flags=re.IGNORECASE)
        return {"question": new_q}

    return {"question": question}
def analyze_question_for_role(state: State) -> dict:
    """Choose which name columns the generated SQL should search.

    If the question mentions more than one known name, or contains a
    participation keyword (Kazakh words for "participates", "executors",
    etc.), search all three role columns including 'collaborators';
    otherwise search only the two primary-role columns.

    Returns the state (unchanged when no names are found) with
    'columns_to_search' filled in.
    """
    question = state["question"].lower()
    # Kazakh keywords signalling a question about project participation.
    # NOTE: runtime strings kept byte-identical (including the leading space
    # in " пайда болды") since matching behavior depends on them.
    participation_words = [
        "қатысады", "орындаушылар", "менеджерлер", "жұмыс істеді"," пайда болды","қызметкерлер"
    ]
    # BUG FIX: the original regex was garbled into invalid syntax by machine
    # translation. Same tokenizer as resolve_name: 4+ letter words covering
    # Latin, Russian Cyrillic and Kazakh-specific letters — TODO confirm.
    tokens = re.findall(r"[A-Za-zА-Яа-яЁёӘәҒғҚқҢңӨөҰұҮүҺһІі]{4,}", question)
    # Substring match: a token counts if it occurs inside any known name.
    name_matches = [tok for tok in tokens if any(tok.lower() in name.lower() for name in KNOWN_NAMES)]

    if not name_matches:
        return state

    multiple_names = len(name_matches) > 1
    has_keywords = any(word in question for word in participation_words)

    columns = ["primary_executive", "product_managers", "collaborators"] if (multiple_names or has_keywords) else ["primary_executive", "product_managers"]

    return {**state, "columns_to_search": columns}
class QueryOutput(TypedDict):
    """Structured-output schema for SQL generation.

    The Annotated metadata (ellipsis placeholder + description string) is
    consumed by llm.with_structured_output() in write_query() to describe
    the expected field to the model.
    """

    query: Annotated[str, ..., "Syntactically valid SQL query."]

def write_query(state: State):
    """Generate a SQL query for the user's question via the LLM.

    Fills the custom prompt with the DB dialect, a row cap, the table
    schema, and the name columns chosen by analyze_question_for_role,
    then requests structured output matching QueryOutput.

    Returns a partial state update: {"query": generated SQL}.
    """
    # BUG FIX: columns_to_search is Optional[List[str]] — the original
    # crashed on ", ".join(None) when the key was present but None.
    columns = state.get("columns_to_search") or []
    prompt_args = {
        "dialect": db.dialect,
        "top_k": 10,  # default row cap enforced through the prompt
        "table_info": db.get_table_info(),
        "columns_to_search": ", ".join(columns),
        "input": state["question"],
    }
    prompt = custom_query_prompt.invoke(prompt_args)
    result = llm.with_structured_output(QueryOutput).invoke(prompt)
    print("write_query")  # progress marker
    return {"query": result["query"]}
def execute_query(state: State):
    """Run the generated SQL against the database and capture its output."""
    print("execute_query")  # progress marker
    tool = QuerySQLDatabaseTool(db=db)
    output = tool.invoke(state["query"])
    return {"result": output}
async def generate_answer(state: State):
    """Produce the final natural-language answer (in Russian) from the SQL result."""
    # Sections are joined with "\n" and all but the last end in "\n",
    # reproducing the original prompt byte-for-byte.
    sections = [
        "Given the following user question, executed SQL query, and SQL result, "
        "answer the user's question based only on the SQL result.\n",
        "You must answer in Russian language.\n",
        f"User Question:\n{state['question']}\n",
        f"Executed SQL Query:\n{state['query']}\n",
        f"SQL Result:\n{state['result']}\n",
        "Now write a complete and understandable answer in Russian.",
    ]
    prompt = "\n".join(sections)
    print("generate_answer")
    response = await llm.ainvoke(prompt)
    return {"answer": response.content}
# Wire the linear pipeline: name correction -> role analysis ->
# SQL generation -> execution -> answer generation.
graph_builder = StateGraph(State)

_pipeline = [
    ("resolve_name", resolve_name),
    ("analyze_question_for_role", analyze_question_for_role),
    ("write_query", write_query),
    ("execute_query", execute_query),
    ("generate_answer", generate_answer),
]

for _node_name, _node_fn in _pipeline:
    graph_builder.add_node(_node_name, _node_fn)

# Chain the nodes in order, starting from START.
_previous = START
for _node_name, _ in _pipeline:
    graph_builder.add_edge(_previous, _node_name)
    _previous = _node_name

graph = graph_builder.compile()
import asyncio

async def ask_stream(question: str):
    """Stream the pipeline's events for a question, printing LLM tokens live.

    Prints model tokens as they arrive, tool outputs when tools finish,
    and a completion marker at the end.
    """
    async for event in graph.astream_events({"question": question}, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # Token-by-token LLM output.
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind == "on_tool_end":
            output = event["data"].get("output")
            if output:
                print(f"\n[Tool Output]: {output}\n")
    # BUG FIX: the builtin print() was machine-translated into Kazakh
    # ("басып шығару"), which would raise NameError. Restored; the
    # user-facing Kazakh message itself is kept as-is.
    print("\n\n✅ жауап аяқталды.")

LangChain және LangGraph көмегімен сіз:

  • теңшелетін LLM көмекшілерін жылдам әзірлеу,
  • SQL деректеріне қол жеткізуді автоматтандыру,
  • деректерді бұлтқа жібермей жеке LLM пайдалану.

Бұл жүйе корпоративтік BI экожүйесінде ақылды чатботты енгізудің негізі болып табылады.

Пікірлер 0

Кіру пікір қалдыру үшін