SageMaker Lineage Tracking - V3 SDK Example
This notebook demonstrates how to use SageMaker Lineage Tracking with the V3 Python SDK.
Amazon SageMaker Lineage enables events that happen within SageMaker to be traced via a graph structure. The resulting lineage data simplifies generating reports, making comparisons, and discovering relationships between events.
What you will learn
- Create and manage lineage Contexts, Actions, and Artifacts
- Create Associations to link entities into a lineage graph
- Traverse associations to discover relationships
- Clean up lineage data
Setup
Initialize a SageMaker session using the V3 Session class.
import boto3
from sagemaker.core.helper.session_helper import Session
# Resolve the AWS region from the default boto3 configuration; it is
# interpolated into the example-data URIs used for the artifacts below.
boto_session = boto3.Session()
region = boto_session.region_name

# A single SageMaker session; the cleanup helpers at the end reuse it.
sagemaker_session = Session()
default_bucket = sagemaker_session.default_bucket()

# Echo the resolved settings so a misconfigured environment shows up early.
print(f"Region: {region}")
print(f"Default bucket: {default_bucket}")
from datetime import datetime
from sagemaker.core.lineage.context import Context
from sagemaker.core.lineage.action import Action
from sagemaker.core.lineage.association import Association
from sagemaker.core.lineage.artifact import Artifact
# Whole-second Unix timestamp of this run, kept as a string. It is appended to
# entity names so different runs get different names (runs started within the
# same second would still collide).
run_started_at = datetime.now().replace(microsecond=0)
epoch_seconds = int(run_started_at.timestamp())
unique_id = str(epoch_seconds)
print(f"Unique id is {unique_id}")
Use Case 1: Create a Lineage Context
Contexts provide a way to logically group other lineage entities. The context name must be unique among all contexts.
# Context names must be unique, so the run-specific suffix is appended.
context_name = f"machine-learning-workflow-{unique_id}"

# Key/value properties attached to the context.
context_properties = {"example": "true"}

ml_workflow_context = Context.create(
    context_name=context_name,
    context_type="MLWorkflow",
    source_uri=unique_id,
    properties=context_properties,
)

# Report the identifiers of the context that was just created.
print(f"Created context: {ml_workflow_context.context_name}")
print(f"Context ARN: {ml_workflow_context.context_arn}")
Use Case 2: List Contexts
Enumerate existing contexts, sorted by creation time with the most recent first.
# Request contexts ordered by creation time, descending.
contexts = Context.list(sort_by="CreationTime", sort_order="Descending")

# Print the name of every context the listing yields.
for existing_context in contexts:
    print(existing_context.context_name)
Use Case 3: Create an Action
Actions represent computational steps such as model builds, transformations, or training jobs.
# The action name carries the same run-specific suffix as the context name.
model_build_action_name = f"model-build-step-{unique_id}"

# Key/value properties attached to the action.
action_properties = {"Example": "Metadata"}

model_build_action = Action.create(
    action_name=model_build_action_name,
    action_type="ModelBuild",
    source_uri=unique_id,
    properties=action_properties,
)
print(f"Created action: {model_build_action.action_name}")
Use Case 4: Create Associations
Associations are directed edges in the lineage graph, pointing from a source entity to a destination entity. The association_type can be Produced, DerivedFrom, AssociatedWith, or ContributedTo.
# Directed edge: workflow context (source) -> model build action (destination).
workflow_context_arn = ml_workflow_context.context_arn
build_action_arn = model_build_action.action_arn

context_action_association = Association.create(
    source_arn=workflow_context_arn,
    destination_arn=build_action_arn,
    association_type="AssociatedWith",
)
print("Association created between context and action")
Use Case 5: Traverse Associations
Query incoming and outgoing associations to understand entity relationships.
# Edges that point at the action, i.e. the action is the destination.
incoming_associations = Association.list(destination_arn=model_build_action.action_arn)
for association in incoming_associations:
    incoming_message = f"{model_build_action.action_name} has an incoming association from {association.source_name}"
    print(incoming_message)

# Edges that leave the context, i.e. the context is the source.
outgoing_associations = Association.list(source_arn=ml_workflow_context.context_arn)
for association in outgoing_associations:
    outgoing_message = f"{ml_workflow_context.context_name} has an outgoing association to {association.destination_name}"
    print(outgoing_message)
