base frame

This commit is contained in:
madokax 2024-01-02 10:33:01 +08:00
commit f7c72048eb
17 changed files with 1343 additions and 0 deletions

104
.github/close_issue.py vendored Normal file
View File

@ -0,0 +1,104 @@
import os
import requests
issue_labels = ['no respect']
github_repo = 'MagicalMadoka/openai-signup-tool'
github_token = os.getenv("GITHUB_TOKEN")
headers = {
'Authorization': 'Bearer ' + github_token,
'Accept': 'application/vnd.github+json',
'X-GitHub-Api-Version': '2022-11-28',
}
def get_stargazers(repo):
page = 1
_stargazers = {}
while True:
queries = {
'per_page': 100,
'page': page,
}
url = 'https://api.github.com/repos/{}/stargazers?'.format(repo)
resp = requests.get(url, headers=headers, params=queries)
if resp.status_code != 200:
raise Exception('Error get stargazers: ' + resp.text)
data = resp.json()
if not data:
break
for stargazer in data:
_stargazers[stargazer['login']] = True
page += 1
print('list stargazers done, total: ' + str(len(_stargazers)))
return _stargazers
def get_issues(repo):
page = 1
_issues = []
while True:
queries = {
'state': 'open',
'sort': 'created',
'direction': 'desc',
'per_page': 100,
'page': page,
}
url = 'https://api.github.com/repos/{}/issues?'.format(repo)
resp = requests.get(url, headers=headers, params=queries)
if resp.status_code != 200:
raise Exception('Error get issues: ' + resp.text)
data = resp.json()
if not data:
break
_issues += data
page += 1
print('list issues done, total: ' + str(len(_issues)))
return _issues
def close_issue(repo, issue_number):
url = 'https://api.github.com/repos/{}/issues/{}'.format(repo, issue_number)
data = {
'state': 'closed',
'state_reason': 'not_planned',
'labels': issue_labels,
}
resp = requests.patch(url, headers=headers, json=data)
if resp.status_code != 200:
raise Exception('Error close issue: ' + resp.text)
print('issue: {} closed'.format(issue_number))
def lock_issue(repo, issue_number):
url = 'https://api.github.com/repos/{}/issues/{}/lock'.format(repo, issue_number)
data = {
'lock_reason': 'spam',
}
resp = requests.put(url, headers=headers, json=data)
if resp.status_code != 204:
raise Exception('Error lock issue: ' + resp.text)
print('issue: {} locked'.format(issue_number))
if '__main__' == __name__:
stargazers = get_stargazers(github_repo)
issues = get_issues(github_repo)
for issue in issues:
login = issue['user']['login']
if login not in stargazers:
print('issue: {}, login: {} not in stargazers'.format(issue['number'], login))
close_issue(github_repo, issue['number'])
lock_issue(github_repo, issue['number'])
print('done')

24
.github/workflows/cloise-issue.yml vendored Normal file
View File

@ -0,0 +1,24 @@
name: CloseIssue
on:
schedule:
- cron: '0 */6 * * *'
workflow_dispatch:
jobs:
run-python-script:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install Dependencies
run: pip install requests
- name: Run close_issue.py Script
env:
GITHUB_TOKEN: ${{ secrets.GHCR_PAT }}
run: python .github/close_issue.py

47
.github/workflows/docker-image.yml vendored Normal file
View File

@ -0,0 +1,47 @@
name: Build Docker Image
on:
push:
branches:
- main
workflow_dispatch:
env:
GHCR_REPO: ghcr.io/magicalmadoka/openai-signup-tool
jobs:
main:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up QEMU
uses: docker/setup-qemu-action@v1
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Login to GitHub Container Registry
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GHCR_PAT }}
- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-cache
- name: Build and push to GHCR
uses: docker/build-push-action@v2
with:
context: .
platforms: linux/amd64,linux/arm64
file: Dockerfile
push: true
tags: ${{ env.GHCR_REPO }}:latest
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache,mode=max

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
.idea/
solved/
account.txt
sess.txt
config.json

74
Dockerfile Normal file
View File

