1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
|
import os
import re
import time
import flow.cmd
import threading
from peafour import P4
import subprocess as sp
#-------------------------------------------------------------------------------
p4_set_result = None
def get_p4_set(prop_name):
    """Return the value of a Perforce setting such as P4CONFIG or P4PORT.

    The process environment takes precedence; otherwise the output of
    'p4 set -q' is parsed once and cached in the module-level dict.
    Returns None when the setting is unknown or p4 cannot be launched.
    """
    if ret := os.getenv(prop_name):
        return ret
    global p4_set_result
    if not p4_set_result:
        p4_set_result = {}
        # Only failure to launch the p4 binary is tolerated; a bare except
        # here previously swallowed KeyboardInterrupt/SystemExit too.
        try:
            proc = sp.Popen(("p4", "set", "-q"), stdout=sp.PIPE)
        except OSError:
            return None
        # Each line is expected as b"KEY=value"; anything else is skipped.
        for line in iter(proc.stdout.readline, b""):
            try:
                key, value = line.split(b"=", 1)
            except ValueError:
                continue
            p4_set_result[key.decode()] = value.strip().decode()
        proc.wait()
        proc.stdout.close()
    return p4_set_result.get(prop_name, None)
#-------------------------------------------------------------------------------
def login():
    """Confirm an active Perforce session exists and return its user name.

    Raises:
        EnvironmentError: when 'p4 login -s' reports no valid ticket.
    """
    try:
        session = P4.login(s=True).run()
    except P4.Error:
        raise EnvironmentError("No valid Perforce session found. Run 'p4 login' to authenticate.")
    return getattr(session, "User", None)
#-------------------------------------------------------------------------------
def get_p4config_name():
    """Return the file name (no directory part) used for P4CONFIG files."""
    configured = get_p4_set("P4CONFIG")
    if not configured:
        configured = ".p4config.txt"
    return os.path.basename(configured)
#-------------------------------------------------------------------------------
def has_p4config(start_dir):
    """Search from 'start_dir' up to the filesystem root for a P4CONFIG file.

    Returns (True, path) for the first file found, else (False, file_name).
    """
    from pathlib import Path
    config_name = get_p4config_name()
    # Appending a dummy component makes .parents include start_dir itself.
    for directory in (Path(start_dir) / "x").parents:
        candidate = directory / config_name
        if os.path.isfile(candidate):
            return True, candidate
    return False, config_name
#-------------------------------------------------------------------------------
def create_p4config(p4config_path, client, username, port=None):
    """Write a minimal P4CONFIG file.

    p4config_path -- destination file path (overwritten if it exists)
    client        -- value for P4CLIENT
    username      -- value for P4USER
    port          -- optional value for P4PORT; omitted when falsy
    """
    with open(p4config_path, "wt") as out:
        print_args = { "sep" : "", "file" : out }
        print("P4CLIENT=", client, **print_args)
        print("P4USER=", username, **print_args)
        if port:
            # Bug fix: the "=" was missing, which (with sep="") emitted the
            # invalid line "P4PORT<port>" instead of "P4PORT=<port>".
            print("P4PORT=", port, **print_args)
#-------------------------------------------------------------------------------
def ensure_p4config(start_dir=None):
    """Guarantee a P4CONFIG file exists for 'start_dir' (defaults to cwd).

    Returns (path, created) where 'created' is True when a new file was
    written, or None when no client workspace matches the directory.
    """
    if not start_dir:
        start_dir = os.getcwd()
    exists, p4config_name = has_p4config(start_dir)
    if exists:
        return p4config_name, False
    user = login()
    # Find the client workspace whose root covers 'start_dir'
    found = get_client_from_dir(start_dir, user)
    if not found:
        return
    client_name, root_dir = found
    # The client root tells us where the p4config file belongs
    p4config_path = f"{root_dir}/{p4config_name}"
    create_p4config(p4config_path, client_name, user)
    return p4config_path, True
#-------------------------------------------------------------------------------
def get_client_from_dir(root_dir, username):
    """Find a client of 'username' whose root contains 'root_dir'.

    Returns (client_name, client_root) or None when no client matches.
    """
    import socket
    host_name = socket.gethostname().lower()
    root_dir = os.path.normpath(root_dir).replace("\\", "/").lower()
    # A client is usable from this machine when its Host field matches this
    # host or is empty (empty means "any host"). The previous pre-filter
    # demanded an exact host match, which made the empty-host branch below
    # unreachable and wrongly skipped host-less clients.
    for client in P4.clients(u=username):
        client_host = client.Host.lower()
        if client_host and client_host != host_name:
            continue
        client_root = client.Root.replace("\\", "/").lower()
        # Prefix match: 'root_dir' must live underneath the client's root.
        # (The previous substring test could match an unrelated path part.)
        if root_dir.startswith(client_root):
            return client.client, client.Root
