import PyPDF2
import faiss
import numpy as np
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

# Load the LLM used for generation
generation_model_name = 'facebook/bart-large-cnn'
generation_model = AutoModelForSeq2SeqLM.from_pretrained(generation_model_name)
tokenizer = AutoTokenizer.from_pretrained(generation_model_name)
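
# Note: bart-large-cnn is a summarization checkpoint with a 1024-token input limit,
# which is why the prompt is truncated with max_length=1024 in generate_response() below.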

# Specify file paths
file_path1 = './AST-1.pdf'
file_path2 = './AST-2.pdf'

# Step 1: Load the document files
def read_pdf(file_path):
    with open(file_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        text = ''
        for page_num in range(len(reader.pages)):
            page = reader.pages[page_num]
            # extract_text() can return None for pages without extractable text
            text += page.extract_text() or ''
    return text

ast1_text = read_pdf(file_path1)
ast2_text = read_pdf(file_path2)
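
# Optional sanity check: confirm text was actually extracted before chunking;
# scanned, image-only PDFs would come back (nearly) empty here.
print(f'AST-1: {len(ast1_text.split())} words, AST-2: {len(ast2_text.split())} words')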

# Step 2: Split the loaded documents into chunks
# Split by a fixed number of words
def chunk_text(text, chunk_size=200):
    words = text.split()
    chunks = [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
    return chunks

ast1_chunks = chunk_text(ast1_text, chunk_size=100)
ast2_chunks = chunk_text(ast2_text, chunk_size=150)

# Label each chunk with its source document
ast1_chunks = [(chunk, 'AST-1') for chunk in ast1_chunks]
ast2_chunks = [(chunk, 'AST-2') for chunk in ast2_chunks]
all_chunks = ast1_chunks + ast2_chunks
print('Created the chunks')
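
# A possible variant (not used in this pipeline): overlapping chunks, so text that
# straddles a chunk boundary still appears intact in at least one chunk. The
# function name and `overlap` parameter are illustrative additions.
def chunk_text_overlapping(text, chunk_size=200, overlap=50):
    words = text.split()
    step = max(chunk_size - overlap, 1)
    return [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), step)]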

# Load the embedding model (a pre-trained model from the MTEB leaderboard)
from sentence_transformers import SentenceTransformer

embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
# Encode only the chunk text, not the (chunk, label) tuples
embeddings = embedding_model.encode([chunk for chunk, _ in all_chunks], convert_to_tensor=True)

# Convert embeddings to a numpy array
embeddings_np = np.array([embedding.cpu().numpy() for embedding in embeddings])

# Create a FAISS index
dimension = embeddings_np.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings_np)

# Save the index
faiss.write_index(index, 'embeddings_index.faiss')

# Load the FAISS index
stored_index = faiss.read_index('./embeddings_index.faiss')
print('Stored embeddings in the index')
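
# Optional sanity check (the query string is only a placeholder): search the raw
# index directly and print which chunk positions come back as nearest neighbours.
_query_vec = embedding_model.encode('example query', convert_to_tensor=True).cpu().numpy()
_dists, _idxs = stored_index.search(np.array([_query_vec]), 3)
print('Nearest chunk indices for the placeholder query:', _idxs[0])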

# Function to retrieve chunks relevant to a query
def retrieve_chunks(query, top_k=10, use_mmr=False, diversity=0.5, target_doc='AST-1'):
    query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu().numpy()
    # Search the whole index, then keep only chunks belonging to target_doc
    distances, indices = stored_index.search(np.array([query_embedding]), top_k)
    if use_mmr:
        # MMR-based retrieval: trade off relevance against diversity
        from sklearn.metrics.pairwise import cosine_similarity
        selected_indices = []
        selected_distances = []
        candidate_indices = [i for i in indices[0] if all_chunks[i][1] == target_doc]
        candidate_distances = [distances[0][i] for i in range(len(indices[0])) if all_chunks[indices[0][i]][1] == target_doc]
        while len(selected_indices) < top_k and candidate_indices:
            if not selected_indices:
                # Seed with the closest candidate
                selected_indices.append(candidate_indices.pop(0))
                selected_distances.append(candidate_distances.pop(0))
            else:
                remaining_embeddings = embeddings_np[candidate_indices]
                selected_embeddings = embeddings_np[selected_indices]
                similarities = cosine_similarity(remaining_embeddings, selected_embeddings)
                # Relevance is the negated L2 distance (smaller distance = more relevant);
                # penalize candidates that are too similar to already-selected chunks
                mmr_scores = (1 - diversity) * -np.array(candidate_distances) - diversity * np.max(similarities, axis=1)
                next_index = int(np.argmax(mmr_scores))
                selected_indices.append(candidate_indices.pop(next_index))
                selected_distances.append(candidate_distances.pop(next_index))
        return [all_chunks[i][0] for i in selected_indices]
    else:
        retrieved_chunks = []
        for idx in indices[0]:
            chunk, doc_label = all_chunks[idx]
            if doc_label == target_doc:
                retrieved_chunks.append(chunk)
            if len(retrieved_chunks) >= top_k:
                break
        return retrieved_chunks
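
# Example call (the query text is a placeholder; target_doc selects which PDF to answer from):
# retrieve_chunks('What topics does AST-1 cover?', top_k=3, use_mmr=True, target_doc='AST-1')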

# Generate a response from the query and the retrieved context
def generate_response(query, retrieved_chunks):
    context = " ".join(retrieved_chunks)  # retrieved_chunks is a list of strings
    input_text = f"Query: {query}\nContext: {context}"
    inputs = tokenizer(input_text, return_tensors='pt', max_length=1024, truncation=True)
    summary_ids = generation_model.generate(inputs['input_ids'], max_length=150, min_length=40, length_penalty=2.0, num_beams=4, early_stopping=True)
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)

def rag_system(query, use_mmr=False):
    retrieved_chunks = retrieve_chunks(query, top_k=3, use_mmr=use_mmr)
    response = generate_response(query, retrieved_chunks)
    print(response)
    return response
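
# Note: rag_system() always uses retrieve_chunks()'s default target_doc ('AST-1').
# A variant that exposes the document choice (the extra parameter is illustrative) could be:
# def rag_system(query, use_mmr=False, target_doc='AST-1'):
#     return generate_response(query, retrieve_chunks(query, top_k=3, use_mmr=use_mmr, target_doc=target_doc))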

import gradio as gr

def query_rag_system(query, use_mmr):
    return rag_system(query, use_mmr=use_mmr)

interface = gr.Interface(
    fn=query_rag_system,
    inputs=[
        gr.Textbox(lines=2, placeholder="Enter your query here..."),
        gr.Checkbox(label="Use MMR")
    ],
    outputs="text",
    title="RAG System",
    description="Enter a query to get a response from the RAG system. Optionally, use MMR for better results."
)

interface.launch()
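
# launch() is left at its defaults; to reach the app beyond localhost, Gradio also
# accepts e.g. interface.launch(share=True) or interface.launch(server_name='0.0.0.0').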