惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

酷 壳 – CoolShell
酷 壳 – CoolShell
H
Hacker News: Front Page
P
Palo Alto Networks Blog
T
ThreatConnect
Apple Machine Learning Research
Apple Machine Learning Research
博客园_首页
T
True Tiger Recordings
P
Privacy & Cybersecurity Law Blog
B
Blog
IT之家
IT之家
Last Week in AI
Last Week in AI
F
Full Disclosure
Hacker News: Ask HN
Hacker News: Ask HN
C
Comments on: Blog
Microsoft Azure Blog
Microsoft Azure Blog
C
Cybersecurity and Infrastructure Security Agency CISA
Microsoft Security Blog
Microsoft Security Blog
博客园 - 【当耐特】
N
News and Events Feed by Topic
NISL@THU
NISL@THU
腾讯CDC
雷峰网
雷峰网
Security Latest
Security Latest
李成银的技术随笔
M
Microsoft Research Blog - Microsoft Research
L
LangChain Blog
L
Lohrmann on Cybersecurity
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Check Point Blog
Y
Y Combinator Blog
Recent Announcements
Recent Announcements
博客园 - Franky
N
News | PayPal Newsroom
V
V2EX
A
About on SuperTechFans
The Register - Security
The Register - Security
月光博客
月光博客
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Google Online Security Blog
Google Online Security Blog
MyScale Blog
MyScale Blog
Cisco Talos Blog
Cisco Talos Blog
Vercel News
Vercel News
WordPress大学
WordPress大学
C
Cyber Attacks, Cyber Crime and Cyber Security
The Hacker News
The Hacker News
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
爱范儿
爱范儿
A
Arctic Wolf
L
LINUX DO - 最新话题
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More

博客园 - Flyingis

微博开通 ESRI UC 2010开幕视频 & Jack出场 地图缓存技术的标准 2010 新年招聘信息--学生实习版 2010 新年招聘信息--正式员工版 WGS 1984 Web Mercator 对于在线地图服务的意义 - Flyingis Geoprocessing scripts 利用多核进行计算 将地震空间数据转变为专题地图 解析USGS网站页面中的地震空间数据 - Flyingis 空间数据与内容的共享 ESRI UC 2009 见闻和感触 ArcGIS 9.3/9.3.1 客户端 API 更新信息--2009年5月 如何改善软件的用户体验 之 颜色与UI RIA+REST,琴瑟合鸣 ESRI中国(北京)招聘技术工程师 ArcGIS 9.3.1 关键更新 解决 File GDB 文件锁定的问题 ArcGIS Server 开发系列(七)--物流配送 - Flyingis ArcGIS Flex API 中的 Flex 技术(五)--技术列表 - Flyingis
浅尝辄止 Parallel Python
Flyingis · 2009-11-12 · via 博客园 - Flyingis

    作者:Flyingis

    本文欢迎友情转载,但请注明作者及原文链接,严禁用于商业目的

    最近在关注如何提升Python执行效率的问题,自己没有时间去深入研究,就直接选择了开源的Parallel Python,希望能够充分发挥多核CPU及集群环境的优势。
    Parallel Python是Python进行分布式计算的开源模块,能够将计算压力分布到多核CPU或集群的多台计算机上,能够非常方便的在内网中搭建一个自组织的分布式计算平台。先从多核计算开始,普通的Python应用程序只能够使用一个CPU进程,而通过Parallel Python能够很方便的将计算扩展到多个CPU进程中,使用官方网站上的一个例子。

import math, timedef isprime(n):
    
"""Returns True if n is prime and False otherwise"""
    
if not isinstance(n, int):
        
raise TypeError("argument passed to is_prime is not of 'int' type")
    
if n < 2:
        
return False
    
if n == 2:
        
return True
    max 
= int(math.ceil(math.sqrt(n)))
    i 
= 2
    
while i <= max:
        
if n % i == 0:
            
return False
        i 
+= 1
    
return Truedef sum_primes(n):
    
"""Calculates sum of all primes below given integer n"""
    
return sum([x for x in xrange(2,n) if isprime(x)])

start_time 

= time.time()

inputs 

= (100000100100100200100300100400100500100600100700)

jobs 

= [(input, sum_primes(input)) for input in inputs]for input, job in jobs:
    
