Part 2: Apache Airflow setup with PostgreSQL, Celery and RabbitMQ (macOS, Apple)

If you have completed the previous Airflow setup from Part 1, let's now switch the metadata database and the executor.
PART 01 - https://www.vijayanandrp.com/blog/tech/apache/airflow/1-airflow-setup-on-macos-apple

Setting up a PostgreSQL Database

You need to create a database and a database user that Airflow will use to access it. In the example below, a database airflow_db and a user airflow_user with the password airflow_pass are created:

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
-- PostgreSQL 15 requires additional privileges:
ALTER DATABASE airflow_db OWNER TO airflow_user;
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
GRANT ALL ON SCHEMA public TO airflow_user;
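
If you don't already have a local PostgreSQL server, the statements above can be run through psql. A minimal sketch, assuming PostgreSQL was installed via Homebrew and your macOS user is the default superuser (the formula name postgresql@16 is just one example):

# install and start a local PostgreSQL server (skip if one is already running)
brew install postgresql@16
brew services start postgresql@16
# postgresql@16 is keg-only, so put its binaries on the PATH for this shell
export PATH="/opt/homebrew/opt/postgresql@16/bin:$PATH"
# open a psql shell on the default database and paste the SQL statements above
psql postgres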

We recommend using the psycopg2 driver and specifying it in your SQLAlchemy connection string.

# postgresql+psycopg2://<user>:<password>@<host>/<db>
postgresql+psycopg2://airflow_user:airflow_pass@localhost/airflow_db
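
If the psycopg2 driver is not installed in your Airflow virtual environment yet, the postgres extra pulls it in (installing psycopg2-binary directly also works):

pip install 'apache-airflow[postgres]'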

Also note that since SQLAlchemy does not expose a way to target a specific schema in the database URI, you need to ensure that the public schema is in your Postgres user's search_path.

If you created a new Postgres user for Airflow:

The default search_path for a new Postgres user is "$user", public, so no change is needed.

If you are reusing an existing Postgres user with a custom search_path, it can be changed with:

ALTER USER airflow_user SET search_path = public;
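
To confirm the setting took effect, you can open a fresh session as the Airflow user and check the search_path (psql accepts the same connection URI format used above):

psql "postgresql://airflow_user:airflow_pass@localhost/airflow_db" -c "SHOW search_path;"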

Update these settings in the Airflow configuration file airflow.cfg:

sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@localhost/airflow_db
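
Alternatively, the same setting can be supplied as an environment variable, which overrides airflow.cfg (Airflow maps AIRFLOW__<SECTION>__<KEY> to config options):

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow_user:airflow_pass@localhost/airflow_db"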

Stop the Airflow Web Server and Scheduler

ps -ef | egrep 'airflow scheduler' | grep -v grep| awk '{print $2}' | xargs kill -9;
ps -ef | egrep -i 'airflow webserver' | grep -v grep | awk '{print $2}' | xargs kill -9;

Airflow now uses the PostgreSQL database, and the following command initializes the necessary tables.

airflow db migrate

Output:

DB: postgresql+psycopg2://airflow_user:***@localhost/airflow_db
Performing upgrade to the metadata database postgresql+psycopg2://airflow_user:***@localhost/airflow_db
[2024-03-21T13:58:45.611+0000] {migration.py:216} INFO - Context impl PostgresqlImpl.
[2024-03-21T13:58:45.612+0000] {migration.py:219} INFO - Will assume transactional DDL.
[2024-03-21T13:58:45.614+0000] {migration.py:216} INFO - Context impl PostgresqlImpl.
[2024-03-21T13:58:45.614+0000] {migration.py:219} INFO - Will assume transactional DDL.
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running stamp_revision -> 88344c1d9134
Database migrating done!
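
As an extra sanity check, you can ask Airflow to verify that it can reach the new database:

airflow db check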

Check from the terminal that the connection variable has been updated:

airflow config get-value database sql_alchemy_conn

Previously this pointed to the SQLite database.

Output:

postgresql+psycopg2://airflow_user:airflow_pass@localhost/airflow_db

Create an Admin User: create a user to access the Airflow web interface (since we migrated to a new metadata database, the previous users no longer exist).

airflow users create \
--username admin \
--password admin \
--firstname Vijay \
--lastname Anand \
--role Admin \
--email admin@example.com
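
You can confirm the user was created:

airflow users list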

Setting up RabbitMQ

brew install rabbitmq
brew info rabbitmq
# starts a local RabbitMQ node
brew services start rabbitmq
# highly recommended: enable all feature flags on the running node
/opt/homebrew/sbin/rabbitmqctl enable_feature_flag all
# stops the locally running RabbitMQ node
brew services stop rabbitmq

Output:

==> rabbitmq: stable 3.13.0 (bottled)
Messaging and streaming broker
https://www.rabbitmq.com
/opt/homebrew/Cellar/rabbitmq/3.13.0 (1,521 files, 35.9MB) *
Poured from bottle using the formulae.brew.sh API on 2024-03-20 at 15:58:15
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/r/rabbitmq.rb
License: MPL-2.0
==> Dependencies
Required: erlang ✔
==> Caveats
Management UI: http://localhost:15672
Homebrew-specific docs: https://rabbitmq.com/install-homebrew.html
To start rabbitmq now and restart at login:
brew services start rabbitmq
Or, if you don't want/need a background service you can just run:
CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server
==> Analytics
install: 6,817 (30 days), 19,528 (90 days), 83,643 (365 days)
install-on-request: 6,817 (30 days), 19,527 (90 days), 83,631 (365 days)
build-error: 0 (30 days)
❯ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)
❯ brew services stop rabbitmq
Stopping `rabbitmq`... (might take a while)
==> Successfully stopped `rabbitmq` (label: homebrew.mxcl.rabbitmq)
❯ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)
❯ /opt/homebrew/sbin/rabbitmqctl enable_feature_flag all
Enabling all feature flags ...
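
To verify the broker is healthy before pointing Airflow at it, check the node status (the management UI mentioned in the caveats above, http://localhost:15672, also works with the default guest/guest credentials):

/opt/homebrew/sbin/rabbitmqctl status
# or a quick liveness check
/opt/homebrew/sbin/rabbitmq-diagnostics ping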

Increase the consumer timeout (RabbitMQ's default consumer acknowledgement timeout is 30 minutes, which long-running Airflow tasks can easily exceed):

rabbitmqctl eval 'application:set_env(rabbit, consumer_timeout, 36000000).'
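
Note that rabbitmqctl eval only changes the running node, so the value is lost on restart. A sketch of making it persistent instead, assuming the Homebrew config directory shown in the caveats above:

# append the setting to rabbitmq.conf and restart the broker
echo 'consumer_timeout = 36000000' >> /opt/homebrew/etc/rabbitmq/rabbitmq.conf
brew services restart rabbitmq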

Setting Up Celery Executor

What is Celery Executor?

As per the definition, Celery is a powerful, production-ready asynchronous job queue which allows you to run time-consuming Python functions in the background. A Celery-powered application can respond to user requests quickly, while long-running tasks are passed onto the queue.

The Celery Executor is a powerful option for Airflow because it allows tasks to be executed in parallel on multiple worker machines, making it an excellent choice for heavy workloads. This executor uses the Celery framework, an asynchronous distributed task queue that utilizes distributed message passing.

In the Celery Executor model, tasks are pushed into a queue that is available to all the worker nodes, which then execute the tasks independently.

Celery workers architecture

Before diving into the different approaches, let's say a few words about how Celery works. A Celery setup typically involves the following components:

  • One or more celery workers. Each worker can run multiple Celery tasks simultaneously (controlled by a concurrency parameter).
  • A broker which contains one or more queues that workers listen on (Celery supports several broker platforms — see below).
  • An optional backend component to store task results (Celery supports several backend platforms — see below).
  • A task definition file with functions that are exposed as tasks. These tasks can be sent to and consumed from queues.

celery_architecture

Setting Up Celery Executor

Before we can use the Celery Executor, we need to make sure that Celery is installed and configured properly. This requires setting up a Celery broker (like RabbitMQ or Redis) and a result backend (like a relational database or caching system).

pip install 'apache-airflow[celery]'

For our example, let's assume we're using RabbitMQ as the Celery broker and PostgreSQL as the result backend. You can specify these settings in the Airflow configuration file airflow.cfg:

[core]
executor = CeleryExecutor
[celery]
result_backend = db+postgresql://airflow_user:airflow_pass@localhost/airflow_db
broker_url = amqp://guest:guest@localhost:5672//
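
The same three settings can also be provided as environment variables, which is handy if you prefer not to edit airflow.cfg:

export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://airflow_user:airflow_pass@localhost/airflow_db"
export AIRFLOW__CELERY__BROKER_URL="amqp://guest:guest@localhost:5672//"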

Start the Airflow Web Server and Scheduler

airflow scheduler -D;
airflow webserver --port 8080 -D;

Kickstart Celery

In the context of the Celery Executor, an Airflow worker is simply a machine that fetches tasks from the Celery queue and executes them. Workers can be scaled up or down depending on the workload.

To start a worker, run the following command in a new terminal:

airflow celery worker
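
The worker's parallelism and the queues it listens on can be tuned with flags, for example:

# run up to 4 task processes and listen only on the default queue
airflow celery worker --concurrency 4 --queues default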

Monitoring Tasks

One of the benefits of using a Celery Executor is the ability to monitor the status of tasks. You can use the Flower monitoring tool for this purpose:

Run the following command in a new terminal:

airflow celery flower
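
Flower serves its UI on port 5555 by default; the port can be changed if needed:

# serve the Flower dashboard on a custom port
airflow celery flower --port 5556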


Reference:

https://airflow.apache.org/docs/apache-airflow/2.8.3/howto/set-up-database.html

https://medium.com/@remisharoon/mastering-airflow-deep-dive-into-celery-executors-281fd075d081

https://medium.com/scalereal/understanding-celery-part-1-why-use-celery-and-what-is-celery-b96bf958cd80

https://testdriven.io/blog/concurrency-parallelism-asyncio/

Error

kombu.exceptions.OperationalError: timed out
[2024-03-21T14:31:00.748+0000] {base_events.py:1744} ERROR - Future exception was never retrieved
future: <Future finished exception=OperationalError('timed out')>
Traceback (most recent call last):
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/kombu/connection.py", line 472, in _reraise_as_library_errors
yield
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/kombu/connection.py", line 459, in _ensure_connection
return retry_over_time(
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
return fun(*args, **kwargs)
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/kombu/connection.py", line 934, in _connection_factory
self._connection = self._establish_connection()
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/kombu/connection.py", line 860, in _establish_connection
conn = self.transport.establish_connection()
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/kombu/transport/pyamqp.py", line 203, in establish_connection
conn.connect()
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/amqp/connection.py", line 330, in connect
self.drain_events(timeout=self.connect_timeout)
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/amqp/connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/amqp/connection.py", line 531, in blocking_read
frame = self.transport.read_frame()
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/amqp/transport.py", line 294, in read_frame
frame_header = read(7, True)
File "/Users/vapr/.pyenv/versions/3.10.4/lib/python3.10/site-packages/amqp/transport.py", line 629, in _read
s = recv(n - len(rbuf))
TimeoutError: timed out
[2024-03-21, 14:29:25 UTC] {scheduler_job_runner.py:781} ERROR - Executor reports task instance <TaskInstance: my_dag.hello_task scheduled__2023-01-15T00:00:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?
[2024-03-21 15:23:00 +0000] [38311] [ERROR] Connection in use: ('::', 8793)
[2024-03-21 15:23:00 +0000] [38311] [ERROR] Retrying in 1 second.
[2024-03-21 15:23:01 +0000] [38311] [ERROR] Connection in use: ('::', 8793)
[2024-03-21 15:23:01 +0000] [38311] [ERROR] Retrying in 1 second.
[2024-03-21 15:23:02 +0000] [38311] [ERROR] Connection in use: ('::', 8793)
[2024-03-21 15:23:02 +0000] [38311] [ERROR] Retrying in 1 second.
[2024-03-21 15:23:03 +0000] [38311] [ERROR] Connection in use: ('::', 8793)
[2024-03-21 15:23:03 +0000] [38311] [ERROR] Retrying in 1 second.
[2024-03-21 15:23:03,825: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:15672//: timed out.
Trying again in 2.00 seconds... (1/100)
[2024-03-21 15:23:04 +0000] [38311] [ERROR] Can't connect to ('::', 8793)
[2024-03-21 15:23:09,837: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:15672//: timed out.
Trying again in 4.00 seconds... (2/100)
[2024-03-21 15:23:17,855: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:15672//: timed out.
Trying again in 6.00 seconds... (3/100)
[2024-03-21 15:23:27,886: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:15672//: timed out.
Trying again in 8.00 seconds... (4/100)

Solution:

The log above shows the consumer trying to reach amqp://guest:**@127.0.0.1:15672//, which is the RabbitMQ management UI port rather than the AMQP port, so first make sure broker_url points to port 5672 and that RabbitMQ is actually running. If you run Airflow inside containers instead, localhost will not resolve to the broker: from within your Airflow containers you should be able to connect to the RabbitMQ service (rabbit1 in this example), so change `amqp://user:**@localhost:5672//` to `amqp://user:**@rabbit1:5672//` and it should work.
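
For the local (non-container) setup described in this post, a sketch of the fix is simply pointing the broker URL at RabbitMQ's AMQP port and restarting the worker:

# AMQP listens on 5672; 15672 is only the management UI
export AIRFLOW__CELERY__BROKER_URL="amqp://guest:guest@localhost:5672//"
airflow celery worker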