#-------------------------------------------------------------------------------
def get_branch_root(depot_path):
    """Establish the branch root of 'depot_path' by probing for a marker file.

    Successively longer depot-path prefixes are queried for the well-known
    file GenerateProjectFiles.bat; the directory of the first hit is
    returned with a trailing slash. Raises ValueError when nothing matches.
    """
    def fstat_paths():
        limit = 5 # ...two of which are always required
        prefix = "//"
        for component in depot_path[2:].split("/")[:limit]:
            prefix += component + "/"
            yield prefix + "GenerateProjectFiles.bat"
    print("Probing for well-known file:")
    for candidate in fstat_paths():
        print(" ", candidate)
    query = P4.fstat(fstat_paths(), T="depotFile")
    hit = query.run(on_error=False)
    if not hit:
        raise ValueError("Unable to establish branch root")
    # Drop the marker file name, keeping its directory as the branch root
    return "/".join(hit.depotFile.split("/")[:-1]) + "/"
#-------------------------------------------------------------------------------
class TempBranchSpec(object):
    """A transient Perforce branchspec, removed again on garbage collection."""
    def __init__(self, use, username, from_path, to_path, ignore_streams=False):
        import hashlib
        digest = hashlib.md5()
        digest.update(from_path.encode())
        digest.update(to_path.encode())
        suffix = digest.hexdigest()[:6]
        self._name = f"{username}-ushell.{use}-{suffix}"
        # To map between streams we need to extract the internal branchspec that
        # Perforce builds. If from/to aren't related streams it will fail so we
        # fallback to a conventional trivial branchspec.
        try:
            if ignore_streams:
                raise P4.Error("")
            query = P4.branch(self._name, o=True, S=from_path[:-1], P=to_path[:-1])
            spec = query.run().as_dict()
        except P4.Error:
            spec = {
                "Branch" : self._name,
                "View0" : f"{from_path}... {to_path}...",
            }
        P4.branch(i=True).run(input_data=spec, on_error=False)
    def __del__(self):
        # Best effort clean-up of the server-side spec when we're collected
        P4.branch(self._name, d=True).run()
    def __str__(self):
        return self._name
