Tips Building Data Pipelines with Apache Airflow

transweet

New member
[TIẾNG VIỆT]:
Apache Airflow là một công cụ điều phối dòng công việc nguồn mở phổ biến có thể được sử dụng để xây dựng và quản lý các đường ống dữ liệu.Nó được thiết kế để có thể mở rộng và chịu lỗi, và nó có thể được sử dụng để phối hợp cả đường ống dữ liệu hàng loạt và phát trực tuyến.

Trong bài viết này, chúng tôi sẽ chỉ cho bạn cách xây dựng một đường ống dữ liệu với luồng khí Apache.Chúng tôi sẽ sử dụng một ví dụ đơn giản về đường ống dữ liệu trích xuất dữ liệu từ tệp CSV, chuyển đổi dữ liệu và tải nó vào cơ sở dữ liệu.

## Điều kiện tiên quyết

Để làm theo với hướng dẫn này, bạn sẽ cần những điều sau đây:

* Máy tính Linux hoặc MacOS
* Python 3.6 trở lên
* Apache Airflow CLI
* Các phụ thuộc luồng khí sau:
* `Apache-Airflow [Postgres]`
* `Apache-airflow [mysql]`
* `Apache-Airflow [Hive]`
* `Apache-Airflow [Presto]`

## Tạo đường ống dữ liệu

Để tạo đường ống dữ liệu với luồng khí Apache, chúng tôi sẽ cần tạo các mục sau:

* Một DAG (Đồ thị Acyclic được định hướng)
* Một bộ nhiệm vụ
* Một tập hợp các phụ thuộc giữa các nhiệm vụ

### Tạo DAG

DAG là một biểu đồ acyclic có hướng đại diện cho các phụ thuộc giữa các tác vụ trong một đường ống dữ liệu.Để tạo DAG, chúng ta có thể sử dụng lệnh `Airflow DAG tạo`.Ví dụ: lệnh sau tạo DAG có tên là `my_dag`:

`` `
luồng không khí tạo ra my_dag
`` `

### Tạo nhiệm vụ

Một nhiệm vụ là một đơn vị công việc được thực hiện trong một đường ống dữ liệu.Để tạo một tác vụ, chúng ta có thể sử dụng lệnh `Các tác vụ không khí tạo ra`.Ví dụ: lệnh sau tạo một tác vụ có tên là `my_task` trích xuất dữ liệu từ tệp CSV:

`` `
Nhiệm vụ của luồng không khí tạo ra my_task--python có thể gọi được = my_task.extract
`` `

Hàm `my_task.extract` là hàm Python trích xuất dữ liệu từ tệp CSV.Hàm có các đối số sau:

* `FilePath`: Đường dẫn đến tệp CSV.
* `Delimiter`: DELIMITER được sử dụng trong tệp CSV.
* `quotechar`: ký tự trích dẫn được sử dụng trong tệp CSV.

Hàm trả về một đối tượng DataFrame của gấu trúc chứa dữ liệu từ tệp CSV.

### Tạo sự phụ thuộc giữa các nhiệm vụ

Các nhiệm vụ trong DAG được kết nối bởi các phụ thuộc.Một phụ thuộc chỉ định rằng một nhiệm vụ phải được hoàn thành trước khi có thể bắt đầu nhiệm vụ khác.Để tạo một sự phụ thuộc, chúng ta có thể sử dụng lệnh `Các tác vụ Air-UpStream`.Ví dụ: lệnh sau tạo ra sự phụ thuộc giữa tác vụ `my_task` và tác vụ` my_other_task`:

`` `
Các tác vụ luồng khí đã thiết lập MY_TASK my_other_task
`` `

Điều này có nghĩa là tác vụ `my_other_task` không thể được bắt đầu cho đến khi tác vụ` my_task` đã được hoàn thành.

## Chạy đường ống dữ liệu

Khi bạn đã tạo một DAG và một bộ tác vụ, bạn có thể chạy đường ống dữ liệu bằng lệnh `Airflow DAGS Run`.Ví dụ: lệnh sau chạy `my_dag` DAG:

`` `
luồng không khí chạy my_dag
`` `

Lệnh `Airflow Dags Run` có các đối số sau:

