Spaces:
Sleeping
Sleeping
| import gspread | |
| from oauth2client.service_account import ServiceAccountCredentials | |
| from typing import Dict | |
| class SheetCRUDRepository: | |
| def __init__(self, worksheet): | |
| self.worksheet = worksheet | |
| self.titles = self.worksheet.row_values(1) # Assuming titles are in the first row | |
| assert len(set(self.titles)) == len(self.titles), f"Failed to init {SheetCRUDRepository.__class__}, titles: {self.titles} contain duplicated values!" | |
| def create(self, data: Dict): | |
| values = [data.get(title, '') for title in self.titles] | |
| self.worksheet.append_row(values) | |
| def read(self, row_index: int) -> Dict: | |
| """ | |
| return {} if empty | |
| """ | |
| values = self.worksheet.row_values(row_index) | |
| return {title: value for title, value in zip(self.titles, values)} | |
| def update(self, row_index: int, data: Dict): | |
| values = [data.get(title, '') for title in self.titles] | |
| self.worksheet.update(f"A{row_index}:Z{row_index}", [values]) | |
| def delete(self, row_index: int): | |
| self.worksheet.delete_row(row_index) | |
| def find(self, search_dict): | |
| for col_title, value in search_dict.items(): | |
| if col_title in self.titles: | |
| col_index = self.titles.index(col_title) + 1 # Adding 1 to match gspread indexing | |
| cell = self.worksheet.find(value, in_column=col_index) | |
| if cell is None: | |
| break | |
| row_number = cell.row | |
| return row_number, self.read(row_number) | |
| return None | |
| class Converter: | |
| def parse_one_to_obj(field_name, value): | |
| if value in ['TRUE', 'FALSE']: | |
| return field_name, value == 'TRUE' | |
| if isinstance(value, str): | |
| if value.startswith('[DURATION]'): | |
| if 'NONE' in value.upper(): | |
| return field_name, None | |
| value = value.replace('[DURATION]', '').replace("\n", '').rstrip() | |
| sign = 1 | |
| if 'before' in value: | |
| sign = -1 | |
| if 'after' in value: | |
| sign = 1 | |
| value = value.replace('after', '').replace('before', '').rstrip() | |
| if 'h' in value: | |
| value = value.replace('h', '') | |
| return field_name, {"hours": int(value) * sign} | |
| if 'm' in value: | |
| value = value.replace('m', '') | |
| return field_name, {"minutes": int(value) * sign} | |
| return field_name, value | |
| def parse_one_to_row(field_name, value): | |
| if isinstance(value, str): | |
| if value in ['TRUE', 'FALSE']: | |
| return field_name, value == "TRUE" | |
| if isinstance(value, dict): | |
| if 'hours' in value or 'minutes' in value: | |
| # ignore | |
| return None, None | |
| return field_name, value | |
| def convert_to_obj(row): | |
| if row is None: | |
| return None | |
| obj = {} | |
| for key in row.keys(): | |
| new_key, value = Converter.parse_one_to_obj(key, row[key]) | |
| if new_key is not None: | |
| obj[new_key] = value | |
| return obj | |
| def convert_to_row(obj): | |
| if obj is None: | |
| return None | |
| row = {} | |
| for key in obj.keys(): | |
| new_key, value = Converter.parse_one_to_row(key, obj[key]) | |
| if new_key is not None: | |
| row[new_key] = value | |
| return row | |
| def create_repositories(): | |
| scope = [ | |
| 'https://www.googleapis.com/auth/spreadsheets', | |
| 'https://www.googleapis.com/auth/drive' | |
| ] | |
| creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope) | |
| client = gspread.authorize(creds) | |
| # sheet_url = "https://docs.google.com/spreadsheets/d/17OxKF0iP_aJJ0HCgJkwFsH762EUrtcEIYcPmyiiKnaM" | |
| sheet_url = "https://docs.google.com/spreadsheets/d/1KzUYgWwbvYXGfyehOTyZCCQf0udZiwVXxaxpmkXEe3E/edit?usp=sharing" | |
| sheet = client.open_by_url(sheet_url) | |
| run_stt_repository = SheetCRUDRepository(sheet.get_worksheet(0)) | |
| config_repository = SheetCRUDRepository(sheet.get_worksheet(1)) | |
| log_repository = SheetCRUDRepository(sheet.get_worksheet(2)) | |
| secret_repository = SheetCRUDRepository(sheet.get_worksheet(3)) | |
| return run_stt_repository, config_repository, log_repository, secret_repository | |
| run_stt_repo, conf_repo, log_repo, secret_repo = create_repositories() | |
| if __name__ == "__main__": | |
| a = create_repositories() | |
| print(a) |