У меня есть эта работа:
import com.twitter.scalding.{Args, Csv, Job}
class ManagersAndTeams(args: Args) extends Job(args)
{
val managersPipe = Csv(args("managers"), skipHeader = true)
.project('managerID, 'teamID)
val teamsPipe = Csv(args("teams"), skipHeader = true)
.project('teamID, 'name)
.rename('teamID, 'teamID_)
managersPipe.joinWithLarger(('teamID, 'teamID_), teamsPipe)
.project('teamID, 'name, 'managerID)
.write(Csv(args("output"), writeHeader = true))
}
И я пытаюсь это проверить. Но во время теста, похоже, не читаются заголовки csv:
Caused by: cascading.tuple.TupleException: unable to select from: [UNKNOWN], using selector: ['managerID', 'teamID']
at cascading.tuple.Tuple.get(Tuple.java:364)
at cascading.flow.stream.OperatorStage$1.makeResult(OperatorStage.java:92)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:95)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
... 4 more
Caused by: cascading.tuple.FieldsResolverException: could not select fields: [{1}:'managerID'], from: [{?}:UNKNOWN]
at cascading.tuple.Fields.indexOf(Fields.java:1016)
at cascading.tuple.Fields.translatePos(Fields.java:957)
at cascading.tuple.Fields.getPos(Fields.java:939)
at cascading.tuple.Tuple.getPos(Tuple.java:373)
at cascading.tuple.Tuple.get(Tuple.java:360)
... 10 more
Это мой тестовый класс:
import com.twitter.scalding.{Csv, JobTest}
import org.scalatest.FunSuite
import org.scalatest.Matchers._
class ManagersAndTeamsSuite extends FunSuite
{
test("joins") {
createJob(
List(
("managerID", "teamID", "x"),
("man1", "team1", "x1"),
("man2", "team2", "x2")
),
List(
("teamID", "name", "y"),
("team1", "the team 1", "y1"),
("team2", "the team 2", "y2")
)
) should be(List(
))
}
def createJob(
managers: List[(String, String, String)],
teams: List[(String, String, String)]
) = {
var r = List.empty[(String, String, String)]
new JobTest(new ManagersAndTeams(_))
.arg("managers", "managers-arg")
.arg("teams", "teams-arg")
.arg("output", "output-arg")
.source(Csv("managers-arg", skipHeader = true), managers)
.source(Csv("teams-arg", skipHeader = true), teams)
.sink[(String, String, String)](Csv("output-arg", writeHeader = true)) {
buffer =>
r = buffer.toList
}.run.finish
r
}
}
Как вы можете видеть, я получил skipHeaders=true как в задании, так и в тесте (я также пробовал тест без них, но получил ту же проблему). При отладке ошпаривающего/каскадного кода кажется, что он не анализирует заголовки CSV, как определено в тесте. Любые идеи о том, как это можно решить?