* `DAG_ID`: ID của DAG để chạy.
* `start_date`: Ngày bắt đầu của DAG chạy.
* `end_date`: Ngày kết thúc của DAG chạy.

## Giám sát đường ống dữ liệu

Bạn có thể theo dõi tiến trình của một đường ống dữ liệu bằng giao diện người dùng Web Airflow.UI web có thể được truy cập tại URL sau:

`` `
http: // localhost: 8080/
`` `

UI Web cung cấp một biểu diễn đồ họa của DAG và trạng thái của từng tác vụ.Bạn cũng có thể sử dụng giao diện người dùng web để xem nhật ký của mỗi tác vụ.

## Phần kết luận

Trong bài viết này, chúng tôi đã chỉ cho bạn cách xây dựng một đường ống dữ liệu với luồng khí Apache.Chúng tôi đã sử dụng một ví dụ đơn giản về đường ống dữ liệu trích xuất dữ liệu từ tệp CSV, chuyển đổi dữ liệu và tải nó vào cơ sở dữ liệu.

Luồng không khí Apache là một

[ENGLISH]:
Apache Airflow is a popular open-source workflow orchestration tool that can be used to build and manage data pipelines. It is designed to be scalable and fault-tolerant, and it can be used to orchestrate both batch and streaming data pipelines.

In this article, we will show you how to build a data pipeline with Apache Airflow. We will use a simple example of a data pipeline that extracts data from a CSV file, transforms the data, and loads it into a database.

## Prerequisites

To follow along with this tutorial, you will need the following:

* A Linux or macOS computer
* Python 3.6 or later
* The Apache Airflow CLI
* The following Airflow dependencies:
* `apache-airflow[postgres]`
* `apache-airflow[mysql]`
* `apache-airflow[hive]`
* `apache-airflow[presto]`

## Creating a Data Pipeline

To create a data pipeline with Apache Airflow, we will need to create the following:

* A DAG (Directed Acyclic Graph)
* A set of tasks
* A set of dependencies between the tasks

### Creating a DAG

A DAG is a directed acyclic graph that represents the dependencies between tasks in a data pipeline. To create a DAG, we can use the `airflow dags create` command. For example, the following command creates a DAG named `my_dag`:

```
airflow dags create my_dag
```

### Creating Tasks

A task is a unit of work that is performed in a data pipeline. To create a task, we can use the `airflow tasks create` command. For example, the following command creates a task named `my_task` that extracts data from a CSV file:

```
airflow tasks create my_task --python-callable=my_task.extract
```

The `my_task.extract` function is a Python function that extracts data from a CSV file. The function takes the following arguments:

* `filepath`: The path to the CSV file.
* `delimiter`: The delimiter used in the CSV file.
* `quotechar`: The quote character used in the CSV file.

The function returns a Pandas DataFrame object that contains the data from the CSV file.

### Creating Dependencies Between Tasks

The tasks in a DAG are connected by dependencies. A dependency specifies that one task must be completed before another task can be started. To create a dependency, we can use the `airflow tasks set-upstream` command. For example, the following command creates a dependency between the `my_task` task and the `my_other_task` task:

```
airflow tasks set-upstream my_task my_other_task
```

This means that the `my_other_task` task cannot be started until the `my_task` task has been completed.

## Running a Data Pipeline

Once you have created a DAG and a set of tasks, you can run the data pipeline using the `airflow dags run` command. For example, the following command runs the `my_dag` DAG:

```
airflow dags run my_dag
```

The `airflow dags run` command takes the following arguments:

* `dag_id`: The ID of the DAG to run.
* `start_date`: The start date of the DAG run.
* `end_date`: The end date of the DAG run.

## Monitoring a Data Pipeline

You can monitor the progress of a data pipeline using the Airflow web UI. The web UI can be accessed at the following URL:

```
http://localhost:8080/```

The web UI provides a graphical representation of the DAG and the status of each task. You can also use the web UI to view the logs of each task.

## Conclusion

In this article, we showed you how to build a data pipeline with Apache Airflow. We used a simple example of a data pipeline that extracts data from a CSV file, transforms the data, and loads it into a database.

Apache Airflow is a
 
Join Telegram ToolsKiemTrieuDoGroup
Back
Top