from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import time
import torch
import random
import datetime
import json
# --- Model / tokenizer setup (runs once at start-up) ---------------------------------
t_start = time.time()  # reference point for the load-time report printed below

# Hugging Face model id to load. Other candidates are kept commented out for quick switching.
# model_name = "NousResearch/Llama-2-7b-hf" # will cache on C:\Users\ftobler\.cache\huggingface\hub
model_name = "NousResearch/Hermes-3-Llama-3.2-3B" # will cache on C:\Users\ftobler\.cache\huggingface\hub
# model_name = "NousResearch/Hermes-2-Pro-Llama-3-8B"
# model_name = "Orenguteng/Llama-3.1-8B-Lexi-Uncensored-V2"
# "meta-llama/Llama-2-7b-hf" # Replace with your chosen model

# 4-bit quantization settings; this is the configuration actually passed to from_pretrained.
quantization_config_4bit = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",  # Recommended for better performance
    bnb_4bit_use_double_quant=True,  # Optional: Further quantization for more memory saving
    bnb_4bit_compute_dtype=torch.bfloat16,  # Use bfloat16 for computation
)
# Alternative 8-bit settings; defined for experimentation but not selected below.
quantization_config_8bit = BitsAndBytesConfig(load_in_8bit=True)

# Load the model with quantization
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    # device_map="auto",  # would let the loader distribute the model over GPU/CPU
    device_map="cuda",  # load the model onto the CUDA device
    # load_in_8bit=True,  # Enables 8-bit quantization if bitsandbytes is installed
    quantization_config=quantization_config_4bit,
)

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)
print("load took %.3fs" % (time.time() - t_start))

# Position limit reported by the model configuration.
max_context_length = model.config.max_position_embeddings

# The tokenizer's chat template is replaced unconditionally by the content of
# chat_template.json (expected to hold the template as a JSON-encoded string).
# if tokenizer.chat_template is None:
print("apply external chat template...")
# JSON text is UTF-8; an explicit encoding avoids mis-decoding non-ASCII characters
# on platforms whose default text encoding differs (e.g. Windows code pages).
with open("chat_template.json", "r", encoding="utf-8") as f:
    tokenizer.chat_template = json.load(f)
print("max_context_length is %d tokens." % (max_context_length))
# Prompt building blocks: JSON schema, tool instructions and the initial message list
# JSON schema text that can be embedded into a system prompt to request structured
# answers (a single required string property "program").
# The text must itself be valid JSON, so no trailing comma is allowed after the last property.
schema = """
{
    "properties": {
        "program": {
            "description": "Python program to be executed. The Message response to the input query is the output of this program",
            "title": "Program",
            "type": "string"
        }
    },
    "required": [
        "program"
    ],
    "title": "Response",
    "type": "object"
}
"""
# """
# "confidence": {
# "description": "How sure you are the above message facts are true. Rate harsh from 0 to 1",
# "title": "Confidence",
# "type": "float"
# }
# """
# System-prompt fragment that instructs the model how to use a Python interpreter as a tool.
# In the visible source it is only referenced from a commented-out entry of `messages`.
# NOTE(review): the sentence "the answer may start and end with and respectively" appears to
# have lost its start/end delimiter tokens — confirm the intended markers before using this prompt.
# NOTE(review): "optained" is presumably a typo for "obtained"; the text is left untouched here
# because it is sent to the model verbatim.
tool_assist = """
You are a python assisted AI model. You may call the interpreter one or more times to assist with the user query. You might tell the user that you optained ground truth with the help of python or a calculator if asked about. The user is not able to see if python has been used, therefore do not expose and share failed attempts or syntax errors.
To invoke a this function, the answer may start and end with and respectively. The rest must be a valid python script, additional text is not allowed before and after. Calling python is not needed when just providing example code.
"""
# Conversation history as a list of {"role": ..., "content": ...} dicts. It is appended to by
# append_generate_chat and rebound by the command loop further down.
# Every candidate system prompt below is commented out, so the list starts out empty;
# uncomment one entry to start the conversation with that system prompt.
messages = [
    # {"role": "system", "content": "Hold a casual conversation with the user. Keep responses short at max 3 sentences."},
    # {"role": "system", "content": "Hold a casual conversation with the user. Keep responses short at max 3 sentences. It is %s now." % datetime.datetime.now().strftime("%Y-%m-%d %H:%M")},
    # {"role": "system", "content": "Hold a casual conversation with the user. Keep responses short at max 3 sentences. " + tool_assist},
    # {"role": "system", "content": "You are a helpful assistant that answers in JSON. Here's the json schema you must adhere to:\n\n%s\n" % schema},
    # {"role": "system", "content": "You are a helpful assistant that answers by entering commands into a python interpreter. The user only sees the stdout of your python input."},
    # {"role": "system", "content": "Make a summary of the below input prompt. Do not answer. The description should fit on 80 characters."},
    # {"role": "user", "content": "Hello, who are you?"}
]
# System message placed at the head of the temporary, role-swapped conversation that the
# "/auto" command builds so the model can write the next *user* turn.
roleflip = {"role": "system", "content": "Keep the conversation going, ask for more information on the subject. Keep messages short at max 1-2 sentences. Do not thank and say goodbye."}
def current_time():
    """Get the current local date and time as a string."""
    # NOTE: this function is listed in tool_functions and handed to the chat template as a
    # tool; its docstring presumably ends up in the tool description shown to the model,
    # so the wording above is kept as-is.
    timestamp = datetime.datetime.now()
    return timestamp.strftime("%Y-%m-%d %H:%M")
