Python으로 Kiwoom API를 활용한 RSI 전략 구현하기
이번 포스팅에서는 Kiwoom API를 활용하여 주식 RSI 전략을 자동화하는 코드에 대해 설명합니다. RSIStrategy 클래스는 주식 종목 데이터를 수집하고 가격을 모니터링하며, 실시간 데이터를 활용한 트레이딩 전략을 구동하는 핵심 로직을 담고 있습니다. 코드의 각 부분을 살펴보며 어떻게 동작하는지 자세히 설명하겠습니다.
1. 기본 설정 및 클래스 초기화
import time
import pandas as pd
from api.Kiwoom import *
from util.make_up_universe import *
from util.db_helper import *
from util.time_helper import *
import math
import traceback
우선, 필요한 모듈과 함수들을 가져옵니다. 이 코드에서는 time, pandas를 비롯한 여러 유틸리티 함수와 Kiwoom API를 사용하고 있습니다.
class RSIStrategy(QThread):
def __init__(self):
QThread.__init__(self)
self.strategy_name = "RSIStrategy"
self.kiwoom = Kiwoom()
self.universe = {}
self.deposit = 0
self.is_init_success = False
self.init_strategy()
RSIStrategy 클래스는 QThread를 상속하여 멀티스레딩으로 구현됩니다. 초기화 메서드에서 전략 이름을 설정하고, Kiwoom API 객체를 생성하며, 종목 리스트(universe), 예수금(deposit), 초기화 성공 여부(is_init_success)를 정의합니다. 마지막으로 전략 초기화 메서드 init_strategy()를 호출합니다.
2. 전략 초기화: init_strategy
def init_strategy(self):
try:
self.check_and_get_universe()
self.check_and_get_price_data()
self.kiwoom.get_order()
self.kiwoom.get_balance()
self.deposit = self.kiwoom.get_deposit()
self.set_universe_real_time()
self.is_init_success = True
except Exception as e:
print(traceback.format_exc())
init_strategy는 전략 실행에 필요한 데이터를 수집하고 초기 설정을 담당합니다. 각각의 주요 기능을 담은 함수들을 호출한 후, 초기화가 성공하면 is_init_success를 True로 설정합니다. 만약 에러가 발생하면 예외 처리를 통해 오류 메시지를 출력합니다.
3. 종목 리스트 확인 및 생성: check_and_get_universe
def check_and_get_universe(self):
if not check_table_exist(self.strategy_name, 'universe'):
universe_list = get_universe()
print(universe_list)
이 함수는 데이터베이스에 저장된 종목 리스트를 확인하거나, 없을 경우 새로 생성합니다. check_table_exist 함수는 SQLite 데이터베이스에서 'universe'라는 테이블이 존재하는지 확인합니다. 존재하지 않는다면 get_universe() 함수로 종목 리스트를 가져옵니다.
universe = {}
now = datetime.now().strftime("%Y%m%d")
kospi_code_list = self.kiwoom.get_code_list_by_market("0")
kosdaq_code_list = self.kiwoom.get_code_list_by_market("10")
for code in kospi_code_list + kosdaq_code_list:
code_name = self.kiwoom.get_master_code_name(code)
if code_name in universe_list:
universe[code] = code_name
Kiwoom API를 사용하여 KOSPI와 KOSDAQ 종목 코드를 가져옵니다. 각 종목의 이름을 확인하고, 지정된 universe_list에 포함된 종목만 universe 딕셔너리에 저장합니다.
universe_df = pd.DataFrame({
'code': universe.keys(),
'code_name': universe.values(),
'created_at': [now] * len(universe.keys())
})
insert_df_to_db(self.strategy_name, 'universe', universe_df)
종목 코드와 이름을 데이터프레임으로 변환한 후, SQLite 데이터베이스에 삽입합니다.
sql = "select * from universe"
cur = execute_sql(self.strategy_name, sql)
universe_list = cur.fetchall()
for item in universe_list:
idx, code, code_name, created_at = item
self.universe[code] = {
'code_name': code_name
}
print(self.universe)
마지막으로 데이터베이스에 저장된 종목 리스트를 다시 가져와 self.universe에 저장하고 출력합니다.
4. 가격 데이터 수집 및 확인: check_and_get_price_data
def check_and_get_price_data(self):
for idx, code in enumerate(self.universe.keys()):
print("({}/{}) {}".format(idx + 1, len(self.universe), code))
check_and_get_price_data 함수는 각 종목에 대한 가격 데이터를 수집하거나, 데이터베이스에 이미 존재하는 데이터를 확인합니다.
if check_transaction_closed() and not check_table_exist(self.strategy_name, code):
price_df = self.kiwoom.get_price_data(code)
insert_df_to_db(self.strategy_name, code, price_df)
장 마감 여부를 확인하고(check_transaction_closed), 테이블이 존재하지 않으면(check_table_exist), Kiwoom API를 통해 종목의 가격 데이터를 가져와 데이터베이스에 삽입합니다.
else:
if check_transaction_closed():
sql = "SELECT max([index]) FROM universe WHERE code = :code"
cur = execute_sql(self.strategy_name, sql, {"code": code})
last_date = cur.fetchone()
now = datetime.now().strftime("%Y%m%d")
if last_date[0] != now:
price_df = self.kiwoom.get_price_data(code)
insert_df_to_db(self.strategy_name, code, price_df)
가격 데이터가 이미 존재하는 경우, 마지막 데이터가 최신인지 확인하고, 최신 데이터가 없을 경우 추가 데이터를 수집하여 업데이트합니다.
else:
sql = "SELECT * FROM universe WHERE code = :code"
cur = execute_sql(self.strategy_name, sql, {"code": code})
cols = [column[0] for column in cur.description]
price_df = pd.DataFrame.from_records(data=cur.fetchall(), columns=cols)
price_df = price_df.set_index('index')
self.universe[code]['price_df'] = price_df
실시간 거래가 진행 중일 때는 데이터베이스에서 가격 데이터를 가져와 종목별 가격 데이터프레임을 생성하여 self.universe에 저장합니다.
5. 실시간 데이터 설정: set_universe_real_time
def set_universe_real_time(self):
fids = get_fid("체결시간")
codes = ";".join(map(str, self.universe.keys()))
self.kiwoom.set_real_reg("9999", codes, fids, "0")
set_universe_real_time 함수는 Kiwoom API를 통해 실시간 체결 정보를 받아오기 위한 설정을 담당합니다. 종목 리스트를 codes로 변환하고, 실시간 체결 정보 필드(fids)와 함께 Kiwoom API의 set_real_reg 함수를 호출하여 실시간 데이터 수신을 시작합니다.
6. 전략 실행: run
def run(self):
while True:
for idx, code in enumerate(self.universe.keys()):
print('[{}/{}_{}]'.format(idx+1, len(self.universe), self.universe[code]['code_name']))
time.sleep(0.5)
if code in self.kiwoom.universe_realtime_transaction_info.keys():
print(self.kiwoom.universe_realtime_transaction_info[code])
run 함수는 전략의 메인 루프를 실행합니다. 매 0.5초마다 각 종목의 실시간 정보를 출력하며, 실시간 체결 정보를 업데이트합니다. 이는 스레드로 실행되며, 실시간 데이터에 기반한 전략 실행의 핵심이 됩니다.
결론
이 코드는 Kiwoom API를 통해 주식 시장의 실시간 데이터를 모니터링하고 RSI 전략을 자동화하기 위한 구조입니다. 종목 리스트를 생성하고, 가격 데이터를 수집하며, 실시간 거래 정보를 활용하여 트레이딩 로직을 구동합니다.