Strategies and best practices for data engineers to develop performant and reliable data extraction scripts with Python.

If you're working in data engineering, you probably know Python is pretty much the Swiss Army knife for pulling data from all sorts of places. A lot of folks go straight for the requests library and try to grab everything in one go with requests.get(url). But that can quickly get dicey: holding all that data in memory is a bit like trying to cram six people in a sedan; sooner or later, you're going to run into trouble.

In this article, we'll take a deep dive into:

Performance

  • Using pagination and query parameters to handle large data requests.
  • Streaming responses to process data incrementally, thus avoiding the need to load everything into memory at once.
  • Leveraging sessions to reuse connection objects.
  • Caching responses to minimize redundant calls and reduce load times.

Reliability

  • Handling exceptions gracefully and understanding error status codes.
  • Implementing timeouts, retries, and backoff strategies in requests to deal with network issues and API rate limits.

Performance

When we're setting up our data ingestion pipelines, we're always looking for ways to make things run smoother and faster. Here are a few strategies to improve performance:

Query parameters and pagination

You can reduce server load and client memory usage by breaking the data retrieval process into smaller chunks.

One option is paginating results. Say you're collecting data on books from a large online library's API that contains millions of records. Fetching all records in one go isn't feasible due to memory constraints and API rate limits. Instead, you can paginate the results and have Python fetch smaller batches at a time:

import requests

BASE_URL = 'https://api.library.com/books'
def fetch_books(page=1):
  response = requests.get(f'{BASE_URL}?page={page}')
  return response.json()

books = []
page = 1
while True:
  results = fetch_books(page)
  print(f'Received page {page} with {len(results)} records')
  if not results:
      break  # No more books to fetch
  books.extend(results)
  page += 1

The output will be:

Received page 1 with 10000 records
Received page 2 with 10000 records
...
Received page 11 with 0 records

A similar option is to pass query parameters to filter the request. Suppose you’re developing an application that aggregates news articles. You need to fetch articles from a news API based on certain criteria, such as keywords, date ranges, or specific categories like technology or health. By using query parameters, you can tailor the API request to return only the articles that match your application's current needs.

import requests

# Define your search criteria
keywords = "data vault"
from_date = "2024-01-01"
to_date = "2024-01-31"
category = "technology"

# Construct the request with query parameters
response = requests.get(
  'https://api.newsprovider.com/articles',
  params={'q': keywords, 'from': from_date, 'to': to_date, 'category': category},
)

# Process the filtered articles
articles = response.json()['articles']
for article in articles:
  print(article['title'], article['url'])

Streaming large responses

For large datasets, you can use streaming to process data incrementally as it arrives, which keeps memory consumption low. Let's say you are downloading a large dataset of genomic sequences that is too large to hold in memory. Using streaming, you process the data as it arrives:

import requests

# stream=True defers downloading the response body; iter_content yields it in chunks
response = requests.get('https://api.genomics.com/large_dataset', stream=True)

for chunk in response.iter_content(chunk_size=1024):
  process_chunk(chunk)  # Process each 1KB chunk as it arrives

You can think of this like sipping through a straw; instead of trying to drink the whole glass of water in one gulp, you're taking it in manageable sips, making it easier to handle without overwhelming yourself.
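
The process_chunk callback above is whatever your pipeline needs it to be. As a minimal sketch, assuming you simply want to persist the raw download to disk (the file name here is hypothetical), it could append each chunk to a local file:

def process_chunk(chunk, path='genomic_sequences.dat'):
  # Append the raw bytes to a local file instead of holding them in memory
  with open(path, 'ab') as f:
    f.write(chunk)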

Sessions

Sessions persist parameters (such as headers or authentication) across requests and reuse the underlying connection, so you avoid the overhead of establishing a new connection for every call. For example, if you're developing a weather application that fetches the current weather, forecasts, and historical weather data for multiple locations from the same API, you can use a single session instead of opening a new connection for each request:

import requests

# Create a session object
with requests.Session() as session:
  session.headers.update({'Accept': 'application/json'})  # Common header for all requests
  for city in ['New York', 'London', 'Tokyo']:
    response = session.get(f'https://api.weather.com/v1/{city}/current')
    print(f'Current weather in {city}: {response.json()["temperature"]}')

Caching responses

Caching stores response data temporarily so that subsequent calls can read it instantly without hitting the API again. This reduces the number of requests, saves time, and improves performance. The requests-cache library makes this a one-line change; just keep in mind that setting the expire_after parameter too high could lead to processing stale data.

from requests_cache import CachedSession

session = CachedSession('social_network_cache', backend='sqlite', expire_after=3600)

def get_user_profile(user_id):
    response = session.get(f'https://api.socialnetwork.com/users/{user_id}')
    return response.json()

# The first call fetches data from the API and caches it
profile = get_user_profile('user123')  # ~2.6s

# Subsequent calls within the hour are served from the cache and return almost instantly
profile = get_user_profile('user123')  # ~0.0s
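
If you want to confirm whether a given call was answered from the cache, requests-cache adds a from_cache flag to responses; a quick check (using the same session and a hypothetical user ID) looks like this:

response = session.get('https://api.socialnetwork.com/users/user123')
print(response.from_cache)  # False on the first request, True once the response is cached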

Reliability

Handle exceptions and status codes gracefully

When talking to web APIs, lots can go wrong, like losing your internet connection or the server going down. The requests library surfaces these failures through exceptions and HTTP status codes. Catching them lets your app understand what went wrong and decide what to do next. You can find an example of error handling with requests below:

import requests

try:
  response = requests.get('https://api.example.com/data')
  # Raise an exception for HTTP error status codes (4xx and 5xx)
  response.raise_for_status()
  print('Request succeeded:', response.json())
except requests.exceptions.HTTPError as e:
  print(f'HTTP error: {e}')
  if e.response.status_code == 401:
      print('Authentication required.')
  elif e.response.status_code == 403:
      print('Access denied.')
  elif e.response.status_code == 404:
      print('Resource not found.')
  elif e.response.status_code == 500:
      print('Server error.')

except requests.exceptions.ConnectionError:
  print('Connection error.')
except requests.exceptions.Timeout:
  print('Request timed out.')
except requests.exceptions.RequestException as e:
  print(f'Request failed: {e}')
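
One thing worth noting: the Timeout branch above only fires if you ask requests to enforce a timeout; by default, requests will wait indefinitely for a response. A minimal sketch, reusing the same hypothetical endpoint, passes separate connect and read timeouts:

try:
  # Wait at most 3.05s to establish the connection and 10s between bytes of the response
  response = requests.get('https://api.example.com/data', timeout=(3.05, 10))
  response.raise_for_status()
except requests.exceptions.Timeout:
  print('Request timed out.')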

Rate limits, retries, and backoff strategies

Many APIs impose rate limits to prevent abuse and ensure fair resource distribution among users. When your application exceeds these limits, the API may respond with a 429 Too Many Requests status code. Python's requests library won't retry these failed requests automatically, but you can set it up to do so by mounting a custom transport adapter with a retry configuration. This way, your app keeps retrying on its own without you having to intervene every time.

One retry mechanism is to implement an exponential backoff strategy, where the wait time increases exponentially with each retry. The wait time between retries is calculated as backoff_factor * (2 ** retry_number), where retry_number is the current retry attempt number (starting at 0). So, if the backoff factor is set to 1 second, the wait time before the first retry is 1 second, before the second retry is 2 seconds, before the third is 4 seconds, and so on.
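
To make the formula concrete, here is a quick sketch that prints the wait times for the configuration used below (backoff_factor=1, three retries):

backoff_factor = 1
for retry_number in range(3):
  print(f'Retry {retry_number + 1}: wait {backoff_factor * (2 ** retry_number)}s')
# Retry 1: wait 1s
# Retry 2: wait 2s
# Retry 3: wait 4s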

import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

# Setup retry strategy
retry_strategy = Retry(
  total=3,  # Total number of retries
  status_forcelist=[429, 500, 502, 503, 504],  # Status codes to retry for
  allowed_methods=["GET", "POST"],  # HTTP methods to retry
  backoff_factor=1  # Base factor for the exponential backoff between retries
)

adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
# Mount the adapter to the session for all HTTP and HTTPS requests
http.mount("https://", adapter)
http.mount("http://", adapter)

# Make a request with retry logic
try:
  response = http.get('https://api.example.com/data')
  response.raise_for_status()  # Check for HTTP errors
  print('Request succeeded.')
except requests.exceptions.RequestException as e:
  print(f'Request failed: {e}')

Conclusion

So, you've armed yourself with some pretty good strategies to develop performant and reliable data extraction scripts with Python. While crafting Python scripts locally to extract data might seem like a breeze, deploying them and managing secrets, dependencies, and infrastructure is not. This is where Y42 comes into the picture.

YAML files are auto-generated with standardized metadata based on your actions in Y42.

Y42 is designed to simplify the transition of your local data extraction scripts to production. Discover how our solution removes the boilerplate code needed to load extracted data into your data warehouse, takes infrastructure off your plate, and provides standardized metadata, lineage, and documentation right out of the box. Curious about how we make Python data ingestion look easy? Dive in to learn more about our Python ingest assets.