Use Case 6: Create Artifacts
Artifacts represent URI-addressable objects or data such as datasets, labels, or trained models.
# Source URIs of the example MNIST files; the bucket name embeds the region.
test_images_uri = f"https://sagemaker-example-files-prod-{region}.s3.amazonaws.com/datasets/image/MNIST/t10k-images-idx3-ubyte.gz"
test_labels_uri = f"https://sagemaker-example-files-prod-{region}.s3.amazonaws.com/datasets/image/MNIST/t10k-labels-idx1-ubyte.gz"
model_uri = f"https://sagemaker-example-files-prod-{region}.s3.amazonaws.com/datasets/image/MNIST/model/tensorflow-training-2020-11-20-23-57-13-077/model.tar.gz"

# Input artifacts: the test images and their labels. Every artifact is tagged
# with a "Custom" source id carrying this run's unique id.
input_test_images = Artifact.create(
    artifact_name="mnist-test-images",
    artifact_type="TestData",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
    source_uri=test_images_uri,
)
input_test_labels = Artifact.create(
    artifact_name="mnist-test-labels",
    artifact_type="TestLabels",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
    source_uri=test_labels_uri,
)
print(f"Created artifact: {input_test_images.artifact_name}")
print(f"Created artifact: {input_test_labels.artifact_name}")

# Output artifact: the trained model archive.
output_model = Artifact.create(
    artifact_name="mnist-model",
    artifact_type="Model",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
    source_uri=model_uri,
)
print(f"Created artifact: {output_model.artifact_name}")
Use Case 7: Link Artifacts to Actions
Associate each data artifact with the action as an input, then associate the action with the model artifact it outputs.
# Inputs: one edge per data artifact, artifact (source) -> action (destination).
for input_artifact in (input_test_images, input_test_labels):
    Association.create(
        source_arn=input_artifact.artifact_arn,
        destination_arn=model_build_action.action_arn,
    )

# Output: action (source) -> model artifact (destination).
Association.create(
    source_arn=model_build_action.action_arn,
    destination_arn=output_model.artifact_arn,
)
print("Lineage graph complete: inputs -> action -> output")
Cleanup
Delete all lineage entities created in this notebook. Associations must be removed before their source or destination entities can be deleted.
def delete_associations(arn):
    """Delete all incoming and outgoing associations for an entity.

    Incoming associations (``arn`` is the destination) are removed first,
    then outgoing ones (``arn`` is the source). The delete calls use the
    notebook-level ``sagemaker_session``.

    Args:
        arn: ARN of the lineage entity whose associations should be removed.
    """
    # The two directions differ only in the filter handed to Association.list.
    # The outgoing listing is requested only after the incoming edges are gone.
    for direction_filter in ({"destination_arn": arn}, {"source_arn": arn}):
        # Snapshot the listing before deleting anything, so removals cannot
        # disturb an in-progress iteration over the same result set.
        summaries = list(Association.list(**direction_filter))
        for summary in summaries:
            Association(
                source_arn=summary.source_arn,
                destination_arn=summary.destination_arn,
                sagemaker_session=sagemaker_session,
            ).delete()
def delete_lineage_data():
    """Remove the context, action, and artifacts created by this notebook.

    For each entity, its associations are deleted first through
    ``delete_associations`` and then the entity itself is deleted. Entities
    are processed in this order: context, action, then the three artifacts.
    """
    # Context
    print(f"Deleting context {ml_workflow_context.context_name}")
    delete_associations(ml_workflow_context.context_arn)
    Context(
        context_name=ml_workflow_context.context_name,
        sagemaker_session=sagemaker_session,
    ).delete()

    # Action
    print(f"Deleting action {model_build_action.action_name}")
    delete_associations(model_build_action.action_arn)
    Action(
        action_name=model_build_action.action_name,
        sagemaker_session=sagemaker_session,
    ).delete()

    # Artifacts: both inputs first, then the output model.
    created_artifacts = (input_test_images, input_test_labels, output_model)
    for artifact in created_artifacts:
        print(f"Deleting artifact {artifact.artifact_arn} {artifact.artifact_name}")
        delete_associations(artifact.artifact_arn)
        Artifact(
            artifact_arn=artifact.artifact_arn,
            sagemaker_session=sagemaker_session,
        ).delete()
# Run the cleanup routine defined above, then confirm that it finished.
delete_lineage_data()
print("Cleanup complete")