Source code for sagemaker.serve.utils.logging_agent
"""Module for pulling logs from container"""
from __future__ import absolute_import
import logging
from threading import Thread
import queue
from datetime import datetime
from sagemaker.serve.utils.exceptions import (
LocalModelLoadException,
LocalModelOutOfMemoryException,
LocalModelInvocationException,
SkipTuningComboException,
)
logger = logging.getLogger(__name__)
def _get_logs(generator, logs, until):
"""Placeholder docstring"""
now = datetime.now()
try:
for next_entry in generator:
logs.put(next_entry, block=True, timeout=(until - now).total_seconds())
now = datetime.now()
if until < now:
break
logger.debug("Container logging done. All container logs processed.")
except (StopIteration, ValueError):
logger.debug("Container logging stopped. All container logs processed.")
[docs]
def pull_logs(generator, stop, until, final_pull):
"""Placeholder docstring"""
now = datetime.now()
if until < now:
return
logs = queue.Queue(maxsize=10)
log_puller = Thread(
target=_get_logs,
args=(
generator,
logs,
until,
),
)
log_puller.start()
while True:
try:
top = logs.get(block=True, timeout=(until - now).total_seconds())
logger.debug(top)
if "[INFO ]" in top and "OutOfMemoryError" in top:
raise LocalModelOutOfMemoryException(top)
if "CUDA out of memory. Tried to allocate" in top:
raise LocalModelOutOfMemoryException(top)
if "ai.djl.engine.EngineException: OOM" in top:
raise LocalModelOutOfMemoryException(top)
if "4xx.Count:1" in top or "5xx.Count:1" in top:
raise LocalModelInvocationException(top)
if "[ERROR]" in top or "Failed register workflow" in top:
raise LocalModelLoadException(top)
if "Address already in use" in top:
raise LocalModelLoadException(top)
if "not compatible with sharding" in top:
raise SkipTuningComboException(top)
except (
LocalModelLoadException,
LocalModelOutOfMemoryException,
LocalModelInvocationException,
) as e:
stop()
log_puller.join(timeout=5)
raise e
except queue.Empty:
now = datetime.now()
if until < now:
if not final_pull:
return
stop()
# allow logging thread some time to cleanup
log_puller.join(timeout=5)
if log_puller.is_alive():
raise Exception("Logging thread not terminating")
return