sagemaker.core.spark.processing#

This module is the entry point for running Spark processing scripts.

This module contains code related to Spark Processors, which are used for Processing jobs. These jobs let customers perform data pre-processing, post-processing, feature engineering, data validation, and model evaluation on SageMaker using Spark and PySpark.

Classes

FileType(value)

Enum of file types.

PySparkProcessor([role, instance_type, ...])

Handles Amazon SageMaker processing tasks for jobs using PySpark.

SparkConfigUtils()

Utility class for Spark configurations.

SparkJarProcessor([role, instance_type, ...])

Handles Amazon SageMaker processing tasks for jobs using Spark with Java or Scala Jars.

class sagemaker.core.spark.processing.FileType(value)[source]#

Bases: Enum

Enum of file types.

FILE = 3#
JAR = 1#
PYTHON = 2#
class sagemaker.core.spark.processing.PySparkProcessor(role: str | None = None, instance_type: str | PipelineVariable | None = None, instance_count: int | PipelineVariable | None = None, framework_version: str | None = None, py_version: str | None = None, container_version: str | None = None, image_uri: str | PipelineVariable | None = None, volume_size_in_gb: int | PipelineVariable = 30, volume_kms_key: str | PipelineVariable | None = None, output_kms_key: str | PipelineVariable | None = None, configuration_location: str | None = None, dependency_location: str | None = None, max_runtime_in_seconds: int | PipelineVariable | None = None, base_job_name: str | None = None, sagemaker_session: Session | None = None, env: Dict[str, str | PipelineVariable] | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, network_config: NetworkConfig | None = None)[source]#

Bases: _SparkProcessorBase

Handles Amazon SageMaker processing tasks for jobs using PySpark.

get_run_args(submit_app, submit_py_files=None, submit_jars=None, submit_files=None, inputs=None, outputs=None, arguments=None, job_name=None, configuration=None, spark_event_logs_s3_uri=None)[source]#

Returns a RunArgs object.

This object contains the normalized inputs, outputs and arguments needed when using a PySparkProcessor in a ProcessingStep.

Parameters:
  • submit_app (str) – Path (local or S3) to Python file to submit to Spark as the primary application. This is translated to the code property on the returned RunArgs object.

  • submit_py_files (list[str]) – List of paths (local or S3) to provide for the spark-submit --py-files option

  • submit_jars (list[str]) – List of paths (local or S3) to provide for the spark-submit --jars option

  • submit_files (list[str]) – List of paths (local or S3) to provide for the spark-submit --files option

  • inputs (list[ProcessingInput]) – Input files for the processing job. These must be provided as ProcessingInput objects (default: None).

  • outputs (list[ProcessingOutput]) – Outputs for the processing job. These can be specified as either path strings or ProcessingOutput objects (default: None).

  • arguments (list[str]) – A list of string arguments to be passed to a processing job (default: None).

  • job_name (str) – Processing job name. If not specified, the processor generates a default job name, based on the base job name and current timestamp.

  • configuration (list[dict] or dict) – Configuration for Hadoop, Spark, or Hive. List or dictionary of EMR-style classifications. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html

  • spark_event_logs_s3_uri (str) – S3 path to which Spark application events will be published.
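
The following sketch shows one way get_run_args can feed a ProcessingStep in a SageMaker pipeline. The role ARN, S3 paths, framework version, and the sagemaker.processing / sagemaker.workflow import locations are illustrative assumptions, not values taken from this reference.

```python
# Hypothetical sketch (placeholder role ARN, bucket names, and versions).
from sagemaker.core.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingOutput    # assumed import path
from sagemaker.workflow.steps import ProcessingStep  # assumed import path

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",                              # placeholder version
    role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder ARN
    instance_count=2,
    instance_type="ml.m5.xlarge",
)

# get_run_args normalizes code, inputs, outputs, and arguments so they can
# be handed to a pipeline step instead of calling run() directly.
run_args = spark_processor.get_run_args(
    submit_app="s3://my-bucket/code/preprocess.py",       # placeholder path
    arguments=["--input", "s3://my-bucket/raw/"],
    outputs=[ProcessingOutput(source="/opt/ml/processing/output")],
)

step = ProcessingStep(
    name="spark-preprocess",
    processor=spark_processor,
    code=run_args.code,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    job_arguments=run_args.arguments,
)
```

Note that get_run_args does not start a job; the job is launched when the pipeline executes the step.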

run(submit_app: str, submit_py_files: List[str | PipelineVariable] | None = None, submit_jars: List[str | PipelineVariable] | None = None, submit_files: List[str | PipelineVariable] | None = None, inputs: List[ProcessingInput] | None = None, outputs: List[ProcessingOutput] | None = None, arguments: List[str | PipelineVariable] | None = None, wait: bool = True, logs: bool = True, job_name: str | None = None, experiment_config: Dict[str, str] | None = None, configuration: List[Dict] | Dict | None = None, spark_event_logs_s3_uri: str | PipelineVariable | None = None, kms_key: str | None = None)[source]#

Runs a processing job.

Parameters:
  • submit_app (str) – Path (local or S3) to Python file to submit to Spark as the primary application

  • submit_py_files (list[str] or list[PipelineVariable]) – List of paths (local or S3) to provide for the spark-submit --py-files option

  • submit_jars (list[str] or list[PipelineVariable]) – List of paths (local or S3) to provide for the spark-submit --jars option

  • submit_files (list[str] or list[PipelineVariable]) – List of paths (local or S3) to provide for the spark-submit --files option

  • inputs (list[ProcessingInput]) – Input files for the processing job. These must be provided as ProcessingInput objects (default: None).

  • outputs (list[ProcessingOutput]) – Outputs for the processing job. These can be specified as either path strings or ProcessingOutput objects (default: None).

  • arguments (list[str] or list[PipelineVariable]) – A list of string arguments to be passed to a processing job (default: None).

  • wait (bool) – Whether the call should wait until the job completes (default: True).

  • logs (bool) – Whether to show the logs produced by the job. Only meaningful when wait is True (default: True).

  • job_name (str) – Processing job name. If not specified, the processor generates a default job name, based on the base job name and current timestamp.

  • experiment_config (dict[str, str]) – Experiment management configuration. The dict may contain three keys: ‘ExperimentName’, ‘TrialName’, and ‘TrialComponentDisplayName’. If ExperimentName is supplied but TrialName is not, a Trial is created automatically and the job’s Trial Component is associated with it. If TrialName is supplied and the Trial already exists, the job’s Trial Component is associated with that Trial. If neither ExperimentName nor TrialName is supplied, the Trial Component is left unassociated. TrialComponentDisplayName is used for display in Studio.

  • configuration (list[dict] or dict) – Configuration for Hadoop, Spark, or Hive. List or dictionary of EMR-style classifications. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html

  • spark_event_logs_s3_uri (str or PipelineVariable) – S3 path to which Spark application events will be published.

  • kms_key (str) – The ARN of the KMS key that is used to encrypt the user code file (default: None).
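
A minimal sketch of launching a PySpark processing job directly with run. All ARNs, S3 URIs, and versions below are placeholders; a real call requires valid AWS credentials and an execution role.

```python
# Hypothetical sketch (placeholder role ARN, bucket names, and versions).
from sagemaker.core.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",                              # placeholder version
    role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder ARN
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app="s3://my-bucket/code/preprocess.py",       # placeholder path
    arguments=[
        "--input", "s3://my-bucket/raw/",
        "--output", "s3://my-bucket/clean/",
    ],
    # Publishing event logs lets you inspect the job in the Spark history UI.
    spark_event_logs_s3_uri="s3://my-bucket/spark-events/",
    wait=True,
    logs=True,
)
```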

class sagemaker.core.spark.processing.SparkConfigUtils[source]#

Bases: object

Utility class for Spark configurations.

static validate_configuration(configuration: Dict)[source]#

Validates the user-provided Hadoop/Spark/Hive configuration.

This ensures that the list or dictionary the user provides serializes to JSON matching the schema of EMR’s application configuration.

Parameters:

configuration (Dict) – A dict that contains the configuration overrides to the default values. For more information, please visit: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html
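
An EMR-style classification, per the linked EMR page, is a dict with a "Classification" name, a "Properties" map, and optionally nested "Configurations". The property values below are illustrative, not recommended settings.

```python
# Illustrative EMR-style classifications for a Spark processing job.
configuration = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.memory": "2g",
            "spark.executor.cores": "2",
        },
    },
    {
        "Classification": "core-site",
        "Properties": {"hadoop.tmp.dir": "/tmp/hadoop"},
    },
]
```

A structure like this is what you would pass as the configuration argument to run or get_run_args; validate_configuration checks it against the EMR schema before the job is submitted.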

static validate_s3_uri(spark_output_s3_path)[source]#

Validate whether the URI uses an S3 scheme.

In the future, this validation will perform deeper S3 validation.

Parameters:

spark_output_s3_path (str) – The URI of the Spark output S3 Path.
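
The check boils down to the URI scheme. The helper below is a stand-in that mirrors, but does not reproduce, the library's internal validation.

```python
from urllib.parse import urlparse

def looks_like_s3_uri(uri: str) -> bool:
    """Return True when the URI uses the s3 scheme, as validate_s3_uri requires."""
    return urlparse(uri).scheme == "s3"
```

For example, "s3://my-bucket/spark-events/" passes this check, while a local path or an https URL does not.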

class sagemaker.core.spark.processing.SparkJarProcessor(role: str | None = None, instance_type: str | PipelineVariable | None = None, instance_count: int | PipelineVariable | None = None, framework_version: str | None = None, py_version: str | None = None, container_version: str | None = None, image_uri: str | PipelineVariable | None = None, volume_size_in_gb: int | PipelineVariable = 30, volume_kms_key: str | PipelineVariable | None = None, output_kms_key: str | PipelineVariable | None = None, configuration_location: str | None = None, dependency_location: str | None = None, max_runtime_in_seconds: int | PipelineVariable | None = None, base_job_name: str | None = None, sagemaker_session: Session | None = None, env: Dict[str, str | PipelineVariable] | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, network_config: NetworkConfig | None = None)[source]#

Bases: _SparkProcessorBase

Handles Amazon SageMaker processing tasks for jobs using Spark with Java or Scala Jars.

get_run_args(submit_app, submit_class=None, submit_jars=None, submit_files=None, inputs=None, outputs=None, arguments=None, job_name=None, configuration=None, spark_event_logs_s3_uri=None)[source]#

Returns a RunArgs object.

This object contains the normalized inputs, outputs and arguments needed when using a SparkJarProcessor in a ProcessingStep.

Parameters:
  • submit_app (str) – Path (local or S3) to Jar file to submit to Spark as the primary application. This is translated to the code property on the returned RunArgs object.

  • submit_class (str) – Java class reference to submit to Spark as the primary application

  • submit_jars (list[str]) – List of paths (local or S3) to provide for the spark-submit --jars option

  • submit_files (list[str]) – List of paths (local or S3) to provide for the spark-submit --files option

  • inputs (list[ProcessingInput]) – Input files for the processing job. These must be provided as ProcessingInput objects (default: None).

  • outputs (list[ProcessingOutput]) – Outputs for the processing job. These can be specified as either path strings or ProcessingOutput objects (default: None).

  • arguments (list[str]) – A list of string arguments to be passed to a processing job (default: None).

  • job_name (str) – Processing job name. If not specified, the processor generates a default job name, based on the base job name and current timestamp.

  • configuration (list[dict] or dict) – Configuration for Hadoop, Spark, or Hive. List or dictionary of EMR-style classifications. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html

  • spark_event_logs_s3_uri (str) – S3 path to which Spark application events will be published.

run(submit_app: str, submit_class: str | PipelineVariable, submit_jars: List[str | PipelineVariable] | None = None, submit_files: List[str | PipelineVariable] | None = None, inputs: List[ProcessingInput] | None = None, outputs: List[ProcessingOutput] | None = None, arguments: List[str | PipelineVariable] | None = None, wait: bool = True, logs: bool = True, job_name: str | None = None, experiment_config: Dict[str, str] | None = None, configuration: List[Dict] | Dict | None = None, spark_event_logs_s3_uri: str | PipelineVariable | None = None, kms_key: str | None = None)[source]#

Runs a processing job.

Parameters:
  • submit_app (str) – Path (local or S3) to Jar file to submit to Spark as the primary application

  • submit_class (str or PipelineVariable) – Java class reference to submit to Spark as the primary application

  • submit_jars (list[str] or list[PipelineVariable]) – List of paths (local or S3) to provide for the spark-submit --jars option

  • submit_files (list[str] or list[PipelineVariable]) – List of paths (local or S3) to provide for the spark-submit --files option

  • inputs (list[ProcessingInput]) – Input files for the processing job. These must be provided as ProcessingInput objects (default: None).

  • outputs (list[ProcessingOutput]) – Outputs for the processing job. These can be specified as either path strings or ProcessingOutput objects (default: None).

  • arguments (list[str] or list[PipelineVariable]) – A list of string arguments to be passed to a processing job (default: None).

  • wait (bool) – Whether the call should wait until the job completes (default: True).

  • logs (bool) – Whether to show the logs produced by the job. Only meaningful when wait is True (default: True).

  • job_name (str) – Processing job name. If not specified, the processor generates a default job name, based on the base job name and current timestamp.

  • experiment_config (dict[str, str]) – Experiment management configuration. The dict may contain three keys: ‘ExperimentName’, ‘TrialName’, and ‘TrialComponentDisplayName’. If ExperimentName is supplied but TrialName is not, a Trial is created automatically and the job’s Trial Component is associated with it. If TrialName is supplied and the Trial already exists, the job’s Trial Component is associated with that Trial. If neither ExperimentName nor TrialName is supplied, the Trial Component is left unassociated. TrialComponentDisplayName is used for display in Studio.

  • configuration (list[dict] or dict) – Configuration for Hadoop, Spark, or Hive. List or dictionary of EMR-style classifications. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html

  • spark_event_logs_s3_uri (str or PipelineVariable) – S3 path to which Spark application events will be published.

  • kms_key (str) – The ARN of the KMS key that is used to encrypt the user code file (default: None).
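
A minimal sketch of running a Java or Scala Spark application with SparkJarProcessor. The role ARN, jar path, and class name are placeholders; unlike PySparkProcessor.run, submit_class is required here.

```python
# Hypothetical sketch (placeholder role ARN, jar path, and class name).
from sagemaker.core.spark.processing import SparkJarProcessor

jar_processor = SparkJarProcessor(
    base_job_name="sm-spark-java",
    framework_version="3.1",                              # placeholder version
    role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder ARN
    instance_count=2,
    instance_type="ml.m5.xlarge",
)

jar_processor.run(
    submit_app="s3://my-bucket/code/spark-app.jar",       # placeholder path
    submit_class="com.example.SparkApp",  # main class inside the jar
    arguments=["--input", "s3://my-bucket/raw/"],
    # A single EMR-style classification dict is also accepted here.
    configuration={
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "2g"},
    },
)
```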