#-------------------------------------------------------------------------------
def _kb_string(value):
return format(value // 1024, ",") + "KB"
#-------------------------------------------------------------------------------
class _SyncRota(object):
class _Worker(object):
def __init__(self, id):
self.id = id
self.work_items = []
self.burden = 0
self.done_size = 0
self.done_items = 0
self.error = False
def __init__(self, changelist, worker_count):
self._workers = [_SyncRota._Worker(x) for x in range(worker_count)]
self.changelist = str(changelist)
def add_work(self, item, rev, cost):
worker = min(self._workers, key=lambda x: x.burden)
worker.work_items.append((item, rev, cost))
worker.burden += cost
def sort(self):
direction = 1
for worker in self._workers:
worker.work_items.sort(key=lambda x: x[2] * direction)
direction *= -1
def read_workers(self):
yield from (x for x in self._workers if x.work_items)
#-------------------------------------------------------------------------------
class Syncer(object):
    """Multi-threaded 'p4 sync' driver.

    Usage: register depot paths with add_path() and exclusion views with
    add_exclude(), size up the work with schedule(), then fetch everything
    on a pool of worker threads with sync().
    """
    def __init__(self):
        self._paths = []              # depot paths to sync
        self._excluded_views = set()  # depot views whose files are removed (#0)
    def _read_sync_specs(self, include_excluded=True):
        # Yields "path@changelist" specs for each registered path; excluded
        # views are appended as "path#0" so the server removes those files.
        cl_suffix = "@" + self._rota.changelist
        # Changelist 0 means "remove everything"; "#0" is the spec for that.
        cl_suffix = "#0" if cl_suffix == "@0" else cl_suffix
        for depot_path in self._paths:
            yield depot_path + cl_suffix
        if include_excluded:
            # Using "@0" results in slow queries it seems
            yield from (x + "#0" for x in self._excluded_views)
    def _is_excluded(self, path):
        # True when any compiled exclusion pattern matches 'path'.
        return next((True for x in self._re_excludes if x.match(path)), False)
    def _build_exclude_re(self):
        # Translate p4 view wildcards into regular expressions: "..." spans
        # directories (.*) while "*" matches within one component ([^/]*).
        re_excludes = []
        for view in self._excluded_views:
            view = view.replace("...", "@")   # protect "..." before escaping "."
            view = view.replace(".", "\\.")
            view = view.replace("*", "[^/]*")
            view = view.replace("@", ".*")
            re_excludes.append(view)
        if re_excludes:
            # The expression isn't escaped so hopefully it's not complicated...
            # NOTE(review): if compilation fails the raw strings are kept, and
            # _is_excluded() would then raise on str.match — confirm intent.
            try: re_excludes = [re.compile(x, re.IGNORECASE) for x in re_excludes]
            except: pass
        self._re_excludes = re_excludes
    def add_path(self, dir):
        # Register a depot path (e.g. "//depot/branch/") to be synced.
        self._paths.append(dir)
    def add_exclude(self, view):
        # Register a depot view whose files should be synced to #0 (removed).
        self._excluded_views.add(view)
    def schedule(self, changelist, worker_count=8):
        """Dry-run the sync to discover work, then spread it across workers."""
        self._build_exclude_re()
        # P4.<cmd> uses p4's Python-marshalled output (the -G option). However the
        # "p4 sync -n" will report open files via a "info" message instead of a
        # structured "stat" one. So we explicitly add open files to the rota.
        def read_items():
            yield from P4.sync(self._read_sync_specs(), n=True).read(on_error=False)
            yield from P4.opened().read(on_error=False)
        self._rota = _SyncRota(changelist, worker_count)
        # Fill the rota
        total_size = 0
        count = 0
        for item in read_items():
            depot_path = item.depotFile
            rev = int(item.rev)
            if self._is_excluded(depot_path):
                # Excluded files are skipped unless deleted, in which case
                # rev 0 queues them for removal from the workspace.
                if item.action != "deleted":
                    continue
                rev = 0
            # Refresh the progress line every 17 items to limit console spam
            if count % 17 == 0:
                print("\r" + str(count), "files", f"({_kb_string(total_size)})", end="")
            size = int(getattr(item, "fileSize", 0)) # deletes have no size attr
            self._rota.add_work(depot_path, rev, size)
            total_size += size
            count += 1
        self._rota.sort()
        print("\r" + str(count), "files", f"({_kb_string(total_size)})")
    def sync(self, *, dryrun=False, echo=False):
        """Execute the scheduled sync on worker threads.

        dryrun -- pass -n to p4 so nothing is actually fetched.
        echo   -- print each depot file instead of a progress percentage.
        Returns True on success, False if a worker or the final pass failed.
        """
        # Sum up what we have to do
        total_burden = sum(x.burden for x in self._rota.read_workers())
        total_items = sum(len(x.work_items) for x in self._rota.read_workers())
        print(f"Fetching {_kb_string(total_burden)} in {total_items} files")
        # Launch the worker threads
        def sync_thread(worker):
            # Thread body: sync this worker's share of the scheduled files.
            def on_error(p4_error):
                if "not enough space" in p4_error.data:
                    worker.error = "Out of disk space"
                    raise EOFError()  # abort this worker's read loop
            try:
                # Annecdotally path@cl appears to be the quickest. path#rev is
                # appeared 15% slower, and with -L it was 30%.
                def read_sync_items():
                    cl_prefix = "@" + self._rota.changelist
                    for path, rev, size in worker.work_items:
                        yield path + (cl_prefix if rev else "#0")
                sync = P4(b=8192).sync(read_sync_items(), n=dryrun)
                for item in sync.read(on_error=on_error):
                    if echo:
                        print(item.depotFile)
                    # The +0.01 mirrors the 0.01-per-item added to total_burden
                    # below, so zero-sized files still advance the progress.
                    worker.done_size += int(getattr(item, "fileSize", 0)) + 0.01
            except EOFError:
                pass
        def create_thread(worker):
            thread = threading.Thread(target=sync_thread, args=(worker,))
            thread.start()
            return thread
        threads = [create_thread(x) for x in self._rota.read_workers()]
        print(f"Using {len(threads)} workers")
        # While there are active threads, print detail about their progress
        total_burden += (0.01 * total_items)
        while not echo:
            threads = [x for x in threads if x.is_alive()]
            if not threads:
                break
            done_size = sum(x.done_size for x in self._rota.read_workers())
            progress = ((done_size * 1000) // total_burden) / 10
            print("\r%5.1f%%" % progress, _kb_string(int(done_size)), end="");
            time.sleep(0.3)
        else:
            # Reached only in echo mode (loop body never ran, so no break):
            # there is no progress loop, just wait on the workers directly.
            for thread in threads:
                thread.join()
        print("\r...done ")
        # Check for errors from the workers
        for worker in (x for x in self._rota.read_workers() if x.error):
            print(flow.cmd.text.red("!!" + str(worker.error)))
            return False
        # Nothing more to do if this is a dry run as the remaining tasks need a
        # sync to operate on.
        if dryrun:
            return True
        # P4.sync() returns 'stat' type events but "p4 sync" will report files
        # with a complex sync scenario only as unstructured 'info' messages. As the
        # above won't know about these files we'll do a second sync to catch them.
        global sync_errors
        sync_errors = False
        print("Finalising ", end="")
        def read_depot_files():
            # Dry-runs the full spec set and yields depot files p4 still
            # considers out of date, surfacing errors/info along the way.
            def on_error(data):
                msg = data.data.strip()
                if "up-to-date" in msg: return
                if "not in client view" in msg: return
                print("\n", flow.cmd.text.red(msg), end="")
                global sync_errors
                sync_errors = True
            def on_info(data):
                print("\n", flow.cmd.text.light_yellow(data.data), end="")
            sync = P4.sync(self._read_sync_specs(False), n=True)
            for item in sync.read(on_error=on_error, on_info=on_info):
                if not self._is_excluded(item.depotFile):
                    yield item.depotFile
        sync = P4.sync(read_depot_files(), q=True)
        for i in sync.read(on_error=False):
            pass
        print()
        return not sync_errors
|