"""Collect usage statistics from a Rocket.Chat server.

The script counts users, channels and messages (overall and over the last
month) and writes the result as JSON to ``site/data/chat.json``, located one
directory above this file.
"""

from rocketchat_API.rocketchat import RocketChat
import dev_config as cfg
import os
import json
from datetime import datetime, timezone
from monthdelta import monthdelta

# Format of the timestamps read from the API payloads ("createdAt", "ts").
# The trailing "Z" is matched literally, so parsing yields naive datetimes
# whose values are expressed in UTC.
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def main():
    """Gather the statistics from the server and save them to disk."""
    # The parsed API timestamps are naive UTC values, so the cutoff has to be
    # a naive UTC value as well; a local-time "now" would shift the one-month
    # window by the machine's UTC offset.
    recent_date = datetime.now(timezone.utc).replace(tzinfo=None) - monthdelta()
    # Sent to the server as an explicit ISO-8601 UTC string instead of relying
    # on how a raw datetime object gets stringified into the query.
    oldest = recent_date.isoformat(timespec="milliseconds") + "Z"

    rocket = RocketChat(cfg.rocket['user'], cfg.rocket['password'],
                        server_url=cfg.rocket['server'])

    users = getAllUsers(rocket)
    crapauds_total = len(users)
    crapauds_recent = sum(
        1 for user in users
        if _parse_timestamp(user["createdAt"]) > recent_date
    )

    channels = getAllChannels(rocket)
    channels_total = len(channels)
    channels_recent = 0
    channels_list = []
    messages_total = 0
    messages_recent = 0

    for channel in channels:
        messages_total += int(channel["msgs"])

        # NOTE(review): the original indentation was lost; this assumes only
        # the counter and the name list depend on the channel being recent,
        # while the history is fetched for every channel - confirm.
        if _parse_timestamp(channel["ts"]) > recent_date:
            channels_recent += 1
            channels_list.append(channel["name"])

        # A single request capped at 10000 messages per channel: anything
        # beyond that cap within the window is not counted.
        history = rocket.channels_history(
            channel["_id"], oldest=oldest, count=10000).json()
        messages_recent += len(history["messages"])

    info = {
        "crapauds": {
            "total": crapauds_total,
            "recent": crapauds_recent,
        },
        "canaux": {
            "total": channels_total,
            "recent": channels_recent,
            "liste": channels_list,
        },
        "messages": {
            "total": messages_total,
            "recent": messages_recent,
        },
    }

    save(info)


def save(info):
    """Write ``info`` as JSON to ``<repo root>/site/data/chat.json``.

    The target directory must already exist; it is not created here.
    """
    # Root folder of the repository: one level above this script.
    root_folder = os.path.join(os.path.dirname(__file__), '..')
    # Folder where the output file is stored.
    data_folder = os.path.join(root_folder, 'site', 'data')

    stats_file_path = os.path.abspath(os.path.join(data_folder, 'chat.json'))
    with open(stats_file_path, "w", encoding="utf-8") as file_write:
        json.dump(info, file_write)


def _parse_timestamp(value):
    """Parse an API timestamp string into a naive datetime (UTC values)."""
    return datetime.strptime(value, TIMESTAMP_FORMAT)


def _fetch_all_pages(fetch_page, items_key):
    """Collect the items of every page of an offset-paginated endpoint.

    ``fetch_page`` is called with the current offset and must return a
    response object whose ``.json()`` payload holds the page items under
    ``items_key`` together with ``count``, ``offset`` and ``total``.
    """
    offset = 0
    items = []
    while True:
        page = fetch_page(offset).json()
        items.extend(page[items_key])
        # Stop on the last page, and also on an empty page: with a count of
        # zero the offset would never advance and the loop would not end.
        if not page['count'] or page['count'] + page['offset'] >= page['total']:
            break
        offset += page['count']
    return items


def getAllChannels(rocket):
    """Return every channel whose payload has no ``archived`` key."""
    channels = _fetch_all_pages(
        lambda offset: rocket.channels_list(offset=offset), 'channels')
    # The filter tests for the presence of the key, not for its value.
    return [channel for channel in channels if 'archived' not in channel]


def getAllUsers(rocket):
    """Return every user listed by the server."""
    return _fetch_all_pages(
        lambda offset: rocket.users_list(offset=offset), 'users')


def getAllMessages(rocket, roomid, begindate):
    """Return the messages of channel ``roomid`` sent since ``begindate``.

    ``begindate`` is passed to the API unchanged as the ``oldest`` parameter.
    """
    # The payload key is "messages", as read from the same endpoint in main().
    # NOTE(review): this assumes the history response also carries
    # count/offset/total like the list endpoints - confirm against the API.
    return _fetch_all_pages(
        lambda offset: rocket.channels_history(
            roomid, offset=offset, oldest=begindate),
        'messages')


if __name__ == "__main__":
    main()