Ludwig Kian Soon Hoon
Created July 17, 2024

AMD local llm assistant

This project uses AMD's Ryzen AI hardware to bring a powerful, privacy-respecting assistant to your personal device.


Things used in this project

Hardware components

Venus UM790 Pro
×1
Android device
×1

Software apps and online services

AMD Ryzen AI software

Story


Schematics

Software architecture

Code

main.py

Python
from threading import Thread
from typing import Union

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import TextIteratorStreamer

import quantize_and_load as ql

# Note: FastAPI itself takes no port argument; the port (8000) is chosen
# when serving the app, e.g. `uvicorn main:app --port 8000`.
app = FastAPI()
# Configure CORS
origins = [
    "*"
    # Add more origins if needed
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)


class ChatRequest(BaseModel):
    prompt: Union[str, list]
    max_new_tokens: Union[int, None] = 256
    do_sample: Union[bool, None] = True
    temperature: Union[float, None] = 0.7
    top_p: Union[float, None] = 0.9

model, tokenizer = ql.run_model()

# Stop generation at the model's EOS token or Llama 3's end-of-turn token
terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

def generate_response(formatted_prompt: str, max_new_tokens: int = 1000, do_sample: bool = True, temperature: float = 0.7, top_p: float = 0.9):
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    encodeds = tokenizer(
        formatted_prompt,
        return_tensors='pt'
    ).input_ids

    def generate_and_signal_complete(encodeds, max_new_tokens, do_sample,
        temperature=0.7, top_p=0.9):
        model.generate(encodeds, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=do_sample,
            eos_token_id=terminators, temperature=temperature, top_p=top_p)

    t1 = Thread(target=generate_and_signal_complete, kwargs=dict(encodeds=encodeds, max_new_tokens=max_new_tokens, do_sample=do_sample,
        temperature=temperature, top_p=top_p))

    t1.start()
    # Accumulate streamed tokens; each yield carries the full response so far,
    # so the client simply overwrites its display with every chunk.
    partial_text = ""
    lstrip_once = False
    for new_text in streamer:
        partial_text += new_text
        if not lstrip_once:
            # Strip any leading whitespace the model emits before its first token
            partial_text = partial_text.lstrip()
            lstrip_once = True
        print(partial_text)
        yield partial_text

@app.post("/chat/")
async def chat(c_request: ChatRequest):
    formatted_prompt = c_request.prompt
    max_new_tokens = c_request.max_new_tokens
    do_sample = c_request.do_sample
    temperature = c_request.temperature
    top_p = c_request.top_p
    if isinstance(formatted_prompt, list):
        formatted_prompt = tokenizer.apply_chat_template(
            formatted_prompt,
            add_generation_prompt=True,
            tokenize=False)
        formatted_prompt += "<|start_header_id|>assistant<|end_header_id|>"
    print(formatted_prompt)
    return StreamingResponse(
        generate_response(formatted_prompt, max_new_tokens, do_sample, temperature=temperature, top_p=top_p), media_type="text/plain")
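The listing above only defines the FastAPI app; serving and querying it are not shown. Below is a minimal client-side sketch, assuming the backend is started with `uvicorn main:app --port 8000` (the /chat/ endpoint and its JSON fields come from the code above; the host, port, and prompt are illustrative):

import requests

messages = [{"role": "user", "content": "Summarize my day."}]  # example prompt

with requests.post(
    "http://localhost:8000/chat/",
    json={"prompt": messages, "max_new_tokens": 128},
    stream=True,
) as r:
    for chunk in r.iter_content(1024):
        # The backend streams the cumulative text generated so far, so
        # overwriting the display with each chunk approximates a live view,
        # just as the Gradio front end in app.py does.
        print(chunk.decode("utf-8"), end="\r")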

quantize_and_load.py

Python
Quantizing and loading the LLM in one go
from transformers import AutoModelForCausalLM, AutoTokenizer
import os
import torch


from pre_quant import run_awq, apply_awq
from quantizer import real_quantize_model_weight
from qmodule import WQLinear
import qlinear

import sys
from utils import Utils
import gc

model_name = "meta-llama/Meta-Llama-3-8B-Instruct"  # "meta-llama/Llama-2-7b-chat-hf"
AWQ_CACHE = os.environ.get("AWQ_CACHE")  # folder containing the pre-computed AWQ results
AWQ_FILE = "llama3-instruct-8b-w4-g128.pt"  # "llama-2-7b-chat-w4-g128.pt"
ckpt_folder = "ckpt/"
ckpt = ckpt_folder + AWQ_FILE
os.makedirs(ckpt_folder, exist_ok=True)  # make sure the checkpoint folder exists before saving

torch.set_num_threads(8)
def quantize_model():
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    print(model)
    q_config = {
        "zero_point": True,
        "q_group_size": 128,  }
    awq_result = torch.load(AWQ_CACHE + AWQ_FILE, map_location='cpu')
    apply_awq(model, awq_result)
    real_quantize_model_weight(
        model, w_bit=4, q_config=q_config
    )
    Utils.replace_node( model, 
        WQLinear, 
        qlinear.QLinearPerGrp, 
        (), {'device':'cpu', 'w_bit':4, 'group_size':128} )
    ## Matmul group <- skip for now
    ## Not sure if this causes error: RuntimeError: The size of tensor a (4096) must match
    ## the size of tensor b (524288) at non-singleton dimension 1
    Utils.replace_node( model, 
        torch.nn.Linear, 
        qlinear.QLinearPerGrp, 
        (), {'device':'cpu', 'w_bit':4, 'group_size':32} )
    print(model)
    gc.collect()
    torch.save(model, ckpt)

def run_model():
    if not os.path.exists(ckpt):
        quantize_model()
    model = torch.load(ckpt)
    _ = gc.collect()
    model.eval()
    model = model.to(torch.bfloat16)
    print(model)
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    for n, m in model.named_modules():
        if isinstance(m, qlinear.QLinearPerGrp):
            print(f"Preparing weights of layer : {n}")
            m.device = "aie"
            m.quantize_weights()

    return(model, tokenizer)
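run_model() quantizes the model on the first call and reuses the saved checkpoint afterwards, but it relies on pre-computed AWQ results being available on disk. A minimal sketch of the setup it assumes (AWQ_CACHE, the ckpt/ folder, and run_model() come from the listing above; the cache path is illustrative, and the environment variable must be set before the module is imported because AWQ_CACHE is read at import time):

import os

# Folder holding the pre-computed AWQ scales, e.g. llama3-instruct-8b-w4-g128.pt
# (trailing slash matters, since the code concatenates AWQ_CACHE + AWQ_FILE)
os.environ["AWQ_CACHE"] = "/path/to/awq_cache/"

import quantize_and_load as ql

# First run: applies AWQ, quantizes weights to 4-bit, saves ckpt/llama3-instruct-8b-w4-g128.pt.
# Later runs: loads the checkpoint and moves the quantized layers to the NPU ("aie" device).
model, tokenizer = ql.run_model()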

app.py

Python
Front-end Gradio app
import os

import chromadb
import gradio as gr
import requests
import tiktoken
from chromadb.utils import embedding_functions
from loguru import logger

# Path of the persistent ChromaDB store; not defined in the original listing,
# so an environment variable with a local default is assumed here.
PERSISTANT_PATH = os.environ.get("PERSISTANT_PATH", "./chroma_db")

# Backend /chat/ endpoint served by main.py (adjust host/port if served elsewhere)
url = "http://localhost:8000/chat/"

client = chromadb.PersistentClient(path=PERSISTANT_PATH)
encoding = tiktoken.encoding_for_model("gpt-4")

def add_chunk_text_to_db_with_meta(text: str, meta: dict) -> bool:
    """Add chunked text to the database with meta data
    @parameter: text : str - Text to be chunked
    @parameter: meta : dict - Meta data to be stored with the text
    """
    chunks  = chunk_text(text)
    if (meta["doc_id"] is None) or (meta["doc_id"] == ""):
        logger.error("Document ID is missing")
        return(False)

    collection = client.get_or_create_collection(name="KAI", metadata={"hnsw:space": "cosine"}, embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(model_name="multi-qa-MiniLM-L6-cos-v1"))
    collection.upsert(
        documents = chunks,
        metadatas = [meta for _ in range(len(chunks))],
        ids = [f"{meta['doc_id']}_{i}" for i in range(len(chunks))]
    )
    return(True)


def chunk_text(text: str, units: int = 256, overlap: int = 50) -> list[str]:
    """Chunk text into smaller pieces based on units and overlap
    @parameter: text : str - Text to be chunked
    @parameter: units : int - How many units per chunk (token)
    @parameter: overlap : int - How much overlap between the chunks
    @returns list[str] - List of chunks
    """
    encoding = tiktoken.encoding_for_model("gpt-4")
    encoded_tokens = encoding.encode(text, disallowed_special=())

    chunks = []

    if units > len(encoded_tokens) or units < 1:
        return([text])

    if overlap >= units:
        logger.error(
            f"Overlap value is greater than unit (Units {units}/ Overlap {overlap})"
        )
        return(None)

    i = 0
    while i < len(encoded_tokens):
        # Slide a window of `units` tokens, keeping `overlap` tokens of context between chunks
        start_i = i
        end_i = min(i + units, len(encoded_tokens))

        chunk_tokens = encoded_tokens[start_i:end_i]
        chunks.append(encoding.decode(chunk_tokens))
        i += units - overlap

    return(chunks)
    


def query_db_for_context(question, n_results=5, max_distance=0.5):  # cosine distance: lower = more semantically similar
    collection = client.get_or_create_collection(name="KAI", metadata={"hnsw:space": "cosine"}, embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(model_name="multi-qa-MiniLM-L6-cos-v1"))
    context = collection.query(
        query_texts=question,
        n_results=n_results,
        include=["documents", "distances", "metadatas"]
    )
    #Filtering documents with a score less than min_score
    filtered_pos = [i for i in range(len(context["distances"][0]))
                    if context["distances"][0][i] <= max_distance]
    print(context)
    docs = [context["documents"][0][i] for i in filtered_pos]
    context_text = "\n\n".join(docs)
    context_subject = [context["metadatas"][0][i]["subject"] for i in filtered_pos]
    context_links = [context["metadatas"][0][i]["thunderlink"] for i in filtered_pos]
    print(f"Context text: {context_text}")
    print(f"\nContext subject and Link: {context_subject}\n\n {context_links}\n\n")
    return context_text, context_subject, context_links


def chat_history_to_prompt(chat_history):
    system_prompt = {"role": "system", "content": "You're a chatbot designed to run on AMD AI processor"}
    if chat_history == []:
        messages_with_system_prompt = [system_prompt]
    else:
        # First one is always system prompt
        messages_with_system_prompt = sum([
            [{"role": "user", "content": exchange[0]}, {"role": "assistant", "content": exchange[1]}] for exchange in chat_history],
             [system_prompt])
    return messages_with_system_prompt

def respond(message, chat_history, query_db):
    print(chat_history)
    print("\n=======Chat History END=======\n")

    # Append chat history into new prompt for LLM as reference
    formatted_prompt = chat_history_to_prompt(chat_history)
    if query_db:
        # Query ChromaDB for similar context (emails and calendar events)
        print("====> Querying DB :D")
        context, context_subjects, context_links = query_db_for_context(message)
    else:
        # No retrieval requested: keep the reference lists empty so they can still be used below
        context, context_subjects, context_links = "", [], []
    messages = formatted_prompt + [{"role": "user", "content": message}]
    if context != "":
        # Pass the retrieved context to the LLM for email/calendar-related answers
        messages.append({"role": "context", "content": context})
    
    print(messages)
    partialText = ""
    contextURL =""
    if len(context_subjects) >0:
        contextURL = f"\n External Reference:"
        for i in range(len(context_subjects)):
            if context_links[i] == "":
                contextURL += f"\n\t- {context_subjects[i]}"
            else:
                contextURL += f"\n\t- <a href=\"{context_links[i]}\" target=\"_blank\">{context_subjects[i]}</a>"
    # Size the output-token budget dynamically from the input length
    encoded_tokens = encoding.encode(message, disallowed_special=())
    total_token_limit = 500
    input_length = len(encoded_tokens)
    # Keep a small floor (64, chosen here as a safeguard) so a long input never produces a non-positive budget
    max_output_tokens = max(total_token_limit - input_length, 64)
    print(f"Output Token size: {max_output_tokens}, input size: {input_length}")

    # Send a POST request to the backend and stream the Llama 3 model's response
    with requests.post(url, json = {"prompt": messages, "max_new_tokens":max_output_tokens}, stream=True) as r:
        for chunk in r.iter_content(1024):
            partialText = chunk.decode('utf-8')
            yield partialText + contextURL
            
            
def store_db_data(is_thunderbird=True):
    if is_thunderbird:
        print("Storing Emails and Events data into DB...")
        # get_emails_obj() and get_calander_obj() come from the Thunderbird
        # integration helpers, which are not included in this listing.
        email_objs = get_emails_obj()
        for email in email_objs:
            add_chunk_text_to_db_with_meta(email.get_content(), email.get_meta())
        event_objs = get_calander_obj()
        for event in event_objs:
            add_chunk_text_to_db_with_meta(event.get_content(), event.get_meta())
        print("Today's Emails and Events are stored in the DB!")
    else:
        print("Future implementation for other email/calendar services...")

store_db_data()

demo = gr.ChatInterface(respond,
                        chatbot=gr.Chatbot(sanitize_html=False),
    additional_inputs = [
        gr.Checkbox(label="Query vector DB", value=True)]).queue()

demo.launch(inline=True, server_port=7860)
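The vector-database helpers above expect a specific metadata shape: add_chunk_text_to_db_with_meta() requires a non-empty doc_id, and query_db_for_context() reads back the subject and thunderlink keys when building the "External Reference" links. A small sketch of storing an arbitrary note by hand, assuming the definitions in this listing (the function name and metadata keys come from the code above; the text and values are purely illustrative):

# Store a hand-written note so query_db_for_context() can surface it later.
# doc_id must be non-empty; subject and thunderlink are what the chat UI
# renders as "External Reference" entries (thunderlink may be left blank).
add_chunk_text_to_db_with_meta(
    "Stand-up moved to 10:30 on Friday, room B2.",
    {"doc_id": "note-001", "subject": "Stand-up schedule change", "thunderlink": ""},
)

To run the assistant end to end, start the backend first (e.g. uvicorn main:app --port 8000) so the /chat/ endpoint is reachable, then launch this front end with python app.py to get the Gradio UI on port 7860.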

Credits

Ludwig Kian Soon Hoon