Python源码示例:airflow.plugins.AirflowPlugin()

示例1
def test_should_warning_about_incompatible_plugins(self):
        class AirflowDeprecatedAdminViewsPlugin(AirflowPlugin):
            name = "test_admin_views_plugin"

            admin_views = [mock.MagicMock()]

        class AirflowDeprecatedAdminMenuLinksPlugin(AirflowPlugin):
            name = "test_menu_links_plugin"

            menu_links = [mock.MagicMock()]

        from airflow import plugins_manager
        plugins_manager.plugins = [
            AirflowDeprecatedAdminViewsPlugin(),
            AirflowDeprecatedAdminMenuLinksPlugin()
        ]
        with self.assertLogs(plugins_manager.log) as cm:
            plugins_manager.initialize_web_ui_plugins()

        self.assertEqual(cm.output, [
            'WARNING:airflow.plugins_manager:Plugin \'test_admin_views_plugin\' may not be '
            'compatible with the current Airflow version. Please contact the author of '
            'the plugin.',
            'WARNING:airflow.plugins_manager:Plugin \'test_menu_links_plugin\' may not be '
            'compatible with the current Airflow version. Please contact the author of '
            'the plugin.'
        ]) 
示例2
def test_should_response_200_support_plugins(self):
        class GoogleLink(BaseOperatorLink):
            name = "Google"

            def get_link(self, operator, dttm):
                return "https://www.google.com"

        class S3LogLink(BaseOperatorLink):
            name = "S3"
            operators = [BigQueryExecuteQueryOperator]

            def get_link(self, operator, dttm):
                return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format(
                    dag_id=operator.dag_id,
                    task_id=operator.task_id,
                    execution_date=quote_plus(dttm.isoformat()),
                )

        class AirflowTestPlugin(AirflowPlugin):
            name = "test_plugin"
            global_operator_extra_links = [
                GoogleLink(),
            ]
            operator_extra_links = [
                S3LogLink(),
            ]

        with mock_plugin_manager(plugins=[AirflowTestPlugin]):
            response = self.client.get(
                "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links"
            )

            self.assertEqual(200, response.status_code, response.data)
            self.assertEqual(
                {
                    "BigQuery Console": None,
                    "Google": "https://www.google.com",
                    "S3": (
                        "https://s3.amazonaws.com/airflow-logs/"
                        "TEST_DAG_ID/TEST_SINGLE_QUERY/2020-01-01T00%3A00%3A00%2B00%3A00"
                    ),
                },
                response.json,
            )