@ -0,0 +1,74 @@
FROM python:3.11-slim-bullseye as builder
# Build dummy packages to skip installing them and their dependencies
RUN apt-get update \
&& apt-get install -y --no-install-recommends equivs \
&& equivs-control libgl1-mesa-dri \
&& printf 'Section: misc\nPriority: optional\nStandards-Version: 3.9.2\nPackage: libgl1-mesa-dri\nVersion: 99.0.0\nDescription: Dummy package for libgl1-mesa-dri\n' >> libgl1-mesa-dri \
&& equivs-build libgl1-mesa-dri \
&& mv libgl1-mesa-dri_*.deb /libgl1-mesa-dri.deb \
&& equivs-control adwaita-icon-theme \
&& printf 'Section: misc\nPriority: optional\nStandards-Version: 3.9.2\nPackage: adwaita-icon-theme\nVersion: 99.0.0\nDescription: Dummy package for adwaita-icon-theme\n' >> adwaita-icon-theme \
&& equivs-build adwaita-icon-theme \
&& mv adwaita-icon-theme_*.deb /adwaita-icon-theme.deb
FROM python:3.11-slim-bullseye
# Copy dummy packages
COPY --from=builder /*.deb /
# Install dependencies and create flaresolverr user
# You can test Chromium running this command inside the container:
# xvfb-run -s "-screen 0 1600x1200x24" chromium --no-sandbox
# The error traces is like this: "*** stack smashing detected ***: terminated"
# To check the package versions available you can use this command:
# apt-cache madison chromium
WORKDIR /app
# Install dummy packages
RUN dpkg -i /libgl1-mesa-dri.deb \
&& dpkg -i /adwaita-icon-theme.deb \
# Install dependencies
&& apt-get update \
&& apt-get install -y --no-install-recommends chromium chromium-common chromium-driver xvfb dumb-init \
procps curl vim xauth \
# Remove temporary files and hardware decoding libraries
&& rm -rf /var/lib/apt/lists/* \
&& rm -f /usr/lib/x86_64-linux-gnu/libmfxhw* \
&& rm -f /usr/lib/x86_64-linux-gnu/mfx/* \
# Create flaresolverr user
&& useradd --home-dir /app --shell /bin/sh flaresolverr \
&& mv /usr/bin/chromedriver chromedriver \
&& chown -R flaresolverr:flaresolverr .
# Install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt \
# Remove temporary files
&& rm -rf /root/.cache
USER flaresolverr
RUN mkdir -p "/app/.config/chromium/Crash Reports/pending"
COPY src .
# dumb-init avoids zombie chromium processes
ENTRYPOINT ["/usr/bin/dumb-init", "--"]
CMD ["python", "main.py"]
# Local build
# docker build -t ngosang/flaresolverr:3.3.12 .
# docker run -p 8191:8191 ngosang/flaresolverr:3.3.12
# Multi-arch build
# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
# docker buildx create --use
# docker buildx build -t ngosang/flaresolverr:3.3.12 --platform linux/386,linux/amd64,linux/arm/v7,linux/arm64/v8 .
# add --push to publish in DockerHub
# Test multi-arch build
# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
# docker buildx create --use
# docker buildx build -t ngosang/flaresolverr:3.3.12 --platform linux/arm/v7 --load .
# docker run -p 8191:8191 --platform linux/arm/v7 ngosang/flaresolverr:3.3.12

53
README.md Normal file
View File

@ -0,0 +1,53 @@
# openai注册机
## 介绍
基于浏览器方案的openai注册机
## 免责声明
本项目仅供学习交流使用,严禁用于商业用途,否则后果自负。
## 使用方法
### 前置准备
- 一个支持openai注册的域名比如`example.com`
- 一个支持`catch all`的收件服务比如cloudflare或者自建的邮箱服务器
- 一个用于接受`catch all`邮件的且支持imap协议的邮箱比如`outlook`、`gmail`
### 开始使用
1. 配置邮箱
配置好你前序准备的域名和邮箱保证所有openai的邮件都会转发到你的支持imap协议的邮箱中。
2. 克隆本项目
```bash
git clone https://github.com/MagicalMadoka/openai-signup-tool.git
cd openai-signup-tool
```
3. 重命名`config/config.json.example`为`config/config.json`
- `domain`: 必填,你注册用的域名。
- `proxy`: 选填代理地址。背后最好使用高质量的代理池可以减少过cf和arkose的麻烦。如果代理服务器运行在你的本地请使用`host.docker.internal`作为代理地址。
- `clientKey`: 选填,[yescaptcha](https://yescaptcha.com/i/oFmkQz)的clientKey如果出现验证码会尝试进行打码。
- `emailWorkerNum`: 必填,处理邮件的线程个数,根据机器的配置自行决定。
- `signupWorkerNum`: 必填,注册线程的个数,根据机器的配置自行决定。
- `emailAddr`: 必填,你的邮箱地址。
- `emailPassword`: 必填,你的邮箱密码,或应用密码。这取决于你的邮箱服务的提供方。
- `emailImapServer`: 必填你的邮箱的imap服务器地址一般可以在你邮箱服务的提供方的文档中找到。
- `emailImapPort`: 选填你的邮箱的imap服务器端口一般可以在你邮箱服务的提供方的文档中找到。
4. 运行
```bash
docker compose up -d
```
注册成功的账号会出现在`data/accounts.txt`中。如果账号被授权了额度会额外提取sess到`data/sess.txt`中。
## 其他说明
- 本项目在一个正常延时的网络和一个配置正常的机器下测试运行正常,所以如果出现问题,可以先排查网络和机器的问题。当然也欢迎你补充一些异常处理的代码。
- 改方案是基于浏览器的方案,内存不要给的太少,否则会异常退出。
## 参考项目
- https://github.com/FlareSolverr/FlareSolverr
## 交流沟通
- 本项目相关的讨论请提issue
- 其他技术交流。逆向、验证码、深度学习、python、go都可以聊

View File

@ -0,0 +1,11 @@
{
"domain": "",
"proxy": "",
"clientKey": "",
"emailWorkerNum": 1,
"signupWorkerNum": 1,
"emailAddr": "",
"emailPassword": "",
"emailImapServer": "",
"emailImapPort": ""
}

2
data/.gitkeep Normal file
View File

@ -0,0 +1,2 @@
!.gitkeep
.idea/

11
docker-compose.yml Normal file
View File

@ -0,0 +1,11 @@
version: "3"
services:
flaresolverr:
image: ghcr.io/magicalmadoka/openai-signup-tool
container_name: openai-signup-tool
restart: unless-stopped
volumes:
- ./data:/app/data
- ./config:/app/config
extra_hosts:
- "host.docker.internal:host-gateway"

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
selenium==4.16.0
requests==2.31.0
loguru==0.7.2
undetected_chromedriver==3.5.4
func_timeout==4.3.5
xvfbwrapper==0.2.9

133
src/cloudflare_solver.py Normal file
View File

@ -0,0 +1,133 @@
import random
import time
from loguru import logger
from selenium.common import TimeoutException
from selenium.webdriver import ActionChains
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.expected_conditions import title_is, presence_of_element_located, staleness_of
from selenium.webdriver.support.wait import WebDriverWait
from utils import get_webdriver
CHALLENGE_TITLES = [
# Cloudflare
'Just a moment...',
# DDoS-GUARD
'DDoS-Guard'
]
CHALLENGE_SELECTORS = [
# Cloudflare
'#cf-challenge-running', '.ray_id', '.attack-box', '#cf-please-wait', '#challenge-spinner', '#trk_jschal_js',
# Custom CloudFlare for EbookParadijs, Film-Paleis, MuziekFabriek and Puur-Hollands
'td.info #js_info',
# Fairlane / pararius.com
'div.vc div.text-box h2'
]
SHORT_TIMEOUT = 1
def click_verify(driver: WebDriver):
logger.debug("waiting for the Cloudflare verify checkbox...")
time.sleep(random.randint(5,10))
try:
logger.debug("Try to find the Cloudflare verify checkbox...")
iframe = driver.find_element(By.XPATH, "//iframe[starts-with(@id, 'cf-chl-widget-')]")
driver.switch_to.frame(iframe)
checkbox = driver.find_element(
by=By.XPATH,
value='//*[@id="challenge-stage"]/div/label/input',
)
if checkbox:
actions = ActionChains(driver)
actions.move_to_element_with_offset(checkbox, random.randint(2,6), random.randint(2,8))
actions.click(checkbox)
actions.perform()
logger.debug("Cloudflare verify checkbox found and clicked!")
except Exception:
logger.debug("Cloudflare verify checkbox not found on the page.")
finally:
driver.switch_to.default_content()
try:
logger.debug("Try to find the Cloudflare 'Verify you are human' button...")
button = driver.find_element(
by=By.XPATH,
value="//input[@type='button' and @value='Verify you are human']",
)
if button:
actions = ActionChains(driver)
actions.move_to_element_with_offset(button, random.randint(2,6), random.randint(2,8))
actions.click(button)
actions.perform()
logger.debug("The Cloudflare 'Verify you are human' button found and clicked!")
except Exception:
logger.debug("The Cloudflare 'Verify you are human' button not found on the page.")
time.sleep(2)
def bypass(link,driver):
driver.get(link)
driver.start_session()
driver.get(link)
driver.start_session()
# todo check ban
page_title = driver.title
challenge_found = False
for title in CHALLENGE_TITLES:
if title.lower() == page_title.lower():
challenge_found = True
logger.debug("Challenge detected. Title found: " + page_title)
break
if not challenge_found:
# find challenge by selectors
for selector in CHALLENGE_SELECTORS:
found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
if len(found_elements) > 0:
challenge_found = True
logger.debug("Challenge detected. Selector found: " + selector)
break
attempt = 0
if challenge_found:
while True:
try:
attempt = attempt + 1
# wait until the title changes
for title in CHALLENGE_TITLES:
logger.debug("Waiting for title (attempt " + str(attempt) + "): " + title)
WebDriverWait(driver, SHORT_TIMEOUT).until_not(title_is(title))
# then wait until all the selectors disappear
for selector in CHALLENGE_SELECTORS:
logger.debug("Waiting for selector (attempt " + str(attempt) + "): " + selector)
WebDriverWait(driver, SHORT_TIMEOUT).until_not(
presence_of_element_located((By.CSS_SELECTOR, selector)))
# all elements not found
break
except TimeoutException:
logger.debug("Timeout waiting for selector")
click_verify(driver)
# update the html (cloudflare reloads the page every 5 s)
html_element = driver.find_element(By.TAG_NAME, "html")
# waits until cloudflare redirection ends
logger.debug("Waiting for redirect")
# noinspection PyBroadException
try:
WebDriverWait(driver, SHORT_TIMEOUT).until(staleness_of(html_element))
except Exception:
logger.debug("Timeout waiting for redirect")
logger.debug("Challenge solved!")

4
src/config.py Normal file
View File

@ -0,0 +1,4 @@
import json
with open('config/config.json', 'r') as file:
config = json.load(file)

2
src/globals.py Normal file
View File

@ -0,0 +1,2 @@
class GlobalState:
exception = None

40
src/main.py Normal file
View File

@ -0,0 +1,40 @@
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from config import config
from globals import GlobalState
from signup import Signup, Interrupted
from verify_email import verify_email
def main():
email_worker = threading.Thread(target=verify_email)
email_worker.start()
max_threads = config['signupWorkerNum']
task_queue = Queue(max_threads)
executor = ThreadPoolExecutor(max_threads)
def worker(q, executor):
while True:
task = q.get()
executor.submit(task)
worker_thread = threading.Thread(target=worker, args=(task_queue, executor))
worker_thread.start()
def signup():
try:
s = Signup()
s.sign_up()
except Exception as e:
traceback.print_exc()
while True:
if GlobalState.exception :
raise GlobalState.exception
task_queue.put(signup)
if __name__ == '__main__':
main()

373
src/signup.py Normal file
View File

@ -0,0 +1,373 @@
import base64
import fcntl
import json
import os
import random
import re
import secrets
import string
import time
import traceback
import uuid
import requests
from func_timeout import func_timeout
from loguru import logger
from selenium.common import NoSuchElementException, StaleElementReferenceException, TimeoutException, WebDriverException
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import cloudflare_solver
from config import config
import utils
from globals import GlobalState
class Interrupted(Exception):
pass
class Signup:
def __init__(self):
self.driver = utils.get_webdriver()
def sign_up(self):
try:
func_timeout(10 * 60, self._sign_up)
except Interrupted as e:
logger.error("error in signup: {}".format(e))
raise e
except Exception as e:
traceback.print_exc()
finally:
self.driver.quit()
def _sign_up(self):
cloudflare_solver.bypass('https://platform.openai.com/signup/', self.driver)
email_input = WebDriverWait(self.driver, 30).until(
EC.presence_of_element_located((By.ID, "email"))
)
email = self._get_email()
email_input.send_keys(email)
# todo check email
submit_btn = self.driver.find_element(By.XPATH, '//button[@type="submit"]')
submit_btn.click()
password_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.ID, "password"))
)
password = self._get_password()
password_input.send_keys(password)
submit_btn = self.driver.find_element(By.XPATH, '//button[@type="submit"]')
submit_btn.click()
while True:
try:
self.driver.find_element(By.XPATH, "//h1[text()='Oops!']")
raise Exception("Oops!")
except NoSuchElementException:
pass
try:
self.driver.find_element(By.XPATH, "//p[text()='Too many signups from the same IP']")
GlobalState.exception = Interrupted("Too many signups from the same IP")
raise GlobalState.exception
except NoSuchElementException:
pass
try:
self.driver.find_element(By.XPATH, "//h1[text()='Tell us about you']")
break
except NoSuchElementException:
logger.debug(f"{email} wait for email verification")
time.sleep(6)
self.driver.refresh()
time.sleep(4)
name_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='Full name']"))
)
name_input.send_keys(''.join(random.choices(string.ascii_letters, k=3)))
birthday_input = self.driver.find_element(By.XPATH, "//input[@placeholder='Birthday']")
year = random.randint(1980, 2000)
month = random.randint(1, 12)
day = random.randint(1, 28)
random_date = "{:02d}/{:02d}/{}".format(day, month, year)
birthday_input.send_keys(random_date)
submit_btn = self.driver.find_element(By.XPATH, '//button[@type="submit"]')
submit_btn.click()
self._try_solve_arkose_challenge()
self._save_account(email, password)
def _get_email(self):
return ''.join(
[secrets.choice(string.ascii_letters + string.digits) for _ in range(12)]) + "@" + config['domain']
def _get_password(self):
return ''.join(
[secrets.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(15)])
def _get_base64(self, bg_image_url):
script = """
var callback = arguments[arguments.length - 1];
var xhr = new XMLHttpRequest();
xhr.responseType = 'blob';
xhr.onload = function() {
var reader = new FileReader();
reader.onloadend = function() {
callback(reader.result);
};
reader.readAsDataURL(xhr.response);
};
xhr.open('GET', arguments[0]);
xhr.send();
"""
# 执行脚本并获取结果
base64_data = self.driver.execute_async_script(script, bg_image_url)
return base64_data
def _get_ans_index(self, que, base64):
url = "https://api.yescaptcha.com"
clientKey = config['clientKey']
if not clientKey:
GlobalState.exception = Interrupted("match funcaptcha but no yes clientKey")
raise GlobalState.exception
json = {
"clientKey": clientKey,
"task": {
"type": "FunCaptchaClassification",
"image": base64,
"question": que
},
"softID": 31275
}
resp = requests.post(url + "/createTask", json=json)
index = resp.json()['solution']['objects'][0]
return index
def _save_and_get_sess(self, email, password):
for i in range(3):
logs = self.driver.get_log('performance')
for log in logs:
log_json = json.loads(log['message'])['message']
if log_json['method'] == 'Network.responseReceived' and 'dashboard/onboarding/create_account' in \
log_json['params']['response']['url']:
request_id = log_json['params']['requestId']
try:
response = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
if response and response['body']:
if 'session' in response['body']:
with open("data/account.txt", "a", encoding="utf-8") as f:
fcntl.flock(f, fcntl.LOCK_EX)
f.write(f"{email}----{password}\n")
logger.info(f"{email} signup success")
self._save_challange_image()
sess = json.loads(response['body'])['session']['sensitive_id']
return sess
except WebDriverException as e:
pass
time.sleep(3)
return None
def _save_challange_image(self):
if hasattr(self, 'image_datas') and hasattr(self, 'que') and self.image_datas and self.que:
path = f'data/solved/{self.que}'
os.makedirs(path, exist_ok=True)
for i, image_data in enumerate(self.image_datas):
base64_string = image_data.split(',')[1]
data = base64.b64decode(base64_string)
with open(f"{path}/{uuid.uuid4()}_{self.ans_index[i]}.jpg", "wb") as f:
f.write(data)
def _save_account(self, email, password):
sess = self._save_and_get_sess(email, password)
if sess:
url = "https://api.openai.com/dashboard/billing/credit_grants"
headers = {
"Authorization": f"Bearer {sess}",
"Content-Type": "application/json",
}
resp = requests.get(url, headers=headers, allow_redirects=False)
if resp.status_code == 200:
data = resp.json()
if data['total_available'] > 0:
with open("data/sess.txt", "a", encoding="utf-8") as f:
fcntl.flock(f, fcntl.LOCK_EX)
f.write(f"{sess}\n")
logger.info(f"{email} save sess success")
else:
logger.warning(f"{email} no credit")
else:
logger.warning(f"{email} sess found fail")
def _try_solve_arkose_challenge(self):
for i in range(3):
try:
try:
arkose_frame = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'iframe[title="Verification challenge"]'))
)
self.driver.switch_to.frame(arkose_frame)
except StaleElementReferenceException:
continue
except TimeoutException:
logger.info("no arkose frame found")
return
try:
change_challenge_frame = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.ID, 'game-core-frame'))
)
except TimeoutException:
logger.info("no change challenge frame found")
return
self.driver.switch_to.frame(change_challenge_frame)
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//button[text()='Begin puzzle']"))
)
for i in range(3):
try:
start_btn = self.driver.find_element(By.XPATH, "//button[text()='Begin puzzle']")
start_btn.click()
except Exception:
pass
except TimeoutException:
pass
game_type, que, num = self._get_funcaptcha_challenge()
if not game_type:
logger.warning("game type not found")
return
logger.info(f"game type: {game_type}, que: {que}, num: {num}")
self.que = que
self.image_datas = []
self.ans_index = []
last_bg_image_url = None
for i in range(num):
image_selector = 'img[style*="background-image"]' if game_type == 4 else 'button[style*="background-image"]'
image_el = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.CSS_SELECTOR, image_selector))
)
style_attribute = image_el.get_attribute('style')
match = re.search(r"url\(['\"]?(.*?)['\"]?\)", style_attribute)
bg_image_url = match.group(1) if match else None
while bg_image_url == last_bg_image_url or bg_image_url is None:
image_el = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.CSS_SELECTOR, image_selector))
)
style_attribute = image_el.get_attribute('style')
match = re.search(r"url\(['\"]?(.*?)['\"]?\)", style_attribute)
bg_image_url = match.group(1) if match else None
last_bg_image_url = bg_image_url
base64 = self._get_base64(bg_image_url)
self.image_datas.append(base64)
index = self._get_ans_index(que, base64)
self.ans_index.append(index)
if game_type == 4:
for j in range(index):
next_btn = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'a[role="button"].right-arrow'))
)
actions = ActionChains(self.driver)
actions.move_to_element_with_offset(next_btn, random.randint(1, 5), random.randint(1, 5))
actions.click(next_btn)
actions.perform()
sub_btn = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'button.button'))
)
actions = ActionChains(self.driver)
actions.move_to_element_with_offset(sub_btn, random.randint(1, 10), random.randint(1, 10))
actions.click(sub_btn)
actions.perform()
else:
image_button = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, f'button[aria-label="Image {index + 1} of 6."]'))
)
actions = ActionChains(self.driver)
actions.move_to_element_with_offset(image_button, random.randint(1, 10), random.randint(1, 10))
actions.click(image_button)
actions.perform()
try:
try_again_btn = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, "//button[text()='Try again']"))
)
try_again_btn.click()
except TimeoutException:
logger.debug("no try again button found may be resolved")
return
except Exception:
traceback.print_exc()
logger.warning(f"fail to resolve arkose challenge current retry num {i}")
finally:
self.driver.switch_to.default_content()
def _get_funcaptcha_challenge(self):
try:
que_el = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "h2[tabindex='-1']"))
)
game_type = 3
que = que_el.text
num_elm = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "p[data-theme='tile-game.roundText']"))
)
num = int(re.search(r'of\s+(\d+)', num_elm.text).group(1))
return game_type, que, num
except TimeoutException:
try:
que_el = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH, "//span[@role='text']"))
)
game_type = 4
que_full_text = que_el.text
que = que_full_text.split('(')[0].strip()
num = int(re.search(r'\(\d+ of (\d+)\)', que_full_text).group(1))
return game_type, que, num
except TimeoutException:
logger.info("challenge not found")
return None,None,None
if __name__ == '__main__':
s = Signup()
s.sign_up()

335
src/utils.py Normal file
View File

@ -0,0 +1,335 @@
import json
import logging
import os
import re
import shutil
import urllib.parse
import tempfile
from selenium.webdriver.chrome.webdriver import WebDriver
import undetected_chromedriver as uc
from config import config
FLARESOLVERR_VERSION = None
CHROME_EXE_PATH = None
CHROME_MAJOR_VERSION = None
USER_AGENT = None
XVFB_DISPLAY = None
PATCHED_DRIVER_PATH = None
def get_config_log_html() -> bool:
return os.environ.get('LOG_HTML', 'false').lower() == 'true'
def get_config_headless() -> bool:
return os.environ.get('HEADLESS', 'true').lower() == 'true'
def get_flaresolverr_version() -> str:
global FLARESOLVERR_VERSION
if FLARESOLVERR_VERSION is not None:
return FLARESOLVERR_VERSION
package_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, 'package.json')
if not os.path.isfile(package_path):
package_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'package.json')
with open(package_path) as f:
FLARESOLVERR_VERSION = json.loads(f.read())['version']
return FLARESOLVERR_VERSION
def create_proxy_extension(proxy: dict) -> str:
parsed_url = urllib.parse.urlparse(proxy['url'])
scheme = parsed_url.scheme
host = parsed_url.hostname
port = parsed_url.port
username = proxy['username']
password = proxy['password']
manifest_json = """
{
"version": "1.0.0",
"manifest_version": 2,
"name": "Chrome Proxy",
"permissions": [
"proxy",
"tabs",
"unlimitedStorage",
"storage",
"<all_urls>",
"webRequest",
"webRequestBlocking"
],
"background": {"scripts": ["background.js"]},
"minimum_chrome_version": "76.0.0"
}
"""
background_js = """
var config = {
mode: "fixed_servers",
rules: {
singleProxy: {
scheme: "%s",
host: "%s",
port: %d
},
bypassList: ["localhost"]
}
};
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
function callbackFn(details) {
return {
authCredentials: {
username: "%s",
password: "%s"
}
};
}
chrome.webRequest.onAuthRequired.addListener(
callbackFn,
{ urls: ["<all_urls>"] },
['blocking']
);
""" % (
scheme,
host,
port,
username,
password
)
proxy_extension_dir = tempfile.mkdtemp()
with open(os.path.join(proxy_extension_dir, "manifest.json"), "w") as f:
f.write(manifest_json)
with open(os.path.join(proxy_extension_dir, "background.js"), "w") as f:
f.write(background_js)
return proxy_extension_dir
def get_webdriver() -> WebDriver:
proxy = {
"url": config['proxy']
}
global PATCHED_DRIVER_PATH, USER_AGENT
logging.debug('Launching web browser...')
# undetected_chromedriver
options = uc.ChromeOptions()
options.add_argument('--no-sandbox')
options.add_argument('--window-size=1920,1080')
# todo: this param shows a warning in chrome head-full
options.add_argument('--disable-setuid-sandbox')
options.add_argument('--disable-dev-shm-usage')
# this option removes the zygote sandbox (it seems that the resolution is a bit faster)
options.add_argument('--no-zygote')
# attempt to fix Docker ARM32 build
options.add_argument('--disable-gpu-sandbox')
options.add_argument('--disable-software-rasterizer')
options.add_argument('--ignore-certificate-errors')
options.add_argument('--ignore-ssl-errors')
# fix GL errors in ASUSTOR NAS
# https://github.com/FlareSolverr/FlareSolverr/issues/782
# https://github.com/microsoft/vscode/issues/127800#issuecomment-873342069
# https://peter.sh/experiments/chromium-command-line-switches/#use-gl
options.add_argument('--use-gl=swiftshader')
options.add_argument('--lang=en')
options.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'})
# Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910
if USER_AGENT is not None:
options.add_argument('--user-agent=%s' % USER_AGENT)
proxy_extension_dir = None
if proxy and all(key in proxy for key in ['url', 'username', 'password']):
proxy_extension_dir = create_proxy_extension(proxy)
options.add_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir))
elif proxy and 'url' in proxy:
proxy_url = proxy['url']
logging.debug("Using webdriver proxy: %s", proxy_url)
options.add_argument('--proxy-server=%s' % proxy_url)
# note: headless mode is detected (headless = True)
# we launch the browser in head-full mode with the window hidden
windows_headless = False
if get_config_headless():
if os.name == 'nt':
windows_headless = True
elif os.name == 'darwin':
# debug
windows_headless = False
else:
start_xvfb_display()
# For normal headless mode:
# options.add_argument('--headless')
# if we are inside the Docker container, we avoid downloading the driver
driver_exe_path = None
version_main = None
if os.path.exists("/app/chromedriver"):
# running inside Docker
driver_exe_path = "/app/chromedriver"
else:
version_main = get_chrome_major_version()
if PATCHED_DRIVER_PATH is not None:
driver_exe_path = PATCHED_DRIVER_PATH
# detect chrome path
browser_executable_path = get_chrome_exe_path()
options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
# downloads and patches the chromedriver
# if we don't set driver_executable_path it downloads, patches, and deletes the driver each time
driver = uc.Chrome(options=options, browser_executable_path=browser_executable_path,
driver_executable_path=driver_exe_path, version_main=version_main,
windows_headless=windows_headless, headless=windows_headless)
# save the patched driver to avoid re-downloads
if driver_exe_path is None:
PATCHED_DRIVER_PATH = os.path.join(driver.patcher.data_path, driver.patcher.exe_name)
if PATCHED_DRIVER_PATH != driver.patcher.executable_path:
shutil.copy(driver.patcher.executable_path, PATCHED_DRIVER_PATH)
# clean up proxy extension directory
if proxy_extension_dir is not None:
shutil.rmtree(proxy_extension_dir)
# selenium vanilla
# options = webdriver.ChromeOptions()
# options.add_argument('--no-sandbox')
# options.add_argument('--window-size=1920,1080')
# options.add_argument('--disable-setuid-sandbox')
# options.add_argument('--disable-dev-shm-usage')
# driver = webdriver.Chrome(options=options)
return driver
def get_chrome_exe_path() -> str:
global CHROME_EXE_PATH
if CHROME_EXE_PATH is not None:
return CHROME_EXE_PATH
# linux pyinstaller bundle
chrome_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'chrome', "chrome")
if os.path.exists(chrome_path):
if not os.access(chrome_path, os.X_OK):
raise Exception(f'Chrome binary "{chrome_path}" is not executable. '
f'Please, extract the archive with "tar xzf <file.tar.gz>".')
CHROME_EXE_PATH = chrome_path
return CHROME_EXE_PATH
# windows pyinstaller bundle
chrome_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'chrome', "chrome.exe")
if os.path.exists(chrome_path):
CHROME_EXE_PATH = chrome_path
return CHROME_EXE_PATH
# system
CHROME_EXE_PATH = uc.find_chrome_executable()
return CHROME_EXE_PATH
def get_chrome_major_version() -> int:
global CHROME_MAJOR_VERSION
if CHROME_MAJOR_VERSION is not None:
return int(CHROME_MAJOR_VERSION)
if os.name == 'nt':
# Example: '104.0.5112.79'
try:
complete_version = extract_version_nt_executable(get_chrome_exe_path())
except Exception:
try:
complete_version = extract_version_nt_registry()
except Exception:
# Example: '104.0.5112.79'
complete_version = extract_version_nt_folder()
else:
chrome_path = get_chrome_exe_path()
process = os.popen(f'"{chrome_path}" --version')
# Example 1: 'Chromium 104.0.5112.79 Arch Linux\n'
# Example 2: 'Google Chrome 104.0.5112.79 Arch Linux\n'
complete_version = process.read()
process.close()
CHROME_MAJOR_VERSION = complete_version.split('.')[0].split(' ')[-1]
return int(CHROME_MAJOR_VERSION)
def extract_version_nt_executable(exe_path: str) -> str:
import pefile
pe = pefile.PE(exe_path, fast_load=True)
pe.parse_data_directories(
directories=[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_RESOURCE"]]
)
return pe.FileInfo[0][0].StringTable[0].entries[b"FileVersion"].decode('utf-8')
def extract_version_nt_registry() -> str:
stream = os.popen(
'reg query "HKLM\\SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Google Chrome"')
output = stream.read()
google_version = ''
for letter in output[output.rindex('DisplayVersion REG_SZ') + 24:]:
if letter != '\n':
google_version += letter
else:
break
return google_version.strip()
def extract_version_nt_folder() -> str:
# Check if the Chrome folder exists in the x32 or x64 Program Files folders.
for i in range(2):
path = 'C:\\Program Files' + (' (x86)' if i else '') + '\\Google\\Chrome\\Application'
if os.path.isdir(path):
paths = [f.path for f in os.scandir(path) if f.is_dir()]
for path in paths:
filename = os.path.basename(path)
pattern = '\d+\.\d+\.\d+\.\d+'
match = re.search(pattern, filename)
if match and match.group():
# Found a Chrome version.
return match.group(0)
return ''
def get_user_agent(driver=None) -> str:
global USER_AGENT
if USER_AGENT is not None:
return USER_AGENT
try:
if driver is None:
driver = get_webdriver()
USER_AGENT = driver.execute_script("return navigator.userAgent")
# Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910
USER_AGENT = re.sub('HEADLESS', '', USER_AGENT, flags=re.IGNORECASE)
return USER_AGENT
except Exception as e:
raise Exception("Error getting browser User-Agent. " + str(e))
finally:
if driver is not None:
driver.quit()
def start_xvfb_display():
global XVFB_DISPLAY
if XVFB_DISPLAY is None:
from xvfbwrapper import Xvfb
XVFB_DISPLAY = Xvfb()
XVFB_DISPLAY.start()
def object_to_dict(_object):
json_dict = json.loads(json.dumps(_object, default=lambda o: o.__dict__))
# remove hidden fields
return {k: v for k, v in json_dict.items() if not k.startswith('__')}

117
src/verify_email.py Normal file
View File

@ -0,0 +1,117 @@
import email
import imaplib
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from func_timeout import FunctionTimedOut,func_timeout
import cloudflare_solver
from loguru import logger
from config import config
from globals import GlobalState
from signup import Interrupted
from utils import get_webdriver
max_threads = config['emailWorkerNum']
task_queue = Queue(max_threads)
executor = ThreadPoolExecutor(max_threads)
def worker(q, executor):
while True:
task = q.get()
executor.submit(task)
worker_thread = threading.Thread(target=worker, args=(task_queue, executor))
worker_thread.start()
def click_verify_link(link):
driver = get_webdriver()
try:
func_timeout(10 * 60, cloudflare_solver.bypass, args=(link, driver))
logger.info('Email verified')
except FunctionTimedOut:
logger.warning('Function timed out')
except Exception as e:
logger.error(e)
finally:
driver.quit()
def verify_email():
username = config['emailAddr']
password = config['emailPassword']
imap_server = config['emailImapServer']
emailImapPort = config['emailImapPort']
if not username or not password or not imap_server:
GlobalState.exception = Interrupted("email config error")
raise GlobalState.exception
if emailImapPort:
mail = imaplib.IMAP4_SSL(imap_server,port=emailImapPort)
else:
mail = imaplib.IMAP4_SSL(imap_server)
try:
mail.login(username, password)
except Exception as e:
GlobalState.exception = Interrupted("email config error")
raise GlobalState.exception
logger.info("start to monitor openai verify email")
def get_html_part(msg):
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == 'text/html':
charset = part.get_content_charset()
payload = part.get_payload(decode=True)
try:
return payload.decode(charset or 'utf-8', errors='replace')
except LookupError:
return payload.decode('utf-8', errors='replace')
else:
if msg.get_content_type() == 'text/html':
charset = msg.get_content_charset()
payload = msg.get_payload(decode=True)
try:
return payload.decode(charset or 'utf-8', errors='replace')
except LookupError:
return payload.decode('utf-8', errors='replace')
def check_mail():
mail.select('INBOX')
status, messages = mail.search(None, '(UNSEEN)')
messages = messages[0].split()
for mail_id in messages:
status, data = mail.fetch(mail_id, '(RFC822)')
for response in data:
if isinstance(response, tuple):
msg = email.message_from_bytes(response[1])
from_ = msg.get('From')
if 'openai' in from_:
html_content = get_html_part(msg)
if 'Verify your email address' in html_content:
link = re.search(r'href="(https://mandrillapp.com[^"]+)"', html_content)
if link:
link = link.group(1)
def task():
click_verify_link(link)
task_queue.put(task)
try:
while True:
check_mail()
time.sleep(10)
finally:
mail.logout()
if __name__ == '__main__':
verify_email()