Skip to content

Commit f79bb4a

Browse files
Refactor final linter fixes (#313)
1 parent b0d009f commit f79bb4a

File tree

18 files changed

+93
-87
lines changed

18 files changed

+93
-87
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ repos:
33
rev: v0.12.3
44
hooks:
55
- id: ruff
6-
args: ["--exit-zero"]
6+
args: ["."]

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,6 @@ select = [
8787
"TID",
8888
# implicit string concatenation
8989
"ISC",
90-
# flake8-type-checking
91-
"TC",
9290
# comprehensions
9391
"C4",
9492
# pygrep-hooks
@@ -128,7 +126,9 @@ ignore = [
128126
# too-many-branches
129127
"PLR0912",
130128
# too-many-return-statements
131-
"PLR0911"
129+
"PLR0911",
130+
# flake8-type-checking (150 errors)
131+
"TC"
132132
]
133133

134134
[tool.ruff.lint.isort]

src/conductor/client/ai/orchestrator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def associate_prompt_template(self, name: str, ai_integration: str, ai_models: L
5050
def test_prompt_template(self, text: str, variables: dict,
5151
ai_integration: str,
5252
text_complete_model: str,
53-
stop_words: List[str] = None, max_tokens: int = 100,
53+
stop_words: Optional[List[str]] = None, max_tokens: int = 100,
5454
temperature: int = 0,
5555
top_p: int = 1):
5656
stop_words = stop_words or []

src/conductor/client/automator/task_handler.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@
2929
set_start_method("fork")
3030
_mp_fork_set = True
3131
except Exception as e:
32-
logger.info(f"error when setting multiprocessing.set_start_method - maybe the context is set {e.args}")
32+
logger.info("error when setting multiprocessing.set_start_method - maybe the context is set %s", e.args)
3333
if platform == "darwin":
3434
os.environ["no_proxy"] = "*"
3535

3636
def register_decorated_fn(name: str, poll_interval: int, domain: str, worker_id: str, func):
37-
logger.info(f"decorated {name}")
37+
logger.info("decorated %s", name)
3838
_decorated_functions[(name, domain)] = {
3939
"func": func,
4040
"poll_interval": poll_interval,
@@ -60,24 +60,24 @@ def __init__(
6060
importlib.import_module("conductor.client.worker.worker_task")
6161
if import_modules is not None:
6262
for module in import_modules:
63-
logger.info(f"loading module {module}")
63+
logger.info("loading module %s", module)
6464
importlib.import_module(module)
6565

6666
elif not isinstance(workers, list):
6767
workers = [workers]
6868
if scan_for_annotated_workers is True:
6969
for (task_def_name, domain), record in _decorated_functions.items():
70-
fn = record['func']
71-
worker_id = record['worker_id']
72-
poll_interval = record['poll_interval']
70+
fn = record["func"]
71+
worker_id = record["worker_id"]
72+
poll_interval = record["poll_interval"]
7373

7474
worker = Worker(
7575
task_definition_name=task_def_name,
7676
execute_function=fn,
7777
worker_id=worker_id,
7878
domain=domain,
7979
poll_interval=poll_interval)
80-
logger.info(f"created worker with name={task_def_name} and domain={domain}")
80+
logger.info("created worker with name=%s and domain=%s", task_def_name, domain)
8181
workers.append(worker)
8282

8383
self.__create_task_runner_processes(workers, configuration, metrics_settings)
@@ -156,7 +156,7 @@ def __start_task_runner_processes(self):
156156
for task_runner_process in self.task_runner_processes:
157157
task_runner_process.start()
158158
n = n + 1
159-
logger.info(f"Started {n} TaskRunner process")
159+
logger.info("Started %s TaskRunner process", n)
160160

161161
def __join_metrics_provider_process(self):
162162
if self.metrics_provider_process is None:
@@ -180,12 +180,12 @@ def __stop_process(self, process: Process):
180180
if process is None:
181181
return
182182
try:
183-
logger.debug(f"Terminating process: {process.pid}")
183+
logger.debug("Terminating process: %s", process.pid)
184184
process.terminate()
185185
except Exception as e:
186-
logger.debug(f"Failed to terminate process: {process.pid}, reason: {e}")
186+
logger.debug("Failed to terminate process: %s, reason: %s", process.pid, e)
187187
process.kill()
188-
logger.debug(f"Killed process: {process.pid}")
188+
logger.debug("Killed process: %s", process.pid)
189189

190190

191191
# Setup centralized logging queue

src/conductor/client/automator/task_runner.py

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,12 @@ def run(self) -> None:
5454
logger.setLevel(logging.DEBUG)
5555

5656
task_names = ",".join(self.worker.task_definition_names)
57-
logger.info(f"Polling task {task_names} with domain {self.worker.get_domain()} with polling "
58-
f"interval {self.worker.get_polling_interval_in_seconds()}")
57+
logger.info(
58+
"Polling task %s with domain %s with polling interval %s",
59+
task_names,
60+
self.worker.get_domain(),
61+
self.worker.get_polling_interval_in_seconds()
62+
)
5963

6064
while True:
6165
self.run_once()
@@ -68,13 +72,13 @@ def run_once(self) -> None:
6872
self.__update_task(task_result)
6973
self.__wait_for_polling_interval()
7074
self.worker.clear_task_definition_name_cache()
71-
except Exception as e:
75+
except Exception:
7276
pass
7377

7478
def __poll_task(self) -> Task:
7579
task_definition_name = self.worker.get_task_definition_name()
7680
if self.worker.paused():
77-
logger.debug(f"Stop polling task for: {task_definition_name}")
81+
logger.debug("Stop polling task for: %s", task_definition_name)
7882
return None
7983
if self.metrics_collector is not None:
8084
self.metrics_collector.increment_task_poll(
@@ -104,24 +108,29 @@ def __poll_task(self) -> Task:
104108
if self.metrics_collector is not None:
105109
self.metrics_collector.increment_task_poll_error(task_definition_name, type(e))
106110
logger.error(
107-
f"Failed to poll task for: {task_definition_name}, reason: {traceback.format_exc()}"
111+
"Failed to poll task for: %s, reason: %s",
112+
task_definition_name,
113+
traceback.format_exc()
108114
)
109115
return None
110116
if task is not None:
111117
logger.debug(
112-
f"Polled task: {task_definition_name}, worker_id: {self.worker.get_identity()}, domain: {self.worker.get_domain()}")
118+
"Polled task: %s, worker_id: %s, domain: %s",
119+
task_definition_name,
120+
self.worker.get_identity(),
121+
self.worker.get_domain()
122+
)
113123
return task
114124

115125
def __execute_task(self, task: Task) -> TaskResult:
116126
if not isinstance(task, Task):
117127
return None
118128
task_definition_name = self.worker.get_task_definition_name()
119129
logger.debug(
120-
"Executing task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}".format(
121-
task_id=task.task_id,
122-
workflow_instance_id=task.workflow_instance_id,
123-
task_definition_name=task_definition_name
124-
)
130+
"Executing task, id: %s, workflow_instance_id: %s, task_definition_name: %s",
131+
task.task_id,
132+
task.workflow_instance_id,
133+
task_definition_name
125134
)
126135
try:
127136
start_time = time.time()
@@ -138,11 +147,10 @@ def __execute_task(self, task: Task) -> TaskResult:
138147
sys.getsizeof(task_result)
139148
)
140149
logger.debug(
141-
"Executed task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}".format(
142-
task_id=task.task_id,
143-
workflow_instance_id=task.workflow_instance_id,
144-
task_definition_name=task_definition_name
145-
)
150+
"Executed task, id: %s, workflow_instance_id: %s, task_definition_name: %s",
151+
task.task_id,
152+
task.workflow_instance_id,
153+
task_definition_name
146154
)
147155
except Exception as e:
148156
if self.metrics_collector is not None:
@@ -159,12 +167,12 @@ def __execute_task(self, task: Task) -> TaskResult:
159167
task_result.logs = [TaskExecLog(
160168
traceback.format_exc(), task_result.task_id, int(time.time()))]
161169
logger.error(
162-
"Failed to execute task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}".format(
163-
task_id=task.task_id,
164-
workflow_instance_id=task.workflow_instance_id,
165-
task_definition_name=task_definition_name,
166-
reason=traceback.format_exc()
167-
)
170+
"Failed to execute task, id: %s, workflow_instance_id: %s, "
171+
"task_definition_name: %s, reason: %s",
172+
task.task_id,
173+
task.workflow_instance_id,
174+
task_definition_name,
175+
traceback.format_exc()
168176
)
169177
return task_result
170178

@@ -173,11 +181,10 @@ def __update_task(self, task_result: TaskResult):
173181
return None
174182
task_definition_name = self.worker.get_task_definition_name()
175183
logger.debug(
176-
"Updating task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}".format(
177-
task_id=task_result.task_id,
178-
workflow_instance_id=task_result.workflow_instance_id,
179-
task_definition_name=task_definition_name
180-
)
184+
"Updating task, id: %s, workflow_instance_id: %s, task_definition_name: %s",
185+
task_result.task_id,
186+
task_result.workflow_instance_id,
187+
task_definition_name
181188
)
182189
for attempt in range(4):
183190
if attempt > 0:
@@ -186,12 +193,11 @@ def __update_task(self, task_result: TaskResult):
186193
try:
187194
response = self.task_client.update_task(body=task_result)
188195
logger.debug(
189-
"Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}".format(
190-
task_id=task_result.task_id,
191-
workflow_instance_id=task_result.workflow_instance_id,
192-
task_definition_name=task_definition_name,
193-
response=response
194-
)
196+
"Updated task, id: %s, workflow_instance_id: %s, task_definition_name: %s, response: %s",
197+
task_result.task_id,
198+
task_result.workflow_instance_id,
199+
task_definition_name,
200+
response
195201
)
196202
return response
197203
except Exception as e:
@@ -200,13 +206,11 @@ def __update_task(self, task_result: TaskResult):
200206
task_definition_name, type(e)
201207
)
202208
logger.error(
203-
"Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, "
204-
"task_definition_name: {task_definition_name}, reason: {reason}".format(
205-
task_id=task_result.task_id,
206-
workflow_instance_id=task_result.workflow_instance_id,
207-
task_definition_name=task_definition_name,
208-
reason=traceback.format_exc()
209-
)
209+
"Failed to update task, id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
210+
task_result.task_id,
211+
task_result.workflow_instance_id,
212+
task_definition_name,
213+
traceback.format_exc()
210214
)
211215
return None
212216

@@ -230,14 +234,14 @@ def __set_worker_properties(self) -> None:
230234
try:
231235
self.worker.poll_interval = float(polling_interval)
232236
except Exception:
233-
logger.error(f"error reading and parsing the polling interval value {polling_interval}")
237+
logger.error("error reading and parsing the polling interval value %s", polling_interval)
234238
self.worker.poll_interval = self.worker.get_polling_interval_in_seconds()
235239

236240
if polling_interval:
237241
try:
238242
self.worker.poll_interval = float(polling_interval)
239243
except Exception as e:
240-
logger.error("Exception in reading polling interval from environment variable: {0}.".format(str(e)))
244+
logger.error("Exception in reading polling interval from environment variable: %s", e)
241245

242246
def __get_property_value_from_env(self, prop, task_type):
243247
"""

src/conductor/client/automator/utils.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def convert_from_dict(cls: type, data: dict) -> object:
5454
if not ((str(typ).startswith("dict[") or
5555
str(typ).startswith("typing.Dict[") or
5656
str(typ).startswith("requests.structures.CaseInsensitiveDict[") or
57-
typ == dict or str(typ).startswith("OrderedDict["))):
57+
typ is dict or str(typ).startswith("OrderedDict["))):
5858
data = {}
5959

6060
members = inspect.signature(cls.__init__).parameters
@@ -82,7 +82,7 @@ def convert_from_dict(cls: type, data: dict) -> object:
8282
elif (str(typ).startswith("dict[") or
8383
str(typ).startswith("typing.Dict[") or
8484
str(typ).startswith("requests.structures.CaseInsensitiveDict[") or
85-
typ == dict or str(typ).startswith("OrderedDict[")):
85+
typ is dict or str(typ).startswith("OrderedDict[")):
8686

8787
values = {}
8888
generic_type = object
@@ -111,13 +111,10 @@ def get_value(typ: type, val: object) -> object:
111111
if typ in simple_types:
112112
return val
113113
elif str(typ).startswith("typing.List[") or str(typ).startswith("typing.Set[") or str(typ).startswith("list["):
114-
values = []
115-
for val in val:
116-
converted = get_value(type(val), val)
117-
values.append(converted)
114+
values = [get_value(type(item), item) for item in val]
118115
return values
119116
elif str(typ).startswith("dict[") or str(typ).startswith(
120-
"typing.Dict[") or str(typ).startswith("requests.structures.CaseInsensitiveDict[") or typ == dict:
117+
"typing.Dict[") or str(typ).startswith("requests.structures.CaseInsensitiveDict[") or typ is dict:
121118
values = {}
122119
for k in val:
123120
v = val[k]

src/conductor/client/configuration/settings/metrics_settings.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from __future__ import annotations
22
import logging
33
import os
4-
from typing import Optional
54
from pathlib import Path
65

6+
from typing import Optional
7+
78
from conductor.client.configuration.configuration import Configuration
89

910
logger = logging.getLogger(
@@ -20,7 +21,7 @@ def get_default_temporary_folder() -> str:
2021
class MetricsSettings:
2122
def __init__(
2223
self,
23-
directory: str = None,
24+
directory: Optional[str] = None,
2425
file_name: str = "metrics.log",
2526
update_interval: float = 0.1):
2627
if directory is None:
@@ -35,6 +36,6 @@ def __set_dir(self, dir: str) -> None:
3536
os.mkdir(dir)
3637
except Exception as e:
3738
logger.warning(
38-
"Failed to create metrics temporary folder, reason: ", e)
39+
"Failed to create metrics temporary folder, reason: %s", e)
3940

4041
self.directory = dir

src/conductor/client/exceptions/api_exception_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,4 @@ def decorate(cls):
5959
if callable(getattr(cls, attr)) and attr not in exclude_local:
6060
setattr(cls, attr, decorator(getattr(cls, attr)))
6161
return cls
62-
return decorate
62+
return decorate

src/conductor/client/helpers/helper.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import re
44
from dateutil.parser import parse
5-
from typing import Any, ClassVar, Dict, Tuple
5+
from typing import ClassVar, Dict, Tuple
66

77
import six
88
from requests.structures import CaseInsensitiveDict
@@ -19,10 +19,10 @@
1919

2020

2121
class ObjectMapper(object):
22-
PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types
23-
NATIVE_TYPES_MAPPING = {
22+
PRIMITIVE_TYPES: ClassVar[Tuple] = (float, bool, bytes, six.text_type, *six.integer_types)
23+
NATIVE_TYPES_MAPPING: ClassVar[Dict] = {
2424
"int": int,
25-
"long": int if six.PY3 else long, # noqa: F821
25+
"long": int if six.PY3 else long, # noqa: F821, YTT202
2626
"float": float,
2727
"str": str,
2828
"bool": bool,
@@ -68,7 +68,7 @@ def __deserialize(self, data, klass):
6868
if data is None:
6969
return None
7070

71-
if type(klass) == str:
71+
if isinstance(klass, str):
7272
if klass.startswith("list["):
7373
sub_kls = re.match(r"list\[(.*)\]", klass).group(1)
7474

src/conductor/client/orkes/orkes_workflow_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def get_by_correlation_ids(
202202
def remove_workflow(self, workflow_id: str):
203203
self.workflowResourceApi.delete(workflow_id)
204204

205-
def update_variables(self, workflow_id: str, variables: Dict[str, object] = None) -> None:
205+
def update_variables(self, workflow_id: str, variables: Optional[Dict[str, object]] = None) -> None:
206206
variables = variables or {}
207207
self.workflowResourceApi.update_workflow_state(variables, workflow_id)
208208

0 commit comments

Comments
 (0)