def random_float():
    """Get a random float from 0..1"""
    # NOTE: exposed to the model through tool_functions; the docstring is kept verbatim
    # because it presumably serves as the tool description.
    sample = random.random()
    # Tool results are handed back as text, hence the string conversion.
    return str(sample)
def random_int(a: int, b: int):
    """Return random integer in range [a, b], including both end points.
    Args:
        a: minimum possible value
        b: maximum possible value"""
    # NOTE: exposed to the model through tool_functions; docstring text and parameter
    # type hints are kept unchanged because they presumably describe the tool to the model.
    # randrange excludes its upper bound, so b + 1 makes b itself reachable.
    picked = random.randrange(a, b + 1)
    return str(picked)
# Callables offered to the model as tools; the list is passed as `tools=` to
# tokenizer.apply_chat_template wherever the conversation is rendered.
tool_functions = [current_time, random_float, random_int]
def generate_batch(inputs, max_new_tokens=500):
    """Generate a complete reply with a single `model.generate` call and print it.

    Sampling is enabled (`do_sample=True`), so repeated calls may produce different text.

    Args:
        inputs: Mapping holding an "input_ids" tensor (row 0 is the prompt) and,
            optionally, an "attention_mask" tensor as produced by the tokenizer.
        max_new_tokens: Upper bound on the number of tokens generated after the prompt.

    Returns:
        Tuple `(outputs, out_text)`: the raw result of `model.generate` and the decoded
        text of the newly generated part only (special tokens skipped).
    """
    prompt_ids = inputs["input_ids"]
    outputs = model.generate(
        prompt_ids,
        # Forward the tokenizer's mask when available instead of silently dropping it;
        # None lets generate fall back to its own default.
        attention_mask=inputs.get("attention_mask"),
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
        do_sample=True,
        num_return_sequences=1,
    )
    # skip all input tokens and only output the additional generated part of the conversation
    prompt_len = len(prompt_ids[0])
    out_text = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
    print(out_text)
    return outputs, out_text
def generate_incremental(inputs, max_new_tokens=None):
    """Greedily generate a reply one token at a time, streaming it to stdout.

    Each step picks the highest-scoring next token (argmax), so the output is
    deterministic for a given prompt. Decoding stops at the tokenizer's EOS token,
    when `max_new_tokens` is reached, or when the sequence would grow beyond
    `model.config.max_position_embeddings`.

    Args:
        inputs: Mapping holding an "input_ids" tensor of shape (1, prompt_length).
            Only a single sequence is supported because the EOS check uses `.item()`.
        max_new_tokens: Optional cap on the number of generated tokens. `None`
            (default) means the only cap is the model's position limit.

    Returns:
        Tuple `(generated_tokens, full_output)`: the prompt plus all generated token ids
        (including a final EOS token if one was produced), and the decoded text of the
        generated part only (special tokens skipped).
    """
    prompt_ids = inputs["input_ids"]
    prompt_len = len(prompt_ids[0])

    # Never let the sequence outgrow the position limit reported by the model config.
    budget = model.config.max_position_embeddings - prompt_len
    if max_new_tokens is not None:
        budget = min(budget, max_new_tokens)

    generated_tokens = prompt_ids  # Initially, this is just the input tokens
    step_input = prompt_ids        # first pass processes the whole prompt
    past_key_values = None         # key/value cache carried from step to step
    produced = 0

    # Loop to generate one token at a time
    while produced < budget:
        # Thanks to the cache only the tokens not seen yet have to be processed;
        # without feeding past_key_values back the full sequence is recomputed each step.
        outputs = model(input_ids=step_input, past_key_values=past_key_values, use_cache=True)
        past_key_values = outputs.past_key_values

        # Only the logits of the last position decide the next token.
        next_token = outputs.logits[:, -1, :].argmax(dim=-1)
        step_input = next_token.unsqueeze(-1)  # shape (batch, 1)

        # Append the new token to the sequence
        generated_tokens = torch.cat([generated_tokens, step_input], dim=1)
        produced += 1

        # Decode and print the newly generated token (skip special tokens)
        out_text = tokenizer.decode(next_token, skip_special_tokens=True)
        print(out_text, end="", flush=True)  # Print without newline

        # Check if the generated token is the end-of-sequence token
        if next_token.item() == tokenizer.eos_token_id:
            break

        # Periodically hand cached, unused GPU memory back to the allocator.
        if produced % 30 == 0:
            torch.cuda.empty_cache()

    # Terminate the streamed line regardless of why the loop ended.
    print("")

    # Once done, return the full generated sequence
    full_output = tokenizer.decode(generated_tokens[0][prompt_len:], skip_special_tokens=True)
    torch.cuda.empty_cache()
    return generated_tokens, full_output
