J'ai pour déclencher une pyspark module à partir de la circulation de l'air à l'aide d'un sparksubmit de l'opérateur. Mais, le pyspark module besoin de prendre l'étincelle variable de session en tant qu'argument. J'ai utilisé application_args pour passer le paramètre à la pyspark module. Mais, quand j'ai couru le groupe de l'étincelle soumettre opérateur est arriver a échoué et que le paramètre je suis passé dans considéré comme Aucun type de variable. Besoin de savoir comment passer d'un argument à un pyspark module déclenchée par spark_submit_operator.
Le DAG code est ci-dessous:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PRJT").enableHiveSupport().getOrCreate()
spark_config = {
'conn_id': 'spark_default',
'driver_memory': '1g',
'executor_cores': 1,
'num_executors': 1,
'executor_memory': '1g'
}
dag = DAG(
dag_id="spark_session_prgm",
default_args=default_args,
schedule_interval='@daily',
catchup=False)
spark_submit_task1 = SparkSubmitOperator(
task_id='spark_submit_task1',
application='/home/airflow_home/dags/tmp_spark_1.py',
application_args=['spark'],
**spark_config, dag=dag)
L'exemple de code dans tmp_spark_1.py programme: