diff --git a/core/pom.xml b/core/pom.xml
index 9500ffe6..8fbb8197 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -288,11 +288,6 @@
       <version>3.1</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>5.1.25</version>
-    </dependency>
     <dependency>
       <groupId>org.python</groupId>
      <artifactId>jython-standalone</artifactId>
@@ -350,12 +345,6 @@
   </dependencies>
 
   <repositories>
-    <repository>
-      <id>typesafereleases</id>
-      <name>typesafe-releases</name>
-      <url>http://repo.typesafe.com/typesafe/releases/</url>
-      <layout>default</layout>
-    </repository>
     <repository>
       <id>AdataoMvnreposSnapshots</id>
       <name>Adatao Mvnrepos Snapshots</name>
@@ -387,4 +376,4 @@
       <layout>default</layout>
     </repository>
   </repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/ddf-test/pom.xml b/ddf-test/pom.xml
index 2e352ff3..2ae1f0c7 100644
--- a/ddf-test/pom.xml
+++ b/ddf-test/pom.xml
@@ -287,11 +287,6 @@
       <version>3.1</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>5.1.25</version>
-    </dependency>
     <dependency>
       <groupId>org.python</groupId>
       <artifactId>jython-standalone</artifactId>
@@ -314,12 +309,6 @@
   </dependencies>
 
   <repositories>
-    <repository>
-      <id>typesafereleases</id>
-      <name>typesafe-releases</name>
-      <url>http://repo.typesafe.com/typesafe/releases/</url>
-      <layout>default</layout>
-    </repository>
     <repository>
       <id>AdataoMvnreposSnapshots</id>
       <name>Adatao Mvnrepos Snapshots</name>
@@ -351,4 +340,4 @@
       <layout>default</layout>
     </repository>
   </repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/examples/pom.xml b/examples/pom.xml
index bf4792cb..906ca7ba 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -293,11 +293,6 @@
       <version>3.1</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>5.1.25</version>
-    </dependency>
     <dependency>
       <groupId>org.python</groupId>
       <artifactId>jython-standalone</artifactId>
@@ -315,12 +310,6 @@
   </dependencies>
 
   <repositories>
-    <repository>
-      <id>typesafereleases</id>
-      <name>typesafe-releases</name>
-      <url>http://repo.typesafe.com/typesafe/releases/</url>
-      <layout>default</layout>
-    </repository>
     <repository>
       <id>AdataoMvnreposSnapshots</id>
       <name>Adatao Mvnrepos Snapshots</name>
@@ -352,4 +341,4 @@
       <layout>default</layout>
     </repository>
   </repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/hdfs/pom.xml b/hdfs/pom.xml
index 3b1552a1..6de61104 100644
--- a/hdfs/pom.xml
+++ b/hdfs/pom.xml
@@ -288,11 +288,6 @@
       <version>3.1</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>5.1.25</version>
-    </dependency>
     <dependency>
       <groupId>org.python</groupId>
       <artifactId>jython-standalone</artifactId>
@@ -342,12 +337,6 @@
   </dependencies>
 
   <repositories>
-    <repository>
-      <id>typesafereleases</id>
-      <name>typesafe-releases</name>
-      <url>http://repo.typesafe.com/typesafe/releases/</url>
-      <layout>default</layout>
-    </repository>
     <repository>
       <id>AdataoMvnreposSnapshots</id>
       <name>Adatao Mvnrepos Snapshots</name>
@@ -379,4 +368,4 @@
       <layout>default</layout>
     </repository>
   </repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/project/RootBuild.scala b/project/RootBuild.scala
index 75431f22..3e2e807b 100644
--- a/project/RootBuild.scala
+++ b/project/RootBuild.scala
@@ -79,12 +79,6 @@ object RootBuild extends Build {
   val excludeEverthing = ExclusionRule(organization = "*", name = "*")
   val excludeEverythingHackForMakePom = ExclusionRule(organization = "_MAKE_POM_EXCLUDE_ALL_", name = "_MAKE_POM_EXCLUDE_ALL_")
 
-  // We define this explicitly rather than via unmanagedJars, so that make-pom will generate it in pom.xml as well
-  // org % package % version
-  val rforge = Seq(
-    "net.rforge" % "REngine" % "2.1.1.compiled",
-    "net.rforge" % "Rserve" % "1.8.2.compiled"
-  )
   val scalaArtifacts = Seq("jline", "scala-compiler", "scala-library", "scala-reflect")
   val scalaDependencies = scalaArtifacts.map( artifactId => "org.scala-lang" % artifactId % theScalaVersion)
 
@@ -165,7 +159,7 @@
       List("sh", "-c", "touch spark/" + targetDir + "/*timestamp")
     },
     testOptions in Test += Tests.Argument("-oI"),
-    libraryDependencies ++= rforge,
+    libraryDependencies ++= spark_dependencies,
 
     if (isLocal) {
       initialCommands in console :=
@@ -251,7 +245,7 @@
     "net.sf" % "jsqlparser" % "0.9.8.8",
     "commons-io" % "commons-io" % "1.3.2",
     "org.easymock" % "easymock" % "3.1" % "test",
-    "mysql" % "mysql-connector-java" % "5.1.25",
+    //"mysql" % "mysql-connector-java" % "5.1.25",
     "org.python" % "jython-standalone" % "2.7.0",
     "joda-time" % "joda-time" % "2.8.1",
     "org.joda" % "joda-convert" % "1.7"
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 16c15d8a..84889df3 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -16,6 +16,8 @@
 addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
 
 addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.7.0")
 
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
+
 resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
 
 //addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1")
diff --git a/s3/pom.xml b/s3/pom.xml
index 9b50effa..d3d4d519 100644
--- a/s3/pom.xml
+++ b/s3/pom.xml
@@ -288,11 +288,6 @@
       <version>3.1</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>5.1.25</version>
-    </dependency>
     <dependency>
       <groupId>org.python</groupId>
       <artifactId>jython-standalone</artifactId>
@@ -339,12 +334,6 @@
   </dependencies>
 
   <repositories>
-    <repository>
-      <id>typesafereleases</id>
-      <name>typesafe-releases</name>
-      <url>http://repo.typesafe.com/typesafe/releases/</url>
-      <layout>default</layout>
-    </repository>
     <repository>
       <id>AdataoMvnreposSnapshots</id>
       <name>Adatao Mvnrepos Snapshots</name>
@@ -376,4 +365,4 @@
       <layout>default</layout>
     </repository>
   </repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/spark/lib/net/rforge/REngine-2.1.1.compiled.jar b/spark/lib/net/rforge/REngine-2.1.1.compiled.jar
deleted file mode 100644
index 2bfb8048..00000000
Binary files a/spark/lib/net/rforge/REngine-2.1.1.compiled.jar and /dev/null differ
diff --git a/spark/lib/net/rforge/Rserve-1.8.2.compiled.jar b/spark/lib/net/rforge/Rserve-1.8.2.compiled.jar
deleted file mode 100644
index 8b462e54..00000000
Binary files a/spark/lib/net/rforge/Rserve-1.8.2.compiled.jar and /dev/null differ
diff --git a/spark/pom.xml b/spark/pom.xml
index 395c1a06..2fb9daaa 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -310,11 +310,6 @@
       <version>3.1</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>5.1.25</version>
-    </dependency>
     <dependency>
       <groupId>org.python</groupId>
       <artifactId>jython-standalone</artifactId>
@@ -330,16 +325,6 @@
       <artifactId>joda-convert</artifactId>
       <version>1.7</version>
     </dependency>
-    <dependency>
-      <groupId>net.rforge</groupId>
-      <artifactId>REngine</artifactId>
-      <version>2.1.1.compiled</version>
-    </dependency>
-    <dependency>
-      <groupId>net.rforge</groupId>
-      <artifactId>Rserve</artifactId>
-      <version>1.8.2.compiled</version>
-    </dependency>
     <dependency>
       <groupId>com.databricks</groupId>
       <artifactId>spark-csv_2.10</artifactId>
@@ -471,12 +456,6 @@
   </dependencies>
 
   <repositories>
-    <repository>
-      <id>typesafereleases</id>
-      <name>typesafe-releases</name>
-      <url>http://repo.typesafe.com/typesafe/releases/</url>
-      <layout>default</layout>
-    </repository>
     <repository>
       <id>AdataoMvnreposSnapshots</id>
       <name>Adatao Mvnrepos Snapshots</name>
@@ -508,4 +487,4 @@
       <layout>default</layout>
     </repository>
   </repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/spark/rlibs/Rserve/DESCRIPTION b/spark/rlibs/Rserve/DESCRIPTION
deleted file mode 100644
index 88b90332..00000000
--- a/spark/rlibs/Rserve/DESCRIPTION
+++ /dev/null
@@ -1,24 +0,0 @@
-Package: Rserve
-Version: 1.7-3
-Title: Binary R server
-Author: Simon Urbanek
-Maintainer: Simon Urbanek
-Depends: R (>= 1.5.0)
-Suggests: RSclient
-SystemRequirements: libR, GNU make
-Description: Rserve acts as a socket server (TCP/IP or local sockets)
-        which allows binary requests to be sent to R. Every
-        connection has a separate workspace and working
-        directory. Client-side implementations are available
-        for popular languages such as C/C++ and Java, allowing
-        any application to use facilities of R without the need of
-        linking to R code. Rserve supports remote connection,
-        user authentication and file transfer. A simple R client
-        is included in this package as well.
-License: GPL-2 | file LICENSE
-URL: http://www.rforge.net/Rserve/
-NeedsCompilation: yes
-Repository: CRAN
-Date/Publication: 2013-08-21 23:35:21
-Built: R 3.0.2; x86_64-apple-darwin10.8.0; 2014-01-09 23:11:06 UTC; unix
-Archs: Rserve.so.dSYM
diff --git a/spark/rlibs/Rserve/INDEX b/spark/rlibs/Rserve/INDEX
deleted file mode 100644
index 1a2adbcc..00000000
--- a/spark/rlibs/Rserve/INDEX
+++ /dev/null
@@ -1,2 +0,0 @@
-Rserve                  Server providing R functionality to
-                        applications via TCP/IP
diff --git a/spark/rlibs/Rserve/LICENSE b/spark/rlibs/Rserve/LICENSE
deleted file mode 100644
index 21fa0a1a..00000000
--- a/spark/rlibs/Rserve/LICENSE
+++ /dev/null
@@ -1,316 +0,0 @@
-        [Summary: GPL-2 with OpenSSL linking exception]
-
- Rserve
- Copyright (C) 2002-2013 Simon Urbanek
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along
- with this program; if not, write to the Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
- In addition, as a special exception, the copyright holders give
- permission to link the code of portions of this program with the
- OpenSSL project's "OpenSSL" library (or with modified versions of
- it that use the same license as the "OpenSSL" library - see
- http://www.openssl.org/), and distribute linked combinations
- including the two.
-
- You must obey the GNU General Public License in all respects
- for all of the code used other than OpenSSL.  If you modify
- file(s) with this exception, you may extend this exception to your
- version of the file(s), but you are not obligated to do so.  If you
- do not wish to do so, delete this exception statement from your
- version.  If you delete this exception statement from all source
- files in the program, then also delete it here.
-
-
-Full text of GPL-2 follows:
-
-                    GNU GENERAL PUBLIC LICENSE
-                       Version 2, June 1991
-
- Copyright (C) 1989, 1991 Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- Everyone is permitted to copy and distribute verbatim copies
- of this license document, but changing it is not allowed.
-
-                            Preamble
-
-  The licenses for most software are designed to take away your
-freedom to share and change it.  By contrast, the GNU General Public
-License is intended to guarantee your freedom to share and change free
-software--to make sure the software is free for all its users.  This
-General Public License applies to most of the Free Software
-Foundation's software and to any other program whose authors commit to
-using it.  (Some other Free Software Foundation software is covered by
-the GNU Lesser General Public License instead.)  You can apply it to
-your programs, too.
-
-  When we speak of free software, we are referring to freedom, not
-price.  Our General Public Licenses are designed to make sure that you
-have the freedom to distribute copies of free software (and charge for
-this service if you wish), that you receive source code or can get it
-if you want it, that you can change the software or use pieces of it
-in new free programs; and that you know you can do these things.
-
-  To protect your rights, we need to make restrictions that forbid
-anyone to deny you these rights or to ask you to surrender the rights.
-These restrictions translate to certain responsibilities for you if you
-distribute copies of the software, or if you modify it.
-
-  For example, if you distribute copies of such a program, whether
-gratis or for a fee, you must give the recipients all the rights that
-you have.  You must make sure that they, too, receive or can get the
-source code.  And you must show them these terms so they know their
-rights.
-
-  We protect your rights with two steps: (1) copyright the software, and
-(2) offer you this license which gives you legal permission to copy,
-distribute and/or modify the software.
-
-  Also, for each author's protection and ours, we want to make certain
-that everyone understands that there is no warranty for this free
-software.  If the software is modified by someone else and passed on, we
-want its recipients to know that what they have is not the original, so
-that any problems introduced by others will not reflect on the original
-authors' reputations.
-
-  Finally, any free program is threatened constantly by software
-patents.  We wish to avoid the danger that redistributors of a free
-program will individually obtain patent licenses, in effect making the
-program proprietary.  To prevent this, we have made it clear that any
-patent must be licensed for everyone's free use or not licensed at all.
-
-  The precise terms and conditions for copying, distribution and
-modification follow.
-
-                    GNU GENERAL PUBLIC LICENSE
-   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
-
-  0. This License applies to any program or other work which contains
-a notice placed by the copyright holder saying it may be distributed
-under the terms of this General Public License.  The "Program", below,
-refers to any such program or work, and a "work based on the Program"
-means either the Program or any derivative work under copyright law:
-that is to say, a work containing the Program or a portion of it,
-either verbatim or with modifications and/or translated into another
-language.  (Hereinafter, translation is included without limitation in
-the term "modification".)  Each licensee is addressed as "you".
-
-Activities other than copying, distribution and modification are not
-covered by this License; they are outside its scope.  The act of
-running the Program is not restricted, and the output from the Program
-is covered only if its contents constitute a work based on the
-Program (independent of having been made by running the Program).
-Whether that is true depends on what the Program does.
-
-  1. You may copy and distribute verbatim copies of the Program's
-source code as you receive it, in any medium, provided that you
-conspicuously and appropriately publish on each copy an appropriate
-copyright notice and disclaimer of warranty; keep intact all the
-notices that refer to this License and to the absence of any warranty;
-and give any other recipients of the Program a copy of this License
-along with the Program.
-
-You may charge a fee for the physical act of transferring a copy, and
-you may at your option offer warranty protection in exchange for a fee.
-
-  2. You may modify your copy or copies of the Program or any portion
-of it, thus forming a work based on the Program, and copy and
-distribute such modifications or work under the terms of Section 1
-above, provided that you also meet all of these conditions:
-
-    a) You must cause the modified files to carry prominent notices
-    stating that you changed the files and the date of any change.
-
-    b) You must cause any work that you distribute or publish, that in
-    whole or in part contains or is derived from the Program or any
-    part thereof, to be licensed as a whole at no charge to all third
-    parties under the terms of this License.
-
-    c) If the modified program normally reads commands interactively
-    when run, you must cause it, when started running for such
-    interactive use in the most ordinary way, to print or display an
-    announcement including an appropriate copyright notice and a
-    notice that there is no warranty (or else, saying that you provide
-    a warranty) and that users may redistribute the program under
-    these conditions, and telling the user how to view a copy of this
-    License.  (Exception: if the Program itself is interactive but
-    does not normally print such an announcement, your work based on
-    the Program is not required to print an announcement.)
-
-These requirements apply to the modified work as a whole.  If
-identifiable sections of that work are not derived from the Program,
-and can be reasonably considered independent and separate works in
-themselves, then this License, and its terms, do not apply to those
-sections when you distribute them as separate works.  But when you
-distribute the same sections as part of a whole which is a work based
-on the Program, the distribution of the whole must be on the terms of
-this License, whose permissions for other licensees extend to the
-entire whole, and thus to each and every part regardless of who wrote it.
-
-Thus, it is not the intent of this section to claim rights or contest
-your rights to work written entirely by you; rather, the intent is to
-exercise the right to control the distribution of derivative or
-collective works based on the Program.
-
-In addition, mere aggregation of another work not based on the Program
-with the Program (or with a work based on the Program) on a volume of
-a storage or distribution medium does not bring the other work under
-the scope of this License.
-
-  3. You may copy and distribute the Program (or a work based on it,
-under Section 2) in object code or executable form under the terms of
-Sections 1 and 2 above provided that you also do one of the following:
-
-    a) Accompany it with the complete corresponding machine-readable
-    source code, which must be distributed under the terms of Sections
-    1 and 2 above on a medium customarily used for software interchange; or,
-
-    b) Accompany it with a written offer, valid for at least three
-    years, to give any third party, for a charge no more than your
-    cost of physically performing source distribution, a complete
-    machine-readable copy of the corresponding source code, to be
-    distributed under the terms of Sections 1 and 2 above on a medium
-    customarily used for software interchange; or,
-
-    c) Accompany it with the information you received as to the offer
-    to distribute corresponding source code.  (This alternative is
-    allowed only for noncommercial distribution and only if you
-    received the program in object code or executable form with such
-    an offer, in accord with Subsection b above.)
-
-The source code for a work means the preferred form of the work for
-making modifications to it.  For an executable work, complete source
-code means all the source code for all modules it contains, plus any
-associated interface definition files, plus the scripts used to
-control compilation and installation of the executable.  However, as a
-special exception, the source code distributed need not include
-anything that is normally distributed (in either source or binary
-form) with the major components (compiler, kernel, and so on) of the
-operating system on which the executable runs, unless that component
-itself accompanies the executable.
-
-If distribution of executable or object code is made by offering
-access to copy from a designated place, then offering equivalent
-access to copy the source code from the same place counts as
-distribution of the source code, even though third parties are not
-compelled to copy the source along with the object code.
-
-  4. You may not copy, modify, sublicense, or distribute the Program
-except as expressly provided under this License.  Any attempt
-otherwise to copy, modify, sublicense or distribute the Program is
-void, and will automatically terminate your rights under this License.
-However, parties who have received copies, or rights, from you under
-this License will not have their licenses terminated so long as such
-parties remain in full compliance.
-
-  5. You are not required to accept this License, since you have not
-signed it.  However, nothing else grants you permission to modify or
-distribute the Program or its derivative works.  These actions are
-prohibited by law if you do not accept this License.  Therefore, by
-modifying or distributing the Program (or any work based on the
-Program), you indicate your acceptance of this License to do so, and
-all its terms and conditions for copying, distributing or modifying
-the Program or works based on it.
-
-  6. Each time you redistribute the Program (or any work based on the
-Program), the recipient automatically receives a license from the
-original licensor to copy, distribute or modify the Program subject to
-these terms and conditions.  You may not impose any further
-restrictions on the recipients' exercise of the rights granted herein.
-You are not responsible for enforcing compliance by third parties to
-this License.
-
-  7. If, as a consequence of a court judgment or allegation of patent
-infringement or for any other reason (not limited to patent issues),
-conditions are imposed on you (whether by court order, agreement or
-otherwise) that contradict the conditions of this License, they do not
-excuse you from the conditions of this License.  If you cannot
-distribute so as to satisfy simultaneously your obligations under this
-License and any other pertinent obligations, then as a consequence you
-may not distribute the Program at all.  For example, if a patent
-license would not permit royalty-free redistribution of the Program by
-all those who receive copies directly or indirectly through you, then
-the only way you could satisfy both it and this License would be to
-refrain entirely from distribution of the Program.
-
-If any portion of this section is held invalid or unenforceable under
-any particular circumstance, the balance of the section is intended to
-apply and the section as a whole is intended to apply in other
-circumstances.
-
-It is not the purpose of this section to induce you to infringe any
-patents or other property right claims or to contest validity of any
-such claims; this section has the sole purpose of protecting the
-integrity of the free software distribution system, which is
-implemented by public license practices.  Many people have made
-generous contributions to the wide range of software distributed
-through that system in reliance on consistent application of that
-system; it is up to the author/donor to decide if he or she is willing
-to distribute software through any other system and a licensee cannot
-impose that choice.
-
-This section is intended to make thoroughly clear what is believed to
-be a consequence of the rest of this License.
-
-  8. If the distribution and/or use of the Program is restricted in
-certain countries either by patents or by copyrighted interfaces, the
-original copyright holder who places the Program under this License
-may add an explicit geographical distribution limitation excluding
-those countries, so that distribution is permitted only in or among
-countries not thus excluded.  In such case, this License incorporates
-the limitation as if written in the body of this License.
-
-  9. The Free Software Foundation may publish revised and/or new versions
-of the General Public License from time to time.  Such new versions will
-be similar in spirit to the present version, but may differ in detail to
-address new problems or concerns.
-
-Each version is given a distinguishing version number.  If the Program
-specifies a version number of this License which applies to it and "any
-later version", you have the option of following the terms and conditions
-either of that version or of any later version published by the Free
-Software Foundation.  If the Program does not specify a version number of
-this License, you may choose any version ever published by the Free Software
-Foundation.
-
-  10. If you wish to incorporate parts of the Program into other free
-programs whose distribution conditions are different, write to the author
-to ask for permission.  For software which is copyrighted by the Free
-Software Foundation, write to the Free Software Foundation; we sometimes
-make exceptions for this.  Our decision will be guided by the two goals
-of preserving the free status of all derivatives of our free software and
-of promoting the sharing and reuse of software generally.
-
-                            NO WARRANTY
-
-  11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
-FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW.  EXCEPT WHEN
-OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
-PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED
-OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.  THE ENTIRE RISK AS
-TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU.  SHOULD THE
-PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING,
-REPAIR OR CORRECTION.
-
-  12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
-WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
-REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES,
-INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING
-OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED
-TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY
-YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER
-PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
-POSSIBILITY OF SUCH DAMAGES.
-
-                     END OF TERMS AND CONDITIONS
diff --git a/spark/rlibs/Rserve/Meta/Rd.rds b/spark/rlibs/Rserve/Meta/Rd.rds
deleted file mode 100644
index ea54f5e2..00000000
Binary files a/spark/rlibs/Rserve/Meta/Rd.rds and /dev/null differ
diff --git a/spark/rlibs/Rserve/Meta/hsearch.rds b/spark/rlibs/Rserve/Meta/hsearch.rds
deleted file mode 100644
index bb116d77..00000000
Binary files a/spark/rlibs/Rserve/Meta/hsearch.rds and /dev/null differ
diff --git a/spark/rlibs/Rserve/Meta/links.rds b/spark/rlibs/Rserve/Meta/links.rds
deleted file mode 100644
index 14827b11..00000000
Binary files a/spark/rlibs/Rserve/Meta/links.rds and /dev/null differ
diff --git a/spark/rlibs/Rserve/Meta/nsInfo.rds b/spark/rlibs/Rserve/Meta/nsInfo.rds
deleted file mode 100644
index 195386d6..00000000
Binary files a/spark/rlibs/Rserve/Meta/nsInfo.rds and /dev/null differ
diff --git a/spark/rlibs/Rserve/Meta/package.rds b/spark/rlibs/Rserve/Meta/package.rds
deleted file mode 100644
index 8533abf2..00000000
Binary files a/spark/rlibs/Rserve/Meta/package.rds and /dev/null differ
diff --git a/spark/rlibs/Rserve/NAMESPACE b/spark/rlibs/Rserve/NAMESPACE
deleted file mode 100644
index 416dba92..00000000
--- a/spark/rlibs/Rserve/NAMESPACE
+++ /dev/null
@@ -1,2 +0,0 @@
-useDynLib(Rserve, run_Rserve)
-export(Rserve, self.ctrlEval, self.ctrlSource, self.oobSend, self.oobMessage, run.Rserve)
diff --git a/spark/rlibs/Rserve/NEWS b/spark/rlibs/Rserve/NEWS
deleted file mode 100644
index a38ce4aa..00000000
--- a/spark/rlibs/Rserve/NEWS
+++ /dev/null
@@ -1,870 +0,0 @@
- NEWS/Changelog for Rserve
----------------------------
-
-1.7-3 2013-08-21
-    o the handling of server configuration modes has been
-      inconsistent for some combinations (mostly affected were
-      combinations involving WebSockets upgrade and OC mode on
-      HTTP/WS servers). Also the websockets.qap.oc configuration
-      option has been misspelled.
-
-    o HTTPS->WSS upgrade is now supported
-
-    o mkdist -i installs the built package, use -c for check
-
-    o Windows compatibility has been restored thanks to
-      David Champagne from Revolution Analytics
-
-
-1.7-2 2013-08-12
-    o when uid/gid is changed, create a new tempdir() and set its
-      permissions as well as the wokring directory's owner to match.
-
-    o bugfix: if the first command is not any of the eval family
-      the Rserve may respond with an additional, spurious error
-      response
-
-    o deamonized server would record incorrect pid when pid.file
-      is used (i#5). The pid is now removed on clean sutdown.
-
-    o added support for keep.alive configuration option - it is
-      global to all servers and if enabled the client sockets are
-      instructed to keep the connection alive by periodic messages.
-
-
-1.7-1 2013-07-02
-    o remove a spurious character that prevented compilation on Suns
-
-    o add OPENSSL_INCLUDES precious variable that can be used to
-      point to non-standard location of OpenSSL headers
-
-    o check the usability of OpenSSL headers before enabling TLS
-      support
-
-    o added the choice of GPLv2 or GPLv2 with OpenSSL linking exception
-
-
-1.7-0
-
-    *** ---- HEADLINE NEWS ----
-    *** new protocols: HTTP, HTTPS, WebSockets and TLS/QAP
-    *** added protocol switching (from QAP to TLS/QAP) via CMD_switch
-    *** The R client was moved to RSclient package
-    *** New in-session server support with run.Rserve()
-    *** Out-of-band messages via self.oobSend()
-    *** user-based uid/gid switch, MD5/SHA1 stored passwords
-    *** preliminary IPv6 support; RSA secure authentication
-    *** .Rserve.done global env hook for optiopnal cleanup
-    *** auth.function setting to allow custom authentication
-    *** Object-capability mode for hardened, secure services
-    *** ---- END - see below for details ----
-
-    o This is the first release in the new 1.x Rserve series. Many
-      of the Rserve internals have been re-written or cleaned
-      up. The original protocol remains the same (so all clients
-      that worked with Rserve 0.6 will continue to work), but the
-      suite of available protocols has been extended (see below).
-
-    o added support for multiple protocols in Rserve:
-
-      ** QAP **
-      this is the original Rserve protocol used in Rserve 0.x
-      series. It works over TCP/IP and unix sockets. It is
-      enabled by default and can be disabled using
-      "qap disable" configuration directive.
-
-      ** HTTP **
-      this is very similar to the built-in R http server except that
-      on unix it forks on connection so it allows parallel separate,
-      persistent connections. It requires a worker function
-      .http.request to be defined in the session which will handle
-      incoming requests. This allows the use of facilities like
-      FastRWeb without an external webserver. This protocol is
-      disabled by default and can be enabled by setting "http.port"
-      configuration directive to the desired port to listen to.
-      (Also see TLS support below for https server).
-
-      The http.raw.body configuration option (default is false which
-      results in the same behavior as Rhttpd) can be used to force
-      passing body in raw form to the handler (useful for the
-      FastRWeb handler which does its own parsing).
-
-      ** WebSockets **
-      this protocol is used by HTML5 web browsers for direct access
-      to R with a persistent connection. This allows implementation
-      of websites that have a dedicated R session (and could be used
-      as an R console). There are two subprotocols supported by
-      Rserve:
-
-      -- WebSocket(*, "QAP") --
-      this is a tunnel for the original QAP protocol through
-      WebSockets. It requires a browser capable of binary
-      WebSockets protocol (version 01 or higher). It allows very
-      efficient data transfer, typically by loading ArrayBuffers
-      directly into GPU or CPU.
-      It is disabled by default and can be enabled using
-      "websockets.qap enable" configuration directive.
-      (If no subprotocol is specified, QAP is assumed)
-
-      -- WebSocket(*, "text") --
-      this is a very simplistic protocol using plain text frames
-      to send input to R and output from R directly as text. It
-      acts essentially as an R console. This protocol works with
-      any WebSockets implementation including version hybi-00 and
-      hixie-75/76
-      It is disabled by default and can be enabled using
-      "websockets.text enable" configuration directive.
-
-      NOTE: The textual WebSockets protocol does NOT provide any
-      authentication mechanism, so use with extreme care as you
-      are essentially giving any user with a web browser access to
-      R and thus to the shell.
-
-      In addition to enabling each or both subprotocols, the port on
-      which the WebSockets server should listen must be specified in
-      the configuration using "websockets.port" directive, for
-      example "websockets.port 8080". Alternatively, the HTTP server
-      can be enabled to allow connection upgrade to WebSockets on
-      the same port with "http.upgrade.websockets enable"
-
-      NOTE: remember that the default in Rserve is to disallow
-      remote connections, so you may need to use "remote enable" in
-      order to use WebSockets or HTTP in practice, since the point
-      is to serve remote machines. Typically, if both QAP and either
-      HTTP or WebSockets are used, it is recommended to use QAP on a
-      local unix socket for better access control.
-
-    o Rserve now supports SSL/TLS connections on QAP, HTTP and WS
-      protocols. The TLS key/CA entries are common for all
-      protocols. The relevant new configuration directives are:
-
-      tls.key
-      tls.cert
-      tls.ca
-
-      SSL/TLS can be used in several ways: with separate port for
-      TLS connections or by switching protocol form a regular QAP
-      connection using CMD_switch with "TLS" as argument. The latter
-      can be enabled using
-
-      switch.qap.tls enable
-
-      Enabled switching is advertized by Rserve with the presence
-      of a "TLS" entry in the ID string. Keys and other TLS entries
-      must be initialized in order for TLS to be enabled.
-
-      Dedicated TLS servers can be enabled by specifying the port
-      for the corresponding protocol:
-
-      qap.tls.port        - for Rserve/QAP
-      http.tls.port       - for HTTPS
-      websockets.tls.port - for WebSockets
-
-      (there are synonyms "https.port" for "http.tls.port" and
-      "tls.port" for "qap.tls.port", however, the *.tls.port
-      versions are preferred for clarity)
-
-      The use of TLS protocols is encouraged where sensitive data
-      is transmitted or when requiring secure authentication. For
-      QAP protocol, using TCL/QAP with SHA1 passwords (also new, see
-      below) is the currently recommended way where authorization is
-      required. The only drawback is increased CPU utilization
-      during transfers casued by the encryption and the fact that
-      TLS-enabled clients must be used. See also RSA secure
-      authentication (CMD_keyReq + CMD_secLogin) for a different
-      method if only the authentication step is to be encrypted.
-
-      When enabling TLS tls.key and tls.cert are mandatory, tls.ca
-      is optional to establish CA chain of trust (whether this is
-      needed depends on the client and the certificate). To generate
-      a key and self-signed certificate, you can use something like
-
-      openssl genrsa -out server.key 2048
-      openssl req -new -key server.key -out server.csr
-      openssl x509 -req -days 365 -in server.csr \
-         -signkey server.key -out server.crt
-
-      NOTE: TLS services are started **in addition** to any other
-      servers, i.e., if you want to enable TLS/QAP only, you have to
-      set tls.qap.port but also add "qap disable" to disable the
-      plain Rserve access.
-
-    o QAP servers (classic Rserve QAP, QAP/TLS and WebSocket/QAP)
-      support object-capability (OC) mode in which all Rserve
-      commands are disabled except for CMD_OCcall. All messages
-      including the initial handshake are always QAP and the initial
-      message defines capabilities (here opaque references to
-      closures) that can be called. This mode can be enabled using
-
-      qap.oc enable            ## for Rserve QAP and QAP/TLS
-      websockets.qap.oc enable ## for WebSockets/QAP
-
-      In this mode the configuration *must* define a function
-      oc.init (typically using eval or source configuration
-      directives) which has to supply OC references that can be used
-      in calls. If the evaluation of oc.init() fails, the connection
-      is closed immediately. The use of invalid OC references or any
-      other commands other than CMD_OCcall results in immediate
-      connection termination. This allows creation of hardened,
-      secure services that can disallow arbitrary code execution.
-
-      NOTE: this mode is inherenty incompatible with all classic
-      Rserve clients. The first four bytes of the initial packet are
-      "RsOC" instead of "Rsrv"
-
-    o Rserve can now be started from within an existing R session
-      using run.Rserve() command. This allows the user to prepare a
-      session "by hand", run Rserve from within that session and go
-      back to the session (by shutting down the server or sending an
-      interrupt). This allows the use of Rserve even without the
-      presence of libR.
-
-    o Rserve now supports out-of-band (OOB) messages. Those can be
-      sent using the self.oobSend() [one-way] and self.oobMessage()
-      [roundtrip] functions from within code that is evaluated in
-      Rserve child instances. OOB messages are not used by Rserve
-      itself but offer asynchronous notification to clients that
-      support it (one typical use are WS-QAP1 tunnels to web
-      browsers that allow status updates as R code is evaluated).
-
-    o Rserve accepts additional command line arguments:
-      --RS-source <file>  (same as "source <file>" in cfg file)
-      --RS-enable-remote  (same as "remote enable" in cfg file)
-      --RS-enable-control (same as "control enable" in cfg file)
-
-    o The Rserve package no longer includes the R client. It has
-      been moved to a separate package "RSclient" so that it can be
-      used on machines separate from the server.
-
-    o There was a bug in QAP storage estimation affecting pairlists,
-      possibly resulting in buffer overflows. This should be fixed
-      and an error message will be printed when such overflows are
-      detected in the future (which hopefully won't happen).
-
-    o Bugfix: command line parsing would skip over some arguments
-
-    o Passwords file can contain MD5 or SHA1 hash of a password
-      instead of the plaintext password for non-crypt
-      authentication. In that case the hash must be lowercase hex
-      representation with preceding $ sign, so for example user
-      "foo" with password "bar" would have an entry
-      foo $62cdb7020ff920e5aa642c3d4066950dd1f01f4d
-      You can use
-      echo -n 'password' | openssl sha1
-      to obtain the SHA1 hash of a password (openssl md5 for MD5
-      hash - MD5 is probably more common but less secure than
-      SHA1). This feature makes sure that passwords are not stored
-      in plain text and thus are safe from local attacks.
-
-    o Rserve now has the ability to change uid/gid according to the
-      user that has been authenticated. The following settings
-      concern this feature (unix-only):
-
-      auto.uid {enable|disable} [disable]
-      auto.gid {enable}disable} [disable]
-      default.uid [none]
-      default.gid [none]
-
-      The auto.uid/gid directives enable setuid/setgid based on
-      user's uid/gid. In case no uid/gid is specified with the
-      username, the default.uid/gid settings will be used. If there
-      is no uid/gid in the username and no defaults are specified,
-      the user's authentication will fail. User's uid/gid can be
-      specified in the passwords file by appending /uid,gid to the
-      username. If gid is not specified, uid will be used for both
-      uid and gid. So for example user "foo" (from the above MD5
-      example) with uid=501 would have an entry on the passwords
-      file:
-
-      foo/501 $37b51d194a7513e45b56f6524f2d51f2
-
-      For this to work, Rserve must be started as root. However,
-      with auto.uid enabled it is safe to do so since Rserve will
-      prevent any R access until authenticated. You should, however,
-      use a client capable of secure RSA authentication or use secure
-      connection such as QAP/TLS as to not send password in
-      cleartext over the wire.
-
-    o Rserve can now be run in a mode where each connection has a
-      different uid and/or gid such that separate client instances
-      are isolated. This allows more restricted setup in cases where
-      instances may not be trusted and need to be sandboxed. The
-      following configuration directives are associated with this
-      functionality:
-
-      random.uid {enable|disable} [disable]
-      random.gui {enable|disable} [disable]
-      random.uid.range {..} [32768..65540]
-
-      If random.uid is enabled and random.gid disable then only the
-      uid of the process is changed. If both are enabled then the
-      gid is set to match the value of the uid. random.gid cannot be
-      enabled without random.uid.
-
-      To support sandboxing, the permissions on the working
-      directory can be specified using
-
-      workdir.mode
-
-    o If any kind of client-process uid switching is enabled in the
-      configuration, the permissions on the working directory will
-      match the uid of the process. Also the working directories are
-      now named by the process ID to facilitate cleanup.
-
-    o Rserve now supports secure authentication even outside of
-      SSL/TLS. There are two new commands that can be used by the
-      client:
-
-      CMD_keyReq   - requests an authentication key from the server
-                     that will be used to perform a secure
-                     login. The kind of the requested key is
-                     specified as a parameter. Currently, only
-                     "rsa-authkey" is supported which returns server
-                     authentication key (authkey) and a RSA public
-                     key which must be used to encode the authkey
-                     and the authentication information (see below).
-                     The RSA key can be compared on the client side
-                     to ensure the authenticity of the server.
-
-      CMD_secLogin - secure login. It consists of an encrypted data
-                     stream that will authenticate the user. In the
-                     case of the "rsa-authkey" method, the stream
-                     consists of the authkey and the login +
-                     password, all of which must be encrypted using
-                     server's RSA key.
-
-      The RSA key on the server (Rserve) side is specified using
-
-      rsa.key
-
-      configuration file directive. The file is expected to be in
-      PEM format. You can generate such file, e.g., with:
-
-      openssl genrsa -out server.key 4096
-
-      where 4096 is the key size in bits. A public key can be
-      extracted from the private key using
-
-      openssl rsa -pubout -in server.key -out server_pub.key
-
-      The clients can pre-share the public key by other means to
-      compare it to the key received from the server as to verify
-      its authenticity. This will prevent them from sending the
-      authentication information to rogue servers.
-
-      NOTE: if the rsa.key directive is missing, Rserve will
-      generate a key on the fly when asked for RSA authentication -
-      although this allows encrypted transmission and thus is safe
-      from sniffing, it is not safe from man-in-the-middle attacks
-      where a rogue server intercepts the request and sends its own
-      public key. Therefore the use of the rsa.key directive is
-      highly recommended.
-
-      The RSA authentication enables the client to a) check the
-      authenticity of the server (by comparing the RSA public key)
-      and b) send authentication information encrypted. This method
-      is highly recommended in cases where a full TLS/SSL encryption
-      of the entire connection would be too expensive (i.e. in cases
-      where the data is large and the security of the transported
-      data is not crucial).
-
-    o Rserve has a preliminary IPv6 support. Rserve must be
-      installed with --enable-ipv6 configure flag to enable it in
-      the Rserve build. In order to start all servers on IPv6 add
-
-      ipv6 enable
-
-      to the configuration file. The option is global, i.e. once
-      enabled it applies to all servers that support it. Note that
-      not all features work with IPv6 yet - detaching sessions
-      (they will use IPv4 for re-attach) and remote client filtering
-      only work with IPv4 at this point.
-
-    o Rserve now binds only to the loopback interface in
-      "remote disable" mode. This is safer and prevents remote DoS
-      attacks. Previously, Rserve would bind on all interfaces and
-      check the peer IP address. If desired, you can replicate the
-      old behavior by adding
-
-      remote enable
-      allow 127.0.0.1
-
-      to the configuration (if you don't know the difference then
-      you don't need this -- if you actually need this, then you
-      probably want to add more "allow" entries for the machine's
-      other interfaces as well).
-
-    o If a function .Rserve.done() is defined in the global
-      environment, it will be run after a clean connection
-      shutdown. This allows custom code to be run when a client
-      connection is closed.
-
-    o If a function .Rserve.served() is defined in the global
-      environment of the server, it will be run after a client
-      connection has been served. For forked servers this is just
-      after the fork(), for co-operative servers this is after the
-      client conenction has been closed. It is guaranteed that no
-      other client is served before the call so it can be used to
-      manage resources that are unsafe to share with forked
-      processes (e.g. sockets etc.).
-
-    o The server and client process can be tagged with extra
-      information in argv[0] so it is possible to distinguish the
-      server and children. This behavior can be enabled using
-
-      tag.argv enable
-
-      Note, however, that this not always possible and it will have
-      impact on programs that use argv[0] such as killall.
-
-    o Added configuration option pid.file and command-line option
-      --RS-pidfile which instructs Rserve to write its process id
-      (pid) into that file at startup.
-
-    o Added configuration directives http.user, https.user and
-      websockets.user which take a username and perform
-      setuid/setgid/initgroups immediately after forking.
-      This minimizes the amount of code that is run with
-      elevated privileges in cases where user switching is desired.
-
-    o Added configuration directive
-
-      daemon disable
-
-      which can be used to prevent Rserve from daemonizing. It has
-      effect only in builds of Rserve that support daemonization.
-      Note that -DNODAEMON build flag disables daemonization
-      entirely and can be used in any Rserve version.
-
-    o All commands based on eval now also accept DT_SEXP in addition
-      to DT_STRING. In such case the parse step is skipped and the
-      expression is evaluated directly. The intended use of this
-      functionality is to evaluate language constructs and thus
-      allow calls with both reference and inlined arguments.
-
-    o QAP decoding is slightly more efficient and avoids protection
-      cascades. QAP_decode() has now only one argument and it is
-      guaranteed to not increase the protection stack when returning
-      (which implies that it is the responsibility of the caller to
-      protect the result if needed).
-
-    o Both QAP encoding and decoding now use native copy operations
-      on little-endian machines which can increase the speed
-      considerably when the compiler cannot do this optimization
-      on its own (most commoly used compilers don't).
-
-    o Assigning logical NAs now uses the proper NA_LOGICAL value
-      that is also recognized by R. (PR#276)
-
-    o Forked child processes will now close all server sockets so
-      that any server can be restarted without closing existing
-      children.
-
-    o Signal handling has been streamlined: the server process
-      captures HUP, TERM and INT which will lead to clean
-      shutdown. Child processes restore signal handlers back to R so
-      that regular R signal handling rules apply. Note that
-      interrupt during eval will result in RESP_ERR with code 127
-      even if try() is used.
-
----- In order to support new ideas a major re-organization of Rserve ---
----- has been started - almost 10 years after the first release. ---
----- It is time to look ahead again with a new major version. The ---
----- protocol will remain compatible so 1.x series can be used to ---
----- replace the previous 0.x series ---
-
-
-0.6-8 2012-02-20
-    o added RSserverEval() and RSserverSource() control commands
-      in the R client as well as ctrl parameter to RSshutdown().
-
-    o added new facility that allows R scripts running in Rserve to
-      issue control commands if allowed. This feature must be
-      enabled in the Rserve configuration file using
-
-      r-control enable
-
-      This will make self.ctrlEval() and self.ctrlSource() functions
-      available to code that is running within the Rserve
-      instance. It is also possible to use this feature without
-      explicitly loading the Rserve package via
-      .Call("Rserve_ctrlEval", paste(text, collapse='\n'))
-      .Call("Rserve_ctrlSource", as.character(file))
-      although this may change in the future.
-
-0.6-7 2012-01-17
-    o fix processing of login information
-      **IMPORTANT**: this fixes a serious security hole in the
-      remote login mechanism! If you rely on authentication,
-      please make sure you update your Rserve immediately!
-      (Thanks to Daniel Faber for reporting)
-
-    o add a namespace to make R 2.14+ happy
-
-    o work around broken readBin() in R 2.14.0 that errors
-      on unsigned integers (affects R client only)
-
-0.6-6 2011-12-10
-    o fix a bug that can cause heap corruption due to incorrect
-      addressing in padding of symbols. Unless extremely long symbol
-      names are used it is unlikely to have a real effect in
-      practice, but in theory it could be used to zero targetted
-      parts of the heap. Thanks to Ralph Heinkel for reporting.
-
-    o fix Rserve() call on Windows with quote=FALSE and more than
-      one argument.
-
-    o clarify that sisocks.h is under LGPL 2.1 as well as the other
-      headers used by clients.
-
-    o add support for plain S4 objects (S4SEXP) in assignments
-      (Note: derived S4 objects - those using other native SEXP type
-      as a base - cannot be supported properly, becasue there is no
-      way to distinguish them from S3 objects!)
-
-    o Unsupported types in CMD_assign will no longer crash R.
-      The resulting object is always NULL and an error is printed on
-      the R side.
- - -0.6-5 2011-06-21 - o use new install.libs.R custom installation script in R 2.13.1 - to install binaries - - o install clients by default on Windows as well - - o multi-arch binaries are no longer installed with the arch suffix - in the package root. The canonical place is libs$(R_ARCH) instead. - For now Rserve.exe/Rserve_d.exe are still installed in the root - but they will be also removed in the future as they are not - multi-arch safe. - - -0.6-4 2011-05-19 - o make all buffers capable of using 64-bit sizes. This means - that clients can use more that 4Gb of data on 64-bit platforms - when communicating with Rserve, provided the buffer limits are - either disabled or configured to be high enough. Note that this - does not change the limitations in R with respect to vector - lengths so you still can only use up to 2^31-1 elements. - - o bug fix: contrary to the documentation scalar logicals were sent - in the old XT_BOOL format instead of XT_ARRAY_BOOL - - o work around several issues introduced in R 2.13.0 for Windows - - Rserve() now also allows arguments to be passed to system() for - more fine-grained control of the environment, mostly to work - around bugs and incompatible changes to system() on Windows - in R 2.13.0 (commonly used options are invisible=FALSE to get - back to a more reasonable pre-2.13.0 behavior and wait=TRUE if - using R 2.13.0 that has broken wait=FALSE support). - - o In Rserve() startup wrapper, args are now quoted automatically - if quote=TRUE is set. For backward compatilility args are not - quoted by default if they consist of just one string. - - -0.6-3 2011-01-17 - o bug fix: the child process could get stuck in the server loop - after some abnormal return from the child connection code - Thanks to David Richardson for reporting. - - o set R_ARCH automatically on Windows if a multi-arch R is - detected (such as CRAN binaries since R 2.12.0) - - o add R_ARCH support in Rserve() on Windows to locate the - proper binary - - o bug fix: C++ client did not handle new-style lists (introduced - in Rserve 0.5) properly. Thanks to Carl Martin Grewe for - reporting. - - -0.6-2 2010-09-02 - o add support for NAs in character vectors by using a special - "\xff" string. Any string beginning with '\xff' is - prepended by additional '\xff' to remove ambiuguity and clients - should remove leading '\xff' accordingly. - (Note that UTF-8 encoded strings never contain '\xff' so - in most uses it never occurs). - - The Java client has been updated accordingly and represents - NA strings with null. - - o add a new config file option "interactive" that allows to run - Rserve in interactive or non-interactive mode across platforms. - Previously Windows ran in non-interactive mode and unix in - interactive mode. Non-interactive mode is useful if you want - to prevent R from soliciting user input, but it requires error - option to be set if you don't want to quit R on all errors - (i.e., something like options(error=function() NULL) will do) - - Note: on unix the interactivity flag can only be set *after* R - initialization (due to limitation in R) so you still may have - to pass flags like --no-save in order to appease R. - - o more Windows fixes - Rserve uses R's own initialization in - recent R versions. This also fixes issues with Win64 and more - recent toolchains. - Note that both Widnows and unix now behave consistently with - respect to interactive mode - the default is now interactive - for both platforms but can be changed in the config file. 
- - -0.6-1 2010-05-24 - o add a safety margin to the send buffer to avoid crashes when - size estimates are off (e.g., due to re-coding) - - o added a very primitive PHP client - - o Win64 fixes by Brian Ripley - - o added new configuration options: - su {now|server|client} - switches user either immediately - as the config file is loaded ("now", default and always the - behavior of Rserve before 0.6-1), when the server is ready - ("server") or when a client is spawned ("client"). The - latter is useful to restrict clients from sending signals - to the server process. - - uid, gid config options are interpreted accordingly to - the su value. - - cachepwd - {no|yes|indefinitely} - allows Rserve to cache - the password file. "no" = read it at each authorization - (default and behavior before 0.6-1), "yes" = read it when - a client is spawned before su, "indefinitely" = read it - just after the config file (most efficient but changes - are only active after re-start). "yes" has only effect - in unix and can be used to restrict permissions on the - password file such that client code has no access to it - (do does "indefinitely" but can be used anywhere). - - -0.6-0 2009-10-27 - o added support for control commands CMD_ctrlEval, - CMD_ctrlSource and CMD_ctrlShutdown. Those commands provide - control over the server process. The side-efect of eval and - source are then available to all future connections. - - Control commands are only available if they are enabled, e.g., - with the config file entry "control enable". In addition if - authorization is required or the passwords file is set only - designated users will have control access (see next point). - - Note that enabling control commands will make Rserve use at - least one file descriptor per active child process, so you may - want to adjust the maximum number of file descriptor in your - system if you expect hundreds of concurrent clients. - - o The passwords file format has been enhanced to give - finer-granularity control over the user authorization. - - Only users with "@" prefix can issue control commands. The - prefix is not part of the user name for authentication - purposes. - - In addition, if the password file contains an entry - starting with "*" it will be interpreted as blank - authorization, i.e. any username/pwd will authenticate. This - may be useful in conjunction with control prefix, e.g., the - following file would give blank authorization to all users but - only the user "joe" will be able to use control commands: - - @joe foobar - * - - o Windows build cleanup (thanks to Brian Ripley) - - o fixed decoding of XT_RAW (it advanced too far), this affected - the use of XT_RAW as non-last element only (thanks to Saptarshi - Guha for reporting) - - o don't advertize ARuc if not supported (this bug only affected - systems without crypt support with plaintext enabled and - required authorization) - - o add assign support for logical vectors - - -0.5-3 2009-01-25 - o fix SET_VECTOR_ELT/SET_STRING_ELT mismatches - - o set object flag when decoding objects that have - a "class" attribute (fixes issues with S3 objects that - were passed from the client to the server). - - o set S4 bit for pure S4 objects (S4SEXP). 
No other S4 - objects are supported because there is no way to tell - that an assembled object is really an S4 object - - o added string encoding support (where R supports it) - The string encoding can be set in the configuration file - (directive "encoding"), on the command line with --RS-encoding - or within a session by the client command CMD_setEncoding. - - This means that strings are converted to the given encoding - before being sent to the client and also all strings from the - client are assumed to come from the given encoding. - (Previously the strings were always passed as-is with no - conversion). The currently supported encodings are "native" - (same as the server session locale), "utf8" and "latin1". The - server default is currently "native" for compatibility with - previous versions (but may change to "utf8" in the future, so - explicit use of encoding in the config file is advised). - - If a server is used mainly by Java clients, it is advisable to - set the server encoding to "utf8" since that it the only - encoding supported by Java clients. - - For efficieny it is still advisable to run Rserve in the same - locale as the majority of clients to minimize the necessary - conversions. With diverse clients UTF-8 is the most versatile - encoding for the server to run in while it can still serve - latin1 clients as well. - - -0.5-2 2008-10-17 - o fix a bug in CMD_readFile and CMD_setBufferSize that - resulted in invalid buffer sizes (one of the ways to - trigger the bug was to attempt to read a small number of - bytes with readFile). Thanks to H. Rehauer for reporting. - - o ignore attributes if they are not in a LISTSXP - there seem - to be other uses of the ATTRIB entry in conjunction with - character hashes in recent R versions. (BR #76) - - o adapt C++ client to changes in 0.5 (at least to the point - where the demo1 code works) - - o add support for XT_VECTOR_EXP in assignments - - o improve protection for vectors - - o report "remote" setting in --RS-settings - - o updates in the REngine Java client, added documentation - - -0.5-1 2008-07-22 - o fix build issue with R 2.7.x on Windows - - o mergefat now works properly and uses cp if there is no lipo - (this fixes multi-arch issues on Mac OS X and makes sure that - Rserve/Rserve.dbg are installed even on non-Mac systems) - - -0.5-0 2008-07-21 - o added CMD_serEval and CMD_serAssign which are highly efficient - when talking to R clients as they don't need any intermediate - buffer. The corresponding R client functions RSeval and - RSassign have been re-written to use this new API. - - o deprecate scalar types in the protocol - - o add more efficient storage for dotted-pair lists - and symbol names - - o add support for complex numbers - - o new Java client: REngine - it is more flexible than JRclient and it can be used with - other Java/R engines such as JRI. Also it has a much more - clean API and better exeption handling. - - allow NaNs to be passed in raw form to R, i.e. double - NAs can be created using - Double.longBitsToDouble(0x7ff00000000007a2L) - (nice methods for this should follow) - - o C++ client was moved to src/client/cxx - -JRclient: - o change the representation of lists to generic - named vectors (class RList) - - o change the ways attributes are accessed - - -0.4-7 2007-01-14 - o relax DLL versions checking on Windows - - o added more sophisticated implementation of RSassign - in R client to support larger data. 
0.5-2 2008-10-17
  o fix a bug in CMD_readFile and CMD_setBufferSize that resulted
    in invalid buffer sizes (one of the ways to trigger the bug was
    to attempt to read a small number of bytes with readFile).
    Thanks to H. Rehauer for reporting.

  o ignore attributes if they are not in a LISTSXP - there seem to
    be other uses of the ATTRIB entry in conjunction with character
    hashes in recent R versions. (BR #76)

  o adapt the C++ client to the changes in 0.5 (at least to the
    point where the demo1 code works)

  o add support for XT_VECTOR_EXP in assignments

  o improve protection for vectors

  o report the "remote" setting in --RS-settings

  o updates in the REngine Java client, added documentation


0.5-1 2008-07-22
  o fix a build issue with R 2.7.x on Windows

  o mergefat now works properly and uses cp if there is no lipo
    (this fixes multi-arch issues on Mac OS X and makes sure that
    Rserve/Rserve.dbg are installed even on non-Mac systems)


0.5-0 2008-07-21
  o added CMD_serEval and CMD_serAssign, which are highly efficient
    when talking to R clients as they don't need any intermediate
    buffer. The corresponding R client functions RSeval and
    RSassign have been re-written to use this new API.

  o deprecate scalar types in the protocol

  o add more efficient storage for dotted-pair lists and symbol
    names

  o add support for complex numbers

  o new Java client: REngine
    It is more flexible than JRclient and it can be used with other
    Java/R engines such as JRI. It also has a much cleaner API and
    better exception handling.

    allow NaNs to be passed in raw form to R, i.e., double NAs can
    be created using Double.longBitsToDouble(0x7ff00000000007a2L)
    (nicer methods for this should follow)

  o the C++ client was moved to src/client/cxx

JRclient:
  o change the representation of lists to generic named vectors
    (class RList)

  o change the ways attributes are accessed


0.4-7 2007-01-14
  o relax DLL version checking on Windows

  o added a more sophisticated implementation of RSassign in the R
    client to support larger data. Nevertheless, due to limitations
    in R, objects must be serializable to less than 8MB to be
    assignable via RSassign.

  o added more robust error handling in the R client

  o fixed compilation on systems with a custom include dir (such as
    Debian)

  o JRclient is now part of the Rserve package. See clients.txt for
    details. It is not compiled by default (but it is installed
    when --with-client is specified), because we cannot assume the
    existence of a Java compiler.
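    For illustration, a minimal round trip with the R client
    functions mentioned in the 0.5-0 and 0.4-7 entries might look
    as follows (a sketch only: the host and the exact argument
    conventions of the Rserve R client are assumptions here, and
    the assigned object must serialize to less than 8MB per the
    note above):

      library(Rserve)
      c <- RSconnect(host = "localhost")        # default Rserve port 6311
      RSassign(c, mtcars, "df")                 # sent via CMD_serAssign
      means <- RSeval(c, quote(colMeans(df)))   # expression sent via CMD_serEval
      RSclose(c)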
0.4-6 2006-11-30
  o fixed a bug in RSeval when handling large objects

  o minor fix in RSassign

  o add an endianness hack for Windows in case config.h is not
    included properly


0.4-5 2006-11-29
  o added --with-server option (enabled by default). When disabled,
    the server itself is not built. When enabled, R must provide
    the R shared library, i.e., it must have been compiled with
    --enable-R-shlib.

  o added --with-client option (disabled by default). When enabled,
    the C/C++ client is built and installed in the package. It will
    be copied into the "client" directory of the package and
    contains all files necessary for building a client application.

    This option has no effect on the R client, which is always
    built and installed.

  o the Windows version of Rserve now builds and installs both the
    debug (Rserve_d.exe) and the regular (Rserve.exe) version of
    Rserve. In addition, the Rserve function can now be used to
    launch Rserve even on Windows.

  o endianness detection now prefers information from the compiler
    macros, thus allowing cross-compilation. Use -D_BIG_ENDIAN_ or
    -D_LITTLE_ENDIAN_ to override it if necessary.

  o allow universal builds on Mac OS X

  o adapt to the R_ParseVector interface change in R-devel


0.4-4 2006-11-15
  o first release on CRAN

  o added support for the RAW type (both in and out)

  o added rudimentary client support (thanks to David Reiss for his
    contributions) and documentation


Previous major releases:

0.4 2005-08-31
  * added support for sessions

0.3 2003-10-07
  * new format for boolean arrays
    last version: 0.3-18 (2005-08-28)

0.2 2003-08-21
  * support for large objects

0.1 2002-07-06
  * first release
diff --git a/spark/rlibs/Rserve/R/Rserve b/spark/rlibs/Rserve/R/Rserve
deleted file mode 100644
index 3b65e3cb..00000000
--- a/spark/rlibs/Rserve/R/Rserve
+++ /dev/null
@@ -1,27 +0,0 @@
-# File share/R/nspackloader.R
-# Part of the R package, http://www.R-project.org
-#
-# Copyright (C) 1995-2012 The R Core Team
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# A copy of the GNU General Public License is available at
-# http://www.r-project.org/Licenses/
-
-local({
-    info <- loadingNamespaceInfo()
-    pkg <- info$pkgname
-    ns <- .getNamespace(as.name(pkg))
-    if (is.null(ns))
-        stop("cannot find namespace environment for ", pkg, domain = NA);
-    dbbase <- file.path(info$libname, pkg, "R", pkg)
-    lazyLoad(dbbase, ns, filter = function(n) n != ".__NAMESPACE__.")
-})
diff --git a/spark/rlibs/Rserve/R/Rserve.rdb b/spark/rlibs/Rserve/R/Rserve.rdb
deleted file mode 100644
index 9a5da395..00000000
Binary files a/spark/rlibs/Rserve/R/Rserve.rdb and /dev/null differ
diff --git a/spark/rlibs/Rserve/R/Rserve.rdx b/spark/rlibs/Rserve/R/Rserve.rdx
deleted file mode 100644
index 85a4cc6e..00000000
Binary files a/spark/rlibs/Rserve/R/Rserve.rdx and /dev/null differ
diff --git a/spark/rlibs/Rserve/help/AnIndex b/spark/rlibs/Rserve/help/AnIndex
deleted file mode 100644
index 7d7bce49..00000000
--- a/spark/rlibs/Rserve/help/AnIndex
+++ /dev/null
@@ -1,6 +0,0 @@
-Rserve Rserv
-run.Rserve run.Rserve
-self.ctrlEval self
-self.ctrlSource self
-self.oobMessage self
-self.oobSend self
diff --git a/spark/rlibs/Rserve/help/Rserve.rdb b/spark/rlibs/Rserve/help/Rserve.rdb
deleted file mode 100644
index 0c2d697b..00000000
Binary files a/spark/rlibs/Rserve/help/Rserve.rdb and /dev/null differ
diff --git a/spark/rlibs/Rserve/help/Rserve.rdx b/spark/rlibs/Rserve/help/Rserve.rdx
deleted file mode 100644
index f61382f6..00000000
Binary files a/spark/rlibs/Rserve/help/Rserve.rdx and /dev/null differ
diff --git a/spark/rlibs/Rserve/help/aliases.rds b/spark/rlibs/Rserve/help/aliases.rds
deleted file mode 100644
index 2cbad003..00000000
Binary files a/spark/rlibs/Rserve/help/aliases.rds and /dev/null differ
diff --git a/spark/rlibs/Rserve/help/paths.rds b/spark/rlibs/Rserve/help/paths.rds
deleted file mode 100644
index 9dbdcd04..00000000
Binary files a/spark/rlibs/Rserve/help/paths.rds and /dev/null differ
diff --git a/spark/rlibs/Rserve/html/00Index.html b/spark/rlibs/Rserve/html/00Index.html
deleted file mode 100644
index 914696c1..00000000
--- a/spark/rlibs/Rserve/html/00Index.html
+++ /dev/null
@@ -1,36 +0,0 @@
R: Binary R server

Binary R server

Documentation for package ‘Rserve’ version 1.7-3

Help Pages
Rserve: Server providing R functionality to applications via TCP/IP or local unix sockets
run.Rserve: Start Rserve within the current R process.
self.ctrlEval: Functions usable for R code run inside Rserve
self.ctrlSource: Functions usable for R code run inside Rserve
self.oobMessage: Functions usable for R code run inside Rserve
self.oobSend: Functions usable for R code run inside Rserve
- diff --git a/spark/rlibs/Rserve/html/R.css b/spark/rlibs/Rserve/html/R.css deleted file mode 100644 index 6f058f3d..00000000 --- a/spark/rlibs/Rserve/html/R.css +++ /dev/null @@ -1,57 +0,0 @@ -BODY{ background: white; - color: black } - -A:link{ background: white; - color: blue } -A:visited{ background: white; - color: rgb(50%, 0%, 50%) } - -H1{ background: white; - color: rgb(55%, 55%, 55%); - font-family: monospace; - font-size: x-large; - text-align: center } - -H2{ background: white; - color: rgb(40%, 40%, 40%); - font-family: monospace; - font-size: large; - text-align: center } - -H3{ background: white; - color: rgb(40%, 40%, 40%); - font-family: monospace; - font-size: large } - -H4{ background: white; - color: rgb(40%, 40%, 40%); - font-family: monospace; - font-style: italic; - font-size: large } - -H5{ background: white; - color: rgb(40%, 40%, 40%); - font-family: monospace } - -H6{ background: white; - color: rgb(40%, 40%, 40%); - font-family: monospace; - font-style: italic } - -IMG.toplogo{ vertical-align: middle } - -IMG.arrow{ width: 30px; - height: 30px; - border: 0 } - -span.acronym{font-size: small} -span.env{font-family: monospace} -span.file{font-family: monospace} -span.option{font-family: monospace} -span.pkg{font-weight: bold} -span.samp{font-family: monospace} - -div.vignettes a:hover { - background: rgb(85%, 85%, 85%); -} - diff --git a/spark/rlibs/Rserve/libs/Rserve b/spark/rlibs/Rserve/libs/Rserve deleted file mode 100755 index 0cdd3176..00000000 Binary files a/spark/rlibs/Rserve/libs/Rserve and /dev/null differ diff --git a/spark/rlibs/Rserve/libs/Rserve.dbg b/spark/rlibs/Rserve/libs/Rserve.dbg deleted file mode 100755 index d59f3825..00000000 Binary files a/spark/rlibs/Rserve/libs/Rserve.dbg and /dev/null differ diff --git a/spark/rlibs/Rserve/libs/Rserve.so.dSYM/Contents/Info.plist b/spark/rlibs/Rserve/libs/Rserve.so.dSYM/Contents/Info.plist deleted file mode 100644 index 6be59b74..00000000 --- a/spark/rlibs/Rserve/libs/Rserve.so.dSYM/Contents/Info.plist +++ /dev/null @@ -1,20 +0,0 @@ - - - - - CFBundleDevelopmentRegion - English - CFBundleIdentifier - com.apple.xcode.dsym.Rserve.so - CFBundleInfoDictionaryVersion - 6.0 - CFBundlePackageType - dSYM - CFBundleSignature - ???? 
- CFBundleShortVersionString - 1.0 - CFBundleVersion - 1 - - diff --git a/spark/src/main/scala/io/ddf/spark/content/KryoRegistrator.scala b/spark/src/main/scala/io/ddf/spark/content/KryoRegistrator.scala index 4ad17390..d740b583 100644 --- a/spark/src/main/scala/io/ddf/spark/content/KryoRegistrator.scala +++ b/spark/src/main/scala/io/ddf/spark/content/KryoRegistrator.scala @@ -7,8 +7,6 @@ import io.ddf.types.Matrix import io.ddf.types.Vector import io.ddf.spark.ml.ROCComputer import org.jblas.DoubleMatrix -import org.rosuda.REngine.REXP -import org.rosuda.REngine.RList import io.ddf.ml.RocMetric @@ -19,8 +17,6 @@ class KryoRegistrator extends SparkKryoRegistrator { kryo.register(classOf[DoubleMatrix]) kryo.register(classOf[ROCComputer]) kryo.register(classOf[RocMetric]) - kryo.register(classOf[REXP]) - kryo.register(classOf[RList], new FieldSerializer(kryo, classOf[RList])) //super.registerClasses(kryo) } } diff --git a/spark/src/main/scala/io/ddf/spark/content/RDDRow2REXP.scala b/spark/src/main/scala/io/ddf/spark/content/RDDRow2REXP.scala deleted file mode 100644 index 3f5305a9..00000000 --- a/spark/src/main/scala/io/ddf/spark/content/RDDRow2REXP.scala +++ /dev/null @@ -1,102 +0,0 @@ -package io.ddf.spark.content - -import io.ddf.DDF -import io.ddf.content.{Representation, ConvertFunction} -import org.apache.spark.rdd.RDD -import io.ddf.spark.{SparkDDFManager, SparkDDF} -import org.apache.spark.sql.Row -import io.ddf.content.Schema.{ColumnType, Column} -import org.apache.spark.sql.types.StructField -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer -import org.rosuda.REngine._ - -/** - * // TODO review. What is this for? - */ -class RDDROW2REXP(@transient ddf: DDF) extends ConvertFunction(ddf) { - - override def apply(representation: Representation): Representation = { - val columnList = ddf.getSchemaHandler.getColumns - - representation.getValue match { - case rdd: RDD[Row] => { - val rddREXP = rdd.mapPartitions { - iterator => { - val arrayBufferColumns: List[ArrayBuffer[_]] = columnList.map { - col => col.getType match { - case ColumnType.INT => new ArrayBuffer[Int] - case ColumnType.DOUBLE | ColumnType.BIGINT => new ArrayBuffer[Double] - case ColumnType.STRING => new ArrayBuffer[String] - } - }.toList - - while (iterator.hasNext) { - val row = iterator.next() - var i = 0 - while (i < row.size) { - columnList(i).getType match { - case ColumnType.INT => { - val buffer = arrayBufferColumns(i).asInstanceOf[ArrayBuffer[Int]] - if (row.isNullAt(i)) { - buffer += REXPInteger.NA - } else { - buffer += row.getInt(i) - } - } - - case ColumnType.BIGINT => { - val buffer = arrayBufferColumns(i).asInstanceOf[ArrayBuffer[Double]] - if (row.isNullAt(i)) { - buffer += REXPDouble.NA - } else { - buffer += row.getLong(i).toDouble - } - } - - case ColumnType.DOUBLE | ColumnType.BIGINT => { - val buffer = arrayBufferColumns(i).asInstanceOf[ArrayBuffer[Double]] - if (row.isNullAt(i)) { - buffer += REXPDouble.NA - } else { - buffer += row.getDouble(i) - } - } - - case ColumnType.STRING => { - val buffer = arrayBufferColumns(i).asInstanceOf[ArrayBuffer[String]] - if (row.isNullAt(i)) { - buffer += null - } else { - buffer += row.getString(i) - } - } - } - i += 1 - } - } - val rVectors = columnList zip arrayBufferColumns map { - case (col, buffer) => col.getType match { - case ColumnType.INT => { - new REXPInteger(buffer.asInstanceOf[ArrayBuffer[Int]].toArray).asInstanceOf[REXP] - } - case ColumnType.DOUBLE | ColumnType.BIGINT => { - new 
REXPDouble(buffer.asInstanceOf[ArrayBuffer[Double]].toArray).asInstanceOf[REXP] - } - case ColumnType.STRING => { - new REXPString(buffer.asInstanceOf[ArrayBuffer[String]].toArray).asInstanceOf[REXP] - } - } - } - val dfList = new RList(rVectors, columnList.map { - col => col.getName - }) - Iterator(REXP.createDataFrame(dfList)) - } - } - - new Representation(rddREXP, RepresentationHandler.RDD_REXP.getTypeSpecsString) - } - } - } -} diff --git a/spark/src/main/scala/io/ddf/spark/content/REXP2ArrayObject.scala b/spark/src/main/scala/io/ddf/spark/content/REXP2ArrayObject.scala deleted file mode 100644 index 3fef7f07..00000000 --- a/spark/src/main/scala/io/ddf/spark/content/REXP2ArrayObject.scala +++ /dev/null @@ -1,97 +0,0 @@ -package io.ddf.spark.content - -import io.ddf.DDF -import io.ddf.content.{Representation, ConvertFunction, Schema} -import org.apache.spark.rdd.RDD -import org.rosuda.REngine.{REXPString, REXPInteger, REXPDouble, REXP} -import scala.collection.JavaConverters._ -import scala.collection.JavaConversions.asScalaIterator -import scala.collection.JavaConversions.seqAsJavaList - -/** - */ -class REXP2ArrayObject(@transient ddf: DDF) extends ConvertFunction(ddf) { - - override def apply(representation: Representation): Representation = { - val rddArrObj = representation.getValue match { - case rdd: RDD[REXP] => { - val rddArr = REXP2ArrayObject.RDataFrameToArrayObject(rdd, ddf.getSchema.getColumns.asScala.map(_.getType).toList) - rddArr - } - } - new Representation(rddArrObj, RepresentationHandler.RDD_ARR_OBJECT.getTypeSpecsString) - } -} - -object REXP2ArrayObject { - /** - * Convert a RDD of R data.frames into a RDD of Object[] - */ - def RDataFrameToArrayObject(rdd: RDD[REXP], colTypes: List[Schema.ColumnType]): RDD[Array[Object]] = { - - val rddarrobj = rdd.flatMap { - partdf ⇒ - val dflist = partdf.asList() - val partitionSize = (0 until dflist.size()).map(j ⇒ dflist.at(j).length()).reduce { - (x, y) ⇒ math.max(x, y) - } - - // mLog.info("partdf.len = " + partdf.length()) - // mLog.info("partitionSize = " + partitionSize) - - // big allocation! 
- val jdata = Array.ofDim[Object](partitionSize, dflist.size()) - - // convert R column-oriented AtomicVector to row-oriented Object[] - // TODO: would be nice to be able to create BigR DataFrame from columnar vectors - (0 until dflist.size()).foreach { - j ⇒ - val rcolvec = dflist.at(j) - dflist.at(j) match { - case v: REXPDouble ⇒ { - val data = rcolvec.asDoubles() // no allocation - var i = 0 // row idx - while (i < partitionSize) { - if (REXPDouble.isNA(data(i))) - jdata(i)(j) = null - else { - if (colTypes(j) == Schema.ColumnType.BIGINT) { - jdata(i)(j) = data(i).toLong.asInstanceOf[Object] - } else { - jdata(i)(j) = data(i).asInstanceOf[Object] - } - - } - - i += 1 - } - } - case v: REXPInteger ⇒ { - val data = rcolvec.asIntegers() // no allocation - var i = 0 // row idx - while (i < partitionSize) { - if (REXPInteger.isNA(data(i))) - jdata(i)(j) = null - else - jdata(i)(j) = data(i).asInstanceOf[Object] - i += 1 - } - } - case v: REXPString ⇒ { - val data = rcolvec.asStrings() // no allocation - var i = 0 // row idx - while (i < partitionSize) { - jdata(i)(j) = data(i) - i += 1 - } - } - // TODO: case REXPLogical - } - } - - jdata - } - - rddarrobj - } -} diff --git a/spark/src/main/scala/io/ddf/spark/content/RepresentationHandler.scala b/spark/src/main/scala/io/ddf/spark/content/RepresentationHandler.scala index 425154e1..169cdf6c 100644 --- a/spark/src/main/scala/io/ddf/spark/content/RepresentationHandler.scala +++ b/spark/src/main/scala/io/ddf/spark/content/RepresentationHandler.scala @@ -15,7 +15,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.python.core.PyObject -import org.rosuda.REngine._ import scala.collection.JavaConversions._ import scala.reflect.Manifest @@ -33,12 +32,10 @@ class RepresentationHandler(mDDF: DDF) extends RH(mDDF) { this.addConvertFunction(RDD_ARR_DOUBLE, RDD_LABELED_POINT, new ArrayDouble2LabeledPoint(this.mDDF)) this.addConvertFunction(RDD_ARR_OBJECT, RDD_ARR_DOUBLE, new ArrayObject2ArrayDouble(this.mDDF)) this.addConvertFunction(RDD_ROW, RDD_ARR_STRING, new RDDRow2ArrayString(this.mDDF)) - this.addConvertFunction(RDD_REXP, RDD_ARR_OBJECT, new REXP2ArrayObject(this.mDDF)) this.addConvertFunction(RDD_ROW, RDD_ARR_OBJECT, new RDDRow2ArrayObject(this.mDDF)) this.addConvertFunction(RDD_ROW, RDD_ARR_DOUBLE, new RDDRow2ArrayDouble(this.mDDF)) this.addConvertFunction(RDD_ARR_DOUBLE, RDD_VECTOR, new ArrayDouble2Vector(this.mDDF)) this.addConvertFunction(RDD_ARR_OBJECT, DATAFRAME, new ArrayObject2DataFrame(this.mDDF)) - this.addConvertFunction(RDD_ROW, RDD_REXP, new RDDROW2REXP(this.mDDF)) this.addConvertFunction(RDD_PYOBJ, RDD_ARR_OBJECT, new PyObj2ArrayObject(this.mDDF)) this.addConvertFunction(RDD_ROW, RDD_PYOBJ, new RDDRow2PyObj(this.mDDF)) this.addConvertFunction(DATAFRAME, RDD_MATRIX_VECTOR, new DataFrame2MatrixVector(this.mDDF)) @@ -140,7 +137,6 @@ object RepresentationHandler { val RDD_ARR_STRING = new Representation(classOf[RDD[_]], classOf[Array[String]]) val RDD_LABELED_POINT = new Representation(classOf[RDD[_]], classOf[LabeledPoint]) val RDD_MATRIX_VECTOR = new Representation(classOf[RDD[_]], classOf[TupleMatrixVector]) - val RDD_REXP = new Representation(classOf[RDD[_]], classOf[REXP]) val RDD_PYOBJ = new Representation(classOf[RDD[_]], classOf[PyObject]) val DATAFRAME = new Representation(classOf[DataFrame]) val RDD_ROW = new Representation(classOf[RDD[_]], classOf[Row]) diff --git a/spark/src/main/scala/io/ddf/spark/etl/TransformationHandler.scala 
b/spark/src/main/scala/io/ddf/spark/etl/TransformationHandler.scala index 33b1454a..cd4cf7dd 100644 --- a/spark/src/main/scala/io/ddf/spark/etl/TransformationHandler.scala +++ b/spark/src/main/scala/io/ddf/spark/etl/TransformationHandler.scala @@ -14,15 +14,7 @@ import scala.collection.JavaConverters._ import scala.collection.JavaConversions.asScalaIterator import scala.collection.JavaConversions.seqAsJavaList import org.apache.spark.rdd.RDD -import org.rosuda.REngine.REXP -import org.rosuda.REngine.REXPDouble -import org.rosuda.REngine.REXPInteger -import org.rosuda.REngine.REXPList -import org.rosuda.REngine.REXPLogical -import org.rosuda.REngine.REXPString -import org.rosuda.REngine.RList -import org.rosuda.REngine.Rserve.RConnection -import org.rosuda.REngine.Rserve.StartRserve + import _root_.io.ddf.DDF import _root_.io.ddf.content.Schema import _root_.io.ddf.content.Schema.Column @@ -87,68 +79,8 @@ class TransformationHandler(mDDF: DDF) extends CoreTransformationHandler(mDDF) { } override def transformMapReduceNative(mapFuncDef: String, reduceFuncDef: String, mapsideCombine: Boolean = true): DDF = { - - // Prepare data as REXP objects - val dfrdd = mDDF.getRepresentationHandler.get(classOf[RDD[_]], classOf[REXP]).asInstanceOf[RDD[REXP]] - - // 1. map! - val rMapped = dfrdd.map { - partdf ⇒ - try { - TransformationHandler.preShuffleMapper(partdf, mapFuncDef, reduceFuncDef, mapsideCombine) - } catch { - case e: Exception ⇒ { - - e match { - case aExc: DDFException ⇒ throw aExc - case rserveExc: org.rosuda.REngine.Rserve.RserveException ⇒ { - throw new DDFException(rserveExc.getMessage, null) - } - case _ ⇒ throw new DDFException(e.getMessage, null) - } - } - } - } - - // 2. extract map key and shuffle! - val groupped = TransformationHandler.doShuffle(rMapped) - - // 3. reduce! 
- val rReduced = groupped.mapPartitions { - partdf ⇒ - try { - TransformationHandler.postShufflePartitionMapper(partdf, reduceFuncDef) - } catch { - case e: Exception ⇒ { - e match { - case aExc: DDFException ⇒ throw aExc - case rserveExc: org.rosuda.REngine.Rserve.RserveException ⇒ { - throw new DDFException(rserveExc.getMessage, null) - - } - case _ ⇒ throw new DDFException(e.getMessage, null) - } - } - } - }.filter { - partdf ⇒ - // mapPartitions after groupByKey may cause some empty partitions, - // which will result in empty data.frame - val dflist = partdf.asList() - dflist.size() > 0 && dflist.at(0).length() > 0 - } - - // convert R-processed DF partitions back to BigR DataFrame - val columnArr = TransformationHandler.RDataFrameToColumnListMR(rReduced) - - - val newSchema = new Schema(mDDF.getSchemaHandler.newTableName(), columnArr.toList); - - val manager = this.getManager - val ddf = manager.newDDF(manager, rReduced, Array(classOf[RDD[_]], - classOf[REXP]), manager.getNamespace, null, - newSchema) - ddf + throw new DDFException("Method is not implemented") + null } override def transformNativeRserve(transformExpression: String): DDF = { @@ -160,63 +92,8 @@ class TransformationHandler(mDDF: DDF) extends CoreTransformationHandler(mDDF) { } override def transformNativeRserve(transformExpressions: Array[String], inPlace: java.lang.Boolean): DDF = { - val dfrdd = mDDF.getRepresentationHandler.get(classOf[RDD[_]], classOf[REXP]).asInstanceOf[RDD[REXP]] - - // process each DF partition in R - val rMapped = dfrdd.map { - partdf ⇒ - try { - // check if Rserve is running, if not: start it - if (!StartRserve.checkLocalRserve()) throw new RuntimeException("Unable to start Rserve") - // one connection for each compute job - val rconn = new RConnection() - - // send the df.partition to R process environment - val dfvarname = "df.partition" - rconn.assign(dfvarname, partdf) - - val expr = String.format("%s <- transform(%s, %s)", dfvarname, dfvarname, transformExpressions.mkString(",")) - - // mLog.info(">>>>>>>>>>>>.expr=" + expr.toString()) - - // compute! 
- TransformationHandler.tryEval(rconn, expr, errMsgHeader = "failed to eval transform expression") - - // transfer data to JVM - val partdfres = rconn.eval(dfvarname) - - // uncomment this to print whole content of the df.partition for debug - // rconn.voidEval(String.format("print(%s)", dfvarname)) - rconn.close() - - partdfres - } catch { - case e: DDFException ⇒ { - throw new DDFException("Unable to perform NativeRserve transformation", e) - - } - } - } - - // new columns from the expressions - // Notice that the users can enter expression without the new column name - // Thus the extracted column name is not correct in these cases - val newCols = transformExpressions.map(_.split("=")(0)) - - // convert R-processed data partitions back to RDD[Array[Object]] - val columnArr = TransformationHandler.RDataFrameToColumnList(rMapped, mDDF.getSchema.getColumns, newCols) - - - val newSchema = new Schema(mDDF.getSchemaHandler.newTableName(), columnArr.toList); - - val manager = this.getManager - val ddf = manager.newDDF(manager, rMapped, Array(classOf[RDD[_]], - classOf[REXP]), manager.getNamespace, null, - newSchema) - mLog.info(">>>>> adding ddf to manager: " + ddf.getName) - ddf.getMetaDataHandler.copyFactor(this.getDDF) - - if (inPlace) this.getDDF.updateInplace(ddf) else ddf + throw new DDFException("Method is not implemented") + null } override def transformNativeRserve(transformExpressions: Array[String]): DDF = { @@ -398,57 +275,57 @@ object TransformationHandler { /** * eval the R expr and return all captured output */ - def evalCaptureOutput(rconn: RConnection, expr: String): String = { - rconn.eval("paste(capture.output(print(" + expr + ")), collapse='\\n')").asString() - } - - def RDataFrameToColumnListMR(rdd: RDD[REXP]): Array[Column] = { - val firstdf = rdd.first() - val names = firstdf.getAttribute("names").asStrings() - val columns = new Array[Column](firstdf.length) - - for (j ← 0 until firstdf.length()) { - val ddfType = firstdf.asList().at(j) match { - case v: REXPDouble ⇒ "DOUBLE" - case v: REXPInteger ⇒ "INT" - case v: REXPString ⇒ "STRING" - case _ ⇒ throw new DDFException("Only support atomic vectors of type int|double|string!") - } - columns(j) = new Column(names(j), ddfType) - } - columns - } - - def RDataFrameToColumnList(rdd: RDD[REXP], orgColumns: List[Column], newColumns: Array[String]): Array[Column] = { - val firstdf = rdd.first() - val names = firstdf.getAttribute("names").asStrings() - val columns = new Array[Column](firstdf.length) - - val orgColumnNames = orgColumns.asScala.map(_.getName).toSet - - val orgColumnTypes = orgColumns.asScala.map(c => (c.getName, c.getType)).toMap - - for (j ← 0 until firstdf.length()) { - val ddfType = firstdf.asList().at(j) match { - case v: REXPDouble ⇒ - if (!orgColumnNames.contains(names(j)) || newColumns.toSet.contains(names(j))) { - "DOUBLE" - } - else { - // BigInt columns are converted to Double ones in R data.frame - // So if they are not mutated by expressions, we need to set their types - // correctly on the resulted DDF - orgColumnTypes(names(j)).toString - } - case v: REXPInteger ⇒ "INT" - case v: REXPString ⇒ "STRING" - case _ ⇒ throw new DDFException("Only support atomic vectors of type int|double|string!") - } - columns(j) = new Column(names(j), ddfType) - } - columns - - } +// def evalCaptureOutput(rconn: RConnection, expr: String): String = { +// rconn.eval("paste(capture.output(print(" + expr + ")), collapse='\\n')").asString() +// } +// +// def RDataFrameToColumnListMR(rdd: RDD[REXP]): Array[Column] = { +// val 
firstdf = rdd.first() +// val names = firstdf.getAttribute("names").asStrings() +// val columns = new Array[Column](firstdf.length) +// +// for (j ← 0 until firstdf.length()) { +// val ddfType = firstdf.asList().at(j) match { +// case v: REXPDouble ⇒ "DOUBLE" +// case v: REXPInteger ⇒ "INT" +// case v: REXPString ⇒ "STRING" +// case _ ⇒ throw new DDFException("Only support atomic vectors of type int|double|string!") +// } +// columns(j) = new Column(names(j), ddfType) +// } +// columns +// } +// +// def RDataFrameToColumnList(rdd: RDD[REXP], orgColumns: List[Column], newColumns: Array[String]): Array[Column] = { +// val firstdf = rdd.first() +// val names = firstdf.getAttribute("names").asStrings() +// val columns = new Array[Column](firstdf.length) +// +// val orgColumnNames = orgColumns.asScala.map(_.getName).toSet +// +// val orgColumnTypes = orgColumns.asScala.map(c => (c.getName, c.getType)).toMap +// +// for (j ← 0 until firstdf.length()) { +// val ddfType = firstdf.asList().at(j) match { +// case v: REXPDouble ⇒ +// if (!orgColumnNames.contains(names(j)) || newColumns.toSet.contains(names(j))) { +// "DOUBLE" +// } +// else { +// // BigInt columns are converted to Double ones in R data.frame +// // So if they are not mutated by expressions, we need to set their types +// // correctly on the resulted DDF +// orgColumnTypes(names(j)).toString +// } +// case v: REXPInteger ⇒ "INT" +// case v: REXPString ⇒ "STRING" +// case _ ⇒ throw new DDFException("Only support atomic vectors of type int|double|string!") +// } +// columns(j) = new Column(names(j), ddfType) +// } +// columns +// +// } /** * Lots of type casting @@ -499,309 +376,309 @@ object TransformationHandler { /** * Perform map and mapsideCombine phase */ - def preShuffleMapper(partdf: REXP, mapFuncDef: String, reduceFuncDef: String, mapsideCombine: Boolean): REXP = { - // check if Rserve is running, if not: start it - if (!StartRserve.checkLocalRserve()) throw new RuntimeException("Unable to start Rserve") - // one connection for each compute job - val rconn = new RConnection() - - // send the df.partition to R process environment - rconn.assign("df.partition", partdf) - rconn.assign("mapside.combine", new REXPLogical(mapsideCombine)) - - TransformationHandler.tryEval(rconn, "map.func <- " + mapFuncDef, - errMsgHeader = "fail to eval map.func definition") - TransformationHandler.tryEval(rconn, "combine.func <- " + reduceFuncDef, - errMsgHeader = "fail to eval combine.func definition") - - // pre-amble to define internal functions - // copied from: https://github.com/adatao/RClient/blob/master/io.pa/R/mapreduce.R - // tests: https://github.com/adatao/RClient/blob/mapreduce/io.pa/inst/tests/test-mapreduce.r#L106 - // should consider some packaging to synchroncize code - rconn.voidEval( - """ - |#' Emit keys and values for map/reduce. - |keyval <- function(key, val) { - | if (! is.atomic(key)) - | stop(paste("keyval: key argument must be an atomic vector: ", paste(key, collapse=" "))) - | if (! is.null(dim(key))) - | stop(paste("keyval: key argument must be one-dimensional: dim(key) = ", - | paste(dim(key), collapse=" "))) - | nkey <- length(key) - | nval <- if (! is.null(nrow(val))) nrow(val) else length(val) - | if (nkey != nval) - | stop(sprintf("keyval: key and val arguments must match in length/nrow: %s != %s", nkey, nval)) - | kv <- list(key=key, val=val); - | attr(kv, "adatao-2d-kv-pair") <- T; - | kv - |} - | - |#' Emit a single key and value pair for map/reduce. - |keyval.row <- function(key, val) { - | if (! 
is.null(dim(key))) - | stop(paste("keyval: key argument must be a scala value, not n-dimensional: dim(key) = ", - | paste(dim(key), collapse=" "))) - | if (length(key) != 1) - | stop(paste("keyval.row: key argument must be a scalar value: ", paste(key, collapse=" "))) - | if (! is.null(dim(val))) - | stop(paste("keyval: val argument must be one-: dim(val) = ", - | paste(dim(val), collapse=" "))) - | kv <- list(key=key, val=val); - | attr(kv, "adatao-1d-kv-pair") <- T; - | kv - |} - | - |#' does the kv pair have a adatao-defined attr? - |is.adatao.kv <- function(kv) { (! is.null(attr(kv, "adatao-1d-kv-pair"))) | (! is.null(attr(kv, "adatao-2d-kv-pair"))) } - | - |#' should this be splitted? - |is.adatao.1d.kv <- function(kv) { ! is.null(attr(kv, "adatao-1d-kv-pair")) } - | - |do.pre.shuffle <- function(partition, map.func, combine.func, mapside.combine = T, debug = F) { - | print("==== map phase begins ...") - | kv <- map.func(partition) - | if (debug) { print("kv = "); str(kv) } - | - | if (is.adatao.1d.kv(kv)) { - | # list of a single keyval object, with the serialized - | return(list(keyval.row(kv$key, serialize(kv$val, NULL)))) - | } else if (!is.adatao.kv(kv)) { - | print(paste("skipping non-adatao kv = ", kv)) - | } - | - | val.bykey <- split(kv$val, f=kv$key) - | if (debug) { print("val.bykey ="); str(val.bykey) } - | keys <- names(val.bykey) - | - | result <- if (mapside.combine) { - | combine.result <- vector('list', length(keys)) - | for (i in 1:length(val.bykey)) { - | kv <- combine.func(keys[[i]], val.bykey[[i]]) - | if (debug) { print("combined kv = "); str(kv) } - | combine.result[[i]] <- keyval.row(kv$key, serialize(kv$val, NULL)) - | } - | # if (debug) print(combine.result) - | combine.result - | } else { - | kvlist.byrow <- vector('list', length(kv$key)) - | z <- 1 - | for (i in 1:length(keys)) { - | k <- keys[[i]] - | vv <- val.bykey[[i]] - | if (is.atomic(vv)) { - | for (j in 1:length(vv)) { - | kvlist.byrow[[z]] <- keyval.row(k, serialize(vv[[j]], NULL)) - | z <- z + 1 - | } - | } else { - | for (j in 1:nrow(vv)) { - | kvlist.byrow[[z]] <- keyval.row(k, serialize(vv[j, ], NULL)) - | z <- z + 1 - | } - | } - | } - | # if (debug) print(kvlist.byrow) - | kvlist.byrow - | } - | print("==== map phase completed") - | # if (debug) { print("kvlist.byrow = "); str(kvlist.byrow) } - | result - |} - """.stripMargin) - - // map! 
- TransformationHandler.tryEval(rconn, "pre.shuffle.result <- do.pre.shuffle(df.partition, map.func, combine.func, mapside.combine, debug=T)", - errMsgHeader = "fail to apply map.func to data partition") - - // transfer pre-shuffle result into JVM - val result = rconn.eval("pre.shuffle.result") - - // we will another RConnection because we will now shuffle data - rconn.close() - - result - } +// def preShuffleMapper(partdf: REXP, mapFuncDef: String, reduceFuncDef: String, mapsideCombine: Boolean): REXP = { +// // check if Rserve is running, if not: start it +// if (!StartRserve.checkLocalRserve()) throw new RuntimeException("Unable to start Rserve") +// // one connection for each compute job +// val rconn = new RConnection() +// +// // send the df.partition to R process environment +// rconn.assign("df.partition", partdf) +// rconn.assign("mapside.combine", new REXPLogical(mapsideCombine)) +// +// TransformationHandler.tryEval(rconn, "map.func <- " + mapFuncDef, +// errMsgHeader = "fail to eval map.func definition") +// TransformationHandler.tryEval(rconn, "combine.func <- " + reduceFuncDef, +// errMsgHeader = "fail to eval combine.func definition") +// +// // pre-amble to define internal functions +// // copied from: https://github.com/adatao/RClient/blob/master/io.pa/R/mapreduce.R +// // tests: https://github.com/adatao/RClient/blob/mapreduce/io.pa/inst/tests/test-mapreduce.r#L106 +// // should consider some packaging to synchroncize code +// rconn.voidEval( +// """ +// |#' Emit keys and values for map/reduce. +// |keyval <- function(key, val) { +// | if (! is.atomic(key)) +// | stop(paste("keyval: key argument must be an atomic vector: ", paste(key, collapse=" "))) +// | if (! is.null(dim(key))) +// | stop(paste("keyval: key argument must be one-dimensional: dim(key) = ", +// | paste(dim(key), collapse=" "))) +// | nkey <- length(key) +// | nval <- if (! is.null(nrow(val))) nrow(val) else length(val) +// | if (nkey != nval) +// | stop(sprintf("keyval: key and val arguments must match in length/nrow: %s != %s", nkey, nval)) +// | kv <- list(key=key, val=val); +// | attr(kv, "adatao-2d-kv-pair") <- T; +// | kv +// |} +// | +// |#' Emit a single key and value pair for map/reduce. +// |keyval.row <- function(key, val) { +// | if (! is.null(dim(key))) +// | stop(paste("keyval: key argument must be a scala value, not n-dimensional: dim(key) = ", +// | paste(dim(key), collapse=" "))) +// | if (length(key) != 1) +// | stop(paste("keyval.row: key argument must be a scalar value: ", paste(key, collapse=" "))) +// | if (! is.null(dim(val))) +// | stop(paste("keyval: val argument must be one-: dim(val) = ", +// | paste(dim(val), collapse=" "))) +// | kv <- list(key=key, val=val); +// | attr(kv, "adatao-1d-kv-pair") <- T; +// | kv +// |} +// | +// |#' does the kv pair have a adatao-defined attr? +// |is.adatao.kv <- function(kv) { (! is.null(attr(kv, "adatao-1d-kv-pair"))) | (! is.null(attr(kv, "adatao-2d-kv-pair"))) } +// | +// |#' should this be splitted? +// |is.adatao.1d.kv <- function(kv) { ! 
is.null(attr(kv, "adatao-1d-kv-pair")) } +// | +// |do.pre.shuffle <- function(partition, map.func, combine.func, mapside.combine = T, debug = F) { +// | print("==== map phase begins ...") +// | kv <- map.func(partition) +// | if (debug) { print("kv = "); str(kv) } +// | +// | if (is.adatao.1d.kv(kv)) { +// | # list of a single keyval object, with the serialized +// | return(list(keyval.row(kv$key, serialize(kv$val, NULL)))) +// | } else if (!is.adatao.kv(kv)) { +// | print(paste("skipping non-adatao kv = ", kv)) +// | } +// | +// | val.bykey <- split(kv$val, f=kv$key) +// | if (debug) { print("val.bykey ="); str(val.bykey) } +// | keys <- names(val.bykey) +// | +// | result <- if (mapside.combine) { +// | combine.result <- vector('list', length(keys)) +// | for (i in 1:length(val.bykey)) { +// | kv <- combine.func(keys[[i]], val.bykey[[i]]) +// | if (debug) { print("combined kv = "); str(kv) } +// | combine.result[[i]] <- keyval.row(kv$key, serialize(kv$val, NULL)) +// | } +// | # if (debug) print(combine.result) +// | combine.result +// | } else { +// | kvlist.byrow <- vector('list', length(kv$key)) +// | z <- 1 +// | for (i in 1:length(keys)) { +// | k <- keys[[i]] +// | vv <- val.bykey[[i]] +// | if (is.atomic(vv)) { +// | for (j in 1:length(vv)) { +// | kvlist.byrow[[z]] <- keyval.row(k, serialize(vv[[j]], NULL)) +// | z <- z + 1 +// | } +// | } else { +// | for (j in 1:nrow(vv)) { +// | kvlist.byrow[[z]] <- keyval.row(k, serialize(vv[j, ], NULL)) +// | z <- z + 1 +// | } +// | } +// | } +// | # if (debug) print(kvlist.byrow) +// | kvlist.byrow +// | } +// | print("==== map phase completed") +// | # if (debug) { print("kvlist.byrow = "); str(kvlist.byrow) } +// | result +// |} +// """.stripMargin) +// +// // map! +// TransformationHandler.tryEval(rconn, "pre.shuffle.result <- do.pre.shuffle(df.partition, map.func, combine.func, mapside.combine, debug=T)", +// errMsgHeader = "fail to apply map.func to data partition") +// +// // transfer pre-shuffle result into JVM +// val result = rconn.eval("pre.shuffle.result") +// +// // we will another RConnection because we will now shuffle data +// rconn.close() +// +// result +// } /** * Eval the expr in rconn, if succeeds return null (like rconn.voidEval), * if fails raise AdataoException with captured R error message. * See: http://rforge.net/Rserve/faq.html#errors */ - def tryEval(rconn: RConnection, expr: String, errMsgHeader: String) { - rconn.assign(".tmp.", expr) - val r = rconn.eval("r <- try(eval(parse(text=.tmp.)), silent=TRUE); if (inherits(r, 'try-error')) r else NULL") - if (r.inherits("try-error")) throw new DDFException(errMsgHeader + ": " + r.asString()) - } - - /** - * By now, whether mapsideCombine is true or false, - * we both have each partition as a list of list(key=..., val=...) 
- */ - // def doShuffle(rMapped: RDD[REXP]): RDD[(String, Iterable[REXP])] = { - def doShuffle(rMapped: RDD[REXP]): RDD[(String, Iterable[REXP])] = { - val groupped = rMapped.flatMap { - rexp ⇒ - rexp.asList().iterator.map { - kv ⇒ - val kvl = kv.asInstanceOf[REXP].asList - - val (k, v) = (kvl.at("key").asString(), kvl.at("val")) - (k, v) - } - }.groupByKey() - //TODO - groupped - } +// def tryEval(rconn: RConnection, expr: String, errMsgHeader: String) { +// rconn.assign(".tmp.", expr) +// val r = rconn.eval("r <- try(eval(parse(text=.tmp.)), silent=TRUE); if (inherits(r, 'try-error')) r else NULL") +// if (r.inherits("try-error")) throw new DDFException(errMsgHeader + ": " + r.asString()) +// } +// +// /** +// * By now, whether mapsideCombine is true or false, +// * we both have each partition as a list of list(key=..., val=...) +// */ +// // def doShuffle(rMapped: RDD[REXP]): RDD[(String, Iterable[REXP])] = { +// def doShuffle(rMapped: RDD[REXP]): RDD[(String, Iterable[REXP])] = { +// val groupped = rMapped.flatMap { +// rexp ⇒ +// rexp.asList().iterator.map { +// kv ⇒ +// val kvl = kv.asInstanceOf[REXP].asList +// +// val (k, v) = (kvl.at("key").asString(), kvl.at("val")) +// (k, v) +// } +// }.groupByKey() +// //TODO +// groupped +// } /** * serialize data to R, perform reduce, * then assemble each resulting partition as a data.frame of REXP in Java */ - def postShufflePartitionMapper(input: Iterator[(String, Iterable[REXP])], reduceFuncDef: String): Iterator[REXP] = { - // check if Rserve is running, if not: start it - if (!StartRserve.checkLocalRserve()) throw new RuntimeException("Unable to start Rserve") - val rconn = new RConnection() - - // pre-amble - // copied from: https://github.com/adatao/RClient/blob/master/io.pa/R/mapreduce.R - // tests: https://github.com/adatao/RClient/blob/mapreduce/io.pa/inst/tests/test-mapreduce.r#L238 - // should consider some packaging to synchronize code - rconn.voidEval( - """ - |#' Emit keys and values for map/reduce. - |keyval <- function(key, val) { - | if (! is.atomic(key)) - | stop(paste("keyval: key argument must be an atomic vector: ", paste(key, collapse=" "))) - | if (! is.null(dim(key))) - | stop(paste("keyval: key argument must be one-dimensional: dim(key) = ", - | paste(dim(key), collapse=" "))) - | nkey <- length(key) - | nval <- if (! is.null(nrow(val))) nrow(val) else length(val) - | if (nkey != nval) - | stop(sprintf("keyval: key and val arguments must match in length/nrow: %s != %s", nkey, nval)) - | kv <- list(key=key, val=val); - | attr(kv, "adatao-2d-kv-pair") <- T; - | kv - |} - | - |#' Emit a single key and value pair for map/reduce. - |keyval.row <- function(key, val) { - | if (! is.null(dim(key))) - | stop(paste("keyval: key argument must be a scala value, not n-dimensional: dim(key) = ", - | paste(dim(key), collapse=" "))) - | if (length(key) != 1) - | stop(paste("keyval.row: key argument must be a scalar value: ", paste(key, collapse=" "))) - | if (! is.null(dim(val))) - | stop(paste("keyval: val argument must be one-: dim(val) = ", - | paste(dim(val), collapse=" "))) - | kv <- list(key=key, val=val); - | attr(kv, "adatao-1d-kv-pair") <- T; - | kv - |} - | - |#' does the kv pair have a adatao-defined attr? - |is.adatao.kv <- function(kv) { (! is.null(attr(kv, "adatao-1d-kv-pair"))) | (! is.null(attr(kv, "adatao-2d-kv-pair"))) } - | - |#' should this be splitted? - |is.adatao.1d.kv <- function(kv) { ! is.null(attr(kv, "adatao-1d-kv-pair")) } - | - |# flatten the reduced kv pair. 
- |flatten.kvv <- function(rkv) { - | if (length(rkv$val) > 1) { - | row <- vector('list', length(rkv$val) + 1) - | row[1] <- rkv$key - | row[2:(length(rkv$val)+1)] <- rkv$val - | names(row) <- c("key", names(rkv$val)) - | row - | } else { - | rkv - | } - |} - | - |#' bind together list of values from the same keys as rows of a data.frame - |rbind.vv <- function(vvlist) { - | df <- do.call(rbind.data.frame, vvlist) - | if (length(vvlist) > 0) { - | head <- vvlist[[1]] - | if ( is.null(names(head)) ) { - | if (length(head) == 1) { - | names(df) <- c("val") - | } else { - | names(df) <- Map(function(x){ paste("val", x, sep="") }, 1:length(head)) - | } - | } - | } - | df - |} - | - |handle.reduced.kv <- function(rkv) { - | if (is.adatao.1d.kv(rkv)) { - | row <- flatten.kvv(rkv) - | row - | } else if (is.adatao.kv(rkv)) { - | df <- rkv$val - | df$key <- rkv$key - | df - | } else { - | print("skipping not-supported reduce.func output = "); str(rkv) - | NULL - | } - |} - """.stripMargin) - - TransformationHandler.tryEval(rconn, "reduce.func <- " + reduceFuncDef, - errMsgHeader = "fail to eval reduce.func definition") - - rconn.voidEval("reductions <- list()") - rconn.voidEval("options(stringsAsFactors = F)") - - // we do this in a loop because each of the seqv could potentially be very large - input.zipWithIndex.foreach { - case ((k: String, seqv: Seq[_]), i: Int) ⇒ - - // send data to R to compute reductions - rconn.assign("idx", new REXPInteger(i)) - rconn.assign("reduce.key", k) - rconn.assign("reduce.serialized.vvlist", new REXPList(new RList(seqv))) - - // print to Rserve log - rconn.voidEval("print(paste('====== processing key = ', reduce.key))") - - TransformationHandler.tryEval(rconn, "reduce.vvlist <- lapply(reduce.serialized.vvlist, unserialize)", - errMsgHeader = "fail to unserialize shuffled values for key = " + k) - - TransformationHandler.tryEval(rconn, "reduce.vv <- rbind.vv(reduce.vvlist)", - errMsgHeader = "fail to merge (using rbind.vv) shuffled values for key = " + k) - - // reduce! 
- TransformationHandler.tryEval(rconn, "reduced.kv <- reduce.func(reduce.key, reduce.vv)", - errMsgHeader = "fail to apply reduce func to data partition") - - // flatten the nested val list if needed - TransformationHandler.tryEval(rconn, "reduced <- handle.reduced.kv(reduced.kv)", - errMsgHeader = "malformed reduce.func output, please run mapreduce.local to test your reduce.func") - - // assign reduced item to reductions list - rconn.voidEval("if (!is.null(reduced)) { reductions[[idx+1]] <- reduced } ") - } - - // bind the reduced rows together, it contains rows of the resulting BigDataFrame - TransformationHandler.tryEval(rconn, "reduced.partition <- do.call(rbind.data.frame, reductions)", - errMsgHeader = "fail to use rbind.data.frame on reductions list, reduce.func cannot be combined as a BigDataFrame") - - // remove weird row names - rconn.voidEval("rownames(reduced.partition) <- NULL") - - // transfer reduced data back to JVM - val result = rconn.eval("reduced.partition") - - // print to Rserve log - rconn.voidEval("print('==== reduce phase completed')") - - // done R computation for this partition - rconn.close() - - // wrap it on a Iterator to satisfy mapPartitions - Iterator.single(result) - } +// def postShufflePartitionMapper(input: Iterator[(String, Iterable[REXP])], reduceFuncDef: String): Iterator[REXP] = { +// // check if Rserve is running, if not: start it +// if (!StartRserve.checkLocalRserve()) throw new RuntimeException("Unable to start Rserve") +// val rconn = new RConnection() +// +// // pre-amble +// // copied from: https://github.com/adatao/RClient/blob/master/io.pa/R/mapreduce.R +// // tests: https://github.com/adatao/RClient/blob/mapreduce/io.pa/inst/tests/test-mapreduce.r#L238 +// // should consider some packaging to synchronize code +// rconn.voidEval( +// """ +// |#' Emit keys and values for map/reduce. +// |keyval <- function(key, val) { +// | if (! is.atomic(key)) +// | stop(paste("keyval: key argument must be an atomic vector: ", paste(key, collapse=" "))) +// | if (! is.null(dim(key))) +// | stop(paste("keyval: key argument must be one-dimensional: dim(key) = ", +// | paste(dim(key), collapse=" "))) +// | nkey <- length(key) +// | nval <- if (! is.null(nrow(val))) nrow(val) else length(val) +// | if (nkey != nval) +// | stop(sprintf("keyval: key and val arguments must match in length/nrow: %s != %s", nkey, nval)) +// | kv <- list(key=key, val=val); +// | attr(kv, "adatao-2d-kv-pair") <- T; +// | kv +// |} +// | +// |#' Emit a single key and value pair for map/reduce. +// |keyval.row <- function(key, val) { +// | if (! is.null(dim(key))) +// | stop(paste("keyval: key argument must be a scala value, not n-dimensional: dim(key) = ", +// | paste(dim(key), collapse=" "))) +// | if (length(key) != 1) +// | stop(paste("keyval.row: key argument must be a scalar value: ", paste(key, collapse=" "))) +// | if (! is.null(dim(val))) +// | stop(paste("keyval: val argument must be one-: dim(val) = ", +// | paste(dim(val), collapse=" "))) +// | kv <- list(key=key, val=val); +// | attr(kv, "adatao-1d-kv-pair") <- T; +// | kv +// |} +// | +// |#' does the kv pair have a adatao-defined attr? +// |is.adatao.kv <- function(kv) { (! is.null(attr(kv, "adatao-1d-kv-pair"))) | (! is.null(attr(kv, "adatao-2d-kv-pair"))) } +// | +// |#' should this be splitted? +// |is.adatao.1d.kv <- function(kv) { ! is.null(attr(kv, "adatao-1d-kv-pair")) } +// | +// |# flatten the reduced kv pair. 
+// |flatten.kvv <- function(rkv) { +// | if (length(rkv$val) > 1) { +// | row <- vector('list', length(rkv$val) + 1) +// | row[1] <- rkv$key +// | row[2:(length(rkv$val)+1)] <- rkv$val +// | names(row) <- c("key", names(rkv$val)) +// | row +// | } else { +// | rkv +// | } +// |} +// | +// |#' bind together list of values from the same keys as rows of a data.frame +// |rbind.vv <- function(vvlist) { +// | df <- do.call(rbind.data.frame, vvlist) +// | if (length(vvlist) > 0) { +// | head <- vvlist[[1]] +// | if ( is.null(names(head)) ) { +// | if (length(head) == 1) { +// | names(df) <- c("val") +// | } else { +// | names(df) <- Map(function(x){ paste("val", x, sep="") }, 1:length(head)) +// | } +// | } +// | } +// | df +// |} +// | +// |handle.reduced.kv <- function(rkv) { +// | if (is.adatao.1d.kv(rkv)) { +// | row <- flatten.kvv(rkv) +// | row +// | } else if (is.adatao.kv(rkv)) { +// | df <- rkv$val +// | df$key <- rkv$key +// | df +// | } else { +// | print("skipping not-supported reduce.func output = "); str(rkv) +// | NULL +// | } +// |} +// """.stripMargin) +// +// TransformationHandler.tryEval(rconn, "reduce.func <- " + reduceFuncDef, +// errMsgHeader = "fail to eval reduce.func definition") +// +// rconn.voidEval("reductions <- list()") +// rconn.voidEval("options(stringsAsFactors = F)") +// +// // we do this in a loop because each of the seqv could potentially be very large +// input.zipWithIndex.foreach { +// case ((k: String, seqv: Seq[_]), i: Int) ⇒ +// +// // send data to R to compute reductions +// rconn.assign("idx", new REXPInteger(i)) +// rconn.assign("reduce.key", k) +// rconn.assign("reduce.serialized.vvlist", new REXPList(new RList(seqv))) +// +// // print to Rserve log +// rconn.voidEval("print(paste('====== processing key = ', reduce.key))") +// +// TransformationHandler.tryEval(rconn, "reduce.vvlist <- lapply(reduce.serialized.vvlist, unserialize)", +// errMsgHeader = "fail to unserialize shuffled values for key = " + k) +// +// TransformationHandler.tryEval(rconn, "reduce.vv <- rbind.vv(reduce.vvlist)", +// errMsgHeader = "fail to merge (using rbind.vv) shuffled values for key = " + k) +// +// // reduce! 
+// TransformationHandler.tryEval(rconn, "reduced.kv <- reduce.func(reduce.key, reduce.vv)", +// errMsgHeader = "fail to apply reduce func to data partition") +// +// // flatten the nested val list if needed +// TransformationHandler.tryEval(rconn, "reduced <- handle.reduced.kv(reduced.kv)", +// errMsgHeader = "malformed reduce.func output, please run mapreduce.local to test your reduce.func") +// +// // assign reduced item to reductions list +// rconn.voidEval("if (!is.null(reduced)) { reductions[[idx+1]] <- reduced } ") +// } +// +// // bind the reduced rows together, it contains rows of the resulting BigDataFrame +// TransformationHandler.tryEval(rconn, "reduced.partition <- do.call(rbind.data.frame, reductions)", +// errMsgHeader = "fail to use rbind.data.frame on reductions list, reduce.func cannot be combined as a BigDataFrame") +// +// // remove weird row names +// rconn.voidEval("rownames(reduced.partition) <- NULL") +// +// // transfer reduced data back to JVM +// val result = rconn.eval("reduced.partition") +// +// // print to Rserve log +// rconn.voidEval("print('==== reduce phase completed')") +// +// // done R computation for this partition +// rconn.close() +// +// // wrap it on a Iterator to satisfy mapPartitions +// Iterator.single(result) +// } } diff --git a/spark/src/test/java/io/ddf/spark/etl/TransformationHandlerTest.java b/spark/src/test/java/io/ddf/spark/etl/TransformationHandlerTest.java index 38ae0837..eec5c14b 100644 --- a/spark/src/test/java/io/ddf/spark/etl/TransformationHandlerTest.java +++ b/spark/src/test/java/io/ddf/spark/etl/TransformationHandlerTest.java @@ -39,7 +39,7 @@ public void setUp() throws Exception { s3DDFManager = (S3DDFManager) DDFManager.get(DDFManager.EngineType.S3, s3dsd); } - @Test + @Ignore public void testTransformNativeRserve() throws DDFException { DDF newddf = ddf.Transform.transformNativeRserve("newcol = deptime / arrtime"); LOG.info("name " + ddf.getName()); @@ -52,7 +52,7 @@ public void testTransformNativeRserve() throws DDFException { Assert.assertEquals(10, res.size()); } - @Test + @Ignore public void testTransformNativeRserveSingleExpressionInPlaceTrue() throws DDFException { Boolean inPlace = Boolean.TRUE; DDF newDdf = ddf.copy(); @@ -67,7 +67,7 @@ public void testTransformNativeRserveSingleExpressionInPlaceTrue() throws DDFExc Assert.assertEquals(10, res.size()); } - @Test + @Ignore public void testTransformNativeRserveSingleExpressionInPlaceFalse() throws DDFException { Boolean inPlace = Boolean.FALSE; DDF newDdf3 = ddf.copy(); @@ -79,7 +79,7 @@ public void testTransformNativeRserveSingleExpressionInPlaceFalse() throws DDFEx Assert.assertFalse("With inPlace being false, two DDF should have different UUID", newDdf3.getUUID().equals(newDdf4.getUUID())); } - @Test + @Ignore public void testTransformNativeRserveMultipleExpressionInPlaceTrue() throws DDFException { Boolean inPlace = Boolean.TRUE; String[] expressions = {"newcol = deptime / arrtime","newcol2=log(arrdelay)"}; @@ -95,7 +95,7 @@ public void testTransformNativeRserveMultipleExpressionInPlaceTrue() throws DDFE Assert.assertEquals("transformed DDF newDdf2 should have newcol2 added", "newcol2", newDdf2.getColumnName(9)); } - @Test + @Ignore public void testTransformNativeRserveMultipleExpressionInPlaceFalse() throws DDFException { Boolean inPlace = Boolean.FALSE; String[] expressions = {"newcol = deptime / arrtime","newcol2=log(arrdelay)"}; @@ -165,7 +165,7 @@ public void testTransformPythonInPlace() throws DDFException { } } - @Test + @Ignore public void 
testTransformNativeRserveMultipleExpressions() throws DDFException { String[] expressions = {"newcol = deptime / arrtime","newcol2=log(arrdelay)"}; DDF newddf = ddf.Transform.transformNativeRserve(expressions); @@ -174,7 +174,7 @@ public void testTransformNativeRserveMultipleExpressions() throws DDFException { Assert.assertEquals("newcol2", newddf.getColumnName(9)); } - @Test + @Ignore public void testTransformNativeRserveBigIntSupport() throws DDFException { DDF ddf = manager.sql2ddf("select year, month, dayofweek, uniquecarrier, deptime, arrtime, " + "distance, arrdelay, depdelay from airline_bigint", "SparkSQL"); diff --git a/spark/src/test/scala/io/ddf/spark/content/RepresentationHandlerSuite.scala b/spark/src/test/scala/io/ddf/spark/content/RepresentationHandlerSuite.scala index cc23c5f1..76f1440c 100644 --- a/spark/src/test/scala/io/ddf/spark/content/RepresentationHandlerSuite.scala +++ b/spark/src/test/scala/io/ddf/spark/content/RepresentationHandlerSuite.scala @@ -13,7 +13,6 @@ import scala.collection.JavaConversions._ import io.ddf.spark.{ATestSuite, SparkDDF} import org.apache.spark.sql.{Row, DataFrame} import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vectors, Vector} -import org.rosuda.REngine.REXP import io.ddf.etl.IHandleMissingData.Axis /** @@ -129,21 +128,6 @@ class RepresentationHandlerSuite extends ATestSuite { assert(rdd == null) } - test("Can do sql queries after Transform Rserve") { - createTableMtcars() - val ddf = manager.sql2ddf("select * from mtcars", "SparkSQL") - val newDDF = ddf.Transform.transformNativeRserve("z1 = mpg / cyl, " + - "z2 = disp * 0.4251437075, " + - "z3 = rpois(nrow(df.partition), 1000)") - val rddREXP = newDDF.getRepresentationHandler.get(classOf[RDD[_]], classOf[REXP]).asInstanceOf[RDD[REXP]] - assert(newDDF != null) - val st = newDDF.VIEWS.head(32) - val ddf1 = newDDF.sql2ddf("select * from @this") - - assert(ddf1.getNumRows == 32) - assert(ddf1 != null) - } - test("Can get RDD(Array[String])") { createTableText8Sample val ddf = manager.sql2ddf("select * from text8sample","SparkSQL")