def append_generate_chat(input_text: "str | None", role="user"):
    """Optionally append a message to the history, generate the reply and record it.

    Args:
        input_text: Content of the message to append before generating. Pass `None`
            to generate from the history as it currently is (used by the "/regen",
            "/more" and "/auto" commands).
        role: Role under which `input_text` is appended ("user" by default, "tool"
            for tool responses).

    Side effects:
        Mutates the module-level `messages` list (appends the input message, if any,
        and the generated assistant message) and prints timing information.
    """
    t_start = time.time()

    # generate AI response
    if input_text is not None:
        messages.append({"role": role, "content": input_text})

    # Render the whole conversation (plus tool descriptions) into model inputs.
    inputs = tokenizer.apply_chat_template(
        messages,
        return_tensors="pt",
        tokenize=True,
        return_dict=True,
        add_generation_prompt=True,
        tools=tool_functions,
    )  # continue_final_message=True,
    inputs = {key: value.to(model.device) for key, value in inputs.items()}

    with torch.inference_mode():
        outputs, out_text = generate_incremental(inputs)

    # append result to message history
    messages.append({"role": "assistant", "content": out_text})
    print("")
    # NOTE: the reported count is the length of the full sequence (prompt + generated tokens).
    print("generation took %.3fs (%d tokens)" % (time.time() - t_start, len(outputs[0])))
# Interactive command loop: plain text is sent as a user message, "!" prefixes a tool
# response, and "/" prefixes a command (see "/help").
while True:
    # print an input prompt to receive text or commands
    try:
        input_text = input(">>> ")
    except (EOFError, KeyboardInterrupt):
        # Ctrl-D / Ctrl-C at the prompt ends the session cleanly instead of with a traceback.
        print("")
        break
    print("")

    # A system prompt, when configured, is the first message and must survive
    # "/clear" and "/undo". With no system prompt there is nothing to protect.
    system_count = 1 if messages and messages[0]["role"] == "system" else 0

    if input_text.startswith("!"):
        # depending on the chat template the tool response tags must or must not be passed. :(
        append_generate_chat(input_text[1:], role="tool")
    elif input_text.startswith("/clear"):
        print("clearing chat history")
        # Keep only the system prompt (if any); works on an empty history as well.
        messages = messages[:system_count]
        print("")
    elif input_text.startswith("/history"):
        history = tokenizer.apply_chat_template(messages, return_tensors="pt", tokenize=False, add_generation_prompt=False, tools=tool_functions)
        print(history)
    elif input_text.startswith("/undo"):
        # One exchange = the latest prompt plus its answer, not counting the system prompt.
        if len(messages) - system_count >= 2:
            print("undo latest prompt")
            messages = messages[:-2]
        else:
            print("cannot undo because there are not enough messages on history.")
        print("")
    elif input_text.startswith("/regen"):
        if len(messages) >= 2:
            # NOTE: generate_incremental decodes greedily (argmax), so reseeding the
            # random generators below does not change the regenerated text.
            print("regenerating message (not working)")
            messages = messages[:-1]
            seed = random.randint(0, 2**32 - 1)  # Generate a random seed
            torch.manual_seed(seed)
            torch.cuda.manual_seed_all(seed)
            append_generate_chat(None)
        else:
            print("cannot regenerate because there are not enough messages on history.")
        print("")
    elif input_text.startswith("/more"):
        append_generate_chat(None)
    elif input_text.startswith("/auto"):
        messages_backup = messages
        # Build a mirrored conversation (user <-> assistant, system messages dropped)
        # headed by the roleflip prompt, so the model writes the next *user* turn.
        swapped_role = {"user": "assistant", "assistant": "user"}
        messages = [roleflip] + [
            {"role": swapped_role.get(m["role"], m["role"]), "content": m["content"]}
            for m in messages_backup
            if m["role"] != "system"
        ]
        append_generate_chat(None)  # will automatically advance the conversation as 'user'
        # Move the generated turn into the real history as a user message ...
        last_message = messages[-1]
        last_message["role"] = "user"
        messages = messages_backup + [last_message]
        # ... and let the model answer it as usual.
        append_generate_chat(None)  # 'regular' chatbot answer
    elif input_text.startswith("/help"):
        print("! answer as 'tool' in tags")
        print("/clear clear chat history")
        print("/history print chat history")
        print("/undo undo latest prompt")
        print("/regen regenerate the last message")
        print("/more generate more additional information")
        print("/auto automatically advance conversation")
        print("/help print this message")
        print("")
    elif input_text.startswith("/"):
        print("unknown command.")
    else:
        append_generate_chat(input_text)