Kalpokoch committed (verified)
Commit 1e6e534 · Parent: c8871ab

Update create_granular_chunks.py

Files changed (1): create_granular_chunks.py (+91 −37)
create_granular_chunks.py CHANGED
@@ -2,20 +2,27 @@ import os
 import json
 import re
 from typing import List, Dict, Any
+import nltk
+
+# Download punkt tokenizer if not already done (Ensure this runs once in your environment setup)
+nltk.download('punkt')
 
 # --- Configuration ---
 INPUT_FILE = "combined_context.jsonl"
-OUTPUT_FILE = "granular_chunks_final.jsonl"  # Keeping the filename consistent
+OUTPUT_FILE = "granular_chunks_final.jsonl"  # Keep filename consistent
+
 
 # --- Global State ---
 chunk_counter = 0
 
+
 def get_unique_id() -> str:
     """Returns a unique, incrementing ID for each chunk."""
     global chunk_counter
     chunk_counter += 1
     return f"chunk-{chunk_counter}"
 
+
 def create_chunk(context: Dict, text: str) -> Dict:
     """Creates a standardized chunk dictionary with rich metadata."""
     metadata = {
@@ -24,29 +31,29 @@ def create_chunk(context: Dict, text: str) -> Dict:
         "title": context.get("title"),
         "source_description": context.get("description"),
     }
+    # Add other primitive metadata keys
    for key, value in context.items():
         if key not in metadata and isinstance(value, (str, int, float, bool)):
             metadata[key] = value
-
+
     return {
         "id": get_unique_id(),
-        "text": text,
+        "text": text.strip(),
         "metadata": {k: v for k, v in metadata.items() if v is not None}
     }
 
+
 def format_delegation_text(delegation: Any) -> str:
     """
     Formats a delegation dictionary or string into a readable string.
-    --- ACCURACY FIX ---
-    This function now explicitly includes "NIL" or "---" values instead of skipping them.
-    This is crucial for the model to correctly answer questions about roles with no power.
+    Explicitly includes "NIL" or "---" to capture no-power cases.
     """
     if not isinstance(delegation, dict):
         return str(delegation)
-    # Use "is NIL" for None or "---", otherwise use "is [limit]"
     parts = [f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}" for auth, limit in delegation.items()]
     return ", ".join(parts) if parts else "No specific delegation provided."
 
+
 def format_remarks(remarks: Any) -> str:
     """Safely formats the 'remarks' field, handling various data types."""
     if isinstance(remarks, list):
@@ -60,83 +67,124 @@ def format_remarks(remarks: Any) -> str:
         return " ".join(remark_parts)
     return str(remarks)
 
+
 def build_descriptive_text(context: Dict) -> str:
     """
-    Intelligently builds a single, descriptive, natural language sentence
-    by combining all relevant fields from the context.
+    Builds a clear, descriptive, natural language text by combining fields.
+    Focused for best relevance and contextual richness.
     """
     text_parts = []
-
+
     if context.get("title"):
-        text_parts.append(f"Regarding the policy for '{context['title']}'")
+        text_parts.append(f"Regarding the policy '{context['title']}'")
 
     specific_desc = context.get('description') or context.get('method')
     if specific_desc and specific_desc != context.get('title'):
-        text_parts.append(f"specifically for '{specific_desc}'")
+        text_parts.append(f"specifically for '{specific_desc}'")
 
     if "delegation" in context:
         delegation_text = format_delegation_text(context["delegation"])
-        text_parts.append(f", the financial delegations are: {delegation_text}.")
+        text_parts.append(f", financial delegations are: {delegation_text}.")
     elif "composition" in context:
         composition_parts = []
         for item in context["composition"]:
             if isinstance(item, dict):
                 for role, members in item.items():
-                    member_text = f"the {role} is {members}" if isinstance(members, str) else f"the {role} are: {', '.join(members)}"
+                    member_text = (f"the {role} is {members}" if isinstance(members, str)
+                                   else f"the {role} are: {', '.join(members)}")
                     composition_parts.append(member_text)
         text_parts.append(f", the composition is: {'; '.join(composition_parts)}.")
-
+
     if "remarks" in context and context["remarks"]:
         remarks_text = format_remarks(context["remarks"])
         text_parts.append(f" Important remarks include: {remarks_text}")
 
-    return " ".join(text_parts)
+    # Join all parts into a flowing sentence
+    return " ".join(text_parts).strip()
+
+
+def split_text_into_chunks(text: str, max_char_length: int = 1500, overlap: int = 200) -> List[str]:
+    """
+    Splits a long text into smaller chunks with controlled overlap.
+    Uses sentence tokenization for natural splits.
+    """
+    text = text.strip()
+    if len(text) <= max_char_length:
+        return [text]
+
+    sentences = nltk.sent_tokenize(text)
+    chunks = []
+    current_chunk = ""
+
+    for sentence in sentences:
+        # +1 for space/newline likely added between sentences
+        if len(current_chunk) + len(sentence) + 1 <= max_char_length:
+            current_chunk += (" " + sentence) if current_chunk else sentence
+        else:
+            chunks.append(current_chunk.strip())
+            # Start next chunk with overlap from end of previous chunk (by characters)
+            if overlap < len(current_chunk):
+                current_chunk = current_chunk[-overlap:] + " " + sentence
+            else:
+                current_chunk = sentence
+
+    if current_chunk:
+        chunks.append(current_chunk.strip())
+    return chunks
+
 
 def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]:
     """
-    The definitive processing function. It traverses the JSON and uses a set of handlers
-    to create highly descriptive, self-contained chunks.
+    Processes a JSON policy entry and returns granular, context-rich chunks.
+    Applies recursive traversal and implements chunk size limiting.
     """
     context = {**(parent_context or {}), **data}
     chunks = []
 
-    # --- Handler 1: Simple Item Lists (e.g., Annexure A, Financial Concurrence) ---
+    # Handler 1: Simple Item Lists (ex: rules, exclusions)
     list_key = next((key for key in ["items", "exclusions"] if key in data and isinstance(data.get(key), list)), None)
     if list_key:
         base_title = context.get('title', 'a policy')
         for item in data[list_key]:
             if isinstance(item, str):
-                chunks.append(create_chunk(context, f"A rule regarding '{base_title}' is: {item}."))
+                # Build chunk text with clear descriptive prefix for relevance
+                text = f"A rule regarding '{base_title}' is: {item}."
+                # Split if too long
+                for sub_chunk in split_text_into_chunks(text):
+                    chunks.append(create_chunk(context, sub_chunk))
         return chunks
 
-    # --- Handler 2: Recursive Traversal ---
+    # Handler 2: Recursive traversal for nested dictionaries/lists
     has_recursed = False
     for key, value in data.items():
         if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
             for item in value:
                 chunks.extend(process_entry(item, context))
             has_recursed = True
-
-    # --- Handler 3: Leaf Node Creation ---
+
+    # Handler 3: Leaf nodes with delegation, composition or description
     if not has_recursed and ("delegation" in data or "composition" in data or "description" in data):
         text = build_descriptive_text(context)
-        chunks.append(create_chunk(context, text))
+        # Split long descriptive text intelligently
+        for chunk_text in split_text_into_chunks(text):
+            chunks.append(create_chunk(context, chunk_text))
 
     return chunks
 
+
 def main():
-    """Main function to read, process, and write."""
-    print(f"Starting to process '{INPUT_FILE}' with the definitive chunking strategy...")
+    """Main orchestration to read input, process, and write chunks."""
+    print(f"Starting to process '{INPUT_FILE}' for improved granular chunking...")
     all_chunks = []
-
+
     try:
         with open(INPUT_FILE, 'r', encoding='utf-8') as f:
             for i, line in enumerate(f):
                 try:
                     data = json.loads(line)
-                    processed_chunks = process_entry(data)
-                    if processed_chunks:
-                        all_chunks.extend(processed_chunks)
+                    processed = process_entry(data)
+                    if processed:
+                        all_chunks.extend(processed)
                 except json.JSONDecodeError:
                     print(f"Warning: Skipping malformed JSON on line {i+1}")
                     continue
@@ -144,17 +192,23 @@ def main():
         print(f"Error: Input file '{INPUT_FILE}' not found.")
         return
 
-    print(f"Deconstructed into {len(all_chunks)} highly descriptive chunks.")
+    print(f"Generated {len(all_chunks)} chunks before deduplication.")
+
+    # Deduplicate by text content (retaining last occurrences)
+    unique_chunks_map = {}
+    for chunk in all_chunks:
+        unique_chunks_map[chunk['text']] = chunk
 
-    # Remove duplicates before writing
-    unique_chunks = {chunk['text']: chunk for chunk in all_chunks}.values()
-    print(f"Removed duplicates, writing {len(unique_chunks)} unique chunks.")
+    unique_chunks = list(unique_chunks_map.values())
+    print(f"{len(unique_chunks)} unique chunks after deduplication.")
 
-    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
+    # Write output in JSONL format for later vector DB ingestion
+    with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
         for chunk in unique_chunks:
-            f.write(json.dumps(chunk) + '\n')
+            outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
+
 
-    print(f"Successfully created improved granular chunks file: '{OUTPUT_FILE}'")
+    print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.")
 
 
 if __name__ == "__main__":
     main()
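For reference, a minimal sketch of how the newly added split_text_into_chunks behaves, assuming the script is importable as create_granular_chunks and that the module-level nltk.download('punkt') suffices for your NLTK version; the tiny limit and sample text below are illustrative only, not values the script uses:

# Illustrative only: a small max_char_length makes the overlap visible.
# Importing the module also triggers its nltk.download('punkt') call.
from create_granular_chunks import split_text_into_chunks

text = ("The committee approves minor works. "
        "The GM approves major works. "
        "The Board approves everything else.")

for piece in split_text_into_chunks(text, max_char_length=40, overlap=10):
    print(repr(piece))

# Splits land on sentence boundaries where possible; each new chunk is
# seeded with the last `overlap` characters of the previous one, so a
# chunk can slightly exceed max_char_length after the carry-over.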
 
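For context on the output, each line of granular_chunks_final.jsonl is one chunk dict as assembled by create_chunk, after deduplication by text. A hypothetical record, shown only for shape — the title and delegation values are invented, and real metadata may carry additional primitive keys copied from the source entry:

# Hypothetical record shape; actual values come from combined_context.jsonl.
# The space before the comma reflects how build_descriptive_text joins its
# parts with " ".
example_record = {
    "id": "chunk-1",
    "text": "Regarding the policy 'Works Contracts' , financial delegations "
            "are: the limit for GM is 50 lakh, the limit for DGM is NIL.",
    "metadata": {"title": "Works Contracts"},
}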