Skip to content

Commit 55aca14

Browse files
committed
operations/facts: add GPG key management operations and facts
- Add GpgFactBase, GpgKey, GpgKeys, GpgSecretKeys, and GpgKeyrings facts - Add gpg.key and gpg.dearmor operations for managing GPG keys - Support for keyserver fetching, local files, URLs, and key removal - Comprehensive test coverage for all GPG operations and facts
1 parent e1ccd30 commit 55aca14

22 files changed

+915
-0
lines changed

src/pyinfra/facts/gpg.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,71 @@ def command(self, keyring=None):
148148
return ("gpg --list-secret-keys --with-colons --keyring {0} --no-default-keyring").format(
149149
keyring,
150150
)
151+
152+
153+
class GpgKeyrings(GpgFactBase):
154+
"""
155+
Returns information on all GPG keyrings found in specified directories.
156+
157+
.. code:: python
158+
159+
{
160+
"/etc/apt/keyrings/docker.gpg": {
161+
"format": "gpg",
162+
"keys": {...} # Same format as GpgKeys fact
163+
}
164+
}
165+
"""
166+
167+
@override
168+
def command(self, directories):
169+
if isinstance(directories, str):
170+
directories = [directories]
171+
172+
search_locations = " ".join(f'"{d}"' for d in directories)
173+
174+
# Generate a command that finds keyrings and lists their keys
175+
# We'll use a shell script that outputs keyring path followed by key info
176+
return (
177+
f"for keyring in $(find {search_locations} -type f \\( -name '*.gpg' "
178+
f"-o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do "
179+
f'echo "KEYRING:$keyring"; '
180+
f'if [[ "$keyring" == *.asc ]]; then '
181+
f'gpg --with-colons "$keyring" 2>/dev/null || true; '
182+
f"else "
183+
f'gpg --list-keys --with-colons --keyring "$keyring" --no-default-keyring 2>/dev/null || true; ' # noqa: E501
184+
f"fi; done"
185+
)
186+
187+
@override
188+
def process(self, output):
189+
keyrings = {}
190+
current_keyring = None
191+
current_output: list[str] = []
192+
193+
for line in output:
194+
line = line.strip()
195+
if not line:
196+
continue
197+
198+
if line.startswith("KEYRING:"):
199+
# Process previous keyring if exists
200+
if current_keyring and current_output:
201+
keyring_format = current_keyring.split(".")[-1].lower()
202+
keys = super().process(current_output)
203+
keyrings[current_keyring] = {"format": keyring_format, "keys": keys}
204+
205+
# Start new keyring
206+
current_keyring = line[8:] # Remove "KEYRING:" prefix
207+
current_output = []
208+
else:
209+
# Accumulate GPG output for current keyring
210+
current_output.append(line)
211+
212+
# Process final keyring
213+
if current_keyring and current_output:
214+
keyring_format = current_keyring.split(".")[-1].lower()
215+
keys = super().process(current_output)
216+
keyrings[current_keyring] = {"format": keyring_format, "keys": keys}
217+
218+
return keyrings