print "Sum of primes below", input, "is", jobprint "Time elapsed: ", time.time() - start_time, "s"

 

    计算指定数值范围内所有素数的和,运行程序消耗时间为4.46900010109 s,程序运行结果和CPU使用率如下所示:

Sum of primes below 100000 is 454396537
Sum of primes below 100100 is 454996777
Sum of primes below 100200 is 455898156
Sum of primes below 100300 is 456700218
Sum of primes below 100400 is 457603451
Sum of primes below 100500 is 458407033
Sum of primes below 100600 is 459412387
Sum of primes below 100700 is 460217613
Time elapsed:  4.46900010109 s

    将程序稍作调整,引入pp模块。

#!
#
 File: sum_primes.py
#
 Author: VItalii Vanovschi
#
 Desc: This program demonstrates parallel computations with pp module
#
 It calculates the sum of prime numbers below a given integer in parallel
#
 Parallel Python Software: http://www.parallelpython.com/
import math, sys, time
import pp
def isprime(n):
    
"""Returns True if n is prime and False otherwise"""
    
if not isinstance(n, int):
        
raise TypeError("argument passed to is_prime is not of 'int' type")
    
if n < 2:
        
return False
    
if n == 2:
        
return True
    max 
= int(math.ceil(math.sqrt(n)))
    i 
= 2
    
while i <= max:
        
if n % i == 0:
            
return False
        i 
+= 1
    
return True
def sum_primes(n):
    
"""Calculates sum of all primes below given integer n"""
    
return sum([x for x in xrange(2,n) if isprime(x)])
print """Usage: python sum_primes.py [ncpus]
    [ncpus] - the number of workers to run in parallel, 
    if omitted it will be set to the number of processors in the system
"""
# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)
if len(sys.argv) > 1:
    ncpus 
= int(sys.argv[1])
    
# Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    
# Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)
print "Starting pp with", job_server.get_ncpus(), "workers"
# Submit a job of calulating sum_primes(100) for execution. 
#
 sum_primes - the function
#
 (100,) - tuple with arguments for sum_primes
#
 (isprime,) - tuple with functions on which function sum_primes depends
#
 ("math",) - tuple with module names which must be imported before sum_primes execution
#
 Execution starts as soon as one of the workers will become available
job1 = job_server.submit(sum_primes, (100,), (isprime,), ("math",))
# Retrieves the result calculated by job1
#
 The value of job1() is the same as sum_primes(100)
#
 If the job has not been finished yet, execution will wait here until result is available
result = job1()
print "Sum of primes below 100 is", result
start_time 
= time.time()
# The following submits 8 jobs and then retrieves the results
inputs = (100000100100100200100300100400100500100600100700)
jobs 
= [(input, job_server.submit(sum_primes,(input,), (isprime,), ("math",))) for input in inputs]
for input, job in jobs:
    
print "Sum of primes below", input, "is", job()
print "Time elapsed: ", time.time() - start_time, "s"
job_server.print_stats()

    再次执行,任务管理器中两个CPU进程齐头并进,咱们不仅仅需要所有CPU努力干活,而且还需得到非常好的效果,通过简单的"time.time() - start_time"发现时间比之前缩短了近100%为2.26600003242,job_server.print_stats()能够得到更加详细的分析结果:

Starting pp with 2 workers
Sum of primes below 100 is 1060
Sum of primes below 100000 is 454396537
Sum of primes below 100100 is 454996777
Sum of primes below 100200 is 455898156
Sum of primes below 100300 is 456700218
Sum of primes below 100400 is 457603451
Sum of primes below 100500 is 458407033
Sum of primes below 100600 is 459412387
Sum of primes below 100700 is 460217613
Time elapsed:  2.26600003242 s
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
         9 |        100.00 |       3.9700 |     0.441111 | local

    看到这样的测试结果,还来不及尝试多计算机的集群计算,就开始好奇该模块在ArcGIS Python空间计算中的应用效果。动手之前先想想问题吧,ArcGIS Python实际上是对粗粒度AO对象的调用,真正的计算压力实际上是在COM里面,而Parallel Python针对的是原生Python脚本中产生的计算量,所以应该不会有明显的性能提升,实践出真理,怎么都得写个Sample测试一番:将指定文件夹内所有personal geodatabase的空间数据拷贝到相应的file geodatabase中,以全国400万数据为例。

# Import native arcgisscripting module
#
import arcgisscripting
import os
import time
# import sys
#
 Create the geoprocessor object
