Skip to content

Commit 5d39c03

Browse files
committed
Format
1 parent 94ec1e8 commit 5d39c03

File tree

7 files changed

+27
-10
lines changed

7 files changed

+27
-10
lines changed

mars/deploy/oscar/tests/test_ray_error_recovery.py

+14
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 1999-2021 Alibaba Group Holding Ltd.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import os
216
import numpy as np
317
import pandas as pd

mars/services/task/execution/mars/executor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
from .....utils import Timer
3838
from ....context import ThreadedServiceContext
3939
from ....cluster.api import ClusterAPI
40-
from ....cluster.core import NodeStatus
4140
from ....lifecycle.api import LifecycleAPI
4241
from ....meta.api import MetaAPI
4342
from ....scheduling import SchedulingAPI
@@ -95,7 +94,8 @@ def __init__(
9594
self._meta_api = meta_api
9695

9796
self._stage_processors = []
98-
self._stage_id_to_processor = weakref.WeakValueDictionary()
97+
self._stage_tile_progresses = []
98+
self._stage_id_to_processor = weakref.WeakValueDictionary()
9999
self._cur_stage_processor = None
100100
self._result_tileables_lifecycle = None
101101
self._subtask_decref_events = dict()

mars/services/task/execution/mars/stage.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,8 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
279279
)
280280
await self.cancel()
281281
else:
282-
await async_call(
283-
self._scheduling_api.finish_subtasks([result.subtask_id], bands=[band])
282+
await self._scheduling_api.finish_subtasks(
283+
[result.subtask_id], bands=[band]
284284
)
285285
logger.debug(
286286
"Continue to schedule subtasks after subtask %s finished.",
@@ -306,7 +306,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
306306
self.task.task_id,
307307
self.stage_id,
308308
)
309-
await async_call(self._schedule_subtasks([to_schedule_subtask]))
309+
await self._schedule_subtasks([to_schedule_subtask])
310310
except KeyError:
311311
logger.exception("Got KeyError.")
312312

mars/services/task/execution/ray/executor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1074,8 +1074,8 @@ def gc():
10741074
last_check_slow_time = curr_time
10751075
# Fast to next loop and give it a chance to update object_ref_to_subtask.
10761076
await asyncio.sleep(interval_seconds if len(ready_objects) == 0 else 0)
1077-
1078-
def get_stage_generation_order(self, stage_id: str):
1077+
1078+
def get_stage_generation_order(self, stage_id: str):
10791079
raise NotImplementedError(
10801080
"RayTaskExecutor doesn't support stage generation order."
10811081
)

mars/services/task/supervisor/processor.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from ....typing import TileableType, ChunkType
3131
from ....utils import Timer
3232
from ...context import FailOverContext
33-
from ...subtask import SubtaskResult, Subtask
33+
from ...subtask import SubtaskResult, SubtaskGraph, Subtask
3434
from ..core import Task, TaskResult, TaskStatus, new_task_id
3535
from ..execution.api import TaskExecutor, ExecutionChunkResult
3636
from .preprocessor import TaskPreprocessor
@@ -481,8 +481,9 @@ def _finish(self):
481481

482482
def is_done(self) -> bool:
483483
return self.done.is_set()
484+
484485
def get_generation_order(self, stage_id: str):
485486
return self._executor.get_stage_generation_order(stage_id)
486487

487488
def get_subtask(self, chunk_data_key: str):
488-
return self._chunk_data_key_to_subtask.get(chunk_data_key)
489+
return self._chunk_data_key_to_subtask.get(chunk_data_key)

mars/services/task/supervisor/tests/task_preprocessor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def analyze(
177177
self._config,
178178
chunk_to_subtasks,
179179
shuffle_fetch_type=shuffle_fetch_type,
180-
stage_id=stage_id,
180+
stage_id=stage_id,
181181
)
182182
subtask_graph = analyzer.gen_subtask_graph()
183183
results = set(

mars/storage/ray.py

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import inspect
16+
import logging
1617

1718
from typing import Any, Dict, List, Tuple
1819
from ..lib import sparse
@@ -28,6 +29,7 @@
2829
from .core import BufferWrappedFileObject, StorageFileObject
2930

3031
ray = lazy_import("ray")
32+
logger = logging.getLogger(__name__)
3133

3234

3335
# TODO(fyrestone): make the SparseMatrix pickleable.

0 commit comments

Comments
 (0)