SageMaker Lineage Tracking - V3 SDK Example

This notebook demonstrates how to use SageMaker Lineage Tracking with the V3 Python SDK.

Amazon SageMaker Lineage enables events that happen within SageMaker to be traced via a graph structure. The data simplifies generating reports, making comparisons, or discovering relationships between events.

What you will learn

  • Create and manage lineage Contexts, Actions, and Artifacts

  • Create Associations to link entities into a lineage graph

  • Traverse associations to discover relationships

  • Clean up lineage data

Setup

Initialize a SageMaker session using the V3 Session class.

import boto3
from sagemaker.core.helper.session_helper import Session

# Create the SageMaker session and ask it for its default S3 bucket.
sagemaker_session = Session()
default_bucket = sagemaker_session.default_bucket()

# Region as resolved by a default boto3 session; used later to build S3 URLs.
region = boto3.Session().region_name

print(f"Region: {region}", f"Default bucket: {default_bucket}", sep="\n")
from datetime import datetime
from sagemaker.core.lineage.context import Context
from sagemaker.core.lineage.action import Action
from sagemaker.core.lineage.association import Association
from sagemaker.core.lineage.artifact import Artifact

# Whole-second epoch timestamp rendered as a string; int() discards the
# fractional part. Used below as a per-run suffix and as a source URI value.
unique_id = f"{int(datetime.now().timestamp())}"
print(f"Unique id is {unique_id}")

Use Case 1: Create a Lineage Context

Contexts provide a way to logically group other lineage entities. Each context name must be unique among all contexts.

# The run id suffix keeps the context name distinct between runs.
context_name = f"machine-learning-workflow-{unique_id}"

# Create the workflow context; the run id doubles as its source URI.
ml_workflow_context = Context.create(
    context_type="MLWorkflow",
    context_name=context_name,
    properties={"example": "true"},
    source_uri=unique_id,
)

print(
    f"Created context: {ml_workflow_context.context_name}",
    f"Context ARN: {ml_workflow_context.context_arn}",
    sep="\n",
)

Use Case 2: List Contexts

Enumerate existing contexts sorted by creation time.

from itertools import islice

# The listing is not filtered, so it can return every context visible to the
# session. Cap how many names are printed so the cell output stays readable
# when many contexts already exist.
max_contexts_to_show = 10

# Newest contexts first.
contexts = Context.list(sort_by="CreationTime", sort_order="Descending")

# islice stops consuming the iterable once the cap is reached.
for ctx in islice(contexts, max_contexts_to_show):
    print(ctx.context_name)

Use Case 3: Create an Action

Actions represent computational steps such as model builds, transformations, or training jobs.

# Create an action for the model-build step; the name carries the run id
# and the run id is also used as the source URI.
model_build_action = Action.create(
    action_type="ModelBuild",
    action_name=f"model-build-step-{unique_id}",
    properties={"Example": "Metadata"},
    source_uri=unique_id,
)

created_action_message = f"Created action: {model_build_action.action_name}"
print(created_action_message)

Use Case 4: Create Associations

Associations are directed edges in the lineage graph. The association_type can be Produced, DerivedFrom, AssociatedWith, or ContributedTo.

# Directed edge: workflow context (source) -> model-build action (destination).
context_action_association = Association.create(
    association_type="AssociatedWith",
    destination_arn=model_build_action.action_arn,
    source_arn=ml_workflow_context.context_arn,
)

print("Association created between context and action")

Use Case 5: Traverse Associations

Query incoming and outgoing associations to understand entity relationships.

# Incoming edges: associations whose destination is the action.
incoming_associations = Association.list(destination_arn=model_build_action.action_arn)
for incoming in incoming_associations:
    incoming_message = (
        f"{model_build_action.action_name} has an incoming association from {incoming.source_name}"
    )
    print(incoming_message)

# Outgoing edges: associations whose source is the context.
outgoing_associations = Association.list(source_arn=ml_workflow_context.context_arn)
for outgoing in outgoing_associations:
    outgoing_message = (
        f"{ml_workflow_context.context_name} has an outgoing association to {outgoing.destination_name}"
    )
    print(outgoing_message)

Use Case 6: Create Artifacts

Artifacts represent URI-addressable objects or data such as datasets, labels, or trained models.

# Create input data artifacts
# All three artifact URIs share this region-specific MNIST prefix.
mnist_prefix = f"https://sagemaker-example-files-prod-{region}.s3.amazonaws.com/datasets/image/MNIST"

# Input artifact: test images.
input_test_images = Artifact.create(
    artifact_type="TestData",
    artifact_name="mnist-test-images",
    source_uri=f"{mnist_prefix}/t10k-images-idx3-ubyte.gz",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
)

# Input artifact: test labels.
input_test_labels = Artifact.create(
    artifact_type="TestLabels",
    artifact_name="mnist-test-labels",
    source_uri=f"{mnist_prefix}/t10k-labels-idx1-ubyte.gz",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
)

for created_input in (input_test_images, input_test_labels):
    print(f"Created artifact: {created_input.artifact_name}")

# Output artifact: the trained model archive.
output_model = Artifact.create(
    artifact_type="Model",
    artifact_name="mnist-model",
    source_uri=f"{mnist_prefix}/model/tensorflow-training-2020-11-20-23-57-13-077/model.tar.gz",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
)

print(f"Created artifact: {output_model.artifact_name}")

Cleanup

Delete all lineage entities created in this notebook. Associations must be removed before their source or destination entities can be deleted.

def delete_associations(arn):
    """Remove every association that has ``arn`` as an endpoint.

    Associations in which ``arn`` is the destination are deleted first,
    followed by those in which it is the source.
    """
    # The delete routine is the same for both directions; only the keyword
    # used to filter the listing changes.
    for endpoint_filter in ("destination_arn", "source_arn"):
        for edge in Association.list(**{endpoint_filter: arn}):
            Association(
                source_arn=edge.source_arn,
                destination_arn=edge.destination_arn,
                sagemaker_session=sagemaker_session,
            ).delete()


def delete_lineage_data():
    """Tear down the context, action, and artifacts created in this notebook.

    For each entity, its associations are deleted first and the entity
    itself second.
    """
    # Context
    workflow_name = ml_workflow_context.context_name
    print(f"Deleting context {workflow_name}")
    delete_associations(ml_workflow_context.context_arn)
    Context(
        context_name=workflow_name,
        sagemaker_session=sagemaker_session,
    ).delete()

    # Action
    build_step_name = model_build_action.action_name
    print(f"Deleting action {build_step_name}")
    delete_associations(model_build_action.action_arn)
    Action(
        action_name=build_step_name,
        sagemaker_session=sagemaker_session,
    ).delete()

    # Artifacts, in the order they were created.
    for created in (input_test_images, input_test_labels, output_model):
        created_arn = created.artifact_arn
        print(f"Deleting artifact {created_arn} {created.artifact_name}")
        delete_associations(created_arn)
        Artifact(
            artifact_arn=created_arn,
            sagemaker_session=sagemaker_session,
        ).delete()


# Run the teardown defined above, then confirm completion to the reader.
delete_lineage_data()
print("Cleanup complete")