Hardware components
Software apps and online services
"The 'AMD Local LLM Assistant' is an innovative project that aims to harness the power of AMD Ryzen AI software to run a Large Language Model (LLM) assistant locally on users' devices. By utilizing AMD's cutting-edge AI hardware chips, we can bring advanced language processing capabilities directly to the user's fingertips, ensuring lightning-fast performance and unparalleled privacy.
This prototype was built using llama3-8b (but user can swap it with any other models that have already been quantised using the AWQ library, change in quantize_and_load.py), focuses on enhancing daily productivity by processing emails and calendar events. It generates a concise daily summary, providing users with a quick overview of their day, along with personalized to-dos, ensuring they stay organized and on track.
One of the key strengths of the project is the emphasis on user privacy. Unlike traditional cloud-based assistants, our LLM assistant does not send any data to external services or servers. Instead, it synchronizes with popular email clients like Thunderbird (see app.py), ensuring that all user information remains securely on their devices. This approach not only protects user privacy but also enables offline functionality, making our assistant accessible anytime, anywhere.
# FastAPI backend that streams generations from the locally quantized model.
import quantize_and_load as ql
from threading import Thread
from typing import Union

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import TextIteratorStreamer

# The port (8000) is chosen when launching the server, e.g. `uvicorn <module>:app --port 8000`
app = FastAPI()
# Configure CORS
origins = [
    "*"
    # Add more origins if needed
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)
class ChatRequest(BaseModel):
    prompt: Union[str, list]
    max_new_tokens: Union[int, None] = 256
    do_sample: Union[bool, None] = True
    temperature: Union[float, None] = 0.7
    top_p: Union[float, None] = 0.9

# Load the AWQ-quantized model and its tokenizer once at startup
model, tokenizer = ql.run_model()

# Llama 3 Instruct uses <|eot_id|> in addition to the regular EOS token to end a turn
terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>")
]
def generate_response(formatted_prompt: str, max_new_tokens: int = 1000, do_sample: bool = True,
                      temperature: float = 0.7, top_p: float = 0.9):
    """Stream the model's reply, yielding the accumulated text so far."""
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    encodeds = tokenizer(
        formatted_prompt,
        return_tensors='pt'
    ).input_ids

    def generate_and_signal_complete(encodeds, max_new_tokens, do_sample,
                                     temperature=0.7, top_p=0.9):
        model.generate(encodeds, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=do_sample,
                       eos_token_id=terminators, temperature=temperature, top_p=top_p)

    # Run generation in a background thread so we can iterate over the streamer here
    t1 = Thread(target=generate_and_signal_complete,
                kwargs=dict(encodeds=encodeds, max_new_tokens=max_new_tokens, do_sample=do_sample,
                            temperature=temperature, top_p=top_p))
    t1.start()

    partial_text = ""
    lstrip_once = False
    for new_text in streamer:
        partial_text += new_text
        if not lstrip_once:
            # Strip any leading whitespace the model emits before the first word
            partial_text = partial_text.lstrip()
            lstrip_once = True
        print(partial_text)
        yield partial_text
@app.post("/chat/")
async def chat(c_request: ChatRequest):
formatted_prompt = c_request.prompt
max_new_tokens = c_request.max_new_tokens
do_sample = c_request.do_sample
temperature = c_request.temperature
top_p = c_request.top_p
if isinstance(formatted_prompt, list):
formatted_prompt = tokenizer.apply_chat_template(
formatted_prompt,
add_generation_prompt=True,
tokenize=False)
formatted_prompt += "<|start_header_id|>assistant<|end_header_id|>"
print(formatted_prompt)
return StreamingResponse(
generate_response(formatted_prompt, max_new_tokens, do_sample, temperature=temperature, top_p=top_p), media_type="text/plain")
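Once the backend is running (for example with uvicorn on port 8000; how it is launched is an assumption, not shown in the code above), the streaming endpoint can be exercised with a short client like this sketch. The URL and the prompt text are illustrative only.
import requests

# Assumed local address; adjust to wherever the FastAPI app is served
chat_url = "http://localhost:8000/chat/"
messages = [{"role": "user", "content": "Summarize my day in one sentence."}]

with requests.post(chat_url, json={"prompt": messages, "max_new_tokens": 128}, stream=True) as r:
    for chunk in r.iter_content(chunk_size=None):
        # Each chunk carries the accumulated reply so far, so the last line printed is the full answer
        print(chunk.decode("utf-8"))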
# quantize_and_load.py — quantizes the model with AWQ and loads it for the Ryzen AI NPU
import gc
import os

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

from pre_quant import run_awq, apply_awq
from quantizer import real_quantize_model_weight
from qmodule import WQLinear
import qlinear
from utils import Utils

model_name = "meta-llama/Meta-Llama-3-8B-Instruct"  # "meta-llama/Llama-2-7b-chat-hf" #
AWQ_CACHE = os.environ.get("AWQ_CACHE")  # directory holding the precomputed AWQ results
AWQ_FILE = "llama3-instruct-8b-w4-g128.pt"  # "llama-2-7b-chat-w4-g128.pt" #
ckpt_folder = "ckpt/"
ckpt = ckpt_folder + "llama3-instruct-8b-w4-g128.pt"  # "llama-2-7b-chat-w4-g128.pt" #
torch.set_num_threads(8)
def quantize_model():
    """One-time step: apply precomputed AWQ scales, quantize weights to 4 bits, and save a checkpoint."""
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    print(model)
    q_config = {
        "zero_point": True,
        "q_group_size": 128,
    }
    # Load the precomputed AWQ results and fold the scales into the model
    awq_result = torch.load(AWQ_CACHE + AWQ_FILE, map_location='cpu')
    apply_awq(model, awq_result)
    real_quantize_model_weight(
        model, w_bit=4, q_config=q_config
    )
    # Swap the AWQ linear layers for per-group quantized linears
    Utils.replace_node(model,
                       WQLinear,
                       qlinear.QLinearPerGrp,
                       (), {'device': 'cpu', 'w_bit': 4, 'group_size': 128})
    ## Matmul group <- skip for now
    ## Not sure if this causes error: RuntimeError: The size of tensor a (4096) must match
    ## the size of tensor b (524288) at non-singleton dimension 1
    Utils.replace_node(model,
                       torch.nn.Linear,
                       qlinear.QLinearPerGrp,
                       (), {'device': 'cpu', 'w_bit': 4, 'group_size': 32})
    print(model)
    gc.collect()
    os.makedirs(ckpt_folder, exist_ok=True)  # make sure the checkpoint folder exists
    torch.save(model, ckpt)
def run_model():
    """Load the quantized checkpoint (quantizing first if needed) and prepare it for the NPU."""
    if not os.path.exists(ckpt):
        quantize_model()
    model = torch.load(ckpt)
    _ = gc.collect()
    model.eval()
    model = model.to(torch.bfloat16)
    print(model)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Move each quantized linear layer to the AIE (Ryzen AI NPU) and pack its weights
    for n, m in model.named_modules():
        if isinstance(m, qlinear.QLinearPerGrp):
            print(f"Preparing weights of layer : {n}")
            m.device = "aie"
            m.quantize_weights()
    return model, tokenizer
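As noted in the introduction, swapping in a different AWQ-quantized model only means changing the constants at the top of quantize_and_load.py. A minimal sketch for the Llama 2 7B Chat alternative already hinted at in the comments above, assuming the matching AWQ result file is present in AWQ_CACHE:
# Hypothetical edit to quantize_and_load.py: use Llama 2 7B Chat instead of Llama 3 8B
model_name = "meta-llama/Llama-2-7b-chat-hf"
AWQ_FILE = "llama-2-7b-chat-w4-g128.pt"
ckpt = ckpt_folder + "llama-2-7b-chat-w4-g128.pt"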
# Gradio chat frontend: syncs Thunderbird emails and calendar events into a local
# Chroma DB and streams replies from the FastAPI backend.
from loguru import logger
import chromadb
from chromadb.utils import embedding_functions
import tiktoken
import gradio as gr
import requests

# get_emails_obj() and get_calander_obj() come from the project's Thunderbird sync
# helpers (not shown in this listing).

PERSISTANT_PATH = "chroma_db"  # placeholder path for the persistent Chroma store
url = "http://localhost:8000/chat/"  # assumed address of the FastAPI backend

client = chromadb.PersistentClient(path=PERSISTANT_PATH)
encoding = tiktoken.encoding_for_model("gpt-4")
def add_chunk_text_to_db_with_meta(text: str, meta: dict) -> bool:
    """Chunk the given text and upsert the chunks into the vector DB with their metadata
    @parameter: text : str - Text to be chunked
    @parameter: meta : dict - Metadata to be stored with each chunk
    """
    chunks = chunk_text(text)
    if (meta["doc_id"] is None) or (meta["doc_id"] == ""):
        logger.error("Document ID is missing")
        return False
    collection = client.get_or_create_collection(
        name="KAI",
        metadata={"hnsw:space": "cosine"},
        embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name="multi-qa-MiniLM-L6-cos-v1"))
    collection.upsert(
        documents=chunks,
        metadatas=[meta for _ in range(len(chunks))],
        ids=[f"{meta['doc_id']}_{i}" for i in range(len(chunks))]
    )
    return True
def chunk_text(text: str, units: int = 256, overlap: int = 50) -> list[str]:
    """Chunk text into smaller pieces based on units and overlap
    @parameter: text : str - Text to be chunked
    @parameter: units : int - How many tokens per chunk
    @parameter: overlap : int - How many tokens of overlap between chunks
    @returns list[str] - List of chunks
    """
    encoding = tiktoken.encoding_for_model("gpt-4")
    encoded_tokens = encoding.encode(text, disallowed_special=())
    chunks = []
    if units > len(encoded_tokens) or units < 1:
        # Text is shorter than one chunk (or units is invalid): return it as a single chunk
        return [text]
    if overlap >= units:
        logger.error(
            f"Overlap value is greater than unit (Units {units}/ Overlap {overlap})"
        )
        return None
    i = 0
    while i < len(encoded_tokens):
        # Slide a window of `units` tokens, stepping forward `units - overlap` tokens each time
        start_i = i
        end_i = min(i + units, len(encoded_tokens))
        chunk_str = encoding.decode(encoded_tokens[start_i:end_i])
        chunks.append(chunk_str)
        i += units - overlap
    return chunks
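# A quick illustration of the sliding window (hypothetical sample, assuming roughly
# one token per word with the gpt-4 encoding): units=256 with overlap=50 means
# consecutive chunks share 50 tokens, so a ~600-token text yields three chunks whose
# windows start at tokens 0, 206 and 412, e.g.:
#   chunk_text("word " * 600, units=256, overlap=50)  # -> 3 overlapping chunks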
def query_db_for_context(question, n_results=5, max_distance=0.5):
    # Cosine distance: 0 means identical, larger values mean less semantically similar
    collection = client.get_or_create_collection(
        name="KAI",
        metadata={"hnsw:space": "cosine"},
        embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name="multi-qa-MiniLM-L6-cos-v1"))
    context = collection.query(
        query_texts=question,
        n_results=n_results,
        include=["documents", "distances", "metadatas"]
    )
    # Keep only documents whose distance is within max_distance
    filtered_pos = [i for i in range(len(context["distances"][0]))
                    if context["distances"][0][i] <= max_distance]
    print(context)
    docs = [context["documents"][0][i] for i in filtered_pos]
    context_text = "\n\n".join(docs)
    context_subject = [context["metadatas"][0][i]["subject"] for i in filtered_pos]
    context_links = [context["metadatas"][0][i]["thunderlink"] for i in filtered_pos]
    print(f"Context text: {context_text}")
    print(f"\nContext subject and Link: {context_subject}\n\n {context_links}\n\n")
    return context_text, context_subject, context_links
def chat_history_to_prompt(chat_history):
    """Convert Gradio chat history ([[user, assistant], ...]) into a list of role/content messages."""
    system_prompt = {"role": "system", "content": "You're a chatbot designed to run on AMD AI processor"}
    if chat_history == []:
        messages_with_system_prompt = [system_prompt]
    else:
        # The system prompt always comes first, followed by alternating user/assistant turns
        messages_with_system_prompt = sum([
            [{"role": "user", "content": exchange[0]}, {"role": "assistant", "content": exchange[1]}]
            for exchange in chat_history],
            [system_prompt])
    return messages_with_system_prompt
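# For reference (not executed): a single prior exchange such as
#   chat_history_to_prompt([["Hi", "Hello! How can I help?"]])
# yields the system message followed by
#   {"role": "user", "content": "Hi"} and {"role": "assistant", "content": "Hello! How can I help?"},
# which the backend's /chat/ endpoint then renders with the model's chat template.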
def respond(message, chat_history, query_db):
    print(chat_history)
    print("\n=======Chat History END=======\n")
    # Turn the chat history into a message list the LLM can use as reference
    formatted_prompt = chat_history_to_prompt(chat_history)
    if query_db:
        # Query ChromaDB for similar context (emails and calendar events)
        print("====> Querying DB :D")
        context, context_subjects, context_links = query_db_for_context(message)
    else:
        context = ""
        context_subjects, context_links = [], []
    messages = formatted_prompt + [{"role": "user", "content": message}]
    if context != "":
        # Send the retrieved context to the LLM for email/calendar-related responses
        messages.append({"role": "context", "content": context})
    print(messages)
    partialText = ""
    contextURL = ""
    if len(context_subjects) > 0:
        # Build a list of source references (Thunderlinks) to show under the answer
        contextURL = "\n External Reference:"
        for i in range(len(context_subjects)):
            if context_links[i] == "":
                contextURL += f"\n\t- {context_subjects[i]}"
            else:
                contextURL += f"\n\t- <a href=\"{context_links[i]}\" target=\"_blank\">{context_subjects[i]}</a>"
    # Size the output token budget dynamically based on the input length
    encoded_tokens = encoding.encode(message, disallowed_special=())
    total_token_limit = 500
    # Calculate the remaining tokens available for the response
    input_length = len(encoded_tokens)
    max_output_tokens = total_token_limit - input_length
    max_output_tokens = min(max_output_tokens, total_token_limit)
    print(f"Output Token size: {max_output_tokens}, input size: {input_length}")
    # Send a POST request to the backend and stream the Llama 3 model's response
    with requests.post(url, json={"prompt": messages, "max_new_tokens": max_output_tokens}, stream=True) as r:
        for chunk in r.iter_content(1024):
            partialText = chunk.decode('utf-8')
            yield partialText + contextURL
def store_db_data(is_thunderbird=True):
    if is_thunderbird:
        print("Storing Emails and Events data into DB...")
        email_objs = get_emails_obj()
        for email in email_objs:
            add_chunk_text_to_db_with_meta(email.get_content(), email.get_meta())
        event_objs = get_calander_obj()
        for event in event_objs:
            add_chunk_text_to_db_with_meta(event.get_content(), event.get_meta())
        print("Today's emails and events are stored in the DB!")
    else:
        print("Future implementation for other email/calendar services...")

# Sync today's Thunderbird emails and calendar events into the vector DB at startup
store_db_data()

# Gradio chat UI; sanitize_html=False so the reference links render as HTML
demo = gr.ChatInterface(respond,
                        chatbot=gr.Chatbot(sanitize_html=False),
                        additional_inputs=[
                            gr.Checkbox(label="Query vector DB", value=True)]).queue()
demo.launch(inline=True, server_port=7860)