Python源码示例:aiohttp.get()
示例1
def alog(self, *, username):
"""Gets a users recent adventure log"""
username = username.replace(" ", "_")
if feedparser is None:
await self.bot.say("You'll need to run `pip3 install feedparser` "
"before you can get a user's adventure log.")
return
url = self.alog_url + username
try:
page = await aiohttp.get(url)
text = await page.text()
text = text.replace("\r", "")
except:
await self.bot.say("No user found.")
feed = feedparser.parse(text)
titles = [post.title for post in feed.entries]
await self.bot.say(self._fmt_alog(username, titles))
示例2
def on_message(message):
# we do not want the bot to reply to itself
if message.author == client.user:
return
for channel in RAID_IMAGE_CHANNELS:
if message.channel.id == channel:
print(message.attachments)
print(len(message.attachments))
print(message.attachments[0]['url'])
for attachment in message.attachments:
if attachment['url'] is not None:
async with aiohttp.get(attachment['url']) as r:
if r.status == 200:
img = await r.read()
with open(attachment['filename'], 'wb') as f:
f.write(img)
print(attachment['filename'], 'saved')
file_name_save = str(attachment['filename']).replace('_', '-')
save_path = SCREENSHOT_SAVE_PATH+'/'+file_name_save
print(save_path)
shutil.move(attachment['filename'], save_path)
示例3
def setbanned_info(self, ctx):
"""Display settings."""
server = ctx.message.server
self.check_server_settings(server)
em = discord.Embed(title="Banned: Settings")
em.add_field(
name="Spreadsheet ID",
value=self.settings[server.id]["SHEET_ID"])
em.add_field(
name="Service Key Uploaded",
value=os.path.exists(SERVICE_KEY_JSON))
role_ids = self.settings[server.id]["ROLES"]
roles = [discord.utils.get(server.roles, id=id) for id in role_ids]
role_names = [r.name for r in roles]
if len(role_names):
em.add_field(
name="Roles with edit permission",
value=', '.join(role_names))
else:
em.add_field(
name="Roles with edit permission",
value="None")
await self.bot.say(embed=em)
示例4
def setbanned_addrole(self, ctx, *, role):
"""Add roles allowed to edit bans."""
server = ctx.message.server
self.check_server_settings(server)
server_role = discord.utils.get(server.roles, name=role)
if server_role is None:
await self.bot.say(
'{} is not a valid role on this server.'.format(role))
return
self.check_server_settings(server)
if server_role.id in self.settings[server.id]["ROLES"]:
await self.bot.say(
'{} is already in the list.'.format(role))
return
self.settings[server.id]["ROLES"].append(server_role.id)
role_ids = self.settings[server.id]["ROLES"]
roles = [discord.utils.get(server.roles, id=id) for id in role_ids]
role_names = [r.name for r in roles]
await self.bot.say(
'List of roles updated: {}.'.format(
', '.join(role_names)))
dataIO.save_json(JSON, self.settings)
示例5
def setcalendar_gapisecret(self, ctx):
"""Set Google API service account Key.
This is a json file downloadable from the Google API Console.
"""
await self.bot.say(
"Please upload the Google API service account key (json).")
answer = await self.bot.wait_for_message(
timeout=30.0,
author=ctx.message.author)
if answer is None:
await self.bot.say("Time out.")
return
if not len(answer.attachments):
await self.bot.say("Cannot find attachments.")
return
attach = answer.attachments[0]
url = attach["url"]
async with aiohttp.get(url) as cred:
with open(SERVICE_KEY_JSON, "wb") as f:
f.write(await cred.read())
await self.bot.say(
"Attachment received: {}".format(SERVICE_KEY_JSON))
示例6
def default(self, *, channel: "channel"):
"""Default response."""
response = await (await aiohttp.get(
"https://beam.pro/api/v1/channels/{}".format(channel)
)).json()
if "id" in response:
data = await (await aiohttp.get(
self.BEAM_MANIFEST_URL.format(channel=response["id"])
)).json()
if "startedAt" in data:
time = datetime.datetime.utcnow() - datetime.datetime.strptime(
data["startedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
time -= datetime.timedelta(microseconds=time.microseconds)
return "Channel has been live for {}.".format(time)
return "Channel is offline."
示例7
def nsfw(self, ctx):
'''Vergibt die Rolle um auf die NSFW Channel zugreifen zu können'''
if ctx.guild.id == loadconfig.__botserverid__:
if loadconfig.__selfassignrole__:
role = discord.utils.get(ctx.guild.roles, name=loadconfig.__selfassignrole__)
if role in ctx.author.roles:
try:
await ctx.author.remove_roles(role)
except:
pass
tmp = await ctx.send(f':x: Rolle **{role}** wurde entfernt')
else:
try:
await ctx.author.add_roles(role)
except:
pass
tmp = await ctx.send(f':white_check_mark: Rolle **{role}** wurde hinzugefügt')
else:
tmp = await ctx.send('**:no_entry:** Es wurde keine Rolle für den Bot eingestellt! Wende dich bitte an den Bot Admin')
else:
tmp = await ctx.send(f'**:no_entry:** This command don\'t work on this server!')
await asyncio.sleep(2 * 60)
await tmp.delete()
await ctx.message.delete()
示例8
def ping(self, ctx):
'''Misst die Response Time'''
ping = ctx.message
pong = await ctx.send('**:ping_pong:** Pong!')
delta = pong.created_at - ping.created_at
delta = int(delta.total_seconds() * 1000)
await pong.edit(content=f':ping_pong: Pong! ({delta} ms)\n*Discord WebSocket Latenz: {round(self.bot.latency, 5)} ms*')
# @commands.command()
# @commands.cooldown(1, 2, commands.cooldowns.BucketType.guild)
# async def github(self, ctx):
# '''In progress'''
# url = 'https://api.github.com/repos/Der-Eddy/discord_bot/stats/commit_activity'
# async with aiohttp.get(url) as r:
# if r.status == 200:
# content = await r.json()
# commitCount = 0
# for week in content:
# commitCount += week['total']
#
# embed = discord.Embed(title="GitHub Repo Stats", type='rich', color=0xf1c40f) #Golden
# embed.set_thumbnail(url='https://assets-cdn.github.com/images/modules/logos_page/GitHub-Mark.png')
# embed.add_field(name='Commits', value=commitCount, inline=True)
# embed.add_field(name='Link', value='https://github.com/Der-Eddy/discord_bot')
# await ctx.send(embed=embed)
# else:
# await ctx.send(':x: Konnte nicht aufs GitHub API zugreifen\nhttps://github.com/Der-Eddy/discord_bot')
示例9
def emoji(self, ctx, emojiname: str):
'''Gibt eine vergrößerte Version eines angegebenen Emojis zurück
Beispiel:
-----------
:emoji Emilia
'''
emoji = discord.utils.find(lambda e: e.name.lower() == emojiname.lower(), self.bot.emojis)
if emoji:
tempEmojiFile = 'tempEmoji.png'
async with aiohttp.ClientSession() as cs:
async with cs.get(emoji.url) as img:
with open(tempEmojiFile, 'wb') as f:
f.write(await img.read())
f = discord.File(tempEmojiFile)
await ctx.send(file=f)
os.remove(tempEmojiFile)
else:
await ctx.send(':x: Konnte das angegebene Emoji leider nicht finden :(')
示例10
def check_sad(self, message):
# check if setting is on in this server
#let sadfaces happen in PMs always
server = message.server
if server != None:
if server.id not in self.servers:
#default off
self.servers[server.id] = False
# sadface is off, so ignore
if not self.servers[server.id]:
return
# comments explaining next section. seemed easier to read this way
# check for a phrase in message
# if sadface isn't downloaded yet, dl it
# try
# get image from url
# write image to file
# it worked \o/
# send it
# except
# there was a problem, print an error then try to send the url instead
# else sadface image already downloaded, send it
if "D:" in message.content.split():
if not self.sadLoaded:
try:
async with aiohttp.get(self.url) as r:
image = await r.content.read()
with open('data/sadface/sadface.png','wb') as f:
f.write(image)
self.sadLoaded = os.path.exists('data/sadface/sadface.png')
await self.bot.send_file(message.channel,self.image)
except Exception as e:
print(e)
print("Sadface error D: I couldn't download the file, so we're gonna use the url instead")
await self.bot.send_message(message.channel,self.url)
else:
await self.bot.send_file(message.channel,self.image)
示例11
def _auto_color(self, url:str, ranks):
phrases = ["Calculating colors..."] # in case I want more
#try:
await self.bot.say("**{}**".format(random.choice(phrases)))
clusters = 10
async with aiohttp.get(url) as r:
image = await r.content.read()
with open('data/leveler/temp_auto.png','wb') as f:
f.write(image)
im = Image.open('data/leveler/temp_auto.png').convert('RGBA')
im = im.resize((290, 290)) # resized to reduce time
ar = scipy.misc.fromimage(im)
shape = ar.shape
ar = ar.reshape(scipy.product(shape[:2]), shape[2])
codes, dist = scipy.cluster.vq.kmeans(ar.astype(float), clusters)
vecs, dist = scipy.cluster.vq.vq(ar, codes) # assign codes
counts, bins = scipy.histogram(vecs, len(codes)) # count occurrences
# sort counts
freq_index = []
index = 0
for count in counts:
freq_index.append((index, count))
index += 1
sorted_list = sorted(freq_index, key=operator.itemgetter(1), reverse=True)
colors = []
for rank in ranks:
color_index = min(rank, len(codes))
peak = codes[sorted_list[color_index][0]] # gets the original index
peak = peak.astype(int)
colors.append(''.join(format(c, '02x') for c in peak))
return colors # returns array
#except:
#await self.bot.say("```Error or no scipy. Install scipy doing 'pip3 install numpy' and 'pip3 install scipy' or read here: https://github.com/AznStevy/Maybe-Useful-Cogs/blob/master/README.md```")
# converts hex to rgb
示例12
def setlevel(self, ctx, user : discord.Member, level:int):
'''Set a user's level. (What a cheater C:).'''
org_user = ctx.message.author
server = user.server
channel = ctx.message.channel
# creates user if doesn't exist
await self._create_user(user, server)
userinfo = db.users.find_one({'user_id':user.id})
if server.id in self.settings["disabled_servers"]:
await self.bot.say("Leveler commands for this server are disabled.")
return
if level < 0:
await self.bot.say("**Please enter a positive number.**")
return
# get rid of old level exp
old_server_exp = 0
for i in range(userinfo["servers"][server.id]["level"]):
old_server_exp += self._required_exp(i)
userinfo["total_exp"] -= old_server_exp
userinfo["total_exp"] -= userinfo["servers"][server.id]["current_exp"]
# add in new exp
total_exp = self._level_exp(level)
userinfo["servers"][server.id]["current_exp"] = 0
userinfo["servers"][server.id]["level"] = level
userinfo["total_exp"] += total_exp
db.users.update_one({'user_id':user.id}, {'$set':{
"servers.{}.level".format(server.id): level,
"servers.{}.current_exp".format(server.id): 0,
"total_exp": userinfo["total_exp"]
}})
await self.bot.say("**{}'s Level has been set to `{}`.**".format(self._is_mention(user), level))
await self._handle_levelup(user, userinfo, server, channel)
示例13
def _valid_image_url(self, url):
max_byte = 1000
try:
async with aiohttp.get(url) as r:
image = await r.content.read()
with open('data/leveler/test.png','wb') as f:
f.write(image)
image = Image.open('data/leveler/test.png').convert('RGBA')
os.remove('data/leveler/test.png')
return True
except:
return False
示例14
def _process_user_top(self, ctx, username, gamemode: int):
key = self.osu_api_key["osu_api_key"]
channel = ctx.message.channel
user = ctx.message.author
server = user.server
# determine api to use
username, api = self._determine_api(server, list(username))
username = username[0]
# gives the final input for osu username
test_username = await self._process_username(ctx, username)
if test_username:
username = test_username
else:
return
# get userinfo
userinfo = list(await get_user(key, api, username, gamemode))
userbest = list(await get_user_best(key, api, username, gamemode, self.osu_settings['num_best_plays']))
if userinfo and userbest:
msg, top_plays = await self._get_user_top(ctx, api, userinfo[0], userbest, gamemode)
await self.bot.say(msg, embed=top_plays)
else:
await self.bot.say("**`{}` was not found or not enough plays.**".format(username))
## processes username. probably the worst chunck of code in this project so far. will fix/clean later
示例15
def get_beatmap(key, api:str, beatmap_id):
url_params = []
url_params.append(parameterize_key(key))
url_params.append(parameterize_id("b", beatmap_id))
async with aiohttp.get(build_request(url_params, "https://{}/api/get_beatmaps?".format(api))) as resp:
return await resp.json()
# Gets the beatmap set
示例16
def get_beatmapset(key, api:str, set_id):
url_params = []
url_params.append(parameterize_key(key))
url_params.append(parameterize_id("s", set_id))
async with aiohttp.get(build_request(url_params, "https://{}/api/get_beatmaps?".format(api))) as resp:
return await resp.json()
# Grabs the scores
示例17
def get_scores(key, api:str, beatmap_id, user_id, mode):
url_params = []
url_params.append(parameterize_key(key))
url_params.append(parameterize_id("b", beatmap_id))
url_params.append(parameterize_id("u", user_id))
url_params.append(parameterize_mode(mode))
async with aiohttp.get(build_request(url_params, "https://{}/api/get_scores?".format(api))) as resp:
return await resp.json()
示例18
def get_user_best(key, api:str, user_id, mode, limit):
url_params = []
url_params.append(parameterize_key(key))
url_params.append(parameterize_id("u", user_id))
url_params.append(parameterize_mode(mode))
url_params.append(parameterize_limit(limit))
async with aiohttp.get(build_request(url_params, "https://{}/api/get_user_best?".format(api))) as resp:
return await resp.json()
# Returns the user's ten most recent plays.
示例19
def get_user_recent(key, api:str, user_id, mode):
url_params = []
url_params.append(parameterize_key(key))
url_params.append(parameterize_id("u", user_id))
url_params.append(parameterize_mode(mode))
async with aiohttp.get(build_request(url_params, "https://{}/api/get_user_recent?".format(api))) as resp:
return await resp.json()
# Returns the full API request URL using the provided base URL and parameters.
示例20
def get_meme_id(self, meme):
url = self.search.format(self.username, self.password)
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.search) as r:
results = await r.json()
for memes in results["data"]["memes"]:
if meme.lower() in memes["name"].lower():
return memes["id"]
except:
await self.get_memes()
示例21
def get_memes(self, ctx):
url = self.search.format(self.username, self.password)
prefix = self.get_prefix(ctx.message.server, ctx.message.content)
memelist = "```{}meme or id;text1;text2\n\n".format(prefix)
async with aiohttp.ClientSession() as session:
async with session.get(self.search) as r:
results = await r.json()
for memes in results["data"]["memes"]:
memelist += memes["name"] + ", "
if len(memelist) > 1500:
await self.bot.say(memelist + "```")
memelist = "```"
await self.bot.say(memelist[:len(memelist)-2] +
"``` Find a meme https://imgflip.com/memetemplates click blank template and get the Template ID for more!")
示例22
def meme(self, ctx, *, memeText: str):
""" Pulls a custom meme from imgflip"""
msg = memeText.split(";")
prefix = self.get_prefix(ctx.message.server, ctx.message.content)
await self.bot.send_typing(ctx.message.channel)
if len(msg) == 1:
meme, text1, text2 = msg[0], " ", " "
if len(msg) == 2:
meme, text1, text2 = msg[0], msg[1], " "
if len(msg) == 3:
meme, text1, text2 = msg[0], msg[1], msg[2]
text1 = text1[:20] if len(text1) > 20 else text1
text2 = text1[:20] if len(text2) > 20 else text2
username = self.settings["IMGFLIP_USERNAME"]
password = self.settings["IMGFLIP_PASSWORD"]
if not meme.isdigit():
meme = await self.get_meme_id(meme)
url = self.url.format(meme, username, password, text1, text2)
try:
async with aiohttp.get(url) as r:
result = await r.json()
if result["data"] != []:
url = result["data"]["url"]
await self.bot.say(url)
except:
await self.get_memes(ctx)
示例23
def add(self, ctx, prefix, github):
"""Add a new GitHub repo with the given prefix.
Format for adding a new GitHub repo is \"Username/Repository\""""
server = ctx.message.server
prefix = prefix.lower() # I'm Lazy okay :(
if prefix in self.settings[server.id]:
await self.bot.say('This prefix already exists in this server. Please use something else.')
elif len(github.split('/')) != 2:
await self.bot.say('Invalid format. Please use Username/Repository')
else:
# Confirm the User/Repo exits, We don't want to try and pull from a non existant repo
async with aiohttp.get('https://api.github.com/repos/{}'.format(github)) as response:
if response.status == 404:
await self.bot.say('The repository cannot be found.\nMake sure its a public repository.')
else:
fields = {
'author': True,
'status': True,
'comments': True,
'description': True,
'mergestatus': True,
'labels': True,
'closedby': False,
'locked': False,
'assigned': False,
'createdat': False,
'milestone': False,
'reviews': True
}
self.settings[server.id][prefix] = {'gh': github, 'fields': fields}
self.save_json()
await self.bot.say('All done, you can now use "{}#issue number" to gather information of an issue\nOr use ``{}githubcards edit``'.format(prefix, ctx.prefix))
示例24
def hs(self, *, username):
"""Gets hiscores info"""
username = username.replace(" ", "_")
url = self.base_url + username
try:
page = await aiohttp.get(url)
text = await page.text()
text = text.replace("\r", "")
text = text.split("\n")
except:
await self.bot.say("No user found.")
else:
await self.bot.say(self._fmt_hs(text))
示例25
def get_limit_per_message(self, server):
if server is None:
return 5
if not self._is_enabled(server):
return 5
return self.settings[server.id].get("LIMIT_PER_MESSAGE", 5)
示例26
def update_emote_list(self):
async with self.session.get(self.emote_url) as r:
resp = await r.json()
data = resp.get("emoticons", {})
self.emote_list = data
示例27
def _add_emote(self, server, chan_id):
assert isinstance(server, discord.Server)
if chan_id == -1:
return
if not os.path.exists("data/emotes/{}".format(chan_id)):
os.makedirs("data/emotes/{}".format(chan_id))
await self._remove_all_emotes(server, chan_id)
for emote in self.emote_list:
if chan_id == emote["images"][0].get("emoticon_set", -1):
url = emote["images"][0].get("url", "")
name = emote.get("regex", "")
file_name = url.split('/')[-1]
if url == "" or name == "":
continue
if not os.path.exists('data/emotes/{}/{}'.format(chan_id,
file_name)):
try:
async with aiohttp.get(url) as r:
image = await r.content.read()
except Exception as e:
print(
"Huh, I have no idea what errors aiohttp throws.")
print("This is one of them:")
print(e)
print(dir(e))
print("------")
continue
self._write_image(chan_id, file_name, image)
if server.id not in self.available_emotes:
self.available_emotes[server.id] = {}
self.available_emotes[server.id].append({
"name": name,
"file_name": file_name,
"chan_id": chan_id
})
self.save_available_emotes()
示例28
def check_messages(self, message):
if message.author.id == self.bot.user.id:
return
if message.channel.is_private:
return
if not self._is_enabled(message.server):
return
valid_emotes = self.available_emotes[message.server.id]
splitted = message.content.split(' ')
count = 0
for word in splitted:
for emote in valid_emotes:
if word == emote.get("name", ""):
fname = 'data/emotes/{}/{}'.format(
emote["chan_id"], emote["file_name"])
if not os.path.exists(fname):
break
img = Image.open(fname)
if self.get_scale(message.server) != 1.0:
scale = self.get_scale(message.server)
img = img.resize((int(img.width * scale),
int(img.height * scale)),
Image.ANTIALIAS)
tmpfile = BytesIO()
fmt = os.path.splitext(emote["file_name"])[1].replace('.',
'')
img.save(tmpfile, format=fmt)
tmpfile.seek(0)
await self.bot.send_file(message.channel, tmpfile,
filename=emote["file_name"])
tmpfile.close()
count += 1
if self.get_limit_per_message(message.server) != 0 and \
count >= \
self.get_limit_per_message(message.server):
return
break
示例29
def setfirebase_service_key(self, ctx):
"""Set Firebase Service key.
This is generated by the Firebase Console.
You can get it here:
https://console.firebase.google.com/project/_/settings/serviceaccounts/adminsdk
"""
TIMEOUT = 30.0
await self.bot.say(
"Please upload the Firebase service account key (json). "
"[Timeout: {} seconds]".format(TIMEOUT))
attach_msg = await self.bot.wait_for_message(
timeout=TIMEOUT,
author=ctx.message.author)
if attach_msg is None:
await self.bot.say("Operation time out.")
return
if not len(attach_msg.attachments):
await self.bot.say("Cannot find attachments.")
return
attach = attach_msg.attachments[0]
url = attach["url"]
async with aiohttp.get(url) as cred:
with open(SERVICE_KEY_JSON, "wb") as f:
f.write(await cred.read())
await self.bot.say(
"Attachment received and saved as {}".format(SERVICE_KEY_JSON))
self.settings['SERVICE_ACCOUNT'] = SERVICE_KEY_JSON
dataIO.save_json(JSON, self.settings)
# Delete uploaded attachment
await self.bot.delete_message(attach_msg)
示例30
def setfirebase_server_key(self, ctx, key):
"""Set Firebase Cloud Messaging Server Key.
This is generated by the Firebase Console
You can get it here:
https://console.firebase.google.com/project/_/settings/cloudmessaging
"""
self.settings["SERVER_KEY"] = key
dataIO.save_json(JSON, self.settings)
await self.bot.say("Saved Firebase Cloud Messaging Server Key.")
await self.bot.delete_message(ctx.message)