Complete Examples¶
Handling Keyboard Interrupt¶
Graceful exit on KeyboardInterrupt
import asyncio
import sys
from typing import List

from aio_trader import utils
from aio_trader.kite import Kite, KiteFeed
# Define handlers
def on_tick(tick: List, binary=False):
    """Handle an incoming tick by printing it to standard output.

    ``binary`` is accepted but not used by this handler.
    """
    print(tick)
# Strong references to in-flight subscription tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_pending_tasks = set()


def on_connect(kws: KiteFeed):
    """Announce the connection and schedule the symbol subscription.

    Prints ``connected`` and schedules ``kws.subscribe_symbols([1793])`` as a
    task on the running event loop. The subscription is not awaited here.
    """
    print("connected")

    # Subscribe to Aarti Industries
    task = asyncio.create_task(kws.subscribe_symbols([1793]))

    # Keep the task alive until it completes, then drop the reference.
    _pending_tasks.add(task)
    task.add_done_callback(_pending_tasks.discard)
def on_order_update(data: dict):
    """Handle an order update; this placeholder does nothing.

    Store the order data in a database or perform other operations here.
    """
async def cleanup(kws, kite):
    """Close ``kws`` and then ``kite``.

    ``kite.close()`` is awaited even when ``kws.close()`` raises, so a failure
    while closing the first object does not leave the second one open. Any
    exception still propagates to the caller.
    """
    # perform cleanup operations here
    try:
        await kws.close()
    finally:
        await kite.close()
async def main():
    """Authorize the client, then connect the feed with the handlers attached.

    Reads the module-level names ``kite``, ``request_token``, ``api_key``,
    ``secret``, ``user_id`` and ``enctoken``.
    """
    await kite.authorize(
        request_token=request_token,
        api_key=api_key,
        secret=secret,
    )

    # Client session and logger instance are shared
    async with KiteFeed(
        user_id=user_id,
        enctoken=enctoken,
        session=kite.session,
        logger=kite.log,
    ) as kws:
        kws.on_tick = on_tick
        kws.on_connect = on_connect
        kws.on_order_update = on_order_update

        # Handle KeyboardInterrupt
        utils.add_signal_handlers(cleanup, kws, kite)

        # No code executes after this line
        await kws.connect()
# Credentials loaded from a database or JSON file
enctoken = "STORED_ENCTOKEN"
user_id = "KITE_WEB_USER_ID"

# Placeholders for the remaining values this script reads; replace them with
# your own stored credentials.
access_token = "STORED_ACCESS_TOKEN"
request_token = "KITE_REQUEST_TOKEN"
api_key = "KITE_API_KEY"
secret = "KITE_API_SECRET"

# Defined globally to allow access across the script
kite = Kite(access_token=access_token)

# Match Windows only: a substring test for "win" would also match "darwin"
# (macOS) and "cygwin", where WindowsSelectorEventLoopPolicy does not exist.
if sys.platform.startswith("win"):
    # Required for windows users
    asyncio.set_event_loop_policy(
        asyncio.WindowsSelectorEventLoopPolicy(),
    )

asyncio.run(main())
Downloading historical data¶
When trying to download historical data synchronously, the typical approach would be:
Request historical data
Wait for a response from the server
Process the data
Repeat in a loop for the remaining symbols.
The process is blocked while waiting for the server’s response. No other task is executed during this time.
With async,
I iterate over the symbols, creating a
Task
for each request. The
Task
does not execute the request immediately. Instead, it is scheduled for concurrent execution. Once the
Task
executes, it does not wait for the server response. It yields control back to the event loop to run other tasks. As the response returns, the
Task
is marked completed, and the result is available for processing. This eliminates a lot of time spent waiting on the network.
Long-running CPU-intensive work or blocking (synchronous) I/O calls can block the event loop. Use loop.run_in_executor
to execute such functions in separate threads or processes, without blocking the event loop.
from aio_trader.kite import Kite
from aio_trader import utils
from pathlib import Path
from typing import List
import pandas as pd, asyncio
async def main():
    """Fetch daily candles for a fixed list of NSE symbols concurrently.

    The instruments file is downloaded next to this script when it is not
    already on disk. One task per symbol requests historical data for
    2024-04-22 to 2024-04-26, and each result is turned into a DataFrame
    indexed by ``Date`` as soon as its task completes.
    """
    base_dir = Path(__file__).parent
    instruments_file = base_dir / "nse_instruments.csv"

    sym_list = (
        "AARTIIND",
        "ASIANPAINT",
        "BATAINDIA",
        "HDFCLIFE",
        "BLUEDART",
        "BRITANNIA",
        "DEEPAKFERT",
        "DRREDDY",
        "EICHERMOT",
        "EIDPARRY",
    )

    columns = ("Date", "Open", "High", "Low", "Close", "Volume")

    async with Kite() as kite:
        await kite.authorize()

        # Download the instruments file only when it is not already on disk
        if not instruments_file.exists():
            raw_instruments = await kite.instruments(exchange=kite.EXCHANGE_NSE)
            instruments_file.write_bytes(raw_instruments)

        instruments = pd.read_csv(instruments_file, index_col="tradingsymbol")

        # Calling an async function without awaiting it returns a coroutine.
        # Wrapping that coroutine in a task schedules it for execution in the
        # event loop. Each task is named after its symbol, so task.get_name
        # returns the symbol later on.
        tasks: List[asyncio.Task] = [
            asyncio.create_task(
                kite.historical_data(
                    instruments.loc[sym, "instrument_token"],
                    from_dt="2024-04-22",
                    to_dt="2024-04-26",
                    interval="day",
                ),
                name=sym,
            )
            for sym in sym_list
        ]

        # asyncio.as_completed returns a coroutine with no reference to the
        # task. utils.as_completed returns the original task object created
        # above.
        async for finished in utils.as_completed(tasks):
            sym_name = finished.get_name()
            payload = finished.result()

            candle_df = pd.DataFrame(
                payload["data"]["candles"],
                columns=columns,
                copy=False,
            )
            candle_df["Date"] = pd.to_datetime(candle_df["Date"])
            candle_df = candle_df.set_index("Date", drop=True)

            # save the result or perform further processing
# Run main() to completion on a new event loop
asyncio.run(main())