You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
306 lines
12 KiB
306 lines
12 KiB
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
|
|
import time
|
|
import torch
|
|
import random
|
|
from tool_helper import tool_list, parse_and_execute_tool_call
|
|
from tool_functions import register_dummy
|
|
import utils
|
|
import re
|
|
|
|
t_start = time.time()
|
|
|
|
# model_name = "NousResearch/Llama-2-7b-hf" # will cache on C:\Users\ftobler\.cache\huggingface\hub
|
|
model_name = "NousResearch/Hermes-3-Llama-3.2-3B" # will cache on C:\Users\ftobler\.cache\huggingface\hub
|
|
# model_name = "NousResearch/Hermes-2-Pro-Llama-3-8B"
|
|
# model_name = "Orenguteng/Llama-3.1-8B-Lexi-Uncensored-V2"
|
|
# "meta-llama/Llama-2-7b-hf" # Replace with your chosen model
|
|
|
|
|
|
quantization_config_4bit = BitsAndBytesConfig( # tool calls don't really work in 4 bit mode
|
|
load_in_4bit=True,
|
|
bnb_4bit_quant_type="nf4", # Recommended for better performance
|
|
bnb_4bit_use_double_quant=True, # Optional: Further quantization for more memory saving
|
|
bnb_4bit_compute_dtype=torch.bfloat16 # Use bfloat16 for computation
|
|
)
|
|
|
|
quantization_config_8bit = BitsAndBytesConfig(load_in_8bit=True)
|
|
|
|
# Load the model with quantization (optional)
|
|
model = AutoModelForCausalLM.from_pretrained(
|
|
model_name,
|
|
# device_map="auto", # Automatically places parts of the model on GPU/CPU
|
|
# device_map="cuda", # Automatically places parts of the model on GPU/CPU
|
|
device_map="cuda", # Automatically places parts of the model on GPU/CPU
|
|
# load_in_8bit=True, # Enables 8-bit quantization if bitsandbytes is installed
|
|
quantization_config=quantization_config_8bit
|
|
)
|
|
|
|
# Load tokenizer
|
|
tokenizer = AutoTokenizer.from_pretrained(model_name)
|
|
|
|
print("load took %.3fs" % (time.time() - t_start))
|
|
|
|
max_context_length = model.config.max_position_embeddings
|
|
|
|
|
|
tokenizer.chat_template = utils.load_json_file("chat_template.json")
|
|
|
|
|
|
|
|
|
|
print("max_context_length is %d tokens." % (max_context_length))
|
|
|
|
|
|
# Generate text
|
|
|
|
# schema = """
|
|
# {
|
|
# "properties": {
|
|
# "program": {
|
|
# "description": "Python program to be executed. The Message response to the input query is the output of this program",
|
|
# "title": "Program",
|
|
# "type": "string"
|
|
# },
|
|
|
|
# },
|
|
# "required": [
|
|
# "program"
|
|
# ],
|
|
# "title": "Response",
|
|
# "type": "object"
|
|
# }
|
|
# """
|
|
|
|
|
|
# """
|
|
# "confidence": {
|
|
# "description": "How sure you are the above message facts are true. Rate harsh from 0 to 1",
|
|
# "title": "Confidence",
|
|
# "type": "float"
|
|
# }
|
|
# """
|
|
|
|
# tool_assist = """
|
|
# You are a python assisted AI model. You may call the interpreter one or more times to assist with the user query. You might tell the user that you optained ground truth with the help of python or a calculator if asked about. The user is not able to see if python has been used, therefore do not expose and share failed attempts or syntax errors.
|
|
# To invoke a this function, the answer may start and end with <python_tool_call> and </python_tool_call> respectively. The rest must be a valid python script, additional text is not allowed before and after. Calling python is not needed when just providing example code.
|
|
# """
|
|
|
|
messages = [
|
|
# {"role": "system", "content": "Hold a casual conversation with the user. Keep responses short at max 3 sentences."},
|
|
# {"role": "system", "content": "Hold a casual conversation with the user. Keep responses short at max 3 sentences. It is %s now." % datetime.datetime.now().strftime("%Y-%m-%d %H:%M")},
|
|
# {"role": "system", "content": "Hold a casual conversation with the user. Keep responses short at max 3 sentences. " + tool_assist},
|
|
# {"role": "system", "content": "You are a helpful assistant that answers in JSON. Here's the json schema you must adhere to:\n<schema>\n%s\n</schema>" % schema},
|
|
# {"role": "system", "content": "You are a helpful assistant that answers by entering commands into a python interpreter. The user only sees the stdout of your python input."},
|
|
# {"role": "system", "content": "Make a summary of the below input prompt. Do not answer. The description should fit on 80 characters."},
|
|
# {"role": "user", "content": "Hello, who are you?"}
|
|
]
|
|
|
|
systemmessage = "Hold a casual conversation with the user. Keep responses short at max 3 sentences."
|
|
|
|
roleflip = {"role": "system", "content": "Keep the conversation going, ask for more information on the subject. Keep messages short at max 1-2 sentences. Do not thank and say goodbye."}
|
|
|
|
|
|
|
|
register_dummy()
|
|
# tool_functions = [current_time, random_float, random_int]
|
|
|
|
|
|
|
|
def generate_batch(inputs):
|
|
outputs = model.generate(
|
|
inputs["input_ids"], # **inputs,
|
|
max_new_tokens=500, # max_length=max_context_length,
|
|
pad_token_id=tokenizer.pad_token_id,
|
|
eos_token_id=tokenizer.eos_token_id,
|
|
do_sample=True,
|
|
num_return_sequences=1
|
|
)
|
|
# skip all input tokens and only output the additional generated part of the conversation
|
|
input_token_count = len(inputs["input_ids"][0])
|
|
out_text = tokenizer.decode(outputs[0][input_token_count:], skip_special_tokens=True)
|
|
print(out_text)
|
|
return outputs, out_text
|
|
|
|
|
|
|
|
def generate_incremental(inputs):
|
|
# Start with the initial input tokens
|
|
input_ids = inputs["input_ids"]
|
|
generated_tokens = input_ids # Initially, this is just the input tokens
|
|
|
|
n = 0
|
|
try:
|
|
|
|
# Loop to generate one token at a time
|
|
while True:
|
|
# Call the model with the current tokens
|
|
outputs = model(input_ids=generated_tokens, use_cache=True)
|
|
|
|
# Get the next token (the last token from the generated sequence)
|
|
next_token = outputs.logits.argmax(dim=-1)[:, -1]
|
|
|
|
# Append the new token to the sequence
|
|
generated_tokens = torch.cat([generated_tokens, next_token.unsqueeze(0)], dim=1)
|
|
|
|
# Decode and print the newly generated token (skip special tokens)
|
|
out_text = tokenizer.decode(next_token, skip_special_tokens=True)
|
|
print(out_text, end="", flush=True) # Print without newline
|
|
|
|
# Check if the generated token is the end-of-sequence token
|
|
if next_token.item() == tokenizer.eos_token_id:
|
|
print("")
|
|
break
|
|
|
|
n += 1
|
|
if n >= 15:
|
|
n = 0
|
|
torch.cuda.empty_cache()
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
|
|
# Once done, return the full generated sequence
|
|
input_token_count = len(inputs["input_ids"][0])
|
|
full_output = tokenizer.decode(generated_tokens[0][input_token_count:], skip_special_tokens=True)
|
|
|
|
torch.cuda.empty_cache()
|
|
|
|
return generated_tokens, full_output
|
|
|
|
|
|
def append_generate_chat(input_text: str, role="user"):
|
|
t_start = time.time()
|
|
|
|
# generate AI response
|
|
if input_text != None:
|
|
messages.append({"role": role, "content": input_text})
|
|
|
|
# input_text = "Hello, who are you?"
|
|
# inputs = tokenizer(input_text, return_tensors="pt").to("cpu") # .to("cuda") .to("cpu")
|
|
inputs = tokenizer.apply_chat_template(messages, return_tensors="pt", tokenize=True, return_dict=True, add_generation_prompt=True) #continue_final_message=True,
|
|
inputs = {key: value.to(model.device) for key, value in inputs.items()}
|
|
# inputs = {key: value.to("cpu") for key, value in inputs.items()}
|
|
# inputs["input_ids"] = inputs["input_ids"][:, 1:]
|
|
|
|
with torch.inference_mode():
|
|
outputs, out_text = generate_incremental(inputs)
|
|
|
|
# append result to message history
|
|
messages.append({"role": "assistant", "content": out_text})
|
|
|
|
print("")
|
|
print("generation took %.3fs (%d tokens)" % (time.time() - t_start, len(outputs[0])))
|
|
|
|
# handle tool call and check if a tool call has happened.
|
|
tool_result = parse_and_execute_tool_call(out_text, tool_list)
|
|
if tool_result != None:
|
|
# tool call happened
|
|
# tool_result = "<tool_response>%s</tool_response>" % tool_result
|
|
# depending on the chat template the tool response tags must or must not be passed. :(
|
|
append_generate_chat(tool_result, role="tool")
|
|
|
|
|
|
|
|
def generate_tool_use_header(tools: list[callable]) -> str:
|
|
temp_messages = [{}] # for some reason an empty array is not allowed but a {} inside works like an empty array.
|
|
s = tokenizer.apply_chat_template(temp_messages, return_tensors="pt", tokenize=False, add_generation_prompt=False, tools=tools)
|
|
pattern = r"<\|im_start\|>system\n(.*)<\|im_end\|>"
|
|
match = re.search(pattern, s, re.DOTALL)
|
|
if not match:
|
|
raise Exception("Failed to regex match the template tool system text.")
|
|
extraction = match.group(1)
|
|
return extraction
|
|
|
|
|
|
def main():
|
|
global messages
|
|
|
|
messages = [{"role": "system", "content": systemmessage + "\n" + generate_tool_use_header(tool_list)}]
|
|
|
|
while True:
|
|
# print an input prompt to receive text or commands
|
|
input_text = input(">>> ")
|
|
print("")
|
|
|
|
|
|
if input_text.startswith("!"):
|
|
# append_generate_chat("<tool_response>%s</tool_response>" % input_text[1:], role="tool")
|
|
append_generate_chat("%s" % input_text[1:], role="tool") # depending on the chat template the tool response tags must or must not be passed. :(
|
|
|
|
elif input_text.startswith("/clear"):
|
|
print("clearing chat history")
|
|
start_msg = messages[0]
|
|
messages = [start_msg]
|
|
print("")
|
|
|
|
elif input_text.startswith("/history"):
|
|
history = tokenizer.apply_chat_template(messages, return_tensors="pt", tokenize=False, add_generation_prompt=False)
|
|
print(history)
|
|
|
|
elif input_text.startswith("/undo"):
|
|
if len(messages) > 2:
|
|
print("undo latest prompt")
|
|
messages = messages[:-2]
|
|
else:
|
|
print("cannot undo because there are not enough messages on history.")
|
|
print("")
|
|
|
|
elif input_text.startswith("/regen"):
|
|
if len(messages) >= 2:
|
|
print("regenerating message (not working)")
|
|
messages = messages[:-1]
|
|
seed = random.randint(0, 2**32 - 1) # Generate a random seed
|
|
torch.manual_seed(seed)
|
|
torch.cuda.manual_seed_all(seed)
|
|
append_generate_chat(None)
|
|
else:
|
|
print("cannot regenerate because there are not enough messages on history.")
|
|
print("")
|
|
|
|
elif input_text.startswith("/more"):
|
|
append_generate_chat(None)
|
|
|
|
elif input_text.startswith("/file"):
|
|
filename = input_text[len("/file "):]
|
|
print("read '%s' for prompt:" % filename)
|
|
with open(filename, "r") as f:
|
|
content = f.read()
|
|
print(content)
|
|
append_generate_chat(content)
|
|
|
|
elif input_text.startswith("/auto"):
|
|
messages_backup = messages
|
|
messages = [roleflip]
|
|
for m in messages_backup:
|
|
role = m["role"]
|
|
content = m["content"]
|
|
if role == "user":
|
|
role = "assistant"
|
|
elif role == "assistant":
|
|
role = "user"
|
|
if role != "system":
|
|
messages.append({"role": role, "content": content})
|
|
append_generate_chat(None) # will automatically advance the conversation as 'user'
|
|
last_message = messages[-1]
|
|
last_message["role"] = "user"
|
|
messages = messages_backup + [last_message]
|
|
append_generate_chat(None) # 'regular' chatbot answer
|
|
|
|
elif input_text.startswith("/help"):
|
|
print("!<prompt> answer as 'tool' in <tool_response> tags")
|
|
print("/clear clear chat history")
|
|
print("/undo undo latest prompt")
|
|
print("/regen regenerate the last message")
|
|
print("/more generate more additional information")
|
|
print("/file read prompt input from file")
|
|
print("/auto automatically advance conversation")
|
|
print("/help print this message")
|
|
print("")
|
|
|
|
elif input_text.startswith("/"):
|
|
print("unknown command.")
|
|
|
|
else:
|
|
append_generate_chat(input_text)
|
|
|
|
|