Chest Pain committed on
Commit
ee15e2d
·
1 Parent(s): 5baaf2c

v1.9: new DOB Stalled tab - paginate full 1.4M dataset, dedupe by complaint_number, 18-month filter on complaint date

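A minimal sketch of the dedupe and 18-month cutoff described above, assuming the paginated rows already sit in a pandas DataFrame with complaint_number and complaint_date columns; the helper name and the keep-most-recent tiebreak are illustrative, not necessarily what services/data.py does:

    import pandas as pd

    def dedupe_and_filter_stalled(df: pd.DataFrame, months: int = 18) -> pd.DataFrame:
        # Parse complaint_date the same way _to_dt_naive does: UTC-aware parse, then drop the tz
        out = df.copy()
        out["complaint_date"] = pd.to_datetime(out["complaint_date"], errors="coerce", utc=True).dt.tz_localize(None)
        # Keep only complaints received within the last `months` months
        cutoff = pd.Timestamp.utcnow().tz_localize(None) - pd.DateOffset(months=months)
        out = out[out["complaint_date"] >= cutoff]
        # One row per complaint_number; sorting first makes "keep the most recent" explicit
        out = out.sort_values("complaint_date", ascending=False)
        return out.drop_duplicates(subset="complaint_number", keep="first")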
Files changed (3)
  1. config.py +8 -7
  2. gui.py +378 -262
  3. services/data.py +413 -229
config.py CHANGED
@@ -6,13 +6,13 @@ SOCRATA_APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN", "").strip()
6
 
7
  # ---- App identity ----
8
  APP_NAME = "BuildScout"
9
- APP_VERSION = "1.82-beta"
10
 
11
  # ---- Default data / UI behavior ----
12
  DEFAULT_PAGE_SIZE = int(os.getenv("DEFAULT_PAGE_SIZE", "200"))
13
  DEFAULT_DAYS_WINDOW = int(os.getenv("DEFAULT_DAYS_WINDOW", "90"))
14
 
15
- # We focus on MN/BK/QN
16
  ALLOWED_BOROUGHS = {"MANHATTAN", "BROOKLYN", "QUEENS"}
17
 
18
  # Initial column set (shown if present)
@@ -33,9 +33,10 @@ DEFAULT_VISIBLE_COLUMNS = [
33
  "permit_type",
34
  ]
35
 
36
- # Datasets supported (internal keys -> labels)
37
  DATASETS = [
38
- ("leads_unpermitted", "Sales Leads – Filings without Permit (last 90 days)"),
39
- ("job_filings", "DOB NOW – Job Filings (w9ak-ipjd)"),
40
- ("permit_issuance", "Permit Issuance (rbx6-tga4)"),
41
- ]
 
 
6
 
7
  # ---- App identity ----
8
  APP_NAME = "BuildScout"
9
+ APP_VERSION = "1.9"
10
 
11
  # ---- Default data / UI behavior ----
12
  DEFAULT_PAGE_SIZE = int(os.getenv("DEFAULT_PAGE_SIZE", "200"))
13
  DEFAULT_DAYS_WINDOW = int(os.getenv("DEFAULT_DAYS_WINDOW", "90"))
14
 
15
+ # Per Peter's requirements: we only focus on MN/BK/QN
16
  ALLOWED_BOROUGHS = {"MANHATTAN", "BROOKLYN", "QUEENS"}
17
 
18
  # Initial column set (shown if present)
 
33
  "permit_type",
34
  ]
35
 
36
+ # Datasets supported (internal keys -> labels) - for Leads & Filings tab
37
  DATASETS = [
38
+ ("leads_unpermitted", "Sales Leads – Filings without Permit (last 90 days)"),
39
+ ("job_filings", "DOB NOW – Job Filings (w9ak-ipjd)"),
40
+ ("permit_issuance", "Permit Issuance – BIS (rbx6-tga4)"),
41
+ ("electrical_permits","DOB NOW – Electrical Permit Applications (dm9a-ab7w)"),
42
+ ]
gui.py CHANGED
@@ -1,4 +1,4 @@
1
- # gui.py - CSV export works in Gradio app in any environment
2
  from __future__ import annotations
3
 
4
  import os
@@ -29,7 +29,7 @@ UI_DENSITY = os.getenv("UI_DENSITY", "comfortable").strip().lower()
29
 
30
  # ---- App header ----
31
  HEADER_TITLE = f"{APP_NAME} v{APP_VERSION}"
32
- HEADER_SUB = "NYC DOB sales-leads explorer – last 90 days (DOB NOW filings vs BIS permit issuance)"
33
 
34
  # ---- Persist visible columns per dataset on disk ----
35
  PREFS_PATH = os.path.join(os.path.expanduser("~"), ".buildscout_prefs.json")
@@ -53,12 +53,9 @@ def _save_prefs(prefs: dict) -> None:
53
 
54
  _client = SocrataClient()
55
 
 
56
  # ---------- Typography (Aptos if present, otherwise Helvetica stack) ----------
57
  def _read_font_file_bytes() -> Optional[bytes]:
58
- """
59
- Try ENV first, then common local paths inside the repo.
60
- Returns the raw bytes of a .woff2 if found, else None.
61
- """
62
  env_path = os.getenv("APTOS_WOFF2", "").strip()
63
  candidates = [env_path] if env_path else []
64
  candidates += [
@@ -79,14 +76,9 @@ def _read_font_file_bytes() -> Optional[bytes]:
79
 
80
 
81
  def _build_font_css() -> str:
82
- """
83
- If an Aptos .woff2 is available, embed it as a data URL.
84
- Otherwise fall back to a Helvetica Neue stack.
85
- Density: "comfortable" (default) or "compact" via UI_DENSITY.
86
- """
87
  font_bytes = _read_font_file_bytes()
88
  font_face = ""
89
- family = "'Helvetica Neue', Helvetica, Arial, sans-serif" # default stack
90
 
91
  if font_bytes:
92
  b64 = base64.b64encode(font_bytes).decode("ascii")
@@ -101,7 +93,6 @@ def _build_font_css() -> str:
101
  """
102
  family = "'Aptos', 'Helvetica Neue', Helvetica, Arial, sans-serif"
103
 
104
- # density presets
105
  if UI_DENSITY == "compact":
106
  base_size = "14.2px"
107
  line_h = "1.28"
@@ -154,6 +145,7 @@ button.primary, .gr-button.primary {{ font-weight: 700; }}
154
 
155
  CUSTOM_CSS = _build_font_css()
156
 
 
157
  # ---- helpers ----
158
  def _sanitize_visible(visible: list[str], cols: list[str]) -> list[str]:
159
  set_cols = set(cols)
@@ -167,7 +159,6 @@ def _do_search(df: pd.DataFrame, term: str) -> pd.DataFrame:
167
  term_l = term.strip().lower()
168
  if not term_l:
169
  return df
170
- # simple contains on string columns
171
  mask = pd.Series(False, index=df.index)
172
  for c in df.columns:
173
  if df[c].dtype == "object":
@@ -179,10 +170,8 @@ def _do_search(df: pd.DataFrame, term: str) -> pd.DataFrame:
179
  def _fetch_dataset(dataset_key: str, days: int) -> Tuple[pd.DataFrame, float]:
180
  if dataset_key == "leads_unpermitted":
181
  df, secs = _client.fetch_leads_unpermitted(days=days)
182
- elif dataset_key in ("job_filings", "permit_issuance"):
183
- df, secs = _client.fetch_dataset_last_n_days(dataset_key, days)
184
  else:
185
- return pd.DataFrame(), 0.0
186
  return df, secs
187
 
188
 
@@ -194,268 +183,395 @@ def create_app():
194
  )
195
 
196
  with gr.Blocks(theme=theme, css=CUSTOM_CSS, title=HEADER_TITLE) as demo:
197
- # Header
198
  gr.Markdown(f"# {HEADER_TITLE}\n\n{HEADER_SUB}")
199
 
200
- # ===== Top control area: 4 even columns =====
201
- with gr.Row():
202
- # Col 1: Dataset + Borough (stacked)
203
- with gr.Column(scale=1, min_width=280, elem_classes="controls-col"):
204
- ds = gr.Dropdown(
205
- label="Dataset",
206
- choices=[label for _, label in DATASETS],
207
- value=[label for key, label in DATASETS if key == "leads_unpermitted"][0],
208
- allow_custom_value=False,
209
- info="Default loads the last 90 days.",
210
- )
211
- with gr.Group():
212
- gr.Markdown("**Borough** (MN/BK/QN only)")
213
- b_mn = gr.Checkbox(value=True, label="MANHATTAN", interactive=True)
214
- b_bk = gr.Checkbox(value=True, label="BROOKLYN", interactive=True)
215
- b_qn = gr.Checkbox(value=True, label="QUEENS", interactive=True)
216
-
217
- with gr.Row():
218
- reload_btn = gr.Button("Reload", variant="primary")
219
- reset_btn = gr.Button("Reset filters")
220
-
221
- # Col 2: Filing status
222
- with gr.Column(scale=1, min_width=260, elem_classes="controls-col"):
223
- gr.Markdown("**Filing status** (contains)")
224
- s_app = gr.Checkbox(value=True, label="APPROVED", interactive=True)
225
- s_obj = gr.Checkbox(value=True, label="OBJECTIONS", interactive=True)
226
- s_pen = gr.Checkbox(value=False, label="PENDING", interactive=True)
227
- s_wdr = gr.Checkbox(value=False, label="WITHDRAWN", interactive=True)
228
- s_dis = gr.Checkbox(value=False, label="DISAPPROVED", interactive=True)
229
-
230
- # Col 3: Permit type
231
- with gr.Column(scale=1, min_width=260, elem_classes="controls-col"):
232
- gr.Markdown("**Permit type**")
233
- p_gc = gr.Checkbox(value=True, label="GC (General Contractor)", interactive=True)
234
- p_st = gr.Checkbox(value=True, label="ST (Special Trade)", interactive=True)
235
- p_laa = gr.Checkbox(value=False, label="LAA", interactive=True)
236
- p_pl = gr.Checkbox(value=False, label="PL", interactive=True)
237
- p_el = gr.Checkbox(value=False, label="EL", interactive=True)
238
- p_ot = gr.Checkbox(value=False, label="OT", interactive=True)
239
-
240
- # Col 4: Right vertical stack (Search, Sort, Rows/page, Columns, Export)
241
- with gr.Column(scale=1, min_width=300, elem_classes="controls-col"):
242
- search_box = gr.Textbox(label="Search", placeholder="Free-text search across all columns…")
243
-
244
- with gr.Group():
245
- gr.Markdown("Sort by **filing_date**")
246
- sort_desc = gr.Radio(
247
- label=None, choices=["Desc", "Asc"], value="Desc", interactive=True
248
- )
249
-
250
- page_size = gr.Number(label="Rows / page", value=DEFAULT_PAGE_SIZE, precision=0)
251
-
252
- cols_acc = gr.Accordion("Columns", open=False)
253
- with cols_acc:
254
- visible_cols = gr.Dropdown(
255
- label="Visible columns",
256
- multiselect=True,
257
- choices=[],
258
- value=[],
259
  allow_custom_value=False,
 
260
  )
 
261
 
262
- # Keep the existing Export CSV button (we'll wire it to a hidden gr.File)
263
- export_btn = gr.Button("Export CSV", variant="secondary")
264
-
265
- # Status + Dataframe + Hidden download file
266
- stats_md = gr.Markdown("_Nothing loaded yet_")
267
- df_out = gr.Dataframe(interactive=False, wrap=False, max_height=520)
268
- csv_file = gr.File(label="Download CSV", visible=False)
269
-
270
- # ----- state -----
271
- df_full_state = gr.State(pd.DataFrame())
272
- df_filtered_state = gr.State(pd.DataFrame())
273
- page_index_state = gr.State(0)
274
- source_key_state = gr.State("leads_unpermitted")
275
-
276
- # ----- helpers -----
277
- def _dataset_key_from_label(label: str) -> str:
278
- for k, v in DATASETS:
279
- if v == label:
280
- return k
281
- return "leads_unpermitted"
282
-
283
- def _collect_boroughs() -> list[str]:
284
- keep = []
285
- if b_mn.value:
286
- keep.append("MANHATTAN")
287
- if b_bk.value:
288
- keep.append("BROOKLYN")
289
- if b_qn.value:
290
- keep.append("QUEENS")
291
- # Default to ALLOWED_BOROUGHS if none checked (safety)
292
- return keep or list(ALLOWED_BOROUGHS)
293
-
294
- def _initial_load(ds_label, rows_per_page, order):
295
- key = _dataset_key_from_label(ds_label)
296
-
297
- # Fetch
298
- df, secs = _fetch_dataset(key, DEFAULT_DAYS_WINDOW)
299
-
300
- # apply default borough + simple filters
301
- allowed_now = _collect_boroughs()
302
- if "borough" in df.columns:
303
- df = df[df["borough"].isin(allowed_now)].copy()
304
-
305
- # status/permit filters (contains)
306
- status_terms = []
307
- if s_app.value:
308
- status_terms.append("APPROVED")
309
- if s_obj.value:
310
- status_terms.append("OBJECTION")
311
- if s_pen.value:
312
- status_terms.append("PENDING")
313
- if s_wdr.value:
314
- status_terms.append("WITHDRAW")
315
- if s_dis.value:
316
- status_terms.append("DISAPPROVED")
317
-
318
- if status_terms and "filing_status" in df.columns:
319
- pat = "|".join(status_terms)
320
- df = df[df["filing_status"].astype(str).str.contains(pat, case=False, na=False)]
321
-
322
- permit_terms = []
323
- if p_gc.value:
324
- permit_terms.append("GC")
325
- if p_st.value:
326
- permit_terms.append("ST")
327
- if p_laa.value:
328
- permit_terms.append("LAA")
329
- if p_pl.value:
330
- permit_terms.append("PL")
331
- if p_el.value:
332
- permit_terms.append("EL")
333
- if p_ot.value:
334
- permit_terms.append("OT")
335
- if permit_terms and "permit_type" in df.columns:
336
- patp = "|".join(permit_terms)
337
- df = df[df["permit_type"].astype(str).str.contains(patp, case=False, na=False)]
338
-
339
- # sort
340
- asc = (order == "Asc")
341
- if "filing_date" in df.columns:
342
- df = df.sort_values("filing_date", ascending=asc, kind="mergesort")
343
-
344
- cols_sorted = sorted(df.columns)
345
- # remember & apply visible prefs
346
- prefs = _load_prefs()
347
- saved = prefs.get(key, None)
348
- visible = _sanitize_visible(saved or DEFAULT_VISIBLE_COLUMNS, cols_sorted)
349
-
350
- view = df[visible].head(int(rows_per_page))
351
-
352
- # status
353
- speed_indicator = "⚡" if secs < 5 else ("✅" if secs < 15 else "✓")
354
- stats = f"{speed_indicator} **{ds_label}** – Loaded **{len(df):,}** rows in {secs:.1f}s"
355
- if secs < 1:
356
- stats += " (cached)"
357
-
358
- # hide the download file after a reload
359
- return (
360
- view,
361
- df, # full
362
- df, # filt (initially same)
363
- 0, # page idx
364
- stats,
365
- gr.update(choices=cols_sorted, value=visible),
366
- key,
367
- gr.update(visible=False, value=None),
368
  )
369
 
370
- # Load & Reload
371
- reload_btn.click(
372
- _initial_load,
373
- inputs=[ds, page_size, sort_desc],
374
- outputs=[df_out, df_full_state, df_filtered_state, page_index_state, stats_md, visible_cols, source_key_state, csv_file],
375
- )
376
- demo.load(
377
- _initial_load,
378
- inputs=[ds, page_size, sort_desc],
379
- outputs=[df_out, df_full_state, df_filtered_state, page_index_state, stats_md, visible_cols, source_key_state, csv_file],
380
- )
381
-
382
- def _apply_filters(df_full, rows_per_page, search, order, visibles, ds_key):
383
- df = df_full
384
-
385
- # search
386
- df = _do_search(df, search)
387
-
388
- # sort
389
- asc = (order == "Asc")
390
- if "filing_date" in df.columns:
391
- df = df.sort_values("filing_date", ascending=asc, kind="mergesort")
392
 
393
- # visible save per dataset
394
- prefs = _load_prefs()
395
- prefs[ds_key] = visibles
396
- _save_prefs(prefs)
397
 
398
- vis = _sanitize_visible(visibles, list(df.columns))
399
- view = df[vis].head(int(rows_per_page))
 
 
 
400
 
401
- return view, df, 0
402
 
403
- # "Apply filter"
404
- apply_btn = gr.Button("Apply filter")
405
- apply_btn.click(
406
- _apply_filters,
407
- inputs=[df_full_state, page_size, search_box, sort_desc, visible_cols, source_key_state],
408
- outputs=[df_out, df_filtered_state, page_index_state],
409
- )
410
 
411
- # "Load more rows" (keeps current sorting/filters; appends next page)
412
- def _more(df_filt, page_idx, rows_per_page, visibles):
413
- if df_filt is None or df_filt.empty:
414
- return pd.DataFrame(), page_idx
415
- vis = _sanitize_visible(visibles, list(df_filt.columns))
416
- new_page = page_idx + 1
417
- start = 0
418
- end = int(rows_per_page) * (new_page + 1)
419
- return df_filt[vis].iloc[start:end], new_page
420
-
421
- load_more = gr.Button("Load more rows")
422
- load_more.click(
423
- _more,
424
- inputs=[df_filtered_state, page_index_state, page_size, visible_cols],
425
- outputs=[df_out, page_index_state],
426
- )
427
 
428
- # "Reset filters" – full reload with current Dataset & defaults
429
- reset_btn.click(
430
- _initial_load,
431
- inputs=[ds, page_size, sort_desc],
432
- outputs=[df_out, df_full_state, df_filtered_state, page_index_state, stats_md, visible_cols, source_key_state, csv_file],
433
- )
434
 
435
- # "Export CSV" – write a real file under /tmp and reveal gr.File (HF-safe)
436
- def _export(df_current: pd.DataFrame):
437
- if df_current is None or df_current.empty:
438
- return gr.update(visible=False, value=None)
439
- # Use your existing exporter to get CSV bytes, then persist to /tmp
440
- bio = export_csv(df_current) # BytesIO
441
- bio.seek(0)
442
- ts = int(time.time())
443
- path = f"/tmp/buildscout_export_{ts}.csv"
444
- with open(path, "wb") as f:
445
- f.write(bio.read())
446
- # Return a visible file component pointing at that path
447
- return gr.update(visible=True, value=path, label="Download CSV")
448
-
449
- export_btn.click(
450
- _export,
451
- inputs=[df_filtered_state],
452
- outputs=[csv_file],
453
- )
454
 
455
- # Footer
456
  gr.Markdown(
457
  f"*{APP_NAME} {APP_VERSION}* · Loads last **{DEFAULT_DAYS_WINDOW}** days. "
458
  "Set **SOCRATA_APP_TOKEN** for higher API limits. Data is cached for performance."
459
  )
460
 
461
- return demo
 
1
+ # gui.py - Leads & Filings explorer plus the new Stalled & Distressed Projects tab
2
  from __future__ import annotations
3
 
4
  import os
 
29
 
30
  # ---- App header ----
31
  HEADER_TITLE = f"{APP_NAME} v{APP_VERSION}"
32
+ HEADER_SUB = "NYC DOB sales-leads explorer"
33
 
34
  # ---- Persist visible columns per dataset on disk ----
35
  PREFS_PATH = os.path.join(os.path.expanduser("~"), ".buildscout_prefs.json")
 
53
 
54
  _client = SocrataClient()
55
 
56
+
57
  # ---------- Typography (Aptos if present, otherwise Helvetica stack) ----------
58
  def _read_font_file_bytes() -> Optional[bytes]:
 
 
 
 
59
  env_path = os.getenv("APTOS_WOFF2", "").strip()
60
  candidates = [env_path] if env_path else []
61
  candidates += [
 
76
 
77
 
78
  def _build_font_css() -> str:
 
 
 
 
 
79
  font_bytes = _read_font_file_bytes()
80
  font_face = ""
81
+ family = "'Helvetica Neue', Helvetica, Arial, sans-serif"
82
 
83
  if font_bytes:
84
  b64 = base64.b64encode(font_bytes).decode("ascii")
 
93
  """
94
  family = "'Aptos', 'Helvetica Neue', Helvetica, Arial, sans-serif"
95
 
 
96
  if UI_DENSITY == "compact":
97
  base_size = "14.2px"
98
  line_h = "1.28"
 
145
 
146
  CUSTOM_CSS = _build_font_css()
147
 
148
+
149
  # ---- helpers ----
150
  def _sanitize_visible(visible: list[str], cols: list[str]) -> list[str]:
151
  set_cols = set(cols)
 
159
  term_l = term.strip().lower()
160
  if not term_l:
161
  return df
 
162
  mask = pd.Series(False, index=df.index)
163
  for c in df.columns:
164
  if df[c].dtype == "object":
 
170
  def _fetch_dataset(dataset_key: str, days: int) -> Tuple[pd.DataFrame, float]:
171
  if dataset_key == "leads_unpermitted":
172
  df, secs = _client.fetch_leads_unpermitted(days=days)
 
 
173
  else:
174
+ df, secs = _client.fetch_dataset_last_n_days(dataset_key, days)
175
  return df, secs
176
 
177
 
 
183
  )
184
 
185
  with gr.Blocks(theme=theme, css=CUSTOM_CSS, title=HEADER_TITLE) as demo:
 
186
  gr.Markdown(f"# {HEADER_TITLE}\n\n{HEADER_SUB}")
187
 
188
+ with gr.Tab("Leads & Filings"):
189
+ with gr.Row():
190
+ with gr.Column(scale=1, min_width=280, elem_classes="controls-col"):
191
+ ds = gr.Dropdown(
192
+ label="Dataset",
193
+ choices=[label for _, label in DATASETS],
194
+ value=[label for key, label in DATASETS if key == "leads_unpermitted"][0],
195
  allow_custom_value=False,
196
+ info="Default loads the last 90 days.",
197
  )
198
+ with gr.Group():
199
+ gr.Markdown("**Borough** (MN/BK/QN only)")
200
+ b_mn = gr.Checkbox(value=True, label="MANHATTAN", interactive=True)
201
+ b_bk = gr.Checkbox(value=True, label="BROOKLYN", interactive=True)
202
+ b_qn = gr.Checkbox(value=True, label="QUEENS", interactive=True)
203
+
204
+ with gr.Row():
205
+ reload_btn = gr.Button("Reload", variant="primary")
206
+ reset_btn = gr.Button("Reset filters")
207
+
208
+ with gr.Column(scale=1, min_width=260, elem_classes="controls-col"):
209
+ gr.Markdown("**Filing status** (contains)")
210
+ s_app = gr.Checkbox(value=True, label="APPROVED")
211
+ s_obj = gr.Checkbox(value=True, label="OBJECTIONS")
212
+ s_pen = gr.Checkbox(value=False, label="PENDING")
213
+ s_wdr = gr.Checkbox(value=False, label="WITHDRAWN")
214
+ s_dis = gr.Checkbox(value=False, label="DISAPPROVED")
215
+
216
+ with gr.Column(scale=1, min_width=260, elem_classes="controls-col"):
217
+ gr.Markdown("**Permit type**")
218
+ p_gc = gr.Checkbox(value=True, label="GC (General Contractor)")
219
+ p_st = gr.Checkbox(value=True, label="ST (Special Trade)")
220
+ p_laa = gr.Checkbox(value=False, label="LAA")
221
+ p_pl = gr.Checkbox(value=False, label="PL")
222
+ p_el = gr.Checkbox(value=False, label="EL")
223
+ p_ot = gr.Checkbox(value=False, label="OT")
224
+
225
+ with gr.Column(scale=1, min_width=300, elem_classes="controls-col"):
226
+ search_box = gr.Textbox(label="Search", placeholder="Free-text search across all columns…")
227
+ with gr.Group():
228
+ gr.Markdown("Sort by **filing_date**")
229
+ sort_desc = gr.Radio(label=None, choices=["Desc", "Asc"], value="Desc")
230
+ page_size = gr.Number(label="Rows / page", value=DEFAULT_PAGE_SIZE, precision=0)
231
+ cols_acc = gr.Accordion("Columns", open=False)
232
+ with cols_acc:
233
+ visible_cols = gr.Dropdown(label="Visible columns", multiselect=True, choices=[], value=[])
234
+ export_btn = gr.Button("Export CSV", variant="secondary")
235
+
236
+ stats_md = gr.Markdown("_Nothing loaded yet_")
237
+ df_out = gr.Dataframe(interactive=False, wrap=False, max_height=520)
238
+ csv_file = gr.File(label="Download CSV", visible=False)
239
+
240
+ df_full_state = gr.State(pd.DataFrame())
241
+ df_filtered_state = gr.State(pd.DataFrame())
242
+ page_index_state = gr.State(0)
243
+ source_key_state = gr.State("leads_unpermitted")
244
+
245
+ def _dataset_key_from_label(label: str) -> str:
246
+ for k, v in DATASETS:
247
+ if v == label:
248
+ return k
249
+ return "leads_unpermitted"
250
+
251
+ def _collect_boroughs():
252
+ keep = []
253
+ if b_mn.value: keep.append("MANHATTAN")
254
+ if b_bk.value: keep.append("BROOKLYN")
255
+ if b_qn.value: keep.append("QUEENS")
256
+ return keep or list(ALLOWED_BOROUGHS)
257
+
258
+ def _initial_load(ds_label, rows_per_page, order):
259
+ key = _dataset_key_from_label(ds_label)
260
+ df, secs = _fetch_dataset(key, DEFAULT_DAYS_WINDOW)
261
+
262
+ allowed_now = _collect_boroughs()
263
+ if "borough" in df.columns:
264
+ df = df[df["borough"].isin(allowed_now)].copy()
265
+
266
+ status_terms = []
267
+ if s_app.value: status_terms.append("APPROVED")
268
+ if s_obj.value: status_terms.append("OBJECTION")
269
+ if s_pen.value: status_terms.append("PENDING")
270
+ if s_wdr.value: status_terms.append("WITHDRAW")
271
+ if s_dis.value: status_terms.append("DISAPPROVED")
272
+
273
+ if status_terms and "filing_status" in df.columns:
274
+ pat = "|".join(status_terms)
275
+ df = df[df["filing_status"].astype(str).str.contains(pat, case=False, na=False)]
276
+
277
+ permit_terms = []
278
+ if p_gc.value: permit_terms.append("GC")
279
+ if p_st.value: permit_terms.append("ST")
280
+ if p_laa.value: permit_terms.append("LAA")
281
+ if p_pl.value: permit_terms.append("PL")
282
+ if p_el.value: permit_terms.append("EL")
283
+ if p_ot.value: permit_terms.append("OT")
284
+ if permit_terms and "permit_type" in df.columns:
285
+ patp = "|".join(permit_terms)
286
+ df = df[df["permit_type"].astype(str).str.contains(patp, case=False, na=False)]
287
+
288
+ asc = (order == "Asc")
289
+ if "filing_date" in df.columns:
290
+ df = df.sort_values("filing_date", ascending=asc, kind="mergesort")
291
+
292
+ cols_sorted = sorted(df.columns)
293
+ prefs = _load_prefs()
294
+ saved = prefs.get(key, None)
295
+ visible = _sanitize_visible(saved or DEFAULT_VISIBLE_COLUMNS, cols_sorted)
296
+
297
+ view = df[visible].head(int(rows_per_page))
298
+
299
+ speed_indicator = "⚡" if secs < 5 else ("✅" if secs < 15 else "✓")
300
+ stats = f"{speed_indicator} **{ds_label}** – Loaded **{len(df):,}** rows in {secs:.1f}s"
301
+ if secs < 1: stats += " (cached)"
302
+
303
+ return (
304
+ view,
305
+ df,
306
+ df,
307
+ 0,
308
+ stats,
309
+ gr.update(choices=cols_sorted, value=visible),
310
+ key,
311
+ gr.update(visible=False, value=None),
312
+ )
313
 
314
+ reload_btn.click(
315
+ _initial_load,
316
+ inputs=[ds, page_size, sort_desc],
317
+ outputs=[df_out, df_full_state, df_filtered_state, page_index_state, stats_md, visible_cols, source_key_state, csv_file],
318
+ )
319
+ demo.load(
320
+ _initial_load,
321
+ inputs=[ds, page_size, sort_desc],
322
+ outputs=[df_out, df_full_state, df_filtered_state, page_index_state, stats_md, visible_cols, source_key_state, csv_file],
323
  )
324
 
325
+ def _apply_filters(df_full, rows_per_page, search, order, visibles, ds_key):
326
+ df = df_full.copy()
327
+ df = _do_search(df, search)
328
+ asc = (order == "Asc")
329
+ if "filing_date" in df.columns:
330
+ df = df.sort_values("filing_date", ascending=asc, kind="mergesort")
331
+
332
+ prefs = _load_prefs()
333
+ prefs[ds_key] = visibles
334
+ _save_prefs(prefs)
335
+
336
+ vis = _sanitize_visible(visibles, list(df.columns))
337
+ view = df[vis].head(int(rows_per_page))
338
+ return view, df, 0
339
+
340
+ apply_btn = gr.Button("Apply filter")
341
+ apply_btn.click(
342
+ _apply_filters,
343
+ inputs=[df_full_state, page_size, search_box, sort_desc, visible_cols, source_key_state],
344
+ outputs=[df_out, df_filtered_state, page_index_state],
345
+ )
 
346
 
347
+ def _more(df_filt, page_idx, rows_per_page, visibles):
348
+ if df_filt is None or df_filt.empty:
349
+ return pd.DataFrame(), page_idx
350
+ vis = _sanitize_visible(visibles, list(df_filt.columns))
351
+ new_page = page_idx + 1
352
+ start = 0
353
+ end = int(rows_per_page) * (new_page + 1)
354
+ return df_filt[vis].iloc[start:end], new_page
355
+
356
+ load_more = gr.Button("Load more rows")
357
+ load_more.click(
358
+ _more,
359
+ inputs=[df_filtered_state, page_index_state, page_size, visible_cols],
360
+ outputs=[df_out, page_index_state],
361
+ )
362
 
363
+ reset_btn.click(
364
+ _initial_load,
365
+ inputs=[ds, page_size, sort_desc],
366
+ outputs=[df_out, df_full_state, df_filtered_state, page_index_state, stats_md, visible_cols, source_key_state, csv_file],
367
+ )
368
 
369
+ def _export(df_current: pd.DataFrame):
370
+ if df_current is None or df_current.empty:
371
+ return gr.update(visible=False, value=None)
372
+ bio = export_csv(df_current)
373
+ bio.seek(0)
374
+ ts = int(time.time())
375
+ path = f"/tmp/buildscout_export_{ts}.csv"
376
+ with open(path, "wb") as f:
377
+ f.write(bio.read())
378
+ return gr.update(visible=True, value=path, label="Download CSV")
379
+
380
+ export_btn.click(
381
+ _export,
382
+ inputs=[df_filtered_state],
383
+ outputs=[csv_file],
384
+ )
385
 
386
+ # ============================== STALLED & DISTRESSED PROJECTS TAB ==============================
387
+ with gr.Tab("Stalled & Distressed Projects"):
388
+ gr.Markdown(
389
+ "### Real-time scanner for stalled, frozen, or quietly dead construction sites\n"
390
+ "Perfect for finding partial foundations, fenced holes, or projects you can restart or buy out."
391
+ )
 
392
 
393
+ with gr.Row():
394
+ with gr.Column(scale=1, min_width=280, elem_classes="controls-col"):
395
+ with gr.Group():
396
+ gr.Markdown("**Borough**")
397
+ st_mn = gr.Checkbox(value=True, label="MANHATTAN", interactive=True)
398
+ st_bk = gr.Checkbox(value=True, label="BROOKLYN", interactive=True)
399
+ st_qn = gr.Checkbox(value=True, label="QUEENS", interactive=True)
400
+
401
+ with gr.Row():
402
+ stalled_reload_btn = gr.Button("Reload", variant="primary")
403
+ stalled_reset_btn = gr.Button("Reset filters")
404
+
405
+ with gr.Column(scale=1, min_width=260, elem_classes="controls-col"):
406
+ stalled_search = gr.Textbox(label="Search", placeholder="Free-text search across all columns…")
407
+ with gr.Group():
408
+ gr.Markdown("**Sort by days stalled**")
409
+ stalled_sort = gr.Radio(label=None, choices=["Desc (oldest first)", "Asc (newest first)"], value="Desc (oldest first)")
410
+ stalled_page_size = gr.Number(label="Rows / page", value=DEFAULT_PAGE_SIZE, precision=0)
411
+ stalled_apply_btn = gr.Button("Apply filter")
412
+ stalled_export_btn = gr.Button("Export CSV", variant="secondary")
413
+
414
+ with gr.Column(scale=1, min_width=300, elem_classes="controls-col"):
415
+ stalled_cols_acc = gr.Accordion("Columns", open=False)
416
+ with stalled_cols_acc:
417
+ stalled_visible_cols = gr.Dropdown(label="Visible columns", multiselect=True, choices=[], value=[])
418
+ stalled_status = gr.Markdown("Click Reload to load stalled projects data")
419
+
420
+ stalled_table = gr.Dataframe(interactive=False, wrap=False, max_height=620)
421
+ stalled_csv_file = gr.File(label="Download CSV", visible=False)
422
+ stalled_more_btn = gr.Button("Load more rows")
423
+
424
+ # State - stalled_full_state holds ALL data (18 months), stalled_filtered_state holds after search/filter
425
+ stalled_full_state = gr.State(pd.DataFrame())
426
+ stalled_filtered_state = gr.State(pd.DataFrame())
427
+ stalled_page_state = gr.State(0)
428
+
429
+ # Default columns for stalled data (based on actual API schema)
430
+ STALLED_DEFAULT_COLS = [
431
+ "full_address", "days_stalled", "borough",
432
+ "bin", "house_number", "street_name", "community_board",
433
+ "complaint_number", "complaint_date", "date_complaint_received",
434
+ "dobrundate",
435
+ ]
436
+
437
+ def _load_stalled(rows_per_page, mn, bk, qn, sort_order):
438
+ t0 = time.time()
439
+ df, _ = _client.fetch_dataset_last_n_days("stalled_official", days=0)
440
+ secs = time.time() - t0
441
+
442
+ if df.empty:
443
+ return (
444
+ pd.DataFrame(),
445
+ pd.DataFrame(),
446
+ pd.DataFrame(),
447
+ 0,
448
+ "⚠️ No data returned",
449
+ gr.update(choices=[], value=[]),
450
+ gr.update(visible=False, value=None),
451
+ )
452
 
453
+ # Borough filter (API already filtered to last 18 months)
454
+ boroughs = []
455
+ if mn: boroughs.append("MANHATTAN")
456
+ if bk: boroughs.append("BROOKLYN")
457
+ if qn: boroughs.append("QUEENS")
458
+ if not boroughs:
459
+ boroughs = ["MANHATTAN", "BROOKLYN", "QUEENS"]
460
+
461
+ if "borough" in df.columns:
462
+ df = df[df["borough"].isin(boroughs)].copy()
463
+
464
+ # Sort by staleness (Desc = oldest/most stalled first = highest days, Asc = newest first = lowest days)
465
+ ascending = "Asc" in sort_order
466
+ if "days_stalled" in df.columns:
467
+ df = df.sort_values("days_stalled", ascending=ascending)
468
+
469
+ # Column selection
470
+ cols_sorted = sorted(df.columns)
471
+ visible = [c for c in STALLED_DEFAULT_COLS if c in cols_sorted]
472
+ if not visible:
473
+ visible = cols_sorted[:10]
474
+
475
+ view = df[visible].head(int(rows_per_page))
476
+
477
+ speed = "⚡" if secs < 2 else "✅"
478
+ stats = f"{speed} **DOB Stalled Construction Sites** – Found **{len(df):,}** projects in {secs:.2f}s"
479
+
480
+ return (
481
+ view,
482
+ df, # full state - all 18 months of data
483
+ df, # filtered state - same initially
484
+ 0,
485
+ stats,
486
+ gr.update(choices=cols_sorted, value=visible),
487
+ gr.update(visible=False, value=None),
488
+ )
489
 
490
+ def _apply_stalled_filters(df_full, rows_per_page, search, visibles, mn, bk, qn, sort_order):
491
+ if df_full is None or df_full.empty:
492
+ return pd.DataFrame(), pd.DataFrame(), 0
493
+
494
+ df = df_full.copy()
495
+
496
+ # Borough filter (applied to full dataset)
497
+ boroughs = []
498
+ if mn: boroughs.append("MANHATTAN")
499
+ if bk: boroughs.append("BROOKLYN")
500
+ if qn: boroughs.append("QUEENS")
501
+ if not boroughs:
502
+ boroughs = ["MANHATTAN", "BROOKLYN", "QUEENS"]
503
+ if "borough" in df.columns:
504
+ df = df[df["borough"].isin(boroughs)].copy()
505
+
506
+ # Search (applied to full dataset)
507
+ df = _do_search(df, search)
508
+
509
+ # Sort by staleness
510
+ ascending = "Asc" in sort_order
511
+ if "days_stalled" in df.columns:
512
+ df = df.sort_values("days_stalled", ascending=ascending)
513
+
514
+ vis = _sanitize_visible(visibles, list(df.columns))
515
+ view = df[vis].head(int(rows_per_page))
516
+ return view, df, 0
517
+
518
+ def _stalled_more(df_filt, page_idx, rows_per_page, visibles):
519
+ if df_filt is None or df_filt.empty:
520
+ return pd.DataFrame(), page_idx
521
+ vis = _sanitize_visible(visibles, list(df_filt.columns))
522
+ new_page = page_idx + 1
523
+ end = int(rows_per_page) * (new_page + 1)
524
+ return df_filt[vis].iloc[:end], new_page
525
+
526
+ def _stalled_export(df_current: pd.DataFrame):
527
+ if df_current is None or df_current.empty:
528
+ return gr.update(visible=False, value=None)
529
+ bio = export_csv(df_current)
530
+ bio.seek(0)
531
+ ts = int(time.time())
532
+ path = f"/tmp/stalled_export_{ts}.csv"
533
+ with open(path, "wb") as f:
534
+ f.write(bio.read())
535
+ return gr.update(visible=True, value=path, label="Download CSV")
536
+
537
+ # Event bindings
538
+ stalled_reload_btn.click(
539
+ _load_stalled,
540
+ inputs=[stalled_page_size, st_mn, st_bk, st_qn, stalled_sort],
541
+ outputs=[stalled_table, stalled_full_state, stalled_filtered_state, stalled_page_state, stalled_status, stalled_visible_cols, stalled_csv_file],
542
+ )
543
+ stalled_reset_btn.click(
544
+ _load_stalled,
545
+ inputs=[stalled_page_size, st_mn, st_bk, st_qn, stalled_sort],
546
+ outputs=[stalled_table, stalled_full_state, stalled_filtered_state, stalled_page_state, stalled_status, stalled_visible_cols, stalled_csv_file],
547
+ )
548
+ demo.load(
549
+ _load_stalled,
550
+ inputs=[stalled_page_size, st_mn, st_bk, st_qn, stalled_sort],
551
+ outputs=[stalled_table, stalled_full_state, stalled_filtered_state, stalled_page_state, stalled_status, stalled_visible_cols, stalled_csv_file],
552
+ )
553
+
554
+ stalled_apply_btn.click(
555
+ _apply_stalled_filters,
556
+ inputs=[stalled_full_state, stalled_page_size, stalled_search, stalled_visible_cols, st_mn, st_bk, st_qn, stalled_sort],
557
+ outputs=[stalled_table, stalled_filtered_state, stalled_page_state],
558
+ )
559
+
560
+ stalled_more_btn.click(
561
+ _stalled_more,
562
+ inputs=[stalled_filtered_state, stalled_page_state, stalled_page_size, stalled_visible_cols],
563
+ outputs=[stalled_table, stalled_page_state],
564
+ )
565
+
566
+ stalled_export_btn.click(
567
+ _stalled_export,
568
+ inputs=[stalled_filtered_state],
569
+ outputs=[stalled_csv_file],
570
+ )
571
 
 
572
  gr.Markdown(
573
  f"*{APP_NAME} {APP_VERSION}* · Loads last **{DEFAULT_DAYS_WINDOW}** days. "
574
  "Set **SOCRATA_APP_TOKEN** for higher API limits. Data is cached for performance."
575
  )
576
 
577
+ return demo
services/data.py CHANGED
@@ -1,7 +1,6 @@
1
  # services/data.py
2
  from __future__ import annotations
3
 
4
- import os
5
  import time
6
  import concurrent.futures
7
  from datetime import datetime, timedelta
@@ -20,6 +19,11 @@ from config import (
20
  DATASET_URLS: Dict[str, str] = {
21
  "job_filings": "https://data.cityofnewyork.us/resource/w9ak-ipjd.json",
22
  "permit_issuance": "https://data.cityofnewyork.us/resource/rbx6-tga4.json",
 
 
 
 
 
23
  }
24
 
25
  # Per dataset core field map
@@ -31,57 +35,108 @@ DATASET_FIELD_MAP: Dict[str, Dict[str, str]] = {
31
  "street_name": "street_name",
32
  "zip": "zip",
33
  "job_id": "job_filing_number",
34
- "job_status": "filing_status", # map to generic
35
  "job_type": "job_type",
36
  "desc": "job_description",
37
  },
38
  "permit_issuance": {
39
- "filing_date": "approved_date", # NOTE: string-ish in this dataset
40
  "borough": "borough",
41
  "house_no": "house__",
42
  "street_name": "street_name",
43
  "zip": "zip_code",
44
  "job_id": "job__",
45
- "permit_type": "permittee_s_license_type", # "GC", "ST", etc
46
  "desc": "job_description",
47
  },
 
 
 
 
 
 
 
 
 
48
  }
49
 
50
  # ---------- Simple in-memory cache ----------
51
  _cache: Dict[str, Tuple[pd.DataFrame, datetime]] = {}
52
- CACHE_TTL_MINUTES = 10 # Cache for 10 minutes
 
53
 
54
  def _get_cached(key: str) -> Optional[pd.DataFrame]:
55
- if key in _cache:
56
- df, cached_at = _cache[key]
57
- if datetime.now() - cached_at < timedelta(minutes=CACHE_TTL_MINUTES):
58
- print(f"[cache] Using cached data for {key}")
59
- return df.copy()
60
- else:
61
- del _cache[key]
 
62
  return None
63
 
 
64
  def _set_cached(key: str, df: pd.DataFrame) -> None:
65
  _cache[key] = (df.copy(), datetime.now())
66
 
 
67
  # ---------- helpers ----------
68
  def _headers() -> Dict[str, str]:
69
- h = {}
 
 
 
 
 
 
70
  if SOCRATA_APP_TOKEN:
71
  h["X-App-Token"] = SOCRATA_APP_TOKEN
72
  return h
73
 
 
74
  def _request(url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
75
- r = requests.get(url, headers=_headers(), params=params, timeout=60)
76
- if r.status_code != 200:
77
- raise RuntimeError(f"API request failed: {r.status_code} {r.text}")
78
  return r.json()
79
 
 
80
  def _to_dt_naive(series: pd.Series) -> pd.Series:
81
- # Parse -> UTC aware -> make tz-naive to compare safely everywhere
82
  s = pd.to_datetime(series, errors="coerce", utc=True)
83
  return s.dt.tz_localize(None)
84
 
 
85
  def _norm_borough(series: pd.Series) -> pd.Series:
86
  m = {
87
  "MN": "MANHATTAN",
@@ -89,242 +144,398 @@ def _norm_borough(series: pd.Series) -> pd.Series:
89
  "BK": "BROOKLYN",
90
  "QN": "QUEENS",
91
  "SI": "STATEN ISLAND",
 
 
 
 
 
92
  }
93
  return series.astype(str).str.strip().str.upper().map(lambda x: m.get(x, x))
94
 
95
- def _full_address(df: pd.DataFrame, house_col: str, street_col: str, borough_col: str, zip_col: str | None) -> pd.Series:
 
 
 
 
 
 
 
96
  def join(row):
97
  parts = []
98
  h = str(row.get(house_col, "") or "").strip()
99
  s = str(row.get(street_col, "") or "").strip()
100
  b = str(row.get(borough_col, "") or "").strip()
101
  z = str(row.get(zip_col, "") or "").strip() if zip_col else ""
102
- if h: parts.append(h)
103
- if s: parts.append(s)
104
- if b: parts.append(b)
105
- if z: parts.append(z)
 
 
 
 
106
  return ", ".join(p for p in parts if p)
 
107
  return df.apply(join, axis=1)
108
 
 
109
  def _days_ago_cutoff(days: int) -> Tuple[pd.Timestamp, str]:
110
  now = pd.Timestamp.utcnow().tz_localize(None)
111
  cutoff = now - pd.Timedelta(days=days)
112
- # ISO string UTC Z for server-side when the field is typed, else unused
113
  cutoff_iso = (cutoff.tz_localize("UTC").isoformat()).replace("+00:00", "Z")
114
  return cutoff, cutoff_iso
115
 
 
116
  def _job_base(job_filing_number: str) -> str:
117
  if not isinstance(job_filing_number, str):
118
  return ""
119
  return job_filing_number.split("-", 1)[0].strip()
120
 
121
- def _fetch_page_parallel(url: str, params: Dict[str, Any], page: int, offset: int) -> Tuple[List[Dict], int, float]:
122
- """Helper for parallel page fetching"""
 
 
 
 
 
123
  params_copy = params.copy()
124
  params_copy["$offset"] = offset
125
  t0 = time.time()
126
  rows = _request(url, params_copy)
127
  return rows, page, time.time() - t0
128
 
129
- # ---------- core pulls ----------
130
- def _fetch_filings_last_days(days: int, page_size: int = 30000, max_pages: int = 10) -> pd.DataFrame:
131
- """
132
- Job Filings (w9ak-ipjd). Fetches ALL records for the period using parallel requests.
133
- """
134
- cache_key = f"filings_{days}d"
135
  cached = _get_cached(cache_key)
136
  if cached is not None:
137
  return cached
138
 
139
  url = DATASET_URLS["job_filings"]
140
- fmap = DATASET_FIELD_MAP["job_filings"]
141
- filing_col = fmap["filing_date"]
142
- borough_col = fmap["borough"]
143
-
144
- cutoff_ts, _ = _days_ago_cutoff(days)
145
-
146
- # Only filter by borough in the WHERE clause
147
- where = f"{borough_col} in ('MANHATTAN','BROOKLYN','QUEENS')"
148
 
149
- # First, get a count estimate with a small request
 
 
 
150
  params = {
151
- "$limit": 1,
152
- "$order": f"{filing_col} DESC",
153
- "$where": where,
154
- }
155
-
156
- # Fetch pages in parallel for speed
157
- base_params = {
158
- "$limit": page_size,
159
- "$order": f"{filing_col} DESC",
160
- "$where": where,
161
  }
162
 
163
- all_rows = []
164
- found_old_data = False
165
- batch_size = 3 # Process 3 pages in parallel at a time
 
 
 
 
 
 
 
 
 
166
 
167
- for batch_start in range(0, max_pages, batch_size):
168
- if found_old_data:
169
- break
170
-
171
- batch_end = min(batch_start + batch_size, max_pages)
172
-
173
- with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
174
- futures = []
175
- for page in range(batch_start, batch_end):
176
- offset = page * page_size
177
- future = executor.submit(_fetch_page_parallel, url, base_params, page, offset)
178
- futures.append(future)
179
-
180
- for future in concurrent.futures.as_completed(futures):
181
- rows, page, dt = future.result()
182
- if rows:
183
- print(f"[job_filings] page {page+1}: {len(rows):>5} rows in {dt:3.1f}s")
184
-
185
- # Quick check if we've gone past our date range
186
- if filing_col in pd.DataFrame(rows).columns:
187
- df_check = pd.DataFrame(rows)
188
- df_check["_date"] = _to_dt_naive(df_check[filing_col])
189
- if (df_check["_date"] < cutoff_ts).any():
190
- found_old_data = True
191
-
192
- all_rows.extend(rows)
193
- else:
194
- found_old_data = True
195
-
196
- df = pd.DataFrame(all_rows) if all_rows else pd.DataFrame()
197
  if df.empty:
198
  return df
 
 
199
 
200
- # Normalize
201
- df["filing_date"] = _to_dt_naive(df[filing_col])
202
- df["borough"] = _norm_borough(df[borough_col])
203
-
204
- for c in [fmap["house_no"], fmap["street_name"], fmap["zip"]]:
205
- if c and c not in df.columns:
206
- df[c] = ""
207
-
208
- df["full_address"] = _full_address(df, fmap["house_no"], fmap["street_name"], "borough", fmap["zip"])
209
 
210
- if fmap["job_id"] in df.columns:
211
- df["job_filing_number"] = df[fmap["job_id"]]
 
212
 
213
- if fmap.get("job_status") in df.columns:
214
- df["filing_status"] = df[fmap["job_status"]]
215
 
216
- if fmap.get("job_type") in df.columns:
217
- df["job_type"] = df[fmap["job_type"]]
 
 
 
218
 
219
- if fmap.get("desc") in df.columns:
220
- df["job_description"] = df[fmap["desc"]]
221
 
222
- # Local date filtering
223
- before = len(df)
224
- df = df[df["filing_date"].notna() & (df["filing_date"] >= cutoff_ts) & df["borough"].isin(ALLOWED_BOROUGHS)]
225
- print(f"[job_filings] kept {len(df)}/{before} rows after {days}d filter")
226
-
227
- result = df.sort_values("filing_date", ascending=False, kind="mergesort")
228
- _set_cached(cache_key, result)
229
- return result
230
 
231
- def _fetch_permits_last_days(days: int, page_size: int = 30000, max_pages: int = 10) -> pd.DataFrame:
232
- """
233
- Permit Issuance (rbx6-tga4). Fetches ALL records for the period using parallel requests.
234
- """
235
- cache_key = f"permits_{days}d"
236
  cached = _get_cached(cache_key)
237
  if cached is not None:
238
  return cached
239
 
240
  url = DATASET_URLS["permit_issuance"]
241
- fmap = DATASET_FIELD_MAP["permit_issuance"]
 
242
 
243
- filing_col = fmap["filing_date"] # 'approved_date'
244
- borough_col = fmap["borough"]
 
 
 
245
 
246
- cutoff_ts, _ = _days_ago_cutoff(days)
247
 
248
- where = f"{borough_col} in ('MANHATTAN','BROOKLYN','QUEENS')"
 
249
 
250
- # Fetch pages in parallel for speed
251
- base_params = {
252
- "$limit": page_size,
253
- "$order": f"{filing_col} DESC",
254
- "$where": where,
255
  }
256
 
257
- all_rows = []
258
- found_old_data = False
259
- batch_size = 3 # Process 3 pages in parallel at a time
260
-
261
- for batch_start in range(0, max_pages, batch_size):
262
- if found_old_data:
263
- break
264
-
265
- batch_end = min(batch_start + batch_size, max_pages)
266
-
267
- with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
268
- futures = []
269
- for page in range(batch_start, batch_end):
270
- offset = page * page_size
271
- future = executor.submit(_fetch_page_parallel, url, base_params, page, offset)
272
- futures.append(future)
273
-
274
- for future in concurrent.futures.as_completed(futures):
275
- rows, page, dt = future.result()
276
- if rows:
277
- print(f"[permits] page {page+1}: {len(rows):>5} rows in {dt:3.1f}s")
278
-
279
- # Quick check if we've gone past our date range
280
- if filing_col in pd.DataFrame(rows).columns:
281
- df_check = pd.DataFrame(rows)
282
- df_check["_date"] = _to_dt_naive(df_check[filing_col])
283
- if (df_check["_date"] < cutoff_ts).any():
284
- found_old_data = True
285
-
286
- all_rows.extend(rows)
287
- else:
288
- found_old_data = True
289
-
290
- df = pd.DataFrame(all_rows) if all_rows else pd.DataFrame()
291
  if df.empty:
292
  return df
 
 
293
 
294
- # Normalize
295
- if filing_col in df.columns:
296
- df["filing_date"] = _to_dt_naive(df[filing_col])
297
- else:
298
- df["filing_date"] = pd.NaT
299
 
300
- if borough_col in df.columns:
301
- df["borough"] = _norm_borough(df[borough_col])
302
- df = df[df["borough"].isin(ALLOWED_BOROUGHS)]
303
- else:
304
- df["borough"] = None
305
 
306
- for c in [fmap["house_no"], fmap["street_name"], fmap["zip"]]:
307
- if c and c not in df.columns:
308
- df[c] = ""
309
 
310
- df["full_address"] = _full_address(df, fmap["house_no"], fmap["street_name"], "borough", fmap["zip"])
 
 
 
 
311
 
312
- job_id = fmap["job_id"]
313
- if job_id in df.columns:
314
- df["job__"] = df[job_id]
315
 
316
- pt = fmap.get("permit_type")
317
- if pt and pt in df.columns:
318
- df["permit_type"] = df[pt]
319
 
320
- # Local time filter (safe tz-naive)
321
- before = len(df)
322
- df = df[df["filing_date"].notna() & (df["filing_date"] >= cutoff_ts)]
323
- print(f"[permits] kept {len(df)}/{before} rows after {days}d filter")
 
324
 
325
- result = df.sort_values("filing_date", ascending=False, kind="mergesort")
326
- _set_cached(cache_key, result)
327
- return result
328
 
329
  # ---------- public API ----------
330
  class SocrataClient:
@@ -332,57 +543,30 @@ class SocrataClient:
332
  if not SOCRATA_APP_TOKEN:
333
  print("⚠️ SOCRATA_APP_TOKEN not set – API may cap at 1,000 rows.")
334
 
335
- def fetch_dataset_last_n_days(self, dataset_key: str, days: int) -> Tuple[pd.DataFrame, float]:
 
 
 
 
336
  t0 = time.time()
 
337
  if dataset_key == "job_filings":
338
  df = _fetch_filings_last_days(days)
339
  elif dataset_key == "permit_issuance":
340
  df = _fetch_permits_last_days(days)
 
 
 
 
 
 
341
  else:
342
  raise ValueError(f"Unknown dataset: {dataset_key}")
 
343
  return df, time.time() - t0
344
 
345
- def fetch_leads_unpermitted(self, days: int = DEFAULT_DAYS_WINDOW) -> Tuple[pd.DataFrame, float]:
346
- """
347
- Sales leads = job_filings (last N days, MN/BK/QN) minus permits issued (last N days, MN/BK/QN).
348
- Now with parallel fetching for both datasets!
349
- """
350
- print(f"[leads] pulling last {days} days (parallel)…")
351
- t0 = time.time()
352
-
353
- # Parallel fetch both datasets at the dataset level
354
- with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
355
- filings_future = executor.submit(_fetch_filings_last_days, days)
356
- permits_future = executor.submit(_fetch_permits_last_days, days)
357
-
358
- filings_df = filings_future.result()
359
- permits_df = permits_future.result()
360
-
361
- if filings_df.empty:
362
- return filings_df, time.time() - t0
363
-
364
- # set of permitted bases
365
- permitted_bases = set()
366
- if not permits_df.empty and "job__" in permits_df.columns:
367
- permitted_bases = {
368
- _job_base(str(j)) for j in permits_df["job__"].dropna().astype(str)
369
- }
370
- print(f"[leads] Found {len(permitted_bases)} unique permitted job bases")
371
-
372
- leads = filings_df.copy()
373
- if "job_filing_number" in leads.columns and permitted_bases:
374
- bases = leads["job_filing_number"].dropna().astype(str).map(_job_base)
375
- before = len(leads)
376
- leads = leads[~bases.isin(permitted_bases)].copy()
377
- print(f"[leads] Filtered {before - len(leads)} permitted jobs, {len(leads)} leads remain")
378
-
379
- # add marker
380
- leads["has_permit_already"] = False
381
-
382
- total_time = time.time() - t0
383
- print(f"[leads] Total time: {total_time:.1f}s")
384
-
385
- return (
386
- leads.sort_values("filing_date", ascending=False, kind="mergesort").reset_index(drop=True),
387
- total_time,
388
- )
 
1
  # services/data.py
2
  from __future__ import annotations
3
 
 
4
  import time
5
  import concurrent.futures
6
  from datetime import datetime, timedelta
 
19
  DATASET_URLS: Dict[str, str] = {
20
  "job_filings": "https://data.cityofnewyork.us/resource/w9ak-ipjd.json",
21
  "permit_issuance": "https://data.cityofnewyork.us/resource/rbx6-tga4.json",
22
+ "electrical_permits": "https://data.cityofnewyork.us/resource/dm9a-ab7w.json",
23
+ # Stalled construction complaints (official DOB dataset)
24
+ "stalled_official": "https://data.cityofnewyork.us/resource/i296-73x5.json",
25
+ # BIS Job Application Filings (legacy system - for finding dormant jobs)
26
+ "bis_job_filings": "https://data.cityofnewyork.us/resource/ic3t-wcy2.json",
27
  }
28
 
29
  # Per dataset core field map
 
35
  "street_name": "street_name",
36
  "zip": "zip",
37
  "job_id": "job_filing_number",
38
+ "job_status": "filing_status",
39
  "job_type": "job_type",
40
  "desc": "job_description",
41
  },
42
  "permit_issuance": {
43
+ "filing_date": "approved_date",
44
  "borough": "borough",
45
  "house_no": "house__",
46
  "street_name": "street_name",
47
  "zip": "zip_code",
48
  "job_id": "job__",
49
+ "permit_type": "permittee_s_license_type",
50
  "desc": "job_description",
51
  },
52
+ "electrical_permits": {
53
+ "filing_date": "filing_date",
54
+ "borough": "borough",
55
+ "house_no": "house_number",
56
+ "street_name": "street_name",
57
+ "zip": "zip_code",
58
+ "job_id": "job_filing_number",
59
+ "job_status": "filing_status",
60
+ },
61
  }
62
 
63
  # ---------- Simple in-memory cache ----------
64
  _cache: Dict[str, Tuple[pd.DataFrame, datetime]] = {}
65
+ CACHE_TTL_MINUTES = 10
66
+
67
 
68
  def _get_cached(key: str) -> Optional[pd.DataFrame]:
69
+ entry = _cache.get(key)
70
+ if not entry:
71
+ return None
72
+ df, cached_at = entry
73
+ if datetime.now() - cached_at < timedelta(minutes=CACHE_TTL_MINUTES):
74
+ print(f"[cache] Using cached data for {key}")
75
+ return df.copy()
76
+ del _cache[key]
77
  return None
78
 
79
+
80
  def _set_cached(key: str, df: pd.DataFrame) -> None:
81
  _cache[key] = (df.copy(), datetime.now())
82
 
83
+
84
  # ---------- helpers ----------
85
  def _headers() -> Dict[str, str]:
86
+ """
87
+ Build headers for Socrata API requests.
88
+ SODA3 requires authentication via app token for all requests.
89
+ """
90
+ h: Dict[str, str] = {
91
+ "Accept": "application/json",
92
+ }
93
  if SOCRATA_APP_TOKEN:
94
  h["X-App-Token"] = SOCRATA_APP_TOKEN
95
  return h
96
 
97
+
98
  def _request(url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
99
+ """
100
+ Make a request to the Socrata API.
101
+ Handles both SODA2 and SODA3 endpoints.
102
+ """
103
+ headers = _headers()
104
+
105
+ # Log if no token (will likely fail on SODA3)
106
+ if "X-App-Token" not in headers:
107
+ print("⚠️ No SOCRATA_APP_TOKEN - request may be throttled or rejected")
108
+
109
+ try:
110
+ r = requests.get(url, headers=headers, params=params, timeout=60)
111
+ except requests.exceptions.Timeout:
112
+ raise RuntimeError(f"API request timed out for {url}")
113
+ except requests.exceptions.RequestException as e:
114
+ raise RuntimeError(f"API request failed: {e}")
115
+
116
+ if r.status_code == 403:
117
+ raise RuntimeError(
118
+ f"API returned 403 Forbidden. This likely means:\n"
119
+ f" 1. SOCRATA_APP_TOKEN is missing or invalid\n"
120
+ f" 2. The dataset requires authentication\n"
121
+ f" URL: {url}\n"
122
+ f" Response: {r.text[:200]}"
123
+ )
124
+ elif r.status_code == 429:
125
+ raise RuntimeError(
126
+ f"API rate limit exceeded (429). Set SOCRATA_APP_TOKEN for higher limits.\n"
127
+ f" URL: {url}"
128
+ )
129
+ elif r.status_code != 200:
130
+ raise RuntimeError(f"API request failed: {r.status_code} {r.text[:500]}")
131
+
132
  return r.json()
133
 
134
+
135
  def _to_dt_naive(series: pd.Series) -> pd.Series:
 
136
  s = pd.to_datetime(series, errors="coerce", utc=True)
137
  return s.dt.tz_localize(None)
138
 
139
+
140
  def _norm_borough(series: pd.Series) -> pd.Series:
141
  m = {
142
  "MN": "MANHATTAN",
 
144
  "BK": "BROOKLYN",
145
  "QN": "QUEENS",
146
  "SI": "STATEN ISLAND",
147
+ "1": "MANHATTAN",
148
+ "2": "BRONX",
149
+ "3": "BROOKLYN",
150
+ "4": "QUEENS",
151
+ "5": "STATEN ISLAND",
152
  }
153
  return series.astype(str).str.strip().str.upper().map(lambda x: m.get(x, x))
154
 
155
+
156
+ def _full_address(
157
+ df: pd.DataFrame,
158
+ house_col: str,
159
+ street_col: str,
160
+ borough_col: str,
161
+ zip_col: str | None,
162
+ ) -> pd.Series:
163
  def join(row):
164
  parts = []
165
  h = str(row.get(house_col, "") or "").strip()
166
  s = str(row.get(street_col, "") or "").strip()
167
  b = str(row.get(borough_col, "") or "").strip()
168
  z = str(row.get(zip_col, "") or "").strip() if zip_col else ""
169
+ if h:
170
+ parts.append(h)
171
+ if s:
172
+ parts.append(s)
173
+ if b:
174
+ parts.append(b)
175
+ if z:
176
+ parts.append(z)
177
  return ", ".join(p for p in parts if p)
178
+
179
  return df.apply(join, axis=1)
180
 
181
+
182
  def _days_ago_cutoff(days: int) -> Tuple[pd.Timestamp, str]:
183
  now = pd.Timestamp.utcnow().tz_localize(None)
184
  cutoff = now - pd.Timedelta(days=days)
 
185
  cutoff_iso = (cutoff.tz_localize("UTC").isoformat()).replace("+00:00", "Z")
186
  return cutoff, cutoff_iso
187
 
188
+
189
  def _job_base(job_filing_number: str) -> str:
190
  if not isinstance(job_filing_number, str):
191
  return ""
192
  return job_filing_number.split("-", 1)[0].strip()
193
 
194
+
195
+ def _fetch_page_parallel(
196
+ url: str,
197
+ params: Dict[str, Any],
198
+ page: int,
199
+ offset: int,
200
+ ) -> Tuple[List[Dict[str, Any]], int, float]:
201
  params_copy = params.copy()
202
  params_copy["$offset"] = offset
203
  t0 = time.time()
204
  rows = _request(url, params_copy)
205
  return rows, page, time.time() - t0
206
 
207
+
208
+ # ---------- CORE FETCHERS ----------
209
+
210
+ def _fetch_filings_last_days(days: int) -> pd.DataFrame:
211
+ """Fetch DOB NOW job filings from last N days."""
212
+ cache_key = f"job_filings_{days}"
213
  cached = _get_cached(cache_key)
214
  if cached is not None:
215
  return cached
216
 
217
  url = DATASET_URLS["job_filings"]
218
+ cutoff, cutoff_iso = _days_ago_cutoff(days)
 
 
 
 
 
 
 
219
 
220
+ # Use string comparison for date (works for ISO format text dates)
221
+ # Format: YYYY-MM-DD for string comparison
222
+ cutoff_str = cutoff.strftime("%Y-%m-%d")
223
+
224
  params = {
225
+ "$where": f"filing_date > '{cutoff_str}'",
226
+ "$limit": 50000,
227
+ "$order": "filing_date DESC",
 
 
 
 
 
 
 
228
  }
229
 
230
+ try:
231
+ data = _request(url, params)
232
+ except Exception as e:
233
+ print(f"[job_filings] Query failed: {e}")
234
+ # Fallback: no date filter, just get recent by order
235
+ print("[job_filings] Retrying without date filter...")
236
+ try:
237
+ params = {"$limit": 10000, "$order": "filing_date DESC"}
238
+ data = _request(url, params)
239
+ except Exception as e2:
240
+ print(f"[job_filings] Retry also failed: {e2}")
241
+ return pd.DataFrame()
242
 
243
+ df = pd.DataFrame(data)
244
+
245
  if df.empty:
246
  return df
247
+
248
+ print(f"[job_filings] Got {len(df)} rows")
249
 
250
+ # Normalize borough
251
+ if "borough" in df.columns:
252
+ df["borough"] = _norm_borough(df["borough"])
 
 
 
 
 
 
253
 
254
+ # Filter to allowed boroughs
255
+ if "borough" in df.columns:
256
+ df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
257
 
258
+ # Build full address
259
+ df["full_address"] = _full_address(df, "house_no", "street_name", "borough", "zip")
260
 
261
+ # Convert filing_date to datetime
262
+ if "filing_date" in df.columns:
263
+ df["filing_date"] = _to_dt_naive(df["filing_date"])
264
+ # Filter by date in pandas as backup
265
+ df = df[df["filing_date"] >= cutoff].copy()
266
 
267
+ _set_cached(cache_key, df)
268
+ return df
269
 
 
 
 
 
 
 
 
 
270
 
271
+def _fetch_permits_last_days(days: int) -> pd.DataFrame:
+    """Fetch DOB NOW approved permits from last N days."""
+    cache_key = f"permit_issuance_{days}"
     cached = _get_cached(cache_key)
     if cached is not None:
         return cached
 
     url = DATASET_URLS["permit_issuance"]
+    cutoff, cutoff_iso = _days_ago_cutoff(days)
+    cutoff_str = cutoff.strftime("%Y-%m-%d")
+
+    params = {
+        "$where": f"approved_date > '{cutoff_str}'",
+        "$limit": 50000,
+        "$order": "approved_date DESC",
+    }
+
+    try:
+        data = _request(url, params)
+    except Exception as e:
+        print(f"[permit_issuance] Query failed: {e}")
+        print("[permit_issuance] Retrying without date filter...")
+        try:
+            params = {"$limit": 10000, "$order": "approved_date DESC"}
+            data = _request(url, params)
+        except Exception as e2:
+            print(f"[permit_issuance] Retry also failed: {e2}")
+            return pd.DataFrame()
+
+    df = pd.DataFrame(data)
 
+    if df.empty:
+        return df
 
+    print(f"[permit_issuance] Got {len(df)} rows")
+
+    # Normalize borough
+    if "borough" in df.columns:
+        df["borough"] = _norm_borough(df["borough"])
+
+    # Filter to allowed boroughs
+    if "borough" in df.columns:
+        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
+
+    # Rename to common field name
+    if "approved_date" in df.columns:
+        df["filing_date"] = _to_dt_naive(df["approved_date"])
+        # Filter by date in pandas as backup
+        df = df[df["filing_date"] >= cutoff].copy()
+
+    # Build full address
+    df["full_address"] = _full_address(df, "house__", "street_name", "borough", "zip_code")
+
+    _set_cached(cache_key, df)
+    return df
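Both fetchers above delegate borough and address cleanup to `_norm_borough` and `_full_address`, which are defined earlier in services/data.py and take the dataset-specific column names as arguments (each feed names its house-number and ZIP columns differently: "house_no", "house__", "house_number"). A rough sketch of the assumed behavior, not the actual implementation:

import pandas as pd

def _norm_borough_sketch(s: pd.Series) -> pd.Series:
    # Upper-case and trim so values can match ALLOWED_BOROUGHS
    return s.astype(str).str.strip().str.upper()

def _full_address_sketch(df: pd.DataFrame, house_col: str, street_col: str,
                         boro_col: str, zip_col: str) -> pd.Series:
    # Column names are passed in by the caller because they differ per dataset
    parts = []
    for col in (house_col, street_col, boro_col, zip_col):
        parts.append(df[col].fillna("").astype(str).str.strip()
                     if col in df.columns else pd.Series("", index=df.index))
    return (parts[0] + " " + parts[1] + ", " + parts[2] + " " + parts[3]).str.strip()
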
+
+
+def _fetch_electrical_last_days(days: int) -> pd.DataFrame:
+    """Fetch DOB NOW electrical permits from last N days."""
+    cache_key = f"electrical_permits_{days}"
+    cached = _get_cached(cache_key)
+    if cached is not None:
+        return cached
+
+    url = DATASET_URLS["electrical_permits"]
+    cutoff, cutoff_iso = _days_ago_cutoff(days)
+    cutoff_str = cutoff.strftime("%Y-%m-%d")
+
+    params = {
+        "$where": f"filing_date > '{cutoff_str}'",
+        "$limit": 50000,
+        "$order": "filing_date DESC",
     }
+
+    try:
+        data = _request(url, params)
+    except Exception as e:
+        print(f"[electrical_permits] Query failed: {e}")
+        print("[electrical_permits] Retrying without date filter...")
+        try:
+            params = {"$limit": 10000, "$order": "filing_date DESC"}
+            data = _request(url, params)
+        except Exception as e2:
+            print(f"[electrical_permits] Retry also failed: {e2}")
+            return pd.DataFrame()
 
+    df = pd.DataFrame(data)
+
     if df.empty:
         return df
+
+    print(f"[electrical_permits] Got {len(df)} rows")
+
+    # Normalize borough
+    if "borough" in df.columns:
+        df["borough"] = _norm_borough(df["borough"])
+
+    # Filter to allowed boroughs
+    if "borough" in df.columns:
+        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
+
+    # Build full address
+    df["full_address"] = _full_address(df, "house_number", "street_name", "borough", "zip_code")
+
+    # Convert filing_date to datetime
+    if "filing_date" in df.columns:
+        df["filing_date"] = _to_dt_naive(df["filing_date"])
+        # Filter by date in pandas as backup
+        df = df[df["filing_date"] >= cutoff].copy()
+
+    _set_cached(cache_key, df)
+    return df
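All of these fetchers short-circuit through `_get_cached` / `_set_cached`, so switching tabs does not re-download the same window twice. Those helpers also live earlier in services/data.py; a minimal sketch of a time-based cache that would satisfy how they are called (the TTL value is an assumption, not taken from the source):

import time
from typing import Optional

import pandas as pd

_CACHE = {}  # key -> (stored_at, DataFrame); sketch only
_CACHE_TTL_SECONDS = 15 * 60  # assumed TTL; the real value is not shown in this diff

def _get_cached_sketch(key: str) -> Optional[pd.DataFrame]:
    hit = _CACHE.get(key)
    if hit is None:
        return None
    stored_at, df = hit
    if time.time() - stored_at > _CACHE_TTL_SECONDS:
        return None  # stale; the caller falls through and refetches
    return df.copy()

def _set_cached_sketch(key: str, df: pd.DataFrame) -> None:
    _CACHE[key] = (time.time(), df)
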
 
+# ---------- STALLED CONSTRUCTION FETCHERS ----------
+
+def _fetch_stalled_official() -> pd.DataFrame:
+    """
+    Fetch official DOB Stalled Construction Sites complaints (i296-73x5).
+
+    Strategy:
+    1. Paginate through ALL rows (1.4M+)
+    2. Deduplicate by complaint_number (keep one row per complaint)
+    3. Filter to complaints received in the last 18 months
+    4. Filter to allowed boroughs
+    """
+    cache_key = "stalled_official"
+    cached = _get_cached(cache_key)
+    if cached is not None:
+        return cached
+
+    url = "https://data.cityofnewyork.us/resource/i296-73x5.json"
+
+    # Paginate through all data
+    all_data = []
+    offset = 0
+    page_size = 50000
+
+    print(f"[stalled_official] Fetching all rows (paginating by {page_size})...")
+
+    while True:
+        params = {
+            "$limit": page_size,
+            "$offset": offset,
+        }
+
+        try:
+            data = _request(url, params)
+        except Exception as e:
+            print(f"[stalled_official] API request failed at offset {offset}: {e}")
+            break
+
+        if not data:
+            break
+
+        all_data.extend(data)
+        print(f"[stalled_official] Fetched {len(all_data)} rows so far...")
+
+        if len(data) < page_size:
+            break
+
+        offset += page_size
+
+    if not all_data:
+        print("[stalled_official] No data returned from API")
+        return pd.DataFrame()
+
+    df = pd.DataFrame(all_data)
+    print(f"[stalled_official] Total rows fetched: {len(df)}")
+
+    # Deduplicate by complaint_number - keep the first occurrence (arbitrary; the duplicates describe the same complaint)
+    if "complaint_number" in df.columns:
+        before = len(df)
+        df = df.drop_duplicates(subset=["complaint_number"], keep="first")
+        print(f"[stalled_official] Deduplicated: {before} -> {len(df)} unique complaints")
+
+    # Parse complaint date
+    if "date_complaint_received" in df.columns:
+        df["complaint_date"] = pd.to_datetime(df["date_complaint_received"], errors="coerce")
+
+        # Filter to only complaints from the last 18 months
+        cutoff_date = pd.Timestamp.today() - pd.DateOffset(months=18)
+        before_filter = len(df)
+        df = df[df["complaint_date"] >= cutoff_date].copy()
+        print(f"[stalled_official] 18-month filter (>= {cutoff_date.date()}): {before_filter} -> {len(df)} rows")
+
+        # Calculate days stalled
+        df["days_stalled"] = (pd.Timestamp.today() - df["complaint_date"]).dt.days
+
+    # Map borough to standard names
+    boro_map = {
+        "1": "MANHATTAN", "MANHATTAN": "MANHATTAN", "Manhattan": "MANHATTAN",
+        "2": "BRONX", "BRONX": "BRONX", "Bronx": "BRONX",
+        "3": "BROOKLYN", "BROOKLYN": "BROOKLYN", "Brooklyn": "BROOKLYN",
+        "4": "QUEENS", "QUEENS": "QUEENS", "Queens": "QUEENS",
+        "5": "STATEN ISLAND", "STATEN ISLAND": "STATEN ISLAND", "Staten Island": "STATEN ISLAND",
+    }
+    if "borough_name" in df.columns:
+        df["borough"] = df["borough_name"].astype(str).str.strip().map(lambda x: boro_map.get(x, x.upper() if isinstance(x, str) else x))
+
+    # Filter to allowed boroughs
+    if "borough" in df.columns:
+        before_boro = len(df)
+        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
+        print(f"[stalled_official] Borough filter: {before_boro} -> {len(df)} rows")
+
+    # Build full address
+    if "house_number" in df.columns and "street_name" in df.columns:
+        df["full_address"] = (
+            df["house_number"].fillna("").astype(str).str.strip() + " " +
+            df["street_name"].fillna("").astype(str).str.title().str.strip() + ", " +
+            df.get("borough", "").fillna("").astype(str)
+        )
+
+    # Sort by most stalled first (oldest complaint = most days)
+    if "days_stalled" in df.columns:
+        df = df.sort_values("days_stalled", ascending=False).reset_index(drop=True)
+
+    print(f"[stalled_official] Final: {len(df)} active stalled sites")
+    _set_cached(cache_key, df)
+    return df
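A small self-contained illustration of the dedupe and 18-month steps from the docstring, using made-up complaint rows (only the two columns the code relies on):

import pandas as pd

today = pd.Timestamp.today().normalize()
toy = pd.DataFrame({
    "complaint_number": ["C1", "C1", "C2", "C3"],
    "date_complaint_received": [
        (today - pd.DateOffset(months=2)).strftime("%Y-%m-%d"),   # recent, duplicated
        (today - pd.DateOffset(months=2)).strftime("%Y-%m-%d"),
        (today - pd.DateOffset(months=30)).strftime("%Y-%m-%d"),  # older than 18 months
        (today - pd.DateOffset(days=10)).strftime("%Y-%m-%d"),    # very recent
    ],
})

# One row per complaint_number (keep="first", as above)
toy = toy.drop_duplicates(subset=["complaint_number"], keep="first")

# Keep only complaints received in the last 18 months
toy["complaint_date"] = pd.to_datetime(toy["date_complaint_received"], errors="coerce")
cutoff = today - pd.DateOffset(months=18)
toy = toy[toy["complaint_date"] >= cutoff]

# days_stalled is simply the age of the complaint
toy["days_stalled"] = (today - toy["complaint_date"]).dt.days
print(toy[["complaint_number", "days_stalled"]])  # C1 and C3 survive; C2 is too old
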
+
+
+def _fetch_likely_stalled() -> pd.DataFrame:
+    """
+    Fetch stalled construction projects from DOB Stalled Construction Sites API.
+    This is now just an alias for the official stalled feed.
+    """
+    return _fetch_stalled_official()
+
+
+# ---------- LEADS UNPERMITTED ----------
+
+def _fetch_leads_unpermitted(days: int = DEFAULT_DAYS_WINDOW) -> Tuple[pd.DataFrame, float]:
+    """
+    Find filings that don't have corresponding permits yet.
+    Cross-reference job_filings with permit_issuance.
+    """
+    t0 = time.time()
+
+    # Get filings
+    filings_df = _fetch_filings_last_days(days)
+    if filings_df.empty:
+        return pd.DataFrame(), time.time() - t0
+
+    # Get permits
+    permits_df = _fetch_permits_last_days(days)
+
+    # Extract base job numbers for matching
+    if "job_filing_number" in filings_df.columns:
+        filings_df["_job_base"] = filings_df["job_filing_number"].apply(_job_base)
+    else:
+        filings_df["_job_base"] = ""
+
+    # Get permitted job bases
+    permitted_jobs = set()
+    if not permits_df.empty and "job__" in permits_df.columns:
+        permitted_jobs = set(permits_df["job__"].dropna().astype(str).str.strip())
+
+    # Filter to unpermitted filings
+    mask = ~filings_df["_job_base"].isin(permitted_jobs)
+    unpermitted = filings_df[mask].copy()
+
+    # Drop helper column
+    unpermitted.drop(columns=["_job_base"], inplace=True, errors="ignore")
+
+    return unpermitted, time.time() - t0
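The lead selection above is effectively an anti-join on job number: keep filings whose base job number never appears in the permit feed. A tiny illustration with invented job numbers (the real `_job_base` helper is defined earlier in the file; the suffix-stripping below is only a stand-in):

import pandas as pd

filings = pd.DataFrame({"job_filing_number": ["M00123-I1", "B00456-I1", "Q00789-I1"]})
permits = pd.DataFrame({"job__": ["M00123", "X99999"]})

# Stand-in for _job_base: drop the "-I1"-style suffix
filings["_job_base"] = filings["job_filing_number"].str.split("-").str[0]

permitted_jobs = set(permits["job__"].dropna().astype(str).str.strip())
unpermitted = filings[~filings["_job_base"].isin(permitted_jobs)]
print(unpermitted["job_filing_number"].tolist())  # ['B00456-I1', 'Q00789-I1']
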
 # ---------- public API ----------
 class SocrataClient:
 
         if not SOCRATA_APP_TOKEN:
             print("⚠️ SOCRATA_APP_TOKEN not set – API may cap at 1,000 rows.")
 
+    def fetch_dataset_last_n_days(
+        self,
+        dataset_key: str,
+        days: int,
+    ) -> Tuple[pd.DataFrame, float]:
         t0 = time.time()
+
         if dataset_key == "job_filings":
             df = _fetch_filings_last_days(days)
         elif dataset_key == "permit_issuance":
             df = _fetch_permits_last_days(days)
+        elif dataset_key == "electrical_permits":
+            df = _fetch_electrical_last_days(days)
+        elif dataset_key == "stalled_official":
+            df = _fetch_stalled_official()
+        elif dataset_key == "likely_stalled":
+            df = _fetch_likely_stalled()
         else:
             raise ValueError(f"Unknown dataset: {dataset_key}")
+
         return df, time.time() - t0
 
+    def fetch_leads_unpermitted(
+        self,
+        days: int = DEFAULT_DAYS_WINDOW,
+    ) -> Tuple[pd.DataFrame, float]:
+        return _fetch_leads_unpermitted(days)
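For reference, this is roughly how gui.py is expected to drive the client; a usage sketch, not code from the commit:

client = SocrataClient()

# Leads & Filings tab: filings from the last 90 days with no matching permit yet
leads_df, elapsed = client.fetch_leads_unpermitted(days=90)
print(f"{len(leads_df)} unpermitted filings in {elapsed:.1f}s")

# DOB Stalled tab: the days argument is ignored for the stalled feeds
stalled_df, elapsed = client.fetch_dataset_last_n_days("stalled_official", days=90)
print(f"{len(stalled_df)} stalled sites in {elapsed:.1f}s")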