Python CDK Speedrun: Creating a Source
CDK Speedrun (HTTP API Source Creation Any% Route)
This is a blazing fast guide to building an HTTP source connector. Think of it as the TL;DR version of this tutorial.
Dependencies
- Python >= 3.9
- Poetry
- Docker
- NodeJS
Generate the Template
# clone the repo if you haven't already
# git clone --depth 1 https://github.com/airbytehq/airbyte/
# cd airbyte # start from repo root
cd airbyte-integrations/connector-templates/generator
./generate.sh
Select the Python HTTP API Source and name it python-http-example.
Create Dev Environment
cd ../../connectors/source-python-http-example
poetry install
Define Connector Inputs
cd source_python_http_example
We're working with the PokeAPI, so we need to define our input schema to reflect that. Open the spec.yaml file here and replace its contents with:
documentationUrl: https://docs.airbyte.com/integrations/sources/pokeapi
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Pokeapi Spec
  type: object
  required:
    - pokemon_name
  properties:
    pokemon_name:
      type: string
      description: Pokemon requested from the API.
      pattern: ^[a-z0-9_\-]+$
      examples:
        - ditto
        - luxray
        - snorlax
As you can see, our input schema has a single input, pokemon_name, which is required. Normally, input schemas will contain information such as API keys and client secrets that need to get passed down to all endpoints or streams.
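To get a feel for what the pattern in the spec accepts, here is a minimal sketch using Python's re module. The helper name matches_spec_pattern is hypothetical and not part of the CDK; it only mirrors the regular expression from the spec above.

```python
import re

# Same regular expression as the `pattern` field in spec.yaml.
POKEMON_NAME_PATTERN = re.compile(r"^[a-z0-9_\-]+$")


def matches_spec_pattern(name: str) -> bool:
    """Hypothetical helper: True if `name` satisfies the spec's pattern."""
    return POKEMON_NAME_PATTERN.search(name) is not None


print(matches_spec_pattern("ditto"))    # True
print(matches_spec_pattern("Ditto"))    # False: uppercase letters are not allowed
print(matches_spec_pattern("mr-mime"))  # True: hyphens are allowed
```

Note that the pattern only constrains the shape of the string; it says nothing about whether the name is a real Pokemon.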
Ok, let's write a function that checks the inputs we just defined. Nuke the source.py file and add the code below to it. For a crucial time skip, we're going to define all the imports we will need later right here. Also note that your AbstractSource class name must be a camel-cased version of the name you gave in the generation phase, prefixed with Source. In our case, this is SourcePythonHttpExample.
import logging
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream

from . import pokemon_list

logger = logging.getLogger("airbyte")


class SourcePythonHttpExample(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        # Validate the configured Pokemon name against the known list.
        logger.info("Checking Pokemon API connection...")
        input_pokemon = config["pokemon_name"]
        if input_pokemon not in pokemon_list.POKEMON_LIST:
            result = f"Input Pokemon {input_pokemon} is invalid. Please check your spelling and input a valid Pokemon."
            logger.info(f"PokeAPI connection failed: {result}")
            return False, result
        else:
            logger.info(f"PokeAPI connection success: {input_pokemon} is a valid Pokemon")
            return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        # Pokemon is a stream class that is not defined in this file yet.
        return [Pokemon(pokemon_name=config["pokemon_name"])]
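The naming rule for the class can be sketched as a small function. This is only an illustration of the convention described above, not how the generator itself computes the name; connector_class_name is a hypothetical helper.

```python
def connector_class_name(connector_name: str) -> str:
    """Hypothetical helper: 'python-http-example' -> 'SourcePythonHttpExample'."""
    # Split the generator name on hyphens (treating underscores the same way).
    parts = connector_name.replace("_", "-").split("-")
    # Capitalize each part and prefix the result with 'Source'.
    return "Source" + "".join(part.capitalize() for part in parts if part)


print(connector_class_name("python-http-example"))  # SourcePythonHttpExample
```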
Create a new file called pokemon_list.py at the same level. This will handle input validation for us so that we don't input invalid Pokemon. Let's start with a very limited list: any Pokemon not included in this list will get rejected.
"""
pokemon_list.py includes a list of all known pokemon for config validation in source.py.
"""
POKEMON_LIST = [
    "bulbasaur",
    "charizard",
    "wartortle",
    "pikachu",
    "crobat",
]
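Because the list is so short, even real Pokemon such as the examples in the spec get rejected. The following standalone sketch reproduces the membership check without the CDK so you can try it quickly; validate_pokemon is a hypothetical helper, and the list is copied inline here only to keep the snippet self-contained.

```python
from typing import Any, Mapping, Optional, Tuple

# Copied from pokemon_list.py so this snippet runs on its own.
POKEMON_LIST = ["bulbasaur", "charizard", "wartortle", "pikachu", "crobat"]


def validate_pokemon(config: Mapping[str, Any]) -> Tuple[bool, Optional[str]]:
    """Hypothetical helper mirroring the check in check_connection."""
    name = config["pokemon_name"]
    if name not in POKEMON_LIST:
        return False, (
            f"Input Pokemon {name} is invalid. "
            "Please check your spelling and input a valid Pokemon."
        )
    return True, None


print(validate_pokemon({"pokemon_name": "pikachu"}))   # (True, None)
print(validate_pokemon({"pokemon_name": "ditto"})[0])  # False: not in the limited list
```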
Test it.
cd ..
mkdir sample_files
echo '{"pokemon_name": "pikachu"}' > sample_files/config.json
echo '{"pokemon_name": "chikapu"}' > sample_files/invalid_config.json
python main.py check --config sample_files/config.json
python main.py check --config sample_files/invalid_config.json
Expected output:
> python main.py check --config sample_files/config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
> python main.py check --config sample_files/invalid_config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "FAILED", "message": "'Input Pokemon chikapu is invalid. Please check your spelling and input a valid Pokemon.'"}}
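If you want to check the result from a script rather than by eye, each output line shown above is a JSON object that can be parsed with the standard library. This is a minimal sketch; connection_succeeded is a hypothetical helper, and it assumes you pass it one such output line as a string.

```python
import json


def connection_succeeded(output_line: str) -> bool:
    """Hypothetical helper: True if the line is a SUCCEEDED connection status."""
    message = json.loads(output_line)
    if message.get("type") != "CONNECTION_STATUS":
        return False
    return message["connectionStatus"]["status"] == "SUCCEEDED"


ok_line = '{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}'
bad_line = '{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "FAILED", "message": "invalid"}}'

print(connection_succeeded(ok_line))   # True
print(connection_succeeded(bad_line))  # False
```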