src/pyinfra/operations/gpg.py

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
"""
2+
Manage GPG keys and keyrings.
3+
"""
4+
5+
from pathlib import PurePosixPath
6+
from urllib.parse import urlparse
7+
8+
from pyinfra import host
9+
from pyinfra.api import OperationError, operation
10+
from pyinfra.facts.gpg import GpgKeyrings
11+
12+
from . import files
13+
14+
15+
@operation()
16+
def key(
17+
src: str | None = None,
18+
dest: str | None = None,
19+
keyserver: str | None = None,
20+
keyid: str | list[str] | None = None,
21+
dearmor: bool = True,
22+
mode: str = "0644",
23+
present: bool = True,
24+
working_dirs: list[str] | None = None,
25+
):
26+
"""
27+
Install or remove GPG keys from various sources.
28+
29+
Args:
30+
src: filename or URL to a key (ASCII .asc or binary .gpg)
31+
dest: destination path for the key file (required for installation, optional for removal)
32+
keyserver: keyserver URL for fetching keys by ID
33+
keyid: key ID or list of key IDs (required with keyserver, optional for removal)
34+
dearmor: whether to convert ASCII armored keys to binary format
35+
mode: file permissions for the installed key
36+
present: whether the key should be present (True) or absent (False)
37+
working_dirs: dirs to search for existing keyrings (required for removal without dest)
38+
When False: if dest is provided, removes from specific keyring;
39+
if dest is None, removes from keyrings found in working_dirs;
40+
if keyid is provided, removes specific key(s);
41+
if keyid is None, removes entire keyring file(s)
42+
43+
Examples:
44+
gpg.key(
45+
name="Install Docker GPG key",
46+
src="https://download.docker.com/linux/debian/gpg",
47+
dest="/etc/apt/keyrings/docker.gpg",
48+
)
49+
50+
gpg.key(
51+
name="Remove old GPG key file",
52+
dest="/etc/apt/keyrings/old-key.gpg",
53+
present=False,
54+
)
55+
56+
gpg.key(
57+
name="Remove specific key by ID",
58+
dest="/etc/apt/keyrings/vendor.gpg",
59+
keyid="0xABCDEF12",
60+
present=False,
61+
)
62+
63+
gpg.key(
64+
name="Remove key from specific directories",
65+
keyid="0xCOMPROMISED123",
66+
present=False,
67+
working_dirs=["/etc/apt/keyrings", "/usr/share/keyrings"],
68+
)
69+
70+
gpg.key(
71+
name="Fetch keys from keyserver",
72+
keyserver="hkps://keyserver.ubuntu.com",
73+
keyid=["0xD88E42B4", "0x7EA0A9C3"],
74+
dest="/etc/apt/keyrings/vendor.gpg",
75+
)
76+
"""
77+
78+
# Validate parameters based on operation type
79+
if present is True:
80+
# For installation, dest is required
81+
if not dest:
82+
raise OperationError("`dest` must be provided for installation")
83+
elif present is False:
84+
# For removal, either dest or (keyid and working_dirs) must be provided
85+
if not dest and not (keyid and working_dirs):
86+
raise OperationError(
87+
"For removal, either `dest` or both `keyid` and `working_dirs` must be provided"
88+
)
89+
90+
# For removal, handle different scenarios
91+
if present is False:
92+
if not dest and keyid:
93+
# Remove key(s) from all keyrings found in specified directories
94+
if isinstance(keyid, str):
95+
keyid = [keyid]
96+
97+
if not working_dirs:
98+
raise OperationError(
99+
"`working_dirs` must be provided when removing keys without `dest`"
100+
)
101+
102+
# Use the GpgKeyrings fact to find all keyrings in specified directories
103+
keyrings_info = host.get_fact(GpgKeyrings, directories=working_dirs)
104+
105+
for keyring_path, keyring_data in keyrings_info.items():
106+
# Get the keys from the GpgKeyrings fact data
107+
keys_in_keyring = keyring_data.get("keys", {})
108+
109+
# Check if any of the target keys exist in this keyring
110+
keys_to_remove = []
111+
for kid in keyid:
112+
# Handle different key ID formats (short, long, with/without 0x prefix)
113+
clean_key = kid.replace("0x", "").replace("0X", "").upper()
114+
115+
# Check for exact match or if the key ID is a suffix/prefix of any key
116+
# in the keyring
117+
for existing_key_id in keys_in_keyring.keys():
118+
if (
119+
clean_key == existing_key_id.upper()
120+
or existing_key_id.upper().endswith(clean_key)
121+
or existing_key_id.upper().startswith(clean_key)
122+
):
123+
keys_to_remove.append(existing_key_id)
124+
125+
if keys_to_remove:
126+
# Remove the entire keyring file if any target keys are found
127+
# This is the safest approach for keyring management
128+
yield from files.file._inner(
129+
path=keyring_path,
130+
present=False,
131+
)
132+
133+
return
134+
135+
elif dest and keyid:
136+
# Remove specific key(s) from a specific keyring file
137+
if isinstance(keyid, str):
138+
keyid = [keyid]
139+
140+
# Check if the destination keyring exists and contains the target keys
141+
keyrings_info = host.get_fact(
142+
GpgKeyrings, directories=[str(PurePosixPath(dest).parent)]
143+
)
144+
145+
if dest in keyrings_info:
146+
keyring_data = keyrings_info[dest]
147+
keys_in_keyring = keyring_data.get("keys", {})
148+
149+
# Check if any of the target keys exist in this keyring
150+
keys_found = False
151+
for kid in keyid:
152+
clean_key = kid.replace("0x", "").replace("0X", "").upper()
153+
for existing_key_id in keys_in_keyring.keys():
154+
# Check for exact match, suffix (short key ID), or prefix match
155+
if (
156+
clean_key == existing_key_id.upper()
157+
or existing_key_id.upper().endswith(clean_key)
158+
or existing_key_id.upper().startswith(clean_key)
159+
):
160+
keys_found = True
161+
break
162+
if keys_found:
163+
break
164+
165+
if keys_found:
166+
# Remove the entire keyring file - safest approach for keyring management
167+
yield from files.file._inner(
168+
path=dest,
169+
present=False,
170+
)
171+
return
172+
173+
elif dest and not keyid:
174+
# Remove entire keyring file
175+
yield from files.file._inner(
176+
path=dest,
177+
present=False,
178+
)
179+
return
180+
181+
else:
182+
raise OperationError("Invalid parameters for removal operation")
183+
184+
# For installation, validate required parameters
185+
if not src and not keyserver:
186+
raise OperationError("Either `src` or `keyserver` must be provided for installation")
187+
188+
if keyserver and not keyid:
189+
raise OperationError("`keyid` must be provided with `keyserver`")
190+
191+
if keyid and not keyserver and not src:
192+
raise OperationError(
193+
"When using `keyid` for installation, either `keyserver` or `src` must be provided"
194+
)
195+
196+
# For installation (present=True), ensure destination directory exists
197+
if dest is None:
198+
raise OperationError("dest is required for installation")
199+
200+
dest_dir = str(PurePosixPath(dest).parent)
201+
yield from files.directory._inner(
202+
path=dest_dir,
203+
mode="0755",
204+
present=True,
205+
)
206+
207+
# --- src branch: install a key from URL or local file ---
208+
if src:
209+
if urlparse(src).scheme in ("http", "https"):
210+
# Remote source: download first, then process
211+
temp_file = host.get_temp_filename(src)
212+
213+
yield from files.download._inner(
214+
src=src,
215+
dest=temp_file,
216+
)
217+
218+
# Install the key and clean up temp file
219+
yield from _install_key_file(temp_file, dest, dearmor, mode)
220+
221+
# Clean up temp file using pyinfra
222+
yield from files.file._inner(
223+
path=temp_file,
224+
present=False,
225+
)
226+
else:
227+
# Local file: install directly
228+
yield from _install_key_file(src, dest, dearmor, mode)
229+
230+
# --- keyserver branch: fetch keys by ID ---
231+
if keyserver:
232+
if keyid is None:
233+
raise OperationError("`keyid` must be provided with `keyserver`")
234+
235+
if isinstance(keyid, str):
236+
keyid = [keyid]
237+
238+
joined = " ".join(keyid)
239+
240+
# Create temporary GPG home directory
241+
temp_dir = f"/tmp/pyinfra-gpg-{host.get_temp_filename('')[-8:]}"
242+
243+
yield from files.directory._inner(
244+
path=temp_dir,
245+
mode="0700", # GPG directories should be more restrictive
246+
present=True,
247+
)
248+
249+
# Export GNUPGHOME and fetch keys
250+
yield f'export GNUPGHOME="{temp_dir}" && gpg --batch --keyserver "{keyserver}" --recv-keys {joined}' # noqa: E501
251+
252+
# Export keys to destination - always use direct binary export
253+
# gpg --export produces binary format by default, no dearmoring needed
254+
yield (f'export GNUPGHOME="{temp_dir}" && gpg --batch --export {joined} > "{dest}"')
255+
256+
# Clean up temporary directory
257+
yield from files.directory._inner(
258+
path=temp_dir,
259+
present=False,
260+
)
261+
262+
# Set proper permissions
263+
yield from files.file._inner(
264+
path=dest,
265+
mode=mode,
266+
present=True,
267+
)
268+
269+
270+
@operation()
271+
def dearmor(src: str, dest: str, mode: str = "0644"):
272+
"""
273+
Convert ASCII armored GPG key to binary format.
274+
275+
Args:
276+
src: source ASCII armored key file
277+
dest: destination binary key file
278+
mode: file permissions for the output file
279+
280+
Example:
281+
gpg.dearmor(
282+
name="Convert key to binary",
283+
src="/tmp/key.asc",
284+
dest="/etc/apt/keyrings/key.gpg",
285+
)
286+
"""
287+
288+
# Ensure destination directory exists
289+
dest_dir = str(PurePosixPath(dest).parent)
290+
yield from files.directory._inner(
291+
path=dest_dir,
292+
mode="0755",
293+
present=True,
294+
)
295+
296+
yield f'gpg --batch --dearmor -o "{dest}" "{src}"'
297+
298+
# Set proper permissions
299+
yield from files.file._inner(
300+
path=dest,
301+
mode=mode,
302+
present=True,
303+
)
304+
305+
306+
def _install_key_file(src_file: str, dest_path: str, dearmor: bool, mode: str):
307+
"""
308+
Helper function to install a GPG key file, dearmoring if necessary.
309+
"""
310+
if dearmor:
311+
# Check if it's an ASCII armored key and handle accordingly
312+
# Note: Could be enhanced using GpgKey fact
313+
yield (
314+
f'if grep -q "BEGIN PGP PUBLIC KEY BLOCK" "{src_file}"; then '
315+
f'gpg --batch --dearmor -o "{dest_path}" "{src_file}"; '
316+
f'else cp "{src_file}" "{dest_path}"; fi'
317+
)
318+
else:
319+
# Simple copy for binary keys or when dearmoring is disabled
320+
yield f'cp "{src_file}" "{dest_path}"'
321+
322+
# Set proper permissions
323+
yield from files.file._inner(
324+
path=dest_path,
325+
mode=mode,
326+
present=True,
327+
)

0 commit comments

Comments
 (0)