Kalpokoch committed on
Commit 1201e66 · verified · 1 Parent(s): acd510f

Update create_granular_chunks.py

Files changed (1)
  1. create_granular_chunks.py +218 -208
create_granular_chunks.py CHANGED
@@ -1,231 +1,241 @@
-# create_granular_chunks.py
-import os
 import json
 import re
-from typing import List, Dict, Any
-import nltk

-# --- Tokenizer Import ---
-import tiktoken # pip install tiktoken
-
-# Download punkt tokenizer if not already done (Ensure this runs once in your environment setup)
-nltk.download('punkt')
-
-# --- Configuration ---
-INPUT_FILE = "combined_context.jsonl"
-OUTPUT_FILE = "granular_chunks_final.jsonl"
-
-# Token-based chunking parameters (typical LLM embedding context ~512 tokens)
-MAX_TOKENS = 400
-OVERLAP_TOKENS = 50
-TOKENIZER_MODEL = "cl100k_base" # use "cl100k_base" for OpenAI, adjust as needed
-
-# --- Keyword Enhancement ---
-FINANCIAL_KEYWORDS = [
-    "₹", "INR", "crore", "lakh", "limit", "delegation", "expenditure", "budget", "revenue", "capital",
-    "surplus", "investment", "write-off", "dividend", "pay", "salary", "contract value"
-]
-AUTHORITY_KEYWORDS = [
-    "CMD", "Chairman", "Board", "Director", "ED", "Executive Director", "CGM", "GM", "DGM", "Sr. M",
-    "Manager", "HOD", "Head of Finance", "Finance Head", "Project Head"
-]
-
-def get_encoding():
-    return tiktoken.get_encoding(TOKENIZER_MODEL)
-
-# --- Global State ---
-chunk_counter = 0
-
-def get_unique_id() -> str:
-    global chunk_counter
-    chunk_counter += 1
-    return f"chunk-{chunk_counter}"
-
-def enhance_chunk_with_keywords(text: str, metadata: dict) -> dict:
-    """Add keywords (financial and authority) to metadata if present in text."""
-    present_financial = [kw for kw in FINANCIAL_KEYWORDS if kw.lower() in text.lower()]
-    present_authority = [kw for kw in AUTHORITY_KEYWORDS if kw.lower() in text.lower()]
-    if present_financial:
-        metadata['financial_keywords'] = present_financial
-    if present_authority:
-        metadata['authority_keywords'] = present_authority
-    return metadata
-
-def create_chunk(context: Dict, text: str) -> Dict:
-    """Creates a standardized chunk dictionary with rich metadata."""
-    metadata = {
-        "section": context.get("section"),
-        "clause": context.get("clause") or context.get("Clause"),
-        "title": context.get("title"),
-        "source_description": context.get("description"),
-    }
-    for key, value in context.items():
-        if key not in metadata and isinstance(value, (str, int, float, bool)):
-            metadata[key] = value
-
-    # --- Keyword Enhancement ---
-    metadata = enhance_chunk_with_keywords(text, metadata)
-
-    return {
-        "id": get_unique_id(),
-        "text": text.strip(),
-        "metadata": {k: v for k, v in metadata.items() if v is not None}
-    }
-
-def format_delegation_text(delegation: Any) -> str:
-    if not isinstance(delegation, dict):
-        return str(delegation)
-    parts = [f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}"
-             for auth, limit in delegation.items()]
-    return ", ".join(parts) if parts else "No specific delegation provided."
-
-def format_remarks(remarks: Any) -> str:
-    if isinstance(remarks, list):
-        remark_parts = []
-        for item in remarks:
-            if isinstance(item, dict):
-                for key, value in item.items():
-                    remark_parts.append(f"{key}: {value}")
-            else:
-                remark_parts.append(str(item))
-        return " ".join(remark_parts)
-    return str(remarks)
-
-def build_descriptive_text(context: Dict) -> str:
-    text_parts = []
-    if context.get("title"):
-        text_parts.append(f"Regarding the policy '{context['title']}'")
-    specific_desc = context.get('description') or context.get('method')
-    if specific_desc and specific_desc != context.get('title'):
-        text_parts.append(f"specifically for '{specific_desc}'")
-    if "delegation" in context:
-        delegation_text = format_delegation_text(context["delegation"])
-        text_parts.append(f", financial delegations are: {delegation_text}.")
-    elif "composition" in context:
-        composition_parts = []
-        for item in context["composition"]:
-            if isinstance(item, dict):
-                for role, members in item.items():
-                    member_text = (f"the {role} is {members}" if isinstance(members, str)
-                                   else f"the {role} are: {', '.join(members)}")
-                    composition_parts.append(member_text)
-        text_parts.append(f", the composition is: {'; '.join(composition_parts)}.")
-    if "remarks" in context and context["remarks"]:
-        remarks_text = format_remarks(context["remarks"])
-        text_parts.append(f" Important remarks include: {remarks_text}")
-    return " ".join(text_parts).strip()
-
-def count_tokens(text: str) -> int:
-    encoding = get_encoding()
-    return len(encoding.encode(text))
-
-def get_token_overlap(text: str, overlap_tokens: int) -> str:
-    """Return the last `overlap_tokens` worth of text from the input string."""
-    encoding = get_encoding()
-    tokens = encoding.encode(text)
-    if len(tokens) <= overlap_tokens:
-        return text
-    # Decode only the last overlap_tokens tokens
-    overlapped = encoding.decode(tokens[-overlap_tokens:])
-    # Remove possible split word inconsistencies by finding last complete sentence
-    # This is optional: can simply return overlapped
-    last_period = overlapped.rfind('.')
-    if last_period != -1 and last_period < len(overlapped) - 2:
-        return overlapped[last_period+1:].strip()
-    return overlapped.strip()
-
-def split_text_by_tokens(text: str, max_tokens: int = MAX_TOKENS, overlap_tokens: int = OVERLAP_TOKENS) -> List[str]:
-    """Split text into chunks based on token count, with specified overlap."""
-    encoding = get_encoding()
-    sents = nltk.tokenize.sent_tokenize(text, language='english')
     chunks = []
     current_chunk = ""
     current_tokens = 0
-    for sentence in sents:
-        sentence_tokens = len(encoding.encode(sentence))
-        if current_tokens + sentence_tokens <= max_tokens:
-            current_chunk += (" " + sentence) if current_chunk else sentence
-            current_tokens += sentence_tokens
-        else:
             chunks.append(current_chunk.strip())
-            # Overlap logic
-            if overlap_tokens < current_tokens:
-                overlap_text = get_token_overlap(current_chunk, overlap_tokens)
                 current_chunk = overlap_text + " " + sentence
-                current_tokens = len(encoding.encode(current_chunk))
             else:
                 current_chunk = sentence
-                current_tokens = sentence_tokens
-    if current_chunk:
         chunks.append(current_chunk.strip())
     return chunks

-def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]:
-    context = {**(parent_context or {}), **data}
-    chunks = []
-
-    # Handler 1: Simple Item Lists
-    list_key = next((key for key in ["items", "exclusions"] if key in data and isinstance(data.get(key), list)), None)
-    if list_key:
-        base_title = context.get('title', 'a policy')
-        for item in data[list_key]:
-            if isinstance(item, str):
-                text = f"A rule regarding '{base_title}' is: {item}."
-                for sub_chunk in split_text_by_tokens(text):
-                    chunks.append(create_chunk(context, sub_chunk))
-        return chunks
-
-    # Handler 2: Recursive traversal for nested dicts/lists
-    has_recursed = False
-    for key, value in data.items():
-        if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
-            for item in value:
-                chunks.extend(process_entry(item, context))
-            has_recursed = True

-    # Handler 3: Leaf nodes with delegation, composition or description
-    if not has_recursed and ("delegation" in data or "composition" in data or "description" in data):
-        text = build_descriptive_text(context)
-        for chunk_text in split_text_by_tokens(text):
-            chunks.append(create_chunk(context, chunk_text))
-
-    return chunks
-
-def main():
-    print(f"Starting to process '{INPUT_FILE}' with token-based chunking and keyword enhancement...")
     all_chunks = []
-
     try:
-        with open(INPUT_FILE, 'r', encoding='utf-8') as f:
-            for i, line in enumerate(f):
                 try:
-                    data = json.loads(line)
-                    processed = process_entry(data)
-                    if processed:
-                        all_chunks.extend(processed)
-                except json.JSONDecodeError:
-                    print(f"Warning: Skipping malformed JSON on line {i+1}")
                     continue
     except FileNotFoundError:
-        print(f"Error: Input file '{INPUT_FILE}' not found.")
         return
-
     print(f"Generated {len(all_chunks)} chunks before deduplication.")
-
-    # Deduplicate by text content (retaining last occurrences)
-    unique_chunks_map = {}
-    for chunk in all_chunks:
-        unique_chunks_map[chunk['text']] = chunk
-
-    unique_chunks = list(unique_chunks_map.values())
-    print(f"{len(unique_chunks)} unique chunks after deduplication.")
-
-    # Write output in JSONL format for later vector DB ingestion
-    with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
-        for chunk in unique_chunks:
-            outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
-
-    print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.")

 if __name__ == "__main__":
-    main()

+# create_granular_chunks.py (place this in root directory)
 import json
 import re
+import hashlib
+from typing import List, Dict, Any, Set
+import tiktoken

+def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
+    """Count tokens using tiktoken."""
+    try:
+        encoding = tiktoken.encoding_for_model(model)
+        return len(encoding.encode(text))
+    except Exception:
+        # Fallback to simple word-based estimation
+        return len(text.split()) * 1.3
+
+def extract_financial_keywords(text: str) -> List[str]:
+    """Extract financial keywords from text."""
+    financial_patterns = [
+        r'₹[\d,]+(?:\.\d{1,2})?(?:\s*(?:crore|lakh|thousand))?',
+        r'\b(?:budget|cost|expenditure|estimate|payment|procurement)\b',
+        r'\b(?:tender|contract|purchase|award)\b',
+        r'\b(?:crore|lakh|thousand)\b'
+    ]
+
+    keywords = set()
+    for pattern in financial_patterns:
+        matches = re.findall(pattern, text, re.IGNORECASE)
+        keywords.update(matches)
+
+    return list(keywords)[:10]  # Limit to 10 keywords
+
+def extract_authority_keywords(text: str) -> List[str]:
+    """Extract authority/designation keywords from text."""
+    authority_patterns = [
+        r'\b(?:D\([TPF]\)|ED|CGM|GM|DGM|Sr\.?\s*M(?:anager)?)\b',
+        r'\b(?:Director|Manager|Chief|Head)\b',
+        r'\b(?:CMD|BOD|HOP|HOD|HOF)\b',
+        r'\b(?:approval|sanction|delegation|authority|power)\b'
+    ]
+
+    keywords = set()
+    for pattern in authority_patterns:
+        matches = re.findall(pattern, text, re.IGNORECASE)
+        keywords.update(matches)
+
+    return list(keywords)[:10]  # Limit to 10 keywords
+
+def create_chunk_text_from_item(item: Dict) -> str:
+    """Create comprehensive chunk text from a single item."""
+    parts = []
+
+    # Add section and title context
+    if item.get('section'):
+        parts.append(f"Regarding the policy '{item.get('title', 'Unknown')}' under section '{item['section']}':")
+
+    # Add main description
+    if item.get('description'):
+        parts.append(item['description'])
+
+    # Add items if present
+    if item.get('items'):
+        if len(item['items']) == 1:
+            parts.append(f"This covers: {item['items'][0]}")
+        else:
+            parts.append("This covers the following:")
+            for i, sub_item in enumerate(item['items'], 1):
+                parts.append(f"{i}. {sub_item}")
+
+    # Add delegation information
+    if item.get('delegation'):
+        parts.append("Authority delegation:")
+        for role, limit in item['delegation'].items():
+            if limit and limit != "NIL":
+                parts.append(f"- {role}: {limit}")
+
+    # Add subclauses
+    if item.get('subclauses'):
+        parts.append("This includes:")
+        for subclause in item['subclauses']:
+            if subclause.get('description'):
+                parts.append(f"• {subclause['description']}")
+            if subclause.get('delegation'):
+                for role, limit in subclause['delegation'].items():
+                    if limit and limit != "NIL":
+                        parts.append(f" - {role}: {limit}")
+
+    # Add methods (for complex delegation structures)
+    if item.get('methods'):
+        for method in item['methods']:
+            if method.get('delegation'):
+                parts.append(f"For {method.get('method', 'this method')}:")
+                for role, limit in method['delegation'].items():
+                    if limit and limit != "NIL":
+                        parts.append(f"- {role}: {limit}")
+
+    # Add remarks
+    if item.get('remarks'):
+        parts.append("Important notes:")
+        if isinstance(item['remarks'], list):
+            for remark in item['remarks']:
+                if isinstance(remark, str):
+                    parts.append(f" {remark}")
+        elif isinstance(item['remarks'], str):
+            parts.append(f"• {item['remarks']}")
+
+    return " ".join(parts)
+
+def split_into_token_chunks(text: str, max_tokens: int = 400, overlap_tokens: int = 50) -> List[str]:
+    """Split text into chunks based on token count."""
+    sentences = re.split(r'[.!?]\s+', text)
     chunks = []
     current_chunk = ""
     current_tokens = 0
+
+    for sentence in sentences:
+        sentence = sentence.strip()
+        if not sentence:
+            continue
+
+        sentence_tokens = count_tokens(sentence)
+
+        # If adding this sentence would exceed max_tokens, finalize current chunk
+        if current_tokens + sentence_tokens > max_tokens and current_chunk:
             chunks.append(current_chunk.strip())
+
+            # Start new chunk with overlap
+            if overlap_tokens > 0 and chunks:
+                overlap_text = current_chunk[-overlap_tokens*5:]  # Rough overlap estimation
                 current_chunk = overlap_text + " " + sentence
             else:
                 current_chunk = sentence
+            current_tokens = count_tokens(current_chunk)
+        else:
+            current_chunk += (" " if current_chunk else "") + sentence
+            current_tokens += sentence_tokens
+
+    # Add the last chunk if it has content
+    if current_chunk.strip():
         chunks.append(current_chunk.strip())
+
     return chunks

+def create_chunk_hash(text: str) -> str:
+    """Create a hash of the chunk text for deduplication."""
+    return hashlib.md5(text.encode('utf-8')).hexdigest()[:12]

+def process_jsonl_file(file_path: str, output_path: str):
+    """Process the JSONL file and create granular chunks."""
+    print(f"Starting to process '{file_path}' with token-based chunking and keyword enhancement...")
+
     all_chunks = []
+    chunk_hashes = set()  # For deduplication
+    chunk_id_counter = 1
+
     try:
+        with open(file_path, 'r', encoding='utf-8') as file:
+            for line_num, line in enumerate(file, 1):
                 try:
+                    item = json.loads(line.strip())
+
+                    # Create comprehensive text from the item
+                    chunk_text = create_chunk_text_from_item(item)
+
+                    if not chunk_text.strip():
+                        continue
+
+                    # Split into token-based chunks
+                    text_chunks = split_into_token_chunks(chunk_text)
+
+                    for i, chunk in enumerate(text_chunks):
+                        if not chunk.strip():
+                            continue
+
+                        # Check for duplicates
+                        chunk_hash = create_chunk_hash(chunk)
+                        if chunk_hash in chunk_hashes:
+                            continue
+                        chunk_hashes.add(chunk_hash)
+
+                        # Extract keywords
+                        financial_keywords = extract_financial_keywords(chunk)
+                        authority_keywords = extract_authority_keywords(chunk)
+
+                        # Create chunk object
+                        chunk_obj = {
+                            'id': f'chunk-{chunk_id_counter}',
+                            'text': chunk,
+                            'metadata': {
+                                'section': item.get('section', ''),
+                                'clause': item.get('clause', ''),
+                                'title': item.get('title', ''),
+                                'chunk_index': i,
+                                'source_line': line_num,
+                                'financial_keywords': financial_keywords,
+                                'authority_keywords': authority_keywords,
+                                'token_count': count_tokens(chunk)
+                            }
+                        }
+
+                        all_chunks.append(chunk_obj)
+                        chunk_id_counter += 1
+
+                except json.JSONDecodeError as e:
+                    print(f"Warning: Invalid JSON on line {line_num}: {e}")
                     continue
+
     except FileNotFoundError:
+        print(f"Error: File '{file_path}' not found.")
         return
+    except Exception as e:
+        print(f"Error reading file: {e}")
+        return
+
     print(f"Generated {len(all_chunks)} chunks before deduplication.")
+    print(f"{len(chunk_hashes)} unique chunks after deduplication.")
+
+    # Write chunks to output file
+    try:
+        with open(output_path, 'w', encoding='utf-8') as output_file:
+            for chunk in all_chunks:
+                json.dump(chunk, output_file, ensure_ascii=False)
+                output_file.write('\n')
+
+        print(f"Successfully wrote improved granular chunks to '{output_path}'.")
+        print(f"Sample chunk structure:")
+        if all_chunks:
+            sample = all_chunks[0]
+            print(f" ID: {sample['id']}")
+            print(f" Text length: {len(sample['text'])} chars")
+            print(f" Section: {sample['metadata']['section']}")
+            print(f" Financial keywords: {sample['metadata']['financial_keywords'][:3]}...")
+            print(f" Token count: {sample['metadata']['token_count']}")
+
+    except Exception as e:
+        print(f"Error writing output file: {e}")

 if __name__ == "__main__":
+    input_file = "combined_context.jsonl"
+    output_file = "granular_chunks_final.jsonl"
+    process_jsonl_file(input_file, output_file)
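For readers skimming the diff, here is a minimal usage sketch of the rewritten pipeline (not part of the commit). It assumes the updated script is importable as create_granular_chunks, that tiktoken is installed, and it uses a made-up policy record with the section/title/description/delegation fields the script looks for:

    import create_granular_chunks as cgc

    # Hypothetical record shaped like one line of combined_context.jsonl (illustrative only)
    record = {
        "section": "5.2",
        "title": "Award of Works Contracts",
        "description": "Delegation of powers for award of works contracts on open tender basis.",
        "delegation": {"CMD": "₹50 crore", "ED": "₹10 crore", "GM": "NIL"},
    }

    text = cgc.create_chunk_text_from_item(record)    # flatten the record into descriptive prose
    for piece in cgc.split_into_token_chunks(text):   # split into ~400-token chunks with 50-token overlap
        print(cgc.count_tokens(piece),
              cgc.extract_financial_keywords(piece),
              cgc.extract_authority_keywords(piece))

Each printed row corresponds to one chunk that process_jsonl_file() would write to granular_chunks_final.jsonl, minus the id/metadata wrapper and the hash-based deduplication it applies on top.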