
Build a geocoding application with Python

Temporal Python SDK

Introduction

When it comes to building business process applications, coordinating all parts of the application from user interaction to API calls can be complex and unreliable. Temporal helps shield you from these issues by providing reliability and operability.

In this tutorial, you'll build a business process application that does standard tasks, such as getting input from a user and querying an API. Specifically, it asks the user for an API key and an address, then it geocodes the address using Geoapify.

Prerequisites

Before starting this tutorial, complete the Hello World tutorial, get a Geoapify API key, and install the requests library:

pip install requests

As mentioned in the Hello World tutorial, please be sure you have run temporal server start-dev to start the Temporal Service. If you're having trouble later in the tutorial, sometimes closing that terminal and restarting it can help.

Now that you have your environment ready, it's time to build an invincible geocoder.

Develop a Workflow to orchestrate your interactions with the user and the API

The Workflow is the side-effect-free function that orchestrates the Activities (which are atomic, have potential for side effects, and are perhaps non-deterministic). In this application, the actions are (1) getting information from the user and (2) querying the API.
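As a plain-Python analogy (not Temporal code), the split between orchestration and actions might look like the sketch below. The function name geocode_flow and the canned values are hypothetical; the point is only that the orchestrating function decides the order of steps and performs no I/O itself.

```python
from typing import Callable, List


def geocode_flow(
    get_api_key: Callable[[], str],
    get_address: Callable[[], str],
    get_lat_long: Callable[[str, str], List[float]],
) -> List[float]:
    """Orchestration only: decides the order of steps, does no I/O itself."""
    api_key = get_api_key()
    address = get_address()
    return get_lat_long(api_key, address)


# Stand-in actions with canned results (hypothetical values, no network or user input).
result = geocode_flow(
    get_api_key=lambda: "demo-key",
    get_address=lambda: "somewhere",
    get_lat_long=lambda key, addr: [2.0, 1.0],
)
print(result)
```

In the real application, Temporal plays the role of the caller: the Workflow names the Activities, and the Worker runs them.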

You'll begin by defining the Workflow outline. Since you haven't defined any Activities yet, the imports from activities won't resolve and your editor may show warnings. That's okay. You'll define them in the next section.

Create a new file called workflow.py and add the following code:

workflow.py

from datetime import timedelta
from temporalio import workflow

# Import activity, passing it through the sandbox without reloading the module
with workflow.unsafe.imports_passed_through():
    from activities import (
        get_address_from_user,
        get_api_key_from_user,
        get_lat_long,
        QueryParams,
    )

_TIMEOUT_5_MINS = 5 * 60

# Decorator for the workflow class.
# This must be set on any registered workflow class.
@workflow.defn
class GeoCode:
    """The Workflow. Orchestrates the Activities."""

    # Decorator for the workflow run method.
    # This must be set on one and only one async method defined on the same class as @workflow.defn
    @workflow.run
    async def run(self) -> list:
        """The run method of the Workflow."""

        api_key_from_user = await workflow.execute_activity(
            get_api_key_from_user,
            start_to_close_timeout=timedelta(seconds=_TIMEOUT_5_MINS),
        )

        address_from_user = await workflow.execute_activity(
            get_address_from_user,
            start_to_close_timeout=timedelta(seconds=_TIMEOUT_5_MINS),
        )

        query_params = QueryParams(api_key=api_key_from_user, address=address_from_user)

        lat_long = await workflow.execute_activity(
            get_lat_long,
            query_params,
            start_to_close_timeout=timedelta(seconds=_TIMEOUT_5_MINS),
        )

        return lat_long


The GeoCode class is decorated with @workflow.defn, which tells Temporal that the class is a Workflow.

The async def run() function is decorated with @workflow.run, which tells Temporal that this method is the Workflow's run method. As mentioned in the code comment, the Workflow needs exactly one method to be decorated with @workflow.run.

The Activities are passed into the calls to workflow.execute_activity(activity). If an Activity needs an argument, pass it after the Activity: workflow.execute_activity(activity, arg, ...), as is done in the execution of get_lat_long. It's recommended to collapse multiple arguments into a single one using a dataclass, which is shown here as QueryParams (it will be defined in the next section).

With the skeleton in place, you can now develop the Activities.

Develop Activities to interact with the user and the API

In this section, you'll implement the Activities that interact with the outside world. The Workflow is doing the orchestration, and the Activities are doing the atomic actions.

Add the following to a new file called activities.py:

activities.py

from temporalio import activity


# Tells Temporal that this is an Activity
@activity.defn
async def get_api_key_from_user() -> str:
    return input("Please give your API key: ")


# Tells Temporal that this is an Activity
@activity.defn
async def get_address_from_user() -> str:
    return input("Please give an address: ")


The @activity.defn decorator is what tells Temporal that this function is an Activity.

These Activities are called in the beginning of the Workflow you made earlier. After the Workflow calls these two, it has the user's API key and address. Next, it calls an Activity called get_lat_long, with an argument of type QueryParams. You'll implement that next.

Add the following to the activities.py file that you just made:

activities.py

import requests
from dataclasses import dataclass

# Bundles the Activity's inputs into a single argument
@dataclass
class QueryParams:
    api_key: str
    address: str

@activity.defn
async def get_lat_long(query_params: QueryParams) -> list:
    base_url = "https://api.geoapify.com/v1/geocode/search"

    params = {
        "text": query_params.address,
        "apiKey": query_params.api_key
    }

    # The requests timeout is given in seconds
    response = requests.get(base_url, params=params, timeout=1000)

    response_json = response.json()

    # Take the first match. GeoJSON lists coordinates as [longitude, latitude].
    lat_long = response_json['features'][0]['geometry']['coordinates']

    return lat_long

As mentioned before, condense the arguments to Activities into a dataclass. In this case, the Activity is an API call that needs the user's address and API key, so you bundle those in a dataclass called QueryParams.
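One reason a single dataclass argument is convenient can be sketched as follows. The limit field here is hypothetical and is not part of the tutorial's QueryParams; it only illustrates that a new field with a default value can be added later without changing the Activity's signature or existing call sites.

```python
from dataclasses import asdict, dataclass


@dataclass
class QueryParams:
    api_key: str
    address: str
    # Hypothetical extra field, not in the tutorial's QueryParams.
    # Because it has a default, existing callers keep working unchanged.
    limit: int = 1


# A call site written before `limit` existed still constructs the object the same way.
params = QueryParams(api_key="demo-key", address="10 Downing Street, London")
print(asdict(params))
```

Had the Activity taken api_key and address as two separate positional arguments, adding a third input would have meant changing every caller.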

Now that you have the Workflow and Activities, you need to run them. In the next section, you'll begin that process by making and running a Worker.

Create a Worker to host your Workflow and Activities

The last conceptual piece you need is a Worker. The Worker is the process that connects to the Temporal Service and listens on a specific Task Queue for work to do. Here is how you can make a Worker factory.

Make a new file called make_worker.py and enter the following:

make_worker.py

from temporalio.client import Client
from temporalio.worker import Worker

from activities import get_address_from_user, get_api_key_from_user, get_lat_long
from workflow import GeoCode


def make_worker(client: Client):

    worker = Worker(
        client,
        task_queue="geocode-task-queue",
        workflows=[GeoCode],
        activities=[get_address_from_user, get_api_key_from_user, get_lat_long]
    )

    return worker

The argument, client, is the connection to the Temporal Service. The task_queue is the Task Queue that the Worker listens on (later, when you run the Workflow, you'll put items on that Task Queue). It also accepts a workflows argument. This is the list of Workflows it can process. Lastly, it accepts a list of Activities that it can work on.

Now that you have a factory to make a Worker, it's time to connect to the Temporal Service, use that factory to make a Worker, and run the Worker.

Make a new file called run_worker.py and enter the following:

run_worker.py

import asyncio

from temporalio.client import Client

from make_worker import make_worker


async def main():

    client = await Client.connect("localhost:7233", namespace="default")

    worker = make_worker(client)

    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

This code connects to the Temporal Service using Client.connect. You'll run this code soon, and when you do, the Temporal Service will need to be running for this line to work (which is why it was mentioned in the prerequisites).
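If the connection fails, it can help to confirm that something is listening on the address before digging further. The following is a minimal sketch using only the standard library; the helper name temporal_service_reachable is hypothetical, and the default host and port are taken from the "localhost:7233" address used in this tutorial. It only checks that a TCP connection can be opened, not that the listener is actually a Temporal Service.

```python
import socket


def temporal_service_reachable(
    host: str = "localhost", port: int = 7233, timeout: float = 1.0
) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    if temporal_service_reachable():
        print("Something is listening on localhost:7233.")
    else:
        print("Nothing reachable on localhost:7233. Is `temporal server start-dev` running?")
```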

Next, the code passes that client into the make_worker function you defined earlier. This returns a Worker, on which you call the .run() method. This is what makes the Worker run and start listening for work on the Task Queue. You will run it now.

  1. Open a new terminal (keep the service running in a different terminal).
  2. Navigate to the project directory, and run the following command (it won't output anything yet).
$ python run_worker.py

It will start listening, but it has nothing to do yet because there is nothing on the queue. That's what the next section is for. In it, you will run the Workflow, which will put tasks on the queue for the Worker you just made.

Run the Workflow to execute the business process

The last thing you need to do is execute the Workflow. To do this, you need to connect to the Temporal Service (again, with Client.connect), and then call the .execute_workflow() method.

Enter the following code in a new file called run_workflow.py.

run_workflow.py

import asyncio

from workflow import GeoCode
from temporalio.client import Client


async def main():
    # Create a client connected to the server at the given address
    client = await Client.connect("localhost:7233")

    # Execute a workflow
    lat_long = await client.execute_workflow(
        GeoCode.run, id="geocode-workflow", task_queue="geocode-task-queue"
    )

    print(f"Lat long: {lat_long}")


if __name__ == "__main__":
    asyncio.run(main())

In this piece, you connect to the service, then call execute_workflow(). The arguments are the following:

  • The Workflow's run method (GeoCode.run), which was decorated with @workflow.run.
  • The ID for the Workflow.
  • The Task Queue. This is the queue to which the Workflow (and its Activities) are added.

You're ready to run the code. Follow these four steps:

  1. Open a third terminal (the other two processes should still be running).
  2. Navigate to the project directory, and run the following command:
$ python run_workflow.py

At this point, the application is running. If you look at the terminal that's running the Worker (not the terminal that's running the Workflow), it should be asking you for your API key.

  3. Enter the Geoapify API key mentioned in the prerequisites.

Next, it will ask you for an address.

  4. Enter an address.

It will then query the Geoapify API to geocode the address, and it will return the latitude and longitude of the address to the terminal running the Workflow.
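When reading the printed result, note that GeoJSON orders a point's coordinates as [longitude, latitude], so the first number is the longitude. The sketch below shows one way to unpack such a response into (latitude, longitude). The helper name extract_lat_long is hypothetical, and sample_response is a hand-written, trimmed stand-in with illustrative values, not real API output. It assumes the response has the features/geometry/coordinates shape that get_lat_long reads.

```python
from typing import Tuple


def extract_lat_long(response_json: dict) -> Tuple[float, float]:
    """Return (latitude, longitude) from a GeoJSON-style geocoding response."""
    features = response_json.get("features", [])
    if not features:
        # Indexing features[0] on an empty list would raise IndexError instead.
        raise ValueError("No geocoding match found for this address")
    longitude, latitude = features[0]["geometry"]["coordinates"]
    return latitude, longitude


# Hand-written stand-in for a response (illustrative values only).
sample_response = {
    "features": [
        {"geometry": {"type": "Point", "coordinates": [-0.1276, 51.5034]}}
    ]
}

latitude, longitude = extract_lat_long(sample_response)
print(f"Latitude: {latitude}, longitude: {longitude}")
```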

Conclusion

You have built a business process application that runs invincibly with Temporal.

For more detail on how Temporal can help business process applications, please see Temporal's Chief Product Officer's discussion in The Top 3 Use Cases for Temporal (ft. Temporal PMs).

Next Steps

Now on your own, try defining a Retry Policy and applying it to the Activities.

What are some examples of things that would be done in an Activity instead of in the body of the Workflow? Which ones did you do in this tutorial?
Activities perform anything that may be non-deterministic, may fail, or has side effects. This could be writing to disk, reading from or writing to a database, or getting information from a user. In this tutorial, you got input from the user via the command line, and you queried a REST API.

What pieces of information does a Worker need when instantiated? This example had four.
  • A client/connection to the Temporal Service.
  • A Task Queue.
  • A list of Workflows to work on.
  • A list of Activities to work on.

How do you denote that a piece of Python code is a Workflow?
You make a class, and decorate it with the @workflow.defn decorator. You then decorate exactly one method with the decorator @workflow.run.