#
gp = arcgisscripting.create(9.3)
# Allow for the overwriting of file geodatabases, if they previously exist
#
gp.OverWriteOutput = 1
# Set workspace to folder containing personal geodatabases
#
#
 gp.Workspace = sys.argv[1]
gp.workspace = "D:\\Dev\\Python\\pp\\arcgis_test"
# Identify personal geodatabases
#
pgdbs = gp.ListWorkspaces("""Access")
start_time 
= time.time()
for pgdb in pgdbs:
    
# Set workspace to current personal geodatabase
    #
    gp.workspace = pgdb
    
# Create file geodatabase based on personal geodatabase
    #
    fgdb = pgdb[:-4+ ".gdb"
    gp.CreateFileGDB(os.path.dirname(fgdb), os.path.basename(fgdb))
    
# Identify feature classes and copy to file gdb
    #
    fcs = gp.ListFeatureClasses()
    
for fc in fcs:
        
print "Copying feature class " + fc + " to " + fgdb
        gp.Copy(fc, fgdb 
+ os.sep + fc)
    
# Identify tables and copy to file gdb
    #
    tables = gp.ListTables()
    
for table in tables:
        
print "Copying table " + table + " to " + fgdb
        gp.Copy(table, fgdb 
+ os.sep + table)
    
# Identify datasets and copy to file gdb
    #   Copy will include contents of datasets
    #
    datasets = gp.ListDatasets()
    
for dataset in datasets:
        
print "Copying dataset " + dataset + " to " + fgdb
        gp.Copy(dataset, fgdb 
+ os.sep + dataset)print "Time elapsed: ", time.time() - start_time, "s"

计算完成后消耗时间为73.453999962s,引入pp模块。

import arcgisscripting
import pp
import os
import time
import sysdef func(ws = "D:\\Dev\\Python\\pp\\arcgis_test"):
    
# Create the geoprocessor object
    #
    gp = arcgisscripting.create(9.3)# Allow for the overwriting of file geodatabases, if they previously exist
    #
    gp.OverWriteOutput = 1# Set workspace to folder containing personal geodatabases
    #
    # gp.Workspace = sys.argv[1]
    gp.workspace = ws# Identify personal geodatabases
    #
    pgdbs = gp.ListWorkspaces("""Access")for pgdb in pgdbs:
        
# Set workspace to current personal geodatabase
        #
        gp.workspace = pgdb# Create file geodatabase based on personal geodatabase
        #
        fgdb = pgdb[:-4+ ".gdb"
        gp.CreateFileGDB(os.path.dirname(fgdb), os.path.basename(fgdb))
# Identify feature classes and copy to file gdb
        #
        fcs = gp.ListFeatureClasses()for fc in fcs:
            
print "Copying feature class " + fc + " to " + fgdb
            gp.Copy(fc, fgdb 
+ os.sep + fc)# Identify tables and copy to file gdb
        #
        tables = gp.ListTables()for table in tables:
            
print "Copying table " + table + " to " + fgdb
            gp.Copy(table, fgdb 
+ os.sep + table)# Identify datasets and copy to file gdb
        #   Copy will include contents of datasets
        #
        datasets = gp.ListDatasets()for dataset in datasets:
            
print "Copying dataset " + dataset + " to " + fgdb
            gp.Copy(dataset, fgdb 
+ os.sep + dataset)# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus 
= int(sys.argv[1])
    
# Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    
# Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)print "Starting pp with", job_server.get_ncpus(), "workers"

start_time 

= time.time()# job1 = job_server.submit(sum_primes, (100,), (isprime,), ("math",))
job_server.submit(func(), ("D:\\Dev\\Python\\pp\\arcgis_test", ), (), ("arcgisscripting""os"))print "Time elapsed: ", time.time() - start_time, "s"
job_server.print_stats()

 

    时间几乎相同,没有太大变化,这也证明了最初的分析,看来提升ArcGIS Python应用效率还得从根本入手,当然也有可能是对Parallel Python和Python多线程处理了解不够深入,关于Parallel Python的问题,网上有一些说法:

    1.如果数据交换存在瓶颈,大数据量的应用效果不明显,或反而效率更低。

    2.高计算复杂度的应用效果不明显,ArcGIS Python中的分析计算恰好属于此类。

    当然测试脚本针对的主要是空间数据拷贝的操作,如果不死心还可以试试空间分析。接着深入,待续吧。