160 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import telebot
import requests
import json
import copy
import time
import threading
from datetime import datetime
saved_points = []
new_points = []
history = []
count = 0
last_check = datetime.now()
start_time = datetime.now()
TOKEN = ''
bot = telebot.TeleBot(TOKEN)
HELP = '''
* start - приветствие.
* help - напечатать справку по программе.
* history - напечатать накопленную историю по банкоматам в которых были $.
* show - напечатать банкоматы в которых на текущий момент есть $.
* clear_history - очистить накопленную историю по банкоматам в которых были $.
* statistics - метрика по программе.
'''
id_chats = []
@bot.message_handler(commands=['start'])
def echo(message):
bot.send_message(message.chat.id, 'Данная программа позволяет в реальном времени (частота обновления данных 10 мин) получать информацию о наличии $ в банкоматах Тинькофф в г. Самара. Для получения справки введите команду: help')
@bot.message_handler(commands=['help'])
def echo(message):
bot.send_message(message.chat.id, HELP)
@bot.message_handler(commands=['clear_history'])
def echo(message):
global history
history = []
history.append(f'История была очищена: {datetime.now()}')
bot.send_message(message.chat.id, 'История очищена')
@bot.message_handler(commands=['history'])
def echo(message):
global history
if len(history) != 0:
for item in history:
bot.send_message(message.chat.id, item)
else:
bot.send_message(message.chat.id, 'История отсутствует')
@bot.message_handler(commands=['show'])
def echo(message):
global new_points
if len(new_points) != 0:
for point in new_points:
decoded = json.loads(point)
atmInfo = decoded['atmInfo']
limits = atmInfo['limits']
for limit in limits:
if limit['currency'] == 'USD':
bot.send_message(message.chat.id, f'{decoded["address"]} - Сумма: {limit["amount"]}$')
else:
bot.send_message(message.chat.id, 'На текущий момент в банкоматах отсутствуют $')
@bot.message_handler(commands=['statistics'])
def echo(message):
global count
global start_time
global last_check
bot.send_message(message.chat.id, f'Дата запуска программы: {start_time}')
bot.send_message(message.chat.id, f'Кол-во проверок банкоматов: {count}')
bot.send_message(message.chat.id, f'Дата и время последней проверки банкоматов: {last_check}')
bot.send_message(message.chat.id, f'Текущее время работы программы: {datetime.now() - start_time}')
def check():
global saved_points
global history
global new_points
global count
global last_check
saved_points = []
request_body = {
"bounds": {
"bottomLeft": {
"lat": 53.38178296125362,
"lng": 49.99419620132086
},
"topRight": {
"lat": 52.89227311754313,
"lng": 50.84048678969975
}
},
"filters": {
"banks": [
"tcs"
],
"showUnavailable": True,
"currencies": [
"USD"
]
},
"zoom": 11
}
url = 'https://api.tinkoff.ru/geo/withdraw/clusters'
headers = {
'content-type': 'application/json'
}
while True:
try:
response = requests.post(
url,
headers=headers,
data=json.dumps(request_body)
)
except requests.ConnectionError:
for id in id_chats:
bot.send_message(id, 'ConnectionError. Waiting and resuming')
time.sleep(10)
continue
if (response.status_code == 200):
response_content = response.content
json_response = json.loads(response_content)
payload = json_response['payload']
clusters = payload['clusters']
new_points = []
for idx_cluster, cluster in enumerate(clusters):
points = cluster['points']
for idx_point, point in enumerate(points):
new_points.append(json.dumps(point))
difference = list(set(new_points) - set(saved_points))
saved_points = copy.copy(new_points)
for point in difference:
decoded = json.loads(point)
atmInfo = decoded['atmInfo']
limits = atmInfo['limits']
for limit in limits:
if limit['currency'] == 'USD':
history.append(f'{datetime.now()}: {decoded["address"]} - Сумма: {limit["amount"]}$')
for id in id_chats:
bot.send_message(id, f'{decoded["address"]} - Сумма: {limit["amount"]}$')
count += 1
last_check = datetime.now()
time.sleep(600)
if __name__ == '__main__':
while True:
try:
my_thread = threading.Thread(target=check)
my_thread.start()
bot.polling(none_stop=True)
except Exception as E:
for id in id_chats:
bot.send_message(id, f'Ошибка: {E}')